This is an unfortunate usability problem with triggers: you can
accidentally "close" the window and drop all data. Instead, I think you
probably want this trigger:
    Repeatedly.forever(
        AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(1)))
The way I recommend expressing this trigger is:
    AfterWatermark.pastEndOfWindow().withEarlyFirings(
        AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(1)))
In the second case it is impossible to accidentally "close" the window and
drop all data.
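Concretely, swapped into the Window transform from your pipeline below,
it would look roughly like this (a sketch; the lateness and accumulation
mode are carried over from your code and may need tuning):

    Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(
                    AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(1))))
        // The on-time firing when the watermark passes the end of the
        // window guarantees the window's contents get emitted; the early
        // firings are just speculative previews.
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes()
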
Kenn
On Tue, Oct 8, 2019 at 7:43 AM Eddy G <[email protected]> wrote:
> I've recently been developing a Beam (Dataflow) consumer which reads
> from a PubSub subscription and outputs to Parquet files the combination
> of all the objects grouped within the same window.
>
> While testing this without a heavy load, everything seemed to work
> fine.
>
> However, after performing some heavy testing I can see that of the
> 1,000,000 events sent to that PubSub queue, only 1,000 make it to
> Parquet!
>
> According to the wall times across the different stages, the one which
> parses the events prior to applying the window seems to take 58
> minutes. The last stage, which writes to the Parquet files, takes 1
> hour and 32 minutes.
>
> I will now show the most relevant parts of the code; I hope you can
> shed some light on whether it's due to the logic that comes before the
> Window object definition or the Window object itself.
>
> pipeline
>     .apply("Reading PubSub Events",
>         PubsubIO.readMessagesWithAttributes()
>             .fromSubscription(options.getSubscription()))
>     .apply("Map to AvroSchemaRecord (GenericRecord)",
>         ParDo.of(new PubsubMessageToGenericRecord()))
>     .setCoder(AvroCoder.of(AVRO_SCHEMA))
>     .apply("15m window",
>         Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
>             .triggering(AfterProcessingTime
>                 .pastFirstElementInPane()
>                 .plusDelayOf(Duration.standardSeconds(1)))
>             .withAllowedLateness(Duration.ZERO)
>             .accumulatingFiredPanes())
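>
> For completeness, the Parquet write stage is along these lines (a
> simplified sketch; getOutputDirectory and the shard count here are
> placeholders, not my exact code):
>
>     .apply("Write to Parquet",
>         FileIO.<GenericRecord>write()
>             .via(ParquetIO.sink(AVRO_SCHEMA))
>             .to(options.getOutputDirectory())  // placeholder option
>             .withSuffix(".parquet")
>             .withNumShards(10));  // unbounded input needs explicit shards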
>
> Also note that I'm running Beam 2.9.0.
>
> I tried moving the logic after the Window definition, but still most
> messages don't make it to the Parquet file.
>
> Could the logic inside the second stage be too heavy, so that messages
> arrive too late and get discarded by the Window? The logic basically
> consists of reading the payload and parsing it into a POJO (reading
> inner Map attributes, filtering and such), roughly as sketched below.
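>
> (Simplified; parsePayload and toGenericRecord stand in for the real
> parsing and mapping logic:)
>
>     class PubsubMessageToGenericRecord
>             extends DoFn<PubsubMessage, GenericRecord> {
>         @ProcessElement
>         public void processElement(@Element PubsubMessage message,
>                                    OutputReceiver<GenericRecord> out) {
>             // Read the payload, parse it into a POJO, inspect the inner
>             // Map attributes and filter, then map onto the Avro schema.
>             MyEvent event = parsePayload(message.getPayload());  // stand-in
>             if (event != null) {
>                 out.output(toGenericRecord(event));  // stand-in
>             }
>         }
>     }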
>
> However, if I send a million events to PubSub, all of those million
> events make it to the Parquet write stage, but when I read those
> Parquet files in Spark and check the records, they aren't complete.
> Does that make sense?