If I have a map-only job, would I want only one mapper running to pull all the 
records from the source input files and stream/append them to the target Avro 
file?
Would that be no different (or more efficient) than doing "hadoop dfs -cat 
file1 file2 file3" and piping the output to append to a "hadoop dfs -put 
In that case, my only question is how would I combine the avro files into a new 
file without deserializing them?
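
For reference, a minimal sketch of how block-level concatenation with the Avro data file API (option 2 in Scott's reply quoted below) might look. The class name AvroConcat and the method concat are hypothetical, chosen only for illustration. The sketch reads local files through java.io streams; substituting streams obtained from HDFS is an untested assumption. It also assumes an Avro release that provides DataFileWriter.appendAllFrom, and that every input file shares the schema and codec of the first one.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

// Hypothetical helper, not part of Avro or of the job discussed in this thread.
public class AvroConcat {

    /**
     * Appends the data blocks of each input file to a single Avro data file
     * written to out. Returns the number of input files appended.
     */
    public static int concat(List<File> inputs, OutputStream out) throws IOException {
        DataFileWriter<GenericRecord> writer =
                new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>());
        Schema schema = null;
        int appended = 0;
        try {
            for (File input : inputs) {
                try (DataFileStream<GenericRecord> reader = new DataFileStream<GenericRecord>(
                        new FileInputStream(input), new GenericDatumReader<GenericRecord>())) {
                    if (schema == null) {
                        // The first input decides the schema and codec of the output.
                        schema = reader.getSchema();
                        String codec = reader.getMetaString(DataFileConstants.CODEC);
                        writer.setCodec(CodecFactory.fromString(
                                codec == null ? DataFileConstants.NULL_CODEC : codec));
                        writer.create(schema, out);
                    } else if (!schema.equals(reader.getSchema())) {
                        throw new IOException("Schema mismatch in " + input);
                    }
                    // recompress=false: blocks are copied as-is when the codecs match,
                    // so the records themselves are not deserialized.
                    writer.appendAllFrom(reader, false);
                    appended++;
                }
            }
        } finally {
            if (schema != null) {
                writer.close(); // also closes out
            } else {
                out.close();    // nothing was written; just release the stream
            }
        }
        return appended;
    }
}
```

Calling AvroConcat.concat(files, new FileOutputStream(target)) would then produce one output file. Passing true as the second argument to appendAllFrom should recompress each block with the writer's codec instead of copying it unchanged.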


Frank Grimes

On 2012-01-12, at 1:14 PM, Scott Carey wrote:

> On 1/12/12 8:27 AM, "Frank Grimes" <frankgrime...@gmail.com> wrote:
>> Hi All,
>> We have Avro data files in HDFS which are compressed using the Deflate codec.
>> We have written an M/R job using the Avro Mapred API to combine those files.
>> It seems to be working fine; however, when we run it we notice that the 
>> temporary work area (spills, etc.) seems to be uncompressed.
>> We're thinking we might see a speedup due to reduced I/O if the temporary 
>> files are compressed as well.
> If all you want to do is combine the files, there is no reason to deserialize 
> and reserialize the contents, and a map-only job could suffice.
> If this is the case, you might want to consider one of two options:
> 1.  Use a map only job, with a combined file input.  This will produce one 
> file per mapper and no intermediate data.
> 2.  Use the Avro data file API to append to a file.  I am not sure if this 
> will work with HDFS without some modifications to Avro, but it should be 
> possible since the data file APIs can take InputStream/OutputStream.  The 
> data file API has the ability to append data blocks from the file if the 
> schemas are an exact match.  This can be done without deserialization, and 
> optionally can change the compression level or leave it alone.
>> Is there a way to enable "mapred.compress.map.output" in such a way that 
>> those temporary files are compressed as Avro/Deflate?
>> I tried simply setting conf.setBoolean("mapred.compress.map.output", true); 
>> but it didn't seem to have any effect.
> I am not sure, as I haven't tried it myself.  However, the Avro M/R should be 
> able to leverage all of the Hadoop compressed intermediate forms.  LZO/Snappy 
> are fast and in our cluster Snappy is the default.  Deflate can be a lot 
> slower but much more compact.
>> Note that in order to avoid unnecessary sorting overhead, I made each key a 
>> constant (1L) so that the logs are combined but ordering isn't necessarily 
>> preserved. (we don't care about ordering)
> In that case, I think you can use a map only job.  There may be some work to 
> get a single mapper to read many files however.
>> FYI, here are my mapper and reducer.
>>      public static class AvroReachMapper
>>                      extends AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {
>>              public void map(DeliveryLogEvent levent,
>>                              AvroCollector<Pair<Long, DeliveryLogEvent>> collector,
>>                              Reporter reporter) throws IOException {
>>                      collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
>>              }
>>      }
>>      public static class Reduce
>>                      extends AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {
>>              @Override
>>              public void reduce(Long key, Iterable<DeliveryLogEvent> values,
>>                              AvroCollector<DeliveryLogEvent> collector,
>>                              Reporter reporter) throws IOException {
>>                      for (DeliveryLogEvent event : values) {
>>                              collector.collect(event);
>>                      }
>>              }
>>      }
>> Also, I'm setting the following:
>>      AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>      AvroJob.setMapperClass(conf, AvroReachMapper.class);
>>      AvroJob.setMapOutputSchema(conf, SCHEMA);
>>      AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>      AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
>>      AvroOutputFormat.setDeflateLevel(conf, 9);
>>      AvroOutputFormat.setSyncInterval(conf, 1024 * 256);
>>      AvroJob.setReducerClass(conf, Reduce.class);
>>      JobClient.runJob(conf);
>> Thanks,
>> Frank Grimes

