Thanks!

I made a jira
https://issues.apache.org/jira/browse/BEAM-7322

And dumped my sample code here:
https://github.com/tims/beam/tree/master/pubsub-watermark

*From: *Alexey Romanenko <aromanenko....@gmail.com>
*Date: *Wed, May 15, 2019 at 12:18 AM
*To: * <user@beam.apache.org>

Not sure that this can be very helpful but I recall a similar issue with
> KinesisIO [1] [2] and it was a bug in MovingFunction which was fixed.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5063
> [2] https://github.com/apache/beam/pull/6178
>
> On 13 May 2019, at 20:52, Kenneth Knowles <k...@apache.org> wrote:
>
> You should definitely not feel foolish. That was a great report. I expect
> many users face the same situation. If they are lurking on this list, then
> you will have helped them already.
>
> Reza - I expect you should weigh in on the Jira, too, since the "one
> message test" use case seems like it wouldn't work at all with those
> MovingFunction params. But I may not understand all the subtleties of the
> connector.
>
> Kenn
>
> *From: *Tim Sell <ts...@google.com>
> *Date: *Mon, May 13, 2019 at 8:06 AM
> *To: * <user@beam.apache.org>
>
> Thanks for the feedback, I did some more investigating after you said 1
>> second frequency should be enough to sample on.. And it is I feel foolish.
>> I think I just wasn't waiting long enough as it takes minutes to close
>> the windows. We waited much longer when we were just messages manually and
>> never had a window close.
>>
>> I'm generating some stats of lag times to window closing for different
>> frequencies, with code so people can reproduce it, then I'll add this to a
>> jira ticket.
>>
>> *From: *Kenneth Knowles <k...@apache.org>
>> *Date: *Mon, May 13, 2019 at 10:48 AM
>> *To: * <user@beam.apache.org>, dev
>>
>> Nice analysis & details!
>>>
>>> Thanks to your info, I think it is the configuration of MovingFunction
>>> [1] that is the likely culprit, but I don't totally understand why. It is
>>> configured like so:
>>>
>>>  - store 60 seconds of data
>>>  - update data every 5 seconds
>>>  - require at least 10 messages to be 'significant'
>>>  - require messages from at least 2 distinct 5 second update periods to
>>> 'significant'
>>>
>>> I would expect a rate of 1 message per second to satisfy this. I may
>>> have read something wrong.
>>>
>>> Have you filed an issue in Jira [2]?
>>>
>>> Kenn
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
>>> [2] https://issues.apache.org/jira/projects/BEAM/issues
>>>
>>> *From: *Tim Sell <ts...@google.com>
>>> *Date: *Fri, May 10, 2019 at 4:09 AM
>>> *To: * <user@beam.apache.org>
>>>
>>> Hello,
>>>>
>>>> I have identified an issue where the watermark does not advance when
>>>> using the beam PubSubIO when volumes are very low.
>>>>
>>>> The behaviour is easily replicated if you apply a fixed window
>>>> triggering after the watermark passes the end of the window.
>>>>
>>>> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>>>>     .apply(ParDo.of(new ParseScoreEventFn()))
>>>>     
>>>> .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>         .triggering(AfterWatermark.pastEndOfWindow())
>>>>         .withAllowedLateness(Duration.standardSeconds(60))
>>>>         .discardingFiredPanes())
>>>>     .apply(MapElements.into(kvs(strings(), integers()))
>>>>         .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), 
>>>> scoreEvent.getScore())))
>>>>     .apply(Count.perKey())
>>>>     .apply(ParDo.of(Log.of("counted per key")));
>>>>
>>>> With this triggering, using both the flink local runner the direct
>>>> runner, *no panes will ever be emitted* if the volume of messages in
>>>> pubsub is very low. eg 1 per second.
>>>>
>>>> If I change the triggering to have early firings I get exactly the
>>>> emitted panes that you would expect.
>>>>
>>>> .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>     .triggering(AfterWatermark.pastEndOfWindow()
>>>>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>             .alignedTo(Duration.standardSeconds(60))))
>>>>     .withAllowedLateness(Duration.standardSeconds(60))
>>>>     .discardingFiredPanes())
>>>>
>>>> I can use any variation of early firing triggers and they work as
>>>> expected.
>>>>
>>>> We believe that the watermark is not advancing at all when the volume
>>>> is too low because of the sampling that PubSubIO does to determine it's
>>>> watermark. It just never has a large enough sample.
>>>> This problem occurs in the direct runner and flink runner, but not in
>>>> the dataflow runner (because dataflow uses it's own PubSubIO because
>>>> dataflow has access to internal details of pubsub and so doesn't need to do
>>>> any sampling).
>>>>
>>>>
>>>> I have also verified that for high volumes of messages, the PubSubIO
>>>> *does* successfully advance the watermark. Here's a python script I wrote
>>>> to mass produce random messages:
>>>>
>>>> import json
>>>> import random
>>>> from google.cloud import pubsub_v1
>>>>
>>>>
>>>> def publish_loop(n, project_id, topic_name):
>>>>     publisher = pubsub_v1.PublisherClient()
>>>>     topic_path = publisher.topic_path(project_id, topic_name)
>>>>     rand = random.Random()
>>>>     players = ["eufy"]
>>>>
>>>>     for i in range(n):
>>>>         score = rand.randint(1, 10)
>>>>         player = rand.choice(players)
>>>>         message = json.dumps({
>>>>           "player": player,
>>>>           "score": score,
>>>>         })
>>>>         print("'%s'" % message)
>>>>         publisher.publish(topic_path, data=message.encode("utf-8"))
>>>>
>>>> Running my code without early firings on Dataflow, I verified it does
>>>> count them as you'd expect.
>>>>
>>>> <Screen Shot 2019-05-08 at 16.41.02.png>
>>>>
>>>> Doing the same using the direct runner, it struggles to process
>>>> messages at rate they are being produced... but it does eventually close
>>>> some windows. Here are screenshots of logs with early firings turned on and
>>>> then off.
>>>>
>>>> <Screen Shot 2019-05-08 at 17.02.14.png>
>>>>
>>>> <Screen Shot 2019-05-08 at 17.21.46.png>
>>>>
>>>> The key here is that you can see that is logging the ON_TIME panes.
>>>> This *never* happened for me if the message rate was as low as 1 per 
>>>> second.
>>>>
>>>> Has anyone else seen this behaviour, where no ON_TIME panes are emitted
>>>> when there are low volumes from a PubSubIO (when not using Dataflow)?
>>>> I believe the details that cause this are within the getWatermark
>>>> function in PubsubUnboundedSource, but it looks too delicate for me to
>>>> approach.
>>>>
>>>> It's a problem because we ideally want it to behave well at low volumes
>>>> too, but also because this is often one of the first streaming examples
>>>> people try. We discovered this while trying to train people on streaming
>>>> and it was a bit awkward :)
>>>>
>>>> Tim Sell
>>>>
>>>>
>>>>
>

Reply via email to