Thanks, I'll check it out.

On Sat 26 Jan, 2019, 10:48 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> Hi,
>
> Generally speaking, you can control the part size through a RollingPolicy [1].
> Unfortunately, bulk formats use OnCheckpointRollingPolicy and, AFAIK, it does
> not allow adjusting its behavior based on part size. Maybe Kostas has an idea
> how to do that in the least invasive way. For how to do it with non-bulk
> formats, you can have a look at [2].
>
> I assume that you were using AvroParquetWriter. You can specify
> compression on the ParquetWriter, I guess the same way as before. The code
> for doing it can look something like this:
>
> StreamingFileSink.forBulkFormat(
>         Path.fromLocalFile(folder),
>         new ParquetWriterFactory<>(out ->
>                 AvroParquetWriter.<Datum>builder(out)
>                         .withSchema(...)
>                         .withDataModel(...)
>                         .withCompressionCodec(...)
>                         .build()))
>     .build()
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html
>
> [2]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
>
> On 26/01/2019 08:18, Taher Koitawala wrote:
>
> Can someone please help with this?
>
> On Fri 25 Jan, 2019, 1:47 PM Taher Koitawala <taher.koitaw...@gslab.com> wrote:
>
>> Hi All,
>>           Is there a way to specify *batch size* and *compression* properties
>> when using StreamingFileSink, just like we did with the bucketing sink? The only
>> parameters it accepts are the inactivity bucket check interval and the Avro
>> schema.
>>
>>           We have numerous Flink jobs pulling data from the same Kafka
>> topics, but doing different operations. Each Flink job is writing files of a
>> different size, and we would like to make that consistent.
>>
>>
>> Regards,
>> Taher Koitawala
>> GS Lab Pune
>> +91 8407979163
>>
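
For reference, the non-bulk (row format) case mentioned above can control part
size through a custom RollingPolicy. A minimal sketch, assuming the Flink 1.7
DefaultRollingPolicy and forRowFormat APIs (the output path, string encoder,
and size/time thresholds here are placeholder choices, not from the thread):

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(
                DefaultRollingPolicy.create()
                        // roll a new part file once it grows past ~128 MB
                        .withMaxPartSize(128 * 1024 * 1024)
                        // also roll on time / inactivity so slow buckets still close
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                        .build())
        .build();

With bulk formats (Parquet) this is not available, since they only roll on
checkpoints, which is why the compression has to be configured on the
ParquetWriter builder as shown in the quoted snippet.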