On 6/20/19 9:30 PM, Reuven Lax wrote:


On Thu, Jun 20, 2019 at 8:54 PM Jan Lukavský wrote:

    > But that is exactly how time advances. Watermarks often don't
    move smoothly, as a single old element can hold up the watermark.
    When that element is finished, the watermark can jump forward in
    time, triggering many timers.

    Sure. Absolutely agree. But the move from time T1 to T2 can be
    viewed either as a discrete jump or as a smooth move, in which case,
    when a timer fires, any internal notion of time is set to the actual
    timestamp of that timer. I believe that is how Flink works, and this
    might be related to the fact that Flink lacks the concept of bundles.

    > I'm not sure how this breaks that invariant. The input watermark
    has only moved forward, as should be true for the output watermark.
    The output watermark is held up by watermark holds in the step,
    which usually means that the output watermark is already being
    held to the earliest pending timer.

    The problem was stated at the beginning of this thread. I can
    restate it:

    - let's have four times - T0 < T1 < T2 < T3

    - let's have two timers, A and B, set for times T1 and T3,
    respectively

    - the watermark moves time from T0 to T3

    - that move fires both timers A and B (in this order), *but* timer
    A is free to set more timers; let's suppose it sets a timer for T2

    - the second instance of timer A (set for T2) will fire *after*
    timer B (set for T3), breaking the time invariant
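
The scenario above can be reproduced with a small simulation. This is only a sketch in plain Python, not Beam or DirectRunner code: the integer timestamps, `on_fire`, `fire_batch` and `fire_in_order` are hypothetical names chosen for illustration, and `fire_batch` merely assumes that a runner collects every timer due at the new watermark before running any callback.

```python
import heapq

T0, T1, T2, T3 = 0, 1, 2, 3  # illustrative timestamps with T0 < T1 < T2 < T3


def on_fire(timestamp, name):
    """Timer callback: when A fires at T1 it sets itself again for T2."""
    if (timestamp, name) == (T1, "A"):
        return [(T2, "A")]
    return []


def fire_batch(timers, watermark):
    """Fire all timers due at `watermark` as one batch; timers set while the
    batch runs are only considered in the next pass."""
    pending, fired = sorted(timers), []
    while True:
        due = [t for t in pending if t[0] <= watermark]
        if not due:
            return fired
        pending = [t for t in pending if t[0] > watermark]
        for timestamp, name in due:
            fired.append((timestamp, name))
            pending.extend(on_fire(timestamp, name))
        pending.sort()


def fire_in_order(timers, watermark):
    """Always fire the earliest pending timer, including newly set ones."""
    heap, fired = list(timers), []
    heapq.heapify(heap)
    while heap and heap[0][0] <= watermark:
        timestamp, name = heapq.heappop(heap)
        fired.append((timestamp, name))
        for new_timer in on_fire(timestamp, name):
            heapq.heappush(heap, new_timer)
    return fired


timers = [(T1, "A"), (T3, "B")]
batch_order = fire_batch(timers, watermark=T3)
sorted_order = fire_in_order(timers, watermark=T3)
print(batch_order)
print(sorted_order)
```

With the batch strategy the firing order is A(T1), B(T3), A(T2); with the priority queue it is A(T1), A(T2), B(T3), which is the timestamp order the invariant asks for.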

Ah, by time invariant you mean the in-order firing of timers?

Yes, sorry, I meant "time monotonicity invariant with relation to timers". Basically, timers should be fired in timestamp order, because otherwise the results might be unpredictable.

    Jan

    On 6/20/19 8:43 PM, Reuven Lax wrote:


    On Thu, Jun 20, 2019 at 8:03 PM Jan Lukavský wrote:

        Hi Reuven,

        > I would be cautious changing this. Being able to put
        multiple timers in the same bundle saves a lot, and if we
        force them to all run separately through ReduceFnRunner we risk
        regressing performance of some pipelines.

        I understand your point. The issue here is that the current
        behavior is at least ... unexpected. There might be two
        different conceptual approaches to that:

         a) if a bundle contains timers for several distinct
        timestamps (say T1 and T2), then it implies that timer T1 is
        effectively fired not at time T1, but at time T2. That is
        because, logically, time hopped discretely from some previous
        time T0 to T2 without any "stopping by". Hence, it should be
        invalid to set up a timer for any time earlier than T2.


    But that is exactly how time advances. Watermarks often don't
    move smoothly, as a single old element can hold up the watermark.
    When that element is finished, the watermark can jump forward in
    time, triggering many timers.

        b) time moves smoothly (or smoothly at millisecond
        precision), but that implies that a single bundle cannot
        contain timers for more than one distinct timestamp.

        If we don't want to take path b), we are probably left with
        path a) (doing nothing seems weird, because it breaks one
        invariant: that time can only move forward).

    I'm not sure how this breaks that invariant. The input watermark
    has only moved forward, as should be true for the output
    watermark. The output watermark is held up by watermark holds in
    the step, which usually means that the output watermark is
    already being held to the earliest pending timer.

        Option a) can be done - we might add something like
        `getInputWatermark()` and `getOutputWatermark()` to
        `DoFn.OnTimerContext`, and throw an exception when the user
        tries to set up a timer for a time before the input watermark.
        Effectively, that way we would let the user know that the
        timer was set for time T1, but was fired at T2. But that seems
        to be a breaking change, unfortunately.
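
A rough sketch of what option a) could look like, written in plain Python rather than against the Beam SDK. `OnTimerContextSketch`, `set_timer` and the choice of `ValueError` are hypothetical; only the idea of exposing the input and output watermarks and rejecting timers set before the input watermark comes from the proposal above.

```python
class OnTimerContextSketch:
    """Hypothetical stand-in for a timer callback context; not Beam's API."""

    def __init__(self, timer_timestamp, input_watermark, output_watermark):
        self.timer_timestamp = timer_timestamp
        self._input_watermark = input_watermark
        self._output_watermark = output_watermark
        self.new_timers = []

    def get_input_watermark(self):
        return self._input_watermark

    def get_output_watermark(self):
        return self._output_watermark

    def set_timer(self, timestamp):
        # Option a): reject timers earlier than the time we actually jumped to.
        if timestamp < self._input_watermark:
            raise ValueError(
                f"timer for {timestamp} is before input watermark "
                f"{self._input_watermark}"
            )
        self.new_timers.append(timestamp)


T1, T2, T3 = 1, 2, 3  # illustrative timestamps
# The timer was set for T1 but fires after the input watermark jumped to T3.
ctx = OnTimerContextSketch(timer_timestamp=T1, input_watermark=T3, output_watermark=T1)
try:
    ctx.set_timer(T2)  # earlier than the input watermark: rejected
    rejected = False
except ValueError:
    rejected = True
ctx.set_timer(T3)  # not before the input watermark: accepted
```

In this sketch the callback can compare `timer_timestamp` with `get_input_watermark()` to see that a timer set for T1 actually fired at T3.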

        What do you think?

        Jan

        On 6/20/19 5:29 PM, Reuven Lax wrote:


        On Thu, Jun 20, 2019 at 3:08 PM Jan Lukavský wrote:

            Hi,

            this problem seems to be harder than I thought. I have
            somewhat working code in [1], but some tests are still
            failing (now the tests for ReduceFnRunner). I'm not sure
            whether the problem is actually in the tests, in which
            case my current behavior would be correct. Let me
            explain the problem:

             - let's have a fixed window with allowed lateness of 1 ms

             - let's add two elements into the window (on time), no
            late elements

             - now, ReduceFnRunner with the default trigger will set
            *two* timers - one for window.maxTimestamp() and a second
            for window.maxTimestamp() + allowedLateness

             - the previous implementation fired *both* timers at
            once (within a single call to ReduceFnRunner#onTimers),
            but now it fires twice - once for the first timer and
            a second time for the other


        I would be cautious changing this. Being able to put
        multiple timers in the same bundle saves a lot, and if we
        force them to all run separately through ReduceFnRunner we
        risk regressing performance of some pipelines.

             - the result of this is that although in both cases
            only a single pane is emitted, in my branch the fired pane
            doesn't have the `isLast` flag set (that is because the
            window is not yet garbage collected - it is waiting for
            late data - but the pane is not fired a second time,
            because no late data arrived)

            Would anyone know what is actually the correct behavior
            regarding PaneInfo.isLast? I suppose there are only
            two options - either two panes can come with the isLast
            flag (both end-of-window and late), or it might be
            possible that no pane will be marked with this flag
            (because no late pane is fired).

            Jan

             [1] https://github.com/apache/beam/pull/8815


            On 6/10/19 6:26 PM, Jan Lukavský wrote:
            It seems to me that a watermark hold cannot change this
            (currently), because in the current implementation
            timers fire according to the input watermark, but watermark
            holds apply to the output watermark. Unless I missed
            something.

            On Jun 10, 2019 at 18:15, Lukasz Cwik wrote:

                I see. Is there a missing watermark hold for timers
                less than T2?

                On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský wrote:

                    Yes, there is no difference between GC and user
                    timers in this case. I think the problem is
                    simply that when the watermark moves from time
                    T1 to T2, DirectRunner fires all timers due up
                    to T2, but firing them can create new timers
                    for times between T1 and T2, and these will be
                    fired later, although they should have been
                    fired before T2.

                    Jan

                    On 6/10/19 5:48 PM, Kenneth Knowles wrote:

                        Reading your Jira, I believe this problem
                        will manifest without the interaction of
                        user timers and GC. Interesting case. It
                        comes down to whether a runner makes a
                        timer available or fires it prior to the
                        bundle being committed.

                        I have commented elsewhere about this part,
                        quoting the Jira:

                        > have experimented with this a little and
                        have not yet figured out what the correct
                        solution should be. What I tried:
                        > 1) hold input watermark for min(setup timers)
                        > 2) fire timers based not on input
                        watermark, but on output watermark (output
                        watermark is held by min timer stamp)

                        Neither of these quite works. What we need
                        is a separate "element input watermark" and
                        "timer input watermark". The overall input
                        watermark that drives GC is the min of
                        these. The output watermark is also held to
                        this overall input watermark. User timers
                        fire according to the element input watermark.
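
The relationships described above can be written down as a small sketch. It is plain Python, not Beam code, and `StepWatermarks` and its methods are hypothetical names. How the timer input watermark itself is derived is not stated here, so it is taken as a given value, and the `<=` comparisons used to decide when a timer is due are an assumption.

```python
from dataclasses import dataclass


@dataclass
class StepWatermarks:
    """Hypothetical holder for the two input watermarks of one step."""

    element_input: int  # "element input watermark"
    timer_input: int    # "timer input watermark" (derivation not specified)

    @property
    def overall_input(self):
        # The overall input watermark is the min of the two.
        return min(self.element_input, self.timer_input)

    def output(self, holds=()):
        # The output watermark is held to the overall input watermark,
        # in addition to any explicit watermark holds.
        return min([self.overall_input, *holds])

    def user_timer_due(self, timestamp):
        # User timers fire according to the element input watermark.
        return timestamp <= self.element_input

    def gc_due(self, timestamp):
        # GC is driven by the overall input watermark.
        return timestamp <= self.overall_input


wm = StepWatermarks(element_input=3, timer_input=2)
print(wm.overall_input, wm.output(holds=[5]), wm.user_timer_due(3), wm.gc_due(3))
```

With these example values a user timer set for 3 is already due, while garbage collection for timestamp 3 still waits because the overall input watermark is only 2.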

                        Kenn

                        On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik
                        wrote:

                            Jan, are you editing the implementation
                            of how timers work within the
                            DirectRunner, or are you trying to build
                            support for time-sorted input on top of
                            the Beam model for timers?
                            Because I think you will need to do the
                            former.

                            On Mon, Jun 10, 2019 at 8:41 AM Jan
                            Lukavský wrote:

                                Hm, that would probably work, thanks!

                                But should the timers behave like
                                that? I'm trying to fix this by
                                introducing a sequence of watermarks

                                 input watermark -> timer watermark -> output watermark

                                as suggested in the JIRA, and it
                                actually seems to be working as
                                expected. It even cleans up some code
                                paths, but I'm debugging some
                                strange behavior this exposed -
                                `WatermarkHold.watermarkHoldTagForTimestampCombiner`
                                seems to have stopped clearing
                                itself after this change, and some
                                pipelines therefore stopped
                                working. I'm a little lost as to why
                                this happened. I can push the code I
                                have if anyone is interested.

                                Jan

                                On 6/10/19 5:32 PM, Lukasz Cwik wrote:

                                    We hit an instance of this
                                    problem before and solved it by
                                    rescheduling the GC timer
                                    if there was a conflicting
                                    timer that was also meant to fire.

                                    On Mon, Jun 10, 2019 at 8:17 AM
                                    Jan Lukavský wrote:

                                        For a single key. I'm
                                        running into a collision
                                        between the timerId
                                        `__StatefulParDoGcTimerId`
                                        (StatefulDoFnRunner) and my
                                        timerId for flushing sorted
                                        elements in the implementation
                                        of
                                        @RequiresTimeSortedInput.
                                        The timers are being
                                        swapped at the end of input
                                        (but it can happen anywhere
                                        near the end of a window), which
                                        results in state being
                                        cleared before it gets
                                        flushed, which means data loss.

                                         Jan

                                        On 6/10/19 5:08 PM, Reuven
                                        Lax wrote:

                                            Do you mean for a
                                            single key or across keys?

                                            On Mon, Jun 10, 2019,
                                            5:11 AM Jan Lukavský
                                            wrote:

                                                Hi,

                                                I have come across
                                                issue [1], and I'm
                                                not sure how to
                                                solve it in the
                                                most elegant way.

                                                Any suggestions?

                                                Thanks,

                                                  Jan

                                                [1] https://issues.apache.org/jira/browse/BEAM-7520


