Hi Vijayendra,

There is an issue with the CustomAvroWriters.java that I shared with you earlier. I am sending you the fixed version; I hope this resolves the issue of reading the output with the avro tool.
Please use one of the supported string values below for codecName:

null - for nullCodec
deflate - for deflateCodec
snappy - for snappyCodec
bzip2 - for bzip2Codec
xz - for xzCodec

Regards,
Ravi

On Thu, Jul 30, 2020 at 8:21 AM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> If it is possible, please share the sample output file.
>
> Regards,
> Ravi
>
> On Thu, Jul 30, 2020 at 3:17 AM Vijayendra Yadav <contact....@gmail.com>
> wrote:
>
>> Hi Ravi,
>>
>> With CustomAvroWriters (SNAPPY), when I run on a cluster it does create
>> files, but the files are not recognized as Avro files by the avro-tools
>> jar when I try to deserialize them to JSON.
>>
>> Flink logs show:
>> 2020-07-29 23:54:23,270 INFO com.hadoop.compression.lzo.LzoCodec -
>> Successfully loaded & initialized native-lzo library [hadoop-lzo rev
>> ff8f5709577defb6b78cdc1f98cfe129c4b6fe46]
>> 2020-07-29 23:54:23,277 INFO org.apache.hadoop.io.compress.CodecPool -
>> Got brand-new compressor [.snappy]
>>
>> 2020-07-29 23:54:28,931 INFO
>> org.apache.flink.fs.s3.common.writer.S3Committer - Committing
>> la/gold/vdcs_gold/test/bob/raw/year=2020/month=07/day=29/hour=23/ip-10-223-69-238-2020-07-29-23-54-00-121-5e51c2df-1-0.avro
>> with MPU ID
>>
>> Avro tools:
>>
>> java -jar avro-tools-1.7.4.jar tojson
>> /tmp/test-s3-flink-new/raw/year\=2020/month\=07/day\=29/hour\=20/ubuntu-2020-07-29-20-35-50-746-87802bc3-2-2.avro
>>
>> Exception in thread "main" java.io.IOException: Not an Avro data file
>>
>> Am I missing something?
>>
>> Regards,
>> Vijay
>>
>> On Wed, Jul 29, 2020 at 2:08 PM Vijayendra Yadav <contact....@gmail.com>
>> wrote:
>>
>>> Hi Ravi,
>>>
>>> Thanks for the details. CustomAvroWriters is working for now, although
>>> it fails for the Snappy codec in the local IDE with
>>> "java.lang.UnsatisfiedLinkError:
>>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z".
>>> I think I will have to try running it in an EMR/Hadoop environment to
>>> get the Snappy library resolved.
>>>
>>> About this other approach, AvroOutputFormat: does it fit into the
>>> StreamingFileSink API?
>>>
>>> StreamingFileSink.forBulkFormat(new Path(outPath),
>>>     CustomAvroWriters.forGenericRecord(schema, codecName))
>>>
>>> Or is it a different API? Could you send one sample if you have one for
>>> the other sink API?
>>>
>>> Regards,
>>> Vijay
>>>
>>> On Wed, Jul 29, 2020 at 12:45 PM Ravi Bhushan Ratnakar <
>>> ravibhushanratna...@gmail.com> wrote:
>>>
>>>> There is another alternative which you could try, like this:
>>>>
>>>> val stream: DataStream[GenericRecord] = _
>>>>
>>>> val aof: AvroOutputFormat[GenericRecord] = new AvroOutputFormat(
>>>>   new org.apache.flink.core.fs.Path(""), classOf[GenericRecord])
>>>>
>>>> aof.setSchema(schema)
>>>>
>>>> aof.setCodec(AvroOutputFormat.Codec.SNAPPY)
>>>>
>>>> stream.writeUsingOutputFormat(aof)
>>>>
>>>> Regards,
>>>>
>>>> Ravi
>>>>
>>>> On Wed, Jul 29, 2020 at 9:12 PM Ravi Bhushan Ratnakar <
>>>> ravibhushanratna...@gmail.com> wrote:
>>>>
>>>>> Hi Vijayendra,
>>>>>
>>>>> Currently AvroWriters doesn't support compression. If you want to use
>>>>> compression, then you need a custom implementation of AvroWriter where
>>>>> you can add compression support. Please find a sample customization of
>>>>> AvroWriters that supports compression. You can use it like the example
>>>>> below.
>>>>>
>>>>> codecName = org.apache.hadoop.io.compress.SnappyCodec
>>>>>
>>>>> CustomAvroWriters.forGenericRecord(schema, codecName)
>>>>>
>>>>> Regards,
>>>>> Ravi
>>>>>
>>>>> On Wed, Jul 29, 2020 at 7:36 PM Vijayendra Yadav <
>>>>> contact....@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> Could you please provide a sample for enabling compression (Snappy)
>>>>>> of Avro for a DataStream[GenericRecord]:
>>>>>>
>>>>>> AvroWriters.forGenericRecord(schema)
>>>>>>
>>>>>> Regards,
>>>>>> Vijay
>>>>>
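As a quick sanity check on the codec names listed at the top of this mail, the hypothetical stand-alone helper below (not part of the attached class) shows which strings are valid before they are handed to Avro's CodecFactory.fromString. Note that the Hadoop codec class name suggested in the earlier mail is not one of them, which is exactly what the fixed version corrects.

```java
import java.util.Set;

public class CodecNames {
    // The exact strings Avro's CodecFactory.fromString(...) accepts.
    // "null" writes an uncompressed container file; the others compress data blocks.
    static final Set<String> SUPPORTED =
            Set.of("null", "deflate", "snappy", "bzip2", "xz");

    static boolean isSupported(String codecName) {
        return SUPPORTED.contains(codecName);
    }

    public static void main(String[] args) {
        System.out.println(isSupported("snappy"));  // true
        // The Hadoop class name from the earlier mail is not a valid value here:
        System.out.println(isSupported("org.apache.hadoop.io.compress.SnappyCodec"));  // false
    }
}
```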
package org.apache.flink.formats.avro;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Function;

/** AvroWriterFactory variants that optionally apply a compression codec. */
public class CustomAvroWriters {

    public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type) {
        String schemaString = SpecificData.get().getSchema(type).toString();
        AvroBuilder<T> builder =
                out -> createAvroDataFileWriter(schemaString, SpecificDatumWriter::new, out);
        return new AvroWriterFactory<>(builder);
    }

    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema) {
        String schemaString = schema.toString();
        AvroBuilder<GenericRecord> builder =
                out -> createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out);
        return new AvroWriterFactory<>(builder);
    }

    /** Same as {@link #forGenericRecord(Schema)} but with a compression codec, e.g. "snappy". */
    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema, String codecName) {
        String schemaString = schema.toString();
        AvroBuilder<GenericRecord> builder =
                out -> createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out, codecName);
        return new AvroWriterFactory<>(builder);
    }

    public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type) {
        String schemaString = ReflectData.get().getSchema(type).toString();
        AvroBuilder<T> builder =
                out -> createAvroDataFileWriter(schemaString, ReflectDatumWriter::new, out);
        return new AvroWriterFactory<>(builder);
    }

    private static <T> DataFileWriter<T> createAvroDataFileWriter(
            String schemaString,
            Function<Schema, DatumWriter<T>> datumWriterFactory,
            OutputStream out) throws IOException {
        Schema schema = new Parser().parse(schemaString);
        DataFileWriter<T> dataFileWriter = new DataFileWriter<>(datumWriterFactory.apply(schema));
        dataFileWriter.create(schema, out);
        return dataFileWriter;
    }

    private static <T> DataFileWriter<T> createAvroDataFileWriter(
            String schemaString,
            Function<Schema, DatumWriter<T>> datumWriterFactory,
            OutputStream out,
            String codecName) throws IOException {
        Schema schema = new Parser().parse(schemaString);
        DataFileWriter<T> dataFileWriter = new DataFileWriter<>(datumWriterFactory.apply(schema));
        // Must be set before create(); accepts "null", "deflate", "snappy", "bzip2", "xz".
        dataFileWriter.setCodec(CodecFactory.fromString(codecName));
        dataFileWriter.create(schema, out);
        return dataFileWriter;
    }

    private CustomAvroWriters() {
    }
}
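For completeness, wiring the factory above into StreamingFileSink could look roughly like the sketch below. The output path and schema are placeholders, and it assumes the attached CustomAvroWriters class is on the classpath; this is an illustration, not a tested deployment.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.CustomAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class SnappyAvroSinkSketch {
    // Attach a snappy-compressed Avro bulk writer to a stream of GenericRecord.
    static void addAvroSink(DataStream<GenericRecord> stream, Schema schema, String outPath) {
        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path(outPath),
                        CustomAvroWriters.forGenericRecord(schema, "snappy"))
                .build();
        stream.addSink(sink);
    }
}
```

Running the Snappy codec still requires the native Hadoop library at runtime, which is why the local IDE hit the UnsatisfiedLinkError while EMR worked.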