The watermark holds (which is how the timer holds up the watermark today,
as there is no timer watermark) is per key. Usually the input watermark
making a "hop" is not a problem, in fact it's the normal state of affairs.

On Fri, Jun 28, 2019 at 1:08 AM Lukasz Cwik <lc...@google.com> wrote:

> Thanks Reuven and Jan.
>
> Since timers are per key, wouldn't it be that the timer watermark should
> also be per key for a StatefulDoFn and hence we would still be able to fire
> multiple timers (at most one per key) and still have good performance even
> when the input watermark makes a "hop"?
>
>
> On Thu, Jun 27, 2019 at 3:43 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> It would be possible to have "timer watermark", between input and output
>> watermark, so that input watermark >= timer watermark >= output watermark,
>> but it turns out, that doing so implies that we fire timers only for single
>> instant (because until the timer is fired and processed, the "timer
>> watermark" is on hold).
>> On 6/28/19 12:40 AM, Jan Lukavský wrote:
>>
>> At least the implementation in DirectRunner fires timers according to
>> input watemark. Holding the timer up to output watermark causes deadlocks,
>> because timers fired at time T might clear watermark hold for the same time.
>> On 6/27/19 11:55 PM, Reuven Lax wrote:
>>
>> I believe that timers correspond to watermark holds, which hold up the
>> output watermark, not the input watermark.
>>
>> On Thu, Jun 27, 2019 at 11:21 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> I'm confused as to why it is valid to advance the watermark to T3 in the
>>> original scenario.
>>>
>>> T1 and T2 should be treated as inputs to the function and hold the input
>>> watermark hence T1 should fire and if it doesn't produce any new timers
>>> before T2, then T2 should fire since the watermark will now advance to T2.
>>> The only time you would have multiple watermark timers fire as part of the
>>> same bundle is if they were distinct timers both set to the same time.
>>>
>>> I have some examples[1] documented in the modelling, scheduling, and
>>> executing timers doc.
>>>
>>> 1:
>>> https://docs.google.com/document/d/1GRL88rKLHbMR0zJnBHYwM4xtj66VYlB112EWVUFcGB0/edit#heading=h.fzptl5h0vi9k
>>>
>>>
>>> On Wed, Jun 26, 2019 at 6:40 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Earlier than the input watermark only applies to event time timers, but
>>>> the above problem holds for processing time timers as well.
>>>>
>>>> On Wed, Jun 26, 2019, 1:50 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> Yeah, it wouldn't be optimal performance-wise, but I think it's good
>>>>> to keep the bar for a correct SDK low. Might still be better than
>>>>> sending one timer per bundle, and you only pay the performance if
>>>>> timers are set earlier than the input watermark (and there was a timer
>>>>> firing in this range). (How often this happens probably varies a lot
>>>>> in practice.)
>>>>>
>>>>> On Wed, Jun 26, 2019 at 2:33 PM Reuven Lax <re...@google.com> wrote:
>>>>> >
>>>>> > This would have a lot of performance problems (especially since
>>>>> there is user code that caches within a bundle, and invalidates the cache
>>>>> at the end of every bundle). However this would be a valid "lazy"
>>>>> implementation.
>>>>> >
>>>>> > On Wed, Jun 26, 2019 at 2:29 PM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>> >>
>>>>> >> Note also that a "lazy" SDK implementation would be to simply return
>>>>> >> all the timers (as if they were new timers) to runner once a timer
>>>>> set
>>>>> >> (before or at the last requested timer in the bundle) is
>>>>> encountered.
>>>>> >> E.g. Suppose we had timers T1, T3, T5 in the bundle. On firing T1,
>>>>> we
>>>>> >> set T2 and delete T3. The SDK could then claim that a timers were
>>>>> >> (again) set at T3, T5, then set one at at T2 and deleted at T3 and
>>>>> >> then be done with the bundle (not actually process T3 and T5). (One
>>>>> >> way to think about this is that timers are actually bundle splits
>>>>> into
>>>>> >> a bundle of "done" and "future" work.) A more intelligent SDK could,
>>>>> >> of course, process the whole bundle by tracking modifications to the
>>>>> >> to-be-fired timers itself rather than requiring a trip through the
>>>>> >> runner.
>>>>> >>
>>>>> >> On Wed, Jun 26, 2019 at 1:51 PM Reuven Lax <re...@google.com>
>>>>> wrote:
>>>>> >> >
>>>>> >> > I like this option the best. It might be trickier to implement,
>>>>> but seems like it would be the most consistent solution.
>>>>> >> >
>>>>> >> > Another problem it would solve is the following: let's say a
>>>>> bundle arrives containing timers T1 and T2, and while processing T1 the
>>>>> user code deletes T2 (or resets it to a time in the far future). I'm
>>>>> actually not sure what we do today, but I'm a bit afraid that we will go
>>>>> ahead and fire T2 since it's already in the bundle, which is clearly
>>>>> incorrect. The SDK needs to keep track of this and skip T2 in order to
>>>>> solve this, which is the same sort of work needed to implement Robert's
>>>>> suggestion.
>>>>> >> >
>>>>> >> > Reuven
>>>>> >> >
>>>>> >> > On Wed, Jun 26, 2019 at 12:28 PM Robert Bradshaw <
>>>>> rober...@google.com> wrote:
>>>>> >> >>
>>>>> >> >> Another option, that is nice from an API perspective but places a
>>>>> >> >> burden on SDK implementers (and possibly runners), is to
>>>>> maintain the
>>>>> >> >> ordering of timers by requiring timers to be fired in order, and
>>>>> if
>>>>> >> >> any timers are set to fire them immediately before processing
>>>>> later
>>>>> >> >> timers. In other words, if T1 sets T2 and modifies T3, these
>>>>> would
>>>>> >> >> take effect (locally, the runner may not even know about T2)
>>>>> before T3
>>>>> >> >> was processed.
>>>>> >> >>
>>>>> >> >> On Wed, Jun 26, 2019 at 11:13 AM Jan Lukavský <je...@seznam.cz>
>>>>> wrote:
>>>>> >> >> >
>>>>> >> >> > Hi,
>>>>> >> >> >
>>>>> >> >> > I have mentioned an issue I have come across [1] on several
>>>>> other
>>>>> >> >> > threads, but it probably didn't attract the attention that it
>>>>> would desire.
>>>>> >> >> >
>>>>> >> >> > I will try to restate the problem here for clarity:
>>>>> >> >> >
>>>>> >> >> >   - on runners that use concept of bundles (the original issue
>>>>> mentions
>>>>> >> >> > DirectRunner, but it will probably apply for other runners,
>>>>> which use
>>>>> >> >> > bundles, as well), the workflow is as follows:
>>>>> >> >> >
>>>>> >> >> >    a) process elements in bundle
>>>>> >> >> >
>>>>> >> >> >    b) advance watermark
>>>>> >> >> >
>>>>> >> >> >    c) process timers
>>>>> >> >> >
>>>>> >> >> >    d) continue to next bundle
>>>>> >> >> >
>>>>> >> >> >   - the issue with this is that when we are initially at time
>>>>> T0, set
>>>>> >> >> > two timers for T1 and T3, then advance watermark to T3 (or
>>>>> beyond), the
>>>>> >> >> > timers will fire (correctly) in order T1, T3, but if timer at
>>>>> T1 sets
>>>>> >> >> > another timer for T2, then this timer will be fired in next
>>>>> bundle (and
>>>>> >> >> > therefore after T3)
>>>>> >> >> >
>>>>> >> >> >   - this causes issues mostly with race conditions in window
>>>>> GC timers
>>>>> >> >> > and user timers (and users don't have any way to solve that!)
>>>>> >> >> >
>>>>> >> >> >   - note that the same applies when one timer tries to reset
>>>>> timer that
>>>>> >> >> > is already in the current bundle
>>>>> >> >> >
>>>>> >> >> > I have investigated a way of solving this by running timers
>>>>> only for
>>>>> >> >> > single timestamp (instant) at each bundle, but as Reuven
>>>>> pointed out,
>>>>> >> >> > that could regress performance (mostly by delaying firing of
>>>>> timers,
>>>>> >> >> > that could have fired). Options I see:
>>>>> >> >> >
>>>>> >> >> >   1) either set the OnTimerContext#timestamp() to current input
>>>>> >> >> > watermark (not the time that user actually set the timer), or
>>>>> >> >> >
>>>>> >> >> >   2) add OnTimerContext#getCurrentInputWatermark() and
>>>>> disallow setting
>>>>> >> >> > (or resetting) timers for time between
>>>>> OnProcessContext#timestamp and
>>>>> >> >> > OnProcessContext#getCurrentInputWatermark(), by throwing an
>>>>> exception
>>>>> >> >> >
>>>>> >> >> >   3) any other option?
>>>>> >> >> >
>>>>> >> >> > Option 1) seems to be broken by design, as it can result in
>>>>> corrupt data
>>>>> >> >> > (emitted with wrong timestamp, which is even somewhat
>>>>> arbitrary), I'm
>>>>> >> >> > including it just for completeness. Option 2) is breaking
>>>>> change, that
>>>>> >> >> > can result in PIpeline failures (although the failures will
>>>>> happen on
>>>>> >> >> > Pipelines, that are probably already broken).
>>>>> >> >> >
>>>>> >> >> > Although I have come with a workaround in the work where I
>>>>> originally
>>>>> >> >> > come across this issue, I think that this is generally serious
>>>>> and
>>>>> >> >> > should be dealt with. Mostly because when using user-facing
>>>>> APIs, there
>>>>> >> >> > are no workarounds possible, today.
>>>>> >> >> >
>>>>> >> >> > Thanks for discussion!
>>>>> >> >> >
>>>>> >> >> > Jan
>>>>> >> >> >
>>>>> >> >> > [1] https://issues.apache.org/jira/browse/BEAM-7520
>>>>> >> >> >
>>>>>
>>>>

Reply via email to