Did you write such a @Category(ValidatesRunner.class) test? I believe the
Java direct runner does drop late data, for both GBK and stateful ParDo.

Stateful ParDo is implemented on top of GBK:
https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172

And GroupByKey, via DirectGroupByKey, via DirectGroupAlsoByWindow, does
drop late data:
https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220

I'm not sure why the direct runner has its own dropping code here, since
ReduceFnRunner also drops late data, and the direct runner does use
ReduceFnRunner (the same code path all Java-based runners use).
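
For reference, the lateness check under discussion can be sketched roughly
as below. This is only an illustration of the rule as I understand it (an
element is droppable once its window's max timestamp plus the allowed
lateness is before the input watermark). The class and method names are
hypothetical, and it uses java.time rather than Beam's own types, so it is
not the actual runner code.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not part of any Beam API.
final class LateDataSketch {

    private LateDataSketch() {}

    /**
     * Returns true if data for a window may be dropped as late: the window
     * expires at (max timestamp + allowed lateness), and the data is
     * droppable once that expiry is strictly before the input watermark.
     */
    static boolean isDroppablyLate(
            Instant windowMaxTimestamp, Duration allowedLateness, Instant inputWatermark) {
        Instant expiry = windowMaxTimestamp.plus(allowedLateness);
        return expiry.isBefore(inputWatermark);
    }
}
```

With a window whose max timestamp is 9999 ms and an allowed lateness of 5 s,
this check reports data as droppable only once the watermark has passed
14999 ms.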

Kenn


On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský <[email protected]> wrote:

> Yes, the unreliability of late data dropping in a distributed runner is
> understood. But this is exactly where DirectRunner can play its role, because
> only there is it actually possible to emulate and test specific watermark
> conditions. A question regarding the Java DirectRunner: should we
> completely drop LateDataDroppingDoFnRunner and delegate the late data
> dropping to StatefulDoFnRunner? That seems logical to me: if we agree that
> late data should always be dropped, then there would be no "valid" use of
> StatefulDoFnRunner without the late data dropping functionality.
>
> On 1/3/20 9:32 PM, Robert Bradshaw wrote:
>
> I agree; in fact, we just recently enabled late data dropping in the Python
> direct runner to be able to develop better tests for Dataflow.
>
> It should be noted, however, that in a distributed runner (absent the
> quiescence of TestStream) one can't *count* on late data being dropped
> at a certain point, and in fact (due to delays in fully propagating the
> watermark) late data can even become on-time, so the promises about what
> happens behind the watermark are necessarily a bit loose.
>
> On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik <[email protected]> wrote:
>
>> I agree that the DirectRunner should drop late data. Late data dropping
>> is optional but the DirectRunner is used by many for testing and we should
>> have the same behaviour they would get on other runners or users may be
>> surprised.
>>
>> On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I just found out that DirectRunner is apparently not using
>>> LateDataDroppingDoFnRunner, which means that it doesn't drop late data
>>> in cases where there is no GBK operation involved (dropping in GBK seems
>>> to be correct). There is apparently no @Category(ValidatesRunner) test
>>> for that behavior (because DirectRunner would fail it), so the question
>>> is - should late data dropping be considered part of the model (of which
>>> DirectRunner should be a canonical implementation) and therefore that
>>> should be fixed there, or is the late data dropping an optional feature
>>> of a runner?
>>>
>>> I'm strongly in favor of the first option, and I think it is likely that
>>> all real-world runners would probably adhere to that (I didn't check
>>> that, though).
>>>
>>> Opinions?
>>>
>>>   Jan
>>>
>>>
