This is an unfortunate usability problem with triggers: it is easy to
accidentally "close" the window and drop all data. Your trigger fires
exactly once, one second after the first element arrives in the pane;
once it has fired, the window is finished, and every element that
arrives for it afterwards is discarded. I think instead, you probably
want this trigger:

  Repeatedly.forever(
      AfterProcessingTime
          .pastFirstElementInPane()
          .plusDelayOf(Duration.standardSeconds(1)))

The way I recommend expressing this trigger, though, is:

    AfterWatermark.pastEndOfWindow().withEarlyFirings(
        AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(1)))

In the second case it is impossible to accidentally "close" the window
and drop all data: the early firings repeat until the watermark passes
the end of the window, so the window only finishes when it is actually
complete.
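
Applied to your snippet below, it would look something like this (just a
sketch; I'm assuming you want to keep the 15 minute windows and
accumulating panes):

    Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1))))
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes()

You still get speculative output about a second after data starts
arriving in each window, but the final pane is only emitted once the
watermark says the window is complete.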

Kenn

On Tue, Oct 8, 2019 at 7:43 AM Eddy G <[email protected]> wrote:

> I've recently been developing a Beam (Dataflow) consumer which reads
> from a PubSub subscription and outputs, as Parquet files, the
> combination of all the objects grouped within the same window.
>
> While I was testing this without a heavy load, everything seemed to
> work fine.
>
> However, after performing some heavy load testing I can see that of
> 1,000,000 events sent to that PubSub queue, only 1,000 make it to
> Parquet!
>
> According to the wall times across the different stages, the one which
> parses the events prior to applying the window seems to take 58
> minutes. The last stage, which writes the Parquet files, takes 1 hour
> and 32 minutes.
>
> I will now show the most relevant parts of the code; I hope you can
> shed some light on whether this is due to the logic that comes before
> the Window definition or to the Window definition itself.
>
> pipeline
>         .apply("Reading PubSub Events",
>             PubsubIO.readMessagesWithAttributes()
>                 .fromSubscription(options.getSubscription()))
>         .apply("Map to AvroSchemaRecord (GenericRecord)",
>             ParDo.of(new PubsubMessageToGenericRecord()))
>         .setCoder(AvroCoder.of(AVRO_SCHEMA))
>         .apply("15m window",
>             Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
>                 .triggering(AfterProcessingTime
>                     .pastFirstElementInPane()
>                     .plusDelayOf(Duration.standardSeconds(1)))
>                 .withAllowedLateness(Duration.ZERO)
>                 .accumulatingFiredPanes())
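>
> (The write stage is omitted above; roughly, it is the standard FileIO
> plus ParquetIO combination, something along these lines, where the
> output path option name is approximate:)
>
>         .apply("Write to Parquet",
>             FileIO.<GenericRecord>write()
>                 .via(ParquetIO.sink(AVRO_SCHEMA))
>                 .to(options.getOutputPath()) // hypothetical option
>                 .withNumShards(1));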
>
> Also note that I'm running Beam 2.9.0.
>
> I tried moving the logic after the Window definition but still, most
> messages don't make it into the Parquet files.
>
> Could the logic inside the second stage be too heavy, so that messages
> arrive too late and get discarded by the Window? The logic basically
> consists of reading the payload and parsing it into a POJO (reading
> inner Map attributes, filtering, and such).
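>
> A trimmed-down sketch of that DoFn (the actual parsing and filtering
> are elided; only the skeleton is real):
>
>     public class PubsubMessageToGenericRecord
>             extends DoFn<PubsubMessage, GenericRecord> {
>         @ProcessElement
>         public void processElement(ProcessContext context) {
>             PubsubMessage message = context.element();
>             // Parse the payload bytes into a POJO, read its inner Map
>             // attributes, filter, and so on.
>             GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
>             // ... populate the record's fields from the parsed POJO ...
>             context.output(record);
>         }
>     }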
>
> However, if I send a million events to PubSub, all of those million
> events make it to the Parquet write stage; but when I read those
> Parquet files back in Spark and check the records, they aren't
> complete. Does that make sense?
>
>
