Hi Scott,

If I have a map-only job, would I want only one mapper running to pull all the records from the source input files and stream/append them to the target Avro file? Would that be any different from (or more efficient than) doing "hadoop dfs -cat file1 file2 file3" and piping the output to "hadoop dfs -put combinedFile"? In that case, my only remaining question is how I would combine the Avro files into a new file without deserializing them.
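To make the question concrete, here's roughly what I'm picturing for the block-append approach, based on your point about the data file API (untested sketch; I'm assuming DataFileWriter.appendAllFrom is the right call for copying blocks without deserializing, and the AvroConcat class name is just for illustration):

```java
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroConcat {
    // Concatenate Avro data files by copying their data blocks into a new
    // file. With recompress=false the compressed blocks are copied verbatim,
    // so records are never deserialized -- but every input must have exactly
    // the same schema as the target.
    public static void concat(File target, File... sources) throws IOException {
        DataFileReader<GenericRecord> first = new DataFileReader<GenericRecord>(
                sources[0], new GenericDatumReader<GenericRecord>());
        Schema schema = first.getSchema();

        DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(
                new GenericDatumWriter<GenericRecord>(schema));
        writer.create(schema, target);

        // Copy the first source's blocks without recompressing.
        writer.appendAllFrom(first, false);
        first.close();

        // Then every remaining source, again block-by-block.
        for (int i = 1; i < sources.length; i++) {
            DataFileReader<GenericRecord> in = new DataFileReader<GenericRecord>(
                    sources[i], new GenericDatumReader<GenericRecord>());
            writer.appendAllFrom(in, false);
            in.close();
        }
        writer.close();
    }
}
```

Presumably for HDFS I'd swap the File arguments for the InputStream/OutputStream-based constructors you mentioned.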
Thanks,
Frank Grimes

On 2012-01-12, at 1:14 PM, Scott Carey wrote:

> On 1/12/12 8:27 AM, "Frank Grimes" <frankgrime...@gmail.com> wrote:
>
>> Hi All,
>>
>> We have Avro data files in HDFS which are compressed using the Deflate codec.
>> We have written an M/R job using the Avro Mapred API to combine those files.
>>
>> It seems to be working fine; however, when we run it we notice that the
>> temporary work area (spills, etc.) seems to be uncompressed.
>> We're thinking we might see a speedup due to reduced I/O if the temporary
>> files are compressed as well.
>
> If all you want to do is combine the files, there is no reason to deserialize
> and reserialize the contents, and a map-only job could suffice.
> If this is the case, you might want to consider one of two options:
> 1. Use a map-only job with a combined file input. This will produce one
> file per mapper and no intermediate data.
> 2. Use the Avro data file API to append to a file. I am not sure if this
> will work with HDFS without some modifications to Avro, but it should be
> possible since the data file APIs can take InputStream/OutputStream. The
> data file API has the ability to append data blocks from the file if the
> schemas are an exact match. This can be done without deserialization, and
> optionally can change the compression level or leave it alone.
>
>> Is there a way to enable "mapred.compress.map.output" in such a way that
>> those temporary files are compressed as Avro/Deflate?
>> I tried simply setting conf.setBoolean("mapred.compress.map.output", true);
>> but it didn't seem to have any effect.
>
> I am not sure, as I haven't tried it myself. However, the Avro M/R should be
> able to leverage all of the Hadoop compressed intermediate forms. LZO/Snappy
> are fast, and in our cluster Snappy is the default. Deflate can be a lot
> slower but much more compact.
>> Note that in order to avoid unnecessary sorting overhead, I made each key a
>> constant (1L) so that the logs are combined but ordering isn't necessarily
>> preserved. (We don't care about ordering.)
>
> In that case, I think you can use a map-only job. There may be some work to
> get a single mapper to read many files, however.
>
>> FYI, here are my mapper and reducer:
>>
>> public static class AvroReachMapper extends
>>         AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {
>>     public void map(DeliveryLogEvent levent,
>>             AvroCollector<Pair<Long, DeliveryLogEvent>> collector,
>>             Reporter reporter) throws IOException {
>>         collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
>>     }
>> }
>>
>> public static class Reduce extends
>>         AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {
>>
>>     @Override
>>     public void reduce(Long key, Iterable<DeliveryLogEvent> values,
>>             AvroCollector<DeliveryLogEvent> collector,
>>             Reporter reporter) throws IOException {
>>         for (DeliveryLogEvent event : values) {
>>             collector.collect(event);
>>         }
>>     }
>> }
>>
>> Also, I'm setting the following:
>>
>> AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
>> AvroJob.setMapperClass(conf, AvroReachMapper.class);
>> AvroJob.setMapOutputSchema(conf, SCHEMA);
>>
>> AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
>> AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
>> AvroOutputFormat.setDeflateLevel(conf, 9);
>> AvroOutputFormat.setSyncInterval(conf, 1024 * 256);
>>
>> AvroJob.setReducerClass(conf, Reduce.class);
>>
>> JobClient.runJob(conf);
>>
>> Thanks,
>>
>> Frank Grimes
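P.S. For your option 1, am I right that it mostly boils down to dropping the constant key and the reducer? Here's the map-only variant I have in mind (untested sketch; I'm assuming that with zero reduce tasks the Avro Mapred framework writes the mapper's output schema directly, so no Pair/map-output schema is needed):

```java
// Hypothetical map-only combine job: an identity AvroMapper plus zero
// reduce tasks, so there is no sort/spill/shuffle at all and each mapper
// writes one Deflate-compressed output file.
public static class IdentityMapper
        extends AvroMapper<DeliveryLogEvent, DeliveryLogEvent> {
    @Override
    public void map(DeliveryLogEvent event,
            AvroCollector<DeliveryLogEvent> collector,
            Reporter reporter) throws IOException {
        collector.collect(event);  // pass every record straight through
    }
}

JobConf conf = new JobConf(LogCombiner.class);  // LogCombiner is illustrative
AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
AvroJob.setMapperClass(conf, IdentityMapper.class);
AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
AvroOutputFormat.setDeflateLevel(conf, 9);

conf.setNumReduceTasks(0);  // map-only: no intermediate data to compress
JobClient.runJob(conf);
```

Then the remaining piece would be the combined file input you mentioned, so a single mapper can read many small files instead of one file each.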