But there's no ordering inside a window. A stateful DoFn can see the input elements inside a window in any order at all. This is another reason it's best to think of time spatially - as another data dimension - rather than like normal processing time.
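The point above can be made concrete with a tiny sketch (plain Java, no Beam dependencies; all names are illustrative, not Beam API): a stateful DoFn that only performs commutative, order-insensitive state updates ends up with the same state no matter what order the runner delivers elements in.

```java
// Hypothetical sketch: a per-(key, window) state cell whose updates commute,
// so the result is independent of element delivery order - the property a
// stateful DoFn needs when it cannot assume any ordering inside a window.
class OrderInsensitiveState {
  private long sum = 0;
  private long maxTimestamp = Long.MIN_VALUE;

  // Called once per element; the order of calls must not matter.
  public void process(long value, long eventTimestamp) {
    sum += value;
    maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
  }

  public long sum() { return sum; }
  public long maxTimestamp() { return maxTimestamp; }

  public static OrderInsensitiveState processAll(long[][] valueTimestampPairs) {
    OrderInsensitiveState state = new OrderInsensitiveState();
    for (long[] e : valueTimestampPairs) {
      state.process(e[0], e[1]);
    }
    return state;
  }
}
```

Processing `{value, timestamp}` pairs forwards or backwards yields identical state, which is why such a DoFn is safe under arbitrary intra-window ordering.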
On Wed, Jan 8, 2020 at 2:26 AM Jan Lukavský <je...@seznam.cz> wrote: > Hi Luke and Kenn, > > I agree, my mental model fits this as well. But still, even in the > presence of simultaneous existence of all windows at once - GBK and > stateful DoFns differ in the way they handle time *inside* each window (and > I'm as well leaving merging windows aside, partly because they are not > currently supported in stateful DoFns). GBK discretizes time (visible to the > user) through triggers, while a stateful DoFn doesn't. That is where the > differences between these two come from. > > Jan > On 1/7/20 10:16 PM, Luke Cwik wrote: > > That is a really good way to describe my mental model as well. > > On Tue, Jan 7, 2020 at 12:20 PM Kenneth Knowles <k...@apache.org> wrote: > >> >> >> On Tue, Jan 7, 2020 at 1:39 AM Jan Lukavský <je...@seznam.cz> wrote: >> >>> Hi Kenn, >>> >>> I see that my terminology seems not to be 100% aligned with Beam's. I'll >>> work on that. :-) >>> >>> I agree with what you say, and by "late" I mostly meant "droppable" >>> (arriving too late after the watermark). >>> >>> I'm definitely not proposing to get back to something like "out of >>> order" == "late" or anything like that. I'm also aware that a stateful >>> operation is a windowed operation, but the semantics of the windowing is >>> different than that of a GBK. The difference is how time moves in GBK and how it >>> moves in a stateful DoFn. Throwing away some details (early triggers, late >>> data triggers), the main difference is that in the GBK case, time hops just >>> between window boundaries, while in a stateful DoFn time moves "smoothly" >>> (with each watermark update). Now, this difference brings up the question >>> of why the definition of "droppable" data is the same for both types of >>> operations, when there is a difference in how users "perceive" time. 
As the >>> more generic operation, stateful DoFn might deserve a more general >>> definition of droppable data, which should degrade naturally to that of >>> GBK in the presence of "discrete time hops". >>> >> >> I understand what you mean. On the other hand, I encourage thinking of >> event time spatially, not as time passing. That is a big part of unifying >> batch/streaming real-time/archival processing. The event time window is a >> secondary key to partition the data (merging windows are slightly more >> complex). All event time windows exist simultaneously. So for both stateful >> ParDo and GBK, I find it helpful to consider this perspective where all >> windows are processed simultaneously / in an arbitrary order, not assuming >> windows are ordered at all. Then you see that GBK and stateful ParDo do not >> really treat windows / the watermark differently: both of them process a stream >> of data for each (key, window) pair until the watermark informs them that >> the stream is expired, then they GC the state associated with that (key, >> window) pair. >> >> Kenn >> >>> This might have some consequences for how the droppable data should be >>> handled in the presence of (early) triggers, because triggering is actually >>> what makes time "hop", so we might arrive at the conclusion that we could >>> actually drop any data that has a timestamp less than "last trigger time + >>> allowed lateness". This looks appealing to me, because IMO it has strong >>> internal logical consistency. Although it is possible that it would drop >>> more data, which is generally undesirable, I admit that. >>> >>> I'm looking for an explanation of why the current approach was chosen instead >>> of the other. >>> >>> Jan >>> On 1/7/20 12:52 AM, Kenneth Knowles wrote: >>> >>> This thread has a lot in it, so I am just top-posting. >>> >>> - A stateful DoFn is a windowed operation; state is per-window. When the >>> window expires, any further inputs are dropped. 
>>> - "Late" is not synonymous with out-of-order. It doesn't really have an >>> independent meaning. >>> - For a GBK/Combine, "late" means "not included prior to the on-time >>> output", and "droppable" means "arriving after window expiry". >>> - For a stateful DoFn there is no real meaning to "late" except if one >>> is talking about "droppable", which still means "arriving after window >>> expiry". A user may have a special timer where they flip a flag and treat >>> elements after the timer differently. >>> >>> I think the definition of when data is droppable is very simple. We >>> explicitly moved to this definition, away from "out of order == late", >>> because it is more robust and simpler to think about. Users saw lots of >>> confusing behavior when we had "out of order by allowed lateness == >>> droppable" logic. >>> >>> Kenn >>> >>> On Mon, Jan 6, 2020 at 1:42 AM Jan Lukavský <je...@seznam.cz> wrote: >>> >>>> > Generally the watermark update can overtake elements, because >>>> runners can explicitly ignore late data in the watermark calculation (for >>>> good reason - those elements are already late, so no need to hold up the >>>> watermark advancing any more). >>>> This seems not to affect the decision of _not late_ vs. _late_, does it? >>>> If an element is late and gets ignored in the watermark calculation (whatever >>>> that includes in this context), then the watermark cannot move past >>>> elements that were marked as _not late_ and thus nothing can make them >>>> _late_. >>>> >>>> > For GBK, on-time data simply means the first pane marked as on time. >>>> For state+timers I don't think it makes sense for Beam to define on-time >>>> vs. late; rather, I think the user can come up with their own definition >>>> depending on their use case. 
For example, if you are buffering data into >>>> BagState and setting a timer to process it, it would be logical to say that >>>> any element that was buffered before the timer expired is on time, and any >>>> data that showed up after the timer fired is late. This would roughly >>>> correspond to what GBK does, and the answer would be very similar to simply >>>> comparing against the watermark (as the timers fire when the watermark >>>> advances). >>>> >>>> Yes, I'd say that stateful DoFns don't have a (well defined) concept of >>>> pane, because that is related to the concept of a trigger, and that is a concept >>>> of GBK (or windowed operations in general). The only semantic meaning of a >>>> window in a stateful DoFn is that it "scopes" state. >>>> >>>> This discussion might have got a little off the original question, so >>>> I'll try to rephrase it: >>>> >>>> Should a stateful DoFn drop *all* late data, not just data that arrives >>>> after window boundary + allowed lateness? Some arguments why I think it >>>> should: >>>> * in windowed operations (GBK), it is correct to drop data on window >>>> boundaries only, because time (as seen by the user) effectively hops only on >>>> these discrete time points >>>> * in a stateful DoFn, on the other hand, time moves "smoothly" (yes, with >>>> some granularity - millisecond, nanosecond, whatever - and with watermark >>>> updates only, but still) >>>> * dropping late data immediately as time >>>> (again, from the user's perspective) moves (not on some more or less artificial >>>> boundary having only little semantic meaning) is consistent with both of the >>>> above properties >>>> >>>> The negative side effect of this would be that more data could be >>>> dropped, but ... isn't this what defines allowed lateness? I don't want to >>>> discuss the implications of such a change on user pipelines (and whether we can >>>> or cannot do it), just trying to build some theoretical understanding of >>>> the problem as a whole. 
The decision whether any change could / should be made >>>> can be done afterwards. >>>> >>>> Thanks, >>>> Jan >>>> >>>> On 1/4/20 10:35 PM, Reuven Lax wrote: >>>> >>>> >>>> >>>> On Sat, Jan 4, 2020 at 12:13 PM Jan Lukavský <je...@seznam.cz> wrote: >>>> >>>>> > Yes, but invariants should hold. If I add a ParDo that drops late >>>>> elements (or, more commonly, diverts the late elements to a different >>>>> PCollection), then the result of that ParDo should _never_ introduce any >>>>> more late data. This cannot be guaranteed simply with watermark checks. >>>>> The >>>>> ParDo may decide that the element was not late, but by the time it outputs >>>>> the element the watermark may have advanced, causing the element to >>>>> actually be late. >>>>> >>>>> This is actually very interesting. The question is - if I decide about >>>>> lateness based on the output watermark of a PTransform, is it still the case >>>>> that in downstream operator(s) the element could be changed from "not >>>>> late" >>>>> to "late"? Provided the output watermark is updated synchronously based on >>>>> input data (which it should be) and the watermark update cannot "overtake" >>>>> elements, I think that the downstream decision should not be changed, so >>>>> the invariant should hold. Or am I missing something? >>>>> >>>> >>>> Generally the watermark update can overtake elements, because runners >>>> can explicitly ignore late data in the watermark calculation (for good >>>> reason - those elements are already late, so no need to hold up the >>>> watermark advancing any more). >>>> >>>> For GBK, on-time data simply means the first pane marked as on time. For >>>> state+timers I don't think it makes sense for Beam to define on-time vs. >>>> late; rather, I think the user can come up with their own definition >>>> depending on their use case. 
For example, if you are buffering data into >>>> BagState and setting a timer to process it, it would be logical to say that >>>> any element that was buffered before the timer expired is on time, and any >>>> data that showed up after the timer fired is late. This would roughly >>>> correspond to what GBK does, and the answer would be very similar to simply >>>> comparing against the watermark (as the timers fire when the watermark >>>> advances). >>>> >>>> Reuven >>>> >>>>> On 1/4/20 8:11 PM, Reuven Lax wrote: >>>>> >>>>> >>>>> >>>>> On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský <je...@seznam.cz> wrote: >>>>> >>>>>> On 1/4/20 6:14 PM, Reuven Lax wrote: >>>>>> >>>>>> There is a very good reason not to define lateness directly in terms >>>>>> of the watermark. The model does not make any guarantees that the >>>>>> watermark >>>>>> advances synchronously, and in fact for the Dataflow runner the watermark >>>>>> advances asynchronously (i.e. independent of element processing). This >>>>>> means that simply comparing an element timestamp against the watermark >>>>>> creates a race condition. There are cases where the answer could change >>>>>> depending on exactly when you examine the watermark, and if you examine it >>>>>> again while processing the same bundle you might come to a different >>>>>> conclusion about lateness. >>>>>> >>>>>> Due to the monotonicity of the watermark, I don't think that the asynchronous >>>>>> updates of the watermark can change the answer from "late" to "not late". >>>>>> That >>>>>> seems fine to me. >>>>>> >>>>> >>>>> It's the other way around. You check to see whether an element is late >>>>> and the answer is "not late." An instant later the answer changes to >>>>> "late". This does cause many problems, and is why this was changed. >>>>>> >>>>>> This non-determinism is undesirable when considering lateness, as it >>>>>> can break many invariants that users may rely on (e.g. 
if I write a >>>>>> ParDo that filters all late data, I could still find late data showing up >>>>>> downstream of the ParDo, which would be very surprising). For that reason, >>>>>> the SDK always marks things as late based on deterministic signals, e.g. >>>>>> for a triggered GBK everything in the first post-watermark pane is marked >>>>>> as on time (no matter what the watermark is) and everything in subsequent >>>>>> panes is marked as late. >>>>>> >>>>>> Dropping latecomers will always be non-deterministic, that is >>>>>> certain. This is true even in the case where the watermark is updated >>>>>> synchronously >>>>>> with element processing, due to shuffling and varying (random) >>>>>> differences >>>>>> between processing and event time in upstream operator(s). The question was >>>>>> only >>>>>> whether a latecomer should be dropped only at window boundaries (which >>>>>> is >>>>>> a sort of artificial time boundary), or right away when spotted (in >>>>>> stateful DoFns only). Another question would be whether latecomers should be >>>>>> dropped based on the input or output watermark; dropping based on the output >>>>>> watermark seems even to be stable in the sense that all downstream >>>>>> operators should come to the same conclusion (this is a bit of a >>>>>> speculation). >>>>>> >>>>> >>>>> Yes, but invariants should hold. If I add a ParDo that drops late >>>>> elements (or, more commonly, diverts the late elements to a different >>>>> PCollection), then the result of that ParDo should _never_ introduce any >>>>> more late data. This cannot be guaranteed simply with watermark checks. >>>>> The >>>>> ParDo may decide that the element was not late, but by the time it outputs >>>>> the element the watermark may have advanced, causing the element to >>>>> actually be late. >>>>> >>>>> In practice this is important. An early version of Dataflow (pre >>>>> Beam) implemented lateness by comparing against the watermark, and it >>>>> caused no end of trouble for users. 
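The race condition described above can be sketched in a few lines of plain Java (not Beam API; names are illustrative). A lateness check that reads an asynchronously advancing watermark can classify the very same element as "not late" at one instant and "late" the next, which is exactly what breaks the filtering invariant:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: classifying an element as "late" by reading a
// watermark that advances asynchronously. The same element can be judged
// differently depending on when the check runs.
class RacyLatenessCheck {
  private final AtomicLong watermark = new AtomicLong(Long.MIN_VALUE);

  // Watermark only moves forward (monotone), but at arbitrary times.
  public void advanceWatermarkTo(long t) {
    watermark.updateAndGet(w -> Math.max(w, t));
  }

  // Naive check: compare the element timestamp against whatever the
  // watermark happens to be right now.
  public boolean isLate(long elementTimestamp) {
    return elementTimestamp < watermark.get();
  }
}
```

If a ParDo makes the first check, emits the element, and a downstream check runs after the watermark has moved, the downstream stage sees "late" data that was supposedly filtered out.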
>>>>> >>>>>> >>>>>> FYI - this is also the reason why Beam does not currently provide >>>>>> users direct access to the watermark. The asynchronous nature of it can >>>>>> be >>>>>> very confusing, and often results in users writing bugs in their >>>>>> pipelines. >>>>>> We decided instead to expose easier-to-reason-about signals such as >>>>>> timers >>>>>> (triggered by the watermark), windows, and lateness. >>>>>> >>>>>> Reuven >>>>>> >>>>>> On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský <je...@seznam.cz> wrote: >>>>>> >>>>>>> I realized the problem. I misinterpreted the >>>>>>> LateDataDroppingDoFnRunner. It doesn't drop *all* late (arriving after >>>>>>> watermark - allowed lateness) data, but only data that arrives after >>>>>>> maxTimestamp + allowedLateness of their respective windows. >>>>>>> >>>>>>> A stateful DoFn can run on the global window (which was the case of my >>>>>>> tests) and there is no dropping then. >>>>>>> >>>>>>> Two questions arise then: >>>>>>> >>>>>>> a) does it mean that this is one more argument to move this logic >>>>>>> to StatefulDoFnRunner? StatefulDoFnRunner performs state cleanup at >>>>>>> window >>>>>>> GC time, so without LateDataDroppingDoFnRunner, late data will see >>>>>>> empty >>>>>>> state and will produce wrong results. >>>>>>> >>>>>>> b) is this behavior generally intentional and correct? Windows and >>>>>>> triggers are (in my point of view) features of GBK, not stateful DoFn. >>>>>>> Stateful DoFn is a low level primitive, which can be viewed as operating >>>>>>> on >>>>>>> "instant" windows, which should then probably be defined as dropping >>>>>>> every >>>>>>> single element arriving after allowed lateness. This probably relates to >>>>>>> the question whether operations should be built bottom up from the most primitive and >>>>>>> generic ones to more specific ones - that is, GBK implemented on top >>>>>>> of >>>>>>> stateful DoFn and not vice versa. >>>>>>> >>>>>>> Thoughts? 
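The two "droppable" definitions being debated in this thread can be written down as pure functions (plain Java, not Beam API; names are editorial), which makes the global-window observation above obvious:

```java
// Hypothetical sketch of the two dropping rules under discussion.
class DroppableRules {
  // Current rule: an element is droppable only once its window has
  // expired, i.e. the input watermark has passed the window's maximum
  // timestamp plus the allowed lateness.
  public static boolean droppableByWindowExpiry(
      long windowMaxTimestamp, long allowedLateness, long inputWatermark) {
    return inputWatermark > windowMaxTimestamp + allowedLateness;
  }

  // The stricter per-element rule Jan is asking about: drop as soon as the
  // element's own timestamp falls more than the allowed lateness behind
  // the watermark, regardless of window boundaries.
  public static boolean droppableByElementTimestamp(
      long elementTimestamp, long allowedLateness, long inputWatermark) {
    return elementTimestamp + allowedLateness < inputWatermark;
  }
}
```

With the global window, `windowMaxTimestamp` is effectively unbounded, so the first rule never drops anything, while the second rule still drops any element behind the watermark by more than the allowed lateness.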
>>>>>>> >>>>>>> Jan >>>>>>> On 1/4/20 1:03 AM, Steve Niemitz wrote: >>>>>>> >>>>>>> I do agree that the direct runner doesn't drop late data arriving at >>>>>>> a stateful DoFn (I just tested as well). >>>>>>> >>>>>>> However, I believe this is consistent with other runners. I'm >>>>>>> fairly certain (at least last time I checked) that at least Dataflow >>>>>>> will >>>>>>> also only drop late data at GBK operations, and NOT stateful DoFns. >>>>>>> Whether or not this is intentional is debatable; however, without being >>>>>>> able >>>>>>> to inspect the watermark inside the stateful DoFn, it'd be very >>>>>>> difficult >>>>>>> to do anything useful with late data. >>>>>>> >>>>>>> >>>>>>> On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský <je...@seznam.cz> wrote: >>>>>>> >>>>>>>> I did write a test that tested whether data is dropped in a plain >>>>>>>> stateful DoFn. I did this as part of validating that PR [1] didn't drop >>>>>>>> more data when using @RequiresTimeSortedInput than it would without >>>>>>>> this >>>>>>>> annotation. This test failed and I didn't commit it yet. >>>>>>>> >>>>>>>> The test was basically as follows: >>>>>>>> >>>>>>>> - use TestStream to generate three elements with timestamps 2, 1 >>>>>>>> and 0 >>>>>>>> >>>>>>>> - between the elements with timestamps 1 and 0, move the watermark to 1 >>>>>>>> >>>>>>>> - use allowed lateness of zero >>>>>>>> >>>>>>>> - use a stateful DoFn that just emits arbitrary data for each input >>>>>>>> element >>>>>>>> >>>>>>>> - use Count.globally to count outputs >>>>>>>> >>>>>>>> The outcome was that the stateful DoFn using @RequiresTimeSortedInput >>>>>>>> output 2 elements; without the annotation it was 3 elements. I think >>>>>>>> the >>>>>>>> correct result would be 2 elements in this case. The difference is caused >>>>>>>> by >>>>>>>> the annotation having (currently) its own logic for dropping data, >>>>>>>> which >>>>>>>> could be removed if we agree that the data should be dropped in all >>>>>>>> cases. 
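The test scenario described above can be replayed as a plain-Java calculation (not actual TestStream/DoFn code; everything here is an editorial sketch): elements arrive with timestamps 2, 1, 0; the watermark moves to 1 after the second element; allowed lateness is zero; the window is the global window, which never expires.

```java
// Hypothetical re-run of the described test, with each dropping policy
// reduced to a counting function.
class TimestampOrderScenario {
  static final long[] ELEMENT_TIMESTAMPS = {2, 1, 0};
  // Watermark observed just before each element arrives.
  static final long[] WATERMARK_BEFORE_ELEMENT = {Long.MIN_VALUE, Long.MIN_VALUE, 1};
  static final long ALLOWED_LATENESS = 0;

  // Window-expiry dropping: the global window never expires, so every
  // element is emitted - matching the 3 outputs seen without the annotation.
  public static int countWithWindowExpiryDropping() {
    return ELEMENT_TIMESTAMPS.length;
  }

  // Strict dropping: an element more than the allowed lateness behind the
  // watermark at arrival is dropped - matching the 2 outputs seen with
  // @RequiresTimeSortedInput's own dropping logic.
  public static int countWithStrictDropping() {
    int emitted = 0;
    for (int i = 0; i < ELEMENT_TIMESTAMPS.length; i++) {
      if (ELEMENT_TIMESTAMPS[i] + ALLOWED_LATENESS >= WATERMARK_BEFORE_ELEMENT[i]) {
        emitted++;
      }
    }
    return emitted;
  }
}
```

Only the timestamp-0 element, arriving behind the watermark at 1, differs between the two policies, which reproduces the 3-vs-2 discrepancy Jan observed.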
>>>>>>>> On 1/3/20 11:23 PM, Kenneth Knowles wrote: >>>>>>>> >>>>>>>> Did you write such a @Category(ValidatesRunner.class) test? I >>>>>>>> believe the Java direct runner does drop late data, for both GBK and >>>>>>>> stateful ParDo. >>>>>>>> >>>>>>>> Stateful ParDo is implemented on top of GBK: >>>>>>>> https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172 >>>>>>>> >>>>>>>> And GroupByKey, via DirectGroupByKey, via DirectGroupAlsoByWindow, >>>>>>>> does drop late data: >>>>>>>> https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220 >>>>>>>> >>>>>>>> I'm not sure why it has its own code, since ReduceFnRunner also >>>>>>>> drops late data, and it does use ReduceFnRunner (the same code path all >>>>>>>> Java-based runners use). >>>>>>>> >>>>>>>> Kenn >>>>>>>> >>>>>>>> >>>>>>>> On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský <je...@seznam.cz> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Yes, the non-reliability of late data dropping in a distributed >>>>>>>>> runner is understood. But this is exactly where DirectRunner can play its >>>>>>>>> role, because only there is it actually possible to emulate and test >>>>>>>>> specific watermark conditions. Question regarding this for the Java >>>>>>>>> DirectRunner - should we completely drop LateDataDroppingDoFnRunner >>>>>>>>> and >>>>>>>>> delegate the late data dropping to StatefulDoFnRunner? Seems logical >>>>>>>>> to me, >>>>>>>>> as if we agree that late data should always be dropped, then there >>>>>>>>> would be no >>>>>>>>> "valid" use of StatefulDoFnRunner without the late data dropping >>>>>>>>> functionality. 
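The cleanup contract Jan refers to, where a StatefulDoFnRunner-style wrapper garbage-collects per-window state, can be reduced to a pure function (plain Java editorial sketch, not the actual runner code): once the watermark passes the window's GC time, the state is collected, so any element arriving after that point must be dropped or it would observe empty state and produce wrong results.

```java
// Hypothetical sketch of window GC timing for per-(key, window) state.
class WindowGc {
  // Assumed GC point: window max timestamp plus allowed lateness.
  public static long gcTime(long windowMaxTimestamp, long allowedLateness) {
    return windowMaxTimestamp + allowedLateness;
  }

  // State is live while the watermark has not passed the GC time; after
  // that, arriving elements must be dropped rather than see empty state.
  public static boolean stateStillLive(
      long windowMaxTimestamp, long allowedLateness, long inputWatermark) {
    return inputWatermark <= gcTime(windowMaxTimestamp, allowedLateness);
  }
}
```

This is why delegating late data dropping to the same component that performs the cleanup keeps the two decisions consistent by construction.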
>>>>>>>>> On 1/3/20 9:32 PM, Robert Bradshaw wrote: >>>>>>>>> >>>>>>>>> I agree; in fact we just recently enabled late data dropping in >>>>>>>>> the direct runner in Python to be able to develop better tests for >>>>>>>>> Dataflow. >>>>>>>>> >>>>>>>>> It should be noted, however, that in a distributed runner (absent >>>>>>>>> the quiescence of TestStream) one can't *count* on late data >>>>>>>>> being >>>>>>>>> dropped at a certain point, and in fact (due to delays in fully >>>>>>>>> propagating >>>>>>>>> the watermark) late data can even become on-time, so the promises >>>>>>>>> about >>>>>>>>> what happens behind the watermark are necessarily a bit loose. >>>>>>>>> >>>>>>>>> On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik <lc...@google.com> wrote: >>>>>>>>> >>>>>>>>>> I agree that the DirectRunner should drop late data. Late data >>>>>>>>>> dropping is optional, but the DirectRunner is used by many for >>>>>>>>>> testing and >>>>>>>>>> we should have the same behaviour they would get on other runners, or >>>>>>>>>> users >>>>>>>>>> may be surprised. >>>>>>>>>> >>>>>>>>>> On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský <je...@seznam.cz> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi, >>>>>>>>>>> >>>>>>>>>>> I just found out that DirectRunner is apparently not using >>>>>>>>>>> LateDataDroppingDoFnRunner, which means that it doesn't drop >>>>>>>>>>> late data >>>>>>>>>>> in cases where there is no GBK operation involved (dropping in >>>>>>>>>>> GBK seems >>>>>>>>>>> to be correct). There is apparently no >>>>>>>>>>> @Category(ValidatesRunner) test >>>>>>>>>>> for that behavior (because DirectRunner would fail it), so the >>>>>>>>>>> question >>>>>>>>>>> is - should late data dropping be considered part of the model (of >>>>>>>>>>> which >>>>>>>>>>> DirectRunner should be a canonical implementation) and therefore >>>>>>>>>>> be fixed there, or is late data dropping an optional >>>>>>>>>>> feature >>>>>>>>>>> of a runner? 
>>>>>>>>>>> >>>>>>>>>>> I'm strongly in favor of the first option, and I think it is >>>>>>>>>>> likely that >>>>>>>>>>> all real-world runners would adhere to that (I didn't >>>>>>>>>>> check >>>>>>>>>>> that, though). >>>>>>>>>>> >>>>>>>>>>> Opinions? >>>>>>>>>>> >>>>>>>>>>> Jan