On 1/4/20 6:14 PM, Reuven Lax wrote:
There is a very good reason not to define lateness directly in terms of the watermark. The model does not make any guarantees that the watermark advances synchronously, and in fact for the Dataflow runner the watermark advances asynchronously (i.e. independent of element processing). This means that simply comparing an element timestamp against the watermark creates a race condition. There are cases where the answer could change depending on exactly when you examine the watermark, and if you examine again while processing the same bundle you might come to a different conclusion about lateness.
Because the watermark is monotonic, I don't think asynchronous watermark updates can change the answer from "late" to "not late". That seems fine to me.
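
To make the two points above concrete, here is a minimal sketch in plain Java (not Beam code). The class `WatermarkRace` and its methods are hypothetical, and "late" is simplified to "timestamp is behind the current watermark", ignoring allowed lateness. It shows that the answer can differ between two checks of the same element, and that with a monotonic watermark it can only move from "not late" to "late".

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only: a watermark that is advanced independently
// of element processing, e.g. by another thread.
final class WatermarkRace {
    private final AtomicLong watermark = new AtomicLong(Long.MIN_VALUE);

    // Monotonic update: the watermark never moves backwards.
    void advanceTo(long newWatermark) {
        watermark.accumulateAndGet(newWatermark, Math::max);
    }

    // Simplified lateness check against whatever the watermark is right now.
    boolean isLate(long elementTimestamp) {
        return elementTimestamp < watermark.get();
    }
}
```

If the watermark moves from 5 to 10 between two checks of an element with timestamp 7, the first check says "not late" and the second says "late". A later attempt to move the watermark back to 3 is ignored, so the answer never flips back.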

This nondeterminism is undesirable when considering lateness, as it can break many invariants that users may rely on (e.g. I could write a ParDo that filters out all late data, yet still find late data showing up downstream of the ParDo, which would be very surprising). For that reason, the SDK always marks things as late based on deterministic signals. E.g. for a triggered GBK, everything in the first post-watermark pane is marked as on time (no matter what the watermark is) and everything in subsequent panes is marked as late.
Dropping latecomers will always be non-deterministic, that is certain. This is true even when the watermark is updated synchronously with element processing, due to shuffling and varying (random) differences between processing time and event time in upstream operator(s). The question was only whether a latecomer should be dropped only at window boundaries (which are a sort of artificial time boundary), or right away when spotted (in stateful DoFns only). Another question is whether latecomers should be dropped based on the input or the output watermark. Dropping based on the output watermark even seems to be stable, in the sense that all downstream operators should come to the same conclusion (this is a bit of speculation).

FYI - this is also the reason why Beam does not currently provide users direct access to the watermark. Its asynchronous nature can be very confusing, and often leads users to write bugs in their pipelines. We decided instead to expose easier-to-reason-about signals such as timers (triggered by the watermark), windows, and lateness.

Reuven

On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský wrote:

    I realized the problem. I misinterpreted the
    LateDataDroppingDoFnRunner. It doesn't drop *all* late data
    (arriving after watermark - allowed lateness), but only data that
    arrives after maxTimestamp + allowedLateness of its respective
    window.

    A stateful DoFn can run on the global window (which was the case
    in my tests), and there is no dropping then.
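
The distinction described above can be sketched in plain Java (not the actual LateDataDroppingDoFnRunner code). All names here are hypothetical, timestamps are plain `long` values, and the global window's maximum timestamp is replaced by a large stand-in constant. The sketch compares "the element is behind the watermark" with "the element's window has expired".

```java
// Hypothetical sketch of two lateness predicates, using long timestamps.
final class WindowExpiry {
    // Stand-in for the global window's maximum timestamp (an assumption of
    // this sketch, not the real Beam constant).
    static final long GLOBAL_WINDOW_MAX_TIMESTAMP = Long.MAX_VALUE / 2;

    // Max timestamp of the fixed window of the given size containing the element.
    static long fixedWindowMaxTimestamp(long elementTimestamp, long windowSize) {
        long start = Math.floorDiv(elementTimestamp, windowSize) * windowSize;
        return start + windowSize - 1;
    }

    // Window-based rule: drop only if the whole window is past allowed lateness.
    static boolean isWindowExpired(long windowMaxTimestamp, long allowedLateness,
                                   long inputWatermark) {
        return windowMaxTimestamp + allowedLateness < inputWatermark;
    }

    // Per-element rule: the element itself is behind watermark - allowedLateness.
    static boolean isBehindWatermark(long elementTimestamp, long allowedLateness,
                                     long inputWatermark) {
        return elementTimestamp + allowedLateness < inputWatermark;
    }
}
```

With a window size of 10, zero allowed lateness and the watermark at 5, an element with timestamp 3 is behind the watermark, but its window (max timestamp 9) has not expired, so the window-based rule keeps it. With the stand-in global window, the window-based rule never drops anything for realistic watermark values.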

    Two questions arise then:

     a) does it mean that this is one more argument to move this logic
    to StatefulDoFnRunner? StatefulDoFnRunner performs state cleanup
    at window GC time, so without LateDataDroppingDoFnRunner, late
    data will see empty state and produce wrong results.

     b) is this behavior generally intentional and correct? Windows
    and triggers are (from my point of view) features of GBK, not
    stateful DoFn. Stateful DoFn is a low-level primitive, which can
    be viewed as operating on "instant" windows, which should then
    probably be defined as dropping every single element arriving after
    allowed lateness. This probably relates to the question of whether
    operations should be built bottom-up, from the most primitive and
    generic ones to more specific ones - that is, GBK being implemented
    on top of stateful DoFn and not vice versa.

    Thoughts?

    Jan

    On 1/4/20 1:03 AM, Steve Niemitz wrote:
    I do agree that the direct runner doesn't drop late data arriving
    at a stateful DoFn (I just tested as well).

    However, I believe this is consistent with other runners.  I'm
    fairly certain (at least last time I checked) that at least
    Dataflow will also only drop late data at GBK operations, and NOT
    stateful DoFns. Whether or not this is intentional is debatable;
    however, without being able to inspect the watermark inside the
    stateful DoFn, it'd be very difficult to do anything useful with
    late data.


    On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský wrote:

        I did write a test that checked whether data is dropped in a
        plain stateful DoFn. I did this as part of validating that PR
        [1] didn't drop more data when using @RequiresTimeSortedInput
        than it would without this annotation. This test failed and I
        haven't committed it yet.

        The test was basically as follows:

         - use TestStream to generate three elements with timestamps
        2, 1 and 0

         - between elements with timestamp 1 and 0 move watermark to 1

         - use allowed lateness of zero

         - use a stateful DoFn that just emits arbitrary data for each
        input element

         - use Count.globally to count outputs

        The outcome was that the stateful DoFn using
        @RequiresTimeSortedInput output 2 elements, while without the
        annotation it was 3 elements. I think the correct one would
        be 2 elements in this case. The difference is caused by the
        annotation having (currently) its own logic for dropping
        data, which could be removed if we agree that the data
        should be dropped in all cases.
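
The test scenario described above can be approximated without Beam by a small plain-Java simulation. This is a sketch under stated assumptions, not the uncommitted test itself: the names (`LateDroppingSimulation`, `Event`, `countOutputs`) are hypothetical, everything runs in a single global window whose maximum timestamp is a stand-in constant, and the stateful DoFn is reduced to "emit one output per element that is not dropped".

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of the scenario: elements 2 and 1, watermark to 1,
// then element 0, with allowed lateness of zero.
final class LateDroppingSimulation {
    // Stand-in for the global window's maximum timestamp (assumption).
    static final long GLOBAL_WINDOW_MAX_TIMESTAMP = Long.MAX_VALUE / 2;

    // Either an element (value = timestamp) or a watermark advance (value = new watermark).
    static final class Event {
        final boolean isWatermark;
        final long value;
        private Event(boolean isWatermark, long value) {
            this.isWatermark = isWatermark;
            this.value = value;
        }
        static Event element(long timestamp) { return new Event(false, timestamp); }
        static Event watermark(long watermark) { return new Event(true, watermark); }
    }

    static List<Event> scenario() {
        return Arrays.asList(
            Event.element(2), Event.element(1), Event.watermark(1), Event.element(0));
    }

    // Counts elements that survive dropping. With dropPerElement, an element is
    // dropped when it is behind watermark - allowedLateness; otherwise only when
    // the (global) window itself has expired.
    static int countOutputs(List<Event> events, long allowedLateness, boolean dropPerElement) {
        long watermark = Long.MIN_VALUE / 2;
        int outputs = 0;
        for (Event e : events) {
            if (e.isWatermark) {
                watermark = Math.max(watermark, e.value);
                continue;
            }
            boolean dropped = dropPerElement
                ? e.value + allowedLateness < watermark
                : GLOBAL_WINDOW_MAX_TIMESTAMP + allowedLateness < watermark;
            if (!dropped) {
                outputs++;
            }
        }
        return outputs;
    }
}
```

Under these assumptions, `countOutputs(scenario(), 0, true)` yields 2 (the element with timestamp 0 is dropped) and `countOutputs(scenario(), 0, false)` yields 3, matching the two outcomes reported above.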

        On 1/3/20 11:23 PM, Kenneth Knowles wrote:
        Did you write such a @Category(ValidatesRunner.class) test?
        I believe the Java direct runner does drop late data, for
        both GBK and stateful ParDo.

        Stateful ParDo is implemented on top of GBK:
        
https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172

        And GroupByKey, via DirectGroupByKey, via
        DirectGroupAlsoByWindow, does drop late data:
        
https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220

        I'm not sure why it has its own code, since ReduceFnRunner
        also drops late data, and it does use ReduceFnRunner (the
        same code path all Java-based runners use).

        Kenn


        On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský wrote:

            Yes, the non-reliability of late data dropping in a
            distributed runner is understood. But this is exactly where
            DirectRunner can play its role, because only there is it
            actually possible to emulate and test specific watermark
            conditions. A question regarding this for the Java
            DirectRunner - should we completely drop
            LateDataDroppingDoFnRunner and delegate the late data
            dropping to StatefulDoFnRunner? Seems logical to me, as
            if we agree that late data should always be dropped,
            then there would be no "valid" use of StatefulDoFnRunner
            without the late data dropping functionality.

            On 1/3/20 9:32 PM, Robert Bradshaw wrote:
            I agree; in fact, we just recently enabled late data
            dropping in the direct runner in Python to be able to
            develop better tests for Dataflow.

            It should be noted, however, that in a distributed
            runner (absent the quiescence of TestStream) one
            can't *count* on late data being dropped at a certain
            point, and in fact (due to delays in fully propagating
            the watermark) late data can even become on-time, so
            the promises about what happens behind the
            watermark are necessarily a bit loose.

            On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik wrote:

                I agree that the DirectRunner should drop late
                data. Late data dropping is optional but the
                DirectRunner is used by many for testing and we
                should have the same behaviour they would get on
                other runners or users may be surprised.

                On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský wrote:

                    Hi,

                    I just found out that DirectRunner is
                    apparently not using
                    LateDataDroppingDoFnRunner, which means that it
                    doesn't drop late data
                    in cases where there is no GBK operation
                    involved (dropping in GBK seems
                    to be correct). There is apparently no
                    @Category(ValidatesRunner) test
                    for that behavior (because DirectRunner would
                    fail it), so the question
                    is - should late data dropping be considered
                    part of the model (of which
                    DirectRunner should be a canonical
                    implementation) and therefore that
                    should be fixed there, or is the late data
                    dropping an optional feature
                    of a runner?

                    I'm strongly in favor of the first option, and
                    I think it is likely that
                    all real-world runners would probably adhere to
                    that (I didn't check
                    that, though).

                    Opinions?

                      Jan
