Secondary sorting with Avro

In the last Avro sorting post you saw how sorting Avro records works in MapReduce, and how you can ignore fields in Avro records for partitioning, sorting and grouping. In the process you discovered that the set of ignored fields is fixed, since it can only be defined once per schema. That means you can’t vary which fields are ignored for partitioning, for sorting and for grouping, and varying them is the key to secondary sort.

If you wish to use secondary sort with Avro, one option would be to emit a custom Writable as the map output key, and emit an Avro record as the map output value. With this approach you’d write a custom partitioner, along with custom sorting and grouping implementations.
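To make the three roles in that approach concrete, here is a minimal plain-Java sketch of a composite key with a partition function, a sort comparator and a grouping comparator. It deliberately uses no Hadoop classes so it runs on its own; the WeatherKey class and the helper names are hypothetical. In a real job the key would implement WritableComparable, and the three pieces would be registered with Job.setPartitionerClass, Job.setSortComparatorClass and Job.setGroupingComparatorClass.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of the three secondary-sort roles; no Hadoop classes are used.
final class CompositeKeySketch {

    // Hypothetical composite key; a real job would make this a WritableComparable.
    static final class WeatherKey {
        private final String station;
        private final long time;
        private final int temp;

        WeatherKey(String station, long time, int temp) {
            this.station = station;
            this.time = time;
            this.temp = temp;
        }

        String getStation() { return station; }
        long getTime() { return time; }
        int getTemp() { return temp; }
    }

    // Partitioner role: only the station decides which reducer receives the key.
    static int partition(WeatherKey key, int numPartitions) {
        return (key.getStation().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Sort comparator role: station, then time, then temp.
    static final Comparator<WeatherKey> SORT =
        Comparator.comparing(WeatherKey::getStation)
                  .thenComparingLong(WeatherKey::getTime)
                  .thenComparingInt(WeatherKey::getTemp);

    // Grouping comparator role: keys equal on station and time share one reduce call.
    static final Comparator<WeatherKey> GROUP =
        Comparator.comparing(WeatherKey::getStation)
                  .thenComparingLong(WeatherKey::getTime);

    // Simulates the reduce side: sort the keys, then split them into groups.
    static List<List<WeatherKey>> sortAndGroup(List<WeatherKey> keys) {
        List<WeatherKey> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        List<List<WeatherKey>> groups = new ArrayList<>();
        for (WeatherKey key : sorted) {
            boolean startsNewGroup = groups.isEmpty()
                || GROUP.compare(groups.get(groups.size() - 1).get(0), key) != 0;
            if (startsNewGroup) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(key);
        }
        return groups;
    }
}
```

Because the grouping comparator looks at fewer fields than the sort comparator, each group arrives at the reducer with its temp values already in order, which is the whole point of secondary sort.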

This post looks at another option, where with some hacking you can actually have secondary sort with Avro map output keys.

True secondary sort with an AvroKey

Avro has some utility classes for sorting and hashing (required for the partitioner), but the code is locked-down with private methods. The hacking therefore requires lifting certain parts of Avro’s code, and writing some helper functions to easily allow jobs fine-grained control over what fields are used for secondary sort.

Let’s take an example with the same Avro schema we used in the last post:

{"type": "record", "name": "com.alexholmes.avro.WeatherNoIgnore",
 "doc": "A weather reading.",
 "fields": [
     {"name": "station", "type": "string"},
     {"name": "time", "type": "long"},
     {"name": "temp", "type": "int"},
     {"name": "counter", "type": "int", "default": 0}
 ]
}

For secondary sort you may imagine a scenario where you want to partition output records by the station, sort records using the station, time and temp fields, and finally group by the station and time fields. The code to do this is as follows (GitHub source):

    .addPartitionField(WeatherNoIgnore.SCHEMA$, "station", true)
    .addSortField(WeatherNoIgnore.SCHEMA$, "station", true)
    .addSortField(WeatherNoIgnore.SCHEMA$, "time", true)
    .addSortField(WeatherNoIgnore.SCHEMA$, "temp", true)
    .addGroupField(WeatherNoIgnore.SCHEMA$, "station", true)
    .addGroupField(WeatherNoIgnore.SCHEMA$, "time", true)

The ordering of the addXXX calls is significant, as it determines the order in which fields are used for sorting and grouping. The last argument in the addXXX methods is a boolean which indicates whether the ordering is ascending.
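To illustrate why call order and the ascending flag matter, here is a small stand-alone sketch that builds a comparator from an ordered list of field names. It is not the AvroSort implementation: records are modelled as plain maps instead of Avro records, and the FieldOrderSketch class and its addField method are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Stand-in for a builder that collects (field, ascending) pairs; not the AvroSort code.
final class FieldOrderSketch {

    private final List<Comparator<Map<String, Object>>> parts = new ArrayList<>();

    // Hypothetical counterpart of an addXXX call: fields added earlier take precedence.
    FieldOrderSketch addField(String name, boolean ascending) {
        Comparator<Map<String, Object>> byField =
            (a, b) -> compareValues(a.get(name), b.get(name));
        parts.add(ascending ? byField : byField.reversed());
        return this;
    }

    // Later fields are consulted only when all earlier fields compare equal.
    Comparator<Map<String, Object>> build() {
        List<Comparator<Map<String, Object>>> snapshot = new ArrayList<>(parts);
        return (a, b) -> {
            for (Comparator<Map<String, Object>> part : snapshot) {
                int result = part.compare(a, b);
                if (result != 0) {
                    return result;
                }
            }
            return 0;
        };
    }

    // Assumes both values are non-null and of the same Comparable type.
    @SuppressWarnings("unchecked")
    private static int compareValues(Object left, Object right) {
        return ((Comparable<Object>) left).compareTo(right);
    }
}
```

With two records that share a station, adding "time" before "temp" orders them by time first, while adding "temp" before "time" can flip the result, and passing false for the flag reverses the order for that one field only.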

Most of the heavy lifting is performed in the AvroSort and AvroDataHack classes. The latter, as its name indicates, is where some hacking took place to get things working.

The only caveat with the current implementation is that Avro union types aren’t supported yet. I’ll look into that in the near future.

About the author


Alex Holmes works on tough big-data problems. He is a software engineer, author, speaker, and blogger specializing in large-scale Hadoop projects. He is the author of Hadoop in Practice, a book published by Manning Publications. He has presented multiple times at JavaOne, and is a JavaOne Rock Star.

If you want to see what Alex is up to you can check out his work on GitHub, or follow him on Twitter or Google+.
