Thanks for the explanation.

On Fri, Jun 28, 2019 at 6:49 AM Reuven Lax wrote:
> This happens when the watermark hops forward. In practice, whenever there
> is any backlog, this is the normal mode of operation.
>
> On Fri, Jun 28, 2019, 12:42 AM Lukasz Cwik wrote:
>
>> Earlier it was said that performance would be poor if we moved to a model
>> where we prevented multiple timer firings. Since timer firings are per key,
>> can you provide details of what use case has multiple user timer firings
>> per key?
>>
>> On Thu, Jun 27, 2019 at 4:34 PM Reuven Lax wrote:
>>
>>> The watermark holds (which is how the timer holds up the watermark
>>> today, as there is no timer watermark) are per key. Usually the input
>>> watermark making a "hop" is not a problem; in fact, it's the normal state
>>> of affairs.
>>>
>>> On Fri, Jun 28, 2019 at 1:08 AM Lukasz Cwik wrote:
>>>
>>>> Thanks Reuven and Jan.
>>>>
>>>> Since timers are per key, shouldn't the timer watermark also be per key
>>>> for a StatefulDoFn, so that we would still be able to fire multiple
>>>> timers (at most one per key) and still have good performance even when
>>>> the input watermark makes a "hop"?
>>>>
>>>> On Thu, Jun 27, 2019 at 3:43 PM Jan Lukavský wrote:
>>>>
>>>>> It would be possible to have a "timer watermark" between the input and
>>>>> output watermarks, so that input watermark >= timer watermark >= output
>>>>> watermark, but it turns out that doing so implies that we fire timers
>>>>> only for a single instant (because until the timer is fired and
>>>>> processed, the "timer watermark" is on hold).
>>>>>
>>>>> On 6/28/19 12:40 AM, Jan Lukavský wrote:
>>>>>
>>>>> At least the DirectRunner implementation fires timers according to the
>>>>> input watermark. Holding the timer up to the output watermark causes
>>>>> deadlocks, because timers fired at time T might clear the watermark hold
>>>>> for the same time.
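
A minimal sketch of the per-key variant Lukasz asks about, combined with the single-instant rule Jan describes: for each key, only the timers at that key's earliest eligible timestamp fire, so a hop of the input watermark can still let many keys fire in one bundle. This is one reading of the proposal, not runner code; the `timers_to_fire` name, the `(key, timer_id, timestamp)` tuples, and the rule that a timer is eligible once its timestamp is <= the input watermark are assumptions for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical representation of a pending event-time timer: (key, timer_id, timestamp).
Timer = Tuple[str, str, int]


def timers_to_fire(pending: List[Timer], input_watermark: int) -> Dict[str, List[Timer]]:
    """Sketch of a per-key "timer watermark".

    For every key, only the timers at that key's earliest timestamp that is
    <= input_watermark are eligible in this bundle. Later timers of the same
    key wait until those have been processed, because firing them might set
    new timers in between.
    """
    eligible = defaultdict(list)
    for timer in pending:
        if timer[2] <= input_watermark:
            eligible[timer[0]].append(timer)

    result = {}
    for key, timers in eligible.items():
        earliest = min(t[2] for t in timers)
        # Distinct timers set to the same instant may fire together.
        result[key] = sorted((t for t in timers if t[2] == earliest), key=lambda t: t[1])
    return result
```

Keys whose earliest timer is still ahead of the watermark fire nothing, and later timers of an eligible key wait for a subsequent bundle, after any timers set by the first firing have been registered.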
>>>>>
>>>>> On 6/27/19 11:55 PM, Reuven Lax wrote:
>>>>>
>>>>> I believe that timers correspond to watermark holds, which hold up the
>>>>> output watermark, not the input watermark.
>>>>>
>>>>> On Thu, Jun 27, 2019 at 11:21 PM Lukasz Cwik wrote:
>>>>>
>>>>>> I'm confused as to why it is valid to advance the watermark to T3 in
>>>>>> the original scenario.
>>>>>>
>>>>>> T1 and T2 should be treated as inputs to the function and hold the
>>>>>> input watermark. Hence T1 should fire, and if it doesn't produce any
>>>>>> new timers before T2, then T2 should fire, since the watermark will now
>>>>>> advance to T2. The only time you would have multiple watermark timers
>>>>>> fire as part of the same bundle is if they were distinct timers both
>>>>>> set to the same time.
>>>>>>
>>>>>> I have some examples[1] documented in the modelling, scheduling, and
>>>>>> executing timers doc.
>>>>>>
>>>>>> 1: https://docs.google.com/document/d/1GRL88rKLHbMR0zJnBHYwM4xtj66VYlB112EWVUFcGB0/edit#heading=h.fzptl5h0vi9k
>>>>>>
>>>>>> On Wed, Jun 26, 2019 at 6:40 AM Reuven Lax wrote:
>>>>>>
>>>>>>> "Earlier than the input watermark" only applies to event-time timers,
>>>>>>> but the above problem holds for processing-time timers as well.
>>>>>>>
>>>>>>> On Wed, Jun 26, 2019, 1:50 PM Robert Bradshaw wrote:
>>>>>>>
>>>>>>>> Yeah, it wouldn't be optimal performance-wise, but I think it's good
>>>>>>>> to keep the bar for a correct SDK low. It might still be better than
>>>>>>>> sending one timer per bundle, and you only pay the performance cost
>>>>>>>> if timers are set earlier than the input watermark (and there was a
>>>>>>>> timer firing in this range). (How often this happens probably varies
>>>>>>>> a lot in practice.)
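
To illustrate the cost Robert weighs here, a minimal sketch of the "lazy" SDK strategy from his 2:29 PM message quoted further below: fire the bundle's timers in order and, as soon as a fired timer sets (or deletes) a timer at or before the last timer in the bundle, stop and hand the unfired timers back to the runner together with the accumulated modifications. The `lazy_fire` function and the `on_timer(ts) -> (set, deleted)` callback shape are hypothetical, not a Beam SDK API; treating deletions as a trigger is an extension that also covers Reuven's deleted-T2 case.

```python
from typing import Callable, List, Tuple

# Hypothetical callback for one key: on_timer(ts) -> (timestamps_set, timestamps_deleted).
OnTimer = Callable[[int], Tuple[List[int], List[int]]]


def lazy_fire(bundle: List[int], on_timer: OnTimer):
    """Fire timers in order until one modifies a timer at or before the last one.

    Returns (fired, returned_to_runner, set_for_runner, deleted_for_runner).
    The runner is expected to re-set `returned_to_runner`, then apply the
    sets and deletions, and deliver whatever is eligible in a later bundle.
    """
    if not bundle:
        return [], [], [], []
    timers = sorted(bundle)
    last = timers[-1]
    fired, all_set, all_deleted = [], [], []
    for i, ts in enumerate(timers):
        fired.append(ts)
        to_set, to_delete = on_timer(ts)
        all_set += to_set
        all_deleted += to_delete
        if any(t <= last for t in to_set + to_delete):
            # Bail out: claim the unfired timers were (re)set, which splits the
            # bundle into "done" and "future" work.
            return fired, timers[i + 1:], all_set, all_deleted
    return fired, [], all_set, all_deleted
```

For Robert's example (T1, T3, T5 in the bundle; firing T1 sets T2 and deletes T3), the sketch fires only T1 and returns T3 and T5 plus "set T2, delete T3"; a runner applying those in order is left with T2 and T5 pending. Every bail-out costs a round trip through the runner, which is the performance concern raised in the replies.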
>>>>>>>>
>>>>>>>> On Wed, Jun 26, 2019 at 2:33 PM Reuven Lax wrote:
>>>>>>>>>
>>>>>>>>> This would have a lot of performance problems (especially since
>>>>>>>>> there is user code that caches within a bundle, and invalidates the
>>>>>>>>> cache at the end of every bundle). However, this would be a valid
>>>>>>>>> "lazy" implementation.
>>>>>>>>>
>>>>>>>>> On Wed, Jun 26, 2019 at 2:29 PM Robert Bradshaw wrote:
>>>>>>>>>>
>>>>>>>>>> Note also that a "lazy" SDK implementation would be to simply
>>>>>>>>>> return all the not-yet-fired timers (as if they were new timers)
>>>>>>>>>> to the runner once a timer set (at or before the last requested
>>>>>>>>>> timer in the bundle) is encountered. E.g., suppose we had timers
>>>>>>>>>> T1, T3, T5 in the bundle. On firing T1, we set T2 and delete T3.
>>>>>>>>>> The SDK could then claim that timers were (again) set at T3 and
>>>>>>>>>> T5, then that one was set at T2 and the one at T3 deleted, and
>>>>>>>>>> then be done with the bundle (not actually processing T3 and T5).
>>>>>>>>>> (One way to think about this is that timers are actually bundle
>>>>>>>>>> splits into a bundle of "done" and "future" work.) A more
>>>>>>>>>> intelligent SDK could, of course, process the whole bundle by
>>>>>>>>>> tracking modifications to the to-be-fired timers itself rather
>>>>>>>>>> than requiring a trip through the runner.
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 26, 2019 at 1:51 PM Reuven Lax wrote:
>>>>>>>>>>>
>>>>>>>>>>> I like this option the best. It might be trickier to implement,
>>>>>>>>>>> but it seems like it would be the most consistent solution.
>>>>>>>>>>>
>>>>>>>>>>> Another problem it would solve is the following: let's say a
>>>>>>>>>>> bundle arrives containing timers T1 and T2, and while processing
>>>>>>>>>>> T1 the user code deletes T2 (or resets it to a time in the far
>>>>>>>>>>> future).
>>>>>>>>>>> I'm actually not sure what we do today, but I'm a bit afraid that
>>>>>>>>>>> we will go ahead and fire T2 since it's already in the bundle,
>>>>>>>>>>> which is clearly incorrect. The SDK needs to keep track of this
>>>>>>>>>>> and skip T2 in order to solve this, which is the same sort of work
>>>>>>>>>>> needed to implement Robert's suggestion.
>>>>>>>>>>>
>>>>>>>>>>> Reuven
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 26, 2019 at 12:28 PM Robert Bradshaw wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Another option, which is nice from an API perspective but places
>>>>>>>>>>>> a burden on SDK implementers (and possibly runners), is to
>>>>>>>>>>>> maintain the ordering of timers by requiring timers to be fired
>>>>>>>>>>>> in order, and, if any timers are set, to fire them immediately
>>>>>>>>>>>> before processing later timers. In other words, if T1 sets T2
>>>>>>>>>>>> and modifies T3, these changes would take effect (locally; the
>>>>>>>>>>>> runner may not even know about T2) before T3 was processed.
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 26, 2019 at 11:13 AM Jan Lukavský wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have mentioned an issue I have come across [1] on several
>>>>>>>>>>>>> other threads, but it probably didn't attract the attention it
>>>>>>>>>>>>> deserves.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I will try to restate the problem here for clarity:
>>>>>>>>>>>>>
>>>>>>>>>>>>>  - On runners that use the concept of bundles (the original issue
>>>>>>>>>>>>>    mentions DirectRunner, but it will probably apply to other
>>>>>>>>>>>>>    runners that use bundles as well), the workflow is as follows:
>>>>>>>>>>>>>
>>>>>>>>>>>>>    a) process elements in the bundle
>>>>>>>>>>>>>    b) advance the watermark
>>>>>>>>>>>>>    c) process timers
>>>>>>>>>>>>>    d) continue to the next bundle
>>>>>>>>>>>>>
>>>>>>>>>>>>>  - The issue with this is that when we are initially at time T0,
>>>>>>>>>>>>>    set two timers for T1 and T3, then advance the watermark to T3
>>>>>>>>>>>>>    (or beyond), the timers will fire (correctly) in order T1, T3,
>>>>>>>>>>>>>    but if the timer at T1 sets another timer for T2, then this
>>>>>>>>>>>>>    timer will be fired in the next bundle (and therefore after T3).
>>>>>>>>>>>>>
>>>>>>>>>>>>>  - This causes issues mostly with race conditions between window
>>>>>>>>>>>>>    GC timers and user timers (and users don't have any way to
>>>>>>>>>>>>>    solve that!).
>>>>>>>>>>>>>
>>>>>>>>>>>>>  - Note that the same applies when one timer tries to reset a
>>>>>>>>>>>>>    timer that is already in the current bundle.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have investigated a way of solving this by running timers only
>>>>>>>>>>>>> for a single timestamp (instant) in each bundle, but as Reuven
>>>>>>>>>>>>> pointed out, that could regress performance (mostly by delaying
>>>>>>>>>>>>> the firing of timers that could have fired).
>>>>>>>>>>>>> Options I see:
>>>>>>>>>>>>>
>>>>>>>>>>>>>  1) either set OnTimerContext#timestamp() to the current input
>>>>>>>>>>>>>     watermark (not the time for which the user actually set the
>>>>>>>>>>>>>     timer), or
>>>>>>>>>>>>>
>>>>>>>>>>>>>  2) add OnTimerContext#getCurrentInputWatermark() and disallow
>>>>>>>>>>>>>     setting (or resetting) timers for a time between
>>>>>>>>>>>>>     OnTimerContext#timestamp() and
>>>>>>>>>>>>>     OnTimerContext#getCurrentInputWatermark() by throwing an
>>>>>>>>>>>>>     exception
>>>>>>>>>>>>>
>>>>>>>>>>>>>  3) any other option?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Option 1) seems to be broken by design, as it can result in
>>>>>>>>>>>>> corrupt data (emitted with a wrong timestamp, which is even
>>>>>>>>>>>>> somewhat arbitrary); I'm including it just for completeness.
>>>>>>>>>>>>> Option 2) is a breaking change that can result in Pipeline
>>>>>>>>>>>>> failures (although the failures will happen on Pipelines that are
>>>>>>>>>>>>> probably already broken).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Although I have come up with a workaround in the work where I
>>>>>>>>>>>>> originally came across this issue, I think that this is generally
>>>>>>>>>>>>> serious and should be dealt with, mostly because there are no
>>>>>>>>>>>>> workarounds possible today when using the user-facing APIs.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the discussion!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jan
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-7520
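
The scenario Jan describes, and Robert's in-order alternative, can be reproduced with a small simulation. This is a sketch under simplifying assumptions (a single key, timers identified only by integer timestamps, a timer eligible once its timestamp is <= the input watermark, and a hypothetical `on_timer(ts) -> (set, deleted)` callback); it is not DirectRunner or SDK code. `fire_per_bundle` follows steps a) to d) by fixing each bundle's timers up front, while `fire_in_order` keeps a local priority queue so that timers set or deleted by a firing timer take effect before later timers fire.

```python
import heapq
from typing import Callable, List, Set, Tuple

# Hypothetical user callback: on_timer(ts) -> (timestamps_to_set, timestamps_to_delete).
OnTimer = Callable[[int], Tuple[List[int], List[int]]]


def fire_per_bundle(pending: Set[int], watermark: int, on_timer: OnTimer) -> List[int]:
    """Steps a)-d): each bundle fires the timers that were eligible when it started.

    `pending` is updated in place and holds the unfired timers on return.
    """
    fired = []
    while True:
        # The bundle's contents are fixed here, before any timer in it fires.
        bundle = sorted(t for t in pending if t <= watermark)
        if not bundle:
            return fired
        for ts in bundle:
            # A timer deleted by an earlier timer in this bundle still fires.
            pending.discard(ts)
            fired.append(ts)
            to_set, to_delete = on_timer(ts)
            pending.update(to_set)
            pending.difference_update(to_delete)


def fire_in_order(pending: Set[int], watermark: int, on_timer: OnTimer) -> List[int]:
    """In-order firing: each timer's modifications apply before later timers fire.

    `pending` is updated in place and holds the unfired timers on return.
    """
    heap = list(pending)
    heapq.heapify(heap)
    live = set(pending)
    fired = []
    while heap and heap[0] <= watermark:
        ts = heapq.heappop(heap)
        if ts not in live:
            continue  # deleted after it was queued (or a stale duplicate): skip it
        live.discard(ts)
        fired.append(ts)
        to_set, to_delete = on_timer(ts)
        live.difference_update(to_delete)
        for t in to_set:
            if t not in live:
                live.add(t)
                heapq.heappush(heap, t)
    pending.clear()
    pending.update(live)
    return fired
```

With timers at T1 and T3, the watermark at T3, and T1 setting T2, `fire_per_bundle` fires T1, T3, T2 while `fire_in_order` fires T1, T2, T3. If T1 instead deletes T2 from a bundle holding T1 and T2, `fire_per_bundle` still fires T2 and `fire_in_order` skips it.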
