Bucketing, multiplexing and combining in Hadoop - part 2

In the first post of this series, we looked at how the MultipleOutputFormat class could be used in a task to write to multiple output files. This approach had a few shortcomings: it couldn’t be used on the map side of a job that used reducers, and it only worked with the old mapred API.

In this post we’ll look at the MultipleOutputs class, which offers an alternative to the MultipleOutputFormat and also addresses its shortcomings.

MultipleOutputs

Using the MultipleOutputs class is a more modern Hadoop way of writing to multiple outputs. It has both mapred and mapreduce API implementations, and allows you to work with multiple OutputFormat classes in your job. Its approach is different from MultipleOutputFormat - rather than defining its own OutputFormat it merely provides some helper methods which need to be called in your driver code, as well as in your mapper/reducer.

The two MultipleOutputs classes in mapred and mapreduce are close in functionality, the main difference being support of multi-named outputs, which we’ll examine later in this post.

Let’s look at how we would achieve the same result as we did with MultipleOutputFormat. If you recall from the previous post in this series, we were working with some sample data from a fruit market, where the data points were the location of each market, and the fruit that was sold:

cupertino   apple
sunnyvale   banana
cupertino   pear

Our goal is to partition the outputs by city, so there would be city-specific files. First up is our driver code, where we need to tell MultipleOutputs the named outputs, and their related OutputFormat classes. For simplicity we’ve chosen TextOutputFormat for both, but you can use different OutputFormats for each named output.

MultipleOutputs.addNamedOutput(jobConf, "cupertino", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(jobConf, "sunnyvale", TextOutputFormat.class, Text.class, Text.class);

The named outputs “cupertino” and “sunnyvale” serve two purposes in MultipleOutputs. First, they are the logical keys that you use in your mapper and reducer to look up their associated OutputCollector instances. Second, they are used as the output filenames in HDFS.
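Because the names double as filenames, not every string is a usable named output. As far as we know, the mapred MultipleOutputs accepts only letters and digits, and reserves the name “part” for the job’s default output. The sketch below is plain Java rather than Hadoop code; NamedOutputNames, isValid and sanitize are hypothetical helpers that mirror that rule, so treat it as an illustration of the constraint and not as the library’s implementation.

```java
/** Hypothetical helper that mirrors the assumed naming rule for named outputs. */
final class NamedOutputNames {

    /** True if the name is non-empty, uses only ASCII letters and digits, and is not "part". */
    static boolean isValid(String name) {
        if (name == null || name.isEmpty() || name.equals("part")) {
            return false;
        }
        for (char ch : name.toCharArray()) {
            boolean letter = (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z');
            boolean digit = ch >= '0' && ch <= '9';
            if (!letter && !digit) {
                return false;
            }
        }
        return true;
    }

    /** Drops every character that is not an ASCII letter or digit; the result may still be invalid. */
    static String sanitize(String raw) {
        return raw.replaceAll("[^A-Za-z0-9]", "");
    }

    public static void main(String[] args) {
        // "san jose" is a made-up city used only to show a name that needs cleaning up.
        for (String city : new String[] {"cupertino", "sunnyvale", "san jose"}) {
            System.out.println(city + " valid=" + isValid(city) + " sanitized=" + sanitize(city));
        }
    }
}
```

If your keys come straight from the data, as ours do, a check like this in the driver catches a bad name before the job is submitted.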

We can’t use an identity reducer in this example as we have to use the MultipleOutputs class to redirect our output to the appropriate file, so let’s go ahead and see what the reducer will look like.

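import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

class Reduce extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {

    private MultipleOutputs output;

    @Override
    public void configure(final JobConf job) {
        super.configure(job);
        output = new MultipleOutputs(job);
    }

    @Override
    public void reduce(final Text key, final Iterator<Text> values,
                       final OutputCollector<Text, Text> collector, final Reporter reporter)
            throws IOException {
        // Route each value to the named output that matches the key (the city).
        while (values.hasNext()) {
            output.getCollector(key.toString(), reporter).collect(key, values.next());
        }
    }

    @Override
    public void close() throws IOException {
        // Close the named outputs so that their records are flushed to HDFS.
        output.close();
    }
}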

As you can see, we’re not using the OutputCollector supplied to us in the reduce method. Instead we create a MultipleOutputs instance in the configure method, and use it in the reduce method. For each reducer input record, we use the key to look up the OutputCollector and then emit each key/value pair to that collector. Remember that when calling getCollector you must use one of the named outputs that you defined in the job driver. In our case our input keys are either “cupertino” or “sunnyvale”, and they map directly to the named outputs we defined in our driver, so we’re in good shape.
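To make the lookup rule concrete, here is a small stand-in written in plain Java. It is not the Hadoop implementation, and NamedOutputRouter and its methods are hypothetical names. It models the two things we rely on in the reducer: a name has to be declared up front, and a lookup for an undeclared name fails. We assume the failure is an IllegalArgumentException, which is how we understand getCollector to behave.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of named outputs; it does not touch Hadoop or HDFS. */
final class NamedOutputRouter {

    private final Map<String, List<String>> outputs = new LinkedHashMap<String, List<String>>();

    /** Plays the role of the driver call that declares a named output. */
    void addNamedOutput(String name) {
        outputs.put(name, new ArrayList<String>());
    }

    /** Plays the role of getCollector(name, reporter).collect(key, value). */
    void collect(String name, String key, String value) {
        List<String> sink = outputs.get(name);
        if (sink == null) {
            throw new IllegalArgumentException("Undefined named output '" + name + "'");
        }
        sink.add(key + "\t" + value);
    }

    /** Returns a read-only view of the records written to a named output. */
    List<String> records(String name) {
        List<String> sink = outputs.get(name);
        return sink == null ? Collections.<String>emptyList() : Collections.unmodifiableList(sink);
    }

    public static void main(String[] args) {
        NamedOutputRouter router = new NamedOutputRouter();
        router.addNamedOutput("cupertino");
        router.addNamedOutput("sunnyvale");

        String[][] input = {{"cupertino", "apple"}, {"sunnyvale", "banana"}, {"cupertino", "pear"}};
        for (String[] record : input) {
            router.collect(record[0], record[0], record[1]);
        }
        System.out.println(router.records("cupertino"));
        System.out.println(router.records("sunnyvale"));

        try {
            // "fremont" is a made-up city that was never declared.
            router.collect("fremont", "fremont", "kiwi");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```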

Let’s examine the contents of the job output directory after running the job.

$ hadoop fs -lsr /output
/output/cupertino-r-00000
/output/sunnyvale-r-00000
/output/part-00000
/output/part-00001

This output highlights one of the key differences between MultipleOutputs and MultipleOutputFormat. When using MultipleOutputs you can output to the reducer’s regular OutputCollector, or to the OutputCollector for a named output, or to both, which is why you see part-nnnnn files. In our case the part files are empty, because our reducer never writes to the regular collector.

But wait! One problem with MultipleOutputs is that we needed to pre-define the partitions “cupertino” and “sunnyvale” in our driver. What if we didn’t know the partitions ahead of time?

Dynamic files with the MultipleOutputs class

Up until now MultipleOutputs has treated us well: it supports both the old and new MapReduce APIs, and can also support multiple OutputFormat classes within the same reducer. But as we saw, we essentially had to pre-define the output files in our driver code. So how do we handle cases where the output files need to be determined dynamically in the reducer?

Luckily MultipleOutputs has a notion of “multi named” outputs. In the driver, instead of enumerating all the output files we want, we add a single logical name called “fruit”, using addMultiNamedOutput instead of addNamedOutput:

MultipleOutputs.addMultiNamedOutput(jobConf, "fruit", TextOutputFormat.class, Text.class, Text.class);

In our reducer we always specify “fruit” as the name, but we use a different getCollector method that takes an additional argument, which determines the name of the output file:

output.getCollector("fruit", key.toString(), reporter).collect(key, values.next());

Let’s do another HDFS listing:

$ hadoop fs -lsr /output
/output/fruit_cupertino-r-00000
/output/fruit_sunnyvale-r-00000
/output/part-00000
/output/part-00001

Hurray! We now have multiple output files that are dynamically created based on the reducer output key, just like we did with MultipleOutputFormat.
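The two listings follow a simple naming pattern, which the plain-Java sketch below reproduces. OutputFileNames and its methods are hypothetical helpers, not Hadoop code. The “r” marker and the five-digit partition number come from the listings in this post; the “m” marker for map-side output is our assumption about how Hadoop names files written from a mapper.

```java
import java.util.Locale;

/** Hypothetical helper that rebuilds the file names shown in the HDFS listings. */
final class OutputFileNames {

    /** Name of a plain named output file, for example "cupertino-r-00000". */
    static String named(String namedOutput, boolean reduceSide, int partition) {
        // Assumption: "m" marks map-side output; "r" matches the reducer listings in this post.
        String taskType = reduceSide ? "r" : "m";
        return String.format(Locale.ROOT, "%s-%s-%05d", namedOutput, taskType, partition);
    }

    /** Name of a multi named output file, for example "fruit_cupertino-r-00000". */
    static String multiNamed(String namedOutput, String multiName, boolean reduceSide, int partition) {
        return named(namedOutput + "_" + multiName, reduceSide, partition);
    }

    public static void main(String[] args) {
        System.out.println(named("cupertino", true, 0));
        System.out.println(multiNamed("fruit", "cupertino", true, 0));
        System.out.println(multiNamed("fruit", "sunnyvale", true, 0));
    }
}
```

Knowing the pattern is handy when a downstream job needs to pick up one city’s files with a glob such as fruit_cupertino-r-*.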

Now unfortunately the multi-named output is only supported by the old mapred API, whereas with the new mapreduce API you are forced to define your partitions in your job driver.

Conclusion

There are plenty of things to like about MultipleOutputs, namely its support for both the “old” and “new” MapReduce APIs, and its support for multiple OutputFormat classes. Its only real downside is that multi named outputs are only supported in the old mapred API, so if you need dynamic partitions in the new mapreduce API, neither MultipleOutputs nor the MultipleOutputFormat described in part 1 will help.

About the author

Alex Holmes works on tough big-data problems. He is a software engineer, author, speaker, and blogger specializing in large-scale Hadoop projects. He is the author of Hadoop in Practice, a book published by Manning Publications. He has presented multiple times at JavaOne, and is a JavaOne Rock Star.

If you want to see what Alex is up to you can check out his work on GitHub, or follow him on Twitter or Google+.
