+1 on having the behavior clearly documented. It would also be great to add
more state and timer patterns to the Beam docs patterns page:
https://beam.apache.org/documentation/patterns/overview/.

I think it would be worth describing these kinds of patterns with an
emphasis on the OnTimer callback being where the work happens. One thing
that would make all of this much easier, by reducing the boilerplate code
that needs to be written, is a sorted map state (a topic of discussion on a
few threads).

On Mon, 10 Aug 2020 at 01:16, Reuven Lax <re...@google.com> wrote:

> Timers in Beam are considered "eligible to fire" once the watermark has
> advanced past their timestamp. This is not the same as saying that they
> will fire immediately. You should not assume any ordering between elements
> and timers.
>
> This is one reason (among many) that Beam does not provide a "read
> watermark" primitive, as it leads to confusion such as this. Since there
> is no read-watermark operator, the only way for a user's ParDo to observe
> that the watermark has advanced to a given point is to set a timer for that
> point and wait for it to fire. Watermarks on their own can act in very
> non-intuitive ways (due to asynchronous advancement), so generally we
> encourage people to reason about timers and windowing in their code instead.
>
> Reuven
>
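The toy model below is plain Python, not Beam code. It illustrates the difference Reuven describes between a timer being eligible and actually firing.

- `run_lazy_bundle` is a hypothetical scheduler. It records watermark updates immediately but delivers timers only after every element in a bundle has been processed. This is one ordering the semantics above permit; it does not describe any particular runner.
- `SessionDoFn` resets a single timer to "last event + 15 minutes" on every element, as the repro's log in this thread shows.
- Times are minutes after 2000-01-01T00:00Z.

```python
GAP = 15  # minutes, the gap used in the repro


class SessionDoFn:
    """Toy per-key DoFn: buffer events and reset one timer to last event + GAP."""

    def __init__(self):
        self.buffer = []
        self.timer = None  # one timer id: setting it again replaces the old deadline
        self.outputs = []

    def process_element(self, ts):
        self.buffer.append(ts)
        self.timer = ts + GAP

    def on_timer(self, fire_ts):
        self.outputs.append((fire_ts, list(self.buffer)))
        self.buffer.clear()
        self.timer = None


def run_lazy_bundle(dofn, steps):
    """Hypothetical schedule: watermark updates apply at once, but eligible
    timers are only delivered after every element in the bundle is processed."""
    watermark = float("-inf")
    trace = []
    for kind, value in steps:
        if kind == "watermark":
            watermark = value
        else:
            eligible = dofn.timer is not None and dofn.timer <= watermark
            trace.append((value, dofn.timer, eligible))
            dofn.process_element(value)
    # End of bundle: only now is a still-pending, eligible timer delivered.
    if dofn.timer is not None and dofn.timer <= watermark:
        dofn.on_timer(dofn.timer)
    return trace


steps = [("element", 0), ("element", 5), ("watermark", 35),
         ("element", 40), ("element", 45), ("element", 50),
         ("watermark", float("inf"))]
dofn = SessionDoFn()
trace = run_lazy_bundle(dofn, steps)
for ts, pending, eligible in trace:
    print(f"element {ts:>2}: pending timer {pending}, eligible={eligible}")
print(dofn.outputs)  # [(65, [0, 5, 40, 45, 50])]
```

In this model, the element at minute 40 arrives while the minute-20 timer is already eligible. Setting the same timer again replaces its deadline, so that timer never fires. All five events come out together at minute 65 (01:05), which matches the single "Timer firing" line in the log.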
> On Sun, Aug 9, 2020 at 9:39 AM jmac...@godaddy.com <jmac...@godaddy.com>
> wrote:
>
>> I understand that watermarks are advanced concurrently, and that they are
>> estimates rather than precise values, but I’m not sure this is relevant in
>> this case. In this repro code we are in processElement() and the watermark
>> HAS advanced, but the timer has not been called even though we asked the
>> runtime to do that. In this case we are in a per-key stateful operating
>> mode and our timer should not be shared with any other runners (is that
>> correct?), so it seems to me that we should be able to operate in a manner
>> that is locally consistent from the point of view of the DoFn we are
>> writing. That is to say, _*before*_ we enter processElement we check any
>> local timers first. I would argue that this would be far more sensible
>> from the author’s perspective.
>>
>>
>>
>> *From: *Reuven Lax <re...@google.com>
>> *Reply-To: *"dev@beam.apache.org" <dev@beam.apache.org>
>> *Date: *Thursday, August 6, 2020 at 11:57 PM
>> *To: *dev <dev@beam.apache.org>
>> *Subject: *Re: Stateful Pardo Question
>>
>> On Tue, Aug 4, 2020 at 1:08 PM jmac...@godaddy.com <jmac...@godaddy.com>
>> wrote:
>>
>> So, after some additional digging, it appears that Beam does not
>> consistently check for timer expiry before calling process. As a result,
>> the watermark may have moved beyond your timer's expiry, and if you're
>> counting on the timer callback happening at the time you set it for, that
>> callback simply may NOT have happened yet when you are in DoFn.process().
>> You can “fix” the behavior by checking the watermark manually in process()
>> and doing what you would normally do on timer expiry before proceeding.
>> See my updated code reproducing the issue and showing the fix at
>> https://github.com/randomsamples/pardo_repro.
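Here is a minimal sketch of the defensive check described above, in plain Python rather than Beam code. Reuven notes that a DoFn cannot read the watermark. So this version compares each element's own timestamp with a deadline that it tracks explicitly. That substitution is an assumption and not necessarily what the linked repro does. The sketch also assumes elements for a key arrive in timestamp order; late or out-of-order data would need more care. `EagerFlushSessionDoFn` is a hypothetical name.

```python
GAP = 15  # minutes


class EagerFlushSessionDoFn:
    """Toy per-key DoFn that closes a session in process_element if its timer is overdue."""

    def __init__(self):
        self.buffer = []
        self.deadline = None  # tracked explicitly in this sketch, not read from a timer
        self.outputs = []

    def _flush(self):
        if self.buffer:
            self.outputs.append(list(self.buffer))
            self.buffer.clear()
        self.deadline = None

    def process_element(self, ts):
        # Assumes in-order arrival per key: an element at or past the deadline means
        # the previous session has ended, even if its timer has not been delivered yet.
        if self.deadline is not None and ts >= self.deadline:
            self._flush()
        self.buffer.append(ts)
        self.deadline = ts + GAP  # a real DoFn would also reset its timer here

    def on_timer(self, fire_ts):
        # Normal path: the timer fires for the current deadline.
        if self.deadline is not None and fire_ts >= self.deadline:
            self._flush()


dofn = EagerFlushSessionDoFn()
for ts in [0, 5, 40, 45, 50]:
    dofn.process_element(ts)
dofn.on_timer(65)
print(dofn.outputs)  # [[0, 5], [40, 45, 50]]
```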
>>
>>
>>
>> I would argue that users of this API will naturally expect timer callback
>> semantics to guarantee that, when they are in process(), if the current
>> watermark is past a timer's expiry, the timer callback in question has
>> already been called. Is there any reason why this isn’t happening? Am I
>> misunderstanding something?
>>
>>
>>
>> Timers do not expire synchronously with the watermark advancing. So if
>> you have a timer set for 12pm and the watermark advances past 12pm, that
>> timer is now eligible to fire, but might not fire immediately. Some other
>> elements may process before that timer fires.
>>
>>
>>
>> There are multiple reasons for this, but one is that Beam does not
>> guarantee that watermark advancement is synchronous with element
>> processing. The watermark might advance suddenly in the middle of
>> processing an element, or at any other time. This makes it impossible (or
>> at least, exceedingly difficult) to really provide the guarantee you
>> expected.
>>
>>
>>
>> Reuven
>>
>>
>>
>> *From: *"jmac...@godaddy.com" <jmac...@godaddy.com>
>> *Reply-To: *"dev@beam.apache.org" <dev@beam.apache.org>
>> *Date: *Monday, August 3, 2020 at 10:51 AM
>> *To: *"dev@beam.apache.org" <dev@beam.apache.org>
>> *Subject: *Re: Stateful Pardo Question
>>
>>
>> Yeah, unless I am misunderstanding something. The output from my repro
>> code shows the event timestamp and the context timestamp every time we process
>> an event.
>>
>> Receiving event at: 2000-01-01T00:00:00.000Z
>> Resetting timer to : 2000-01-01T00:15:00.000Z
>> Receiving event at: 2000-01-01T00:05:00.000Z
>> Resetting timer to : 2000-01-01T00:20:00.000Z  <- Shouldn’t the timer have
>> fired before we processed the next event?
>> Receiving event at: 2000-01-01T00:40:00.000Z
>> Why didn't the timer fire?
>> Resetting timer to : 2000-01-01T00:55:00.000Z
>> Receiving event at: 2000-01-01T00:45:00.000Z
>> Resetting timer to : 2000-01-01T01:00:00.000Z
>> Receiving event at: 2000-01-01T00:50:00.000Z
>> Resetting timer to : 2000-01-01T01:05:00.000Z
>> Timer firing at: 2000-01-01T01:05:00.000Z
>>
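To check the gap question directly, the sketch below splits the event timestamps from this log into sessions with a 15-minute gap. It is plain Python, and `split_sessions` is a hypothetical helper. The gap between 00:05 and 00:40 is 35 minutes, so two sessions are expected, with timers due at 00:20 and 01:05. Only the second firing appears in the log.

```python
from datetime import datetime, timedelta

GAP = timedelta(minutes=15)

# "Receiving event at" timestamps copied from the log.
EVENTS = [
    "2000-01-01T00:00:00.000Z",
    "2000-01-01T00:05:00.000Z",
    "2000-01-01T00:40:00.000Z",
    "2000-01-01T00:45:00.000Z",
    "2000-01-01T00:50:00.000Z",
]


def parse(ts):
    # Older Python versions reject a trailing "Z" in fromisoformat, so use an explicit offset.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def split_sessions(times, gap):
    """Start a new session whenever an event is at least `gap` after the previous one."""
    sessions = []
    for t in sorted(times):
        if sessions and t - sessions[-1][-1] < gap:
            sessions[-1].append(t)
        else:
            sessions.append([t])
    return sessions


sessions = split_sessions([parse(e) for e in EVENTS], GAP)
for s in sessions:
    print(f"{s[0]:%H:%M}-{s[-1]:%H:%M}: {len(s)} events, timer due at {s[-1] + GAP:%H:%M}")
# 00:00-00:05: 2 events, timer due at 00:20
# 00:40-00:50: 3 events, timer due at 01:05
```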
>>
>>
>> *From: *Reuven Lax <re...@google.com>
>> *Reply-To: *"dev@beam.apache.org" <dev@beam.apache.org>
>> *Date: *Monday, August 3, 2020 at 10:02 AM
>> *To: *dev <dev@beam.apache.org>
>> *Subject: *Re: Stateful Pardo Question
>>
>> Are you sure that there is a 15-minute gap in your data?
>>
>>
>>
>> On Mon, Aug 3, 2020 at 6:20 AM jmac...@godaddy.com <jmac...@godaddy.com>
>> wrote:
>>
>> I am confused about the behavior of timers on a simple stateful ParDo. I
>> have put together a little repro here:
>> https://github.com/randomsamples/pardo_repro
>>
>>
>>
>> I basically want to build something like a session window, accumulating
>> events until the stream goes quiet for a given key and gap time, then
>> output the results. But it appears that the timer is not firing when the
>> watermark passes its expiration time, so the event stream is not being
>> split as I would have expected. I would love some help getting this to
>> work; it is for a project I’m working on.
>>
>>
