I believe that timers correspond to watermark holds, which hold up the
output watermark, not the input watermark.
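
To make that concrete, here is a minimal sketch of the relationship as I
understand it. It is a simplified model, not Beam's actual implementation:
the function name `output_watermark` and the use of plain integers for
timestamps are my own assumptions for illustration.

```python
def output_watermark(input_watermark, holds):
    """Return the output watermark in this simplified model.

    The output watermark can never pass the input watermark, and it is also
    kept back by the earliest active hold (here: the earliest pending timer).
    """
    return min([input_watermark, *holds])
```

In this model, with the input watermark at T3 and timers pending at T1 and
T3, the output watermark stays at T1 until T1 has fired; the input
watermark itself is not held back.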

On Thu, Jun 27, 2019 at 11:21 PM Lukasz Cwik <[email protected]> wrote:

> I'm confused as to why it is valid to advance the watermark to T3 in the
> original scenario.
>
> T1 and T2 should be treated as inputs to the function and hold the input
> watermark. Hence T1 should fire and, if it doesn't produce any new timers
> before T2, then T2 should fire, since the watermark will now advance to T2.
> The only time you would have multiple watermark timers fire as part of the
> same bundle is if they were distinct timers both set to the same time.
>
> I have some examples[1] documented in the modelling, scheduling, and
> executing timers doc.
>
> 1:
> https://docs.google.com/document/d/1GRL88rKLHbMR0zJnBHYwM4xtj66VYlB112EWVUFcGB0/edit#heading=h.fzptl5h0vi9k
>
>
> On Wed, Jun 26, 2019 at 6:40 AM Reuven Lax <[email protected]> wrote:
>
>> "Earlier than the input watermark" only applies to event-time timers, but
>> the above problem holds for processing-time timers as well.
>>
>> On Wed, Jun 26, 2019, 1:50 PM Robert Bradshaw <[email protected]>
>> wrote:
>>
>>> Yeah, it wouldn't be optimal performance-wise, but I think it's good
>>> to keep the bar for a correct SDK low. Might still be better than
>>> sending one timer per bundle, and you only pay the performance cost if
>>> timers are set earlier than the input watermark (and there was a timer
>>> firing in this range). (How often this happens probably varies a lot
>>> in practice.)
>>>
>>> On Wed, Jun 26, 2019 at 2:33 PM Reuven Lax <[email protected]> wrote:
>>> >
>>> > This would have a lot of performance problems (especially since there
>>> > is user code that caches within a bundle, and invalidates the cache at
>>> > the end of every bundle). However, this would be a valid "lazy"
>>> > implementation.
>>> >
>>> > On Wed, Jun 26, 2019 at 2:29 PM Robert Bradshaw <[email protected]>
>>> wrote:
>>> >>
>>> >> Note also that a "lazy" SDK implementation would be to simply return
>>> >> all the timers (as if they were new timers) to the runner once a timer set
>>> >> (before or at the last requested timer in the bundle) is encountered.
>>> >> E.g. suppose we had timers T1, T3, T5 in the bundle. On firing T1, we
>>> >> set T2 and delete T3. The SDK could then claim that timers were
>>> >> (again) set at T3, T5, then one was set at T2 and one deleted at T3, and
>>> >> then be done with the bundle (not actually process T3 and T5). (One
>>> >> way to think about this is that timers are actually bundle splits into
>>> >> a bundle of "done" and "future" work.) A more intelligent SDK could,
>>> >> of course, process the whole bundle by tracking modifications to the
>>> >> to-be-fired timers itself rather than requiring a trip through the
>>> >> runner.
>>> >>
>>> >> On Wed, Jun 26, 2019 at 1:51 PM Reuven Lax <[email protected]> wrote:
>>> >> >
>>> >> > I like this option the best. It might be trickier to implement, but
>>> >> > seems like it would be the most consistent solution.
>>> >> >
>>> >> > Another problem it would solve is the following: let's say a bundle
>>> >> > arrives containing timers T1 and T2, and while processing T1 the user
>>> >> > code deletes T2 (or resets it to a time in the far future). I'm
>>> >> > actually not sure what we do today, but I'm a bit afraid that we will
>>> >> > go ahead and fire T2 since it's already in the bundle, which is
>>> >> > clearly incorrect. The SDK needs to keep track of this and skip T2 in
>>> >> > order to solve this, which is the same sort of work needed to
>>> >> > implement Robert's suggestion.
>>> >> >
>>> >> > Reuven
>>> >> >
>>> >> > On Wed, Jun 26, 2019 at 12:28 PM Robert Bradshaw <
>>> [email protected]> wrote:
>>> >> >>
>>> >> >> Another option, that is nice from an API perspective but places a
>>> >> >> burden on SDK implementers (and possibly runners), is to maintain the
>>> >> >> ordering of timers by requiring timers to be fired in order and, if
>>> >> >> any timers are set, to fire them immediately, before processing later
>>> >> >> timers. In other words, if T1 sets T2 and modifies T3, these would
>>> >> >> take effect (locally, the runner may not even know about T2) before
>>> >> >> T3 was processed.
>>> >> >>
>>> >> >> On Wed, Jun 26, 2019 at 11:13 AM Jan Lukavský <[email protected]>
>>> wrote:
>>> >> >> >
>>> >> >> > Hi,
>>> >> >> >
>>> >> >> > I have mentioned an issue I have come across [1] on several other
>>> >> >> > threads, but it probably didn't attract the attention that it
>>> >> >> > deserves.
>>> >> >> >
>>> >> >> > I will try to restate the problem here for clarity:
>>> >> >> >
>>> >> >> >   - on runners that use the concept of bundles (the original issue
>>> >> >> > mentions DirectRunner, but it probably applies to other runners
>>> >> >> > that use bundles as well), the workflow is as follows:
>>> >> >> >
>>> >> >> >    a) process elements in bundle
>>> >> >> >
>>> >> >> >    b) advance watermark
>>> >> >> >
>>> >> >> >    c) process timers
>>> >> >> >
>>> >> >> >    d) continue to next bundle
>>> >> >> >
>>> >> >> >   - the issue with this is that when we are initially at time T0, set
>>> >> >> > two timers for T1 and T3, and then advance the watermark to T3 (or
>>> >> >> > beyond), the timers will fire (correctly) in order T1, T3, but if the
>>> >> >> > timer at T1 sets another timer for T2, then this timer will be fired
>>> >> >> > in the next bundle (and therefore after T3)
>>> >> >> >
>>> >> >> >   - this causes issues mostly with race conditions in window GC
>>> >> >> > timers and user timers (and users don't have any way to solve that!)
>>> >> >> >
>>> >> >> >   - note that the same applies when one timer tries to reset a timer
>>> >> >> > that is already in the current bundle
>>> >> >> >
>>> >> >> > I have investigated a way of solving this by running timers only for
>>> >> >> > a single timestamp (instant) in each bundle, but as Reuven pointed out,
>>> >> >> > that could regress performance (mostly by delaying the firing of timers
>>> >> >> > that could have fired). Options I see:
>>> >> >> >
>>> >> >> >   1) either set the OnTimerContext#timestamp() to the current input
>>> >> >> > watermark (not the time that the user actually set the timer for), or
>>> >> >> >
>>> >> >> >   2) add OnTimerContext#getCurrentInputWatermark() and disallow
>>> >> >> > setting (or resetting) timers for a time between
>>> >> >> > OnProcessContext#timestamp and
>>> >> >> > OnProcessContext#getCurrentInputWatermark(), by throwing an exception
>>> >> >> >
>>> >> >> >   3) any other option?
>>> >> >> >
>>> >> >> > Option 1) seems to be broken by design, as it can result in corrupt
>>> >> >> > data (emitted with a wrong timestamp, which is even somewhat
>>> >> >> > arbitrary); I'm including it just for completeness. Option 2) is a
>>> >> >> > breaking change that can result in Pipeline failures (although the
>>> >> >> > failures will happen on Pipelines that are probably already broken).
>>> >> >> >
>>> >> >> > Although I have come up with a workaround in the work where I
>>> >> >> > originally came across this issue, I think that this is generally
>>> >> >> > serious and should be dealt with, mostly because when using
>>> >> >> > user-facing APIs there are no workarounds possible today.
>>> >> >> >
>>> >> >> > Thanks for the discussion!
>>> >> >> >
>>> >> >> > Jan
>>> >> >> >
>>> >> >> > [1] https://issues.apache.org/jira/browse/BEAM-7520
>>> >> >> >
>>>
>>
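
For anyone who wants to play with the scenarios quoted above, here is a
small simulation of the two behaviours discussed in the thread. It is only
a sketch under my own assumptions, not Beam code: timestamps are plain
integers (T1 = 1, T2 = 2, T3 = 3), a timer callback returns a dict of
changes (a timestamp sets or resets a timer, None deletes it), and all the
names (`fire_snapshot`, `fire_in_order`, `apply_updates`, and the two
callbacks) are hypothetical.

```python
def apply_updates(pending, updates):
    """Apply a callback's changes: a timestamp (re)sets a timer, None deletes it."""
    for timer_id, ts in updates.items():
        if ts is None:
            pending.pop(timer_id, None)
        else:
            pending[timer_id] = ts


def fire_snapshot(timers, watermark, on_timer):
    """Bundle-at-a-time firing: eligible timers are fixed before callbacks run."""
    pending, fired = dict(timers), []
    while True:
        bundle = sorted((ts, tid) for tid, ts in pending.items() if ts <= watermark)
        if not bundle:
            return fired
        for _, tid in bundle:
            del pending[tid]  # these timers now belong to the current bundle
        for ts, tid in bundle:
            fired.append(tid)
            # Changes only affect timers outside the bundle already taken.
            apply_updates(pending, on_timer(tid, ts))


def fire_in_order(timers, watermark, on_timer):
    """In-order firing: changes made by a callback apply before the next timer."""
    pending, fired = dict(timers), []
    while True:
        eligible = [(ts, tid) for tid, ts in pending.items() if ts <= watermark]
        if not eligible:
            return fired
        ts, tid = min(eligible)  # always pick the earliest remaining timer
        del pending[tid]
        fired.append(tid)
        apply_updates(pending, on_timer(tid, ts))


def t1_sets_t2(timer_id, ts):
    """Jan's scenario: firing T1 sets a new timer T2."""
    return {"T2": 2} if timer_id == "T1" else {}


def t1_sets_t2_and_deletes_t3(timer_id, ts):
    """Reuven's scenario: firing T1 also deletes a timer already in the bundle."""
    return {"T2": 2, "T3": None} if timer_id == "T1" else {}
```

Starting from timers T1 and T3 with the watermark at 3, `fire_snapshot`
with `t1_sets_t2` fires T1, T3, T2 (the out-of-order case from the original
report), while `fire_in_order` fires T1, T2, T3. With
`t1_sets_t2_and_deletes_t3`, the snapshot variant still fires T3 even
though it was deleted, and the in-order variant fires only T1, T2.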
