Ahhh, I see. Thank you very much for this additional info. Really helpful! After
considering it further, I think it's probably more appropriate and less risky in
my current scenario to try to use the Session combiner. I did really like the
stateful ParDo way of doing things, though. If it were simpler to get correct
and as performant as windows (I understand that Flink has some special
optimizations for windowing that go all the way down into the RocksDB code), I
might have liked to see this approach work out.

Thanks again!

From: Reuven Lax <re...@google.com>
Reply-To: "dev@beam.apache.org" <dev@beam.apache.org>
Date: Sunday, August 9, 2020 at 11:25 PM
To: dev <dev@beam.apache.org>
Subject: Re: Stateful Pardo Question

Notice: This email is from an external sender.


Looking at the code in the repo, it seems to assume that context.timestamp() is
the "watermark" time. It is not: context.timestamp() is the timestamp of the
current element being processed. Generally the watermark will be smaller than
the timestamp of the current element, as the watermark is a lower bound on
element timestamps (so you can't really check context.timestamp() to determine
whether a timer is eligible to fire). It's also worth mentioning that Beam
provides no ordering guarantees on the input elements (unless you are using
TestStream in a unit test). In theory they could arrive in reverse timestamp
order. In the real world that degree of disorder is probably unlikely (and would
be inefficient, as the watermark would then not advance until all elements were
processed); however, the model makes no guarantees about order.

The fact that inputs can arrive in any order means that the sessions code you 
are trying to implement would need some more complexity if you wanted it to be 
correct. The problem is that you may have buffered elements from multiple 
different sessions in your bag, and you may see those elements out of order. 
Resetting the timer to event.getTimestamp().plus(SESSION_TIMEOUT) will cause 
you to potentially create a timer that is too early. There are various ways to 
solve this (e.g. storing an interval tree in a separate state tag so you can 
keep track of which sessions are in flight). The upcoming TimestampOrderedList 
state type will also help to make this sort of use case easier and more 
efficient.
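One way to keep track of which sessions are in flight is sketched below in plain
Java. It swaps in a TreeMap of non-overlapping intervals for the interval tree
mentioned above, and assumes that each element covers [timestamp, timestamp +
gap) and that only strictly overlapping intervals merge. The names are
hypothetical, and in a real stateful DoFn the map would have to live in a state
cell rather than a field.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not a Beam API. Tracks the in-flight sessions of one key as
// non-overlapping half-open intervals [start, end), where end = latest element + gap.
// A TreeMap (start -> end) stands in for an interval tree; in a real stateful DoFn
// this structure would have to be persisted in a state cell, not a field.
class InFlightSessions {
    private final long gap;
    private final TreeMap<Long, Long> sessions = new TreeMap<>();

    InFlightSessions(long gap) {
        this.gap = gap;
    }

    // Adds one element timestamp, in any arrival order, merging overlapping sessions.
    void add(long ts) {
        long start = ts;
        long end = ts + gap;
        // The only earlier session that can overlap ts is the one starting at or closest before it.
        Map.Entry<Long, Long> before = sessions.floorEntry(ts);
        if (before != null && before.getValue() > ts) {
            start = before.getKey();
            end = Math.max(end, before.getValue());
            sessions.remove(before.getKey());
        }
        // Absorb every later session that starts before the merged interval ends.
        Map.Entry<Long, Long> after = sessions.ceilingEntry(start);
        while (after != null && after.getKey() < end) {
            end = Math.max(end, after.getValue());
            sessions.remove(after.getKey());
            after = sessions.ceilingEntry(start);
        }
        sessions.put(start, end);
    }

    // The event-time timer should target the earliest session end, not the latest element + gap.
    long earliestTimer() {
        long earliest = Long.MAX_VALUE;
        for (long end : sessions.values()) {
            earliest = Math.min(earliest, end);
        }
        return earliest;
    }

    TreeMap<Long, Long> snapshot() {
        return new TreeMap<>(sessions);
    }
}
```

With a 15-minute gap, adding 00:05 and then 00:00 (out of order) yields one
session [00:00, 00:20), so `earliestTimer()` returns 20, whereas resetting the
timer to the latest element's timestamp plus the gap would set it to 00:15, too
early. Adding 00:40, 00:00, 00:05 yields {0=20, 40=55}. When the timer fires, a
DoFn built this way would emit the sessions whose end has been reached and
re-arm the timer for the next earliest end.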

Reuven

On Sun, Aug 9, 2020 at 5:05 PM Reza Ardeshir Rokni
<raro...@gmail.com> wrote:
+1 on having the behavior clearly documented. It would also be great to try and
add more state and timer patterns to the Beam docs patterns page
https://beam.apache.org/documentation/patterns/overview/.

I think it might be worth thinking about describing these kinds of patterns with
an emphasis on the OnTimer being where the work happens. One thing that would
make all of this a lot easier, by reducing the boilerplate code that needs to be
written, is a sorted map state (a topic of discussion on a few threads).

On Mon, 10 Aug 2020 at 01:16, Reuven Lax
<re...@google.com> wrote:
Timers in Beam are considered "eligible to fire" once the watermark has
advanced past their timestamp. This is not the same as saying that they will
fire immediately. You should not assume ordering between the elements and the
timers.

This is one reason (among many) that Beam does not provide a "read watermark"
primitive, as it leads to confusion such as this. Since there is no
read-watermark operator, the only way for a user's ParDo to observe that the
watermark has advanced is to set a timer and wait for it to expire. Watermarks
on their own can act in very non-intuitive ways (due to asynchronous
advancement), so generally we encourage people to reason about timers and
windowing in their code instead.

Reuven

On Sun, Aug 9, 2020 at 9:39 AM jmac...@godaddy.com
<jmac...@godaddy.com> wrote:
I understand that watermarks are advanced concurrently, and that they are
estimates and not precise, but I’m not sure this is relevant in this case. In
this repro code we are in processElement() and the watermark HAS advanced, but
the timer has not been called even though we asked the runtime to do that. In
this case we are in a per-key stateful operating mode and our timer should not
be shared with any other runners (is that correct?), so it seems to me that we
should be able to operate in a manner that is locally consistent from the point
of view of the DoFn we are writing. That is to say, _before_ we enter
processElement we check any local timers first. I would argue that this would
be far more sensible from the author's perspective.

From: Reuven Lax <re...@google.com>
Reply-To: "dev@beam.apache.org" <dev@beam.apache.org>
Date: Thursday, August 6, 2020 at 11:57 PM
To: dev <dev@beam.apache.org>
Subject: Re: Stateful Pardo Question

On Tue, Aug 4, 2020 at 1:08 PM jmac...@godaddy.com
<jmac...@godaddy.com> wrote:
So, after some additional digging, it appears that Beam does not consistently
check for timer expiry before calling process. The result is that the watermark
may have moved beyond your timer expiry, and if you're counting on the timer
callback happening at the time you set it for, that simply may NOT have
happened when you are in DoFn.process(). You can “fix” the behavior by simply
checking the watermark manually in process() and doing what you would normally
do for timer expiry before proceeding. See my latest updated code reproducing
the issue and showing the fix at
https://github.com/randomsamples/pardo_repro.

I would argue that users of this API will naturally expect that timer callback
semantics guarantee that, when they are in process(), if the current watermark
is past a timer's expiry, the timer callback in question will have been called.
Is there any reason why this isn’t happening? Am I misunderstanding something?

Timers do not expire synchronously with the watermark advancing. So if you have 
a timer set for 12pm and the watermark advances past 12pm, that timer is now 
eligible to fire, but might not fire immediately. Some other elements may 
process before that timer fires.

There are multiple reasons for this, but one is that Beam does not guarantee
that watermark advancement is synchronous with element processing. The
watermark might advance suddenly while in the middle of processing an element,
or at any other time. This makes it impossible (or at least, exceedingly
difficult) to really provide the guarantee you expected.
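To illustrate how a timer can become eligible and still never fire, here is a
hypothetical plain-Java replay of the element timestamps from the repro log
quoted further down this thread. It assumes one legal interleaving, in which the
runner delivers all five elements before any timer callback, and that each
element re-sets the same timer (as the "Resetting timer to" lines suggest),
which replaces the previous setting. Names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay, not Beam's runner. Timestamps are minutes past midnight.
// Assumed interleaving: every element is processed before any timer callback runs,
// then the watermark advances to the end of time and the pending timer fires.
class RepeatedResetSketch {
    static List<Long> firedTimers(List<Long> arrivalOrder, long gap) {
        Long pendingTimer = null; // a single timer slot, like one @TimerId
        for (long ts : arrivalOrder) {
            // Re-setting the timer replaces the earlier setting, even one that
            // was already eligible to fire but had not been delivered yet.
            pendingTimer = ts + gap;
        }
        List<Long> fired = new ArrayList<>();
        if (pendingTimer != null) {
            fired.add(pendingTimer);
        }
        return fired;
    }
}
```

In this model `firedTimers(List.of(0L, 5L, 40L, 45L, 50L), 15)` returns [65]:
a single firing at 01:05, matching the log, because the 00:20 setting was
replaced when the 00:40 element re-set the timer.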

Reuven

From: "jmac...@godaddy.com" <jmac...@godaddy.com>
Reply-To: "dev@beam.apache.org" <dev@beam.apache.org>
Date: Monday, August 3, 2020 at 10:51 AM
To: "dev@beam.apache.org" <dev@beam.apache.org>
Subject: Re: Stateful Pardo Question

Yeah, unless I am misunderstanding something. The output from my repro code
shows the event timestamp and the context timestamp every time we process an
event.

Receiving event at: 2000-01-01T00:00:00.000Z
Resetting timer to : 2000-01-01T00:15:00.000Z
Receiving event at: 2000-01-01T00:05:00.000Z
Resetting timer to : 2000-01-01T00:20:00.000Z <-- Shouldn’t the timer have 
fired before we processed the next event?
Receiving event at: 2000-01-01T00:40:00.000Z
Why didnt the timer fire?
Resetting timer to : 2000-01-01T00:55:00.000Z
Receiving event at: 2000-01-01T00:45:00.000Z
Resetting timer to : 2000-01-01T01:00:00.000Z
Receiving event at: 2000-01-01T00:50:00.000Z
Resetting timer to : 2000-01-01T01:05:00.000Z
Timer firing at: 2000-01-01T01:05:00.000Z
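For comparison, the sessions one would expect from these five timestamps with a
15-minute gap can be computed offline by sorting and splitting wherever
consecutive elements are at least one gap apart. A plain-Java sketch with
hypothetical names, assuming half-open intervals [t, t + gap) that merge only
when they strictly overlap:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical offline reference, not a Beam API. Timestamps are minutes past midnight.
// Returns each session as {start, end}, with end = last element in the session + gap.
class ExpectedSessions {
    static List<long[]> sessions(List<Long> timestamps, long gap) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<long[]> result = new ArrayList<>();
        long start = 0;
        long last = 0;
        for (int i = 0; i < sorted.size(); i++) {
            long ts = sorted.get(i);
            if (i == 0) {
                start = ts;
            } else if (ts - last >= gap) {
                // [last, last + gap) and [ts, ts + gap) do not overlap: close the session.
                result.add(new long[] {start, last + gap});
                start = ts;
            }
            last = ts;
        }
        if (!sorted.isEmpty()) {
            result.add(new long[] {start, last + gap});
        }
        return result;
    }
}
```

For the events in the log this gives [00:00, 00:20) and [00:40, 01:05), i.e.
two session ends, while the log shows only the firing at 01:05.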

From: Reuven Lax <re...@google.com>
Reply-To: "dev@beam.apache.org" <dev@beam.apache.org>
Date: Monday, August 3, 2020 at 10:02 AM
To: dev <dev@beam.apache.org>
Subject: Re: Stateful Pardo Question

Are you sure that there is a 15 minute gap in your data?

On Mon, Aug 3, 2020 at 6:20 AM jmac...@godaddy.com
<jmac...@godaddy.com> wrote:
I am confused about the behavior of timers on a simple stateful ParDo. I have
put together a little repro here: https://github.com/randomsamples/pardo_repro

I basically want to build something like a session window, accumulating events
until the stream for a given key has been quiet for the gap time, then output
results. But it appears that the timer is not firing when the watermark has
passed its expiration time, so the event stream is not being split as I would
have expected. Would love some help getting this to work; the behavior is for a
project I’m working on.
