Thanks Reuven and Jan. Since timers are per key, wouldn't it be that the timer watermark should also be per key for a StatefulDoFn and hence we would still be able to fire multiple timers (at most one per key) and still have good performance even when the input watermark makes a "hop"?
On Thu, Jun 27, 2019 at 3:43 PM Jan Lukavský <[email protected]> wrote: > It would be possible to have "timer watermark", between input and output > watermark, so that input watermark >= timer watermark >= output watermark, > but it turns out, that doing so implies that we fire timers only for single > instant (because until the timer is fired and processed, the "timer > watermark" is on hold). > On 6/28/19 12:40 AM, Jan Lukavský wrote: > > At least the implementation in DirectRunner fires timers according to > input watemark. Holding the timer up to output watermark causes deadlocks, > because timers fired at time T might clear watermark hold for the same time. > On 6/27/19 11:55 PM, Reuven Lax wrote: > > I believe that timers correspond to watermark holds, which hold up the > output watermark, not the input watermark. > > On Thu, Jun 27, 2019 at 11:21 PM Lukasz Cwik <[email protected]> wrote: > >> I'm confused as to why it is valid to advance the watermark to T3 in the >> original scenario. >> >> T1 and T2 should be treated as inputs to the function and hold the input >> watermark hence T1 should fire and if it doesn't produce any new timers >> before T2, then T2 should fire since the watermark will now advance to T2. >> The only time you would have multiple watermark timers fire as part of the >> same bundle is if they were distinct timers both set to the same time. >> >> I have some examples[1] documented in the modelling, scheduling, and >> executing timers doc. >> >> 1: >> https://docs.google.com/document/d/1GRL88rKLHbMR0zJnBHYwM4xtj66VYlB112EWVUFcGB0/edit#heading=h.fzptl5h0vi9k >> >> >> On Wed, Jun 26, 2019 at 6:40 AM Reuven Lax <[email protected]> wrote: >> >>> Earlier than the input watermark only applies to event time timers, but >>> the above problem holds for processing time timers as well. >>> >>> On Wed, Jun 26, 2019, 1:50 PM Robert Bradshaw <[email protected]> >>> wrote: >>> >>>> Yeah, it wouldn't be optimal performance-wise, but I think it's good >>>> to keep the bar for a correct SDK low. Might still be better than >>>> sending one timer per bundle, and you only pay the performance if >>>> timers are set earlier than the input watermark (and there was a timer >>>> firing in this range). (How often this happens probably varies a lot >>>> in practice.) >>>> >>>> On Wed, Jun 26, 2019 at 2:33 PM Reuven Lax <[email protected]> wrote: >>>> > >>>> > This would have a lot of performance problems (especially since there >>>> is user code that caches within a bundle, and invalidates the cache at the >>>> end of every bundle). However this would be a valid "lazy" implementation. >>>> > >>>> > On Wed, Jun 26, 2019 at 2:29 PM Robert Bradshaw <[email protected]> >>>> wrote: >>>> >> >>>> >> Note also that a "lazy" SDK implementation would be to simply return >>>> >> all the timers (as if they were new timers) to runner once a timer >>>> set >>>> >> (before or at the last requested timer in the bundle) is encountered. >>>> >> E.g. Suppose we had timers T1, T3, T5 in the bundle. On firing T1, we >>>> >> set T2 and delete T3. The SDK could then claim that a timers were >>>> >> (again) set at T3, T5, then set one at at T2 and deleted at T3 and >>>> >> then be done with the bundle (not actually process T3 and T5). (One >>>> >> way to think about this is that timers are actually bundle splits >>>> into >>>> >> a bundle of "done" and "future" work.) A more intelligent SDK could, >>>> >> of course, process the whole bundle by tracking modifications to the >>>> >> to-be-fired timers itself rather than requiring a trip through the >>>> >> runner. >>>> >> >>>> >> On Wed, Jun 26, 2019 at 1:51 PM Reuven Lax <[email protected]> wrote: >>>> >> > >>>> >> > I like this option the best. It might be trickier to implement, >>>> but seems like it would be the most consistent solution. >>>> >> > >>>> >> > Another problem it would solve is the following: let's say a >>>> bundle arrives containing timers T1 and T2, and while processing T1 the >>>> user code deletes T2 (or resets it to a time in the far future). I'm >>>> actually not sure what we do today, but I'm a bit afraid that we will go >>>> ahead and fire T2 since it's already in the bundle, which is clearly >>>> incorrect. The SDK needs to keep track of this and skip T2 in order to >>>> solve this, which is the same sort of work needed to implement Robert's >>>> suggestion. >>>> >> > >>>> >> > Reuven >>>> >> > >>>> >> > On Wed, Jun 26, 2019 at 12:28 PM Robert Bradshaw < >>>> [email protected]> wrote: >>>> >> >> >>>> >> >> Another option, that is nice from an API perspective but places a >>>> >> >> burden on SDK implementers (and possibly runners), is to maintain >>>> the >>>> >> >> ordering of timers by requiring timers to be fired in order, and >>>> if >>>> >> >> any timers are set to fire them immediately before processing >>>> later >>>> >> >> timers. In other words, if T1 sets T2 and modifies T3, these would >>>> >> >> take effect (locally, the runner may not even know about T2) >>>> before T3 >>>> >> >> was processed. >>>> >> >> >>>> >> >> On Wed, Jun 26, 2019 at 11:13 AM Jan Lukavský <[email protected]> >>>> wrote: >>>> >> >> > >>>> >> >> > Hi, >>>> >> >> > >>>> >> >> > I have mentioned an issue I have come across [1] on several >>>> other >>>> >> >> > threads, but it probably didn't attract the attention that it >>>> would desire. >>>> >> >> > >>>> >> >> > I will try to restate the problem here for clarity: >>>> >> >> > >>>> >> >> > - on runners that use concept of bundles (the original issue >>>> mentions >>>> >> >> > DirectRunner, but it will probably apply for other runners, >>>> which use >>>> >> >> > bundles, as well), the workflow is as follows: >>>> >> >> > >>>> >> >> > a) process elements in bundle >>>> >> >> > >>>> >> >> > b) advance watermark >>>> >> >> > >>>> >> >> > c) process timers >>>> >> >> > >>>> >> >> > d) continue to next bundle >>>> >> >> > >>>> >> >> > - the issue with this is that when we are initially at time >>>> T0, set >>>> >> >> > two timers for T1 and T3, then advance watermark to T3 (or >>>> beyond), the >>>> >> >> > timers will fire (correctly) in order T1, T3, but if timer at >>>> T1 sets >>>> >> >> > another timer for T2, then this timer will be fired in next >>>> bundle (and >>>> >> >> > therefore after T3) >>>> >> >> > >>>> >> >> > - this causes issues mostly with race conditions in window GC >>>> timers >>>> >> >> > and user timers (and users don't have any way to solve that!) >>>> >> >> > >>>> >> >> > - note that the same applies when one timer tries to reset >>>> timer that >>>> >> >> > is already in the current bundle >>>> >> >> > >>>> >> >> > I have investigated a way of solving this by running timers >>>> only for >>>> >> >> > single timestamp (instant) at each bundle, but as Reuven >>>> pointed out, >>>> >> >> > that could regress performance (mostly by delaying firing of >>>> timers, >>>> >> >> > that could have fired). Options I see: >>>> >> >> > >>>> >> >> > 1) either set the OnTimerContext#timestamp() to current input >>>> >> >> > watermark (not the time that user actually set the timer), or >>>> >> >> > >>>> >> >> > 2) add OnTimerContext#getCurrentInputWatermark() and disallow >>>> setting >>>> >> >> > (or resetting) timers for time between >>>> OnProcessContext#timestamp and >>>> >> >> > OnProcessContext#getCurrentInputWatermark(), by throwing an >>>> exception >>>> >> >> > >>>> >> >> > 3) any other option? >>>> >> >> > >>>> >> >> > Option 1) seems to be broken by design, as it can result in >>>> corrupt data >>>> >> >> > (emitted with wrong timestamp, which is even somewhat >>>> arbitrary), I'm >>>> >> >> > including it just for completeness. Option 2) is breaking >>>> change, that >>>> >> >> > can result in PIpeline failures (although the failures will >>>> happen on >>>> >> >> > Pipelines, that are probably already broken). >>>> >> >> > >>>> >> >> > Although I have come with a workaround in the work where I >>>> originally >>>> >> >> > come across this issue, I think that this is generally serious >>>> and >>>> >> >> > should be dealt with. Mostly because when using user-facing >>>> APIs, there >>>> >> >> > are no workarounds possible, today. >>>> >> >> > >>>> >> >> > Thanks for discussion! >>>> >> >> > >>>> >> >> > Jan >>>> >> >> > >>>> >> >> > [1] https://issues.apache.org/jira/browse/BEAM-7520 >>>> >> >> > >>>> >>>
