Hi Vijayendra,
Currently, AvroWriters doesn't support compression. If you want to use
compression, you need a custom implementation of AvroWriters that adds
compression support. Please find below a sample customization of
AvroWriters that accepts a compression codec. You can use it as shown in
the following example:
codecName = "org.apache.hadoop.io.compress.SnappyCodec"
CustomAvroWriters.forGenericRecord(schema, codecName)
Regards,
Ravi
On Wed, Jul 29, 2020 at 7:36 PM Vijayendra Yadav <[email protected]>
wrote:
> Hi Team,
>
> Could you please provide a sample for Enabling Compression (Snappy) of
> Avro:
> DataStream[GenericRecord]
>
> AvroWriters.forGenericRecord(schema)
>
> Regards,
> Vijay
>
package org.apache.flink.formats.avro;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
/**
 * Static factory methods that build {@link AvroWriterFactory} instances for
 * specific, generic and reflect Avro records.
 *
 * <p>In addition to the uncompressed variants, {@link #forGenericRecord(Schema, String)}
 * writes the Avro data file through the output stream of a Hadoop
 * {@link CompressionCodec}.
 *
 * <p>Every factory converts the schema to its string form up front and parses it
 * again inside the builder lambda, so the lambda captures only a {@code String}
 * (presumably to keep the builder serializable -- confirm against
 * {@code AvroBuilder}).
 */
public class CustomAvroWriters {

    /** Utility class; not meant to be instantiated. */
    private CustomAvroWriters() {
    }

    /**
     * Creates a writer factory for Avro specific records.
     *
     * @param type the generated record class whose schema is used for writing
     * @param <T> the record type
     * @return a factory producing writers for {@code type}
     */
    public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type) {
        String schemaString = SpecificData.get().getSchema(type).toString();
        AvroBuilder<T> builder =
                out -> createAvroDataFileWriter(schemaString, SpecificDatumWriter::new, out);
        return new AvroWriterFactory<>(builder);
    }

    /**
     * Creates a writer factory for Avro generic records without compression.
     *
     * @param schema the schema of the records to write
     * @return a factory producing writers for {@link GenericRecord}s of {@code schema}
     */
    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema) {
        String schemaString = schema.toString();
        AvroBuilder<GenericRecord> builder =
                out -> createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out);
        return new AvroWriterFactory<>(builder);
    }

    /**
     * Creates a writer factory for Avro generic records whose output is passed
     * through a Hadoop compression codec.
     *
     * <p>The codec is resolved lazily, each time a writer is created, not when this
     * method is called.
     *
     * <p>NOTE(review): the codec wraps the entire output stream; this is not Avro's
     * block-level codec ({@code DataFileWriter#setCodec}). Presumably readers have to
     * decompress with the same Hadoop codec before parsing the Avro file -- confirm
     * this is the intended format.
     *
     * @param schema the schema of the records to write
     * @param codecName a codec name understood by
     *     {@link CompressionCodecFactory#getCodecByName(String)}, or the fully qualified
     *     class name of a {@link CompressionCodec} with a no-argument constructor
     * @return a factory producing compressing writers for {@link GenericRecord}s
     */
    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema, String codecName) {
        String schemaString = schema.toString();
        AvroBuilder<GenericRecord> builder =
                out -> createAvroDataFileWriter(schemaString, GenericDatumWriter::new, out, codecName);
        return new AvroWriterFactory<>(builder);
    }

    /**
     * Creates a writer factory for records whose schema is derived via reflection.
     *
     * @param type the class to derive the schema from
     * @param <T> the record type
     * @return a factory producing writers for {@code type}
     */
    public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type) {
        String schemaString = ReflectData.get().getSchema(type).toString();
        AvroBuilder<T> builder =
                out -> createAvroDataFileWriter(schemaString, ReflectDatumWriter::new, out);
        return new AvroWriterFactory<>(builder);
    }

    /**
     * Parses the schema, builds the datum writer and opens an Avro data file on
     * {@code out}.
     *
     * @param schemaString the JSON form of the schema
     * @param datumWriterFactory creates the datum writer for the parsed schema
     * @param out the stream the data file is written to
     * @param <T> the record type
     * @return a data file writer already created on {@code out}
     * @throws IOException if the data file cannot be created on {@code out}
     */
    private static <T> DataFileWriter<T> createAvroDataFileWriter(
            String schemaString,
            Function<Schema, DatumWriter<T>> datumWriterFactory,
            OutputStream out) throws IOException {
        Schema schema = new Parser().parse(schemaString);
        DatumWriter<T> datumWriter = datumWriterFactory.apply(schema);
        DataFileWriter<T> dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.create(schema, out);
        return dataFileWriter;
    }

    /**
     * Same as the three-argument overload, but the data file is written to a
     * compressing stream that the named codec creates around {@code out}.
     *
     * @param schemaString the JSON form of the schema
     * @param datumWriterFactory creates the datum writer for the parsed schema
     * @param out the underlying stream that receives the compressed bytes
     * @param codecName the codec name or class name, see {@link #getCompressionCodec(String)}
     * @param <T> the record type
     * @return a data file writer already created on the compressing stream
     * @throws IOException if the compressing stream or the data file cannot be created
     */
    private static <T> DataFileWriter<T> createAvroDataFileWriter(
            String schemaString,
            Function<Schema, DatumWriter<T>> datumWriterFactory,
            OutputStream out,
            String codecName) throws IOException {
        Schema schema = new Parser().parse(schemaString);
        DatumWriter<T> datumWriter = datumWriterFactory.apply(schema);
        DataFileWriter<T> dataFileWriter = new DataFileWriter<>(datumWriter);
        // The compressing stream is opened last so nothing is allocated if the
        // schema or datum writer setup above fails.
        CompressionCodec codec = getCompressionCodec(codecName);
        CompressionOutputStream compressedOut = codec.createOutputStream(out);
        dataFileWriter.create(schema, compressedOut);
        return dataFileWriter;
    }

    /**
     * Resolves a Hadoop compression codec.
     *
     * <p>The name is first looked up through a {@link CompressionCodecFactory} built
     * from a default {@link Configuration}. If that yields nothing, the name is
     * treated as a class name and instantiated reflectively; a codec created this way
     * that implements {@link Configurable} receives the same configuration, since it
     * was not created by the factory.
     *
     * @param codecName the codec name or fully qualified codec class name
     * @return the resolved codec, never {@code null}
     * @throws RuntimeException if the class cannot be loaded, is not a
     *     {@link CompressionCodec}, or cannot be instantiated
     */
    private static CompressionCodec getCompressionCodec(String codecName) {
        Configuration conf = new Configuration();
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodecByName(codecName);
        if (codec != null) {
            return codec;
        }
        try {
            codec = Class.forName(codecName)
                    .asSubclass(CompressionCodec.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException ex) {
            throw new RuntimeException("Codec " + codecName + " not found.", ex);
        }
        // A reflectively created codec has no configuration yet; configurable codecs
        // read their settings from it when creating streams.
        if (codec instanceof Configurable) {
            ((Configurable) codec).setConf(conf);
        }
        return codec;
    }
}