Hi, Tim. 

If you look at the docs here:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/filesystem/#format-types-1
you just need to write a custom `AvroWriterFactory`, where you can pass 
parameters such as the codec to your Avro writer. Despite what the name 
suggests, the same approach applies to the other bulk formats, which include 
Parquet files as well. I have copied the example from the docs below:

```
// Build a writer factory whose DataFileWriter uses the Snappy codec.
AvroWriterFactory<?> factory = new AvroWriterFactory<>((AvroBuilder<Address>) out -> {
        Schema schema = ReflectData.get().getSchema(Address.class);
        DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);

        DataFileWriter<Address> dataFileWriter = new DataFileWriter<>(datumWriter);
        // The codec has to be set before create() opens the file.
        dataFileWriter.setCodec(CodecFactory.snappyCodec());
        dataFileWriter.create(schema, out);
        return dataFileWriter;
});

// Plug the factory into a bulk-format FileSink.
DataStream<Address> stream = ...
stream.sinkTo(FileSink.forBulkFormat(
        outputBasePath,
        factory).build());
```
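
For Parquet specifically, the same idea could look roughly like the sketch below. It is untested and assumes the flink-parquet and parquet-avro dependencies are on the classpath. It builds a `ParquetWriterFactory` from a `ParquetBuilder` lambda and sets the codec on the `AvroParquetWriter` builder. The class name `CompressedParquetWriters` and the extra `codec` parameter are made up for illustration and are not part of Flink.

```java
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/** Hypothetical helper, not part of Flink: reflect-based Parquet writers with a chosen codec. */
public final class CompressedParquetWriters {

    private CompressedParquetWriters() {}

    public static <T> ParquetWriterFactory<T> forReflectRecord(
            Class<T> type, CompressionCodecName codec) {
        // Capture the schema as a String so the lambda stays serializable.
        final String schemaString = ReflectData.get().getSchema(type).toString();

        final ParquetBuilder<T> builder = out -> {
            Schema schema = new Schema.Parser().parse(schemaString);
            return AvroParquetWriter.<T>builder(out)
                    .withSchema(schema)
                    .withDataModel(ReflectData.get())
                    .withCompressionCodec(codec) // e.g. SNAPPY, GZIP or ZSTD
                    .build();
        };
        return new ParquetWriterFactory<>(builder);
    }
}

// Usage, assuming the PlayerEvent POJO from your mail:
// stream.sinkTo(FileSink.forBulkFormat(
//         outputBasePath,
//         CompressedParquetWriters.forReflectRecord(
//                 PlayerEvent.class, CompressionCodecName.SNAPPY)).build());
```

The codec you pick also needs to be available at runtime on the task managers, so it is worth checking a written file before relying on it.
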
Best,
Tiansu 
 

> On 24. 02 2023, at 14:17, Tim Josefsson <tim.josefs...@webstep.se> wrote:
> 
> I'm writing a Flink processor that will read a bunch of JSON records from 
> Kafka and then write them to S3 in parquet format using the FileSink. I've 
> got most things in place, the only thing I haven't been able to figure out is 
> how to change the compression codec used by the writer. Is there any 
> recommended way to do this? Currently I'm using the 
> AvroParquetWriters.forReflectRecord(PlayerEvent.class) to transform my POJOs 
> to Avro and then write them as Parquet files. I've looked into the 
> AvroParquetWriters class but couldn't figure out how to configure the 
> compression codec (or even what codec was used). Is there a way to configure 
> this or do I have to write my own implementation of the Parquet writer and if 
> so, how would one do that?
> 
> Thankful for any help,
> Tim
