Not sure how helpful this is, but I recall a similar issue with 
KinesisIO [1] [2]. It turned out to be a bug in MovingFunction, which has 
since been fixed.

[1] https://issues.apache.org/jira/browse/BEAM-5063
[2] https://github.com/apache/beam/pull/6178

> On 13 May 2019, at 20:52, Kenneth Knowles <k...@apache.org> wrote:
> 
> You should definitely not feel foolish. That was a great report. I expect 
> many users face the same situation. If they are lurking on this list, then 
> you will have helped them already.
> 
> Reza - I expect you should weigh in on the Jira, too, since the "one message 
> test" use case seems like it wouldn't work at all with those MovingFunction 
> params. But I may not understand all the subtleties of the connector.
> 
> Kenn
> 
> From: Tim Sell <ts...@google.com>
> Date: Mon, May 13, 2019 at 8:06 AM
> To: <user@beam.apache.org>
> 
> Thanks for the feedback. I did some more investigating after you said a 1 
> second frequency should be enough to sample on, and it is. I feel foolish. 
> I think I just wasn't waiting long enough, as it takes minutes to close the 
> windows. We waited much longer when we were just sending messages manually 
> and never had a window close.
> 
> I'm generating some stats of lag times to window closing for different 
> frequencies, with code so people can reproduce it, then I'll add this to a 
> jira ticket.
> 
> From: Kenneth Knowles <k...@apache.org>
> Date: Mon, May 13, 2019 at 10:48 AM
> To: <user@beam.apache.org>, dev
> 
> Nice analysis & details!
> 
> Thanks to your info, I think it is the configuration of MovingFunction [1] 
> that is the likely culprit, but I don't totally understand why. It is 
> configured like so:
> 
>  - store 60 seconds of data
>  - update data every 5 seconds
>  - require at least 10 messages to be 'significant'
>  - require messages from at least 2 distinct 5 second update periods to 
> be 'significant'
> 
> I would expect a rate of 1 message per second to satisfy this. I may have 
> read something wrong.
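
For what it's worth, the four thresholds listed above can be modelled in a few lines of Python. This is only a minimal sketch of those parameters, not Beam's actual MovingFunction; the function names and the evenly spaced arrivals are assumptions made for illustration.

```python
def is_significant(arrival_secs, now_sec, period=60, update=5,
                   min_samples=10, min_buckets=2):
    """Simplified model of the listed thresholds (NOT Beam's MovingFunction)."""
    # Keep only samples that fall inside the trailing `period` seconds.
    recent = [t for t in arrival_secs if now_sec - period < t <= now_sec]
    # Each sample belongs to one `update`-second bucket.
    buckets = {int(t // update) for t in recent}
    return len(recent) >= min_samples and len(buckets) >= min_buckets


def first_significant_second(interval_sec, horizon_sec=300):
    """First whole second at which the model is significant, else None.

    Assumes (hypothetically) one message every `interval_sec` seconds,
    starting at t=0.
    """
    arrivals = []
    next_arrival = 0.0
    for now in range(horizon_sec):
        while next_arrival <= now:
            arrivals.append(next_arrival)
            next_arrival += interval_sec
        if is_significant(arrivals, now):
            return now
    return None
```

In this simplified model a steady 1 message per second meets both thresholds once the tenth message has arrived, while one message every 10 seconds never accumulates 10 samples inside the 60 second span.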
> 
> Have you filed an issue in Jira [2]?
> 
> Kenn
> 
> [1] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
> [2] https://issues.apache.org/jira/projects/BEAM/issues
> 
> From: Tim Sell <ts...@google.com>
> Date: Fri, May 10, 2019 at 4:09 AM
> To: <user@beam.apache.org>
> 
> Hello,
> 
> I have identified an issue where the watermark does not advance when using 
> the beam PubSubIO when volumes are very low.
> 
> The behaviour is easily replicated if you apply a fixed window triggering 
> after the watermark passes the end of the window.
> 
> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>     .apply(ParDo.of(new ParseScoreEventFn()))
>     .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.standardSeconds(60))
>         .discardingFiredPanes())
>     .apply(MapElements.into(kvs(strings(), integers()))
>         .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
>     .apply(Count.perKey())
>     .apply(ParDo.of(Log.of("counted per key")));
> 
> With this triggering, using both the Flink local runner and the direct 
> runner, no panes will ever be emitted if the volume of messages in pubsub 
> is very low, e.g. 1 per second.
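
To make the dependency on the watermark concrete, here is a rough sketch of fixed windowing with an end-of-window trigger. It is a deliberately simplified model, assuming integer-second timestamps and a window that fires once the watermark reaches its end; the function names are hypothetical and this is not how any runner is implemented.

```python
def window_for(event_ts, size=60):
    """Return the [start, end) fixed window containing event_ts (seconds)."""
    start = event_ts - event_ts % size
    return (start, start + size)


def on_time_windows(event_timestamps, watermark, size=60):
    """Windows whose end the watermark has reached, i.e. those that would
    emit an ON_TIME pane in this simplified model."""
    windows = {window_for(ts, size) for ts in event_timestamps}
    return sorted(w for w in windows if w[1] <= watermark)
```

If the watermark never moves past the end of the first window, `on_time_windows` stays empty no matter how many elements arrive, which matches the symptom of never seeing a pane.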
> 
> If I change the triggering to have early firings I get exactly the emitted 
> panes that you would expect.
> .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>     .triggering(AfterWatermark.pastEndOfWindow()
>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>             .alignedTo(Duration.standardSeconds(60))))
>     .withAllowedLateness(Duration.standardSeconds(60))
>     .discardingFiredPanes())
> I can use any variation of early firing triggers and they work as expected.
> 
> We believe that the watermark is not advancing at all when the volume is too 
> low because of the sampling that PubSubIO does to determine its watermark. 
> It just never has a large enough sample. 
> This problem occurs in the direct runner and Flink runner, but not in the 
> Dataflow runner (because Dataflow uses its own PubSubIO: it has access to 
> internal details of pubsub and so doesn't need to do any sampling).
> 
> 
> I have also verified that for high volumes of messages, the PubSubIO *does* 
> successfully advance the watermark. Here's a Python script I wrote to 
> mass-produce random messages:
> import json
> import random
> from google.cloud import pubsub_v1
> 
> 
> def publish_loop(n, project_id, topic_name):
>     publisher = pubsub_v1.PublisherClient()
>     topic_path = publisher.topic_path(project_id, topic_name)
>     rand = random.Random()
>     players = ["eufy"]
> 
>     for i in range(n):
>         score = rand.randint(1, 10)
>         player = rand.choice(players)
>         message = json.dumps({
>           "player": player,
>           "score": score,
>         })
>         print("'%s'" % message)
>         publisher.publish(topic_path, data=message.encode("utf-8"))
> Running my code without early firings on Dataflow, I verified it does count 
> them as you'd expect.
> 
> <Screen Shot 2019-05-08 at 16.41.02.png>
> 
> Doing the same using the direct runner, it struggles to process messages at 
> the rate they are being produced, but it does eventually close some windows. 
> Here are screenshots of logs with early firings turned on and then off.
> 
> <Screen Shot 2019-05-08 at 17.02.14.png>
> 
> <Screen Shot 2019-05-08 at 17.21.46.png>
> 
> The key here is that you can see that it is logging the ON_TIME panes. This 
> *never* happened for me if the message rate was as low as 1 per second.
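
For anyone wanting to reproduce the low-rate case, a throttled variant of the publishing loop might look like the sketch below. The helper name and its parameters are hypothetical; the `publish` argument is any callable that sends one message, so the pacing logic can be exercised without a Pub/Sub connection.

```python
import time


def publish_at_rate(publish, messages, interval_sec=1.0, sleep=time.sleep):
    """Call publish(message) for each message, pausing interval_sec between
    consecutive calls. Returns the number of messages sent."""
    sent = 0
    for message in messages:
        if sent:
            # Pause between messages, but not before the first one.
            sleep(interval_sec)
        publish(message)
        sent += 1
    return sent
```

With the script quoted above, `publish` could be something like `lambda m: publisher.publish(topic_path, data=m)`, which gives roughly one message per second with the default interval.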
> 
> Has anyone else seen this behaviour, where no ON_TIME panes are emitted when 
> there are low volumes from a PubSubIO (when not using Dataflow)? 
> I believe the details that cause this are within the getWatermark function in 
> PubsubUnboundedSource, but it looks too delicate for me to approach.
> 
> It's a problem because we ideally want it to behave well at low volumes too, 
> but also because this is often one of the first streaming examples people 
> try. We discovered this while trying to train people on streaming and it was 
> a bit awkward :)
> 
> Tim Sell
> 
> 
