Hi Vijayendra,
There was an issue with the CustomAvroWriters.java I shared with you
earlier. I am sending you the fixed version; I hope this resolves the
issue of reading the files with avro-tools.
Please use one of the following supported string values for codecName:
null - for nullCodec
deflate - for deflateCodec
snappy - for snappyCodec
bzip2 - for bzip2Codec
xz - for xzCodec
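A minimal sketch of validating codecName before building the sink, assuming the five names above are the complete set. They match the names registered with Avro's CodecFactory in the Avro 1.8.x line; newer Avro releases register more. CodecNameCheck and requireSupportedCodec are hypothetical names, not part of Flink or Avro. Note that a Hadoop class name such as org.apache.hadoop.io.compress.SnappyCodec is not one of these names.

```java
import java.util.Set;

public class CodecNameCheck {

    // Assumption: the codec names listed in this thread, which match the names
    // registered with Avro's CodecFactory in the Avro 1.8.x line.
    private static final Set<String> SUPPORTED = Set.of("null", "deflate", "snappy", "bzip2", "xz");

    // Hypothetical helper: returns the name unchanged if it is supported, else throws.
    static String requireSupportedCodec(String codecName) {
        // Check for null first: Set.of(...).contains(null) would throw NullPointerException.
        if (codecName == null || !SUPPORTED.contains(codecName)) {
            throw new IllegalArgumentException(
                    "Unsupported Avro codec name '" + codecName + "'; expected one of " + SUPPORTED);
        }
        return codecName;
    }

    public static void main(String[] args) {
        System.out.println(requireSupportedCodec("snappy"));
        try {
            requireSupportedCodec("org.apache.hadoop.io.compress.SnappyCodec");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Checking on the client keeps a typo from surfacing only later, when the task managers try to open their first part file.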
Regards,
Ravi
On Thu, Jul 30, 2020 at 8:21 AM Ravi Bhushan Ratnakar <
[email protected]> wrote:
> If possible, please share a sample output file.
> Regards,
> Ravi
>
> On Thu, Jul 30, 2020 at 3:17 AM Vijayendra Yadav <[email protected]>
> wrote:
>
>> Hi Ravi,
>>
>> With CustomAvroWriters (SNAPPY), when I run on a cluster it does create
>> files, but the files are not recognized as Avro files by the avro-tools
>> jar when I try to deserialize them to JSON.
>>
>> Flink logs show:
>> 2020-07-29 23:54:23,270 INFO com.hadoop.compression.lzo.LzoCodec - Successfully loaded & initialized native-lzo library [hadoop-lzo rev ff8f5709577defb6b78cdc1f98cfe129c4b6fe46]
>> 2020-07-29 23:54:23,277 INFO org.apache.hadoop.io.compress.CodecPool - *Got brand-new compressor [.snappy]*
>>
>> 2020-07-29 23:54:28,931 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing la/gold/vdcs_gold/test/bob/raw/year=2020/month=07/day=29/hour=23/ip-10-223-69-238-2020-07-29-23-54-00-121-5e51c2df-1-0.avro with MPU ID
>>
>> *Avro tools:*
>>
>> java -jar avro-tools-1.7.4.jar tojson /tmp/test-s3-flink-new/raw/year\=2020/month\=07/day\=29/hour\=20/ubuntu-2020-07-29-20-35-50-746-87802bc3-2-2.avro
>>
>> *Exception in thread "main" java.io.IOException: Not an Avro data file*
>>
>>
>> Am I missing something?
>>
>> Regards,
>> Vijay
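One way to narrow down "Not an Avro data file" is to inspect the first bytes of the output. Per the Avro specification, an object container file starts with the ASCII bytes O, b, j followed by the byte 1, and Avro's reader raises this kind of error when that header is missing. If the earlier CustomAvroWriters compressed the whole output stream with a Hadoop codec (the CodecPool "Got brand-new compressor [.snappy]" log line hints at this, but that is an inference), the file would not start with the header. A minimal JDK-only sketch; AvroMagicCheck is a hypothetical name, not part of avro-tools:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

public class AvroMagicCheck {

    // Per the Avro spec, an object container file starts with 'O', 'b', 'j', 1.
    static final byte[] AVRO_MAGIC = {'O', 'b', 'j', 1};

    // True if the given bytes begin with the Avro container-file magic.
    static boolean startsWithAvroMagic(byte[] bytes) {
        return bytes.length >= AVRO_MAGIC.length
                && Arrays.equals(Arrays.copyOf(bytes, AVRO_MAGIC.length), AVRO_MAGIC);
    }

    // Reads at most the first four bytes of the stream and checks them.
    static boolean startsWithAvroMagic(InputStream in) throws IOException {
        byte[] head = new byte[AVRO_MAGIC.length];
        int n = in.readNBytes(head, 0, head.length);
        return startsWithAvroMagic(Arrays.copyOf(head, n));
    }

    public static void main(String[] args) throws IOException {
        Path file = Paths.get(args[0]); // path to a downloaded output file
        try (InputStream in = Files.newInputStream(file)) {
            System.out.println(file + (startsWithAvroMagic(in)
                    ? ": has the Avro container header"
                    : ": missing the Avro container header"));
        }
    }
}
```

A file that fails this check will not be readable by avro-tools regardless of which codec was intended.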
>>
>>
>>
>> On Wed, Jul 29, 2020 at 2:08 PM Vijayendra Yadav <[email protected]>
>> wrote:
>>
>>> Hi Ravi,
>>>
>>> Thanks for the details. CustomAvroWriters is working for now, although it
>>> fails for the Snappy codec in my local IDE with "java.lang.UnsatisfiedLinkError:
>>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z".
>>> I think I will have to try running it in an EMR/Hadoop environment to
>>> get the native Snappy library resolved.
>>>
>>> *About the other approach, using AvroOutputFormat:*
>>>
>>> Does it fit into the StreamingFileSink API?
>>>
>>> StreamingFileSink.forBulkFormat(new Path(outPath), CustomAvroWriters.forGenericRecord(schema, codecName)).build()
>>>
>>> Or is it a different API? Could you send a sample for that other sink
>>> API if you have one?
>>>
>>> Regards,
>>> Vijay
>>>
>>> On Wed, Jul 29, 2020 at 12:45 PM Ravi Bhushan Ratnakar <
>>> [email protected]> wrote:
>>>
>>>> There is another alternative you could try, like this:
>>>>
>>>> val stream: DataStream[GenericRecord] = ??? // your existing stream
>>>>
>>>> val aof = new AvroOutputFormat[GenericRecord](new org.apache.flink.core.fs.Path(""), classOf[GenericRecord])
>>>> aof.setSchema(schema)
>>>> aof.setCodec(AvroOutputFormat.Codec.SNAPPY)
>>>>
>>>> stream.writeUsingOutputFormat(aof)
>>>>
>>>> Regards,
>>>>
>>>> Ravi
>>>>
>>>>
>>>>
>>>> On Wed, Jul 29, 2020 at 9:12 PM Ravi Bhushan Ratnakar <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Vijayendra,
>>>>>
>>>>> Currently AvroWriters doesn't support compression. If you want to use
>>>>> compression then you need to have a custom implementation of AvroWriter
>>>>> where you can add features of compression. Please find a sample
>>>>> customization for AvroWriters where you could use compression. You can use
>>>>> the example below.
>>>>>
>>>>> codeName = org.apache.hadoop.io.compress.SnappyCodec
>>>>>
>>>>> CustomAvroWriters.forGenericRecord(schema, codeName)
>>>>>
>>>>> Regards,
>>>>> Ravi
>>>>>
>>>>> On Wed, Jul 29, 2020 at 7:36 PM Vijayendra Yadav <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> Could you please provide a sample for enabling Snappy compression of
>>>>>> Avro output for a DataStream[GenericRecord] written with
>>>>>> AvroWriters.forGenericRecord(schema)?
>>>>>>
>>>>>> Regards,
>>>>>> Vijay
>>>>>>
>>>>>
package org.apache.flink.formats.avro;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Function;

/**
 * Customized version of Flink's AvroWriters with an extra factory method that sets an Avro codec.
 * Schemas are captured as Strings so that the AvroBuilder lambdas stay serializable.
 */
public class CustomAvroWriters {

    /** Writer factory for Avro specific records (no compression). */
    public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type) {
        String schemaString = SpecificData.get().getSchema(type).toString();
        AvroBuilder<T> builder = out -> createAvroDataFileWriter(schemaString, SpecificDatumWriter::new, out, null);
        return new AvroWriterFactory<>(builder);
    }

    /** Writer factory for generic records (no compression). */
    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema) {
        return forGenericRecord(schema, null);
    }

    /**
     * Writer factory for generic records using an Avro container-file codec.
     * codecName is passed to CodecFactory.fromString: "null", "deflate", "snappy", "bzip2" or "xz".
     * A null codecName keeps Avro's default (no compression).
     */
    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema, String codecName) {
        if (codecName != null) {
            // Fail fast on the client for unknown names instead of later on the task managers.
            CodecFactory.fromString(codecName);
        }
        String schemaString = schema.toString();
        AvroBuilder<GenericRecord> builder =
                out -> createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out, codecName);
        return new AvroWriterFactory<>(builder);
    }

    /** Writer factory for records whose schema is derived via reflection (no compression). */
    public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type) {
        String schemaString = ReflectData.get().getSchema(type).toString();
        AvroBuilder<T> builder = out -> createAvroDataFileWriter(schemaString, ReflectDatumWriter::new, out, null);
        return new AvroWriterFactory<>(builder);
    }

    private static <T> DataFileWriter<T> createAvroDataFileWriter(
            String schemaString,
            Function<Schema, DatumWriter<T>> datumWriterFactory,
            OutputStream out,
            String codecName) throws IOException {
        Schema schema = new Schema.Parser().parse(schemaString);
        DatumWriter<T> datumWriter = datumWriterFactory.apply(schema);
        DataFileWriter<T> dataFileWriter = new DataFileWriter<>(datumWriter);
        if (codecName != null) {
            // Must be called before create(). The codec compresses each data block,
            // while the container header stays readable by avro-tools.
            dataFileWriter.setCodec(CodecFactory.fromString(codecName));
        }
        dataFileWriter.create(schema, out);
        return dataFileWriter;
    }

    private CustomAvroWriters() {
    }
}
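The class above converts each Schema to a String and re-parses it inside the AvroBuilder lambda. A likely reason, stated here as an assumption rather than something from the thread, is that Flink ships the writer factory to the task managers with Java serialization, AvroBuilder extends Serializable, and Avro's Schema was not Serializable in older Avro releases, whereas a String always is. The JDK-only sketch below shows the same pattern, with a hypothetical WriterBuilder interface standing in for AvroBuilder:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializableBuilderDemo {

    // Hypothetical stand-in for Flink's AvroBuilder: a functional interface that
    // extends Serializable, so lambdas targeting it can be serialized.
    @FunctionalInterface
    interface WriterBuilder extends Serializable {
        String build(String record);
    }

    // Mirrors the CustomAvroWriters pattern: capture only Strings (schema text and
    // codec name) and use them inside the lambda when it runs.
    static WriterBuilder makeBuilder(String schemaString, String codecName) {
        return record -> "schema=" + schemaString + ", codec=" + codecName + ", record=" + record;
    }

    // Serializes and deserializes the builder, as happens when a job is shipped to workers.
    static WriterBuilder roundTrip(WriterBuilder builder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(builder);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (WriterBuilder) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        WriterBuilder copy = roundTrip(makeBuilder("S", "snappy"));
        System.out.println(copy.build("r1")); // prints schema=S, codec=snappy, record=r1
    }
}
```

Capturing a non-serializable object in such a lambda would instead fail at writeObject with a NotSerializableException.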