Hi Lukasz,

that was my initial thought, but it turns out, that doing so might have performance issues. And it is only a little of a philosophical question, if - when watermark moves from one time to another - you assume time to move "smoothly" (which suggests firing timers for single instant at a moment), or "hops" (which allows firing multiple timers, because all times in between are considered "equal" at least from the runner's perspective). The only difference between these two approaches (semantically) is what to do, when a timer sets (or resets) something in the "hop" time.

On 6/27/19 11:21 PM, Lukasz Cwik wrote:
I'm confused as to why it is valid to advance the watermark to T3 in the original scenario.

T1 and T2 should be treated as inputs to the function and hold the input watermark hence T1 should fire and if it doesn't produce any new timers before T2, then T2 should fire since the watermark will now advance to T2. The only time you would have multiple watermark timers fire as part of the same bundle is if they were distinct timers both set to the same time.

I have some examples[1] documented in the modelling, scheduling, and executing timers doc.

1: https://docs.google.com/document/d/1GRL88rKLHbMR0zJnBHYwM4xtj66VYlB112EWVUFcGB0/edit#heading=h.fzptl5h0vi9k


On Wed, Jun 26, 2019 at 6:40 AM Reuven Lax <re...@google.com <mailto:re...@google.com>> wrote:

    Earlier than the input watermark only applies to event time
    timers, but the above problem holds for processing time timers as
    well.

    On Wed, Jun 26, 2019, 1:50 PM Robert Bradshaw <rober...@google.com
    <mailto:rober...@google.com>> wrote:

        Yeah, it wouldn't be optimal performance-wise, but I think
        it's good
        to keep the bar for a correct SDK low. Might still be better than
        sending one timer per bundle, and you only pay the performance if
        timers are set earlier than the input watermark (and there was
        a timer
        firing in this range). (How often this happens probably varies
        a lot
        in practice.)

        On Wed, Jun 26, 2019 at 2:33 PM Reuven Lax <re...@google.com
        <mailto:re...@google.com>> wrote:
        >
        > This would have a lot of performance problems (especially
        since there is user code that caches within a bundle, and
        invalidates the cache at the end of every bundle). However
        this would be a valid "lazy" implementation.
        >
        > On Wed, Jun 26, 2019 at 2:29 PM Robert Bradshaw
        <rober...@google.com <mailto:rober...@google.com>> wrote:
        >>
        >> Note also that a "lazy" SDK implementation would be to
        simply return
        >> all the timers (as if they were new timers) to runner once
        a timer set
        >> (before or at the last requested timer in the bundle) is
        encountered.
        >> E.g. Suppose we had timers T1, T3, T5 in the bundle. On
        firing T1, we
        >> set T2 and delete T3. The SDK could then claim that a
        timers were
        >> (again) set at T3, T5, then set one at at T2 and deleted at
        T3 and
        >> then be done with the bundle (not actually process T3 and
        T5). (One
        >> way to think about this is that timers are actually bundle
        splits into
        >> a bundle of "done" and "future" work.) A more intelligent
        SDK could,
        >> of course, process the whole bundle by tracking
        modifications to the
        >> to-be-fired timers itself rather than requiring a trip
        through the
        >> runner.
        >>
        >> On Wed, Jun 26, 2019 at 1:51 PM Reuven Lax
        <re...@google.com <mailto:re...@google.com>> wrote:
        >> >
        >> > I like this option the best. It might be trickier to
        implement, but seems like it would be the most consistent
        solution.
        >> >
        >> > Another problem it would solve is the following: let's
        say a bundle arrives containing timers T1 and T2, and while
        processing T1 the user code deletes T2 (or resets it to a time
        in the far future). I'm actually not sure what we do today,
        but I'm a bit afraid that we will go ahead and fire T2 since
        it's already in the bundle, which is clearly incorrect. The
        SDK needs to keep track of this and skip T2 in order to solve
        this, which is the same sort of work needed to implement
        Robert's suggestion.
        >> >
        >> > Reuven
        >> >
        >> > On Wed, Jun 26, 2019 at 12:28 PM Robert Bradshaw
        <rober...@google.com <mailto:rober...@google.com>> wrote:
        >> >>
        >> >> Another option, that is nice from an API perspective but
        places a
        >> >> burden on SDK implementers (and possibly runners), is to
        maintain the
        >> >> ordering of timers by requiring timers to be fired in
        order, and if
        >> >> any timers are set to fire them immediately before
        processing later
        >> >> timers. In other words, if T1 sets T2 and modifies T3,
        these would
        >> >> take effect (locally, the runner may not even know about
        T2) before T3
        >> >> was processed.
        >> >>
        >> >> On Wed, Jun 26, 2019 at 11:13 AM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
        >> >> >
        >> >> > Hi,
        >> >> >
        >> >> > I have mentioned an issue I have come across [1] on
        several other
        >> >> > threads, but it probably didn't attract the attention
        that it would desire.
        >> >> >
        >> >> > I will try to restate the problem here for clarity:
        >> >> >
        >> >> >   - on runners that use concept of bundles (the
        original issue mentions
        >> >> > DirectRunner, but it will probably apply for other
        runners, which use
        >> >> > bundles, as well), the workflow is as follows:
        >> >> >
        >> >> >    a) process elements in bundle
        >> >> >
        >> >> >    b) advance watermark
        >> >> >
        >> >> >    c) process timers
        >> >> >
        >> >> >    d) continue to next bundle
        >> >> >
        >> >> >   - the issue with this is that when we are initially
        at time T0, set
        >> >> > two timers for T1 and T3, then advance watermark to T3
        (or beyond), the
        >> >> > timers will fire (correctly) in order T1, T3, but if
        timer at T1 sets
        >> >> > another timer for T2, then this timer will be fired in
        next bundle (and
        >> >> > therefore after T3)
        >> >> >
        >> >> >   - this causes issues mostly with race conditions in
        window GC timers
        >> >> > and user timers (and users don't have any way to solve
        that!)
        >> >> >
        >> >> >   - note that the same applies when one timer tries to
        reset timer that
        >> >> > is already in the current bundle
        >> >> >
        >> >> > I have investigated a way of solving this by running
        timers only for
        >> >> > single timestamp (instant) at each bundle, but as
        Reuven pointed out,
        >> >> > that could regress performance (mostly by delaying
        firing of timers,
        >> >> > that could have fired). Options I see:
        >> >> >
        >> >> >   1) either set the OnTimerContext#timestamp() to
        current input
        >> >> > watermark (not the time that user actually set the
        timer), or
        >> >> >
        >> >> >   2) add OnTimerContext#getCurrentInputWatermark() and
        disallow setting
        >> >> > (or resetting) timers for time between
        OnProcessContext#timestamp and
        >> >> > OnProcessContext#getCurrentInputWatermark(), by
        throwing an exception
        >> >> >
        >> >> >   3) any other option?
        >> >> >
        >> >> > Option 1) seems to be broken by design, as it can
        result in corrupt data
        >> >> > (emitted with wrong timestamp, which is even somewhat
        arbitrary), I'm
        >> >> > including it just for completeness. Option 2) is
        breaking change, that
        >> >> > can result in PIpeline failures (although the failures
        will happen on
        >> >> > Pipelines, that are probably already broken).
        >> >> >
        >> >> > Although I have come with a workaround in the work
        where I originally
        >> >> > come across this issue, I think that this is generally
        serious and
        >> >> > should be dealt with. Mostly because when using
        user-facing APIs, there
        >> >> > are no workarounds possible, today.
        >> >> >
        >> >> > Thanks for discussion!
        >> >> >
        >> >> > Jan
        >> >> >
        >> >> > [1] https://issues.apache.org/jira/browse/BEAM-7520
        >> >> >

Reply via email to