It would be possible to have "timer watermark", between input and output watermark, so that input watermark >= timer watermark >= output watermark, but it turns out, that doing so implies that we fire timers only for single instant (because until the timer is fired and processed, the "timer watermark" is on hold).

On 6/28/19 12:40 AM, Jan Lukavský wrote:

At least the implementation in DirectRunner fires timers according to input watemark. Holding the timer up to output watermark causes deadlocks, because timers fired at time T might clear watermark hold for the same time.

On 6/27/19 11:55 PM, Reuven Lax wrote:
I believe that timers correspond to watermark holds, which hold up the output watermark, not the input watermark.

On Thu, Jun 27, 2019 at 11:21 PM Lukasz Cwik <lc...@google.com <mailto:lc...@google.com>> wrote:

    I'm confused as to why it is valid to advance the watermark to T3
    in the original scenario.

    T1 and T2 should be treated as inputs to the function and hold
    the input watermark hence T1 should fire and if it doesn't
    produce any new timers before T2, then T2 should fire since the
    watermark will now advance to T2. The only time you would have
    multiple watermark timers fire as part of the same bundle is if
    they were distinct timers both set to the same time.

    I have some examples[1] documented in the modelling, scheduling,
    and executing timers doc.

    1:
    
https://docs.google.com/document/d/1GRL88rKLHbMR0zJnBHYwM4xtj66VYlB112EWVUFcGB0/edit#heading=h.fzptl5h0vi9k


    On Wed, Jun 26, 2019 at 6:40 AM Reuven Lax <re...@google.com
    <mailto:re...@google.com>> wrote:

        Earlier than the input watermark only applies to event time
        timers, but the above problem holds for processing time
        timers as well.

        On Wed, Jun 26, 2019, 1:50 PM Robert Bradshaw
        <rober...@google.com <mailto:rober...@google.com>> wrote:

            Yeah, it wouldn't be optimal performance-wise, but I
            think it's good
            to keep the bar for a correct SDK low. Might still be
            better than
            sending one timer per bundle, and you only pay the
            performance if
            timers are set earlier than the input watermark (and
            there was a timer
            firing in this range). (How often this happens probably
            varies a lot
            in practice.)

            On Wed, Jun 26, 2019 at 2:33 PM Reuven Lax
            <re...@google.com <mailto:re...@google.com>> wrote:
            >
            > This would have a lot of performance problems
            (especially since there is user code that caches within a
            bundle, and invalidates the cache at the end of every
            bundle). However this would be a valid "lazy" implementation.
            >
            > On Wed, Jun 26, 2019 at 2:29 PM Robert Bradshaw
            <rober...@google.com <mailto:rober...@google.com>> wrote:
            >>
            >> Note also that a "lazy" SDK implementation would be to
            simply return
            >> all the timers (as if they were new timers) to runner
            once a timer set
            >> (before or at the last requested timer in the bundle)
            is encountered.
            >> E.g. Suppose we had timers T1, T3, T5 in the bundle.
            On firing T1, we
            >> set T2 and delete T3. The SDK could then claim that a
            timers were
            >> (again) set at T3, T5, then set one at at T2 and
            deleted at T3 and
            >> then be done with the bundle (not actually process T3
            and T5). (One
            >> way to think about this is that timers are actually
            bundle splits into
            >> a bundle of "done" and "future" work.) A more
            intelligent SDK could,
            >> of course, process the whole bundle by tracking
            modifications to the
            >> to-be-fired timers itself rather than requiring a trip
            through the
            >> runner.
            >>
            >> On Wed, Jun 26, 2019 at 1:51 PM Reuven Lax
            <re...@google.com <mailto:re...@google.com>> wrote:
            >> >
            >> > I like this option the best. It might be trickier to
            implement, but seems like it would be the most consistent
            solution.
            >> >
            >> > Another problem it would solve is the following:
            let's say a bundle arrives containing timers T1 and T2,
            and while processing T1 the user code deletes T2 (or
            resets it to a time in the far future). I'm actually not
            sure what we do today, but I'm a bit afraid that we will
            go ahead and fire T2 since it's already in the bundle,
            which is clearly incorrect. The SDK needs to keep track
            of this and skip T2 in order to solve this, which is the
            same sort of work needed to implement Robert's suggestion.
            >> >
            >> > Reuven
            >> >
            >> > On Wed, Jun 26, 2019 at 12:28 PM Robert Bradshaw
            <rober...@google.com <mailto:rober...@google.com>> wrote:
            >> >>
            >> >> Another option, that is nice from an API
            perspective but places a
            >> >> burden on SDK implementers (and possibly runners),
            is to maintain the
            >> >> ordering of timers by requiring timers to be fired
            in order, and if
            >> >> any timers are set to fire them immediately before
            processing later
            >> >> timers. In other words, if T1 sets T2 and modifies
            T3, these would
            >> >> take effect (locally, the runner may not even know
            about T2) before T3
            >> >> was processed.
            >> >>
            >> >> On Wed, Jun 26, 2019 at 11:13 AM Jan Lukavský
            <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
            >> >> >
            >> >> > Hi,
            >> >> >
            >> >> > I have mentioned an issue I have come across [1]
            on several other
            >> >> > threads, but it probably didn't attract the
            attention that it would desire.
            >> >> >
            >> >> > I will try to restate the problem here for clarity:
            >> >> >
            >> >> >   - on runners that use concept of bundles (the
            original issue mentions
            >> >> > DirectRunner, but it will probably apply for
            other runners, which use
            >> >> > bundles, as well), the workflow is as follows:
            >> >> >
            >> >> >    a) process elements in bundle
            >> >> >
            >> >> >    b) advance watermark
            >> >> >
            >> >> >    c) process timers
            >> >> >
            >> >> >    d) continue to next bundle
            >> >> >
            >> >> >   - the issue with this is that when we are
            initially at time T0, set
            >> >> > two timers for T1 and T3, then advance watermark
            to T3 (or beyond), the
            >> >> > timers will fire (correctly) in order T1, T3, but
            if timer at T1 sets
            >> >> > another timer for T2, then this timer will be
            fired in next bundle (and
            >> >> > therefore after T3)
            >> >> >
            >> >> >   - this causes issues mostly with race
            conditions in window GC timers
            >> >> > and user timers (and users don't have any way to
            solve that!)
            >> >> >
            >> >> >   - note that the same applies when one timer
            tries to reset timer that
            >> >> > is already in the current bundle
            >> >> >
            >> >> > I have investigated a way of solving this by
            running timers only for
            >> >> > single timestamp (instant) at each bundle, but as
            Reuven pointed out,
            >> >> > that could regress performance (mostly by
            delaying firing of timers,
            >> >> > that could have fired). Options I see:
            >> >> >
            >> >> >   1) either set the OnTimerContext#timestamp() to
            current input
            >> >> > watermark (not the time that user actually set
            the timer), or
            >> >> >
            >> >> >   2) add
            OnTimerContext#getCurrentInputWatermark() and disallow
            setting
            >> >> > (or resetting) timers for time between
            OnProcessContext#timestamp and
            >> >> > OnProcessContext#getCurrentInputWatermark(), by
            throwing an exception
            >> >> >
            >> >> >   3) any other option?
            >> >> >
            >> >> > Option 1) seems to be broken by design, as it can
            result in corrupt data
            >> >> > (emitted with wrong timestamp, which is even
            somewhat arbitrary), I'm
            >> >> > including it just for completeness. Option 2) is
            breaking change, that
            >> >> > can result in PIpeline failures (although the
            failures will happen on
            >> >> > Pipelines, that are probably already broken).
            >> >> >
            >> >> > Although I have come with a workaround in the
            work where I originally
            >> >> > come across this issue, I think that this is
            generally serious and
            >> >> > should be dealt with. Mostly because when using
            user-facing APIs, there
            >> >> > are no workarounds possible, today.
            >> >> >
            >> >> > Thanks for discussion!
            >> >> >
            >> >> > Jan
            >> >> >
            >> >> > [1] https://issues.apache.org/jira/browse/BEAM-7520
            >> >> >

Reply via email to