Hi Vijayendra,

There is an issue with the CustomAvroWriters.java which I shared with you
earlier. I am sending you the fixed version; hope this resolves the
issue of reading the files with the avro tool.

Please use one of the supported string values below for codecName (a usage
sketch follows the list):

null - for nullCodec
deflate - for deflateCodec
snappy - for snappyCodec
bzip2 - for bzip2Codec
xz - for xzCodec
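
For example, here is a minimal sketch of wiring the fixed writer into the
StreamingFileSink, as in your earlier mail (outPath and schema are
placeholders for your own values):

DataStream<GenericRecord> stream = ...; // your stream of records

StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path(outPath),
                CustomAvroWriters.forGenericRecord(schema, "snappy"))
        .build();

stream.addSink(sink);

To confirm the codec took effect, the avro-tools getmeta command should
report avro.codec: snappy for the written files.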


Regards,
Ravi

On Thu, Jul 30, 2020 at 8:21 AM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> If possible, please share a sample output file.
> Regards,
> Ravi
>
> On Thu, Jul 30, 2020 at 3:17 AM Vijayendra Yadav <contact....@gmail.com>
> wrote:
>
>> Hi Ravi,
>>
>> With CustomAvroWriters (SNAPPY), when I run on a cluster it does create
>> files, but the files are not recognized as Avro files by the avro-tools jar
>> when I try to deserialize them to JSON.
>>
>> Flink logs show:
>> 2020-07-29 23:54:23,270 INFO com.hadoop.compression.lzo.LzoCodec -
>> Successfully loaded & initialized native-lzo library [hadoop-lzo rev
>> ff8f5709577defb6b78cdc1f98cfe129c4b6fe46]
>> 2020-07-29 23:54:23,277 INFO org.apache.hadoop.io.compress.CodecPool - Got
>> brand-new compressor [.snappy]
>>
>> 2020-07-29 23:54:28,931 INFO
>> org.apache.flink.fs.s3.common.writer.S3Committer - Committing
>> la/gold/vdcs_gold/test/bob/raw/year=2020/month=07/day=29/hour=23/ip-10-223-69-238-2020-07-29-23-54-00-121-5e51c2df-1-0.avro
>> with MPU ID
>>
>> Avro tools:
>>
>> java -jar avro-tools-1.7.4.jar tojson
>> /tmp/test-s3-flink-new/raw/year\=2020/month\=07/day\=29/hour\=20/ubuntu-2020-07-29-20-35-50-746-87802bc3-2-2.avro
>>
>> Exception in thread "main" java.io.IOException: Not an Avro data file
>>
>>
>> Am I missing something?
>>
>> Regards,
>> Vijay
>>
>>
>>
>> On Wed, Jul 29, 2020 at 2:08 PM Vijayendra Yadav <contact....@gmail.com>
>> wrote:
>>
>>> Hi Ravi,
>>>
>>> Thanks for the details. CustomAvroWriters is working for now, although
>>> it's failing for the Snappy codec in my local IDE with
>>> "java.lang.UnsatisfiedLinkError:
>>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z".
>>> I think I will have to run it in an EMR/Hadoop environment to
>>> get the Snappy native library resolved.
>>>
>>> About the other approach, AvroOutputFormat:
>>>
>>> Does it fit into the StreamingFileSink API?
>>>
>>> StreamingFileSink.forBulkFormat(new Path(outPath), CustomAvroWriters
>>>         .forGenericRecord(schema, codecName))
>>>
>>> Or is it a different API? Could you send a sample if you have one for
>>> the other sink API?
>>>
>>> Regards,
>>> Vijay
>>>
>>> On Wed, Jul 29, 2020 at 12:45 PM Ravi Bhushan Ratnakar <
>>> ravibhushanratna...@gmail.com> wrote:
>>>
>>>> There is another alternative which you could try, like this:
>>>>
>>>> val stream: DataStream[GenericRecord] = ??? // your stream
>>>>
>>>> val aof: AvroOutputFormat[GenericRecord] = new AvroOutputFormat(new
>>>> org.apache.flink.core.fs.Path(""), classOf[GenericRecord])
>>>>
>>>> aof.setSchema(schema)
>>>>
>>>> aof.setCodec(AvroOutputFormat.Codec.SNAPPY)
>>>>
>>>> stream.writeUsingOutputFormat(aof)
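>>>>
>>>> Note that writeUsingOutputFormat is a plain sink which doesn't take part
>>>> in Flink's checkpointing, so unlike StreamingFileSink it doesn't give
>>>> exactly-once guarantees for the output files.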
>>>>
>>>> Regards,
>>>>
>>>> Ravi
>>>>
>>>>
>>>>
>>>> On Wed, Jul 29, 2020 at 9:12 PM Ravi Bhushan Ratnakar <
>>>> ravibhushanratna...@gmail.com> wrote:
>>>>
>>>>> Hi Vijayendra,
>>>>>
>>>>> Currently AvroWriters doesn't support compression. If you want
>>>>> compression, you need a custom implementation of the AvroWriters where
>>>>> you can add it. Please find attached a sample customization of
>>>>> AvroWriters that supports compression; you can use it like the example
>>>>> below.
>>>>>
>>>>> codecName = org.apache.hadoop.io.compress.SnappyCodec
>>>>>
>>>>> CustomAvroWriters.forGenericRecord(schema, codecName)
>>>>>
>>>>> Regards,
>>>>> Ravi
>>>>>
>>>>> On Wed, Jul 29, 2020 at 7:36 PM Vijayendra Yadav <
>>>>> contact....@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> Could you please provide a sample for enabling compression (Snappy)
>>>>>> of Avro for a
>>>>>> DataStream[GenericRecord]
>>>>>>
>>>>>> AvroWriters.forGenericRecord(schema)
>>>>>>
>>>>>> Regards,
>>>>>> Vijay
>>>>>>
>>>>>
package org.apache.flink.formats.avro;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Function;

public class CustomAvroWriters {

    /** Creates a writer factory for Avro specific records (uncompressed). */
    public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type) {
        String schemaString = SpecificData.get().getSchema(type).toString();
        AvroBuilder<T> builder = out -> createAvroDataFileWriter(schemaString, SpecificDatumWriter::new, out);
        return new AvroWriterFactory<>(builder);
    }

    /** Creates a writer factory for Avro generic records (uncompressed). */
    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema) {
        String schemaString = schema.toString();
        AvroBuilder<GenericRecord> builder = out -> createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out);
        return new AvroWriterFactory<>(builder);
    }

    /**
     * Creates a writer factory for Avro generic records with the given
     * compression codec. Supported codec names: null, deflate, snappy,
     * bzip2, xz.
     */
    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema, String codecName) {
        String schemaString = schema.toString();
        AvroBuilder<GenericRecord> builder = out -> createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out, codecName);
        return new AvroWriterFactory<>(builder);
    }

    /** Creates a writer factory for records whose schema is derived via reflection (uncompressed). */
    public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type) {
        String schemaString = ReflectData.get().getSchema(type).toString();
        AvroBuilder<T> builder = out -> createAvroDataFileWriter(schemaString, ReflectDatumWriter::new, out);
        return new AvroWriterFactory<>(builder);
    }

    private static <T> DataFileWriter<T> createAvroDataFileWriter(
            String schemaString,
            Function<Schema, DatumWriter<T>> datumWriterFactory,
            OutputStream out) throws IOException {
        Schema schema = new Parser().parse(schemaString);
        DatumWriter<T> datumWriter = datumWriterFactory.apply(schema);
        DataFileWriter<T> dataFileWriter = new DataFileWriter<>(datumWriter);
        // Writes the Avro container-file header (magic bytes, schema, sync marker).
        dataFileWriter.create(schema, out);
        return dataFileWriter;
    }

    private static <T> DataFileWriter<T> createAvroDataFileWriter(
            String schemaString,
            Function<Schema, DatumWriter<T>> datumWriterFactory,
            OutputStream out,
            String codecName) throws IOException {
        Schema schema = new Parser().parse(schemaString);
        DatumWriter<T> datumWriter = datumWriterFactory.apply(schema);
        DataFileWriter<T> dataFileWriter = new DataFileWriter<>(datumWriter);
        // The codec must be set before create(), so it is recorded in the file header.
        dataFileWriter.setCodec(CodecFactory.fromString(codecName));
        dataFileWriter.create(schema, out);
        return dataFileWriter;
    }

    private CustomAvroWriters() {
    }
}
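
If you want to double-check a written file locally, below is a small sketch
(plain Avro API; the file path is whatever you pass in) that opens the file
the same way avro-tools does and prints the codec plus the records:

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.File;

public class AvroFileCheck {
    public static void main(String[] args) throws Exception {
        File file = new File(args[0]); // path to a written .avro file
        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
        try (DataFileReader<GenericRecord> reader = new DataFileReader<>(file, datumReader)) {
            // "avro.codec" is recorded in the container-file header by setCodec().
            System.out.println("avro.codec = " + reader.getMetaString("avro.codec"));
            for (GenericRecord record : reader) {
                System.out.println(record);
            }
        }
    }
}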
