Not sure this will be very helpful, but I recall a similar issue with KinesisIO [1] [2]; it was caused by a bug in MovingFunction, which has since been fixed.
[1] https://issues.apache.org/jira/browse/BEAM-5063
[2] https://github.com/apache/beam/pull/6178

> On 13 May 2019, at 20:52, Kenneth Knowles <k...@apache.org> wrote:
>
> You should definitely not feel foolish. That was a great report. I expect
> many users face the same situation. If they are lurking on this list, then
> you will have helped them already.
>
> Reza - I expect you should weigh in on the Jira, too, since the "one message
> test" use case seems like it wouldn't work at all with those MovingFunction
> params. But I may not understand all the subtleties of the connector.
>
> Kenn
>
> From: Tim Sell <ts...@google.com>
> Date: Mon, May 13, 2019 at 8:06 AM
> To: <user@beam.apache.org>
>
> Thanks for the feedback. I did some more investigating after you said a
> 1-second frequency should be enough to sample on, and it is. I feel foolish.
> I think I just wasn't waiting long enough, as it takes minutes to close the
> windows. We waited much longer when we were just sending messages manually
> and never had a window close.
>
> I'm generating some stats of lag times to window closing for different
> frequencies, with code so people can reproduce it, then I'll add this to a
> Jira ticket.
>
> From: Kenneth Knowles <k...@apache.org>
> Date: Mon, May 13, 2019 at 10:48 AM
> To: <user@beam.apache.org>, dev
>
> Nice analysis & details!
>
> Thanks to your info, I think it is the configuration of MovingFunction [1]
> that is the likely culprit, but I don't totally understand why. It is
> configured like so:
>
> - store 60 seconds of data
> - update data every 5 seconds
> - require at least 10 messages to be 'significant'
> - require messages from at least 2 distinct 5-second update periods to be
>   'significant'
>
> I would expect a rate of 1 message per second to satisfy this.
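Kenn's reading of the configuration can be checked with a small simulation. This is a hypothetical, simplified model of the significance rule exactly as he lists it, not Beam's actual MovingFunction code: the function names, the integer-millisecond arrival times, and the sliding (rather than bucket-rotated) 60-second window are all assumptions made for illustration.

```python
# Hypothetical, simplified model of the "significance" rule listed above.
# This is NOT Beam's MovingFunction; the names and the sliding-window model
# are assumptions made for illustration only.

SAMPLE_PERIOD_MS = 60_000  # store 60 seconds of data
SAMPLE_UPDATE_MS = 5_000   # 5-second update periods (buckets)
MIN_SAMPLES = 10           # at least 10 messages
MIN_BUCKETS = 2            # from at least 2 distinct update periods


def is_significant(arrivals_ms, now_ms):
    """True if arrivals in the last SAMPLE_PERIOD_MS meet both thresholds."""
    recent = [t for t in arrivals_ms if now_ms - SAMPLE_PERIOD_MS < t <= now_ms]
    buckets = {t // SAMPLE_UPDATE_MS for t in recent}
    return len(recent) >= MIN_SAMPLES and len(buckets) >= MIN_BUCKETS


def first_significant_second(interval_ms, horizon_s=300):
    """Publish one message every interval_ms (starting at t=0) and return the
    first whole second at which the sample becomes significant, or None."""
    arrivals = []
    next_arrival = 0
    for second in range(1, horizon_s + 1):
        now_ms = second * 1000
        # Record every message that has arrived by this point in time.
        while next_arrival <= now_ms:
            arrivals.append(next_arrival)
            next_arrival += interval_ms
        if is_significant(arrivals, now_ms):
            return second
    return None


print(first_significant_second(1_000))   # one message per second -> 9
print(first_significant_second(10_000))  # one message every 10 s -> None
```

Under these assumptions a steady 1 message per second crosses both thresholds after about 9 seconds, which agrees with Kenn's expectation, while one message every 10 seconds can never accumulate 10 samples inside a 60-second window.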
> I may have read something wrong.
>
> Have you filed an issue in Jira [2]?
>
> Kenn
>
> [1] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
> [2] https://issues.apache.org/jira/projects/BEAM/issues
>
> From: Tim Sell <ts...@google.com>
> Date: Fri, May 10, 2019 at 4:09 AM
> To: <user@beam.apache.org>
>
> Hello,
>
> I have identified an issue where the watermark does not advance when using
> Beam's PubsubIO at very low message volumes.
>
> The behaviour is easily replicated if you apply a fixed window that triggers
> after the watermark passes the end of the window:
>
> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>     .apply(ParDo.of(new ParseScoreEventFn()))
>     .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.standardSeconds(60))
>         .discardingFiredPanes())
>     .apply(MapElements.into(kvs(strings(), integers()))
>         .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
>     .apply(Count.perKey())
>     .apply(ParDo.of(Log.of("counted per key")));
>
> With this triggering, using both the Flink local runner and the direct runner,
> no panes are ever emitted if the volume of messages in Pub/Sub is very low,
> e.g. 1 per second.
>
> If I change the triggering to have early firings, I get exactly the emitted
> panes that you would expect:
>
> .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>     .triggering(AfterWatermark.pastEndOfWindow()
>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>             .alignedTo(Duration.standardSeconds(60))))
>     .withAllowedLateness(Duration.standardSeconds(60))
>     .discardingFiredPanes())
>
> I can use any variation of early-firing triggers and they work as expected.
>
> We believe that the watermark is not advancing at all when the volume is too
> low, because of the sampling that PubsubIO does to determine its watermark.
> It just never has a large enough sample.
> This problem occurs in the direct runner and the Flink runner, but not in the
> Dataflow runner (Dataflow uses its own Pub/Sub source, because it has access
> to internal details of Pub/Sub and so doesn't need to do any sampling).
>
> I have also verified that for high volumes of messages, PubsubIO *does*
> successfully advance the watermark. Here's a Python script I wrote to mass
> produce random messages:
>
> import json
> import random
> import sys
>
> from google.cloud import pubsub_v1
>
>
> def publish_loop(n, project_id, topic_name):
>     publisher = pubsub_v1.PublisherClient()
>     topic_path = publisher.topic_path(project_id, topic_name)
>     rand = random.Random()
>     players = ["eufy"]
>     futures = []
>
>     for _ in range(n):
>         score = rand.randint(1, 10)
>         player = rand.choice(players)
>         message = json.dumps({
>             "player": player,
>             "score": score,
>         })
>         print("'%s'" % message)
>         futures.append(publisher.publish(topic_path, data=message.encode("utf-8")))
>
>     # Block until every publish completes so none are lost when the script exits.
>     for future in futures:
>         future.result()
>
>
> if __name__ == "__main__":
>     # Arguments: N PROJECT_ID TOPIC_NAME
>     publish_loop(int(sys.argv[1]), sys.argv[2], sys.argv[3])
>
> Running my code without early firings on Dataflow, I verified it does count
> them as you'd expect.
>
> <Screen Shot 2019-05-08 at 16.41.02.png>
>
> Doing the same using the direct runner, it struggles to process messages at
> the rate they are being produced... but it does eventually close some windows.
> Here are screenshots of logs with early firings turned on and then off.
>
> <Screen Shot 2019-05-08 at 17.02.14.png>
>
> <Screen Shot 2019-05-08 at 17.21.46.png>
>
> The key here is that you can see it is logging the ON_TIME panes. This
> *never* happened for me when the message rate was as low as 1 per second.
>
> Has anyone else seen this behaviour, where no ON_TIME panes are emitted when
> volumes from PubsubIO are low (when not using Dataflow)?
> I believe the details that cause this are within the getWatermark function in
> PubsubUnboundedSource, but it looks too delicate for me to approach.
>
> It's a problem because we ideally want it to behave well at low volumes too,
> but also because this is often one of the first streaming examples people
> try. We discovered this while trying to train people on streaming, and it was
> a bit awkward :)
>
> Tim Sell
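Tim's hypothesis (the watermark stays put until the sample is large enough) and his later observation that windows take minutes to close can both be illustrated with a toy model. This sketch is hypothetical and deliberately simplified, not the real PubsubUnboundedSource.getWatermark(): it assumes event time equals publish time, takes the watermark to be the minimum event time seen in the last 60 seconds, holds the previous watermark while the sample is not significant, and ignores any other heuristics the real reader may apply. The function name first_on_time_firing is made up.

```python
# Toy model of a sampled watermark that is held while the sample is too small.
# Hypothetical illustration only; NOT Beam's PubsubUnboundedSource code.

SAMPLE_PERIOD_MS = 60_000  # sample covers the last 60 s
SAMPLE_UPDATE_MS = 5_000   # 5 s update periods
MIN_SAMPLES = 10           # need at least 10 messages...
MIN_BUCKETS = 2            # ...from at least 2 distinct update periods
WINDOW_END_MS = 60_000     # first fixed window is [0 s, 60 s)


def first_on_time_firing(interval_ms, horizon_ms=600_000, tick_ms=1_000):
    """Return the processing time (ms) at which the held watermark first
    passes the end of the first window, or None if it never does."""
    watermark = None  # no estimate yet
    events = []
    next_event = 0
    for now in range(0, horizon_ms + 1, tick_ms):
        while next_event <= now:
            events.append(next_event)  # assumption: event time == arrival time
            next_event += interval_ms
        recent = [t for t in events if now - SAMPLE_PERIOD_MS < t <= now]
        buckets = {t // SAMPLE_UPDATE_MS for t in recent}
        if len(recent) >= MIN_SAMPLES and len(buckets) >= MIN_BUCKETS:
            # Confident enough: move the watermark to the oldest recent event.
            watermark = min(recent)
        # Otherwise keep the previous watermark (it may stay None forever).
        if watermark is not None and watermark >= WINDOW_END_MS:
            return now
    return None


print(first_on_time_firing(1_000))   # one message per second -> 119000
print(first_on_time_firing(10_000))  # one message every 10 s -> None
```

Under these assumptions, one message per second fires the ON_TIME pane for the first [0 s, 60 s) window only at about 119 s of processing time, because a minimum over a 60-second sample trails real time by roughly a minute. One message every 10 seconds never produces an ON_TIME pane at all.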