Hi Vijayendra,

Currently AvroWriters doesn't support compression. If you want compression,
you need a custom implementation of the AvroWriters factory where the
compression codec is applied. Please find below a sample customization,
CustomAvroWriters, which takes the Hadoop codec class name as an extra
argument. You can use it like this:

String codecName = "org.apache.hadoop.io.compress.SnappyCodec";

CustomAvroWriters.forGenericRecord(schema, codecName)
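
To wire it into a job, a minimal sketch could look like the following (this
assumes Flink's StreamingFileSink bulk-format API, a DataStream<GenericRecord>
named stream, and a placeholder output path; the classes used are
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink and
org.apache.flink.core.fs.Path):

StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("s3://bucket/avro-out"),
                CustomAvroWriters.forGenericRecord(schema, codecName))
        .build();

stream.addSink(sink);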

Regards,
Ravi

On Wed, Jul 29, 2020 at 7:36 PM Vijayendra Yadav <contact....@gmail.com>
wrote:

> Hi Team,
>
> Could you please provide a sample for Enabling Compression (Snappy) of
> Avro:
> DataStream[GenericRecord]
>
> AvroWriters.forGenericRecord(schema)
>
> Regards,
> Vijay
>
package org.apache.flink.formats.avro;

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Variant of Flink's AvroWriters that can additionally wrap the output stream
 * with a Hadoop compression codec (e.g. Snappy).
 */
public class CustomAvroWriters {

    /** Creates a writer factory for an Avro specific record type. */
    public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type) {
        String schemaString = SpecificData.get().getSchema(type).toString();
        AvroBuilder<T> builder = out -> createAvroDataFileWriter(schemaString, SpecificDatumWriter::new, out);
        return new AvroWriterFactory<>(builder);
    }

    /** Creates a writer factory for GenericRecord without compression. */
    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema) {
        String schemaString = schema.toString();
        AvroBuilder<GenericRecord> builder = out -> createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out);
        return new AvroWriterFactory<>(builder);
    }

    /** Creates a writer factory for GenericRecord, wrapping the output with the given Hadoop codec. */
    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema, String codecName) {
        String schemaString = schema.toString();
        AvroBuilder<GenericRecord> builder = out -> createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out, codecName);
        return new AvroWriterFactory<>(builder);
    }

    /** Creates a writer factory for a reflection-based record type. */
    public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type) {
        String schemaString = ReflectData.get().getSchema(type).toString();
        AvroBuilder<T> builder = out -> createAvroDataFileWriter(schemaString, ReflectDatumWriter::new, out);
        return new AvroWriterFactory<>(builder);
    }

    private static <T> DataFileWriter<T> createAvroDataFileWriter(
            String schemaString,
            Function<Schema, DatumWriter<T>> datumWriterFactory,
            OutputStream out) throws IOException {
        Schema schema = new Parser().parse(schemaString);
        DatumWriter<T> datumWriter = datumWriterFactory.apply(schema);
        DataFileWriter<T> dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.create(schema, out);
        return dataFileWriter;
    }

    private static <T> DataFileWriter<T> createAvroDataFileWriter(
            String schemaString,
            Function<Schema, DatumWriter<T>> datumWriterFactory,
            OutputStream out,
            String codecName) throws IOException {
        Schema schema = new Parser().parse(schemaString);
        DatumWriter<T> datumWriter = datumWriterFactory.apply(schema);
        DataFileWriter<T> dataFileWriter = new DataFileWriter<>(datumWriter);
        // Wrap the target stream with the Hadoop codec so that everything the Avro
        // container writes is compressed on the way out.
        CompressionCodec codec = getCompressionCodec(codecName);
        CompressionOutputStream cos = codec.createOutputStream(out);
        dataFileWriter.create(schema, cos);
        return dataFileWriter;
    }

    private CustomAvroWriters() {
    }

    private static CompressionCodec getCompressionCodec(String codecName) {
        Configuration conf = new Configuration();
        // First try to resolve the codec by class name (or alias) via the Hadoop factory.
        CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
        CompressionCodec codec = codecFactory.getCodecByName(codecName);
        if (codec == null) {
            // Fall back to instantiating the class directly; ReflectionUtils also injects
            // the Configuration into Configurable codecs such as SnappyCodec.
            try {
                Class<? extends CompressionCodec> codecClass =
                        Class.forName(codecName).asSubclass(CompressionCodec.class);
                codec = ReflectionUtils.newInstance(codecClass, conf);
            } catch (Exception ex) {
                throw new RuntimeException("Codec " + codecName + " not found.", ex);
            }
        }
        return codec;
    }
}
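
One thing to note: because the codec wraps the whole output stream, the bytes
on disk are the Avro container passed through the Hadoop codec, so the file
needs to be decompressed with the same codec before Avro can read it. A rough
sketch of reading such a file back (the file name part-0-0 is just a
placeholder; this uses org.apache.avro.file.DataFileStream,
org.apache.avro.generic.GenericDatumReader and java.io.FileInputStream in
addition to the Hadoop classes already imported above):

Configuration conf = new Configuration();
CompressionCodec codec = new CompressionCodecFactory(conf)
        .getCodecByName("org.apache.hadoop.io.compress.SnappyCodec");
try (InputStream in = codec.createInputStream(new FileInputStream("part-0-0"));
     DataFileStream<GenericRecord> reader =
             new DataFileStream<>(in, new GenericDatumReader<>())) {
    while (reader.hasNext()) {
        System.out.println(reader.next());
    }
}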
