I think the difference is that you could try doing the timer resets within
the OnTimer code (after the initial start) rather than in onProcess(). That
way it doesn't matter if more events arrive before the timer fires, because
you would sort them when the timer actually does go off. You would need to
store your elements as Timestamped<T>, of course. This assumes I have
understood the use case correctly.
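
To make the idea a bit more concrete, here is a rough sketch of what the
OnTimer side could do. It is plain Python with no Beam dependency, and it has
not been run inside a pipeline. The names on_timer, buffered, fire_time and
gap are hypothetical. In a real DoFn the list would come from bag state, and
the returned time would be used to reset the timer. Late data is ignored here.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

Event = Tuple[datetime, str]  # (event timestamp, payload)


def on_timer(
    buffered: List[Event], fire_time: datetime, gap: timedelta
) -> Tuple[List[List[Event]], List[Event], Optional[datetime]]:
    """Return (closed sessions, events to keep buffered, next timer time)."""
    # Bag state is unordered, so sort by event time on every firing.
    runs: List[List[Event]] = []
    for event in sorted(buffered, key=lambda e: e[0]):
        if runs and event[0] - runs[-1][-1][0] < gap:
            runs[-1].append(event)  # still within the gap: same session
        else:
            runs.append([event])  # gap reached: start a new session

    # A session is closed only once the gap has fully elapsed at fire_time.
    # Runs are in time order, so the closed ones always form a prefix.
    closed = [run for run in runs if run[-1][0] + gap <= fire_time]
    still_open = runs[len(closed):]

    remaining = [event for run in still_open for event in run]
    # Reset the timer only if something is still buffered.
    next_timer = still_open[0][-1][0] + gap if still_open else None
    return closed, remaining, next_timer
```

With the timestamps from the repro output and a 15 minute gap, a firing at
00:15 that already sees the 00:40 event closes nothing and asks for 00:20. The
firing at 00:20 then emits the first two events as one session and asks for
01:05.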

Sorry I won't have time to try it out myself this week, but it's a
worthwhile pattern to explore and publish on the patterns page.

Cheers
Rez

On Mon, 10 Aug 2020, 00:30 jmac...@godaddy.com, <jmac...@godaddy.com> wrote:

> This is pretty much what the repro code does. The problem is that it
> doesn’t work the way we would expect, because the timer isn’t called
> before the next event is processed.
>
>
>
> *From: *Reza Ardeshir Rokni <raro...@gmail.com>
> *Reply-To: *"dev@beam.apache.org" <dev@beam.apache.org>
> *Date: *Friday, August 7, 2020 at 5:34 AM
> *To: *dev <dev@beam.apache.org>
> *Subject: *Re: Stateful Pardo Question
>
>
>
> Hi,
>
>
>
> One possible approach (I have not tried it out, so I might be missing
> cases) is to reset the timer from within the OnTimer code.
>
>
>
> So maybe you start the timer in OnProcess to go off at
> current+requiredGap. Then, in OnTimer, you check the list of elements and
> output a session if nothing new has arrived. Then reset the timer to go off
> either at latestTimestampValue+requiredGap if there were new elements, or at
> currentEventTime+requiredGap. If a timer fires and there are no elements in
> the bag, then you don't reset it.
>
>
>
> You will need to keep state to know you have a timer firing so as not to
> set it again in OnProcess as there is no read() for timers.
>
>
>
> Also we don't have a sorted map state, so you will take a performance hit
> as you will need to keep sorting the events every OnTimer...
>
>
>
> Cheers
>
> Reza
>
>
>
>
>
> On Fri, 7 Aug 2020 at 14:57, Reuven Lax <re...@google.com> wrote:
>
>
>
>
>
> On Tue, Aug 4, 2020 at 1:08 PM jmac...@godaddy.com <jmac...@godaddy.com>
> wrote:
>
> So, after some additional digging, it appears that Beam does not
> consistently check for timer expiry before calling process. The result is
> that the watermark may have moved beyond your timer expiry, and if you’re
> counting on the timer callback happening at the time you set it for, that
> simply may NOT have happened when you are in DoFn.process(). You can “fix”
> the behavior by simply checking the watermark manually in process() and
> doing what you would normally do on timer expiry before proceeding. See my
> latest updated code reproducing the issue and showing the fix at
> https://github.com/randomsamples/pardo_repro.
>
>
>
> I would argue that users of this API will naturally expect timer callback
> semantics to guarantee that when they are in process(), if the current
> watermark is past a timer’s expiry, the timer callback in question will
> already have been called. Is there any reason why this isn’t happening? Am
> I misunderstanding something?
>
>
>
> Timers do not expire synchronously with the watermark advancing. So if you
> have a timer set for 12pm and the watermark advances past 12pm, that timer
> is now eligible to fire, but might not fire immediately. Some other
> elements may process before that timer fires.
>
>
>
> There are multiple reasons for this, but one is that Beam does not
> guarantee that watermark advancement is synchronous with element
> processing. The watermark might advance suddenly in the middle of
> processing an element, or at any other time. This makes it impossible (or
> at least, exceedingly difficult) to really provide the guarantee you
> expected.
>
>
>
> Reuven
>
>
>
> *From: *"jmac...@godaddy.com" <jmac...@godaddy.com>
> *Reply-To: *"dev@beam.apache.org" <dev@beam.apache.org>
> *Date: *Monday, August 3, 2020 at 10:51 AM
> *To: *"dev@beam.apache.org" <dev@beam.apache.org>
> *Subject: *Re: Stateful Pardo Question
>
>
>
> Yeah, unless I am misunderstanding something. The output from my repro
> code shows event timestamp and the context timestamp every time we process
> an event.
>
> Receiving event at: 2000-01-01T00:00:00.000Z
>
> Resetting timer to : 2000-01-01T00:15:00.000Z
>
> Receiving event at: 2000-01-01T00:05:00.000Z
>
> Resetting timer to : 2000-01-01T00:20:00.000Z <- Shouldn’t the timer have
> fired before we processed the next event?
>
> Receiving event at: 2000-01-01T00:40:00.000Z
>
> Why didn’t the timer fire?
>
> Resetting timer to : 2000-01-01T00:55:00.000Z
>
> Receiving event at: 2000-01-01T00:45:00.000Z
>
> Resetting timer to : 2000-01-01T01:00:00.000Z
>
> Receiving event at: 2000-01-01T00:50:00.000Z
>
> Resetting timer to : 2000-01-01T01:05:00.000Z
>
> Timer firing at: 2000-01-01T01:05:00.000Z
>
>
>
> *From: *Reuven Lax <re...@google.com>
> *Reply-To: *"dev@beam.apache.org" <dev@beam.apache.org>
> *Date: *Monday, August 3, 2020 at 10:02 AM
> *To: *dev <dev@beam.apache.org>
> *Subject: *Re: Stateful Pardo Question
>
>
>
> Are you sure that there is a 15 minute gap in your data?
>
>
>
> On Mon, Aug 3, 2020 at 6:20 AM jmac...@godaddy.com <jmac...@godaddy.com>
> wrote:
>
> I am confused about the behavior of timers on a simple stateful pardo. I
> have put together a little repro here:
> https://github.com/randomsamples/pardo_repro
>
>
>
> I basically want to build something like a session window, accumulating
> events until quiescence of the stream for a given key and gap time, then
> output results. But it appears that the timer is not firing when the
> watermark has passed its expiration time, so the event stream is not being
> split as I would have expected. Would love some help getting this to work;
> the behavior is for a project I’m working on.
>
>
