Thanks for the explanation.

On Fri, Jun 28, 2019 at 6:49 AM Reuven Lax <[email protected]> wrote:

> This happens when the watermark hops forward. In practice whenever there
> is any backlog, this is the normal mode of operation.
>
> On Fri, Jun 28, 2019, 12:42 AM Lukasz Cwik <[email protected]> wrote:
>
>> Earlier it was said that performance was poor if we moved to a model
>> where we prevented multiple timer firings. Since timer firings are per key,
>> can you provide details of what use case has multiple user timer firings
>> per key?
>>
>> On Thu, Jun 27, 2019 at 4:34 PM Reuven Lax <[email protected]> wrote:
>>
>>> The watermark holds (which is how the timer holds up the watermark
>>> today, as there is no timer watermark) is per key. Usually the input
>>> watermark making a "hop" is not a problem, in fact it's the normal state of
>>> affairs.
>>>
>>> On Fri, Jun 28, 2019 at 1:08 AM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Thanks Reuven and Jan.
>>>>
>>>> Since timers are per key, wouldn't it be that the timer watermark
>>>> should also be per key for a StatefulDoFn and hence we would still be able
>>>> to fire multiple timers (at most one per key) and still have good
>>>> performance even when the input watermark makes a "hop"?
>>>>
>>>>
>>>> On Thu, Jun 27, 2019 at 3:43 PM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> It would be possible to have "timer watermark", between input and
>>>>> output watermark, so that input watermark >= timer watermark >= output
>>>>> watermark, but it turns out, that doing so implies that we fire timers 
>>>>> only
>>>>> for single instant (because until the timer is fired and processed, the
>>>>> "timer watermark" is on hold).
>>>>> On 6/28/19 12:40 AM, Jan Lukavský wrote:
>>>>>
>>>>> At least the implementation in DirectRunner fires timers according to
>>>>> input watemark. Holding the timer up to output watermark causes deadlocks,
>>>>> because timers fired at time T might clear watermark hold for the same 
>>>>> time.
>>>>> On 6/27/19 11:55 PM, Reuven Lax wrote:
>>>>>
>>>>> I believe that timers correspond to watermark holds, which hold up the
>>>>> output watermark, not the input watermark.
>>>>>
>>>>> On Thu, Jun 27, 2019 at 11:21 PM Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> I'm confused as to why it is valid to advance the watermark to T3 in
>>>>>> the original scenario.
>>>>>>
>>>>>> T1 and T2 should be treated as inputs to the function and hold the
>>>>>> input watermark hence T1 should fire and if it doesn't produce any new
>>>>>> timers before T2, then T2 should fire since the watermark will now 
>>>>>> advance
>>>>>> to T2. The only time you would have multiple watermark timers fire as 
>>>>>> part
>>>>>> of the same bundle is if they were distinct timers both set to the same
>>>>>> time.
>>>>>>
>>>>>> I have some examples[1] documented in the modelling, scheduling, and
>>>>>> executing timers doc.
>>>>>>
>>>>>> 1:
>>>>>> https://docs.google.com/document/d/1GRL88rKLHbMR0zJnBHYwM4xtj66VYlB112EWVUFcGB0/edit#heading=h.fzptl5h0vi9k
>>>>>>
>>>>>>
>>>>>> On Wed, Jun 26, 2019 at 6:40 AM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> Earlier than the input watermark only applies to event time timers,
>>>>>>> but the above problem holds for processing time timers as well.
>>>>>>>
>>>>>>> On Wed, Jun 26, 2019, 1:50 PM Robert Bradshaw <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yeah, it wouldn't be optimal performance-wise, but I think it's good
>>>>>>>> to keep the bar for a correct SDK low. Might still be better than
>>>>>>>> sending one timer per bundle, and you only pay the performance if
>>>>>>>> timers are set earlier than the input watermark (and there was a
>>>>>>>> timer
>>>>>>>> firing in this range). (How often this happens probably varies a lot
>>>>>>>> in practice.)
>>>>>>>>
>>>>>>>> On Wed, Jun 26, 2019 at 2:33 PM Reuven Lax <[email protected]>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > This would have a lot of performance problems (especially since
>>>>>>>> there is user code that caches within a bundle, and invalidates the 
>>>>>>>> cache
>>>>>>>> at the end of every bundle). However this would be a valid "lazy"
>>>>>>>> implementation.
>>>>>>>> >
>>>>>>>> > On Wed, Jun 26, 2019 at 2:29 PM Robert Bradshaw <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >>
>>>>>>>> >> Note also that a "lazy" SDK implementation would be to simply
>>>>>>>> return
>>>>>>>> >> all the timers (as if they were new timers) to runner once a
>>>>>>>> timer set
>>>>>>>> >> (before or at the last requested timer in the bundle) is
>>>>>>>> encountered.
>>>>>>>> >> E.g. Suppose we had timers T1, T3, T5 in the bundle. On firing
>>>>>>>> T1, we
>>>>>>>> >> set T2 and delete T3. The SDK could then claim that a timers were
>>>>>>>> >> (again) set at T3, T5, then set one at at T2 and deleted at T3
>>>>>>>> and
>>>>>>>> >> then be done with the bundle (not actually process T3 and T5).
>>>>>>>> (One
>>>>>>>> >> way to think about this is that timers are actually bundle
>>>>>>>> splits into
>>>>>>>> >> a bundle of "done" and "future" work.) A more intelligent SDK
>>>>>>>> could,
>>>>>>>> >> of course, process the whole bundle by tracking modifications to
>>>>>>>> the
>>>>>>>> >> to-be-fired timers itself rather than requiring a trip through
>>>>>>>> the
>>>>>>>> >> runner.
>>>>>>>> >>
>>>>>>>> >> On Wed, Jun 26, 2019 at 1:51 PM Reuven Lax <[email protected]>
>>>>>>>> wrote:
>>>>>>>> >> >
>>>>>>>> >> > I like this option the best. It might be trickier to
>>>>>>>> implement, but seems like it would be the most consistent solution.
>>>>>>>> >> >
>>>>>>>> >> > Another problem it would solve is the following: let's say a
>>>>>>>> bundle arrives containing timers T1 and T2, and while processing T1 the
>>>>>>>> user code deletes T2 (or resets it to a time in the far future). I'm
>>>>>>>> actually not sure what we do today, but I'm a bit afraid that we will 
>>>>>>>> go
>>>>>>>> ahead and fire T2 since it's already in the bundle, which is clearly
>>>>>>>> incorrect. The SDK needs to keep track of this and skip T2 in order to
>>>>>>>> solve this, which is the same sort of work needed to implement Robert's
>>>>>>>> suggestion.
>>>>>>>> >> >
>>>>>>>> >> > Reuven
>>>>>>>> >> >
>>>>>>>> >> > On Wed, Jun 26, 2019 at 12:28 PM Robert Bradshaw <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >> >>
>>>>>>>> >> >> Another option, that is nice from an API perspective but
>>>>>>>> places a
>>>>>>>> >> >> burden on SDK implementers (and possibly runners), is to
>>>>>>>> maintain the
>>>>>>>> >> >> ordering of timers by requiring timers to be fired in order,
>>>>>>>> and if
>>>>>>>> >> >> any timers are set to fire them immediately before processing
>>>>>>>> later
>>>>>>>> >> >> timers. In other words, if T1 sets T2 and modifies T3, these
>>>>>>>> would
>>>>>>>> >> >> take effect (locally, the runner may not even know about T2)
>>>>>>>> before T3
>>>>>>>> >> >> was processed.
>>>>>>>> >> >>
>>>>>>>> >> >> On Wed, Jun 26, 2019 at 11:13 AM Jan Lukavský <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >> >> >
>>>>>>>> >> >> > Hi,
>>>>>>>> >> >> >
>>>>>>>> >> >> > I have mentioned an issue I have come across [1] on several
>>>>>>>> other
>>>>>>>> >> >> > threads, but it probably didn't attract the attention that
>>>>>>>> it would desire.
>>>>>>>> >> >> >
>>>>>>>> >> >> > I will try to restate the problem here for clarity:
>>>>>>>> >> >> >
>>>>>>>> >> >> >   - on runners that use concept of bundles (the original
>>>>>>>> issue mentions
>>>>>>>> >> >> > DirectRunner, but it will probably apply for other runners,
>>>>>>>> which use
>>>>>>>> >> >> > bundles, as well), the workflow is as follows:
>>>>>>>> >> >> >
>>>>>>>> >> >> >    a) process elements in bundle
>>>>>>>> >> >> >
>>>>>>>> >> >> >    b) advance watermark
>>>>>>>> >> >> >
>>>>>>>> >> >> >    c) process timers
>>>>>>>> >> >> >
>>>>>>>> >> >> >    d) continue to next bundle
>>>>>>>> >> >> >
>>>>>>>> >> >> >   - the issue with this is that when we are initially at
>>>>>>>> time T0, set
>>>>>>>> >> >> > two timers for T1 and T3, then advance watermark to T3 (or
>>>>>>>> beyond), the
>>>>>>>> >> >> > timers will fire (correctly) in order T1, T3, but if timer
>>>>>>>> at T1 sets
>>>>>>>> >> >> > another timer for T2, then this timer will be fired in next
>>>>>>>> bundle (and
>>>>>>>> >> >> > therefore after T3)
>>>>>>>> >> >> >
>>>>>>>> >> >> >   - this causes issues mostly with race conditions in
>>>>>>>> window GC timers
>>>>>>>> >> >> > and user timers (and users don't have any way to solve
>>>>>>>> that!)
>>>>>>>> >> >> >
>>>>>>>> >> >> >   - note that the same applies when one timer tries to
>>>>>>>> reset timer that
>>>>>>>> >> >> > is already in the current bundle
>>>>>>>> >> >> >
>>>>>>>> >> >> > I have investigated a way of solving this by running timers
>>>>>>>> only for
>>>>>>>> >> >> > single timestamp (instant) at each bundle, but as Reuven
>>>>>>>> pointed out,
>>>>>>>> >> >> > that could regress performance (mostly by delaying firing
>>>>>>>> of timers,
>>>>>>>> >> >> > that could have fired). Options I see:
>>>>>>>> >> >> >
>>>>>>>> >> >> >   1) either set the OnTimerContext#timestamp() to current
>>>>>>>> input
>>>>>>>> >> >> > watermark (not the time that user actually set the timer),
>>>>>>>> or
>>>>>>>> >> >> >
>>>>>>>> >> >> >   2) add OnTimerContext#getCurrentInputWatermark() and
>>>>>>>> disallow setting
>>>>>>>> >> >> > (or resetting) timers for time between
>>>>>>>> OnProcessContext#timestamp and
>>>>>>>> >> >> > OnProcessContext#getCurrentInputWatermark(), by throwing an
>>>>>>>> exception
>>>>>>>> >> >> >
>>>>>>>> >> >> >   3) any other option?
>>>>>>>> >> >> >
>>>>>>>> >> >> > Option 1) seems to be broken by design, as it can result in
>>>>>>>> corrupt data
>>>>>>>> >> >> > (emitted with wrong timestamp, which is even somewhat
>>>>>>>> arbitrary), I'm
>>>>>>>> >> >> > including it just for completeness. Option 2) is breaking
>>>>>>>> change, that
>>>>>>>> >> >> > can result in PIpeline failures (although the failures will
>>>>>>>> happen on
>>>>>>>> >> >> > Pipelines, that are probably already broken).
>>>>>>>> >> >> >
>>>>>>>> >> >> > Although I have come with a workaround in the work where I
>>>>>>>> originally
>>>>>>>> >> >> > come across this issue, I think that this is generally
>>>>>>>> serious and
>>>>>>>> >> >> > should be dealt with. Mostly because when using user-facing
>>>>>>>> APIs, there
>>>>>>>> >> >> > are no workarounds possible, today.
>>>>>>>> >> >> >
>>>>>>>> >> >> > Thanks for discussion!
>>>>>>>> >> >> >
>>>>>>>> >> >> > Jan
>>>>>>>> >> >> >
>>>>>>>> >> >> > [1] https://issues.apache.org/jira/browse/BEAM-7520
>>>>>>>> >> >> >
>>>>>>>>
>>>>>>>

Reply via email to