
I made a Jira ticket:

And dumped my sample code here:

*From: *Alexey Romanenko
*Date: *Wed, May 15, 2019 at 12:18 AM

> Not sure that this can be very helpful, but I recall a similar issue with
> KinesisIO [1] [2] and it was a bug in MovingFunction which was fixed.
> [1]
> [2]
> On 13 May 2019, at 20:52, Kenneth Knowles wrote:
> You should definitely not feel foolish. That was a great report. I expect
> many users face the same situation. If they are lurking on this list, then
> you will have helped them already.
> Reza - I expect you should weigh in on the Jira, too, since the "one
> message test" use case seems like it wouldn't work at all with those
> MovingFunction params. But I may not understand all the subtleties of the
> connector.
> Kenn
> *From: *Tim Sell
> *Date: *Mon, May 13, 2019 at 8:06 AM
>> Thanks for the feedback. I did some more investigating after you said a
>> 1-second frequency should be enough to sample on, and it is. I feel foolish.
>> I think I just wasn't waiting long enough, as it takes minutes to close
>> the windows. We waited much longer when we were just sending messages
>> manually and never had a window close.
>> I'm generating some statistics on the lag until windows close for different
>> message frequencies, with code so people can reproduce them; then I'll add
>> this to a Jira ticket.
>> *From: *Kenneth Knowles
>> *Date: *Mon, May 13, 2019 at 10:48 AM
>> *To: * dev
>>> Nice analysis & details!
>>> Thanks to your info, I think it is the configuration of MovingFunction
>>> [1] that is the likely culprit, but I don't totally understand why. It is
>>> configured like so:
>>>  - store 60 seconds of data
>>>  - update data every 5 seconds
>>>  - require at least 10 messages to be 'significant'
>>>  - require messages from at least 2 distinct 5-second update periods to
>>> be 'significant'
>>> I would expect a rate of 1 message per second to satisfy this. I may
>>> have read something wrong.
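Kenn's expectation can be checked with simple arithmetic: at 1 message per second, a 60-second sample split into twelve 5-second buckets holds 60 messages spread across 12 buckets, well above both the 10-message and the 2-bucket thresholds. The following Java sketch models the four parameters he lists. The class `SignificanceModel`, its method names, and the bucketing scheme are assumptions for illustration; this is a simplified model, not Beam's actual `MovingFunction`.

```java
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of a bucketed moving-window significance check.
 * It mirrors the four parameters from the thread; it is NOT Beam's MovingFunction.
 */
public class SignificanceModel {
    static final long SAMPLE_PERIOD_MS = 60_000; // store 60 seconds of data
    static final long SAMPLE_UPDATE_MS = 5_000;  // one bucket per 5-second update period
    static final int MIN_SAMPLES = 10;           // at least 10 messages in the window
    static final int MIN_BUCKETS = 2;            // from at least 2 distinct buckets

    // bucket index -> number of messages that arrived in that 5-second period
    private final TreeMap<Long, Integer> buckets = new TreeMap<>();

    void add(long nowMs) {
        buckets.merge(nowMs / SAMPLE_UPDATE_MS, 1, Integer::sum);
    }

    boolean isSignificant(long nowMs) {
        long currentBucket = nowMs / SAMPLE_UPDATE_MS;
        // Keep only the 12 most recent buckets (60 s / 5 s).
        long oldestBucket = currentBucket - SAMPLE_PERIOD_MS / SAMPLE_UPDATE_MS + 1;
        buckets.headMap(oldestBucket).clear(); // drop buckets that fell out of the window
        int samples = 0;
        for (int count : buckets.values()) {
            samples += count;
        }
        // Every stored bucket holds at least one message, so size() counts non-empty buckets.
        return samples >= MIN_SAMPLES && buckets.size() >= MIN_BUCKETS;
    }

    /** Feeds one message every intervalMs for durationMs, then checks significance. */
    static boolean simulate(long intervalMs, long durationMs) {
        SignificanceModel model = new SignificanceModel();
        long last = 0;
        for (long t = 0; t < durationMs; t += intervalMs) {
            model.add(t);
            last = t;
        }
        return model.isSignificant(last);
    }

    public static void main(String[] args) {
        System.out.println("1 msg per 1 s:  " + simulate(1_000, 60_000));  // 60 messages in 12 buckets
        System.out.println("1 msg per 5 s:  " + simulate(5_000, 60_000));  // 12 messages in 12 buckets
        System.out.println("1 msg per 10 s: " + simulate(10_000, 60_000)); // only 6 messages
    }
}
```

In this simplified model, 1 message per second (and even 1 message every 5 seconds) is significant, while 1 message every 10 seconds never reaches 10 messages per minute. If the real connector still stalls at 1 message per second, the cause would have to lie in details this model leaves out.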
>>> Have you filed an issue in Jira [2]?
>>> Kenn
>>> [1]
>>> [2]
>>> *From: *Tim Sell
>>> *Date: *Fri, May 10, 2019 at 4:09 AM
>>>> Hello,
>>>> I have identified an issue where the watermark does not advance when
>>>> using the Beam PubSubIO with very low message volumes.
>>>> The behaviour is easily replicated if you apply fixed windows with a
>>>> trigger that fires after the watermark passes the end of the window.
>>>> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>>>>     .apply(ParDo.of(new ParseScoreEventFn()))
>>>>     .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>         .triggering(AfterWatermark.pastEndOfWindow())
>>>>         .withAllowedLateness(Duration.standardSeconds(60))
>>>>         .discardingFiredPanes())
>>>>     .apply(MapElements.into(kvs(strings(), integers()))
>>>>         .via(scoreEvent -> KV.of(scoreEvent.getPlayer(), scoreEvent.getScore())))
>>>>     .apply(Count.perKey())
>>>>     .apply(ParDo.of(Log.of("counted per key")));
>>>> With this triggering, using both the Flink local runner and the direct
>>>> runner, *no panes will ever be emitted* if the volume of messages in
>>>> Pub/Sub is very low, e.g. 1 per second.
>>>> If I change the triggering to have early firings, I get exactly the
>>>> emitted panes that you would expect.
>>>> .apply(Window.<ScoreEvent>into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>     .triggering(AfterWatermark.pastEndOfWindow()
>>>>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>             .alignedTo(Duration.standardSeconds(60))))
>>>>     .withAllowedLateness(Duration.standardSeconds(60))
>>>>     .discardingFiredPanes())
>>>> I can use any variation of early firing triggers and they work as
>>>> expected.
>>>> We believe that the watermark is not advancing at all when the volume
>>>> is too low because of the sampling that PubSubIO does to determine its
>>>> watermark. It just never has a large enough sample.
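To illustrate this hypothesis, here is a hypothetical Java model of a source that only moves its watermark forward while its recent sample is large enough, and otherwise holds it. The class name `HeldWatermark`, the 10-message threshold over 60 seconds, the assumption that arrival time equals event time, and the choice of the oldest sampled timestamp as the estimate are all assumptions for illustration; this is not the logic of `PubsubUnboundedSource.getWatermark()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical model of a sampled watermark: it only advances while the last
 * 60 seconds contain at least MIN_SAMPLES messages; otherwise it is held.
 * Not PubsubUnboundedSource.getWatermark(); an illustration only.
 */
public class HeldWatermark {
    static final long SAMPLE_PERIOD_MS = 60_000;
    static final int MIN_SAMPLES = 10;

    // Timestamps seen in the last SAMPLE_PERIOD_MS (arrival time assumed equal to event time).
    private final Deque<Long> recent = new ArrayDeque<>();
    private long watermarkMs = Long.MIN_VALUE; // no progress yet

    long onMessage(long timestampMs) {
        recent.addLast(timestampMs);
        // Evict samples that are 60 seconds old or older.
        while (recent.peekFirst() <= timestampMs - SAMPLE_PERIOD_MS) {
            recent.removeFirst();
        }
        if (recent.size() >= MIN_SAMPLES) {
            // Large enough sample: advance to its oldest timestamp, never moving backwards.
            watermarkMs = Math.max(watermarkMs, recent.peekFirst());
        }
        // Otherwise the watermark is held at its previous value.
        return watermarkMs;
    }

    /**
     * Returns the first arrival time at which the watermark reaches windowEndMs
     * (when an ON_TIME pane could fire for a window ending there), or -1 if never.
     */
    static long firstOnTimeFiring(long intervalMs, long windowEndMs, long runForMs) {
        HeldWatermark source = new HeldWatermark();
        for (long t = 0; t <= runForMs; t += intervalMs) {
            if (source.onMessage(t) >= windowEndMs) {
                return t;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000; // end of the first 60-second fixed window
        for (long interval : new long[] {1_000, 2_000, 5_000, 10_000}) {
            System.out.println("one message every " + interval + " ms -> first ON_TIME firing at "
                    + firstOnTimeFiring(interval, windowEnd, 600_000) + " ms");
        }
    }
}
```

In this toy model, a message every 10 seconds never produces an ON_TIME firing for the first window, because the sample never reaches 10 messages. Even at 1 message per second the first firing only happens about two minutes in, since the estimate trails the newest data by the whole sample period; that is at least consistent with the later observation that windows take minutes to close, though it proves nothing about the real connector.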
>>>> This problem occurs in the direct runner and Flink runner, but not in
>>>> the Dataflow runner (because Dataflow uses its own PubSubIO, which has
>>>> access to internal details of Pub/Sub and so doesn't need to do any
>>>> sampling).
>>>> I have also verified that for high volumes of messages, the PubSubIO
>>>> *does* successfully advance the watermark. Here's a Python script I wrote
>>>> to mass-produce random messages:
>>>> import json
>>>> import random
>>>> from google.cloud import pubsub_v1
>>>> def publish_loop(n, project_id, topic_name):
>>>>     publisher = pubsub_v1.PublisherClient()
>>>>     topic_path = publisher.topic_path(project_id, topic_name)
>>>>     rand = random.Random()
>>>>     players = ["eufy"]
>>>>     futures = []
>>>>     for _ in range(n):
>>>>         score = rand.randint(1, 10)
>>>>         player = rand.choice(players)
>>>>         message = json.dumps({
>>>>           "player": player,
>>>>           "score": score,
>>>>         })
>>>>         print("'%s'" % message)
>>>>         # publish() is asynchronous and returns a future
>>>>         futures.append(publisher.publish(topic_path, data=message.encode("utf-8")))
>>>>     # Block until Pub/Sub has accepted every message
>>>>     for future in futures:
>>>>         future.result()
>>>> Running my code without early firings on Dataflow, I verified that it
>>>> counts them as you'd expect.
>>>> <Screen Shot 2019-05-08 at 16.41.02.png>
>>>> Doing the same with the direct runner, it struggles to process
>>>> messages at the rate they are being produced... but it does eventually close
>>>> some windows. Here are screenshots of logs with early firings turned on and
>>>> then off.
>>>> <Screen Shot 2019-05-08 at 17.02.14.png>
>>>> <Screen Shot 2019-05-08 at 17.21.46.png>
>>>> The key here is that you can see it logging the ON_TIME panes.
>>>> This *never* happened for me when the message rate was as low as 1 per
>>>> second.
>>>> Has anyone else seen this behaviour, where no ON_TIME panes are emitted
>>>> when there are low volumes from a PubSubIO (when not using Dataflow)?
>>>> I believe the details that cause this are within the getWatermark
>>>> function in PubsubUnboundedSource, but it looks too delicate for me to
>>>> approach.
>>>> It's a problem because we ideally want it to behave well at low volumes
>>>> too, but also because this is often one of the first streaming examples
>>>> people try. We discovered this while trying to train people on streaming
>>>> and it was a bit awkward :)
>>>> Tim Sell
