I did write a test that checked whether data is dropped in a plain stateful DoFn. I wrote it while validating that PR [1] doesn't drop more data with @RequiresTimeSortedInput than without the annotation. The test failed, and I haven't committed it yet.

The test was basically as follows:

 - use TestStream to generate three elements with timestamps 2, 1 and 0

 - between the elements with timestamps 1 and 0, advance the watermark to 1

 - use an allowed lateness of zero

 - use a stateful DoFn that just emits arbitrary data for each input element

 - use Count.globally to count the outputs

The outcome was that the stateful DoFn using @RequiresTimeSortedInput output 2 elements, while without the annotation it output 3. I think 2 is the correct count in this case. The difference is caused by the annotation (currently) having its own logic for dropping late data, which could be removed if we agree that late data should be dropped in all cases.
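The scenario can be modelled in plain Java, without Beam, to show where the 2 vs. 3 difference comes from. This is a minimal sketch under stated assumptions: the test description doesn't name a windowing strategy, so it assumes fixed windows of 1 ms (each element's window max timestamp then equals its own timestamp), and it uses a rule modelled roughly on Beam's late data dropping check, where an element is droppable when its window's max timestamp plus the allowed lateness is before the input watermark. LateDataScenario and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java model of the TestStream scenario; this is not Beam code. */
public class LateDataScenario {

    /** One step of the simulated TestStream: an element or a watermark advance. */
    static final class Event {
        final boolean isWatermark;
        final long value; // element timestamp, or the new watermark time

        private Event(boolean isWatermark, long value) {
            this.isWatermark = isWatermark;
            this.value = value;
        }

        static Event element(long timestamp) {
            return new Event(false, timestamp);
        }

        static Event watermark(long time) {
            return new Event(true, time);
        }
    }

    /** Assumption: 1 ms fixed windows, so the window [t, t + 1) has max timestamp t. */
    static long windowMaxTimestamp(long timestamp) {
        return timestamp;
    }

    /** Droppable when the window's max timestamp plus allowed lateness is before the watermark. */
    static boolean isDroppable(long timestamp, long allowedLateness, long inputWatermark) {
        return windowMaxTimestamp(timestamp) + allowedLateness < inputWatermark;
    }

    /** Counts outputs of a stateful DoFn that emits one value per element it processes. */
    static int countOutputs(List<Event> events, long allowedLateness, boolean dropLateData) {
        long watermark = Long.MIN_VALUE; // no watermark advance seen yet
        int outputs = 0;
        for (Event e : events) {
            if (e.isWatermark) {
                watermark = Math.max(watermark, e.value);
            } else if (!dropLateData || !isDroppable(e.value, allowedLateness, watermark)) {
                outputs++;
            }
        }
        return outputs;
    }

    /** Elements with timestamps 2 and 1, watermark advanced to 1, then an element at 0. */
    static List<Event> scenario() {
        List<Event> events = new ArrayList<>();
        events.add(Event.element(2));
        events.add(Event.element(1));
        events.add(Event.watermark(1));
        events.add(Event.element(0));
        return events;
    }

    public static void main(String[] args) {
        System.out.println("with dropping:    " + countOutputs(scenario(), 0, true));
        System.out.println("without dropping: " + countOutputs(scenario(), 0, false));
    }
}
```

Running main prints 2 with dropping and 3 without, matching the counts reported above. Only the element at timestamp 0 is dropped: its window's max timestamp is 0, and 0 + 0 is before the watermark of 1.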

On 1/3/20 11:23 PM, Kenneth Knowles wrote:
Did you write such a @Category(ValidatesRunner.class) test? I believe the Java direct runner does drop late data, for both GBK and stateful ParDo.

Stateful ParDo is implemented on top of GBK: https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172

And GroupByKey, via DirectGroupByKey, via DirectGroupAlsoByWindow, does drop late data: https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220

I'm not sure why the DirectGroupAlsoByWindow evaluator has its own dropping code, since it also uses ReduceFnRunner, which drops late data too (the same code path all Java-based runners use).

Kenn


On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský wrote:

    Yes, the unreliability of late data dropping in distributed
    runners is understood. But this is precisely where the DirectRunner
    can play its role, because only there is it actually possible to
    emulate and test specific watermark conditions. A question about
    this for the Java DirectRunner: should we completely drop
    LateDataDroppingDoFnRunner and delegate late data dropping to
    StatefulDoFnRunner? That seems logical to me: if we agree that late
    data should always be dropped, then there would be no "valid" use
    of StatefulDoFnRunner without the late data dropping functionality.
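A hypothetical sketch of the proposal above: if the expiry check lives inside the stateful runner itself, there is no way to build a stateful runner that skips it, whereas a separate wrapper such as LateDataDroppingDoFnRunner can simply be left out. ElementProcessor and StatefulRunner are illustrative stand-ins, not Beam's actual DoFnRunner API, and the sketch assumes 1 ms fixed windows so that an element's window max timestamp equals its timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical stand-ins; these are not Beam's DoFnRunner interfaces. */
public class DroppingInsideStatefulRunner {

    /** Minimal stand-in for something that processes an element with a timestamp. */
    interface ElementProcessor {
        void process(long timestamp);
    }

    /** Stateful runner that always applies the expiry check, so callers cannot skip it. */
    static final class StatefulRunner implements ElementProcessor {
        private final ElementProcessor userFn;
        private final long allowedLateness;
        private final LongSupplier inputWatermark;
        private int dropped = 0;

        StatefulRunner(ElementProcessor userFn, long allowedLateness, LongSupplier inputWatermark) {
            this.userFn = userFn;
            this.allowedLateness = allowedLateness;
            this.inputWatermark = inputWatermark;
        }

        @Override
        public void process(long timestamp) {
            // Assumes 1 ms fixed windows: the window max timestamp equals the element timestamp.
            long windowMaxTimestamp = timestamp;
            if (windowMaxTimestamp + allowedLateness < inputWatermark.getAsLong()) {
                dropped++; // expired window: drop without calling the user function
                return;
            }
            userFn.process(timestamp);
        }

        int droppedCount() {
            return dropped;
        }
    }

    public static void main(String[] args) {
        List<Long> seen = new ArrayList<>();
        long[] watermark = {Long.MIN_VALUE};
        StatefulRunner runner =
                new StatefulRunner(timestamp -> seen.add(timestamp), 0, () -> watermark[0]);
        runner.process(2);
        runner.process(1);
        watermark[0] = 1; // advance the watermark between timestamps 1 and 0
        runner.process(0);
        System.out.println(seen + ", dropped=" + runner.droppedCount());
    }
}
```

Running main prints `[2, 1], dropped=1`. The design choice being illustrated is only where the check lives: with it inside the stateful runner, "stateful processing without late data dropping" stops being a configuration anyone can construct.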

    On 1/3/20 9:32 PM, Robert Bradshaw wrote:
    I agree. In fact, we just recently enabled late data dropping in
    the Python direct runner so that we can develop better tests
    for Dataflow.

    It should be noted, however, that in a distributed runner (absent
    the quiescence of TestStream) one can't *count* on late data
    being dropped at a certain point, and in fact (due to delays in
    fully propagating the watermark) late data can even become
    on-time, so the promises about what happens behind the
    watermark are necessarily a bit loose.
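One way to picture the watermark-propagation point: whether an element counts as late depends on the input watermark the processing stage has observed when the element arrives. In this hypothetical sketch (names are illustrative, and 1 ms fixed windows are assumed), the same element is droppable at a stage that has already seen the watermark advance to 1, but on-time at a stage that still observes 0.

```java
/** Hypothetical illustration of stage-local lateness decisions; this is not Beam code. */
public class StaleWatermarkExample {

    /** Droppable when the window's max timestamp plus allowed lateness is before the observed watermark. */
    static boolean isDroppable(long windowMaxTimestamp, long allowedLateness, long observedWatermark) {
        return windowMaxTimestamp + allowedLateness < observedWatermark;
    }

    public static void main(String[] args) {
        long windowMaxTimestamp = 0; // element at timestamp 0 in the 1 ms window [0, 1)
        long allowedLateness = 0;
        long upToDateWatermark = 1; // this stage has already seen the advance to 1
        long staleWatermark = 0;    // the advance has not reached this stage yet
        System.out.println("up-to-date stage drops it: "
                + isDroppable(windowMaxTimestamp, allowedLateness, upToDateWatermark));
        System.out.println("stale stage drops it:      "
                + isDroppable(windowMaxTimestamp, allowedLateness, staleWatermark));
    }
}
```

The first line prints true and the second false, which is roughly why dropping can only be pinned down in a test when, as with TestStream's quiescence, the watermark is fully propagated before the next element is processed.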

    On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik wrote:

        I agree that the DirectRunner should drop late data. Late
        data dropping is optional, but the DirectRunner is used by
        many for testing, and we should have the same behaviour they
        would get on other runners; otherwise users may be surprised.

        On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský wrote:

            Hi,

            I just found out that the DirectRunner is apparently not
            using LateDataDroppingDoFnRunner, which means that it
            doesn't drop late data in cases where there is no GBK
            operation involved (dropping in GBK seems to be correct).
            There is apparently no @Category(ValidatesRunner) test for
            that behavior (because the DirectRunner would fail it), so
            the question is: should late data dropping be considered
            part of the model (of which the DirectRunner should be a
            canonical implementation) and therefore be fixed there, or
            is late data dropping an optional feature of a runner?

            I'm strongly in favor of the first option, and I think all
            real-world runners likely adhere to it (I didn't check
            that, though).

            Opinions?

              Jan
