The Recodec tool may be useful, and its source code is a good reference:

java -jar avro-tools-<VERSION>.jar

http://svn.apache.org/viewvc/avro/tags/release-1.6.1/lang/java/tools/src/main/java/org/apache/avro/tool/RecodecTool.java?view=co
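The quoted thread below pipes `hadoop dfs -cat` output into a single `DataFileStream` and gets "Invalid sync!", while Scott suggests opening one reader per file. The plain-Java sketch here models why. It is a toy, not Avro's encoding: the `Toy` header (4-byte magic plus a 16-byte sync marker) and the block layout (marker, int length, payload) are hypothetical simplifications. The only thing borrowed from Avro data files is the idea that every file starts with its own header and its own sync marker, so a reader that has parsed one header cannot validate the next file's bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical toy container format (NOT Avro's real encoding):
 *   header = MAGIC (4 bytes) + this file's 16-byte sync marker
 *   block  = sync marker (16 bytes) + int length + payload
 */
public class ToyCombiner {
    static final byte[] MAGIC = {'T', 'o', 'y', 1};

    /** Serializes one toy file whose blocks hold the given payloads. */
    static byte[] writeFile(byte[] sync, List<byte[]> payloads) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.write(MAGIC);
        out.write(sync);
        for (byte[] p : payloads) {
            out.write(sync);              // in this toy, the marker precedes each block
            out.writeInt(p.length);
            out.write(p);
        }
        out.flush();
        return bytes.toByteArray();
    }

    /** Parses exactly ONE header, then reads blocks until end of stream. */
    static List<byte[]> readBlocks(InputStream raw) throws IOException {
        DataInputStream in = new DataInputStream(raw);
        byte[] magic = new byte[4];
        in.readFully(magic);
        if (!Arrays.equals(magic, MAGIC)) {
            throw new IOException("Not a toy file");
        }
        byte[] sync = new byte[16];
        in.readFully(sync);

        List<byte[]> payloads = new ArrayList<>();
        byte[] marker = new byte[16];
        int first;
        while ((first = in.read()) != -1) {   // -1 means a clean end of stream
            marker[0] = (byte) first;
            in.readFully(marker, 1, 15);
            // With concatenated input, the second file's header shows up here.
            if (!Arrays.equals(marker, sync)) {
                throw new IOException("Invalid sync!");
            }
            byte[] p = new byte[in.readInt()];
            in.readFully(p);
            payloads.add(p);
        }
        return payloads;
    }

    /** The fix: a fresh reader (and header parse) for each input file. */
    static List<byte[]> combine(List<? extends InputStream> files) throws IOException {
        List<byte[]> all = new ArrayList<>();
        for (InputStream f : files) {
            all.addAll(readBlocks(f));
        }
        return all;
    }

    /** Fixed marker bytes for the demo; each toy file gets a different fill value. */
    static byte[] marker(int fill) {
        byte[] m = new byte[16];
        Arrays.fill(m, (byte) fill);
        return m;
    }

    public static void main(String[] args) throws IOException {
        byte[] a = writeFile(marker(1), List.of("a1".getBytes(), "a2".getBytes()));
        byte[] b = writeFile(marker(2), List.of("b1".getBytes()));

        // Like `hadoop dfs -cat a b | combiner`: one reader over both files.
        try {
            readBlocks(new SequenceInputStream(
                    new ByteArrayInputStream(a), new ByteArrayInputStream(b)));
            System.out.println("concatenated: unexpectedly succeeded");
        } catch (IOException e) {
            System.out.println("concatenated: " + e.getMessage());
        }

        // One reader per file, as suggested in the thread.
        List<byte[]> combined = combine(List.of(
                new ByteArrayInputStream(a), new ByteArrayInputStream(b)));
        System.out.println("per-file: " + combined.size() + " blocks");
    }
}
```

Running `main` prints `concatenated: Invalid sync!` and then `per-file: 3 blocks`. In the real job, the analogue of `combine` would be a loop that opens a separate `DataFileStream` for each HDFS file and hands each one to the same `DataFileWriter` via `appendAllFrom`, as described in the thread.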
https://issues.apache.org/jira/browse/AVRO-684

On 1/12/12 12:53 PM, "Scott Carey" <scottca...@apache.org> wrote:

>
>On 1/12/12 12:35 PM, "Frank Grimes" <frankgrime...@gmail.com> wrote:
>
>>So I decided to try writing my own AvroStreamCombiner utility, and it
>>seems to choke when passing multiple input files:
>>
>>hadoop dfs -cat hdfs://hadoop/machine1.log.avro hdfs://hadoop/machine2.log.avro | ./deliveryLogAvroStreamCombiner.sh > combined.log.avro
>>
>>Exception in thread "main" java.io.IOException: Invalid sync!
>>    at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:293)
>>    at org.apache.avro.file.DataFileWriter.appendAllFrom(DataFileWriter.java:329)
>>    at DeliveryLogAvroStreamCombiner.main(Unknown Source)
>>
>>Here's the code in question:
>>
>>public class DeliveryLogAvroStreamCombiner {
>>
>>    /**
>>     * @param args
>>     */
>>    public static void main(String[] args) throws Exception {
>>        DataFileStream<DeliveryLogEvent> dfs = null;
>>        DataFileWriter<DeliveryLogEvent> dfw = null;
>>
>>        try {
>>            dfs = new DataFileStream<DeliveryLogEvent>(System.in,
>>                    new SpecificDatumReader<DeliveryLogEvent>());
>>
>>            OutputStream stdout = System.out;
>>
>>            dfw = new DataFileWriter<DeliveryLogEvent>(
>>                    new SpecificDatumWriter<DeliveryLogEvent>());
>>            dfw.setCodec(CodecFactory.deflateCodec(9));
>>            dfw.setSyncInterval(1024 * 256);
>>            dfw.create(DeliveryLogEvent.SCHEMA$, stdout);
>>
>>            dfw.appendAllFrom(dfs, false);
>
>dfs is from System.in, which has multiple files one after the other.
>Each file will need its own DataFileStream (each has its own header and
>metadata).
>
>In Java you could get the list of files, and for each file use HDFS's API
>to open the file stream and append that to your one file.
>In bash you could loop over all the source files and append one at a time
>(the above fails on the second file).
>However, in order to append to the end of a pre-existing file, the only
>API now takes a File, not a seekable stream, so Avro would need a patch
>to allow that in HDFS (also, only an HDFS version that supports appends
>would work).
>
>Other things of note:
>You will probably get better total file size compression by using a
>larger sync interval (1M to 4M) than by using deflate level 9. Deflate 9
>is VERY slow and almost never compresses more than 1% better than
>deflate 6, which is much faster. I suggest experimenting with the
>'recodec' option on some of your files to see what the best
>size/performance tradeoff is. I doubt that 256K (pre-compression) blocks
>with level 9 compression are the way to go.
>
>For reference: http://tukaani.org/lzma/benchmarks.html
>(gzip uses deflate compression)
>
>-Scott
>
>>        }
>>        finally {
>>            if (dfs != null) try { dfs.close(); } catch (Exception e) { e.printStackTrace(); }
>>            if (dfw != null) try { dfw.close(); } catch (Exception e) { e.printStackTrace(); }
>>        }
>>    }
>>
>>}
>>
>>Is there any way this could be made to work without needing to download
>>the individual files to disk and calling append for each of them?
>>
>>Thanks,
>>
>>Frank Grimes
>>
>>
>>On 2012-01-12, at 2:24 PM, Frank Grimes wrote:
>>
>>Hi Scott,
>>
>>If I have a map-only job, would I want only one mapper running to pull
>>all the records from the source input files and stream/append them to
>>the target Avro file?
>>Would that be any different from (or more efficient than) doing
>>"hadoop dfs -cat file1 file2 file3" and piping the output to append to a
>>"hadoop dfs -put combinedFile"?
>>In that case, my only question is: how would I combine the Avro files
>>into a new file without deserializing them?
>>
>>Thanks,
>>
>>Frank Grimes
>>
>>
>>On 2012-01-12, at 1:14 PM, Scott Carey wrote:
>>
>>On 1/12/12 8:27 AM, "Frank Grimes" <frankgrime...@gmail.com> wrote:
>>
>>>Hi All,
>>>We have Avro data files in HDFS which are compressed using the Deflate
>>>codec.
>>>We have written an M/R job using the Avro Mapred API to combine those
>>>files.
>>>
>>>It seems to be working fine; however, when we run it we notice that the
>>>temporary work area (spills, etc.) seems to be uncompressed.
>>>We're thinking we might see a speedup due to reduced I/O if the
>>>temporary files are compressed as well.
>>>
>>
>>If all you want to do is combine the files, there is no reason to
>>deserialize and reserialize the contents, and a map-only job could
>>suffice.
>>If this is the case, you might want to consider one of two options:
>>1. Use a map-only job with a combined file input. This will produce
>>one file per mapper and no intermediate data.
>>2. Use the Avro data file API to append to a file. I am not sure if
>>this will work with HDFS without some modifications to Avro, but it
>>should be possible since the data file APIs can take
>>InputStream/OutputStream. The data file API has the ability to append
>>data blocks from the file if the schemas are an exact match. This can
>>be done without deserialization, and can optionally change the
>>compression level or leave it alone.
>>
>>>
>>>Is there a way to enable "mapred.compress.map.output" in such a way
>>>that those temporary files are compressed as Avro/Deflate?
>>>I tried simply setting
>>>conf.setBoolean("mapred.compress.map.output", true);
>>>but it didn't seem to have any effect.
>>>
>>
>>I am not sure, as I haven't tried it myself. However, the Avro M/R
>>should be able to leverage all of the Hadoop compressed intermediate
>>forms. LZO/Snappy are fast, and in our cluster Snappy is the default.
>>Deflate can be a lot slower but much more compact.
>>
>>>
>>>Note that in order to avoid unnecessary sorting overhead, I made each
>>>key a constant (1L) so that the logs are combined but ordering isn't
>>>necessarily preserved (we don't care about ordering).
>>>
>>
>>In that case, I think you can use a map-only job.
>>There may be some work to get a single mapper to read many files,
>>however.
>>
>>>
>>>FYI, here are my mapper and reducer.
>>>
>>>    public static class AvroReachMapper extends
>>>            AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {
>>>        public void map(DeliveryLogEvent levent,
>>>                AvroCollector<Pair<Long, DeliveryLogEvent>> collector,
>>>                Reporter reporter) throws IOException {
>>>            collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
>>>        }
>>>    }
>>>
>>>    public static class Reduce extends
>>>            AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {
>>>
>>>        @Override
>>>        public void reduce(Long key, Iterable<DeliveryLogEvent> values,
>>>                AvroCollector<DeliveryLogEvent> collector,
>>>                Reporter reporter) throws IOException {
>>>            for (DeliveryLogEvent event : values) {
>>>                collector.collect(event);
>>>            }
>>>        }
>>>    }
>>>
>>>    AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>>    AvroJob.setMapperClass(conf, AvroReachMapper.class);
>>>    AvroJob.setMapOutputSchema(conf, SCHEMA);
>>>
>>>    AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>>    AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
>>>    AvroOutputFormat.setDeflateLevel(conf, 9);
>>>    AvroOutputFormat.setSyncInterval(conf, 1024 * 256);
>>>
>>>    AvroJob.setReducerClass(conf, Reduce.class);
>>>
>>>    JobClient.runJob(conf);
>>>
>>>Thanks,
>>>
>>>Frank Grimes
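Scott's advice in the thread is to try larger blocks (1M to 4M) and deflate level 6 instead of the level 9 / 256K combination used in the configuration above, and to measure rather than guess. Before running `recodec` on real files, a rough version of that comparison can be made with only `java.util.zip`. This is a stand-in under stated assumptions, not Avro: it splits a byte array into fixed-size blocks and deflates each block independently (raw deflate, no zlib header), approximating a block-based container where every block is compressed on its own. The synthetic log lines in `main` are hypothetical; for meaningful numbers, substitute a sample of your real serialized records.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

/** Rough stand-in for a recodec experiment: compress data in independent blocks. */
public class DeflateBlockExperiment {

    /**
     * Total compressed size when {@code data} is split into blocks of
     * {@code blockSize} bytes and each block is deflated separately at {@code level}.
     */
    static long compressedSize(byte[] data, int blockSize, int level) {
        Deflater deflater = new Deflater(level, true); // raw deflate, no zlib wrapper
        byte[] buf = new byte[64 * 1024];
        long total = 0;
        try {
            for (int off = 0; off < data.length; off += blockSize) {
                int len = Math.min(blockSize, data.length - off);
                deflater.reset();                       // new block: no shared history
                deflater.setInput(data, off, len);
                deflater.finish();
                while (!deflater.finished()) {
                    total += deflater.deflate(buf);     // count output bytes only
                }
            }
        } finally {
            deflater.end();
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical synthetic log records; replace with a sample of real data.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100_000; i++) {
            sb.append("event=delivery id=").append(i)
              .append(" host=machine").append(i % 7).append('\n');
        }
        byte[] data = sb.toString().getBytes(StandardCharsets.UTF_8);

        int[] blockSizes = {256 * 1024, 1024 * 1024, 4 * 1024 * 1024};
        int[] levels = {6, 9};
        for (int blockSize : blockSizes) {
            for (int level : levels) {
                long start = System.nanoTime();
                long size = compressedSize(data, blockSize, level);
                long ms = (System.nanoTime() - start) / 1_000_000;
                System.out.printf("block=%7d level=%d -> %9d bytes (%d ms)%n",
                        blockSize, level, size, ms);
            }
        }
    }
}
```

Absolute sizes and timings will differ on real Avro-encoded records; the useful output is how the level 6 and level 9 rows, and the 256K and 1M to 4M rows, compare with each other.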