On 1/12/12 11:24 AM, "Frank Grimes" <frankgrime...@gmail.com> wrote:
> Hi Scott,
>
> If I have a map-only job, would I want only one mapper running to pull all the
> records from the source input files and stream/append them to the target avro
> file?
> Would that be no different (or more efficient) than doing "hadoop dfs -cat
> file1 file2 file3" and piping the output to append to a "hadoop dfs -put
> combinedFile"?
> In that case, my only question is how would I combine the avro files into a
> new file without deserializing them?

It would be different. An Avro file has a header that contains the schema and
compression codec info along with other metadata, followed by data blocks.
Each data block is prefixed with a record count and byte size and is followed
by a 16-byte sync marker (delimiter). You cannot simply concatenate files
together because the schema or compression codec may differ, a header in the
middle of the file is not allowed, and each file generally has its own sync
marker.

http://avro.apache.org/docs/current/api/java/org/apache/avro/file/DataFileWriter.html

DataFileWriter can append the contents of a pre-existing file with the same
schema; in particular, look at the documentation for appendAllFrom():

http://avro.apache.org/docs/current/api/java/org/apache/avro/file/DataFileWriter.html#appendAllFrom%28org.apache.avro.file.DataFileStream,%20boolean%29

>
> Thanks,
>
> Frank Grimes
>
>
> On 2012-01-12, at 1:14 PM, Scott Carey wrote:
>
>>
>>
>> On 1/12/12 8:27 AM, "Frank Grimes" <frankgrime...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> We have Avro data files in HDFS which are compressed using the Deflate
>>> codec.
>>> We have written an M/R job using the Avro Mapred API to combine those files.
>>>
>>> It seems to be working fine; however, when we run it we notice that the
>>> temporary work area (spills, etc.) seems to be uncompressed.
>>> We're thinking we might see a speedup due to reduced I/O if the temporary
>>> files are compressed as well.
>>
>> If all you want to do is combine the files, there is no reason to deserialize
>> and reserialize the contents, and a map-only job could suffice.
>> If this is the case, you might want to consider one of two options:
>> 1. Use a map-only job with a combined file input. This will produce one
>> file per mapper and no intermediate data.
>> 2. Use the Avro data file API to append to a file. I am not sure if this
>> will work with HDFS without some modifications to Avro, but it should be
>> possible since the data file APIs can take InputStream/OutputStream. The
>> data file API has the ability to append data blocks from another file if the
>> schemas are an exact match. This can be done without deserialization, and
>> can optionally change the compression level or leave it alone.
>>
>>>
>>> Is there a way to enable "mapred.compress.map.output" in such a way that
>>> those temporary files are compressed as Avro/Deflate?
>>> I tried simply setting conf.setBoolean("mapred.compress.map.output", true);
>>> but it didn't seem to have any effect.
>>
>> I am not sure, as I haven't tried it myself. However, Avro M/R should be
>> able to leverage all of the Hadoop compressed intermediate formats. LZO and
>> Snappy are fast, and in our cluster Snappy is the default. Deflate can be a
>> lot slower but much more compact.
>>
>>>
>>> Note that in order to avoid unnecessary sorting overhead, I made each key a
>>> constant (1L) so that the logs are combined but ordering isn't necessarily
>>> preserved. (We don't care about ordering.)
>>
>> In that case, I think you can use a map-only job. There may be some work to
>> get a single mapper to read many files, however.
>>
>>>
>>> FYI, here are my mapper and reducer.
>>>
>>>
>>> public static class AvroReachMapper
>>>     extends AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {
>>>   @Override
>>>   public void map(DeliveryLogEvent levent,
>>>       AvroCollector<Pair<Long, DeliveryLogEvent>> collector, Reporter reporter)
>>>       throws IOException {
>>>     // Constant key (1L): all events end up in a single reduce call
>>>     collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
>>>   }
>>> }
>>>
>>> public static class Reduce
>>>     extends AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {
>>>
>>>   @Override
>>>   public void reduce(Long key, Iterable<DeliveryLogEvent> values,
>>>       AvroCollector<DeliveryLogEvent> collector, Reporter reporter)
>>>       throws IOException {
>>>     // Pass every event through unchanged
>>>     for (DeliveryLogEvent event : values) {
>>>       collector.collect(event);
>>>     }
>>>   }
>>> }
>>>
>>> Also, I'm setting the following:
>>>
>>> AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>> AvroJob.setMapperClass(conf, AvroReachMapper.class);
>>> // Map output is a (long, DeliveryLogEvent) pair
>>> AvroJob.setMapOutputSchema(conf, Pair.getPairSchema(
>>>     Schema.create(Schema.Type.LONG), DeliveryLogEvent.SCHEMA$));
>>>
>>> AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>> AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
>>> AvroOutputFormat.setDeflateLevel(conf, 9);
>>> AvroOutputFormat.setSyncInterval(conf, 1024 * 256);
>>>
>>> AvroJob.setReducerClass(conf, Reduce.class);
>>>
>>> JobClient.runJob(conf);
>>>
>>>
>>> Thanks,
>>>
>>> Frank Grimes
>
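To make the block-level explanation in this thread concrete, here is a stdlib-only Java sketch of the container layout and of what a block-level append does conceptually. It is a simplified model, not the Avro library: the class and method names (AvroBlockCopySketch, writeFile, readFile, merge, concat) are hypothetical, the header carries an empty metadata map where a real file stores avro.schema and avro.codec, and there is no schema or codec check. It assumes the layout from the Avro spec: magic "Obj" plus byte 1, a metadata map, a 16-byte sync marker, then blocks of (zig-zag varint record count, varint byte size, payload, sync marker).

```java
import java.io.*;
import java.util.*;

// Hypothetical, simplified model of the Avro object container layout (not the
// Avro library): header = "Obj"+1, metadata map, 16-byte sync marker; then
// blocks of (record count, byte size, opaque payload, sync marker).
public class AvroBlockCopySketch {
    static final byte[] MAGIC = {'O', 'b', 'j', 1};

    // One data block; the (possibly compressed) payload is never decoded.
    static final class Block {
        final long count;
        final byte[] payload;
        Block(long count, byte[] payload) { this.count = count; this.payload = payload; }
    }

    // Avro "long" encoding: zig-zag, then base-128 varint.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        for (; (z & ~0x7FL) != 0; z >>>= 7) out.write((int) ((z & 0x7F) | 0x80));
        out.write((int) z);
    }

    static long readLong(InputStream in) throws IOException {
        long z = 0;
        for (int shift = 0; ; shift += 7) {
            int b = in.read();
            if (b < 0) throw new EOFException();
            z |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) return (z >>> 1) ^ -(z & 1);
        }
    }

    static byte[] readExactly(InputStream in, int n) throws IOException {
        byte[] buf = new byte[n];
        int off = 0;
        while (off < n) {
            int r = in.read(buf, off, n - off);
            if (r < 0) throw new EOFException();
            off += r;
        }
        return buf;
    }

    static byte[] writeFile(byte[] sync, List<Block> blocks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(MAGIC, 0, MAGIC.length);
        writeLong(out, 0); // empty metadata map (real files store avro.schema/avro.codec here)
        out.write(sync, 0, 16);
        for (Block b : blocks) {
            writeLong(out, b.count);
            writeLong(out, b.payload.length);
            out.write(b.payload, 0, b.payload.length);
            out.write(sync, 0, 16);
        }
        return out.toByteArray();
    }

    // Parses one file and copies its sync marker into syncOut.
    static List<Block> readFile(byte[] file, byte[] syncOut) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(file);
        if (!Arrays.equals(readExactly(in, 4), MAGIC)) throw new IOException("bad magic");
        if (readLong(in) != 0) throw new IOException("sketch expects an empty metadata map");
        byte[] sync = readExactly(in, 16);
        System.arraycopy(sync, 0, syncOut, 0, 16);
        List<Block> blocks = new ArrayList<>();
        while (in.available() > 0) {
            long count = readLong(in);
            long size = readLong(in);
            if (count < 0 || size < 0) throw new IOException("invalid block header");
            byte[] payload = readExactly(in, (int) size);
            if (!Arrays.equals(readExactly(in, 16), sync)) throw new IOException("sync marker mismatch");
            blocks.add(new Block(count, payload));
        }
        return blocks;
    }

    // Block-level append: source payloads are copied verbatim and re-framed
    // with the target file's sync marker; no record is deserialized.
    static byte[] merge(byte[] target, byte[] source) throws IOException {
        byte[] targetSync = new byte[16];
        List<Block> blocks = new ArrayList<>(readFile(target, targetSync));
        blocks.addAll(readFile(source, new byte[16]));
        return writeFile(targetSync, blocks);
    }

    // Naive byte concatenation, like "hadoop dfs -cat file1 file2".
    static byte[] concat(byte[] x, byte[] y) {
        byte[] r = Arrays.copyOf(x, x.length + y.length);
        System.arraycopy(y, 0, r, x.length, y.length);
        return r;
    }

    public static void main(String[] args) throws IOException {
        byte[] syncA = new byte[16];
        Arrays.fill(syncA, (byte) 0x11);
        byte[] syncB = new byte[16];
        Arrays.fill(syncB, (byte) 0x22);
        byte[] a = writeFile(syncA, Arrays.asList(new Block(2, new byte[] {1, 2, 3})));
        byte[] b = writeFile(syncB, Arrays.asList(new Block(1, new byte[] {4, 5}), new Block(3, new byte[] {6})));

        List<Block> merged = readFile(merge(a, b), new byte[16]);
        long records = 0;
        for (Block blk : merged) records += blk.count;
        System.out.println("blocks=" + merged.size() + " records=" + records);

        try {
            readFile(concat(a, b), new byte[16]);
            System.out.println("naive concatenation parsed");
        } catch (IOException e) {
            System.out.println("naive concatenation rejected: " + e.getMessage());
        }
    }
}
```

Running main should print blocks=3 records=6 for the merged file, then report that the naive concatenation was rejected: after the first file's last block, the reader hits the second file's "Obj" magic bytes where it expects a block count. With the real library, this is the job DataFileWriter.appendAllFrom(DataFileStream, boolean) does, including the schema check and codec handling that this sketch leaves out.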