Okay, yes, I think I understand what you’re saying. Use processEvent just to 
continually accumulate events, and set the timer only if it has never been set 
before. Then, in onTimer, check each event to see whether it is in the 
closing session by comparing it with the last set timer time. For each event in 
the closing session, remove it from the bag and send it on its way, and finally 
reset the timer to the last element's time if needed, or set it to an 
unset value if no events are left in the bag. This will work…
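
The approach summarized above can be sketched as a plain-Python simulation rather than Beam code. The names SessionBuffer, process_event and on_timer are hypothetical stand-ins for a stateful DoFn's bag state, its single event-time timer and its two callbacks. Timestamps are integer minutes, and the sketch assumes a new session starts whenever two sorted events are at least one gap apart.

```python
from typing import List, Optional


class SessionBuffer:
    """Hypothetical stand-in for a stateful DoFn's bag state plus one event-time timer."""

    def __init__(self, gap: int) -> None:
        self.gap = gap
        self.bag: List[int] = []          # buffered event timestamps, in minutes
        self.timer: Optional[int] = None  # None plays the role of "timer unset"

    def process_event(self, ts: int) -> None:
        # Only accumulate; arm the timer if it is not already set.
        self.bag.append(ts)
        if self.timer is None:
            self.timer = ts + self.gap

    def on_timer(self, fire_time: int) -> List[List[int]]:
        # Sort at firing time, so the order in which events arrived does not matter.
        sessions: List[List[int]] = []
        for ts in sorted(self.bag):
            if sessions and ts - sessions[-1][-1] < self.gap:
                sessions[-1].append(ts)   # within the gap: same session
            else:
                sessions.append([ts])     # gap reached: start a new session
        # A session is closed once fire_time reaches its last event + gap.
        closed = [s for s in sessions if s[-1] + self.gap <= fire_time]
        still_open = [s for s in sessions if s[-1] + self.gap > fire_time]
        # Remove emitted events from the bag; re-arm for the earliest open session.
        self.bag = [ts for s in still_open for ts in s]
        self.timer = still_open[0][-1] + self.gap if still_open else None
        return closed


buf = SessionBuffer(gap=15)
for ts in [0, 5, 40, 45, 50]:  # the repro's event times, in minutes after 00:00
    buf.process_event(ts)
emitted: List[List[int]] = []
while buf.timer is not None:   # let the timer fire until the bag is empty
    emitted.extend(buf.on_timer(buf.timer))
print(emitted)  # [[0, 5], [40, 45, 50]]
```

Re-arming the timer for the earliest still-open session, rather than from the last buffered element, is a choice made for this sketch; it lets each closed session go out as soon as its gap has elapsed.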

But isn’t that quite a bit of gymnastics, and highly unintuitive? I think this 
behavior from the runtime is not what users will expect. At the very least it 
should be clearly documented that you cannot expect timers to be processed before 
processEvent is called _even if the watermark has been advanced_... the user 
code would be MUCH more sensible if the timer check happened before 
processEvent().

From: Reza Ardeshir Rokni <raro...@gmail.com>
Reply-To: "dev@beam.apache.org" <dev@beam.apache.org>
Date: Sunday, August 9, 2020 at 9:45 AM
To: dev <dev@beam.apache.org>
Subject: Re: Stateful Pardo Question

Notice: This email is from an external sender.



I think the difference is that you could try doing the timer resets within the 
OnTimer code (after the initial start) rather than in onProcess(). This way it 
doesn't matter if more events arrive before the timer fires, as you would sort 
them when the timer actually does go off. You would need to store your elements 
as TimestampedValue<T>, of course. That's assuming I have understood the use 
case correctly.

Sorry, I won't have time to try it out myself this week, but it's a worthwhile 
pattern to explore and publish on the patterns page.

Cheers
Rez

On Mon, 10 Aug 2020, 00:30 jmac...@godaddy.com <jmac...@godaddy.com> wrote:
This is pretty much what the repro code does. The problem is that it doesn’t 
work the way we would expect it to, because the timer isn’t called before 
processEvent.

From: Reza Ardeshir Rokni <raro...@gmail.com>
Reply-To: "dev@beam.apache.org" <dev@beam.apache.org>
Date: Friday, August 7, 2020 at 5:34 AM
To: dev <dev@beam.apache.org>
Subject: Re: Stateful Pardo Question



Hi,

One possible approach (I have not tried it out, so I might be missing cases): 
you can reset the timer from within the OnTimer code.

So maybe you start the timer in onProcess to go off at current+requiredGap. 
Then, in OnTimer, you check the list of elements and output a session if nothing 
new has arrived. Then reset the timer to go off either at 
latestTimestampValue+requiredGap if there were new elements, or at 
currentEventTime+requiredGap. If a timer fires and there are no elements in the 
bag, then you don't reset it.

You will need to keep state to know whether you already have a timer pending, so 
as not to set it again in OnProcess, as there is no read() for timers.
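
Reza's suggestion can likewise be sketched in plain Python, with integer-minute timestamps and hypothetical names. The timer_active flag plays the role of the extra state he mentions, since timers cannot be read. The sketch follows one reading of his reset rule (emit when nothing arrived within the gap, otherwise push the timer to the latest element plus the gap) and omits the currentEventTime+requiredGap branch. The run helper is a hypothetical harness that either fires due timers before each element or, like the runner behavior reported in this thread, only after all input has been processed.

```python
from typing import List, Optional


class GapTimerFn:
    """Hypothetical simulation: bag state, a 'timer active' flag, and one timer."""

    def __init__(self, gap: int) -> None:
        self.gap = gap
        self.bag: List[int] = []
        self.timer_active = False         # extra state, since timers have no read()
        self.timer: Optional[int] = None

    def on_process(self, ts: int) -> None:
        self.bag.append(ts)
        if not self.timer_active:         # only arm the timer if none is pending
            self.timer = ts + self.gap
            self.timer_active = True

    def on_timer(self, fire_time: int) -> Optional[List[int]]:
        latest = max(self.bag) if self.bag else None
        if latest is None or latest + self.gap <= fire_time:
            # Empty bag, or nothing new within the gap: emit (if anything)
            # and do not reset the timer.
            session = sorted(self.bag) or None
            self.bag.clear()
            self.timer_active = False
            self.timer = None
            return session
        self.timer = latest + self.gap    # newer elements arrived: push the timer out
        return None


def run(events: List[int], fire_between: bool) -> List[List[int]]:
    """Hypothetical harness; fire_between=True fires due timers before each element."""
    fn = GapTimerFn(gap=15)
    out: List[List[int]] = []
    for ts in events:
        while fire_between and fn.timer is not None and fn.timer <= ts:
            session = fn.on_timer(fn.timer)
            if session:
                out.append(session)
        fn.on_process(ts)
    while fn.timer is not None:           # end of input: watermark goes to infinity
        session = fn.on_timer(fn.timer)
        if session:
            out.append(session)
    return out


print(run([0, 5, 40, 45, 50], fire_between=True))   # [[0, 5], [40, 45, 50]]
print(run([0, 5, 40, 45, 50], fire_between=False))  # [[0, 5, 40, 45, 50]]
```

With prompt timer firing the sketch yields two sessions; when every element is processed before the timer fires, it merges everything into one. That is why other messages in this thread move toward sorting the buffered elements when the timer fires.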

Also, we don't have a sorted map state, so you will take a performance hit, as 
you will need to keep sorting the events on every OnTimer...

Cheers
Reza


On Fri, 7 Aug 2020 at 14:57, Reuven Lax <re...@google.com> wrote:


On Tue, Aug 4, 2020 at 1:08 PM jmac...@godaddy.com <jmac...@godaddy.com> wrote:
So, after some additional digging, it appears that Beam does not consistently 
check for timer expiry before calling process. The result is that the watermark 
may have moved beyond your timer's expiry, and if you’re counting on the timer 
callback happening at the time you set it for, that simply may NOT have happened 
when you are in DoFn.process(). You can “fix” the behavior by simply checking the 
watermark manually in process() and doing what you would normally do on timer 
expiry before proceeding. See my latest updated code reproducing the issue and 
showing the fix at https://github.com/randomsamples/pardo_repro.
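
The workaround described above might look like the following plain-Python simulation. How the repro obtains the watermark inside process() is not shown in the thread, so here it is simply passed in as an argument, and the demo assumes the watermark equals each element's timestamp; EagerFlushFn and its methods are hypothetical names.

```python
from typing import List, Optional


class EagerFlushFn:
    """Hypothetical simulation of checking the watermark inside process()."""

    def __init__(self, gap: int) -> None:
        self.gap = gap
        self.bag: List[int] = []
        self.timer: Optional[int] = None
        self.output: List[List[int]] = []

    def _expire(self) -> None:
        # What the timer callback would do: emit the buffered session, clear state.
        if self.bag:
            self.output.append(sorted(self.bag))
        self.bag = []
        self.timer = None

    def process(self, ts: int, watermark: int) -> None:
        # Workaround: if the watermark is already past the pending timer, do the
        # timer's work now, before handling this element.
        if self.timer is not None and watermark >= self.timer:
            self._expire()
        self.bag.append(ts)
        self.timer = ts + self.gap        # reset on every event, as in the repro log

    def on_timer(self) -> None:
        self._expire()


fn = EagerFlushFn(gap=15)
for ts in [0, 5, 40, 45, 50]:
    fn.process(ts, watermark=ts)          # assumption: watermark == element timestamp
fn.on_timer()                             # the timer finally fires at end of input
print(fn.output)                          # [[0, 5], [40, 45, 50]]
```

Calling on_timer() when the bag is already empty emits nothing, so a redundant firing is harmless in this sketch.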

I would argue that users of this API will naturally expect the timer callback 
semantics to guarantee that, when they are in process(), if the current 
watermark is past a timer's expiry, the timer callback in question will already 
have been called. Is there any reason why this isn’t happening? Am I 
misunderstanding something?

Timers do not expire synchronously with the watermark advancing. So if you have 
a timer set for 12pm and the watermark advances past 12pm, that timer is now 
eligible to fire, but might not fire immediately. Some other elements may be 
processed before that timer fires.

There are multiple reasons for this, but one is that Beam does not guarantee 
that watermark advancement is synchronous with element processing. The 
watermark might advance suddenly in the middle of processing an element, or 
at any other time. This makes it impossible (or at least exceedingly 
difficult) to really provide the guarantee you expected.

Reuven

From: "jmac...@godaddy.com" <jmac...@godaddy.com>
Reply-To: "dev@beam.apache.org" <dev@beam.apache.org>
Date: Monday, August 3, 2020 at 10:51 AM
To: "dev@beam.apache.org" <dev@beam.apache.org>
Subject: Re: Stateful Pardo Question



Yeah, unless I am misunderstanding something. The output from my repro code 
shows the event timestamp and the context timestamp every time we process an event.

Receiving event at: 2000-01-01T00:00:00.000Z
Resetting timer to : 2000-01-01T00:15:00.000Z
Receiving event at: 2000-01-01T00:05:00.000Z
Resetting timer to : 2000-01-01T00:20:00.000Z <-- Shouldn’t the timer have fired before we processed the next event?
Receiving event at: 2000-01-01T00:40:00.000Z
Why didnt the timer fire?
Resetting timer to : 2000-01-01T00:55:00.000Z
Receiving event at: 2000-01-01T00:45:00.000Z
Resetting timer to : 2000-01-01T01:00:00.000Z
Receiving event at: 2000-01-01T00:50:00.000Z
Resetting timer to : 2000-01-01T01:05:00.000Z
Timer firing at: 2000-01-01T01:05:00.000Z
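
Reuven's explanation elsewhere in this thread is consistent with this log. The sketch below, plain Python with hypothetical names, replays the repro's event times under two assumed runner behaviors: one that fires an eligible timer before the next element (treating each element's timestamp as the watermark), and one that processes all five elements first. The second reproduces the log lines above, apart from the repro's own "Why didnt the timer fire?" check; the first would have logged a timer firing at 00:20 before the 00:40 event.

```python
from datetime import datetime, timedelta
from typing import List, Optional

GAP = timedelta(minutes=15)
BASE = datetime(2000, 1, 1)  # event times below are minutes after this instant


def fmt(t: datetime) -> str:
    return t.strftime("%Y-%m-%dT%H:%M:%S.000Z")


def simulate(event_minutes: List[int], fire_eagerly: bool) -> List[str]:
    log: List[str] = []
    timer: Optional[datetime] = None
    for m in event_minutes:
        ts = BASE + timedelta(minutes=m)
        # Hypothetical "eager" runner: fire an eligible timer before the next
        # element, treating the element's timestamp as the watermark (an assumption).
        if fire_eagerly and timer is not None and ts >= timer:
            log.append("Timer firing at: " + fmt(timer))
            timer = None
        log.append("Receiving event at: " + fmt(ts))
        timer = ts + GAP  # the repro resets the timer on every event
        log.append("Resetting timer to : " + fmt(timer))
    # End of input: the watermark advances to infinity and the pending timer fires.
    if timer is not None:
        log.append("Timer firing at: " + fmt(timer))
    return log


print("\n".join(simulate([0, 5, 40, 45, 50], fire_eagerly=False)))
```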

From: Reuven Lax <re...@google.com>
Reply-To: "dev@beam.apache.org" <dev@beam.apache.org>
Date: Monday, August 3, 2020 at 10:02 AM
To: dev <dev@beam.apache.org>
Subject: Re: Stateful Pardo Question



Are you sure that there is a 15 minute gap in your data?

On Mon, Aug 3, 2020 at 6:20 AM jmac...@godaddy.com <jmac...@godaddy.com> wrote:
I am confused about the behavior of timers on a simple stateful ParDo. I have 
put together a little repro here: https://github.com/randomsamples/pardo_repro

I basically want to build something like a session window, accumulating events 
until quiescence of the stream for a given key and gap time, then output 
results. But it appears that the timer is not firing when the watermark has 
passed its expiration time, so the event stream is not being split as I would 
have expected. Would love some help getting this to work; the behavior is needed 
for a project I’m working on.
