On 1/12/12 11:24 AM, "Frank Grimes" <frankgrime...@gmail.com> wrote:

> Hi Scott,
> 
> If I have a map-only job, would I want only one mapper running to pull all the
> records from the source input files and stream/append them to the target avro
> file?
> Would that be no different (or more efficient) than doing "hadoop dfs -cat
> file1 file2 file3" and piping the output to append to a "hadoop dfs -put
> combinedFile"?
> In that case, my only question is how would I combine the avro files into a
> new file without deserializing them?

It would be different.  An Avro file has a header that contains the Schema
and compression codec info along with other metadata, followed by data
blocks.  Each data block has a record count and size prefix and a 16-byte
sync delimiter.  You cannot simply concatenate the files because the schema
or compression codec may differ, a header in the middle of the file is not
allowed, and the sync delimiter may differ.
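
Something along these lines (a minimal, untested sketch; the local file path
and the generic API are just for brevity) shows what lives in the header:

import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class InspectAvroHeader {
    public static void main(String[] args) throws Exception {
        // Opening the file parses the header: schema, codec, and sync marker.
        DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
                new File(args[0]), new GenericDatumReader<GenericRecord>());
        try {
            // "avro.codec" is the reserved metadata key holding the codec name.
            System.out.println("schema: " + reader.getSchema());
            System.out.println("codec:  " + reader.getMetaString("avro.codec"));
        } finally {
            reader.close();
        }
    }
}

Two files that print different schemas or codecs here are exactly the ones
that cannot be naively concatenated.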

http://avro.apache.org/docs/current/api/java/org/apache/avro/file/DataFileWriter.html

DataFileWriter can append to a pre-existing file with the same schema; in
particular, look at the documentation for appendAllFrom():
http://avro.apache.org/docs/current/api/java/org/apache/avro/file/DataFileWriter.html#appendAllFrom%28org.apache.avro.file.DataFileStream,%20boolean%29
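
As a rough, untested sketch (local File/FileInputStream used for brevity;
with HDFS you would wrap the appropriate streams instead), appending the
blocks of several files onto an existing target without deserializing any
records could look like this:

import java.io.File;
import java.io.FileInputStream;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AppendAvroFiles {
    public static void main(String[] args) throws Exception {
        // args[0] is the existing target file; the remaining args are appended to it.
        DataFileWriter<GenericRecord> writer =
                new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>());
        writer.appendTo(new File(args[0]));
        try {
            for (int i = 1; i < args.length; i++) {
                DataFileStream<GenericRecord> source = new DataFileStream<GenericRecord>(
                        new FileInputStream(args[i]), new GenericDatumReader<GenericRecord>());
                try {
                    // Copies raw data blocks; the schemas must be an exact match.
                    // recompress=false leaves the existing compression alone.
                    writer.appendAllFrom(source, false);
                } finally {
                    source.close();
                }
            }
        } finally {
            writer.close();
        }
    }
}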



> 
> Thanks,
> 
> Frank Grimes
> 
> 
> On 2012-01-12, at 1:14 PM, Scott Carey wrote:
> 
>> 
>> 
>> On 1/12/12 8:27 AM, "Frank Grimes" <frankgrime...@gmail.com> wrote:
>> 
>>> Hi All,
>>> 
>>> We have Avro data files in HDFS which are compressed using the Deflate
>>> codec.
>>> We have written an M/R job using the Avro Mapred API to combine those files.
>>> 
>>> It seems to be working fine, however when we run it we notice that the
>>> temporary work area (spills, etc) seem to be uncompressed.
>>> We're thinking we might see a speedup due to reduced I/O if the temporary
>>> files are compressed as well.
>> 
>> If all you want to do is combine the files, there is no reason to deserialize
>> and reserialize the contents, and a map-only job could suffice.
>> If this is the case, you might want to consider one of two options:
>> 1.  Use a map-only job with a combined file input.  This will produce one
>> file per mapper and no intermediate data.
>> 2.  Use the Avro data file API to append to a file.  I am not sure if this
>> will work with HDFS without some modifications to Avro, but it should be
>> possible since the data file APIs can take InputStream/OutputStream.  The
>> data file API has the ability to append data blocks from the file if the
>> schemas are an exact match.  This can be done without deserialization, and
>> optionally can change the compression level or leave it alone.
>> 
>>> 
>>> Is there a way to enable "mapred.compress.map.output" in such a way that
>>> those temporary files are compressed as Avro/Deflate?
>>> I tried simply setting conf.setBoolean("mapred.compress.map.output", true);
>>> but it didn't seem to have any effect.
>> 
>> I am not sure, as I haven't tried it myself.  However, the Avro M/R should be
>> able to leverage all of the Hadoop compressed intermediate forms.  LZO/Snappy
>> are fast and in our cluster Snappy is the default.  Deflate can be a lot
>> slower but much more compact.
>> 
>>> 
>>> Note that in order to avoid unnecessary sorting overhead, I made each key a
>>> constant (1L) so that the logs are combined but ordering isn't necessarily
>>> preserved. (we don't care about ordering)
>> 
>> In that case, I think you can use a map-only job.  There may be some work to
>> get a single mapper to read many files, however.
>> 
>>> 
>>> FYI, here are my mapper and reducer.
>>> 
>>> 
>>> public static class AvroReachMapper extends
>>>     AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {
>>> 
>>>     public void map(DeliveryLogEvent levent,
>>>             AvroCollector<Pair<Long, DeliveryLogEvent>> collector,
>>>             Reporter reporter) throws IOException {
>>>         collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
>>>     }
>>> }
>>> 
>>> public static class Reduce extends
>>>     AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {
>>> 
>>>     @Override
>>>     public void reduce(Long key, Iterable<DeliveryLogEvent> values,
>>>             AvroCollector<DeliveryLogEvent> collector, Reporter reporter)
>>>             throws IOException {
>>>         for (DeliveryLogEvent event : values) {
>>>             collector.collect(event);
>>>         }
>>>     }
>>> }
>>> 
>>> Also, I'm setting the following:
>>> 
>>> AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>> AvroJob.setMapperClass(conf, Mapper.class);
>>> AvroJob.setMapOutputSchema(conf, SCHEMA);
>>> 
>>> AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>> AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
>>> AvroOutputFormat.setDeflateLevel(conf, 9);
>>> AvroOutputFormat.setSyncInterval(conf, 1024 * 256);
>>> 
>>> AvroJob.setReducerClass(conf, Reducer.class);
>>> 
>>> JobClient.runJob(conf);
>>> 
>>> 
>>> Thanks,
>>> 
>>> Frank Grimes
> 

