> Since timers are per key, wouldn't it be that the timer watermark should also be per key for a StatefulDoFn and hence we would still be able to fire multiple timers (at most one per key) and still have good performance even when the input watermark makes a "hop"?

Yes, firing multiple timers for different keys will still be possible. The question is what the consequences are of not firing a timer that could have fired but was held back just in case timer T1 modifies something. The lazy approach Robert mentioned may be an optimization of this, because most likely the timer will not modify anything. Another question is what happens when we don't fire a timer that is set for a time below the input watermark. Does it have to wait for the next bundle? If so, that would be a serious issue, because the next bundle may arrive only after a significant period of (processing) time. Can we iterate, i.e. extract timers multiple times within a single bundle? Would that solve the performance issues Reuven was talking about?
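
To make the "iterate" idea concrete, here is a minimal sketch under a simplified model: the timers of one key are a plain map from a timer id to a timestamp, and eligible timers are re-extracted after every firing. The names `fire_in_order` and `on_timer` are hypothetical and are not Beam APIs; this only illustrates the ordering, not how any runner implements it.

```python
def fire_in_order(timers, watermark, on_timer):
    """Fire every timer with timestamp <= watermark, in timestamp order.

    `timers` maps timer id -> timestamp (hypothetical model of one key).
    `on_timer(timer_id, ts, timers)` may set, reset, or delete entries in
    `timers`; those changes take effect before any later timer fires.
    Note: a callback that keeps setting timers at or below the watermark
    would keep this loop running, so real code would need a guard.
    """
    fired = []
    while True:
        # Re-extract the eligible timers after each firing, so a timer set
        # by the previous callback competes with the ones already due.
        due = [(ts, tid) for tid, ts in timers.items() if ts <= watermark]
        if not due:
            return fired
        ts, tid = min(due)
        del timers[tid]
        fired.append((tid, ts))
        on_timer(tid, ts, timers)


def on_timer(tid, ts, timers):
    if tid == "t1":
        timers["t2"] = 2  # T1 sets a new timer for T2


timers = {"t1": 1, "t3": 3}
order = fire_in_order(timers, watermark=3, on_timer=on_timer)
print(order)  # [('t1', 1), ('t2', 2), ('t3', 3)]
```

In this model T2 fires before T3 even though it did not exist when the watermark hopped to T3. The cost is one extraction per firing, which is the performance trade-off under discussion.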

> Earlier it was said that performance was poor if we moved to a model where we prevented multiple timer firings. Since timer firings are per key, can you provide details of what use case has multiple user timer firings per key?

That happens very often, mostly with a user timer and the window GC timer (one is a system timer that clears state, the other is a user timer). The consequence of swapping these two is obviously data loss (which is how I came across this).
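
A small sketch of how that swap can lose data, assuming a simplified model in which the due timers are snapshotted once per bundle. The timer ids (`user-t1`, `user-flush`, `window-gc`), the `state` dict, and `run_bundle_naive` are all hypothetical names for illustration, not Beam APIs.

```python
def run_bundle_naive(timers, watermark, on_timer):
    """Snapshot the due timers once, then fire only that snapshot.

    Timers set while the snapshot is being fired wait for the next bundle.
    """
    snapshot = sorted((ts, tid) for tid, ts in timers.items() if ts <= watermark)
    for ts, tid in snapshot:
        timers.pop(tid, None)
        on_timer(tid, ts, timers)
    return [tid for _, tid in snapshot]


state = {"buffer": ["a", "b"]}  # per-key state holding buffered elements
output = []


def on_timer(tid, ts, timers):
    if tid == "user-t1":
        timers["user-flush"] = 2        # user timer schedules a flush at T2
    elif tid == "user-flush":
        output.extend(state["buffer"])  # emit whatever is still buffered
    elif tid == "window-gc":
        state["buffer"] = []            # system timer clears the state


timers = {"user-t1": 1, "window-gc": 3}
first = run_bundle_naive(timers, 3, on_timer)   # ['user-t1', 'window-gc']
second = run_bundle_naive(timers, 3, on_timer)  # ['user-flush']
print(output)  # [] : the flush ran after the GC, so "a" and "b" are lost
```

The flush timer is set for T2, before the GC timer at T3, yet it fires in the second bundle, after the state has already been cleared.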

On 6/28/19 1:41 AM, Lukasz Cwik wrote:
Earlier it was said that performance was poor if we moved to a model where we prevented multiple timer firings. Since timer firings are per key, can you provide details of what use case has multiple user timer firings per key?

On Thu, Jun 27, 2019 at 4:34 PM Reuven Lax wrote:

    The watermark hold (which is how the timer holds up the watermark
    today, as there is no timer watermark) is per key. Usually the
    input watermark making a "hop" is not a problem; in fact it's the
    normal state of affairs.

    On Fri, Jun 28, 2019 at 1:08 AM Lukasz Cwik wrote:

        Thanks Reuven and Jan.

        Since timers are per key, wouldn't it be that the timer
        watermark should also be per key for a StatefulDoFn and hence
        we would still be able to fire multiple timers (at most one
        per key) and still have good performance even when the input
        watermark makes a "hop"?


        On Thu, Jun 27, 2019 at 3:43 PM Jan Lukavský wrote:

            It would be possible to have a "timer watermark" between
            the input and output watermarks, so that input watermark >=
            timer watermark >= output watermark, but it turns out
            that doing so implies that we fire timers only for a single
            instant (because until the timer is fired and processed,
            the "timer watermark" is on hold).

            On 6/28/19 12:40 AM, Jan Lukavský wrote:

            At least the implementation in DirectRunner fires timers
            according to the input watermark. Holding the timer up to
            the output watermark causes deadlocks, because timers fired
            at time T might clear the watermark hold for the same time.

            On 6/27/19 11:55 PM, Reuven Lax wrote:
            I believe that timers correspond to watermark holds,
            which hold up the output watermark, not the input watermark.

            On Thu, Jun 27, 2019 at 11:21 PM Lukasz Cwik wrote:

                I'm confused as to why it is valid to advance the
                watermark to T3 in the original scenario.

                T1 and T2 should be treated as inputs to the
                function and hold the input watermark hence T1
                should fire and if it doesn't produce any new timers
                before T2, then T2 should fire since the watermark
                will now advance to T2. The only time you would have
                multiple watermark timers fire as part of the same
                bundle is if they were distinct timers both set to
                the same time.

                I have some examples[1] documented in the modelling,
                scheduling, and executing timers doc.

                1: https://docs.google.com/document/d/1GRL88rKLHbMR0zJnBHYwM4xtj66VYlB112EWVUFcGB0/edit#heading=h.fzptl5h0vi9k


                On Wed, Jun 26, 2019 at 6:40 AM Reuven Lax wrote:

                    Earlier than the input watermark only applies to
                    event time timers, but the above problem holds
                    for processing time timers as well.

                    On Wed, Jun 26, 2019, 1:50 PM Robert Bradshaw wrote:

                        Yeah, it wouldn't be optimal
                        performance-wise, but I think it's good
                        to keep the bar for a correct SDK low. Might
                        still be better than
                        sending one timer per bundle, and you only
                        pay the performance if
                        timers are set earlier than the input
                        watermark (and there was a timer
                        firing in this range). (How often this
                        happens probably varies a lot
                        in practice.)

                        On Wed, Jun 26, 2019 at 2:33 PM Reuven Lax wrote:
                        >
                        > This would have a lot of performance
                        problems (especially since there is user
                        code that caches within a bundle, and
                        invalidates the cache at the end of every
                        bundle). However this would be a valid
                        "lazy" implementation.
                        >
                        > On Wed, Jun 26, 2019 at 2:29 PM Robert Bradshaw wrote:
                        >>
                        >> Note also that a "lazy" SDK
                        implementation would be to simply return
                        >> all the timers (as if they were new
                        timers) to runner once a timer set
                        >> (before or at the last requested timer in
                        the bundle) is encountered.
                        >> E.g. Suppose we had timers T1, T3, T5 in
                        the bundle. On firing T1, we
                        >> set T2 and delete T3. The SDK could then
                        claim that timers were
                        >> (again) set at T3, T5, then one was set at
                        T2 and one deleted at T3 and
                        >> then be done with the bundle (not
                        actually process T3 and T5). (One
                        >> way to think about this is that timers
                        are actually bundle splits into
                        >> a bundle of "done" and "future" work.) A
                        more intelligent SDK could,
                        >> of course, process the whole bundle by
                        tracking modifications to the
                        >> to-be-fired timers itself rather than
                        requiring a trip through the
                        >> runner.
                        >>
                        >> On Wed, Jun 26, 2019 at 1:51 PM Reuven Lax wrote:
                        >> >
                        >> > I like this option the best. It might
                        be trickier to implement, but seems like it
                        would be the most consistent solution.
                        >> >
                        >> > Another problem it would solve is the
                        following: let's say a bundle arrives
                        containing timers T1 and T2, and while
                        processing T1 the user code deletes T2 (or
                        resets it to a time in the far future). I'm
                        actually not sure what we do today, but I'm
                        a bit afraid that we will go ahead and fire
                        T2 since it's already in the bundle, which
                        is clearly incorrect. The SDK needs to keep
                        track of this and skip T2 in order to solve
                        this, which is the same sort of work needed
                        to implement Robert's suggestion.
                        >> >
                        >> > Reuven
                        >> >
                        >> > On Wed, Jun 26, 2019 at 12:28 PM Robert Bradshaw wrote:
                        >> >>
                        >> >> Another option, that is nice from an
                        API perspective but places a
                        >> >> burden on SDK implementers (and
                        possibly runners), is to maintain the
                        >> >> ordering of timers by requiring timers
                        to be fired in order, and if
                        >> >> any timers are set to fire them
                        immediately before processing later
                        >> >> timers. In other words, if T1 sets T2
                        and modifies T3, these would
                        >> >> take effect (locally, the runner may
                        not even know about T2) before T3
                        >> >> was processed.
                        >> >>
                        >> >> On Wed, Jun 26, 2019 at 11:13 AM Jan Lukavský wrote:
                        >> >> >
                        >> >> > Hi,
                        >> >> >
                        >> >> > I have mentioned an issue I have
                        come across [1] on several other
                        >> >> > threads, but it probably didn't
                        attract the attention that it deserves.
                        >> >> >
                        >> >> > I will try to restate the problem
                        here for clarity:
                        >> >> >
                        >> >> >   - on runners that use the concept of
                        bundles (the original issue mentions
                        >> >> > DirectRunner, but it will probably
                        apply for other runners, which use
                        >> >> > bundles, as well), the workflow is
                        as follows:
                        >> >> >
                        >> >> >    a) process elements in bundle
                        >> >> >
                        >> >> >    b) advance watermark
                        >> >> >
                        >> >> >    c) process timers
                        >> >> >
                        >> >> >    d) continue to next bundle
                        >> >> >
                        >> >> >   - the issue with this is that when
                        we are initially at time T0, set
                        >> >> > two timers for T1 and T3, then
                        advance watermark to T3 (or beyond), the
                        >> >> > timers will fire (correctly) in
                        order T1, T3, but if timer at T1 sets
                        >> >> > another timer for T2, then this
                        timer will be fired in next bundle (and
                        >> >> > therefore after T3)
                        >> >> >
                        >> >> >   - this causes issues mostly with
                        race conditions in window GC timers
                        >> >> > and user timers (and users don't
                        have any way to solve that!)
                        >> >> >
                        >> >> >   - note that the same applies when
                        one timer tries to reset timer that
                        >> >> > is already in the current bundle
                        >> >> >
                        >> >> > I have investigated a way of solving
                        this by running timers only for
                        >> >> > single timestamp (instant) at each
                        bundle, but as Reuven pointed out,
                        >> >> > that could regress performance
                        (mostly by delaying firing of timers,
                        >> >> > that could have fired). Options I see:
                        >> >> >
                        >> >> >   1) either set the
                        OnTimerContext#timestamp() to current input
                        >> >> > watermark (not the time that user
                        actually set the timer), or
                        >> >> >
                        >> >> >   2) add
                        OnTimerContext#getCurrentInputWatermark()
                        and disallow setting
                        >> >> > (or resetting) timers for time
                        between OnProcessContext#timestamp and
                        >> >> >
                        OnProcessContext#getCurrentInputWatermark(),
                        by throwing an exception
                        >> >> >
                        >> >> >   3) any other option?
                        >> >> >
                        >> >> > Option 1) seems to be broken by
                        design, as it can result in corrupt data
                        >> >> > (emitted with wrong timestamp, which
                        is even somewhat arbitrary), I'm
                        >> >> > including it just for completeness.
                        Option 2) is a breaking change that
                        >> >> > can result in Pipeline failures
                        (although the failures will happen on
                        >> >> > Pipelines that are probably already
                        broken).
                        >> >> >
                        >> >> > Although I have come up with a
                        workaround in the work where I originally
                        >> >> > came across this issue, I think that
                        this is generally serious and
                        >> >> > should be dealt with. Mostly because
                        when using user-facing APIs, there
                        >> >> > are no workarounds possible today.
                        >> >> >
                        >> >> > Thanks for discussion!
                        >> >> >
                        >> >> > Jan
                        >> >> >
                        >> >> > [1]
                        https://issues.apache.org/jira/browse/BEAM-7520
                        >> >> >
