Secondary sorting with Avro
In the last Avro sorting post you saw how sorting Avro records works in MapReduce, and how one can ignore fields in Avro records for partitioning, sorting and grouping. In the process you discovered that ignored fields are limited by being immutable (since they can only be defined once for a schema), which means you can’t vary what fields are ignored for partitioning, sorting or grouping, which is key for secondary sort.
If you wish to use secondary sort with Avro, one option would be to emit a custom Writable as the map output key, and emit an Avro record as the map output value. With this approach you’d write a custom partitioner, and sorting/grouping implementation.
This post looks at another option, where with some hacking you can actually have secondary sort with Avro map output keys.
True secondary sort with an AvroKey
Avro has some utility classes for sorting and hashing (required for the partitioner), but the code is locked-down with private methods. The hacking therefore requires lifting certain parts of Avro’s code, and writing some helper functions to easily allow jobs fine-grained control over what fields are used for secondary sort.
Let’s take an example with the same Avro schema we used in the last post:
{"type": "record", "name": "com.alexholmes.avro.WeatherNoIgnore",
"doc": "A weather reading.",
"fields": [
{"name": "station", "type": "string"},
{"name": "time", "type": "long"},
{"name": "temp", "type": "int"},
{"name": "counter", "type": "int", "default": 0}
]
}
For secondary sort you may imagine a scenario where you want to partition output records by the station, sort records using the station, time and temp fields, and finally group by the station and time fields. The code to do this is as follows GitHub source:
AvroSort.builder()
.setJob(job)
.addPartitionField(WeatherNoIgnore.SCHEMA$, "station", true)
.addSortField(WeatherNoIgnore.SCHEMA$, "station", true)
.addSortField(WeatherNoIgnore.SCHEMA$, "time", true)
.addSortField(WeatherNoIgnore.SCHEMA$, "temp", true)
.addGroupField(WeatherNoIgnore.SCHEMA$, "station", true)
.addGroupField(WeatherNoIgnore.SCHEMA$, "time", true)
.configure();
The ordering of the addXXX
calls is significant, as it determines the order in which fields are used for sorting and grouping. The last argument in the addXXX
methods is a boolean which indicates whether the ordering is ascending.
Most of the heavy lifting is performed in the AvroSort and AvroDataHack - the latter, as its name indicates, is where some hacking took place to get things working.
The only caveat with the current implementation is that Avro union types aren’t currently supported - I’ll look into that in the near future.
About the author
Alex Holmes works on tough big-data problems. He is a software engineer, author, speaker, and blogger specializing in large-scale Hadoop projects. He is the author of Hadoop in Practice, a book published by Manning Publications. He has presented multiple times at JavaOne, and is a JavaOne Rock Star.
If you want to see what Alex is up to you can check out his work on GitHub, or follow him on Twitter or Google+.
RECENT BLOG POSTS
-
Configuring memory for MapReduce running on YARN
This post examines the various memory configuration settings for your MapReduce job.
-
Big data anti-patterns presentation
Details on the presentation I have at JavaOne in 2015 on big data antipatterns.
-
Understanding how Parquet integrates with Avro, Thrift and Protocol Buffers
Parquet offers integration with a number of object models, and this post shows how Parquet supports various object models.
-
Using Oozie 4.4.0 with Hadoop 2.2
Patching Oozie's build so that you can create a package targetting Hadoop 2.2.0.
-
Hadoop in Practice, Second Edition
A sneak peek at what's coming in the second edition of my book.