That is a really good way to describe my mental model as well.

On Tue, Jan 7, 2020 at 12:20 PM Kenneth Knowles <[email protected]> wrote:
> On Tue, Jan 7, 2020 at 1:39 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi Kenn,
>>
>> I see that my terminology seems not to be 100% aligned with Beam's. I'll work on that. :-)
>>
>> I agree with what you say, and by "late" I mostly meant "droppable" (arriving too late after the watermark).
>>
>> I'm definitely not proposing to get back to something like "out of order" == "late" or anything like that. I'm also aware that a stateful operation is a windowed operation, but the semantics of its windowing differ from those of a GBK. The difference is in how time moves in a GBK and how it moves in a stateful DoFn. Setting aside some details (early triggers, late data triggers), the main difference is that in the GBK case time hops just between window boundaries, while in a stateful DoFn time moves "smoothly" (with each watermark update). Now, this difference raises the question of why the definition of "droppable" data is the same for both types of operations, when there is a difference in how users "perceive" time. As the more generic operation, a stateful DoFn might deserve a more general definition of droppable data, one that degrades naturally to the GBK definition in the presence of "discrete time hops".
>
> I understand what you mean. On the other hand, I encourage thinking of event time spatially, not as time passing. That is a big part of unifying batch/streaming and real-time/archival processing. The event time window is a secondary key that partitions the data (merging windows are slightly more complex). All event time windows exist simultaneously. So for both stateful ParDo and GBK, I find it helpful to consider the perspective where all windows are processed simultaneously / in an arbitrary order, not assuming windows are ordered at all.
> Then you see that GBK and stateful ParDo do not really treat windows / the watermark differently: both of them process a stream of data for each (key, window) pair until the watermark informs them that the stream is expired, then they GC the state associated with that (key, window) pair.
>
> Kenn
>
>> This might have some consequences for how droppable data should be handled in the presence of (early) triggers, because triggering is actually what makes time "hop", so we might arrive at the conclusion that we might actually drop any data that has a timestamp less than "last trigger time + allowed lateness". This looks appealing to me, because IMO it has strong internal logical consistency. Although it is possible that it would drop more data, which is generally undesirable, I admit that.
>>
>> I'm looking for an explanation of why the current approach was chosen instead of the other.
>>
>> Jan
>>
>> On 1/7/20 12:52 AM, Kenneth Knowles wrote:
>>
>> This thread has a lot in it, so I am just top-posting.
>>
>> - Stateful DoFn is a windowed operation; state is per-window. When the window expires, any further inputs are dropped.
>> - "Late" is not synonymous with out-of-order. It doesn't really have an independent meaning.
>> - For a GBK/Combine, "late" means "not included prior to the on-time output", and "droppable" means "arriving after window expiry".
>> - For a stateful DoFn there is no real meaning to "late" except if one is talking about "droppable", which still means "arriving after window expiry". A user may have a special timer where they flip a flag and treat elements after the timer differently.
>>
>> I think the definition of when data is droppable is very simple. We explicitly moved to this definition, away from "out of order == late", because it is more robust and simpler to think about. Users saw lots of confusing behavior when we had the "out of order by allowed lateness == droppable" logic.
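The "spatial" view of event time that Kenn describes can be sketched as a toy model (plain Python, not Beam code; all names below are made up for illustration): every (key, window) pair is an independent stream that lives until the watermark passes the window's expiry (maxTimestamp + allowed lateness), at which point its state is GC'd and further input for that pair is droppable - the same rule for GBK and stateful ParDo.

```python
# Toy model of "all windows exist simultaneously": state is keyed by
# (key, window), and a stream expires when the watermark passes
# maxTimestamp + allowed lateness of its window. Not the Beam API.
from collections import defaultdict

ALLOWED_LATENESS = 0
WINDOW_SIZE = 10

def window_of(ts):
    start = (ts // WINDOW_SIZE) * WINDOW_SIZE
    return (start, start + WINDOW_SIZE)  # half-open [start, end)

def expired(window, watermark):
    # maxTimestamp of a half-open window is end - 1
    return window[1] - 1 + ALLOWED_LATENESS < watermark

def process(events):
    state = defaultdict(list)  # per-(key, window) state, as in stateful ParDo
    watermark = float("-inf")
    dropped = []
    for kind, payload in events:
        if kind == "watermark":
            watermark = max(watermark, payload)
            for kw in [kw for kw in state if expired(kw[1], watermark)]:
                del state[kw]  # stream expired: GC its state
        else:
            key, ts = payload
            window = window_of(ts)
            if expired(window, watermark):
                dropped.append((key, ts))  # droppable: window already expired
            else:
                state[(key, window)].append(ts)
    return state, dropped
```

For example, with the watermark at 12, an element with timestamp 4 lands in the already-expired window [0, 10) and is droppable, regardless of the order in which the (key, window) pairs are processed.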
>> Kenn
>>
>> On Mon, Jan 6, 2020 at 1:42 AM Jan Lukavský <[email protected]> wrote:
>>
>>> > Generally the watermark update can overtake elements, because runners can explicitly ignore late data in the watermark calculation (for good reason - those elements are already late, so no need to hold up the watermark advancing any more).
>>>
>>> This seems not to affect the decision of _not late_ vs. _late_, does it? If an element is late and gets ignored in the watermark calculation (whatever that includes in this context), then the watermark cannot move past elements that were marked as _not late_, and thus nothing can make them _late_.
>>>
>>> > For GBK, on-time data simply means the first pane marked as on time. For state+timers I don't think it makes sense for Beam to define on-time vs. late; rather, I think the user can come up with their own definition depending on their use case. For example, if you are buffering data into BagState and setting a timer to process it, it would be logical to say that any element that was buffered before the timer expired is on time, and any data that showed up after the timer fired is late. This would roughly correspond to what GBK does, and the answer would be very similar to simply comparing against the watermark (as the timers fire when the watermark advances).
>>>
>>> Yes, I'd say that stateful DoFns don't have a (well-defined) concept of a pane, because that is related to the concept of a trigger, and that is a concept of GBK (or windowed operations in general). The only semantic meaning of a window in a stateful DoFn is that it "scopes" state.
>>>
>>> This discussion might have gotten a little off the original question, so I'll try to rephrase it:
>>>
>>> Should a stateful DoFn drop *all* late data, not just data that arrives after window boundary + allowed lateness?
>>> Some arguments why I think it should:
>>>
>>> * in windowed operations (GBK), it is correct to drop data at window boundaries only, because time (as seen by the user) effectively hops only at these discrete time points
>>> * in a stateful DoFn, on the other hand, time moves "smoothly" (yes, with some granularity - millisecond, nanosecond, whatever - and with watermark updates only, but still)
>>> * it could therefore be argued that dropping late data immediately as time (again, from the user's perspective) moves, rather than at some more or less artificial boundary with only little semantic meaning, is consistent with both of the above properties
>>>
>>> The negative side effect of this would be that more data could be dropped, but... isn't this exactly what allowed lateness defines? I don't want to discuss the implications of such a change on user pipelines (and whether we can or cannot make it), just trying to build some theoretical understanding of the problem as a whole. The decision whether any change could / should be made can come afterwards.
>>>
>>> Thanks,
>>> Jan
>>>
>>> On 1/4/20 10:35 PM, Reuven Lax wrote:
>>>
>>> On Sat, Jan 4, 2020 at 12:13 PM Jan Lukavský <[email protected]> wrote:
>>>
>>>> > Yes, but invariants should hold. If I add a ParDo that drops late elements (or, more commonly, diverts the late elements to a different PCollection), then the result of that ParDo should _never_ introduce any more late data. This cannot be guaranteed simply with watermark checks. The ParDo may decide that the element was not late, but by the time it outputs the element the watermark may have advanced, causing the element to actually be late.
>>>>
>>>> This is actually very interesting. The question is - if I decide about lateness based on the output watermark of a PTransform, is it still the case that in downstream operator(s) the element could be changed from "not late" to "late"?
>>>> Provided the output watermark is updated synchronously based on input data (which it should be) and a watermark update cannot "overtake" elements, I think that the downstream decision should not change, so the invariant should hold. Or am I missing something?
>>>
>>> Generally the watermark update can overtake elements, because runners can explicitly ignore late data in the watermark calculation (for good reason - those elements are already late, so no need to hold up the watermark advancing any more).
>>>
>>> For GBK, on-time data simply means the first pane marked as on time. For state+timers I don't think it makes sense for Beam to define on-time vs. late; rather, I think the user can come up with their own definition depending on their use case. For example, if you are buffering data into BagState and setting a timer to process it, it would be logical to say that any element that was buffered before the timer expired is on time, and any data that showed up after the timer fired is late. This would roughly correspond to what GBK does, and the answer would be very similar to simply comparing against the watermark (as the timers fire when the watermark advances).
>>>
>>> Reuven
>>>
>>>> On 1/4/20 8:11 PM, Reuven Lax wrote:
>>>>
>>>> On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> On 1/4/20 6:14 PM, Reuven Lax wrote:
>>>>>
>>>>> There is a very good reason not to define lateness directly in terms of the watermark. The model does not make any guarantees that the watermark advances synchronously, and in fact for the Dataflow runner the watermark advances asynchronously (i.e. independently of element processing). This means that simply comparing an element timestamp against the watermark creates a race condition.
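The BagState-plus-timer pattern Reuven describes can be sketched as a toy model (plain Python, not the Beam state/timer API; the class and method names are made up): elements buffered before the timer fires are "on time" by the user's own definition, and anything arriving afterwards is "late".

```python
# Toy model of buffering into BagState and processing on an event-time
# timer. In Beam the timer would fire when the watermark passes its
# timestamp; here on_timer() is called explicitly to stand in for that.
class BufferingFn:
    def __init__(self):
        self.bag = []            # stands in for BagState
        self.timer_fired = False
        self.on_time_output = None
        self.late = []

    def process_element(self, element):
        if self.timer_fired:
            self.late.append(element)   # user-defined "late"
        else:
            self.bag.append(element)    # user-defined "on time"

    def on_timer(self):
        # Roughly where GBK would emit its on-time pane.
        self.timer_fired = True
        self.on_time_output = list(self.bag)
        self.bag.clear()

fn = BufferingFn()
fn.process_element("a")
fn.process_element("b")
fn.on_timer()            # watermark passed the timer's timestamp
fn.process_element("c")  # arrives after the timer fired: "late"
print(fn.on_time_output, fn.late)  # ['a', 'b'] ['c']
```

The flag flip in on_timer() is what makes the on-time/late distinction deterministic: it depends only on whether the element arrived before or after the timer, never on a direct watermark read.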
>>>>> There are cases where the answer could change depending on exactly when you examine the watermark, and if you examine it again while processing the same bundle you might come to a different conclusion about lateness.
>>>>>
>>>>> Due to the monotonicity of the watermark, I don't think that asynchronous updates of the watermark can change the answer from "late" to "not late". That seems fine to me.
>>>>
>>>> It's the other way around. You check to see whether an element is late and the answer is "not late." An instant later the answer changes to "late." This does cause many problems, and is why this was changed.
>>>>
>>>>> This non-determinism is undesirable when considering lateness, as it can break many invariants that users may rely on (e.g. I could write a ParDo that filtered all late data, yet still find late data showing up downstream of the ParDo, which would be very surprising). For that reason, the SDK always marks things as late based on deterministic signals, e.g. for a triggered GBK everything in the first post-watermark pane is marked as on time (no matter what the watermark is) and everything in subsequent panes is marked as late.
>>>>>
>>>>> Dropping latecomers will always be non-deterministic, that is certain. This is true even in the case where the watermark is updated synchronously with element processing, due to shuffling and varying (random) differences between processing and event time in upstream operator(s). The question was only whether a latecomer should be dropped only at window boundaries (which is a sort of artificial time boundary), or right away when spotted (in stateful DoFns only).
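The race Reuven points out can be illustrated with a toy sketch (plain Python, not Beam code; names are made up): a direct "timestamp < watermark" check can flip from "not late" to "late" between two reads, while a pane-index-based marking, once assigned, never changes.

```python
# Toy illustration of why "ts < watermark" is racy when the watermark
# advances asynchronously, and why pane-based marking is deterministic.
class AsyncWatermark:
    def __init__(self):
        self.value = 0
    def advance(self, to):
        # Watermarks are monotone: they only move forward.
        self.value = max(self.value, to)

wm = AsyncWatermark()
element_ts = 5

first_check = element_ts < wm.value    # False: judged "not late"
wm.advance(10)                         # background watermark update
second_check = element_ts < wm.value   # True: the answer flipped to "late"

# Deterministic alternative: the pane the element was assigned to.
pane_index = 0                         # first (on-time) pane
is_late = pane_index > 0               # stays False forever

print(first_check, second_check, is_late)  # False True False
```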
>>>>> Another question would be whether latecomers should be dropped based on the input or the output watermark; dropping based on the output watermark even seems to be stable, in the sense that all downstream operators should come to the same conclusion (this is a bit of a speculation).
>>>>
>>>> Yes, but invariants should hold. If I add a ParDo that drops late elements (or, more commonly, diverts the late elements to a different PCollection), then the result of that ParDo should _never_ introduce any more late data. This cannot be guaranteed simply with watermark checks. The ParDo may decide that the element was not late, but by the time it outputs the element the watermark may have advanced, causing the element to actually be late.
>>>>
>>>> In practice this is important. An early version of Dataflow (pre-Beam) implemented lateness by comparing against the watermark, and it caused no end of trouble for users.
>>>>
>>>>> FYI - this is also the reason why Beam does not currently give users direct access to the watermark. Its asynchronous nature can be very confusing, and often results in users writing bugs in their pipelines. We decided instead to expose easier-to-reason-about signals such as timers (triggered by the watermark), windows, and lateness.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> I realized the problem. I misinterpreted the LateDataDroppingDoFnRunner. It doesn't drop *all* late data (arriving after watermark - allowed lateness), but only data that arrives after maxTimestamp + allowedLateness of their respective windows.
>>>>>>
>>>>>> A stateful DoFn can run on the global window (which was the case in my tests), and there is no dropping then.
>>>>>> Two questions arise then:
>>>>>>
>>>>>> a) does it mean that this is one more argument for moving this logic to StatefulDoFnRunner? StatefulDoFnRunner performs state cleanup at window GC time, so without LateDataDroppingDoFnRunner, late data will see empty state and will produce wrong results.
>>>>>>
>>>>>> b) is this behavior generally intentional and correct? Windows and triggers are (in my point of view) features of GBK, not of stateful DoFn. Stateful DoFn is a low-level primitive, which can be viewed as operating on "instant" windows, and should then probably be defined as dropping every single element arriving after allowed lateness. This might relate to the question of whether operations should be built bottom-up, from the most primitive and generic ones to more specific ones - that is, GBK implemented on top of stateful DoFn and not vice versa.
>>>>>>
>>>>>> Thoughts?
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> On 1/4/20 1:03 AM, Steve Niemitz wrote:
>>>>>>
>>>>>> I do agree that the direct runner doesn't drop late data arriving at a stateful DoFn (I just tested as well).
>>>>>>
>>>>>> However, I believe this is consistent with other runners. I'm fairly certain (at least last time I checked) that at least Dataflow will also only drop late data at GBK operations, and NOT at stateful DoFns. Whether or not this is intentional is debatable; however, without being able to inspect the watermark inside the stateful DoFn, it'd be very difficult to do anything useful with late data.
>>>>>>
>>>>>> On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>>> I did write a test that checked whether data is dropped in a plain stateful DoFn. I did this as part of validating that PR [1] didn't drop more data when using @RequiresTimeSortedInput than it would without this annotation.
>>>>>>> This test failed and I didn't commit it yet.
>>>>>>>
>>>>>>> The test was basically as follows:
>>>>>>>
>>>>>>> - use TestStream to generate three elements with timestamps 2, 1 and 0
>>>>>>> - between the elements with timestamps 1 and 0, move the watermark to 1
>>>>>>> - use allowed lateness of zero
>>>>>>> - use a stateful DoFn that just emits arbitrary data for each input element
>>>>>>> - use Count.globally to count outputs
>>>>>>>
>>>>>>> The outcome was that the stateful DoFn using @RequiresTimeSortedInput output 2 elements; without the annotation it was 3 elements. I think the correct result would be 2 elements in this case. The difference is caused by the annotation (currently) having its own logic for dropping data, which could be removed if we agree that the data should be dropped in all cases.
>>>>>>>
>>>>>>> On 1/3/20 11:23 PM, Kenneth Knowles wrote:
>>>>>>>
>>>>>>> Did you write such a @Category(ValidatesRunner.class) test? I believe the Java direct runner does drop late data, for both GBK and stateful ParDo.
>>>>>>>
>>>>>>> Stateful ParDo is implemented on top of GBK:
>>>>>>> https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172
>>>>>>>
>>>>>>> And GroupByKey, via DirectGroupByKey, via DirectGroupAlsoByWindow, does drop late data:
>>>>>>> https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220
>>>>>>>
>>>>>>> I'm not sure why it has its own code, since ReduceFnRunner also drops late data, and it does use ReduceFnRunner (the same code path all Java-based runners use).
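The test steps above can be sketched as a runner-independent toy simulation (plain Python, not actual TestStream code) under the semantics Jan proposes, i.e. an element is dropped as soon as its timestamp plus allowed lateness falls behind the watermark:

```python
# Toy simulation of the described test: elements with timestamps 2, 1,
# a watermark advance to 1, then an element with timestamp 0, with
# allowed lateness of zero. Not Beam code.
def run(events, allowed_lateness=0):
    watermark = float("-inf")
    outputs = []
    for kind, value in events:
        if kind == "watermark":
            watermark = max(watermark, value)  # watermarks are monotone
        else:  # element with event timestamp `value`
            if value + allowed_lateness < watermark:
                continue  # droppable: too far behind the watermark
            outputs.append(value)
    return outputs

events = [("element", 2), ("element", 1), ("watermark", 1), ("element", 0)]
print(run(events))  # [2, 1] - the element at timestamp 0 is dropped
```

Under these semantics the count is 2, matching the @RequiresTimeSortedInput behavior described above; a runner that only drops at window expiry (global window, so never) would output all 3.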
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský <[email protected]> wrote:
>>>>>>>
>>>>>>>> Yes, the non-reliability of late data dropping in a distributed runner is understood. But this is exactly where DirectRunner can play its role, because only there is it actually possible to emulate and test specific watermark conditions. A question regarding this for the Java DirectRunner - should we completely drop LateDataDroppingDoFnRunner and delegate the late data dropping to StatefulDoFnRunner? Seems logical to me, as if we agree that late data should always be dropped, then there would be no "valid" use of StatefulDoFnRunner without the late data dropping functionality.
>>>>>>>>
>>>>>>>> On 1/3/20 9:32 PM, Robert Bradshaw wrote:
>>>>>>>>
>>>>>>>> I agree; in fact we just recently enabled late data dropping in the direct runner in Python to be able to develop better tests for Dataflow.
>>>>>>>>
>>>>>>>> It should be noted, however, that in a distributed runner (absent the quiescence of TestStream) one can't *count* on late data being dropped at a certain point, and in fact (due to delays in fully propagating the watermark) late data can even become on-time, so the promises about what happens behind the watermark are necessarily a bit loose.
>>>>>>>>
>>>>>>>> On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I agree that the DirectRunner should drop late data. Late data dropping is optional, but the DirectRunner is used by many for testing, and we should have the same behaviour they would get on other runners, or users may be surprised.
>>>>>>>>> On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I just found out that DirectRunner is apparently not using LateDataDroppingDoFnRunner, which means that it doesn't drop late data in cases where there is no GBK operation involved (dropping in GBK seems to be correct). There is apparently no @Category(ValidatesRunner) test for that behavior (because DirectRunner would fail it), so the question is - should late data dropping be considered part of the model (of which DirectRunner should be a canonical implementation), and therefore should it be fixed there, or is late data dropping an optional feature of a runner?
>>>>>>>>>>
>>>>>>>>>> I'm strongly in favor of the first option, and I think it is likely that all real-world runners would adhere to it (I didn't check, though).
>>>>>>>>>>
>>>>>>>>>> Opinions?
>>>>>>>>>>
>>>>>>>>>> Jan
