From: Jozef Vilcek <[email protected]>
Date: Fri, May 17, 2019 at 2:31 AM
To: [email protected]
Interesting discussion. I think it is very important information that when a user uses a stateful ParDo, they can run into a situation where it will not behave correctly in "batch operating mode".

> How so? AFAIK stateful DoFns work just fine in batch runners.

But some transforms known to Beam, like fixed windows, will work fine? Is there a sorting applied to keyed elements before evaluating the window key group? If the answer is yes, then why not also do the same in the case of a stateful ParDo? It would feel consistent to me.

Part of the SDK or not, I see the Dataflow runner is doing this optimisation, probably precisely to make stateful ParDo operations stable in batch mode:
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java#L64

On Thu, May 16, 2019 at 5:09 PM Jan Lukavský <[email protected]> wrote:

>> Hi Max,
>> answers inline.
>>
>> ---------- Original e-mail ----------
>> From: Maximilian Michels <[email protected]>
>> To: [email protected]
>> Date: 16. 5. 2019 15:59:59
>> Subject: Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)
>>
>> Hi Jan,
>>
>> Thanks for the discussion. Aljoscha already gave great answers. Just a couple of remarks:
>>
>> > a) streaming semantics (i.e. what I can express using Transforms) are a subset of batch semantics
>>
>> I think you mean streaming is a superset of batch, or batch is a subset of streaming. This is the ideal. In practice, the two execution modes are sometimes accomplished by two different execution engines. Even in Flink, we have independent APIs for batch and streaming, and the execution semantics are slightly different. For example, there are no watermarks in the batch API. Thus, batch is rarely simply an execution mode of streaming. However, I still think the unified Beam model works in both cases.
>>
>> > batch semantics and streaming semantics differ only in that I can have a GlobalWindow with the default trigger in batch and cannot in streaming
>>
>> Actually, I really thought that, regarding semantics, streaming should be a subset of batch. That is because in batch you can be sure that the watermark will eventually approach infinity. That gives you one additional feature that streaming generally doesn't have (if you don't manually forward the watermark to infinity, as you suggest).
>>
>> You can have a GlobalWindow in streaming with a default trigger. You could define additional triggers that do early firings. And you could even trigger the global window by advancing the watermark to +inf.
>>
>> Yes, but then you have actually changed streaming into batch; you just execute a batch pipeline in a streaming way.
>>
>> > On batch engines, this is generally not an issue, because the buffering is eliminated by sorting - when a group-by-key operation occurs, batch runners sort elements with the same key to be together and therefore eliminate the need for a potentially infinite cache.
>>
>> The batch engines you normally use might do that. However, I do not see how sorting is an inherent property of the streaming model. We do not guarantee a deterministic order of events in streaming with respect to event time. In that regard, batch is a true subset of streaming, because we make no guarantees on the order. Actually, because we only advance the watermark from -inf to +inf once we have read all data, this nicely aligns with the streaming model.
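>> To make the earlier remark about triggers concrete: a minimal sketch of a global window in streaming with early firings (names are illustrative and the one-minute delay is arbitrary):
>>
>>     import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
>>     import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
>>     import org.apache.beam.sdk.transforms.windowing.Repeatedly;
>>     import org.apache.beam.sdk.transforms.windowing.Window;
>>     import org.apache.beam.sdk.values.KV;
>>     import org.apache.beam.sdk.values.PCollection;
>>     import org.joda.time.Duration;
>>
>>     static PCollection<KV<String, Long>> withEarlyFirings(
>>         PCollection<KV<String, Long>> input) {
>>       // Emit a speculative pane roughly once per minute of processing time
>>       // instead of waiting for the watermark to ever reach +inf.
>>       return input.apply(
>>           Window.<KV<String, Long>>into(new GlobalWindows())
>>               .triggering(
>>                   Repeatedly.forever(
>>                       AfterProcessingTime.pastFirstElementInPane()
>>                           .plusDelayOf(Duration.standardMinutes(1))))
>>               .withAllowedLateness(Duration.ZERO)
>>               .accumulatingFiredPanes());
>>     }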
>> Sure, streaming doesn't have the time-ordering guarantees. Having them would be impractical. But there is no issue in having these guarantees in batch mode; moreover, it gives the pipelines that need "bounded out-of-orderness" the chance to ever finish.
>>
>> I think there are some issues in how we think about the properties of batch vs. stream. If we define streaming as the superset, then we cannot define some properties for batch that streaming doesn't have. But if we split this into the semantics part and the runtime-properties-and-guarantees part, then it is possible to define properties of batch that streaming doesn't have.
>>
>> Jan
>>
>> -Max
>>
>> On 16.05.19 15:20, Aljoscha Krettek wrote:
>> > Hi,
>> >
>> > I think it's helpful to consider that events never truly arrive in order in the real world (you mentioned as much yourself). For streaming use cases, there might be some out-of-orderness (or a lot of it, depending on the use case), so your implementation has to be able to deal with that. On the other end of the spectrum we have batch use cases, where out-of-orderness is potentially even bigger because it allows for more efficient parallel execution. If your implementation can deal with out-of-orderness, that also shouldn't be a problem.
>> >
>> > Another angle is completeness vs. latency: you usually cannot have both in a streaming world. If you want 100% completeness, i.e. you want to ensure that you process all events and never drop anything, you can never advance the watermark from its initial -inf if you also want to never have watermark violations. In typical use cases I would expect any sorting guarantees to be constantly violated, unless you are willing to drop late data.
>> >
>> > I think these are some reasons why there is no mention of ordering by timestamp anywhere (unless I'm mistaken and there is somewhere).
>> >
>> > You are right, of course, that batch-style runners can use grouping/sorting for a GroupByKey operation. Flink does that and even allows sorting by a secondary key, so you could manually sort by timestamp as a secondary key with hardly any additional cost. However, exposing that in the model would make implementing runners quite hard, or they would be prohibitively slow.
>> >
>> > You're also right that user functions that do arbitrary stateful operations can be quite dangerous and lead to unexpected behaviour. Your example of reacting to changes between 0 and 1 would produce wrong results if events are not 100% sorted by timestamp. In general, state changes that rely on processing order are problematic, while operations that move monotonically through some space are fine. Examples of such operations are adding elements to a set or summing numbers. If you "see" a given set of events, you can apply them to state in any order, and as long as you see the same set of events on different executions, the result will be the same.
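>> > For illustration, a minimal sketch (illustrative names only) of a stateful DoFn whose state update commutes, so that reordering cannot change the final state:
>> >
>> >     import org.apache.beam.sdk.state.StateSpec;
>> >     import org.apache.beam.sdk.state.StateSpecs;
>> >     import org.apache.beam.sdk.state.ValueState;
>> >     import org.apache.beam.sdk.transforms.DoFn;
>> >     import org.apache.beam.sdk.values.KV;
>> >
>> >     // Summing commutes: applying the same multiset of updates in any order
>> >     // yields the same final state. Emitting the result (e.g. from a timer
>> >     // or a final pane) is left out of this sketch, since per-element
>> >     // running outputs would again be order-dependent.
>> >     class SumFn extends DoFn<KV<String, Long>, Long> {
>> >       @StateId("sum")
>> >       private final StateSpec<ValueState<Long>> sumSpec = StateSpecs.value();
>> >
>> >       @ProcessElement
>> >       public void processElement(
>> >           @Element KV<String, Long> element,
>> >           @StateId("sum") ValueState<Long> sum) {
>> >         long current = sum.read() == null ? 0L : sum.read();
>> >         sum.write(current + element.getValue());
>> >       }
>> >     }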
>> > As for the Beam execution model in relation to processing and time, I think the only "guarantees" are:
>> > - you will eventually see all events
>> > - the timestamp of those events is usually not less than the watermark (but not always)
>> > - the watermark will advance when the system thinks you won't see events with a smaller timestamp in the future (but you sometimes might)
>> >
>> > Those seem quite "poor", but I think you can't get better guarantees for general cases, for the reasons mentioned above. Also, this is just off the top of my head and I might be wrong in my understanding of the Beam model. :-O
>> >
>> > Best,
>> > Aljoscha
>> >
>> >> On 16. May 2019, at 13:53, Jan Lukavský <[email protected]> wrote:
>> >>
>> >> Hi,
>> >>
>> >> this is starting to be really exciting. It seems to me that there is either something wrong with my definition of the "Unified model" or with how it is implemented inside (at least) the Direct and Flink runners.
>> >>
>> >> So, first, what I see as properties of the Unified model:
>> >>
>> >> a) streaming semantics (i.e. what I can express using Transforms) are a subset of batch semantics
>> >>
>> >> - this is true: batch semantics and streaming semantics differ only in that I can have a GlobalWindow with the default trigger in batch and cannot in streaming
>> >>
>> >> b) runtime conditions of batch have to be a subset of streaming conditions
>> >>
>> >> - this is because otherwise it might be intractable to run a streaming pipeline on a batch engine
>> >>
>> >> - generally this is also true: in batch mode the watermark advances only between two states (-inf and +inf), which makes it possible to turn (most) stateful operations into group-by-key operations and to take advantage of many other optimizations (the ability to re-read inputs makes it possible to drop checkpointing, etc.)
>> >>
>> >> Now, there is also one not-so-obvious runtime condition of streaming engines: how skewed the watermark and the event times of the elements being processed can be. If this skew gets too high (i.e. the watermark is not moving, and/or elements are very out-of-order), then the processing might become intractable, because everything might have to be buffered.
>> >>
>> >> On batch engines this is generally not an issue, because the buffering is eliminated by sorting: when a group-by-key operation occurs, batch runners sort elements with the same key to be together and therefore eliminate the need for a potentially infinite cache.
>> >>
>> >> This turns out to be an issue whenever there is a stateful ParDo operation, because then (without sorting) property b) is violated: on a streaming engine, the difference between an element's timestamp and the watermark will generally be low (and late events will be dropped to restrict the size of buffers), but in batch it can be arbitrarily large, and therefore the size of the buffers that would be needed is potentially unbounded.
>> >>
>> >> This line of thinking leads me to the conclusion that if Beam doesn't (on purpose) sort elements by timestamp before a stateful ParDo, then it basically violates the Unified model, because pipelines with a stateful ParDo will not function properly on batch engines. Which is what I observe: there is non-determinism in the batch pipeline although everything seems to be "well defined" - elements arrive arbitrarily out of order and are arbitrarily dropped as out of order. This leads to different results every time the batch pipeline is run.
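>> >> One way a runner (or a user) could restore determinism here, sketched below with illustrative names: group per key, sort each group in memory by timestamp, and only then apply the order-sensitive logic. This is only viable while each key's group fits in memory.
>> >>
>> >>     import java.util.ArrayList;
>> >>     import java.util.Comparator;
>> >>     import java.util.List;
>> >>     import org.apache.beam.sdk.transforms.DoFn;
>> >>     import org.apache.beam.sdk.transforms.GroupByKey;
>> >>     import org.apache.beam.sdk.transforms.ParDo;
>> >>     import org.apache.beam.sdk.transforms.Reify;
>> >>     import org.apache.beam.sdk.values.KV;
>> >>     import org.apache.beam.sdk.values.PCollection;
>> >>     import org.apache.beam.sdk.values.TimestampedValue;
>> >>
>> >>     static PCollection<Integer> inTimestampOrderPerKey(
>> >>         PCollection<KV<String, Integer>> input) {
>> >>       return input
>> >>           // Make each element's timestamp part of the value...
>> >>           .apply(Reify.timestampsInValue())
>> >>           // ...collect all values of a key together...
>> >>           .apply(GroupByKey.create())
>> >>           // ...and sort every group in memory before processing it.
>> >>           .apply(ParDo.of(
>> >>               new DoFn<KV<String, Iterable<TimestampedValue<Integer>>>, Integer>() {
>> >>                 @ProcessElement
>> >>                 public void processElement(ProcessContext c) {
>> >>                   List<TimestampedValue<Integer>> sorted = new ArrayList<>();
>> >>                   c.element().getValue().forEach(sorted::add);
>> >>                   sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
>> >>                   // Within this key, elements are now in event-time order,
>> >>                   // so order-sensitive logic would run deterministically.
>> >>                   for (TimestampedValue<Integer> tv : sorted) {
>> >>                     c.output(tv.getValue());
>> >>                   }
>> >>                 }
>> >>               }));
>> >>     }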
>> >>
>> >> Looking forward to any comments on this.
>> >>
>> >> Jan
>> >>
>> >> On 5/16/19 10:53 AM, Aljoscha Krettek wrote:
>> >>> Please take this with a grain of salt, because I might be a bit rusty on this.
>> >>>
>> >>> I think the Beam model does not prescribe any ordering (by time or otherwise) on inputs. Mostly because always requiring it would be prohibitively expensive on most runners, especially global sorting.
>> >>>
>> >>> If you want to have sorting by key, you could do a GroupByKey and then sort the groups in memory. This only works, of course, if your groups are not too large.
>> >>>
>> >>>> On 15. May 2019, at 21:01, Jan Lukavský <[email protected]> wrote:
>> >>>>
>> >>>> Hmmm, looking into the code of the FlinkRunner (and also judging by the results from the stateful ParDo), it seems that I got it wrong from the beginning. The data is not sorted before the stateful ParDo, and that surprises me a little. How should the operator work in this case? It would mean that in the batch case I have to hold an arbitrarily long allowedLateness inside the BagState, which seems suboptimal. Or am I missing something obvious here? I'll describe the use case in more detail. Let's suppose I have a series of ones and zeros, and at each time point I want to emit 1 if the value changes from 0 to 1, -1 if it changes from 1 to 0, and 0 otherwise. So:
>> >>>>
>> >>>> 0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1
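>> >>>> A naive stateful sketch of this (illustrative names) just remembers the previous value - which is only correct if elements arrive ordered by timestamp:
>> >>>>
>> >>>>     import org.apache.beam.sdk.state.StateSpec;
>> >>>>     import org.apache.beam.sdk.state.StateSpecs;
>> >>>>     import org.apache.beam.sdk.state.ValueState;
>> >>>>     import org.apache.beam.sdk.transforms.DoFn;
>> >>>>     import org.apache.beam.sdk.values.KV;
>> >>>>
>> >>>>     // Emits current - previous: 1 on a 0->1 edge, -1 on a 1->0 edge,
>> >>>>     // 0 otherwise. Order-sensitive by construction.
>> >>>>     class EdgeDetectFn extends DoFn<KV<String, Integer>, Integer> {
>> >>>>       @StateId("previous")
>> >>>>       private final StateSpec<ValueState<Integer>> previousSpec =
>> >>>>           StateSpecs.value();
>> >>>>
>> >>>>       @ProcessElement
>> >>>>       public void processElement(
>> >>>>           @Element KV<String, Integer> element,
>> >>>>           @StateId("previous") ValueState<Integer> previous,
>> >>>>           OutputReceiver<Integer> out) {
>> >>>>         int prev = previous.read() == null ? 0 : previous.read();
>> >>>>         int current = element.getValue();
>> >>>>         out.output(current - prev);
>> >>>>         previous.write(current);
>> >>>>       }
>> >>>>     }
>> >>>>
>> >>>> With input sorted by timestamp this reproduces the mapping above; without sorting, "previous" is not well defined and the output differs from run to run.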
>> >>>> Does anyone have a better idea how to solve this? And if not, how do I make it run in batch without a possibly infinite buffer? Should the input to a stateful ParDo be sorted in the batch case? My intuition would be that it should be, because in my understanding of "batch as a special case of streaming", in the batch case there is (by default) a single window, time advances from -inf to +inf at the end, and the data contains no out-of-order data in places where this might matter (which therefore enables some optimizations). The order would be relevant only in the stateful ParDo, I'd say.
>> >>>>
>> >>>> Jan
>> >>>>
>> >>>> On 5/15/19 8:34 PM, Jan Lukavský wrote:
>> >>>>> Just to clarify: I understand that changing the semantics of PCollection.isBounded is probably impossible now, because it would introduce a chicken-and-egg problem. Maybe I will state it more clearly - would it be better to be able to run bounded pipelines using batch semantics on the DirectRunner (including sorting before stateful ParDos), or would it be better to come up with some way to notify the pipeline that it will be running in a streaming way although it consists only of bounded inputs? I'm not saying how to do it, just trying to find out if anyone else has ever had such a need.
>> >>>>>
>> >>>>> Jan
>> >>>>>
>> >>>>> On 5/15/19 5:20 PM, Jan Lukavský wrote:
>> >>>>>> Hi,
>> >>>>>>
>> >>>>>> I have come across unexpected (at least for me) behavior: an apparent inconsistency between how a PCollection is processed in the DirectRunner and what PCollection.isBounded signals. Let me explain:
>> >>>>>>
>> >>>>>> - I have a stateful ParDo which needs to make sure that elements arrive in order. It accomplishes this by defining a BagState for buffering input elements and sorting them inside this buffer. It also keeps track of the element with the highest timestamp, to estimate a local watermark (minus some allowed lateness), so it knows when to remove elements from the buffer, sort them by time, and pass them to some (time-ordered) processing. (A rough sketch of this pattern follows at the end of this mail.)
>> >>>>>>
>> >>>>>> - this seems to work well for streaming (unbounded) data
>> >>>>>>
>> >>>>>> - for batch (bounded) data the semantics of a stateful ParDo should be (please correct me if I'm wrong) that elements always arrive in order, because the runner can sort them by timestamp
>> >>>>>>
>> >>>>>> - this implies that for batch-processed (bounded) input the allowedLateness can be set to zero, making the processing a little more efficient, because it doesn't have to use the BagState at all
>> >>>>>>
>> >>>>>> - now, the trouble seems to be that the DirectRunner always uses streaming processing, even if the input is bounded (which is by definition possible), but there is currently no way to know when it is safe to change the allowed lateness to zero (because the input will arrive ordered)
>> >>>>>>
>> >>>>>> - so it seems to me that either the DirectRunner should apply sorting before a stateful ParDo when it processes bounded data (the same way other runners do), or it can apply streaming processing, but then it should change PCollection.isBounded to UNBOUNDED, even if the input is originally bounded
>> >>>>>>
>> >>>>>> - that way, the semantics of PCollection.isBounded would be not whether the data is known in advance to be finite, but *how* the data is going to be processed, which is much more valuable (IMO)
>> >>>>>>
>> >>>>>> Any thoughts?
>> >>>>>>
>> >>>>>> Jan
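>> >>>>>> PS: a rough sketch of the buffering pattern from the first point above. It is simplified (illustrative names, an arbitrary lateness, and it flushes the whole buffer when the timer fires instead of incrementally):
>> >>>>>>
>> >>>>>>     import java.util.ArrayList;
>> >>>>>>     import java.util.Comparator;
>> >>>>>>     import java.util.List;
>> >>>>>>     import org.apache.beam.sdk.coders.VarIntCoder;
>> >>>>>>     import org.apache.beam.sdk.state.BagState;
>> >>>>>>     import org.apache.beam.sdk.state.StateSpec;
>> >>>>>>     import org.apache.beam.sdk.state.StateSpecs;
>> >>>>>>     import org.apache.beam.sdk.state.TimeDomain;
>> >>>>>>     import org.apache.beam.sdk.state.Timer;
>> >>>>>>     import org.apache.beam.sdk.state.TimerSpec;
>> >>>>>>     import org.apache.beam.sdk.state.TimerSpecs;
>> >>>>>>     import org.apache.beam.sdk.transforms.DoFn;
>> >>>>>>     import org.apache.beam.sdk.values.KV;
>> >>>>>>     import org.apache.beam.sdk.values.TimestampedValue;
>> >>>>>>     import org.joda.time.Duration;
>> >>>>>>
>> >>>>>>     class BufferUntilWatermarkFn extends DoFn<KV<String, Integer>, Integer> {
>> >>>>>>       private static final Duration ALLOWED_LATENESS = Duration.standardMinutes(1);
>> >>>>>>
>> >>>>>>       @StateId("buffer")
>> >>>>>>       private final StateSpec<BagState<TimestampedValue<Integer>>> bufferSpec =
>> >>>>>>           StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()));
>> >>>>>>
>> >>>>>>       @TimerId("flush")
>> >>>>>>       private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>> >>>>>>
>> >>>>>>       @ProcessElement
>> >>>>>>       public void processElement(
>> >>>>>>           ProcessContext c,
>> >>>>>>           @StateId("buffer") BagState<TimestampedValue<Integer>> buffer,
>> >>>>>>           @TimerId("flush") Timer flush) {
>> >>>>>>         buffer.add(TimestampedValue.of(c.element().getValue(), c.timestamp()));
>> >>>>>>         // Re-arm the timer: fire once the watermark passes this element's
>> >>>>>>         // timestamp plus the lateness we are willing to wait for.
>> >>>>>>         flush.set(c.timestamp().plus(ALLOWED_LATENESS));
>> >>>>>>       }
>> >>>>>>
>> >>>>>>       @OnTimer("flush")
>> >>>>>>       public void onFlush(
>> >>>>>>           OnTimerContext c,
>> >>>>>>           @StateId("buffer") BagState<TimestampedValue<Integer>> buffer) {
>> >>>>>>         List<TimestampedValue<Integer>> sorted = new ArrayList<>();
>> >>>>>>         buffer.read().forEach(sorted::add);
>> >>>>>>         sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
>> >>>>>>         // The buffered elements are now in event-time order; the actual
>> >>>>>>         // time-ordered processing would go here. This sketch just re-emits.
>> >>>>>>         for (TimestampedValue<Integer> tv : sorted) {
>> >>>>>>           c.output(tv.getValue());
>> >>>>>>         }
>> >>>>>>         buffer.clear();
>> >>>>>>       }
>> >>>>>>     }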
