Did you write such a @Category(ValidatesRunner.class) test? I believe the Java direct runner does drop late data, for both GBK and stateful ParDo.

Stateful ParDo is implemented on top of GBK:
https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172

And GroupByKey, via DirectGroupByKey, via DirectGroupAlsoByWindow, does drop late data:
https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220

I'm not sure why it has its own code, since ReduceFnRunner also drops late data, and it does use ReduceFnRunner (the same code path all Java-based runners use).

Kenn

On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský <[email protected]> wrote:

> Yes, the non-reliability of late data dropping in a distributed runner is
> understood. But this is exactly where DirectRunner can play its role, because
> only there is it actually possible to emulate and test specific watermark
> conditions. Question regarding this for the Java DirectRunner - should we
> completely drop LateDataDroppingDoFnRunner and delegate the late data
> dropping to StatefulDoFnRunner? Seems logical to me: if we agree that
> late data should always be dropped, then there would be no "valid" use of
> StatefulDoFnRunner without the late data dropping functionality.
>
> On 1/3/20 9:32 PM, Robert Bradshaw wrote:
>
> I agree, in fact we just recently added late data dropping to the direct
> runner in Python to be able to develop better tests for Dataflow.
>
> It should be noted, however, that in a distributed runner (absent the
> quiescence of TestStream) one can't *count* on late data being dropped
> at a certain point, and in fact (due to delays in fully propagating the
> watermark) late data can even become on-time, so the promises about what
> happens behind the watermark are necessarily a bit loose.
>
> On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik <[email protected]> wrote:
>
>> I agree that the DirectRunner should drop late data. Late data dropping
>> is optional but the DirectRunner is used by many for testing and we should
>> have the same behaviour they would get on other runners or users may be
>> surprised.
>>
>> On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I just found out that DirectRunner is apparently not using
>>> LateDataDroppingDoFnRunner, which means that it doesn't drop late data
>>> in cases where there is no GBK operation involved (dropping in GBK seems
>>> to be correct). There is apparently no @Category(ValidatesRunner) test
>>> for that behavior (because DirectRunner would fail it), so the question
>>> is - should late data dropping be considered part of model (of which
>>> DirectRunner should be a canonical implementation) and therefore that
>>> should be fixed there, or is the late data dropping an optional feature
>>> of a runner?
>>>
>>> I'm strongly in favor of the first option, and I think it is likely that
>>> all real-world runners would probably adhere to that (I didn't check
>>> that, though).
>>>
>>> Opinions?
>>>
>>> Jan
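
For reference, the dropping rule discussed in this thread can be sketched roughly as follows. This is only a simplified model under my own assumptions, not Beam's actual code: the class LateDataSketch and its methods are hypothetical names, and it assumes that an element becomes droppable once the input watermark has passed the end of its window plus the allowed lateness.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical, simplified model of expiry-based late data dropping; not Beam's classes. */
final class LateDataSketch {

  /** Assumed expiry point of a window: its max timestamp plus the allowed lateness. */
  static Instant garbageCollectionTime(Instant windowMaxTimestamp, Duration allowedLateness) {
    return windowMaxTimestamp.plus(allowedLateness);
  }

  /** An element is treated as droppable when its window expired before the input watermark. */
  static boolean isDroppable(
      Instant windowMaxTimestamp, Duration allowedLateness, Instant inputWatermark) {
    return garbageCollectionTime(windowMaxTimestamp, allowedLateness).isBefore(inputWatermark);
  }

  public static void main(String[] args) {
    Instant windowEnd = Instant.parse("2020-01-03T12:00:00Z");
    Duration lateness = Duration.ofMinutes(5);

    // Watermark is past the window end but within the allowed lateness: late, but kept.
    System.out.println(
        isDroppable(windowEnd, lateness, Instant.parse("2020-01-03T12:03:00Z")));

    // Watermark is past window end plus allowed lateness: the element would be dropped.
    System.out.println(
        isDroppable(windowEnd, lateness, Instant.parse("2020-01-03T12:06:00Z")));
  }
}
```

The point of the sketch is that the decision depends only on the window, the allowed lateness, and the current input watermark, which is why a runner that can pin the watermark (as the DirectRunner can with TestStream) is the natural place to test it deterministically.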
