You're getting one shard per pane, and you get a pane every time the trigger fires on an early firing, plus another for the final on-time pane. To get one file with one shard per 15-minute window, you need to fire only on window close, i.e. AfterWatermark.pastEndOfWindow() without early firings.
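For reference, a minimal sketch of that trigger setup (assuming your input is a PCollection<GenericRecord> named "events", and that dropping late data entirely is acceptable; note that Beam requires allowed lateness and an accumulation mode whenever a trigger is set explicitly):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    // "events" is assumed to be a PCollection<GenericRecord>.
    events.apply("15m Windows",
        Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
            // Fire exactly once, when the watermark passes the end of the window.
            .triggering(AfterWatermark.pastEndOfWindow())
            // Zero lateness drops late elements; any early or late firing would
            // produce an extra pane, and hence an extra file per window.
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());

Combined with .withNumShards(1) downstream, each 15-minute window should then produce exactly one pane and one file.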
On Mon, 14 Oct 2019, 14:35 Eddy G, <kaotix...@gmail.com> wrote:

> Thanks a lot, everyone, for your valuable feedback!
>
> I just updated my code, did some minor refactoring, and it seems to be
> working like a charm. Some data is still being dropped due to lateness
> (but we're talking about 100 elements per 2 million, so no big deal; I
> will look into extending allowed lateness and the other performance bits
> I'm missing).
>
> One thing that worries me a lot is that the wall time has kept growing,
> up to 1 day and 3 hours, in the stage that writes all the captured data
> to Parquet files, presumably due to the .parquet file writing code.
>
> I suppose this is also why I still get tons of small Parquet files within
> the same bucket: in a perfect scenario I should only have 4 files (1
> every 15 minutes, given the Window length), yet I'm currently seeing 60+!
>
> .apply("Write .parquet File(s)",
>     FileIO
>         .<String, GenericRecord>writeDynamic()
>         .by((SerializableFunction<GenericRecord, String>) event -> {
>             // specify partitioning here
>         })
>         .via(ParquetIO.sink(AVRO_SCHEMA))
>         .to(options.getOutputDirectory())
>         .withNaming(type -> ParquetFileNaming.getNaming(...))
>         .withDestinationCoder(StringUtf8Coder.of())
>         .withNumShards(1) // should this be 0? Could setting it to 0 increase costs?