Thanks!
I made a Jira:
https://issues.apache.org/jira/browse/BEAM-7322
And dumped my sample code here:
https://github.com/tims/beam/tree/master/pubsub-watermark
*From: *Alexey Romanenko
*Date: *Wed, May 15, 2019 at 12:18 AM
*To: *
> Not sure if this will be very helpful, but I recall a similar issue with
> KinesisIO [1] [2], and it was a bug in MovingFunction that was fixed.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5063
> [2] https://github.com/apache/beam/pull/6178
>
> On 13 May 2019, at 20:52, Kenneth Knowles wrote:
>
> You should definitely not feel foolish. That was a great report. I expect
> many users face the same situation. If they are lurking on this list, then
> you will have helped them already.
>
> Reza - I expect you should weigh in on the Jira, too, since the "one
> message test" use case seems like it wouldn't work at all with those
> MovingFunction params. But I may not understand all the subtleties of the
> connector.
>
> Kenn
>
> *From: *Tim Sell
> *Date: *Mon, May 13, 2019 at 8:06 AM
> *To: *
>
>> Thanks for the feedback. I did some more investigating after you said a 1
>> second frequency should be enough to sample on. And it is; I feel foolish.
>> I think I just wasn't waiting long enough, as it takes minutes for the
>> windows to close. We waited much longer when we were just sending messages
>> manually and never had a window close.
>>
>> I'm generating some stats of lag times to window closing for different
>> frequencies, with code so people can reproduce it, then I'll add this to a
>> jira ticket.
>>
>> *From: *Kenneth Knowles
>> *Date: *Mon, May 13, 2019 at 10:48 AM
>> *To: * , dev
>>
>> Nice analysis & details!
>>>
>>> Thanks to your info, I think it is the configuration of MovingFunction
>>> [1] that is the likely culprit, but I don't totally understand why. It is
>>> configured like so:
>>>
>>> - store 60 seconds of data
>>> - update data every 5 seconds
>>> - require at least 10 messages to be 'significant'
>>> - require messages from at least 2 distinct 5 second update periods to be
>>> 'significant'
>>>
>>> I would expect a rate of 1 message per second to satisfy this. I may
>>> have read something wrong.
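For what it's worth, the arithmetic here can be sanity-checked with a tiny model. This is a hypothetical helper, not the real MovingFunction class, and it assumes a perfectly steady stream:

```java
// Hypothetical back-of-envelope model of the significance check described
// above; this is NOT the real org.apache.beam.sdk.util.MovingFunction class.
public class WatermarkSignificance {

    // At messagesPerSecond, over a samplePeriodSec window with updates every
    // updatePeriodSec, do we see at least minMessages messages spread over
    // at least minSpread distinct update periods?
    static boolean isSignificant(int messagesPerSecond, int samplePeriodSec,
                                 int updatePeriodSec, int minMessages,
                                 int minSpread) {
        int totalMessages = messagesPerSecond * samplePeriodSec;
        // A steady stream puts messages into every update period of the window.
        int distinctPeriods =
            messagesPerSecond > 0 ? samplePeriodSec / updatePeriodSec : 0;
        return totalMessages >= minMessages && distinctPeriods >= minSpread;
    }

    public static void main(String[] args) {
        // 1 msg/sec, 60s window, 5s updates, thresholds: 10 messages, 2 periods.
        System.out.println(isSignificant(1, 60, 5, 10, 2)); // prints "true"
        // No sustained stream (e.g. a single manual test message) never qualifies.
        System.out.println(isSignificant(0, 60, 5, 10, 2)); // prints "false"
    }
}
```

By this model a steady 1 msg/sec comfortably clears both thresholds, which also illustrates why the one-message manual test mentioned earlier in the thread would never qualify.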
>>>
>>> Have you filed an issue in Jira [2]?
>>>
>>> Kenn
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L508
>>> [2] https://issues.apache.org/jira/projects/BEAM/issues
>>>
>>> *From: *Tim Sell
>>> *Date: *Fri, May 10, 2019 at 4:09 AM
>>> *To: *
>>>
>>> Hello,
>>>>
>>>> I have identified an issue where the watermark does not advance when
>>>> using the beam PubSubIO when volumes are very low.
>>>>
>>>> The behaviour is easily replicated if you apply fixed windowing with a
>>>> trigger that fires after the watermark passes the end of the window.
>>>>
>>>> pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription))
>>>> .apply(ParDo.of(new ParseScoreEventFn()))
>>>>
>>>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
>>>> .triggering(AfterWatermark.pastEndOfWindow())
>>>> .withAllowedLateness(Duration.standardSeconds(60))
>>>> .discardingFiredPanes())
>>>> .apply(MapElements.into(kvs(strings(), integers()))
>>>> .via(scoreEvent -> KV.of(scoreEvent.getPlayer(),
>>>> scoreEvent.getScore())))
>>>> .apply(Count.perKey())
>>>> .apply(ParDo.of(Log.of("counted per key")));
>>>>
>>>> With this triggering, using both the Flink local runner and the direct
>>>> runner, *no panes will ever be emitted* if the volume of messages in
>>>> pubsub is very low, e.g. 1 per second.
>>>>
>>>> If I change the triggering to have early firings I get exactly the
>>>> emitted panes that you would expect.
>>>>
>>>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(60)))
>>>> .triggering(AfterWatermark.pastEndOfWindow()
>>>> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>> .alignedTo(Duration.standardSeconds(60))))
>>>> .withAllowedLateness(Duration.standardSeconds(60))
>>>> .discardingFiredPanes())