On Tue, Jan 7, 2020 at 1:39 AM Jan Lukavský <je...@seznam.cz> wrote:
> Hi Kenn,
>
> I see that my terminology seems not to be 100% aligned with Beam's. I'll work on that. :-)
>
> I agree with what you say, and by "late" I mostly meant "droppable" (arriving too late, after the watermark).
>
> I'm definitely not proposing to get back to something like "out of order" == "late" or anything like that. I'm also aware that a stateful operation is a windowed operation, but the semantics of the windowing is different from that of a GBK. The difference is how time moves in a GBK and how it moves in a stateful DoFn. Throwing away some details (early triggers, late data triggers), the main difference is that in the GBK case, time hops just between window boundaries, while in a stateful DoFn time moves "smoothly" (with each watermark update). Now, this difference raises the question why the definition of "droppable" data is the same for both types of operations, when there is a difference in how users "perceive" time. As the more generic operation, stateful DoFn might deserve a more general definition of droppable data, which should degrade naturally to the one of GBK in the presence of "discrete time hops".

I understand what you mean. On the other hand, I encourage thinking of event time spatially, not as time passing. That is a big part of unifying batch/streaming and real-time/archival processing. The event time window is a secondary key used to partition the data (merging windows are slightly more complex). All event time windows exist simultaneously. So for both stateful ParDo and GBK, I find it helpful to consider the perspective where all windows are processed simultaneously / in an arbitrary order, not assuming windows are ordered at all. Then you see that GBK and stateful ParDo do not really treat windows / the watermark differently: both of them process a stream of data for each (key, window) pair until the watermark informs them that the stream is expired, then they GC the state associated with that (key, window) pair.
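The (key, window) framing above can be sketched as a toy model (plain Python, not Beam code; all names are illustrative): both GBK and stateful ParDo keep state per (key, window) pair, process that pair's stream until the watermark says it is expired, and then garbage-collect the state.

```python
ALLOWED_LATENESS = 0  # assumed zero for simplicity

class KeyedWindowedOperator:
    def __init__(self):
        self.state = {}  # (key, window_end) -> buffered elements

    def process(self, key, window_end, element, watermark):
        if watermark > window_end + ALLOWED_LATENESS:
            return "dropped"  # the (key, window) stream has already expired
        self.state.setdefault((key, window_end), []).append(element)
        return "buffered"

    def advance_watermark(self, watermark):
        # The watermark tells us which (key, window) streams are expired;
        # GC their state, which is the common behavior of GBK and stateful ParDo.
        expired = [kw for kw in self.state
                   if watermark > kw[1] + ALLOWED_LATENESS]
        for kw in expired:
            del self.state[kw]
        return expired

op = KeyedWindowedOperator()
op.process("k", 10, "a", watermark=5)   # "buffered"
op.advance_watermark(11)                # expires ("k", 10) and GCs its state
op.process("k", 10, "b", watermark=11)  # "dropped"
```

Note that nothing here assumes windows arrive in order: all (key, window) pairs coexist in the state map and expire independently as the watermark advances.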
Kenn

> This might have some consequences for how droppable data should be handled in the presence of (early) triggers, because triggering is actually what makes time "hop", so we might arrive at the conclusion that we might actually drop any data that has a timestamp less than "last trigger time + allowed lateness". This looks appealing to me, because IMO it has strong internal logical consistency. Although it is possible that it would drop more data, which is generally undesirable, I admit that.
>
> I'm looking for an explanation of why the current approach was chosen instead of the other.
>
> Jan
>
> On 1/7/20 12:52 AM, Kenneth Knowles wrote:
>
> This thread has a lot in it, so I am just top-posting.
>
> - Stateful DoFn is a windowed operation; state is per-window. When the window expires, any further inputs are dropped.
> - "Late" is not synonymous with out-of-order. It doesn't really have an independent meaning.
> - For a GBK/Combine, "late" means "not included prior to the on-time output", and "droppable" means "arriving after window expiry".
> - For a stateful DoFn there is no real meaning to "late" except if one is talking about "droppable", which still means "arriving after window expiry". A user may have a special timer where they flip a flag and treat elements after the timer differently.
>
> I think the definition of when data is droppable is very simple. We explicitly moved to this definition, away from "out of order == late", because it is more robust and simpler to think about. Users saw lots of confusing behavior when we had "out of order by allowed lateness == droppable" logic.
>
> Kenn
>
> On Mon, Jan 6, 2020 at 1:42 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> > Generally the watermark update can overtake elements, because runners can explicitly ignore late data in the watermark calculation (for good reason - those elements are already late, so no need to hold up the watermark advancing any more).
>> This seems not to affect the decision of _not late_ vs. _late_, does it? If an element is late and gets ignored in the watermark calculation (whatever that includes in this context), then the watermark cannot move past elements that were marked as _not late_, and thus nothing can make them _late_.
>>
>> > For GBK on-time data simply means the first pane marked as on time. For state+timers I don't think it makes sense for Beam to define on-time vs. late; rather, I think the user can come up with their own definition depending on their use case. For example, if you are buffering data into BagState and setting a timer to process it, it would be logical to say that any element that was buffered before the timer expired is on time, and any data that showed up after the timer fired is late. This would roughly correspond to what GBK does, and the answer would be very similar to simply comparing against the watermark (as the timers fire when the watermark advances).
>>
>> Yes, I'd say that stateful DoFns don't have a (well defined) concept of pane, because that is related to the concept of trigger, and that is a concept of GBK (or windowed operations in general). The only semantic meaning of window in a stateful DoFn is that it "scopes" state.
>>
>> This discussion might have gotten a little off the original question, so I'll try to rephrase it:
>>
>> Should a stateful DoFn drop *all* late data, not just data that arrives after window boundary + allowed lateness?
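Reuven's BagState-and-timer example quoted above can be modeled as a toy (plain Python, not the Beam state/timer API; class and method names are made up for illustration): buffer elements in a bag until an event-time timer fires, then treat anything arriving after the firing as late.

```python
class BufferUntilTimer:
    def __init__(self, timer_ts):
        self.timer_ts = timer_ts  # event-time timer set by the user
        self.bag = []             # stands in for BagState
        self.fired = False

    def on_element(self, element):
        if self.fired:
            return "late"         # showed up after the timer fired
        self.bag.append(element)
        return "on_time"          # buffered before the timer expired

    def on_watermark(self, watermark):
        # Timers fire as the watermark advances past their timestamp.
        if not self.fired and watermark >= self.timer_ts:
            self.fired = True
            flushed, self.bag = self.bag, []
            return flushed        # process everything buffered so far
        return None

fn = BufferUntilTimer(timer_ts=10)
fn.on_element("a")   # "on_time"
fn.on_watermark(10)  # ["a"] - the timer fires and flushes the bag
fn.on_element("b")   # "late"
```

Because the timer fires when the watermark passes its timestamp, this user-defined split roughly tracks a comparison against the watermark, which is Reuven's point.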
>> Some arguments why I think it should:
>>
>> * in windowed operations (GBK), it is correct to drop data at window boundaries only, because time (as seen by the user) effectively hops only at these discrete time points
>> * in a stateful DoFn, on the other hand, time moves "smoothly" (yes, with some granularity - millisecond, nanosecond, whatever - and with watermark updates only, but still)
>> * dropping late data immediately as time (again, from the user's perspective) moves (not at some more or less artificial boundary having only little semantic meaning) could be viewed as consistent with both the above properties
>>
>> The negative side effect of this would be that more data could be dropped, but ... isn't this what defines allowed lateness? I don't want to discuss the implications of such a change on user pipelines (and whether we can or cannot do it), just trying to build some theoretical understanding of the problem as a whole. The decision whether any change could / should be made can be made afterwards.
>>
>> Thanks,
>> Jan
>>
>> On 1/4/20 10:35 PM, Reuven Lax wrote:
>>
>> On Sat, Jan 4, 2020 at 12:13 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> > Yes, but invariants should hold. If I add a ParDo that drops late elements (or, more commonly, diverts the late elements to a different PCollection), then the result of that ParDo should _never_ introduce any more late data. This cannot be guaranteed simply with watermark checks. The ParDo may decide that the element was not late, but by the time it outputs the element the watermark may have advanced, causing the element to actually be late.
>>>
>>> This is actually very interesting. The question is - if I decide about lateness based on the output watermark of a PTransform, is it still the case that in downstream operator(s) the element could be changed from "not late" to "late"?
>>> Provided the output watermark is updated synchronously based on input data (which it should be) and the watermark update cannot "overtake" elements, I think that the downstream decision should not change, so the invariant should hold. Or am I missing something?
>>
>> Generally the watermark update can overtake elements, because runners can explicitly ignore late data in the watermark calculation (for good reason - those elements are already late, so no need to hold up the watermark advancing any more).
>>
>> For GBK on-time data simply means the first pane marked as on time. For state+timers I don't think it makes sense for Beam to define on-time vs. late; rather, I think the user can come up with their own definition depending on their use case. For example, if you are buffering data into BagState and setting a timer to process it, it would be logical to say that any element that was buffered before the timer expired is on time, and any data that showed up after the timer fired is late. This would roughly correspond to what GBK does, and the answer would be very similar to simply comparing against the watermark (as the timers fire when the watermark advances).
>>
>> Reuven
>>
>>> On 1/4/20 8:11 PM, Reuven Lax wrote:
>>>
>>> On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> On 1/4/20 6:14 PM, Reuven Lax wrote:
>>>>
>>>> There is a very good reason not to define lateness directly in terms of the watermark. The model does not make any guarantees that the watermark advances synchronously, and in fact for the Dataflow runner the watermark advances asynchronously (i.e. independently of element processing). This means that simply comparing an element timestamp against the watermark creates a race condition.
>>>> There are cases where the answer could change depending on exactly when you examine the watermark, and if you examine it again while processing the same bundle you might come to a different conclusion about lateness.
>>>>
>>>> Due to the monotonicity of the watermark, I don't think that the asynchronous updates of the watermark can change the answer from "late" to "not late". That seems fine to me.
>>>
>>> It's the other way around. You check to see whether an element is late and the answer is "not late." An instant later the answer changes to "late". This does cause many problems, and is why this was changed.
>>>
>>>> This non-determinism is undesirable when considering lateness, as it can break many invariants that users may rely on (e.g. I could write a ParDo that filters all late data, yet still find late data showing up downstream of the ParDo, which would be very surprising). For that reason, the SDK always marks things as late based on deterministic signals, e.g. for a triggered GBK everything in the first post-watermark pane is marked as on time (no matter what the watermark is) and everything in subsequent panes is marked as late.
>>>>
>>>> Dropping latecomers will always be non-deterministic, that is certain. This is true even in the case where the watermark is updated synchronously with element processing, due to shuffling and varying (random) differences of processing and event time in upstream operator(s). The question was only whether a latecomer should be dropped at window boundaries only (which is a sort of artificial time boundary), or right away when spotted (in stateful DoFns only). Another question would be whether latecomers should be dropped based on the input or the output watermark; dropping based on the output watermark even seems to be stable in the sense that all downstream operators should come to the same conclusion (this is a bit of a speculation).
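The two candidate definitions of "droppable" being debated here can be put side by side as a toy sketch (function names are mine, not Beam's): the current rule drops an element only once its window has expired, while the alternative drops it as soon as its timestamp falls behind the watermark by more than the allowed lateness.

```python
def droppable_at_window_expiry(ts, window_max_ts, watermark, allowed_lateness):
    # Current definition: droppable only after the window expires,
    # i.e. the watermark passes window_max_ts + allowed lateness.
    return watermark > window_max_ts + allowed_lateness

def droppable_immediately(ts, window_max_ts, watermark, allowed_lateness):
    # Alternative discussed for stateful DoFn: droppable the moment the
    # element's timestamp is behind the watermark by more than the
    # allowed lateness, regardless of window boundaries.
    return ts < watermark - allowed_lateness

# An element at timestamp 0 in a large window, seen when the watermark is 100:
droppable_at_window_expiry(0, 1000, 100, 0)  # False - window not yet expired
droppable_immediately(0, 1000, 100, 0)       # True - behind the watermark
```

Since an element's timestamp is at most its window's max timestamp, anything droppable under the current rule is also droppable under the immediate rule, so the alternative can only drop more data, never less - which matches the trade-off acknowledged in the thread.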
>>>
>>> Yes, but invariants should hold. If I add a ParDo that drops late elements (or, more commonly, diverts the late elements to a different PCollection), then the result of that ParDo should _never_ introduce any more late data. This cannot be guaranteed simply with watermark checks. The ParDo may decide that the element was not late, but by the time it outputs the element the watermark may have advanced, causing the element to actually be late.
>>>
>>> In practice this is important. An early version of Dataflow (pre-Beam) implemented lateness by comparing against the watermark, and it caused no end of trouble for users.
>>>
>>>> FYI - this is also the reason why Beam does not currently provide users direct access to the watermark. The asynchronous nature of it can be very confusing, and often results in users writing bugs in their pipelines. We decided instead to expose easier-to-reason-about signals such as timers (triggered by the watermark), windows, and lateness.
>>>>
>>>> Reuven
>>>>
>>>> On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> I realized the problem. I misinterpreted the LateDataDroppingDoFnRunner. It doesn't drop *all* late (arriving after watermark - allowed lateness) data, but only data that arrives after maxTimestamp + allowedLateness of its respective window.
>>>>>
>>>>> A stateful DoFn can run on the global window (which was the case in my tests) and there is no dropping then.
>>>>>
>>>>> Two questions arise then:
>>>>>
>>>>> a) does it mean that this is one more argument to move this logic to StatefulDoFnRunner? StatefulDoFnRunner performs state cleanup at window GC time, so without LateDataDroppingDoFnRunner late data will see empty state and will produce wrong results.
>>>>>
>>>>> b) is this behavior generally intentional and correct?
>>>>> Windows and triggers are (in my point of view) features of GBK, not of a stateful DoFn. Stateful DoFn is a low level primitive, which can be viewed as operating on "instant" windows, which should then probably be defined as dropping every single element arriving after the allowed lateness. This probably relates to the question whether operations should be built bottom up, from the most primitive and generic ones to more specific ones - that is, GBK implemented on top of stateful DoFn and not vice versa.
>>>>>
>>>>> Thoughts?
>>>>>
>>>>> Jan
>>>>>
>>>>> On 1/4/20 1:03 AM, Steve Niemitz wrote:
>>>>>
>>>>> I do agree that the direct runner doesn't drop late data arriving at a stateful DoFn (I just tested as well).
>>>>>
>>>>> However, I believe this is consistent with other runners. I'm fairly certain (at least last time I checked) that at least Dataflow will also only drop late data at GBK operations, and NOT at stateful DoFns. Whether or not this is intentional is debatable; however, without being able to inspect the watermark inside the stateful DoFn, it'd be very difficult to do anything useful with late data.
>>>>>
>>>>> On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> I did write a test that tested whether data is dropped in a plain stateful DoFn. I did this as part of validating that PR [1] didn't drop more data when using @RequiresTimeSortedInput than it would without this annotation. This test failed and I didn't commit it, yet.
>>>>>> The test was basically as follows:
>>>>>>
>>>>>> - use TestStream to generate three elements with timestamps 2, 1 and 0
>>>>>>
>>>>>> - between the elements with timestamps 1 and 0, move the watermark to 1
>>>>>>
>>>>>> - use an allowed lateness of zero
>>>>>>
>>>>>> - use a stateful DoFn that just emits arbitrary data for each input element
>>>>>>
>>>>>> - use Count.globally to count the outputs
>>>>>>
>>>>>> The outcome was that the stateful DoFn using @RequiresTimeSortedInput output 2 elements; without the annotation it was 3 elements. I think the correct answer would be 2 elements in this case. The difference is caused by the annotation having (currently) its own logic for dropping data, which could be removed if we agree that the data should be dropped in all cases.
>>>>>>
>>>>>> On 1/3/20 11:23 PM, Kenneth Knowles wrote:
>>>>>>
>>>>>> Did you write such a @Category(ValidatesRunner.class) test? I believe the Java direct runner does drop late data, for both GBK and stateful ParDo.
>>>>>>
>>>>>> Stateful ParDo is implemented on top of GBK:
>>>>>> https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172
>>>>>>
>>>>>> And GroupByKey, via DirectGroupByKey, via DirectGroupAlsoByWindow, does drop late data:
>>>>>> https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220
>>>>>>
>>>>>> I'm not sure why it has its own code, since ReduceFnRunner also drops late data, and it does use ReduceFnRunner (the same code path all Java-based runners use).
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>
>>>>>>> Yes, the non-reliability of late data dropping in a distributed runner is understood.
>>>>>>> But this is even a place where DirectRunner can play its role, because only there is it actually possible to emulate and test specific watermark conditions. A question regarding this for the Java DirectRunner - should we completely drop LateDataDroppingDoFnRunner and delegate the late data dropping to StatefulDoFnRunner? Seems logical to me, as if we agree that late data should always be dropped, then there would be no "valid" use of StatefulDoFnRunner without the late data dropping functionality.
>>>>>>>
>>>>>>> On 1/3/20 9:32 PM, Robert Bradshaw wrote:
>>>>>>>
>>>>>>> I agree; in fact we just recently enabled late data dropping in the Python direct runner to be able to develop better tests for Dataflow.
>>>>>>>
>>>>>>> It should be noted, however, that in a distributed runner (absent the quiescence of TestStream) one can't *count* on late data being dropped at a certain point, and in fact (due to delays in fully propagating the watermark) late data can even become on-time, so the promises about what happens behind the watermark are necessarily a bit loose.
>>>>>>>
>>>>>>> On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> I agree that the DirectRunner should drop late data. Late data dropping is optional, but the DirectRunner is used by many for testing, and we should have the same behaviour they would get on other runners, or users may be surprised.
>>>>>>>>
>>>>>>>> On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I just found out that DirectRunner is apparently not using LateDataDroppingDoFnRunner, which means that it doesn't drop late data in cases where there is no GBK operation involved (dropping in GBK seems to be correct).
>>>>>>>>> There is apparently no @Category(ValidatesRunner) test for that behavior (because DirectRunner would fail it), so the question is - should late data dropping be considered part of the model (of which DirectRunner should be a canonical implementation), and therefore this should be fixed there, or is late data dropping an optional feature of a runner?
>>>>>>>>>
>>>>>>>>> I'm strongly in favor of the first option, and I think it is likely that all real-world runners would adhere to that (I didn't check that, though).
>>>>>>>>>
>>>>>>>>> Opinions?
>>>>>>>>>
>>>>>>>>> Jan
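The test scenario Jan described upthread (TestStream with elements at timestamps 2, 1 and 0, the watermark advanced to 1 between the last two, allowed lateness zero) can be reproduced as a plain-Python toy simulation (not Beam; TestStream semantics are only approximated here). With immediate dropping, the element at timestamp 0 is behind the watermark and is discarded, giving 2 outputs; with no dropping at all, every element produces output, giving 3 - matching the 2-vs-3 outcome reported in the thread.

```python
ALLOWED_LATENESS = 0

def count_outputs(events, drop_behind_watermark):
    watermark = float("-inf")
    outputs = 0
    for kind, value in events:
        if kind == "watermark":
            watermark = value
        else:  # an element; the toy stateful DoFn emits one output per input
            if drop_behind_watermark and value < watermark - ALLOWED_LATENESS:
                continue  # dropped as late
            outputs += 1
    return outputs

events = [("element", 2), ("element", 1), ("watermark", 1), ("element", 0)]
count_outputs(events, drop_behind_watermark=True)   # 2
count_outputs(events, drop_behind_watermark=False)  # 3
```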