*From: *Jozef Vilcek <jozo.vil...@gmail.com>
*Date: *Fri, May 17, 2019 at 2:31 AM
*To: * <dev@beam.apache.org>

> Interesting discussion. I think it is very important to know that when a
> user uses a stateful ParDo, they can run into a situation where it does
> not behave correctly in "batch operating mode".
>

How so? AFAIK stateful DoFns work just fine in batch runners.


> But some transforms known to Beam, like fixed windows, will work fine? Is
> any sorting applied to keyed elements before evaluating a window key
> group? If the answer is yes, then why not do the same for a stateful
> ParDo? It would feel consistent to me.
>
> Part of the SDK or not, I see the Dataflow runner is doing this optimisation,
> probably precisely to make stateful ParDo operations stable in batch mode:
>
> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java#L64
>
>
> On Thu, May 16, 2019 at 5:09 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Max,
>> answers inline.
>> ---------- Original e-mail ----------
>> From: Maximilian Michels <m...@apache.org>
>> To: dev@beam.apache.org
>> Date: 16. 5. 2019 15:59:59
>> Subject: Re: Definition of Unified model (WAS: Semantics of
>> PCollection.isBounded)
>>
>> Hi Jan,
>>
>> Thanks for the discussion. Aljoscha already gave great answers. Just a
>> couple of remarks:
>>
>> > a) streaming semantics (i.e. what I can express using Transforms) are a
>> > subset of batch semantics
>>
>> I think you mean streaming is a superset of batch, or batch is a subset
>> of streaming. This is the ideal. In practice, the two execution modes
>> are sometimes accomplished by two different execution engines. Even in
>> Flink, we have independent APIs for batch and streaming and the
>> execution semantics are slightly different. For example, there are no
>> watermarks in the batch API. Thus, batch rarely is simply an execution
>> mode of streaming. However, I still think the unified Beam model works
>> in both cases.
>>
>> > batch semantics and streaming semantics differ only in that I can have
>> > GlobalWindow with default trigger on batch and cannot on stream
>>
>> Actually, I really did think that, regarding semantics, streaming should be
>> a subset of batch. That is because in batch you can be sure that the
>> watermark will eventually advance to infinity. That gives you one additional
>> feature that streaming generally doesn't have (unless you manually
>> forward the watermark to infinity, as you suggest).
>>
>>
>>
>> You can have a GlobalWindow in streaming with a default trigger. You
>> could define additional triggers that do early firings. And you could
>> even trigger the global window by advancing the watermark to +inf.
>>
>> Yes, but then you have actually changed streaming to batch; you just execute
>> a batch pipeline in a streaming way.
>>
>>
>>
>> > On batch engines, this is generally not an issue, because the buffering
>> > is eliminated by sorting - when a Group by operation occurs, batch runners
>> > sort elements with the same key to be together and therefore eliminate the
>> > need for potentially infinite cache.
>>
>> The batch engines you normally use might do that. However, I do not see
>> how sorting is an inherent property of the streaming model. We do not
>> guarantee a deterministic order of events in streaming with respect to
>> event time. In that regard, batch is a true subset of streaming because
>> we make no guarantees on the order. Actually, because we only advance
>> the watermark from -inf to +inf once we have read all data, this nicely
>> aligns with the streaming model.
>>
>>
>> Sure, streaming doesn't have time-ordering guarantees. Having them
>> would be impractical. But there is no issue with having these guarantees
>> in batch mode; moreover, it gives pipelines that need "bounded
>> out-of-orderness" a chance to ever finish.
>>
>>
>> I think that there are some issues in how we think about the properties of
>> batch vs. stream. If we define streaming as the superset, then we cannot
>> define any properties for batch that streaming doesn't have. But if we
>> split this into semantics on one side and runtime properties and
>> guarantees on the other, then it is possible to define properties of
>> batch that streaming doesn't have.
>>
>>
>> Jan
>>
>>
>>
>>
>> -Max
>>
>> On 16.05.19 15:20, Aljoscha Krettek wrote:
>> > Hi,
>> >
>> > I think it’s helpful to consider that events never truly arrive in
>> > order in the real world (you mentioned as much yourself). For streaming use
>> > cases, there might be some out-of-orderness (or a lot of it, depending on
>> > the use case) so your implementation has to be able to deal with that. On
>> > the other end of the spectrum we have batch use cases, where
>> > out-of-orderness is potentially even bigger because it allows for more
>> > efficient parallel execution. If your implementation can deal with
>> > out-of-orderness that also shouldn’t be a problem.
>> >
>> > Another angle is completeness vs. latency: you usually cannot have both
>> > in a streaming world. If you want 100 % completeness, i.e. you want to
>> > ensure that you process all events and never drop anything, you can never
>> > advance the watermark from its initial -Inf if you want to also never have
>> > watermark violations. In typical use cases I would expect any sorting
>> > guarantees to be constantly violated, unless you are willing to drop late
>> > data.
>> >
>> > I think these are some reasons why there is no mention of ordering by
>> > timestamp anywhere (unless I’m mistaken and there is somewhere).
>> >
>> > You are right, of course, that batch-style runners can use
>> > grouping/sorting for a GroupByKey operation. Flink does that and even
>> > allows sorting by secondary key, so you could manually sort by timestamp as
>> > a secondary key with hardly any additional cost. However, exposing that in
>> > the model would make implementing Runners quite hard, or they would be
>> > prohibitively slow.
>> >
>> > You’re also right that user functions that do arbitrary stateful
>> > operations can be quite dangerous and lead to unexpected behaviour. Your
>> > example of reacting to changes in 0 and 1 would produce wrong results if
>> > events are not 100% sorted by timestamp. In general, state changes that
>> > rely on processing order are problematic while operations that move
>> > monotonically through some space are fine. Examples of such operations are
>> > adding elements to a set or summing numbers. If you “see” a given set of
>> > events you can apply them to state in any order and as long as you see the
>> > same set of events on different executions the result will be the same.
>> >
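A minimal Python sketch of the distinction drawn above, using only the standard library. The event values and the helper names are made up for illustration and are not Beam API. Summing and adding to a set give the same final state for every processing order of the same events, whereas a state that simply remembers the last processed element does not.

```python
from itertools import permutations

events = [3, 1, 2]  # hypothetical event payloads


def apply_sum(order):
    """Commutative state update: running sum."""
    state = 0
    for e in order:
        state += e
    return state


def apply_set(order):
    """Commutative state update: add to a set."""
    state = set()
    for e in order:
        state.add(e)
    return frozenset(state)


def apply_last_value(order):
    """Order-dependent state update: remember the last processed element."""
    state = None
    for e in order:
        state = e
    return state


# Collect the distinct final states over every possible processing order.
sums = {apply_sum(p) for p in permutations(events)}
sets = {apply_set(p) for p in permutations(events)}
lasts = {apply_last_value(p) for p in permutations(events)}

print(len(sums), len(sets), len(lasts))  # 1 1 3
```

Only the last-value state ends up with more than one possible result, which is the kind of operation that needs ordered input.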
>> > As for the Beam execution model in relation to processing and time, I
>> > think the only “guarantees” are:
>> > - you will eventually see all events
>> > - the timestamp of those events is usually not less than the watermark
>> >   (but not always)
>> > - the watermark will advance when the system thinks you won’t see
>> >   events with a smaller timestamp in the future (but you sometimes might)
>> >
>> > Those seem quite “poor”, but I think you can’t get better guarantees
>> > for general cases for the reasons mentioned above. Also, this is just off
>> > the top of my head and I might be wrong in my understanding of the Beam
>> > model. :-O
>> >
>> > Best,
>> > Aljoscha
>> >
>> >> On 16. May 2019, at 13:53, Jan Lukavský <je...@seznam.cz> wrote:
>> >>
>> >> Hi,
>> >>
>> >> this is starting to be really exciting. It seems to me that there is
>> >> either something wrong with my definition of "Unified model" or with how it
>> >> is implemented inside (at least) Direct and Flink Runners.
>> >>
>> >> So, first, what I see as properties of the Unified model:
>> >>
>> >> a) streaming semantics (i.e. what I can express using Transforms) are a
>> >> subset of batch semantics
>> >>
>> >> - this is true, batch semantics and streaming semantics differ only
>> >> in that I can have GlobalWindow with default trigger on batch and cannot on
>> >> stream
>> >>
>> >> b) runtime conditions of batch have to be a subset of streaming
>> >> conditions
>> >>
>> >> - this is because otherwise it might be intractable to run a streaming
>> >> pipeline on a batch engine
>> >>
>> >> - generally this is also true - in batch mode the watermark advances only
>> >> between two states (-inf and +inf), which makes it possible to turn (most)
>> >> stateful operations into group-by-key operations and take advantage of
>> >> many other optimizations (the ability to re-read inputs makes it possible to
>> >> drop checkpointing, etc.)
>> >>
>> >> Now there is also one not so obvious runtime condition of streaming
>> >> engines: how large the skew between the watermark and the event time of the
>> >> elements being processed can be. If it gets too high (i.e. the watermark is
>> >> not moving and/or elements are very out of order), then the processing might
>> >> become intractable, because everything might have to be buffered.
>> >>
>> >> On batch engines, this is generally not an issue, because the
>> >> buffering is eliminated by sorting - when a Group by operation occurs,
>> >> batch runners sort elements with the same key to be together and therefore
>> >> eliminate the need for potentially infinite cache.
>> >>
>> >> Where this turns out to be an issue is whenever there is a stateful
>> >> ParDo operation, because then (without sorting) property b) is violated:
>> >> on a streaming engine the difference between element timestamp and
>> >> watermark will tend to be low (and late events will be dropped to
>> >> restrict the size of buffers), but on batch it can be arbitrarily large,
>> >> and therefore the size of the buffers that would be needed is potentially
>> >> unbounded.
>> >>
>> >> This line of thinking leads me to the conclusion that if Beam doesn't
>> >> (on purpose) sort elements by timestamp before a stateful ParDo, then it
>> >> basically violates the Unified model, because pipelines with stateful ParDo
>> >> will not function properly on batch engines. Which is what I observe:
>> >> there is non-determinism in a batch pipeline although everything seems to be
>> >> "well defined"; elements arrive arbitrarily out of order and are
>> >> dropped arbitrarily. This leads to different results every time the
>> >> batch pipeline is run.
>> >>
>> >> Looking forward to any comments on this.
>> >>
>> >> Jan
>> >>
>> >> On 5/16/19 10:53 AM, Aljoscha Krettek wrote:
>> >>> Please take this with a grain of salt, because I might be a bit rusty
>> >>> on this.
>> >>>
>> >>> I think the Beam model does not prescribe any ordering (by time or
>> >>> otherwise) on inputs. Mostly because always requiring it would be
>> >>> prohibitively expensive on most Runners, especially global sorting.
>> >>>
>> >>> If you want to have sorting by key, you could do a GroupByKey and
>> >>> then sort the groups in memory. This only works, of course, if your groups
>> >>> are not too large.
>> >>>
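The "GroupByKey, then sort each group in memory" idea can be sketched in plain Python as follows. This is only an illustration of the technique, not Beam code; the record layout `(key, timestamp, value)` and the sample data are assumptions.

```python
from collections import defaultdict

# Hypothetical (key, timestamp, value) records in arbitrary arrival order.
records = [("a", 3, 1), ("b", 1, 0), ("a", 1, 0), ("b", 2, 1), ("a", 2, 1)]

# Step 1: group by key (what a GroupByKey would produce).
groups = defaultdict(list)
for key, ts, value in records:
    groups[key].append((ts, value))

# Step 2: sort each group by timestamp in memory.
# This only works if every single group fits in memory.
sorted_groups = {key: sorted(items) for key, items in groups.items()}

print(sorted_groups["a"])  # [(1, 0), (2, 1), (3, 1)]
```

The memory cost is per key, so a single very large key is what makes this approach impractical.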
>> >>>> On 15. May 2019, at 21:01, Jan Lukavský <je...@seznam.cz> wrote:
>> >>>>
>> >>>> Hmmm, looking into the code of FlinkRunner (and also observing the
>> >>>> results from the stateful ParDo), it seems that I got it wrong from the
>> >>>> beginning. The data is not sorted before the stateful ParDo, which
>> >>>> surprises me a little. How should the operator work in this case? It would
>> >>>> mean that in the batch case I have to hold elements for an arbitrarily long
>> >>>> allowedLateness inside the BagState, which seems to be kind of suboptimal.
>> >>>> Or am I missing something obvious here? I'll describe the use case in more
>> >>>> detail. Let's suppose I have a series of ones and zeros and I want to emit
>> >>>> at each time point a value of 1 if the value changes from 0 to 1, a value
>> >>>> of -1 if it changes from 1 to 0, and 0 otherwise. So:
>> >>>>
>> >>>> 0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1
>> >>>>
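A small Python sketch of this use case, assuming the elements carry timestamps (the timestamps and the name `emit_changes` are invented for illustration). It reproduces the mapping above when the input is processed in timestamp order, and shows that the same elements in a different arrival order give a different output.

```python
def emit_changes(values):
    """Emit 1 on a 0->1 change, -1 on a 1->0 change, 0 otherwise.

    The first element has no predecessor and emits 0.
    """
    out = []
    prev = None
    for v in values:
        out.append(0 if prev is None else v - prev)
        prev = v
    return out


# (timestamp, value) pairs; the timestamps are made up.
events = [(1, 0), (2, 1), (3, 1), (4, 0), (5, 0), (6, 1)]

# Processing in timestamp order gives the expected result.
in_order = emit_changes([v for _, v in sorted(events)])
print(in_order)  # [0, 1, 0, -1, 0, 1]

# The same elements arriving pairwise swapped give a different answer.
arrival = [events[i] for i in (1, 0, 3, 2, 5, 4)]
out_of_order = emit_changes([v for _, v in arrival])
print(out_of_order)
```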
>> >>>> Does anyone have a better idea how to solve it? And if not, how can I
>> >>>> make it run on batch without a possibly infinite buffer? Should the
>> >>>> input to a stateful ParDo be sorted in the batch case? My intuition is that
>> >>>> it should be, because in my understanding of "batch as a special case of
>> >>>> streaming", in the batch case there is (by default) a single window, time
>> >>>> advances from -inf to +inf at the end, and the data contains no
>> >>>> out-of-order elements in places where this might matter (which therefore
>> >>>> enables some optimizations). The order would be relevant only in the
>> >>>> stateful ParDo, I'd say.
>> >>>>
>> >>>> Jan
>> >>>>
>> >>>> On 5/15/19 8:34 PM, Jan Lukavský wrote:
>> >>>>> Just to clarify, I understand that changing the semantics of
>> >>>>> PCollection.isBounded is probably impossible now, because it would probably
>> >>>>> introduce a chicken-and-egg problem. Maybe I can state it more clearly: would
>> >>>>> it be better to be able to run bounded pipelines using batch semantics on
>> >>>>> DirectRunner (including sorting before stateful ParDos), or would it be
>> >>>>> better to come up with some way to notify the pipeline that it will be
>> >>>>> running in a streaming way although it consists only of bounded inputs? And
>> >>>>> I'm not saying how to do it, just trying to find out if anyone else has ever
>> >>>>> had such a need.
>> >>>>>
>> >>>>> Jan
>> >>>>>
>> >>>>> On 5/15/19 5:20 PM, Jan Lukavský wrote:
>> >>>>>> Hi,
>> >>>>>>
>> >>>>>> I have come across an unexpected (at least for me) apparent
>> >>>>>> inconsistency between how a PCollection is processed in DirectRunner
>> >>>>>> and what PCollection.isBounded signals. Let me explain:
>> >>>>>>
>> >>>>>> - I have a stateful ParDo, which needs to make sure that elements
>> >>>>>> arrive in order - it accomplishes this by defining a BagState for buffering
>> >>>>>> input elements and sorting them inside this buffer; it also keeps track of
>> >>>>>> the element with the highest timestamp to estimate a local watermark (minus
>> >>>>>> some allowed lateness), so that it knows when to remove elements from the
>> >>>>>> buffer, sort them by time and pass them to some (time-ordered) processing
>> >>>>>>
>> >>>>>> - this seems to work well for streaming (unbounded) data
>> >>>>>>
>> >>>>>> - for batch (bounded) data the semantics of stateful ParDo should
>> >>>>>> be (please correct me if I'm wrong) that elements always arrive in order,
>> >>>>>> because the runner can sort them by timestamp
>> >>>>>>
>> >>>>>> - this implies that for batch-processed (bounded) input the
>> >>>>>> allowedLateness can be set to zero, so that the processing is a little more
>> >>>>>> efficient, because it doesn't have to use the BagState at all
>> >>>>>>
>> >>>>>> - now, the trouble seems to be that DirectRunner always uses
>> >>>>>> streaming processing, even if the input is bounded (which is by definition
>> >>>>>> possible), but there is currently no way to know when it is possible to
>> >>>>>> change allowed lateness to zero (because input will arrive ordered)
>> >>>>>>
>> >>>>>> - so it seems to me that either DirectRunner should apply
>> >>>>>> sorting to stateful ParDo when it processes bounded data (the same way
>> >>>>>> that other runners do), or it can apply streaming processing, but then it
>> >>>>>> should change PCollection.isBounded to UNBOUNDED, even if the input is
>> >>>>>> originally bounded
>> >>>>>>
>> >>>>>> - that way, the semantics of PCollection.isBounded would be not
>> >>>>>> whether the data are known in advance to be finite, but *how* the data
>> >>>>>> are going to be processed, which is much more valuable (IMO)
>> >>>>>>
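The buffering scheme from the first bullet can be sketched in plain Python roughly as follows. This is not the actual DoFn: the class name `SortingBuffer`, the heap standing in for BagState, and the rule "local watermark = highest timestamp seen minus allowed lateness" are assumptions made for illustration.

```python
import heapq
import itertools


class SortingBuffer:
    """Buffers elements and releases them in timestamp order."""

    def __init__(self, allowed_lateness):
        self.allowed_lateness = allowed_lateness
        self.heap = []                 # plays the role of the BagState buffer
        self.max_ts = float("-inf")    # highest timestamp seen so far
        self.dropped = 0               # elements that arrived too late
        self._seq = itertools.count()  # tie-breaker so values are never compared

    def add(self, ts, value):
        """Buffer one element and return the elements that are now safe to emit."""
        self.max_ts = max(self.max_ts, ts)
        watermark = self.max_ts - self.allowed_lateness  # local estimate
        if ts < watermark:
            self.dropped += 1  # too late: it would break the output order
            return []
        heapq.heappush(self.heap, (ts, next(self._seq), value))
        return self._drain(watermark)

    def flush(self):
        """End of input: the watermark jumps to +inf, so emit everything left."""
        return self._drain(float("inf"))

    def _drain(self, watermark):
        out = []
        while self.heap and self.heap[0][0] <= watermark:
            ts, _, value = heapq.heappop(self.heap)
            out.append((ts, value))
        return out


# Slightly out-of-order (timestamp, value) arrivals; the data is made up.
arrivals = [(1, 0), (3, 1), (2, 1), (5, 0), (4, 0), (6, 1)]
buf = SortingBuffer(allowed_lateness=2)
emitted = []
for ts, value in arrivals:
    emitted.extend(buf.add(ts, value))
emitted.extend(buf.flush())
print(emitted)
```

With `allowed_lateness=0` every in-order element is emitted immediately and nothing stays buffered, which is the optimization the bullets argue should be possible for sorted bounded input. With unsorted input, the same setting drops every element that arrives behind a later one.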
>> >>>>>> Any thoughts?
>> >>>>>>
>> >>>>>> Jan
>> >>>>>>
>> >
>>
>>
