On Tue, May 21, 2019 at 7:49 AM Jan Lukavský <je...@seznam.cz> wrote:
> Hi,
>
> > Actually, I think it is a larger (open) question whether exactly once is guaranteed by the model or whether runners are allowed to relax that. I would think, however, that sources correctly implemented should be idempotent when run atop an exactly once infrastructure such as Flink or Dataflow.

Exactly once semantics is not required in the Beam model. There is a TODO[1] on the runners capability matrix to expand it, stating which runner provides at least once vs. exactly once and other runtime characteristics.

> I would assume that the model basically inherits the guarantees of the underlying infrastructure. Because Flink does not work as you described (atomic commit of inputs, state and outputs), but rather a checkpoint mark flows through the DAG much like a watermark and on failures operators are restored and data reprocessed, it (IMHO) implies that you have exactly once everywhere in the DAG *but* sinks. That is because sinks cannot be restored to a previous state; instead sinks are supposed to be idempotent in order for exactly once to really work (or at least be able to commit outputs on checkpoint in the sink). That implies that if you don't have a sink that is able to commit outputs atomically on checkpoint, the pipeline execution should be deterministic upon retries, otherwise shadow writes from failed paths of the pipeline might appear.

There was a discussion about transforms that require stable input to guarantee correct semantics. This requirement usually came up when attempting to write a sink correctly. More details on it in these two threads [2, 3].

> Someone from Flink might correct me if I'm wrong, but that's my current understanding.
>
> > Sounds like we should make this clearer.
>
> I meant that you are right that we must not, in any thoughts we are having, forget that streams are by definition out-of-order. That is a property that we cannot change. But - that doesn't limit us from creating an operator that presents the data to the UDF as if the stream was ideally sorted. It can do that by introducing latency, of course.
>
> On 5/21/19 4:01 PM, Robert Bradshaw wrote:
> > Reza: One could provide something like this as a utility class, but one downside is that it is not scale invariant. It requires a tuning parameter that, if too small, won't mitigate the problem, but if too big, greatly increases latency. (Possibly one could define a dynamic session-like window to solve this though...) It also might be harder for runners that *can* cheaply present stuff in timestamp order to optimize. (That and, in practice, our annotation-style process methods don't lend themselves to easy composition.) I think it could work in specific cases though.
> >
> > More inline below.
> >
> > On Tue, May 21, 2019 at 11:38 AM Jan Lukavský <je...@seznam.cz> wrote:
> >> Hi Robert,
> >>
> >> > Beam has an exactly-once model. If the data was consumed, state mutated, and outputs written downstream (these three are committed together atomically) it will not be replayed. That does not, of course, solve the non-determinism due to ordering (including the fact that two operations reading the same PCollection may view different ordering).
> >>
> >> I think what you describe is a property of a runner, not of the model, right? I think if I run my pipeline on Flink I will not get this atomicity, because although Flink also uses an exactly-once model, it might write outputs multiple times.
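To make the stable-input point concrete: a minimal sketch in the Java SDK of a sink DoFn leaning on @RequiresStableInput (the subject of threads [2, 3]). Runner support for the annotation varies, and the KeyedStore interface below is only a stand-in for whatever external system is being written to, not a Beam API.

import java.io.Serializable;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Stand-in for the external system the sink writes to; not a Beam API.
interface KeyedStore extends Serializable {
  void upsert(String id, String value);
}

class IdempotentSinkFn extends DoFn<KV<String, String>, Void> {
  private final KeyedStore store;

  IdempotentSinkFn(KeyedStore store) {
    this.store = store;
  }

  // Asks the runner to make the input to this DoFn replay-stable (e.g. by
  // checkpointing it first), so a retried bundle sees exactly the same elements.
  @RequiresStableInput
  @ProcessElement
  public void processElement(@Element KV<String, String> element) {
    // A deterministic ID derived from the stable element turns the external
    // write into an idempotent upsert; the exact ID scheme here is illustrative.
    String recordId = element.getKey() + ":" + element.getValue().hashCode();
    store.upsert(recordId, element.getValue());
  }
}

The point is only that once the input is guaranteed to be replayed identically, duplicate writes on retry overwrite rather than accumulate, which makes retries safe even without an atomic commit in the sink.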
> > Actually, I think it is a larger (open) question whether exactly once is guaranteed by the model or whether runners are allowed to relax that. I would think, however, that sources correctly implemented should be idempotent when run atop an exactly once infrastructure such as Flink or Dataflow.
> >
> >> > 1) Is it correct for a (Stateful)DoFn to assume elements are received in a specific order? In the current model, it is not. Being able to read, handle, and produce out-of-order data, including late data, is a pretty fundamental property of distributed systems.
> >>
> >> Yes, absolutely. The argument here is not that Stateful ParDo should presume to receive elements in any particular order, but to _present_ them as such to the user's @ProcessElement function.
> >
> > Sounds like we should make this clearer.
> >
> >> > 2) Given that some operations are easier (or possibly only possible) to write when operating on ordered data, and that different runners may have (significantly) cheaper ways to provide this ordering than can be done by the user themselves, should we elevate this to a property of (Stateful?)DoFns that the runner can provide? I think a compelling argument can be made here that we should.
> >>
> >> +1
> >>
> >> Jan
> >>
> >> On 5/21/19 11:07 AM, Robert Bradshaw wrote:
> >>> On Mon, May 20, 2019 at 5:24 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>> > I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes.
> >>>>
> >>>> Although I understand the motivation of this statement, this project name is "Apache Beam: An advanced unified programming model". What does the model unify, if "streaming vs. batch" is not part of the model?
> >>> What I mean is that streaming vs. batch is no longer part of the model (or ideally the API), but pushed down to be a concern of the runner (executor) of the pipeline.
> >>>
> >>>
> >>> On Tue, May 21, 2019 at 10:32 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>> Hi Kenn,
> >>>>
> >>>> OK, so if we introduce an annotation, we can have stateful ParDo with sorting; that would perfectly resolve my issues. I still have some doubts, though. Let me explain. The current behavior of stateful ParDo has the following properties:
> >>>>
> >>>> a) might fail in batch, although it runs fine in streaming (that is due to the buffering, and unbounded lateness in batch, which was discussed back and forth in this thread)
> >>>>
> >>>> b) might be non-deterministic (this is because the elements arrive in somewhat random order, and even if you do the operation "assign unique ID to elements" this might produce different results when run multiple times)
> >>> PCollections are *explicitly* unordered. Any operations that assume or depend on a specific ordering for correctness (or determinism) must provide that ordering themselves (i.e. tolerate "arbitrary shuffling of inputs"). As you point out, that may be very expensive if you have very hot keys with very large (unbounded) timestamp skew.
> >>>
> >>> StatefulDoFns are low-level operations that should be used with care; the simpler windowing model gives determinism in the face of unordered data (though late data and non-end-of-window triggering introduce some of the non-determinism back in).
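To make property b) concrete, a rough sketch (Java SDK) of the "assign unique ID to elements" stateful ParDo mentioned above. The counter itself is deterministic, but which element receives which number depends entirely on arrival order, so a re-run (or a restart from a checkpoint) can pair numbers with elements differently.

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class AssignSequenceNumbers extends DoFn<KV<String, String>, KV<String, Long>> {

  @StateId("counter")
  private final StateSpec<ValueState<Long>> counterSpec = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @StateId("counter") ValueState<Long> counter,
      OutputReceiver<KV<String, Long>> out) {
    Long previous = counter.read();
    long next = (previous == null ? 0L : previous) + 1;
    counter.write(next);
    // The numbers are unique per key either way, but which element gets which
    // number is decided by arrival order - the source of the non-determinism.
    out.output(KV.of(element.getKey(), next));
  }
}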
> >>>> What worries me most is property b), because it seems to me to have serious consequences - not only that if you run a batch pipeline twice you would get different results, but even in streaming, when the pipeline fails and gets restarted from a checkpoint, the produced output might differ from the previous run, and data from the first run might have already been persisted into the sink. That would create somewhat messy outputs.
> >>> Beam has an exactly-once model. If the data was consumed, state mutated, and outputs written downstream (these three are committed together atomically) it will not be replayed. That does not, of course, solve the non-determinism due to ordering (including the fact that two operations reading the same PCollection may view different ordering).
> >>>
> >>>> These two properties make me think that the current implementation is more of a _special case_ than the general one. The general one would be that your state doesn't have the properties to be able to tolerate buffering problems and/or non-determinism. Which is the case where you need sorting in both streaming and batch to be part of the model.
> >>>>
> >>>> Let me point out one more analogy - that is merging vs. non-merging windows. The general case (merging windows) implies sorting by timestamp in both the batch case (explicit) and streaming (buffering). The special case (non-merging windows) doesn't rely on any timestamp ordering, so the sorting and buffering can be dropped. The underlying root cause of this is the same for both stateful ParDo and windowing (essentially, assigning window labels is a stateful operation when the windowing function is merging).
> >>>>
> >>>> The reason for the current behavior of stateful ParDo seems to be performance, but is it right to abandon correctness in favor of performance? Wouldn't it be more consistent to have the default behavior prefer correctness, and when you have the specific conditions of the state function having special properties, then you can annotate your DoFn (with something like @TimeOrderingAgnostic), which would yield better performance in that case?
> >>> There are two separable questions here.
> >>>
> >>> 1) Is it correct for a (Stateful)DoFn to assume elements are received in a specific order? In the current model, it is not. Being able to read, handle, and produce out-of-order data, including late data, is a pretty fundamental property of distributed systems.
> >>>
> >>> 2) Given that some operations are easier (or possibly only possible) to write when operating on ordered data, and that different runners may have (significantly) cheaper ways to provide this ordering than can be done by the user themselves, should we elevate this to a property of (Stateful?)DoFns that the runner can provide? I think a compelling argument can be made here that we should.
> >>>
> >>> - Robert
> >>>
> >>>
> >>>> On 5/21/19 1:00 AM, Kenneth Knowles wrote:
> >>>>
> >>>> Thanks for the nice small example of a calculation that depends on order. You are right that many state machines have this property. I agree w/ you and Luke that it is convenient for batch processing to sort by event timestamp before running a stateful ParDo. In streaming you could also implement "sort by event timestamp" by buffering until you know all earlier data will be dropped - a slack buffer up to allowed lateness.
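That "slack buffer up to allowed lateness" could look roughly like the following in the Java SDK - purely a sketch, with an arbitrary one-minute slack, the timer naively reset per element rather than tracked against the minimum buffered timestamp, and no handling of data later than the slack or of output-timestamp/watermark holds, all of which a real implementation would need.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

class SortBufferFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  // Arbitrary slack for illustration; this is the latency/completeness trade-off.
  private static final Duration SLACK = Duration.standardMinutes(1);

  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<KV<String, Long>>>> bufferSpec =
      StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(
          KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      @Element KV<String, Long> element,
      @Timestamp Instant timestamp,
      @StateId("buffer") BagState<TimestampedValue<KV<String, Long>>> buffer,
      @TimerId("flush") Timer flush) {
    // Hold the element in state and ask to fire once the watermark is SLACK
    // past its timestamp, i.e. once everything earlier should have arrived.
    buffer.add(TimestampedValue.of(element, timestamp));
    flush.set(timestamp.plus(SLACK));
  }

  @OnTimer("flush")
  public void onFlush(
      @StateId("buffer") BagState<TimestampedValue<KV<String, Long>>> buffer,
      OutputReceiver<KV<String, Long>> out) {
    // Re-emit the buffered elements in event-time order.
    List<TimestampedValue<KV<String, Long>>> sorted = new ArrayList<>();
    buffer.read().forEach(sorted::add);
    sorted.sort((a, b) -> a.getTimestamp().compareTo(b.getTimestamp()));
    for (TimestampedValue<KV<String, Long>> value : sorted) {
      out.output(value.getValue());
    }
    buffer.clear();
  }
}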
> >>>>
> >>>> I do not think that it is OK to sort in batch and not in streaming. Many state machines diverge very rapidly when things are out of order. So each runner, if it sees the "@OrderByTimestamp" annotation (or whatever), needs to deliver sorted data (by some mix of buffering and dropping), or to reject the pipeline as unsupported.
> >>>>
> >>>> And I also want to say that this is not the default case - many uses of state & timers in ParDo yield different results at the element level, but the results are equivalent in the big picture. Such as the example of "assign a unique sequence number to each element" or "group into batches" - it doesn't matter exactly what the result is, only that it meets the spec. And other cases like user funnels are monotonic enough that you also don't actually need sorting.
> >>>>
> >>>> Kenn
> >>>>
> >>>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>> Yes, the problem will arise probably mostly when you have not-well-distributed keys (or too few keys). I'm really not sure if a pure GBK with a trigger can solve this - it might help to have a data-driven trigger. There would still be some doubts, though. The main question is still here - people say that sorting by timestamp before a stateful ParDo would be prohibitively slow, but I don't really see why - the sorting is very probably already there. And if not (hash grouping instead of sorted grouping), then the sorting would affect only user-defined StatefulParDos.
> >>>>>
> >>>>> This would suggest that the best way out of this would be really to add an annotation, so that the author of the pipeline can decide.
> >>>>>
> >>>>> If that would be acceptable I think I can try to prepare some basic functionality, but I'm not sure if I would be able to cover all runners / SDKs.
> >>>>>
> >>>>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
> >>>>>
> >>>>> It is read all per key and window and not just read all (this still won't scale with hot keys in the global window). The GBK preceding the StatefulParDo will guarantee that you are processing all the values for a specific key and window at any given time. Is there a specific window/trigger that is missing that you feel would remove the need for you to use StatefulParDo?
> >>>>>
> >>>>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>> Hi Lukasz,
> >>>>>>
> >>>>>>> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state.
> >>>>>> Yes, no problem with that. But this whole discussion started because *this doesn't work on batch*. You simply cannot first read everything from distributed storage and then buffer it all into memory, just to read it again, but sorted. That will not work. And even if it would, it would be a terrible waste of resources.
> >>>>>>
> >>>>>> Jan
> >>>>>>
> >>>>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
> >>>>>>
> >>>>>>
> >>>>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>> This discussion brings many really interesting questions for me. :-)
> >>>>>>>
> >>>>>>> > I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes.
> >>>>>>>
> >>>>>>> Although I understand the motivation of this statement, this project name is "Apache Beam: An advanced unified programming model".
> >>>>>>> What does the model unify, if "streaming vs. batch" is not part of the model?
> >>>>>>>
> >>>>>>> Using microbatching, chaining of batch jobs, or pure streaming are exactly the "runtime conditions/characteristics" I refer to. All these define several runtime parameters, which in turn define how well or badly the pipeline will perform and how many resources might be needed. From my point of view, pure streaming should be the most resource demanding (if not, why bother with batch? why not run everything in streaming only? what will there remain to "unify"?).
> >>>>>>>
> >>>>>>> > Fortunately, for batch, only the state for a single key needs to be preserved at a time, rather than the state for all keys across the range of skew. Of course if you have few or hot keys, one can still have issues (and this is not specific to StatefulDoFns).
> >>>>>>>
> >>>>>>> Yes, but here there is still the presumption that my stateful DoFn can tolerate arbitrary shuffling of inputs. Let me explain the use case in more detail.
> >>>>>>>
> >>>>>>> Suppose you have an input stream consisting of 1s and 0s (and some key for each element, which is irrelevant for the demonstration). Your task is to calculate, in a running global window, the actual number of changes between state 0 and state 1 and vice versa. When the state doesn't change, you don't calculate anything. If the input (for a given key) would be (tN denotes timestamp N):
> >>>>>>>
> >>>>>>> t1: 1
> >>>>>>> t2: 0
> >>>>>>> t3: 0
> >>>>>>> t4: 1
> >>>>>>> t5: 1
> >>>>>>> t6: 0
> >>>>>>>
> >>>>>>> then the output should yield (supposing that the default state is zero):
> >>>>>>>
> >>>>>>> t1: (one: 1, zero: 0)
> >>>>>>> t2: (one: 1, zero: 1)
> >>>>>>> t3: (one: 1, zero: 1)
> >>>>>>> t4: (one: 2, zero: 1)
> >>>>>>> t5: (one: 2, zero: 1)
> >>>>>>> t6: (one: 2, zero: 2)
> >>>>>>>
> >>>>>>> How would you implement this in current Beam semantics?
> >>>>>> I think you're saying here that I know that my input is ordered in a specific way and since I assume the order when writing my pipeline I can perform this optimization. But there is nothing preventing a runner from noticing that you're processing in the global window with a specific type of trigger and re-ordering your inputs/processing to get better performance (since you can't use an AfterWatermark trigger for your pipeline in streaming for the GlobalWindow).
> >>>>>>
> >>>>>> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state. I can see why you would want an annotation that says I must have timestamp-ordered elements, since it makes writing certain StatefulParDos much easier. StatefulParDo is a low-level function; it really is the "here you go and do whatever you need to but here be dragons" function, while windowing and triggering is meant to keep many people from writing StatefulParDo in the first place.
> >>>>>>
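For reference, the 1s/0s example above written as a stateful DoFn (Java SDK, sketch only). It reproduces exactly the outputs listed when elements arrive in timestamp order, which is precisely the assumption the model does not allow; under arbitrary shuffling the intermediate counts can differ from run to run.

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class CountTransitionsFn extends DoFn<KV<String, Integer>, KV<String, String>> {

  @StateId("last")
  private final StateSpec<ValueState<Integer>> lastSpec = StateSpecs.value(VarIntCoder.of());

  @StateId("zeroToOne")
  private final StateSpec<ValueState<Long>> zeroToOneSpec = StateSpecs.value(VarLongCoder.of());

  @StateId("oneToZero")
  private final StateSpec<ValueState<Long>> oneToZeroSpec = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void processElement(
      @Element KV<String, Integer> element,
      @StateId("last") ValueState<Integer> last,
      @StateId("zeroToOne") ValueState<Long> zeroToOne,
      @StateId("oneToZero") ValueState<Long> oneToZero,
      OutputReceiver<KV<String, String>> out) {
    int previous = firstNonNull(last.read(), 0);   // default state is zero
    long ones = firstNonNull(zeroToOne.read(), 0L);
    long zeros = firstNonNull(oneToZero.read(), 0L);
    int current = element.getValue();
    // Count only actual transitions; correct only if elements arrive in
    // timestamp order for the key.
    if (previous == 0 && current == 1) {
      ones++;
      zeroToOne.write(ones);
    } else if (previous == 1 && current == 0) {
      zeros++;
      oneToZero.write(zeros);
    }
    last.write(current);
    out.output(KV.of(element.getKey(), "(one: " + ones + ", zero: " + zeros + ")"));
  }

  private static <T> T firstNonNull(T value, T defaultValue) {
    return value != null ? value : defaultValue;
  }
}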
> >>>>>>> > Pipelines that fail in the "worst case" batch scenario are likely to degrade poorly (possibly catastrophically) when the watermark falls behind in streaming mode as well.
> >>>>>>>
> >>>>>>> But the worst case is defined by an input of size (available resources + single byte) -> pipeline fail. Although it could have finished, given the right conditions.
> >>>>>>>
> >>>>>>> > This might be reasonable, implemented by default by buffering everything and releasing elements as the watermark (+lateness) advances, but would likely lead to inefficient (though *maybe* easier to reason about) code.
> >>>>>>>
> >>>>>>> Sure, the pipeline will be less efficient, because it would have to buffer and sort the inputs. But at least it will produce correct results in cases where updates to state are order-sensitive.
> >>>>>>>
> >>>>>>> > Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): [(key, value) for value in values])?
> >>>>>>>
> >>>>>>> I'd say roughly yes, but the difference would be in the trigger. The trigger should ideally fire as soon as the watermark (+lateness) crosses the element with the lowest timestamp in the buffer. Although this could be somehow emulated by a fixed trigger firing every X millis.
> >>>>>>>
> >>>>>>> > Or is the underlying desire just to be able to hint to the runner that the code may perform better (e.g. require less resources) as skew is reduced (and hence to order by timestamp iff it's cheap)?
> >>>>>>>
> >>>>>>> No, the sorting would have to be done in the streaming case as well. That is an imperative of the unified model. I think it is possible to sort by timestamp only in the batch case (and do it for *all* batch stateful pardos without annotation), or introduce an annotation, but then make the same guarantees for the streaming case as well.
> >>>>>>>
> >>>>>>> Jan
> >>>>>>>
> >>>>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
> >>>>>>>> On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>>>> Hi Robert,
> >>>>>>>>>
> >>>>>>>>> yes, I think you rephrased my point - although no *explicit* guarantees of ordering are given in either mode, there is *implicit* ordering in the streaming case that is due to the nature of the processing - the difference between the watermark and the timestamps of elements flowing through the pipeline is generally low (too high a difference leads to the overbuffering problem), but there is no such bound on batch.
> >>>>>>>> Fortunately, for batch, only the state for a single key needs to be preserved at a time, rather than the state for all keys across the range of skew. Of course if you have few or hot keys, one can still have issues (and this is not specific to StatefulDoFns).
> >>>>>>>>
> >>>>>>>>> As a result, I see a few possible solutions:
> >>>>>>>>>
> >>>>>>>>> - the best and most natural seems to be an extension of the model, so that it defines batch as not only "streaming pipeline executed in batch fashion", but "pipeline with at least as good runtime characteristics as in the streaming case, executed in batch fashion". I really don't think that there are any conflicts with the current model, or that this could affect performance, because the required sorting (as pointed out by Aljoscha) is very probably already done during translation of stateful pardos.
> >>>>>>>>> Also note that this definition only affects user-defined stateful pardos.
> >>>>>>>> I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes. The model describes what the valid outputs are given a (sometimes partial) set of inputs. It becomes really hard to define things like "as good runtime characteristics." Once you allow any out-of-orderedness, it is not very feasible to try and define (and more cheaply implement) an "upper bound" of acceptable out-of-orderedness.
> >>>>>>>>
> >>>>>>>> Pipelines that fail in the "worst case" batch scenario are likely to degrade poorly (possibly catastrophically) when the watermark falls behind in streaming mode as well.
> >>>>>>>>
> >>>>>>>>> - another option would be to introduce an annotation for DoFns (e.g. @RequiresStableTimeCharacteristics), which would result in the sorting in the batch case - but - this extension would have to ensure the sorting in streaming mode also - it would require a definition of allowed lateness, and trigger (essentially similar to a window)
> >>>>>>>> This might be reasonable, implemented by default by buffering everything and releasing elements as the watermark (+lateness) advances, but would likely lead to inefficient (though *maybe* easier to reason about) code. Not sure about the semantics of triggering here, especially data-driven triggers. Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): [(key, value) for value in values])?
> >>>>>>>>
> >>>>>>>> Or is the underlying desire just to be able to hint to the runner that the code may perform better (e.g. require less resources) as skew is reduced (and hence to order by timestamp iff it's cheap)?
> >>>>>>>>
> >>>>>>>>> - the last option would be to introduce these "higher order guarantees" in some extension DSL (e.g. Euphoria), but that seems to be the worst option to me
> >>>>>>>>>
> >>>>>>>>> I see the first two options as quite equally good, although the latter one is probably more time consuming to implement. But it would bring an additional feature to the streaming case as well.
> >>>>>>>>>
> >>>>>>>>> Thanks for any thoughts.
> >>>>>>>>>
> >>>>>>>>> Jan
> >>>>>>>>>
> >>>>>>>>> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
> >>>>>>>>>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>>>>>> Hi Reuven,
> >>>>>>>>>>>
> >>>>>>>>>>>> How so? AFAIK stateful DoFns work just fine in batch runners.
> >>>>>>>>>>> Stateful ParDo works in batch as far as the logic inside the state works for absolutely unbounded out-of-orderness of elements. That basically (practically) can work only for cases where the order of input elements doesn't matter. But "state" can refer to "state machine", and any time you have a state machine involved, then the ordering of elements would matter.
> >>>>>>>>>> No guarantees on order are provided in *either* streaming or batch mode by the model. However, it is the case that in order to make forward progress most streaming runners attempt to limit the amount of out-of-orderedness of elements (in terms of event time vs.
> >>>>>>>>>> processing time), which in turn could help cap the amount of state that must be held concurrently, whereas a batch runner may not allow any state to be safely discarded until the whole timeline from infinite past to infinite future has been observed.
> >>>>>>>>>>
> >>>>>>>>>> Also, as pointed out, state is not preserved "batch to batch" in batch mode.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <m...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>>> batch semantics and streaming semantics differ only in that I can have GlobalWindow with default trigger on batch and cannot on stream
> >>>>>>>>>>> You can have a GlobalWindow in streaming with a default trigger. You could define additional triggers that do early firings. And you could even trigger the global window by advancing the watermark to +inf.
> >>>>>>>>>> IIRC, as a pragmatic note, we prohibited the global window with the default trigger on unbounded PCollections in the SDK because this is more likely to be user error than an actual desire to have no output until drain. But it's semantically valid in the model.

1: https://beam.apache.org/documentation/runners/capability-matrix/
2: https://lists.apache.org/thread.html/7487750085413f11fbeb1e707fa8fc138a07c93d6a394cf7245371fc@%3Cdev.beam.apache.org%3E
3: https://lists.apache.org/thread.html/d0c24cb4d421e25980fee1855d1daa68d343700270ed962d642c8f99@%3Cdev.beam.apache.org%3E