You're getting one shard per pane, and you get a new pane every time the
trigger fires on an early firing, plus one more for the final on-time pane.
To get one file with one shard for every 15-minute window, you need to fire
only on window close, i.e. AfterWatermark.pastEndOfWindow() without early
firings.
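
A minimal sketch of the windowing that implies, assuming your 15-minute
fixed windows ("events" here stands for your PCollection<GenericRecord>,
and the allowed-lateness value is only a placeholder):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    // Fire exactly once per window, when the watermark passes the end of
    // the 15-minute window: one on-time pane, hence one shard per window.
    events.apply("Window",
        Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO) // placeholder value
            .discardingFiredPanes());

With a single on-time pane, withNumShards(1) then gives exactly one file per
window per destination. And on the question in your snippet: if I read the
FileIO javadoc right, withNumShards(0) means runner-determined sharding,
which typically produces more files, not fewer.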

On Mon, 14 Oct 2019, 14:35 Eddy G, <kaotix...@gmail.com> wrote:

> Thanks a lot everyone for all your valuable feedback!
>
> Just updated my code, made some minor refactoring, and it seems to be
> working like a charm. Some data is still being dropped due to lateness (but
> we're talking about 100 elements per 2 million, so no big deal there; I
> will look into extending the allowed lateness, plus the overall performance
> bits I'm missing, along the lines of the sketch below).
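>
> In case it helps, a sketch of the direction I mean for the lateness part.
> The 30-minute value is a guess rather than something I've measured, and
> note that each late firing emits an extra pane, i.e. an extra file per
> window:
>
>     Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
>         .triggering(AfterWatermark.pastEndOfWindow()
>             // re-fire once for data arriving after the watermark
>             .withLateFirings(AfterPane.elementCountAtLeast(1)))
>         .withAllowedLateness(Duration.standardMinutes(30)) // guessed value
>         .discardingFiredPanes()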
>
> One thing that worries me a lot is that the wall time has kept climbing,
> up to 1 day and 3 hours, in the stage in charge of writing all the
> captured data into Parquet files, presumably due to the .parquet
> file-writing code.
>
> I suppose this is also the reason why I still get tons of small Parquet
> files within the same bucket. In a perfect scenario I should only have 4
> files per hour (one every 15 minutes, given the Window length), yet I'm
> currently seeing 60+!
>
>             .apply("Write .parquet File(s)",
>                 FileIO
>                     .<String, GenericRecord>writeDynamic()
>                     .by((SerializableFunction<GenericRecord, String>) event -> {
>                         // specify partitioning here
>                     })
>                     .via(ParquetIO.sink(AVRO_SCHEMA))
>                     .to(options.getOutputDirectory())
>                     .withNaming(type -> ParquetFileNaming.getNaming(...))
>                     .withDestinationCoder(StringUtf8Coder.of())
>                     .withNumShards(1) // should this be 0? Could setting it to 0 increase costs?
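>
> For context, the partitioning placeholder above currently looks roughly
> like this ("eventType" is an illustrative field name, not my real schema):
>
>     // Illustrative only: route each record to a destination key derived
>     // from a hypothetical "eventType" field.
>     .by((SerializableFunction<GenericRecord, String>) event ->
>         String.valueOf(event.get("eventType")))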
>
