The "earlier than the input watermark" condition only applies to event-time timers, but the problem described above holds for processing-time timers as well.
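For reference, the "lazy" SDK behaviour quoted below can be sketched as a toy model. This is only an illustration under stated assumptions: timers are plain integers, `lazy_process_bundle` and `on_fire` are hypothetical names (not Beam APIs), and deletions of timers the runner holds outside the bundle are not modelled.

```python
def lazy_process_bundle(bundle, on_fire):
    """Toy model of the "lazy" SDK strategy (hypothetical, not a Beam API).

    Timers fire in order. As soon as user code sets or deletes a timer at or
    before the last timer in the bundle, the SDK stops firing and hands the
    unfired timers back to the runner as if they were newly set.

    on_fire(t) returns (timers_to_set, timers_to_delete).
    Returns (fired, timers_reported_back_to_runner).
    """
    remaining = sorted(bundle)
    if not remaining:
        return [], []
    last = remaining[-1]
    fired = []
    returned = set()  # timers the SDK reports to the runner as set
    while remaining:
        t = remaining.pop(0)
        fired.append(t)
        to_set, to_delete = on_fire(t)
        touched = list(to_set) + list(to_delete)
        if any(x <= last for x in touched):
            # Conflict with the rest of the bundle: re-set every unfired
            # timer and stop processing this bundle.
            returned.update(remaining)
            remaining = []
        # Apply the user's modifications on top of whatever was re-set.
        returned.update(to_set)
        returned.difference_update(to_delete)
    return fired, sorted(returned)


def t1_sets_t2_deletes_t3(t):
    # The example from the thread: firing T1 sets T2 and deletes T3.
    return ([2], [3]) if t == 1 else ([], [])


print(lazy_process_bundle([1, 3, 5], t1_sets_t2_deletes_t3))
```

In this model only T1 fires; T2 and T5 go back to the runner, and T3 is dropped, so the next bundle can fire them in the right order.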
On Wed, Jun 26, 2019, 1:50 PM Robert Bradshaw <rober...@google.com> wrote:

> Yeah, it wouldn't be optimal performance-wise, but I think it's good
> to keep the bar for a correct SDK low. Might still be better than
> sending one timer per bundle, and you only pay the performance cost if
> timers are set earlier than the input watermark (and there was a timer
> firing in this range). (How often this happens probably varies a lot
> in practice.)
>
> On Wed, Jun 26, 2019 at 2:33 PM Reuven Lax <re...@google.com> wrote:
> >
> > This would have a lot of performance problems (especially since there is
> > user code that caches within a bundle, and invalidates the cache at the
> > end of every bundle). However this would be a valid "lazy" implementation.
> >
> > On Wed, Jun 26, 2019 at 2:29 PM Robert Bradshaw <rober...@google.com> wrote:
> >>
> >> Note also that a "lazy" SDK implementation would be to simply return
> >> all the timers (as if they were new timers) to the runner once a timer
> >> set (before or at the last requested timer in the bundle) is encountered.
> >> E.g. suppose we had timers T1, T3, T5 in the bundle. On firing T1, we
> >> set T2 and delete T3. The SDK could then claim that timers were
> >> (again) set at T3 and T5, then that one was set at T2 and one deleted
> >> at T3, and then be done with the bundle (not actually process T3 and
> >> T5). (One way to think about this is that timers are actually bundle
> >> splits into a bundle of "done" and "future" work.) A more intelligent
> >> SDK could, of course, process the whole bundle by tracking
> >> modifications to the to-be-fired timers itself rather than requiring
> >> a trip through the runner.
> >>
> >> On Wed, Jun 26, 2019 at 1:51 PM Reuven Lax <re...@google.com> wrote:
> >> >
> >> > I like this option the best. It might be trickier to implement, but
> >> > seems like it would be the most consistent solution.
> >> >
> >> > Another problem it would solve is the following: let's say a bundle
> >> > arrives containing timers T1 and T2, and while processing T1 the user
> >> > code deletes T2 (or resets it to a time in the far future). I'm
> >> > actually not sure what we do today, but I'm a bit afraid that we will
> >> > go ahead and fire T2 since it's already in the bundle, which is clearly
> >> > incorrect. The SDK needs to keep track of this and skip T2 in order to
> >> > solve this, which is the same sort of work needed to implement Robert's
> >> > suggestion.
> >> >
> >> > Reuven
> >> >
> >> > On Wed, Jun 26, 2019 at 12:28 PM Robert Bradshaw <rober...@google.com> wrote:
> >> >>
> >> >> Another option, that is nice from an API perspective but places a
> >> >> burden on SDK implementers (and possibly runners), is to maintain the
> >> >> ordering of timers by requiring timers to be fired in order and, if
> >> >> any new timers are set, to fire them before processing later
> >> >> timers. In other words, if T1 sets T2 and modifies T3, these would
> >> >> take effect (locally, the runner may not even know about T2) before
> >> >> T3 was processed.
> >> >>
> >> >> On Wed, Jun 26, 2019 at 11:13 AM Jan Lukavský <je...@seznam.cz> wrote:
> >> >> >
> >> >> > Hi,
> >> >> >
> >> >> > I have mentioned an issue I have come across [1] on several other
> >> >> > threads, but it probably didn't attract the attention that it
> >> >> > deserves.
> >> >> >
> >> >> > I will try to restate the problem here for clarity:
> >> >> >
> >> >> >  - on runners that use the concept of bundles (the original issue
> >> >> > mentions DirectRunner, but it will probably apply to other runners
> >> >> > that use bundles as well), the workflow is as follows:
> >> >> >
> >> >> >   a) process elements in bundle
> >> >> >
> >> >> >   b) advance watermark
> >> >> >
> >> >> >   c) process timers
> >> >> >
> >> >> >   d) continue to next bundle
> >> >> >
> >> >> >  - the issue with this is that when we are initially at time T0, set
> >> >> > two timers for T1 and T3, then advance the watermark to T3 (or
> >> >> > beyond), the timers will fire (correctly) in order T1, T3, but if the
> >> >> > timer at T1 sets another timer for T2, then this timer will be fired
> >> >> > in the next bundle (and therefore after T3)
> >> >> >
> >> >> >  - this causes issues mostly with race conditions between window GC
> >> >> > timers and user timers (and users don't have any way to solve that!)
> >> >> >
> >> >> >  - note that the same applies when one timer tries to reset a timer
> >> >> > that is already in the current bundle
> >> >> >
> >> >> > I have investigated a way of solving this by running timers only for
> >> >> > a single timestamp (instant) in each bundle, but as Reuven pointed
> >> >> > out, that could regress performance (mostly by delaying the firing of
> >> >> > timers that could have fired). Options I see:
> >> >> >
> >> >> >  1) either set the OnTimerContext#timestamp() to the current input
> >> >> > watermark (not the time that the user actually set the timer for), or
> >> >> >
> >> >> >  2) add OnTimerContext#getCurrentInputWatermark() and disallow
> >> >> > setting (or resetting) timers for a time between
> >> >> > OnProcessContext#timestamp and
> >> >> > OnProcessContext#getCurrentInputWatermark(), by throwing an exception
> >> >> >
> >> >> >  3) any other option?
> >> >> >
> >> >> > Option 1) seems to be broken by design, as it can result in corrupt
> >> >> > data (emitted with a wrong timestamp, which is even somewhat
> >> >> > arbitrary); I'm including it just for completeness. Option 2) is a
> >> >> > breaking change that can result in Pipeline failures (although the
> >> >> > failures will happen on Pipelines that are probably already broken).
> >> >> >
> >> >> > Although I have come up with a workaround in the work where I
> >> >> > originally came across this issue, I think that this is generally
> >> >> > serious and should be dealt with, mostly because no workarounds are
> >> >> > possible today when using user-facing APIs.
> >> >> >
> >> >> > Thanks for discussion!
> >> >> >
> >> >> >  Jan
> >> >> >
> >> >> > [1] https://issues.apache.org/jira/browse/BEAM-7520
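
The ordering problem restated in the thread, and the "fire in order" option proposed in reply, can be compared with a minimal sketch. It is a toy model under stated assumptions, not runner or SDK code: timers are integers, `fire_per_bundle`, `fire_in_order` and `on_fire` are hypothetical names, and user code is assumed never to set a timer earlier than the one currently firing.

```python
import heapq


def fire_per_bundle(initial, watermark, on_fire):
    """Bundle-at-a-time firing: every eligible timer is snapshotted into the
    bundle up front, so timers set while firing wait for the next bundle."""
    pending = set(initial)
    fired = []
    while True:
        bundle = sorted(t for t in pending if t <= watermark)
        if not bundle:
            break
        pending -= set(bundle)
        for t in bundle:
            fired.append(t)
            pending.update(on_fire(t))  # only visible to the next bundle
    return fired


def fire_in_order(initial, watermark, on_fire):
    """In-order firing: always fire the earliest eligible timer, including
    timers that were set while firing an earlier one."""
    heap = list(set(initial))
    heapq.heapify(heap)
    fired = []
    while heap and heap[0] <= watermark:
        t = heapq.heappop(heap)
        fired.append(t)
        for new_timer in on_fire(t):
            heapq.heappush(heap, new_timer)
    return fired


def t1_sets_t2(t):
    # Firing the timer at T1 sets a new timer at T2.
    return [2] if t == 1 else []


print(fire_per_bundle([1, 3], 3, t1_sets_t2))  # [1, 3, 2]
print(fire_in_order([1, 3], 3, t1_sets_t2))    # [1, 2, 3]
```

With timers at T1 and T3 and the watermark at T3, the per-bundle model fires T2 after T3, while the in-order model fires T1, T2, T3.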