[ 
https://issues.apache.org/jira/browse/BEAM-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840834#comment-16840834
 ] 

Tim Sell commented on BEAM-7322:
--------------------------------

Further explanation of the data.json I uploaded: I set my job to use 1 minute
fixed windows. It consumes, windows and counts several pubsub subscriptions
separately, whose messages were published once every 10, 5, 1 and 0.5 seconds
respectively.

After about 10 minutes I had on_time and late panes for the 5, 1 and 0.5 second
streams, but none for the 10 second stream.
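Kenneth Knowles's description of the MovingFunction configuration, quoted in the issue below, allows a rough arithmetic check of these observations. The sketch is a hypothetical model, not Beam's MovingFunction. It assumes messages arrive at a perfectly steady rate and that the only requirement is "at least 10 messages from at least 2 distinct 5 second buckets within the last 60 seconds". The class and method names are illustrative only.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical model (not Beam's MovingFunction) of the "significance" rule described
 * in the user@ thread: keep 60 s of samples in 5 s buckets, and trust the sample only
 * if it holds at least 10 messages spread over at least 2 distinct buckets.
 */
final class WatermarkSampleSketch {
  static final long SAMPLE_PERIOD_MS = 60_000; // store 60 seconds of data
  static final long SAMPLE_UPDATE_MS = 5_000;  // one bucket per 5 second update period
  static final int MIN_MESSAGES = 10;          // at least 10 messages
  static final int MIN_BUCKETS = 2;            // from at least 2 distinct buckets

  /**
   * Assumes one message every intervalMs starting at t = 0. Returns
   * {messages, distinctBuckets} falling in the sample window [nowMs - 60 s, nowMs).
   */
  static int[] sample(long intervalMs, long nowMs) {
    int messages = 0;
    Set<Long> buckets = new HashSet<>();
    for (long t = 0; t < nowMs; t += intervalMs) {
      if (t >= nowMs - SAMPLE_PERIOD_MS) {
        messages++;
        buckets.add(t / SAMPLE_UPDATE_MS); // index of the 5 s bucket this message lands in
      }
    }
    return new int[] {messages, buckets.size()};
  }

  /** True if the modelled sample is large and spread out enough to be trusted. */
  static boolean isSignificant(long intervalMs, long nowMs) {
    int[] s = sample(intervalMs, nowMs);
    return s[0] >= MIN_MESSAGES && s[1] >= MIN_BUCKETS;
  }

  public static void main(String[] args) {
    long nowMs = 10 * 60_000; // about 10 minutes into the run
    for (long intervalMs : new long[] {10_000, 5_000, 1_000, 500}) {
      int[] s = sample(intervalMs, nowMs);
      System.out.printf("one message every %5d ms: %3d messages in %2d buckets, significant=%b%n",
          intervalMs, s[0], s[1], isSignificant(intervalMs, nowMs));
    }
  }
}
```

Under these assumptions the 10 second stream contributes only 6 messages to any 60 second sample, below the threshold of 10, while the 5, 1 and 0.5 second streams contribute 12, 60 and 120. That is consistent with only the 10 second stream never producing panes, though on its own it does not prove that MovingFunction is the cause.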


> PubSubIO watermark does not advance for very low volumes
> --------------------------------------------------------
>
>                 Key: BEAM-7322
>                 URL: https://issues.apache.org/jira/browse/BEAM-7322
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>            Reporter: Tim Sell
>            Priority: Minor
>         Attachments: data.json
>
>
> I have identified an issue where the watermark does not advance when using
> Beam's PubsubIO with very low message volumes.
> I have created a mini example project to demonstrate the behaviour with a 
> python script for generating messages at different frequencies:
> https://github.com/tims/beam/tree/master/pubsub-watermark 
> [note: this is in a directory of a Beam fork for corp hoop jumping 
> convenience on my end, it is not intended for merging].
> The behaviour is easily replicated if you apply fixed windows with a trigger
> that fires after the watermark passes the end of the window.
> {code}
> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>     .apply(ParDo.of(new ParseScoreEventFn()))
>     .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.standardSeconds(60))
>         .discardingFiredPanes())
>     .apply(MapElements.into(kvs(strings(), integers()))
>         .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
>     .apply(Count.perKey())
>     .apply(ParDo.of(Log.of("counted per key")));
> {code}
> With this triggering, on both the Flink local runner and the direct runner,
> panes are fired only after a long delay (minutes) when messages arrive in
> pubsub at low frequencies (one every few seconds). The biggest issue is that
> no panes seem ever to be emitted if you just send a few events and stop. This
> is particularly likely to trip up people new to Beam.
> If I change the trigger to include early firings, I get exactly the panes
> you would expect.
> {code}
> .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>     .triggering(AfterWatermark.pastEndOfWindow()
>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>             .alignedTo(Duration.standardSeconds(60))))
>     .withAllowedLateness(Duration.standardSeconds(60))
>     .discardingFiredPanes())
> {code}
> I can use any variation of early firing triggers and they work as expected.
> We believe the watermark does not advance at low volumes because of the
> sampling PubsubIO does to estimate its watermark: it never accumulates a
> large enough sample.
> This problem occurs in the direct runner and the Flink runner, but not in the
> Dataflow runner, because Dataflow uses its own PubsubIO implementation, which
> has access to internal details of pubsub and so doesn't need to do any sampling.
> For extra context from the user@ list:
> *Kenneth Knowles:*
> Thanks to your info, I think it is the configuration of MovingFunction [1] 
> that is the likely culprit, but I don't totally understand why. It is 
> configured like so:
>  - store 60 seconds of data
>  - update data every 5 seconds
>  - require at least 10 messages to be 'significant'
>  - require messages from at least 2 distinct 5 second update periods to be
> 'significant'
> I would expect a rate of 1 message per second to satisfy this. I may have 
> read something wrong.
> Have you filed an issue in Jira [2]?
> Kenn
> [1] 
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
> [2] https://issues.apache.org/jira/projects/BEAM/issues
> *Alexey Romanenko:*
> Not sure that this can be very helpful but I recall a similar issue with 
> KinesisIO [1] [2] and it was a bug in MovingFunction which was fixed.
> [1] https://issues.apache.org/jira/browse/BEAM-5063
> [2] https://github.com/apache/beam/pull/6178
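The difference between the two trigger configurations in the issue can be illustrated with a toy model that assumes the watermark of a low-volume stream never advances. The class and method names are hypothetical, and the alignment rule used here (the early pane is due at the next multiple of 60 seconds of processing time after the first element) is a simplification of `alignedTo`, not Beam's trigger implementation.

```java
/**
 * Hypothetical toy model (not Beam's trigger implementation) of one fixed window
 * [0, 60 s) under the two triggers quoted above, for a stream whose watermark is stuck.
 */
final class TriggerSketch {
  static final long WINDOW_END_MS = 60_000; // FixedWindows.of(60 s): first window is [0, 60 s)
  static final long ALIGNMENT_MS = 60_000;  // alignedTo(Duration.standardSeconds(60))

  /** AfterWatermark.pastEndOfWindow(): the on-time pane needs the watermark to reach the window end. */
  static boolean onTimePaneFires(long watermarkMs) {
    return watermarkMs >= WINDOW_END_MS;
  }

  /**
   * AfterProcessingTime.pastFirstElementInPane().alignedTo(60 s), simplified: the early pane
   * is due at the next multiple of 60 s of processing time after the first element arrives.
   */
  static boolean earlyPaneFires(long firstElementProcessingMs, long processingNowMs) {
    long dueAtMs = (firstElementProcessingMs / ALIGNMENT_MS + 1) * ALIGNMENT_MS;
    return processingNowMs >= dueAtMs;
  }

  public static void main(String[] args) {
    long stuckWatermarkMs = 0;    // the low-volume stream's watermark never advances
    long firstElementMs = 12_000; // first event arrives 12 s into processing time
    for (long nowMs : new long[] {30_000, 60_000, 600_000}) {
      System.out.printf("processing time %6d ms: on-time pane=%b, early pane=%b%n",
          nowMs, onTimePaneFires(stuckWatermarkMs), earlyPaneFires(firstElementMs, nowMs));
    }
  }
}
```

In this model the on-time pane never becomes due while the watermark stays at 0, however much processing time passes, whereas the early pane is due once processing time reaches 60 seconds. That mirrors why adding early firings produced the expected panes for the low-volume subscriptions.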



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
