Hi Luke and Kenn,

I agree, my mental model fits this as well. But still, even in the presence of simultaneuos existence of all windows at once - GBK and stateful DoFns differ in the way they handle time *inside* each window (and I'm as well leaving merging windows outside, partly because they are not currently supported in stateful DoFns). GBK discretizes time (visible to user) through triggers, while stateful DoFn doesn't. That is where differences of these two come from.

Jan

On 1/7/20 10:16 PM, Luke Cwik wrote:
That is a really good way to describe my mental model as well.

On Tue, Jan 7, 2020 at 12:20 PM Kenneth Knowles <[email protected] <mailto:[email protected]>> wrote:



    On Tue, Jan 7, 2020 at 1:39 AM Jan Lukavský <[email protected]
    <mailto:[email protected]>> wrote:

        Hi Kenn,

        I see that my terminology seems not to be 100% aligned with
        Beam's. I'll work on that. :-)

        I agree with what you say, and by "late" I mostly meant
        "droppable" (arriving too late after watermark).

        I'm definitely not proposing to get back to something like
        "out of order" == "late" or anything like that. I'm also aware
        that stateful operation is windowed operation, but the
        semantics of the windowing is different than of a GBK. The
        difference is how time moves in GBK and how moves in stateful
        DoFn. Throwing away some details (early triggers, late data
        triggers), the main difference is that in GBK case, time hops
        just between window boundaries, while in stateful DoFn time
        moves "smoothly" (with each watermark update). Now, this
        difference brings the question about why the definition of
        "droppable" data is the same for both types of operations,
        when there is a difference in how users "perceive" time. As
        the more generic operation, stateful DoFn might deserve a more
        general definition of droppable data, which should degrade
        naturally to the one of GBK in presence of "discrete time hops".


    I understand what you mean. On the other hand, I encourage
    thinking of event time spatially, not as time passing. That is a
    big part of unifying batch/streaming real-time/archival
    processing. The event time window is a secondary key to partition
    the data (merging windows are slightly more complex). All event
    time windows exist simultaneously. So for both stateful ParDo and
    GBK, I find it helpful to consider this perspective where all
    windows are processed simultaneously / in an arbitrary order not
    assuming windows are ordered at all. Then you see that GBK and
    stateful ParDo do not really treat windows / watermark
    differently: both of them process a stream of data for each (key,
    window) pair until the watermark informs them that the stream is
    expired, then they GC the state associated with that (key, window)
    pair.

    Kenn

        This might have some consequences on how the droppable data
        should be handled in presence of (early) triggers, because
        triggerring is actually what makes time to "hop", so we might
        arrive to a conclusion that we might actually drop any data
        that has timestamp less than "last trigger time + allowed
        lateness". This looks appealing to me, because IMO it has
        strong internal logical consistency. Although it is possible
        that it would drop more data, which is generally undesirable,
        I admit that.

        I'm looking for explanation why the current approach was
        chosen instead of the other.

        Jan

        On 1/7/20 12:52 AM, Kenneth Knowles wrote:
        This thread has a lot in it, so I am just top-posting.

         - Stateful DoFn is a windowed operation; state is
        per-window. When the window expires, any further inputs are
        dropped.
         - "Late" is not synonymous with out-of-order. It doesn't
        really have an independent meaning.
            - For a GBK/Combine "late" means "not included prior to
        the on-time output", and "droppable" means "arriving after
        window expiry".
            - For Stateful DoFn there is no real meaning to "late"
        except if one is talking about "droppable", which still means
        "arriving after window expiry". A user may have a special
        timer where they flip a flag and treat elements after the
        timer differently.

        I think the definition of when data is droppable is very
        simple. We explicitly moved to this definition, away from the
        "out of order == late", because it is more robust and simpler
        to think about. Users saw lots of confusing behavior when we
        had "out of order by allowed lateness == droppable" logic.

        Kenn

        On Mon, Jan 6, 2020 at 1:42 AM Jan Lukavský <[email protected]
        <mailto:[email protected]>> wrote:

            > Generally the watermark update can overtake elements,
            because runners  can explicitly ignore late data in the
            watermark calculation (for good reason - those elements
            are already late, so no need to hold up the watermark
            advancing any more).

            This seems not to affect the decision of _not late_ vs.
            _late_, is it? If element is late and gets ignored from
            watermark calculation (whatever that includes in this
            context), than the watermark cannot move past elements
            that were not marked as _not late_ and thus nothing can
            make them _late_.

            > For GBK on-time data simply means the first pane marked
            as on time. For state+timers I don't think it makes sense
            for Beam to define on-time v.s. late, rather I think the
            user can come up with their own definition depending on
            their use case. For example, if you are buffering data
            into BagState and setting a timer to process it, it would
            be logical to say that any element that was buffered
            before the timer expired is on time, and any data that
            showed up after the timer fired is late. This would
            roughly correspond to what GBK does, and the answer would
            be very similar to simply comparing against the watermark
            (as the timers fire when the watermark advances).

            Yes, I'd say that stateful DoFns don't have (well
            defined) concept of pane, because that is related to
            concept of trigger and this is a concept of GBK (or
            windowed operations in general). The only semantic
            meaning of window in stateful DoFn is that it "scopes" state.

            This discussion might have got a little off the original
            question, so I'll try to rephrase it:

            Should stateful DoFn drop *all* late data, not just data
            that arrive after window boundary + allowed lateness?
            Some arguments why I think it should:
             * in windowed operations (GBK), it is correct to drop
            data on window boundaries only, because time (as seen by
            user) effectively hops only on these discrete time points
             * in stateful dofn on the other hand time move
            "smoothly" (yes, with some granularity, millisecond,
            nanosecond, whatever and with watermark updates only, but
            still)
             * this could be viewed that dropping late data
            immediately as time (again, from user perspective) moves
            (not on some more or less artificial boundary having only
            little semantic meaning) is consistent with both the
            above properties

            The negative side effect of this would be, that more data
            could be dropped, but ... isn't this what defines allowed
            lateness? I don't want to discuss the implications on
            user pipelines of such a change (and if we can or cannot
            do it), just trying to build some theoretical
            understanding of the problem as a whole. The decision if
            any change could / should be made can be done afterwards.

            Thanks,
             Jan

            On 1/4/20 10:35 PM, Reuven Lax wrote:


            On Sat, Jan 4, 2020 at 12:13 PM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:

                > Yes, but invariants should hold. If I add a ParDo
                that drops late elements (or, more commonly,diverts
                the late elements  to a different PCollection), then
                the result of that ParDo should _never_ introduce
                and more late data. This cannot be guaranteed simply
                with watermark checks. The ParDo may decide that the
                element was not late, but by the time it outputs the
                element the watermark may have advanced, causing the
                element to actually be late.

                This is actually very interesting. The question is -
                if I decide about lateness based on output watermark
                of a PTransform, is it still the case, that in
                downstream operator(s) the element could be changed
                from "not late" to "late"? Provided the output
                watermark is updated synchronously based on input
                data (which should be) and watermark update cannot
                "overtake" elements, I think that the downstream
                decision should not be changed, so the invariant
                should hold. Or am I missing something?


            Generally the watermark update can overtake elements,
            because runners can explicitly ignore late data in the
            watermark calculation (for good reason - those elements
            are already late, so no need to hold up the watermark
            advancing any more).

            For GBK on-time data simply means the first pane marked
            as on time. For state+timers I don't think it makes
            sense for Beam to define on-time v.s. late, rather I
            think the user can come up with their own definition
            depending on their use case. For example, if you are
            buffering data into BagState and setting a timer to
            process it, it would be logical to say that any element
            that was buffered before the timer expired is on time,
            and any data that showed up after the timer fired is
            late. This would roughly correspond to what GBK does,
            and the answer would be very similar to simply comparing
            against the watermark (as the timers fire when the
            watermark advances).

            Reuven

                On 1/4/20 8:11 PM, Reuven Lax wrote:


                On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský
                <[email protected] <mailto:[email protected]>> wrote:

                    On 1/4/20 6:14 PM, Reuven Lax wrote:
                    There is a very good reason not to define
                    lateness directly in terms of the watermark.
                    The model does not make any guarantees that
                    the watermark advances synchronously, and in
                    fact for the Dataflow runner the watermark
                    advances asynchronously (i.e. independent of
                    element processing). This means that simply
                    comparing an element timestamp against the
                    watermark creates a race condition. There are
                    cases where the answer could change depending
                    on exactly when you examine the watermark, and
                    if you examine again while processing the same
                    bundle you might come to a different
                    conclusion about lateness.
                    Due to monotonicity of watermark, I don't think
                    that the asynchronous updates of watermark can
                    change the answer from "late" to "not late".
                    That seems fine to me.


                It's the other way around. You check to see whether
                an element is late and the answer is "not late." An
                instant later the answer changes to "late" This
                does cause many problems, and is why this was changed.


                    This non determinism is undesirable when
                    considering lateness, as it can break many
                    invariants that users may rely on (e.g. if I
                    could write a ParDo that filtered all late
                    data, yet still find late data showing up
                    downstream of the ParDo which would be very
                    surprising). For that reason, the SDK always
                    marks things as late based on deterministic
                    signals. e.g. for a triggered GBK everything
                    in the first post-watermark pane is marked as
                    on time (no matter what the watermark is) and
                    everything in subsequent panes is marked as late.
                    Dropping latecomers will always be
                    non-deterministic, that is certain. This is
                    true even in case where watermark is updated
                    synchronously with element processing, due to
                    shuffling and varying (random) differences of
                    processing and event time in upstream
                    operator(s). The question was only if a
                    latecomer should be dropped only at a window
                    boundaries only (which is a sort of artificial
                    time boundary), or right away when spotted (in
                    stateful dofns only). Another question would be
                    if latecomers should be dropped based on input
                    or output watermark, dropping based on output
                    watermark seems even to be stable in the sense,
                    that all downstream operators should come to
                    the same conclusion (this is a bit of a
                    speculation).


                Yes, but invariants should hold. If I add a ParDo
                that drops late elements (or, more commonly,diverts
                the late elements  to a different PCollection),
                then the result of that ParDo should _never_
                introduce and more late data. This cannot be
                guaranteed simply with watermark checks. The ParDo
                may decide that the element was not late, but by
                the time it outputs the element the watermark may
                have advanced, causing the element to actually be late.

                In practice this is important. And early version of
                Dataflow (pre Beam) implemented lateness by
                comparing against the watermark, and it caused no
                end of trouble for users.


                    FYI - this is also the reason why Beam does
                    not currently provide users direct access to
                    the watermark. The asynchronous nature of it 
                    can be very confusing, and often results in
                    users writing bugs in their pipelines. We
                    decided instead to expose
                    easier-to-reason-about signals such as timers
                    (triggered by the watermark), windows, and
                    lateness.

                    Reuven

                    On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský
                    <[email protected] <mailto:[email protected]>> wrote:

                        I realized the problem. I misinterpreted
                        the LateDataDroppingDoFnRunner. It doesn't
                        drop *all* late (arriving after watermark
                        - allowed lateness) data, but only data,
                        that arrive after maxTimestamp +
                        allowedLateness of their respective windows.

                        Stateful DoFn can run on global window
                        (which was the case of my tests) and there
                        is no dropping then.

                        Two questions arise then:

                         a) does it mean that this is one more
                        argument to move this logic to
                        StatefulDoFnRunner? StatefulDoFnRunner
                        performs state cleanup on window GC time,
                        so without LateDataDroppingDoFnRunner and
                        late data will see empty state and will
                        produce wrong results.

                         b) is this behavior generally intentional
                        and correct? Windows and triggers are (in
                        my point of view) features of GBK, not
                        stateful DoFn. Stateful DoFn is a low
                        level primitive, which can be viewed to
                        operate on "instant" windows, which should
                        then probably be defined as dropping every
                        single element arrive after allowed
                        lateness. This might probably relate to
                        question if operations should be built
                        bottom up from most primitive and generic
                        ones to more specific ones - that is GBK
                        be implemented on top of stateful DoFn and
                        not vice versa.

                        Thoughts?

                        Jan

                        On 1/4/20 1:03 AM, Steve Niemitz wrote:
                        I do agree that the direct runner doesn't
                        drop late data arriving at a stateful
                        DoFn (I just tested as well).

                        However, I believe this is consistent
                        with other runners.  I'm fairly certain
                        (at least last time I checked) that at
                        least Dataflow will also only drop late
                        data at GBK operations, and NOT stateful
                        DoFns. Whether or not this is intentional
                        is debatable however, without being able
                        to inspect the watermark inside the
                        stateful DoFn, it'd be very difficult to
                        do anything useful with late data.


                        On Fri, Jan 3, 2020 at 5:47 PM Jan
                        Lukavský <[email protected]
                        <mailto:[email protected]>> wrote:

                            I did write a test that tested if
                            data is dropped in a plain stateful
                            DoFn. I did this as part of
                            validating that PR [1] didn't drop
                            more data when using
                            @RequiresTimeSortedInput than it
                            would without this annotation. This
                            test failed and I didn't commit it, yet.

                            The test was basically as follows:

                             - use TestStream to generate three
                            elements with timestamps 2, 1 and 0

                             - between elements with timestamp 1
                            and 0 move watermark to 1

                             - use allowed lateness of zero

                             - use stateful dofn that just emits
                            arbitrary data for each input element

                             - use Count.globally to count outputs

                            The outcome was that stateful dofn
                            using @RequiresTimeSortedInput output
                            2 elements, without the annotation it
                            was 3 elements. I think the correct
                            one would be 2 elements in this case.
                            The difference is caused by the
                            annotation having (currently) its own
                            logic for dropping data, which could
                            be removed if we agree, that the data
                            should be dropped in all cases.

                            On 1/3/20 11:23 PM, Kenneth Knowles
                            wrote:
                            Did you write such
                            a @Category(ValidatesRunner.class)
                            test? I believe the Java  direct
                            runner does drop late data, for both
                            GBK and stateful ParDo.

                            Stateful ParDo is implemented on top
                            of GBK:
                            
https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172

                            And GroupByKey, via
                            DirectGroupByKey, via
                            DirectGroupAlsoByWindow, does drop
                            late data:
                            
https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220

                            I'm not sure why it has its own
                            code, since ReduceFnRunner also
                            drops late data, and it does use
                            ReduceFnRunner (the same code path
                            all Java-based runners use).

                            Kenn


                            On Fri, Jan 3, 2020 at 1:02 PM Jan
                            Lukavský <[email protected]
                            <mailto:[email protected]>> wrote:

                                Yes, the non-reliability of late
                                data dropping in distributed
                                runner is understood. But this
                                is even where DirectRunner can
                                play its role, because only
                                there it is actually possible to
                                emulate and test specific
                                watermark conditions. Question
                                regarding this for the java
                                DirectRunner - should we
                                completely drop
                                LataDataDroppingDoFnRunner and
                                delegate the late data dropping
                                to StatefulDoFnRunner? Seems
                                logical to me, as if we agree
                                that late data should always be
                                dropped, then there would no
                                "valid" use of
                                StatefulDoFnRunner without the
                                late data dropping functionality.

                                On 1/3/20 9:32 PM, Robert
                                Bradshaw wrote:
                                I agree, in fact we just
                                recently enabled late data
                                dropping to the direct runner
                                in Python to be able to develop
                                better tests for Dataflow.

                                It should be noted, however,
                                that in a distributed runner
                                (absent the quiessence of
                                TestStream) that one can't
                                *count* on late data being
                                dropped at a certain point, and
                                in fact (due to delays in fully
                                propagating the watermark) late
                                data can even become on-time,
                                so the promises about what
                                happens behind the
                                watermark are necessarily a bit
                                loose.

                                On Fri, Jan 3, 2020 at 9:15 AM
                                Luke Cwik <[email protected]
                                <mailto:[email protected]>> wrote:

                                    I agree that the
                                    DirectRunner should drop
                                    late data. Late data
                                    dropping is optional but
                                    the DirectRunner is used by
                                    many for testing and we
                                    should have the same
                                    behaviour they would get on
                                    other runners or users may
                                    be surprised.

                                    On Fri, Jan 3, 2020 at 3:33
                                    AM Jan Lukavský
                                    <[email protected]
                                    <mailto:[email protected]>>
                                    wrote:

                                        Hi,

                                        I just found out that
                                        DirectRunner is
                                        apparently not using
                                        LateDataDroppingDoFnRunner,
                                        which means that it
                                        doesn't drop late data
                                        in cases where there is
                                        no GBK operation
                                        involved (dropping in
                                        GBK seems
                                        to be correct). There
                                        is apparently no
                                        @Category(ValidatesRunner)
                                        test
                                        for that behavior
                                        (because DirectRunner
                                        would fail it), so the
                                        question
                                        is - should late data
                                        dropping be considered
                                        part of model (of which
                                        DirectRunner should be
                                        a canonical
                                        implementation) and
                                        therefore that
                                        should be fixed there,
                                        or is the late data
                                        dropping an optional
                                        feature
                                        of a runner?

                                        I'm strongly in favor
                                        of the first option,
                                        and I think it is
                                        likely that
                                        all real-world runners
                                        would probably adhere
                                        to that (I didn't check
                                        that, though).

                                        Opinions?

                                          Jan

Reply via email to