The Recodec tool may be useful, and the source code is a good reference.

java -jar avro-tools-<VERSION>.jar
http://svn.apache.org/viewvc/avro/tags/release-1.6.1/lang/java/tools/src/main/java/org/apache/avro/tool/RecodecTool.java?view=co

https://issues.apache.org/jira/browse/AVRO-684
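
If I remember right it is exposed as a 'recodec' subcommand of the tools jar;
I haven't double-checked the exact arguments, so treat this invocation as a
rough guess and check the tool's usage output:

java -jar avro-tools-<VERSION>.jar recodec < input.avro > output.avro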



On 1/12/12 12:53 PM, "Scott Carey" <scottca...@apache.org> wrote:


>
>
>On 1/12/12 12:35 PM, "Frank Grimes" <frankgrime...@gmail.com> wrote:
>
>
>>So I decided to try writing my own AvroStreamCombiner utility and it
>>seems to choke when passing multiple input files:
>>
>>
>>hadoop dfs -cat hdfs://hadoop/machine1.log.avro hdfs://hadoop/machine2.log.avro | ./deliveryLogAvroStreamCombiner.sh > combined.log.avro
>>
>>
>>
>>
>>Exception in thread "main" java.io.IOException: Invalid sync!
>>    at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:293)
>>    at org.apache.avro.file.DataFileWriter.appendAllFrom(DataFileWriter.java:329)
>>    at DeliveryLogAvroStreamCombiner.main(Unknown Source)
>>
>>
>>
>>
>>Here's the code in question:
>>
>>public class DeliveryLogAvroStreamCombiner {
>>
>>    /**
>>     * @param args
>>     */
>>    public static void main(String[] args) throws Exception {
>>        DataFileStream<DeliveryLogEvent> dfs = null;
>>        DataFileWriter<DeliveryLogEvent> dfw = null;
>>
>>        try {
>>            dfs = new DataFileStream<DeliveryLogEvent>(System.in,
>>                    new SpecificDatumReader<DeliveryLogEvent>());
>>
>>            OutputStream stdout = System.out;
>>
>>            dfw = new DataFileWriter<DeliveryLogEvent>(
>>                    new SpecificDatumWriter<DeliveryLogEvent>());
>>            dfw.setCodec(CodecFactory.deflateCodec(9));
>>            dfw.setSyncInterval(1024 * 256);
>>            dfw.create(DeliveryLogEvent.SCHEMA$, stdout);
>>
>>            dfw.appendAllFrom(dfs, false);
>>
>
>dfs is reading from System.in, which has multiple files concatenated one
>after the other.  Each file needs its own DataFileStream (each file has
>its own header and metadata).
>
>In Java you could get the list of files, open each one's stream with the
>HDFS API, and append it to your one output file.
>In bash you could loop over all the source files and append one at a time
>(the above fails on the second file).  However, the only API for appending
>to the end of a pre-existing file currently takes a File, not a seekable
>stream, so Avro would need a patch to allow that on HDFS (and only an
>HDFS version that supports appends would work).
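>
>A rough, untested sketch of the Java approach (the class name and argument
>handling are made up; DeliveryLogEvent is your generated record class):
>
>import java.io.InputStream;
>import org.apache.avro.file.CodecFactory;
>import org.apache.avro.file.DataFileStream;
>import org.apache.avro.file.DataFileWriter;
>import org.apache.avro.specific.SpecificDatumReader;
>import org.apache.avro.specific.SpecificDatumWriter;
>import org.apache.hadoop.conf.Configuration;
>import org.apache.hadoop.fs.Path;
>
>public class HdfsAvroConcat {
>    public static void main(String[] args) throws Exception {
>        Configuration hadoopConf = new Configuration();
>
>        DataFileWriter<DeliveryLogEvent> writer =
>                new DataFileWriter<DeliveryLogEvent>(
>                        new SpecificDatumWriter<DeliveryLogEvent>());
>        writer.setCodec(CodecFactory.deflateCodec(6));
>        writer.setSyncInterval(1 << 21);            // ~2 MB blocks, see below
>        writer.create(DeliveryLogEvent.SCHEMA$, System.out);
>
>        for (String arg : args) {                   // e.g. hdfs://hadoop/machine1.log.avro
>            Path path = new Path(arg);
>            InputStream in = path.getFileSystem(hadoopConf).open(path);
>            DataFileStream<DeliveryLogEvent> source =
>                    new DataFileStream<DeliveryLogEvent>(
>                            in, new SpecificDatumReader<DeliveryLogEvent>());
>            try {
>                // Copies data blocks without deserializing records; with
>                // 'false' the raw blocks are reused when the source codec
>                // matches the writer's codec.
>                writer.appendAllFrom(source, false);
>            } finally {
>                source.close();
>            }
>        }
>        writer.close();
>    }
>}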
>
>Other things of note:
>You will probably get better total file compression by using a larger
>sync interval (1 MB to 4 MB) than by using deflate level 9.  Deflate 9 is
>VERY slow and almost never compresses more than 1% better than deflate 6,
>which is much faster.  I suggest experimenting with the 'recodec' option
>on some of your files to see what the best size/performance tradeoff
>is.  I doubt that 256 KB (pre-compression) blocks with level 9 compression
>are the way to go.
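>
>In your combiner above, for example (the numbers here are just a starting
>point to tune, not a recommendation):
>
>    dfw.setCodec(CodecFactory.deflateCodec(6));   // much faster, nearly as small
>    dfw.setSyncInterval(1 << 21);                 // ~2 MB blocks before compression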
>
>For reference: http://tukaani.org/lzma/benchmarks.html
>(gzip uses deflate compression)
>
>-Scott
>
>
>
>>        }
>>        finally {
>>            if (dfs != null) try { dfs.close(); } catch (Exception e) { e.printStackTrace(); }
>>            if (dfw != null) try { dfw.close(); } catch (Exception e) { e.printStackTrace(); }
>>        }
>>    }
>>
>>}
>>
>>
>>Is there any way this could be made to work without needing to download
>>the individual files to disk and calling append for each of them?
>>
>>Thanks,
>>
>>Frank Grimes
>>
>>
>>On 2012-01-12, at 2:24 PM, Frank Grimes wrote:
>>
>>
>>Hi Scott,
>>
>>If I have a map-only job, would I want only one mapper running to pull
>>all the records from the source input files and stream/append them to
>>the target avro file?
>>Would that be any different from (or more efficient than) doing "hadoop dfs
>>-cat file1 file2 file3" and piping the output into a "hadoop dfs
>>-put combinedFile"?
>>In that case, my only question is: how would I combine the Avro files
>>into a new file without deserializing them?
>>
>>Thanks,
>>
>>Frank Grimes
>>
>>
>>On 2012-01-12, at 1:14 PM, Scott Carey wrote:
>>
>>
>>
>>
>>On 1/12/12 8:27 AM, "Frank Grimes" <frankgrime...@gmail.com> wrote:
>>
>>
>>>Hi All,
>>>We have Avro data files in HDFS which are compressed using the Deflate
>>>codec.
>>>We have written an M/R job using the Avro Mapred API to combine those
>>>files.
>>>
>>>It seems to be working fine; however, when we run it we notice that the
>>>temporary work area (spills, etc.) seems to be uncompressed.
>>>We're thinking we might see a speedup due to reduced I/O if the
>>>temporary files were compressed as well.
>>>
>>>
>>
>>If all you want to do is combine the files, there is no reason to
>>deserialize and reserialize the contents, and a map-only job could
>>suffice.
>>If this is the case, you might want to consider one of two options:
>>1.  Use a map-only job with a combined file input.  This will produce
>>one file per mapper and no intermediate data.
>>2.  Use the Avro data file API to append to a file.  I am not sure if
>>this will work with HDFS without some modifications to Avro, but it
>>should be possible since the data file APIs can take an
>>InputStream/OutputStream.  The data file API can append data blocks
>>from another file if the schemas are an exact match.  This can be done
>>without deserialization, and it can optionally recompress the data or
>>leave it alone (roughly as sketched just below).
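>>
>>For example, given a DataFileWriter dfw and a DataFileStream dfs over the
>>same schema (as in the combiner code earlier in this thread), the second
>>argument controls whether blocks are recompressed:
>>
>>    dfw.appendAllFrom(dfs, false);  // reuse raw blocks when the codecs match
>>    // or:
>>    dfw.appendAllFrom(dfs, true);   // recompress each block with dfw's codec/level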
>>
>>>
>>>Is there a way to enable "mapred.compress.map.output" in such a way
>>>that those temporary files are compressed as Avro/Deflate?
>>>I tried simply setting conf.setBoolean("mapred.compress.map.output",
>>>true); but it didn't seem to have any effect.
>>>
>>>
>>
>>I am not sure, as I haven't tried it myself.  However, the Avro M/R
>>should be able to leverage all of the Hadoop compressed intermediate
>>forms.  LZO/Snappy are fast and in our cluster Snappy is the default.
>>Deflate can be a lot slower but much more compact.
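>>
>>Something along these lines should enable it with the old mapred API
>>(untested sketch; Snappy needs the native libraries installed on the
>>cluster):
>>
>>    conf.setCompressMapOutput(true);
>>    conf.setMapOutputCompressorClass(
>>            org.apache.hadoop.io.compress.SnappyCodec.class);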
>>
>>>
>>>Note that in order to avoid unnecessary sorting overhead, I made each
>>>key a constant (1L) so that the logs are combined but ordering isn't
>>>necessarily preserved. (we don't care about ordering)
>>>
>>>
>>>
>>
>>In that case, I think you can use a map only job.  There may be some
>>work to get a single mapper to read many files however.
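>>
>>Roughly (an untested assumption on my part that avro-mapred writes the map
>>output straight through AvroOutputFormat when there are no reducers; the
>>mapper name is made up):
>>
>>    public static class PassThroughMapper
>>            extends AvroMapper<DeliveryLogEvent, DeliveryLogEvent> {
>>        @Override
>>        public void map(DeliveryLogEvent event,
>>                AvroCollector<DeliveryLogEvent> collector, Reporter reporter)
>>                throws IOException {
>>            collector.collect(event);   // pass each record through unchanged
>>        }
>>    }
>>
>>    // in the job setup:
>>    AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>    AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>    AvroJob.setMapperClass(conf, PassThroughMapper.class);
>>    conf.setNumReduceTasks(0);          // map-only: no sort/shuffle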
>>
>>>
>>>FYI, here are my mapper and reducer.
>>>
>>>
>>>    public static class AvroReachMapper
>>>            extends AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {
>>>
>>>        public void map(DeliveryLogEvent levent,
>>>                AvroCollector<Pair<Long, DeliveryLogEvent>> collector,
>>>                Reporter reporter) throws IOException {
>>>
>>>            collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
>>>        }
>>>    }
>>>
>>>    public static class Reduce
>>>            extends AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {
>>>
>>>        @Override
>>>        public void reduce(Long key, Iterable<DeliveryLogEvent> values,
>>>                AvroCollector<DeliveryLogEvent> collector, Reporter reporter)
>>>                throws IOException {
>>>
>>>            for (DeliveryLogEvent event : values) {
>>>                collector.collect(event);
>>>            }
>>>        }
>>>    }
>>>
>>>
>>>
>>>
>>>    AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>>    AvroJob.setMapperClass(conf, AvroReachMapper.class);
>>>    AvroJob.setMapOutputSchema(conf, SCHEMA);
>>>
>>>    AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>>    AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
>>>    AvroOutputFormat.setDeflateLevel(conf, 9);
>>>    AvroOutputFormat.setSyncInterval(conf, 1024 * 256);
>>>
>>>    AvroJob.setReducerClass(conf, Reduce.class);
>>>
>>>    JobClient.runJob(conf);
>>>
>>>
>>>Thanks,
>>>
>>>Frank Grimes
>>>
>>>
>>>
>>

