[ https://issues.apache.org/jira/browse/BEAM-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840472#comment-16840472 ]
Tim Sell commented on BEAM-7322:
--------------------------------

In my example code, I have a job that produces events per pane with their lag, reading from different pubsub streams publishing at different frequencies.
Output attached: [^data.json]

> PubSubIO watermark does not advance for very low volumes
> --------------------------------------------------------
>
>                 Key: BEAM-7322
>                 URL: https://issues.apache.org/jira/browse/BEAM-7322
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>            Reporter: Tim Sell
>            Priority: Minor
>         Attachments: data.json
>
>
> I have identified an issue where the watermark does not advance when using
> the Beam PubSubIO when volumes are very low.
> I have created a mini example project to demonstrate the behaviour, with a
> Python script for generating messages at different frequencies:
> https://github.com/tims/beam/tree/master/pubsub-watermark
> [note: this is in a directory of a Beam fork for corp hoop jumping
> convenience on my end, it is not intended for merging].
> The behaviour is easily replicated if you apply a fixed window triggering
> after the watermark passes the end of the window.
>
>     pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>         .apply(ParDo.of(new ParseScoreEventFn()))
>         .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>             .triggering(AfterWatermark.pastEndOfWindow())
>             .withAllowedLateness(Duration.standardSeconds(60))
>             .discardingFiredPanes())
>         .apply(MapElements.into(kvs(strings(), integers()))
>             .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
>         .apply(Count.perKey())
>         .apply(ParDo.of(Log.of("counted per key")));
>
> With this triggering, using both the Flink local runner and the direct runner,
> panes will be fired after a long delay (minutes) for low frequencies of
> messages in pubsub (seconds). The biggest issue is that it seems no panes
> will ever be emitted if you just send a few events and stop.
> This is particularly likely to trip up people new to Beam.
> If I change the triggering to have early firings, I get exactly the emitted
> panes that you would expect.
>
>     .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>         .triggering(AfterWatermark.pastEndOfWindow()
>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>                 .alignedTo(Duration.standardSeconds(60))))
>         .withAllowedLateness(Duration.standardSeconds(60))
>         .discardingFiredPanes())
>
> I can use any variation of early firing triggers and they work as expected.
> We believe that the watermark is not advancing when the volume is too low
> because of the sampling that PubSubIO does to determine its watermark. It
> just never has a large enough sample.
> This problem occurs in the direct runner and the Flink runner, but not in the
> Dataflow runner (Dataflow uses its own PubSubIO, since it has access to
> internal details of pubsub and so doesn't need to do any sampling).
> For extra context from the user@ list:
>
> *Kenneth Knowles:*
> Thanks to your info, I think it is the configuration of MovingFunction [1]
> that is the likely culprit, but I don't totally understand why. It is
> configured like so:
>  - store 60 seconds of data
>  - update data every 5 seconds
>  - require at least 10 messages to be 'significant'
>  - require messages from at least 2 distinct 5 second update periods to be
>    'significant'
> I would expect a rate of 1 message per second to satisfy this. I may have
> read something wrong.
> Have you filed an issue in Jira [2]?
> Kenn
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
> [2] https://issues.apache.org/jira/projects/BEAM/issues
>
> *Alexey Romanenko:*
> Not sure that this can be very helpful, but I recall a similar issue with
> KinesisIO [1] [2], and it was a bug in MovingFunction which was fixed.
> [1] https://issues.apache.org/jira/browse/BEAM-5063
> [2] https://github.com/apache/beam/pull/6178



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
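
To make the four rules quoted above concrete, here is a minimal sketch that models them directly: a 60 second history, 5 second update periods, at least 10 messages, and messages from at least 2 distinct periods. It is a simplified stand-in and not Beam's MovingFunction; the class name SignificanceSketch and the helper simulate are hypothetical, and the model only counts arrival times rather than tracking timestamps the way the real source does.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified model of the four quoted rules. NOT Beam's MovingFunction. */
final class SignificanceSketch {
  static final long SAMPLE_PERIOD_MS = 60_000; // store 60 seconds of data
  static final long SAMPLE_UPDATE_MS = 5_000;  // update data every 5 seconds
  static final int MIN_SAMPLES = 10;           // at least 10 messages
  static final int MIN_BUCKETS = 2;            // from at least 2 distinct update periods

  private final List<Long> arrivalsMs = new ArrayList<>();

  /** Records the arrival time of one message, in milliseconds. */
  void add(long arrivalMs) {
    arrivalsMs.add(arrivalMs);
  }

  /** True if the messages seen in the last 60 seconds satisfy both thresholds. */
  boolean isSignificant(long nowMs) {
    int samples = 0;
    Set<Long> buckets = new HashSet<>();
    for (long t : arrivalsMs) {
      if (t <= nowMs && t > nowMs - SAMPLE_PERIOD_MS) {
        samples++;
        buckets.add(t / SAMPLE_UPDATE_MS); // index of the 5 second period
      }
    }
    return samples >= MIN_SAMPLES && buckets.size() >= MIN_BUCKETS;
  }

  /** Sends one message every intervalMs for durationMs, then checks at queryMs. */
  static boolean simulate(long intervalMs, long durationMs, long queryMs) {
    SignificanceSketch sketch = new SignificanceSketch();
    for (long t = 0; t < durationMs; t += intervalMs) {
      sketch.add(t);
    }
    return sketch.isSignificant(queryMs);
  }

  public static void main(String[] args) {
    System.out.println("1 msg/s:         " + simulate(1_000, 60_000, 59_999));
    System.out.println("1 msg/10s:       " + simulate(10_000, 600_000, 599_999));
    System.out.println("3 msgs, stopped: " + simulate(1_000, 3_000, 300_000));
  }
}
```

In this model, one message per second passes both thresholds, which matches the expectation stated in the thread. One message every ten seconds never accumulates 10 samples inside the 60 second window, and a short burst followed by silence never becomes significant either. If the real source stalls at one message per second, the cause would have to lie in something this sketch does not capture.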