Earlier it was said that performance would be poor if we moved to a model that prevents multiple timer firings. Since timer firings are per key, can you describe a use case that has multiple user timer firings per key?
On Thu, Jun 27, 2019 at 4:34 PM Reuven Lax <[email protected]> wrote:

> The watermark hold (which is how the timer holds up the watermark today,
> as there is no timer watermark) is per key. Usually the input watermark
> making a "hop" is not a problem, in fact it's the normal state of affairs.
>
> On Fri, Jun 28, 2019 at 1:08 AM Lukasz Cwik <[email protected]> wrote:
>
>> Thanks Reuven and Jan.
>>
>> Since timers are per key, wouldn't it be that the timer watermark should
>> also be per key for a StatefulDoFn and hence we would still be able to fire
>> multiple timers (at most one per key) and still have good performance even
>> when the input watermark makes a "hop"?
>>
>> On Thu, Jun 27, 2019 at 3:43 PM Jan Lukavský <[email protected]> wrote:
>>
>>> It would be possible to have a "timer watermark", between the input and
>>> output watermarks, so that input watermark >= timer watermark >= output
>>> watermark, but it turns out that doing so implies that we fire timers
>>> only for a single instant (because until the timer is fired and
>>> processed, the "timer watermark" is on hold).
>>>
>>> On 6/28/19 12:40 AM, Jan Lukavský wrote:
>>>
>>> At least the implementation in DirectRunner fires timers according to
>>> the input watermark. Holding the timer up to the output watermark causes
>>> deadlocks, because timers fired at time T might clear the watermark hold
>>> for the same time.
>>>
>>> On 6/27/19 11:55 PM, Reuven Lax wrote:
>>>
>>> I believe that timers correspond to watermark holds, which hold up the
>>> output watermark, not the input watermark.
>>>
>>> On Thu, Jun 27, 2019 at 11:21 PM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> I'm confused as to why it is valid to advance the watermark to T3 in
>>>> the original scenario.
>>>>
>>>> T1 and T2 should be treated as inputs to the function and hold the
>>>> input watermark, hence T1 should fire and if it doesn't produce any new
>>>> timers before T2, then T2 should fire since the watermark will now
>>>> advance to T2. The only time you would have multiple watermark timers
>>>> fire as part of the same bundle is if they were distinct timers both
>>>> set to the same time.
>>>>
>>>> I have some examples[1] documented in the modelling, scheduling, and
>>>> executing timers doc.
>>>>
>>>> 1:
>>>> https://docs.google.com/document/d/1GRL88rKLHbMR0zJnBHYwM4xtj66VYlB112EWVUFcGB0/edit#heading=h.fzptl5h0vi9k
>>>>
>>>> On Wed, Jun 26, 2019 at 6:40 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Earlier than the input watermark only applies to event time timers,
>>>>> but the above problem holds for processing time timers as well.
>>>>>
>>>>> On Wed, Jun 26, 2019, 1:50 PM Robert Bradshaw <[email protected]> wrote:
>>>>>
>>>>>> Yeah, it wouldn't be optimal performance-wise, but I think it's good
>>>>>> to keep the bar for a correct SDK low. Might still be better than
>>>>>> sending one timer per bundle, and you only pay the performance cost if
>>>>>> timers are set earlier than the input watermark (and there was a timer
>>>>>> firing in this range). (How often this happens probably varies a lot
>>>>>> in practice.)
>>>>>>
>>>>>> On Wed, Jun 26, 2019 at 2:33 PM Reuven Lax <[email protected]> wrote:
>>>>>> >
>>>>>> > This would have a lot of performance problems (especially since
>>>>>> > there is user code that caches within a bundle, and invalidates the
>>>>>> > cache at the end of every bundle). However this would be a valid
>>>>>> > "lazy" implementation.
>>>>>> >
>>>>>> > On Wed, Jun 26, 2019 at 2:29 PM Robert Bradshaw <[email protected]> wrote:
>>>>>> >>
>>>>>> >> Note also that a "lazy" SDK implementation would be to simply
>>>>>> >> return all the timers (as if they were new timers) to the runner
>>>>>> >> once a timer set (before or at the last requested timer in the
>>>>>> >> bundle) is encountered. E.g. Suppose we had timers T1, T3, T5 in
>>>>>> >> the bundle. On firing T1, we set T2 and delete T3. The SDK could
>>>>>> >> then claim that timers were (again) set at T3, T5, then set one
>>>>>> >> at T2 and deleted at T3 and then be done with the bundle (not
>>>>>> >> actually process T3 and T5). (One way to think about this is that
>>>>>> >> timers are actually bundle splits into a bundle of "done" and
>>>>>> >> "future" work.) A more intelligent SDK could, of course, process
>>>>>> >> the whole bundle by tracking modifications to the to-be-fired
>>>>>> >> timers itself rather than requiring a trip through the runner.
>>>>>> >>
>>>>>> >> On Wed, Jun 26, 2019 at 1:51 PM Reuven Lax <[email protected]> wrote:
>>>>>> >> >
>>>>>> >> > I like this option the best. It might be trickier to implement,
>>>>>> >> > but seems like it would be the most consistent solution.
>>>>>> >> >
>>>>>> >> > Another problem it would solve is the following: let's say a
>>>>>> >> > bundle arrives containing timers T1 and T2, and while processing
>>>>>> >> > T1 the user code deletes T2 (or resets it to a time in the far
>>>>>> >> > future). I'm actually not sure what we do today, but I'm a bit
>>>>>> >> > afraid that we will go ahead and fire T2 since it's already in
>>>>>> >> > the bundle, which is clearly incorrect. The SDK needs to keep
>>>>>> >> > track of this and skip T2 in order to solve this, which is the
>>>>>> >> > same sort of work needed to implement Robert's suggestion.
>>>>>> >> >
>>>>>> >> > Reuven
>>>>>> >> >
>>>>>> >> > On Wed, Jun 26, 2019 at 12:28 PM Robert Bradshaw <[email protected]> wrote:
>>>>>> >> >>
>>>>>> >> >> Another option, that is nice from an API perspective but places
>>>>>> >> >> a burden on SDK implementers (and possibly runners), is to
>>>>>> >> >> maintain the ordering of timers by requiring timers to be fired
>>>>>> >> >> in order, and if any timers are set to fire them immediately
>>>>>> >> >> before processing later timers. In other words, if T1 sets T2
>>>>>> >> >> and modifies T3, these would take effect (locally, the runner
>>>>>> >> >> may not even know about T2) before T3 was processed.
>>>>>> >> >>
>>>>>> >> >> On Wed, Jun 26, 2019 at 11:13 AM Jan Lukavský <[email protected]> wrote:
>>>>>> >> >> >
>>>>>> >> >> > Hi,
>>>>>> >> >> >
>>>>>> >> >> > I have mentioned an issue I have come across [1] on several
>>>>>> >> >> > other threads, but it probably didn't attract the attention
>>>>>> >> >> > that it deserves.
>>>>>> >> >> >
>>>>>> >> >> > I will try to restate the problem here for clarity:
>>>>>> >> >> >
>>>>>> >> >> >  - on runners that use the concept of bundles (the original
>>>>>> >> >> >    issue mentions DirectRunner, but it will probably apply to
>>>>>> >> >> >    other runners that use bundles as well), the workflow is as
>>>>>> >> >> >    follows:
>>>>>> >> >> >
>>>>>> >> >> >    a) process elements in bundle
>>>>>> >> >> >
>>>>>> >> >> >    b) advance watermark
>>>>>> >> >> >
>>>>>> >> >> >    c) process timers
>>>>>> >> >> >
>>>>>> >> >> >    d) continue to next bundle
>>>>>> >> >> >
>>>>>> >> >> >  - the issue with this is that when we are initially at time
>>>>>> >> >> >    T0, set two timers for T1 and T3, then advance watermark to
>>>>>> >> >> >    T3 (or beyond), the timers will fire (correctly) in order
>>>>>> >> >> >    T1, T3, but if the timer at T1 sets another timer for T2,
>>>>>> >> >> >    then this timer will be fired in the next bundle (and
>>>>>> >> >> >    therefore after T3)
>>>>>> >> >> >
>>>>>> >> >> >  - this causes issues mostly with race conditions in window GC
>>>>>> >> >> >    timers and user timers (and users don't have any way to
>>>>>> >> >> >    solve that!)
>>>>>> >> >> >
>>>>>> >> >> >  - note that the same applies when one timer tries to reset a
>>>>>> >> >> >    timer that is already in the current bundle
>>>>>> >> >> >
>>>>>> >> >> > I have investigated a way of solving this by running timers
>>>>>> >> >> > only for a single timestamp (instant) in each bundle, but as
>>>>>> >> >> > Reuven pointed out, that could regress performance (mostly by
>>>>>> >> >> > delaying firing of timers that could have fired). Options I
>>>>>> >> >> > see:
>>>>>> >> >> >
>>>>>> >> >> >  1) either set the OnTimerContext#timestamp() to current input
>>>>>> >> >> >     watermark (not the time that user actually set the timer),
>>>>>> >> >> >     or
>>>>>> >> >> >
>>>>>> >> >> >  2) add OnTimerContext#getCurrentInputWatermark() and disallow
>>>>>> >> >> >     setting (or resetting) timers for time between
>>>>>> >> >> >     OnProcessContext#timestamp and
>>>>>> >> >> >     OnProcessContext#getCurrentInputWatermark(), by throwing
>>>>>> >> >> >     an exception
>>>>>> >> >> >
>>>>>> >> >> >  3) any other option?
>>>>>> >> >> >
>>>>>> >> >> > Option 1) seems to be broken by design, as it can result in
>>>>>> >> >> > corrupt data (emitted with wrong timestamp, which is even
>>>>>> >> >> > somewhat arbitrary), I'm including it just for completeness.
>>>>>> >> >> > Option 2) is a breaking change that can result in Pipeline
>>>>>> >> >> > failures (although the failures will happen on Pipelines that
>>>>>> >> >> > are probably already broken).
>>>>>> >> >> >
>>>>>> >> >> > Although I have come up with a workaround in the work where I
>>>>>> >> >> > originally came across this issue, I think that this is
>>>>>> >> >> > generally serious and should be dealt with. Mostly because
>>>>>> >> >> > when using user-facing APIs, there are no workarounds
>>>>>> >> >> > possible, today.
>>>>>> >> >> >
>>>>>> >> >> > Thanks for discussion!
>>>>>> >> >> >
>>>>>> >> >> > Jan
>>>>>> >> >> >
>>>>>> >> >> > [1] https://issues.apache.org/jira/browse/BEAM-7520
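
To make the ordering problem quoted above concrete, here is a minimal Python sketch that contrasts the two firing strategies for a single key. It is a toy simulation only and uses no Beam code: the names fire_snapshot, fire_in_order, set_t2 and set_t2_delete_t3 are hypothetical, timers are plain integers, and the callback mutates a dict standing in for the per-key timer state. It assumes a callback never keeps setting new timers at or before the watermark forever.

```python
from typing import Callable, Dict, List

# Hypothetical types for illustration only; none of these names are Beam APIs.
# A callback receives the fired timer id and the mutable timer table
# (timer id -> firing time), which it may change to set or delete timers.
OnTimer = Callable[[str, Dict[str, int]], None]


def fire_snapshot(timers: Dict[str, int], watermark: int, on_timer: OnTimer) -> List[str]:
    """Bundle-style firing: the eligible set is fixed before any callback runs."""
    fired: List[str] = []
    while True:
        bundle = sorted((t, tid) for tid, t in timers.items() if t <= watermark)
        if not bundle:
            return fired
        for _, tid in bundle:
            # Everything already in the bundle fires, even if an earlier
            # callback deleted or reset it in the meantime.
            timers.pop(tid, None)
            fired.append(tid)
            on_timer(tid, timers)


def fire_in_order(timers: Dict[str, int], watermark: int, on_timer: OnTimer) -> List[str]:
    """In-order firing: re-select the earliest eligible timer after every callback."""
    fired: List[str] = []
    while True:
        eligible = [(t, tid) for tid, t in timers.items() if t <= watermark]
        if not eligible:
            return fired
        _, tid = min(eligible)
        del timers[tid]
        fired.append(tid)
        on_timer(tid, timers)  # sets and deletes take effect before the next pick


def set_t2(tid: str, timers: Dict[str, int]) -> None:
    # The original scenario: firing T1 sets a new timer T2 between T1 and T3.
    if tid == "T1":
        timers["T2"] = 2


def set_t2_delete_t3(tid: str, timers: Dict[str, int]) -> None:
    # The T1, T3, T5 example: firing T1 sets T2 and deletes T3.
    if tid == "T1":
        timers["T2"] = 2
        timers.pop("T3", None)


if __name__ == "__main__":
    print(fire_snapshot({"T1": 1, "T3": 3}, 3, set_t2))   # ['T1', 'T3', 'T2']
    print(fire_in_order({"T1": 1, "T3": 3}, 3, set_t2))   # ['T1', 'T2', 'T3']
    print(fire_in_order({"T1": 1, "T3": 3, "T5": 5}, 5, set_t2_delete_t3))  # ['T1', 'T2', 'T5']
```

In this toy model the snapshot strategy fires T2 after T3 and would also fire a deleted T3, while the in-order strategy does neither. The re-selection after every callback is quadratic in the number of timers; a real SDK would more likely keep a priority queue, which is left out here to keep the sketch short.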
