On Tue, May 21, 2019 at 7:49 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
>  > Actually, I think it is a larger (open) question whether exactly once
> is guaranteed by the model or whether runners are allowed to relax that.
> I would think, however, that sources correctly implemented should be
> idempotent when run atop an exactly once infrastructure such as Flink or
> Dataflow.
>

Exactly-once semantics are not required by the Beam model. There is a
TODO [1] on the runner capability matrix to expand it to state which runners
provide at-least-once vs. exactly-once processing, along with other runtime
characteristics.


> I would assume that the model basically inherits the guarantees of the
> underlying infrastructure. Because Flink does not work as you described
> (atomic commit of inputs, state and outputs), but rather a checkpoint
> mark flows through the DAG much like a watermark, and on failures
> operators are restored and data reprocessed, it (IMHO) implies that you
> have exactly once everywhere in the DAG *but* sinks. That is because
> sinks cannot be restored to a previous state; instead, sinks are supposed
> to be idempotent in order for exactly once to really work (or at
> least be able to commit outputs on checkpoint in the sink). That implies
> that if you don't have a sink that is able to commit outputs atomically on
> checkpoint, the pipeline execution should be deterministic upon retries,
> otherwise shadow writes from failed paths of the pipeline might appear.
>

There was a discussion about transforms that require stable input to
guarantee correct semantics. This requirement usually came up when
attempting to write a sink correctly. More details on it in these two
threads [2, 3].
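
To make the stable-input point concrete, here is a minimal sketch in plain
Python (not the Beam API; `write_upsert`, `run_with_retry`, the dict-backed
store and both id schemes are hypothetical names invented for illustration).
It assumes a sink that upserts by record id, and shows that such a sink stays
free of duplicates across a retry only if the retried attempt regenerates the
same ids:

```python
import uuid
from typing import Callable, Dict, List, Tuple

Record = Tuple[str, str]  # (record id, payload)

def write_upsert(store: Dict[str, str], records: List[Record]) -> None:
    """Idempotent write: the same id always overwrites the same slot."""
    for record_id, payload in records:
        store[record_id] = payload

def run_with_retry(produce: Callable[[], List[Record]]) -> Dict[str, str]:
    """Simulate an attempt whose writes land but which is then retried."""
    store: Dict[str, str] = {}
    write_upsert(store, produce())  # first attempt, later considered failed
    write_upsert(store, produce())  # retry reprocesses the same input
    return store

payloads = ["a", "b"]

def stable_ids() -> List[Record]:
    # Deterministic: the id depends only on the input position.
    return [(f"id-{i}", p) for i, p in enumerate(payloads)]

def unstable_ids() -> List[Record]:
    # Non-deterministic: every attempt invents fresh ids.
    return [(uuid.uuid4().hex, p) for p in payloads]

stable_store = run_with_retry(stable_ids)
unstable_store = run_with_retry(unstable_ids)
print(len(stable_store))    # 2: the retry overwrote the first attempt
print(len(unstable_store))  # 4: the first attempt left shadow writes behind
```

With non-deterministic ids the sink alone cannot tell a retry from new data,
which is why the input to such a transform needs to be stable.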


> Someone from Flink might correct me if I'm wrong, but that's my current
> understanding.
>
>  > Sounds like we should make this clearer.
>
> I meant that you are right: in all of our thinking we must not forget
> that streams are by definition out-of-order. That is a property that we
> cannot change. But that doesn't prevent us from creating an operator that
> presents the data to the UDF as if the stream were ideally sorted. It can
> do that by introducing latency, of course.
>
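
As a rough illustration of such an operator, here is a minimal sketch in
plain Python (not the Beam API; `SortingBuffer`, `emit` and the integer
timestamps are assumptions made up for this example). It buffers elements in
a heap and hands them to the user function in timestamp order once the
watermark has passed their timestamp plus the allowed lateness; anything
arriving behind the already released point is dropped:

```python
import heapq
from typing import Any, Callable, List, Tuple

class SortingBuffer:
    """Releases buffered (timestamp, value) pairs in timestamp order."""

    def __init__(self, allowed_lateness: int, emit: Callable[[int, Any], None]) -> None:
        self.allowed_lateness = allowed_lateness
        self.emit = emit
        self._heap: List[Tuple[int, int, Any]] = []
        self._seq = 0  # tie-breaker so payloads are never compared
        self._released_up_to = float("-inf")

    def add(self, timestamp: int, value: Any) -> bool:
        """Buffer an element; return False if it is too late and was dropped."""
        if timestamp < self._released_up_to:
            return False
        heapq.heappush(self._heap, (timestamp, self._seq, value))
        self._seq += 1
        return True

    def advance_watermark(self, watermark: int) -> None:
        """Emit, in order, everything at or before watermark - allowed_lateness."""
        limit = watermark - self.allowed_lateness
        while self._heap and self._heap[0][0] <= limit:
            timestamp, _, value = heapq.heappop(self._heap)
            self.emit(timestamp, value)
        self._released_up_to = max(self._released_up_to, limit)

seen = []
buf = SortingBuffer(allowed_lateness=2, emit=lambda ts, v: seen.append((ts, v)))
for ts, v in [(3, "c"), (1, "a"), (2, "b")]:   # out-of-order arrival
    buf.add(ts, v)
buf.advance_watermark(4)       # releases timestamps <= 2
accepted = buf.add(1, "late")  # behind the released point, so dropped
buf.advance_watermark(10)      # releases the rest
```

The latency cost is visible in the sketch: nothing reaches `emit` until the
watermark has moved `allowed_lateness` past the element's timestamp.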
> On 5/21/19 4:01 PM, Robert Bradshaw wrote:
> > Reza: One could provide something like this as a utility class, but
> > one downside is that it is not scale invariant. It requires a tuning
> > parameter that, if too small, won't mitigate the problem, but if too
> > big, greatly increases latency. (Possibly one could define a dynamic
> > session-like window to solve this though...) It also might be harder
> > for runners that *can* cheaply present stuff in timestamp order to
> > optimize. (That and, in practice, our annotation-style process methods
> > don't lend themselves to easy composition.) I think it could work in
> > specific cases though.
> >
> > More inline below.
> >
> > On Tue, May 21, 2019 at 11:38 AM Jan Lukavský <je...@seznam.cz> wrote:
> >> Hi Robert,
> >>
> >>   > Beam has an exactly-once model. If the data was consumed, state
> >> mutated, and outputs written downstream (these three are committed
> >> together atomically) it will not be replayed. That does not, of course,
> >> solve the non-determinism due to ordering (including the fact that two
> >> operations reading the same PCollection may view different ordering).
> >>
> >> I think what you describe is a property of a runner, not of the model,
> >> right? I think if I run my pipeline on Flink I will not get this
> >> atomicity, because although Flink also uses an exactly-once model, it
> >> might write outputs multiple times.
> > Actually, I think it is a larger (open) question whether exactly once
> > is guaranteed by the model or whether runners are allowed to relax
> > that. I would think, however, that sources correctly implemented
> > should be idempotent when run atop an exactly once infrastructure such
> > as Flink or Dataflow.
> >
> >>   > 1) Is it correct for a (Stateful)DoFn to assume elements are received
> >> in a specific order? In the current model, it is not. Being able to
> >> read, handle, and produce out-of-order data, including late data, is a
> >> pretty fundamental property of distributed systems.
> >>
> >> Yes, absolutely. The argument here is not that Stateful ParDo should
> >> presume to receive elements in any particular order, but that it should
> >> _present_ them as ordered to the user's @ProcessElement function.
> > Sounds like we should make this clearer.
> >
> >>   > 2) Given that some operations are easier (or possibly only possible)
> >> to write when operating on ordered data, and that different runners may
> >> have (significantly) cheaper ways to provide this ordering than can be
> >> done by the user themselves, should we elevate this to a property of
> >> (Stateful?)DoFns that the runner can provide? I think a compelling
> >> argument can be made here that we should.
> >>
> >> +1
> >>
> >> Jan
> >>
> >> On 5/21/19 11:07 AM, Robert Bradshaw wrote:
> >>> On Mon, May 20, 2019 at 5:24 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>    > I don't see batch vs. streaming as part of the model. One can
> >>>> have microbatch, or even a runner that alternates between different
> >>>> modes.
> >>>>
> >>>> Although I understand the motivation of this statement, this project's
> >>>> name is "Apache Beam: An advanced unified programming model". What does
> >>>> the model unify, if "streaming vs. batch" is not part of the model?
> >>> What I mean is that streaming vs. batch is no longer part of the model
> >>> (or ideally API), but pushed down to be a concern of the runner
> >>> (executor) of the pipeline.
> >>>
> >>>
> >>> On Tue, May 21, 2019 at 10:32 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>> Hi Kenn,
> >>>>
> >>>> OK, so if we introduce an annotation, we can have a stateful ParDo with sorting, which would perfectly resolve my issues. I still have some doubts, though. Let me explain. The current behavior of stateful ParDo has the following properties:
> >>>>
> >>>>    a) it might fail in batch, although it runs fine in streaming (that is due to the buffering and unbounded lateness in batch, which was discussed back and forth in this thread)
> >>>>
> >>>>    b) it might be non-deterministic (this is because the elements arrive in somewhat random order, and even an operation like "assign unique ID to elements" might produce different results when run multiple times)
> >>> PCollections are *explicitly* unordered. Any operations that assume or
> >>> depend on a specific ordering for correctness (or determinism) must
> >>> provide that ordering themselves (i.e. tolerate "arbitrary shuffling
> >>> of inputs"). As you point out, that may be very expensive if you have
> >>> very hot keys with very large (unbounded) timestamp skew.
> >>>
> >>> StatefulDoFns are low-level operations that should be used with care;
> >>> the simpler windowing model gives determinism in the face of unordered
> >>> data (though late data and non-end-of-window triggering introduces
> >>> some of the non-determinism back in).
> >>>
> >>>> What worries me most is property b), because it seems to me to have serious consequences: not only would you get different results if you ran a batch pipeline twice, but even in streaming, when a pipeline fails and gets restarted from a checkpoint, the produced output might differ from the previous run, and data from the first run might already have been persisted into the sink. That would create somewhat messy outputs.
> >>> Beam has an exactly-once model. If the data was consumed, state
> >>> mutated, and outputs written downstream (these three are committed
> >>> together atomically) it will not be replayed. That does not, of
> >>> course, solve the non-determinism due to ordering (including the fact
> >>> that two operations reading the same PCollection may view different
> >>> ordering).
> >>>
> >>>> These two properties make me think that the current implementation is more of a _special case_ than the general one. The general case would be that your state doesn't have the properties needed to tolerate buffering problems and/or non-determinism. That is the case where you need sorting in both streaming and batch to be part of the model.
> >>>>
> >>>> Let me point out one more analogy: merging vs. non-merging windows. The general case (merging windows) implies sorting by timestamp in both the batch case (explicit) and streaming (buffering). The special case (non-merging windows) doesn't rely on any timestamp ordering, so the sorting and buffering can be dropped. The underlying root cause of this is the same for both stateful ParDo and windowing (essentially, assigning window labels is a stateful operation when the windowing function is merging).
> >>>>
> >>>> The reason for the current behavior of stateful ParDo seems to be performance, but is it right to abandon correctness in favor of performance? Wouldn't it be more consistent to have the default behavior prefer correctness, and when your state function has the required special properties, you could annotate your DoFn (with something like @TimeOrderingAgnostic), which would yield better performance in that case?
> >>> There are two separable questions here.
> >>>
> >>> 1) Is it correct for a (Stateful)DoFn to assume elements are received
> >>> in a specific order? In the current model, it is not. Being able to
> >>> read, handle, and produce out-of-order data, including late data, is
> >>> a pretty fundamental property of distributed systems.
> >>>
> >>> 2) Given that some operations are easier (or possibly only possible)
> >>> to write when operating on ordered data, and that different runners
> >>> may have (significantly) cheaper ways to provide this ordering than
> >>> can be done by the user themselves, should we elevate this to a
> >>> property of (Stateful?)DoFns that the runner can provide? I think a
> >>> compelling argument can be made here that we should.
> >>>
> >>> - Robert
> >>>
> >>>
> >>>
> >>>> On 5/21/19 1:00 AM, Kenneth Knowles wrote:
> >>>>
> >>>> Thanks for the nice small example of a calculation that depends on order. You are right that many state machines have this property. I agree w/ you and Luke that it is convenient for batch processing to sort by event timestamp before running a stateful ParDo. In streaming you could also implement "sort by event timestamp" by buffering until you know all earlier data will be dropped - a slack buffer up to allowed lateness.
> >>>>
> >>>> I do not think that it is OK to sort in batch and not in streaming. Many state machines diverge very rapidly when things are out of order. So each runner, if it sees the "@OrderByTimestamp" annotation (or whatever), needs to deliver sorted data (by some mix of buffering and dropping), or to reject the pipeline as unsupported.
> >>>>
> >>>> I also want to say that this is not the default case - many uses of state & timers in ParDo yield different results at the element level, but the results are equivalent in the big picture. For examples such as "assign a unique sequence number to each element" or "group into batches", it doesn't matter exactly what the result is, only that it meets the spec. And other cases like user funnels are monotonic enough that you also don't actually need sorting.
> >>>>
> >>>> Kenn
> >>>>
> >>>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>> Yes, the problem will probably arise mostly when you have poorly distributed keys (or too few keys). I'm really not sure if a pure GBK with a trigger can solve this - it might help to have a data-driven trigger. There would still be some doubts, though. The main question is still here - people say that sorting by timestamp before stateful ParDo would be prohibitively slow, but I don't really see why - the sorting is very probably already there. And if not (hash grouping instead of sorted grouping), then the sorting would affect only user-defined StatefulParDos.
> >>>>>
> >>>>> This would suggest that the best way out of this would really be to add an annotation, so that the author of the pipeline can decide.
> >>>>>
> >>>>> If that would be acceptable, I think I can try to prepare some basic functionality, but I'm not sure if I would be able to cover all runners / sdks.
> >>>>>
> >>>>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
> >>>>>
> >>>>> It is read all per key and window and not just read all (this still won't scale with hot keys in the global window). The GBK preceding the StatefulParDo will guarantee that you are processing all the values for a specific key and window at any given time. Is there a specific window/trigger that is missing that you feel would remove the need for you to use StatefulParDo?
> >>>>>
> >>>>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>> Hi Lukasz,
> >>>>>>
> >>>>>>> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state.
> >>>>>> Yes, no problem with that. But this whole discussion started because *this doesn't work on batch*. You simply cannot first read everything from distributed storage and then buffer it all into memory, just to read it again, but sorted. That will not work. And even if it would, it would be a terrible waste of resources.
> >>>>>>
> >>>>>> Jan
> >>>>>>
> >>>>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>> This discussion brings many really interesting questions for me. :-)
> >>>>>>>
> >>>>>>>    > I don't see batch vs. streaming as part of the model. One can
> >>>>>>> have microbatch, or even a runner that alternates between different
> >>>>>>> modes.
> >>>>>>>
> >>>>>>> Although I understand the motivation of this statement, this
> >>>>>>> project's name is "Apache Beam: An advanced unified programming
> >>>>>>> model". What does the model unify, if "streaming vs. batch" is not
> >>>>>>> part of the model?
> >>>>>>>
> >>>>>>> Using microbatching, chaining of batch jobs, or pure streaming are
> >>>>>>> exactly the "runtime conditions/characteristics" I refer to. All
> >>>>>>> these define several runtime parameters, which in turn define how
> >>>>>>> well/badly the pipeline will perform and how many resources might be
> >>>>>>> needed. From my point of view, pure streaming should be the most
> >>>>>>> resource demanding (if not, why bother with batch? why not run
> >>>>>>> everything in streaming only? what will there remain to "unify"?).
> >>>>>>>
> >>>>>>>    > Fortunately, for batch, only the state for a single key needs
> >>>>>>> to be preserved at a time, rather than the state for all keys across
> >>>>>>> the range of skew. Of course if you have few or hot keys, one can
> >>>>>>> still have issues (and this is not specific to StatefulDoFns).
> >>>>>>>
> >>>>>>> Yes, but here is still the presumption that my stateful DoFn can
> >>>>>>> tolerate arbitrary shuffling of inputs. Let me explain the use case
> >>>>>>> in more detail.
> >>>>>>>
> >>>>>>> Suppose you have an input stream consisting of 1s and 0s (and some
> >>>>>>> key for each element, which is irrelevant for the demonstration).
> >>>>>>> Your task is to calculate, in a running global window, the actual
> >>>>>>> number of changes between state 0 and state 1 and vice versa. When
> >>>>>>> the state doesn't change, you don't calculate anything. If the input
> >>>>>>> (for a given key) would be (tN denotes timestamp N):
> >>>>>>>
> >>>>>>>     t1: 1
> >>>>>>>
> >>>>>>>     t2: 0
> >>>>>>>
> >>>>>>>     t3: 0
> >>>>>>>
> >>>>>>>     t4: 1
> >>>>>>>
> >>>>>>>     t5: 1
> >>>>>>>
> >>>>>>>     t6: 0
> >>>>>>>
> >>>>>>> then the output should yield (supposing that the default state is
> >>>>>>> zero):
> >>>>>>>
> >>>>>>>     t1: (one: 1, zero: 0)
> >>>>>>>
> >>>>>>>     t2: (one: 1, zero: 1)
> >>>>>>>
> >>>>>>>     t3: (one: 1, zero: 1)
> >>>>>>>
> >>>>>>>     t4: (one: 2, zero: 1)
> >>>>>>>
> >>>>>>>     t5: (one: 2, zero: 1)
> >>>>>>>
> >>>>>>>     t6: (one: 2, zero: 2)
> >>>>>>>
> >>>>>>> How would you implement this in current Beam semantics?
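
For reference, a minimal sketch of the counting logic from the example above
in plain Python (no Beam involved; `count_transitions` is a hypothetical
helper written only to illustrate the point). It reproduces the expected
output when fed t1..t6 in timestamp order and gives a different final count
when the same six elements arrive in another order:

```python
from typing import Iterable, List, Tuple

def count_transitions(values: Iterable[int], initial: int = 0) -> List[Tuple[int, int]]:
    """Return the running (one, zero) counters after each element.

    `one` counts 0 -> 1 changes and `zero` counts 1 -> 0 changes; an element
    equal to the current state changes nothing.
    """
    state = initial
    one = zero = 0
    out = []
    for v in values:
        if v != state:
            if v == 1:
                one += 1
            else:
                zero += 1
            state = v
        out.append((one, zero))
    return out

in_order = [1, 0, 0, 1, 1, 0]  # t1..t6 as listed above
shuffled = [1, 1, 0, 0, 0, 1]  # the same six elements in another arrival order

print(count_transitions(in_order)[-1])  # (2, 2)
print(count_transitions(shuffled)[-1])  # (2, 1)
```

The function itself is deterministic; the divergence comes purely from the
order in which the elements are presented to it.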
> >>>>>> I think you're saying here that I know that my input is ordered in a specific way, and since I assume the order when writing my pipeline, I can perform this optimization. But there is nothing preventing a runner from noticing that you're processing in the global window with a specific type of trigger and re-ordering your inputs/processing to get better performance (since you can't use an AfterWatermark trigger for your pipeline in streaming for the GlobalWindow).
> >>>>>>
> >>>>>> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state. I can see why you would want an annotation that says I must have timestamp-ordered elements, since it makes writing certain StatefulParDos much easier. StatefulParDo is a low-level function; it really is the "here you go and do whatever you need to but here be dragons" function, while windowing and triggering are meant to keep many people from writing StatefulParDo in the first place.
> >>>>>>
> >>>>>>>    > Pipelines that fail in the "worst case" batch scenario are
> >>>>>>> likely to degrade poorly (possibly catastrophically) when the
> >>>>>>> watermark falls behind in streaming mode as well.
> >>>>>>>
> >>>>>>> But the worst case is defined by input of size (available
> >>>>>>> resources + single byte) -> pipeline fails. Although it could have
> >>>>>>> finished, given the right conditions.
> >>>>>>>
> >>>>>>>    > This might be reasonable, implemented by default by buffering
> >>>>>>> everything and releasing elements as the watermark (+lateness)
> >>>>>>> advances, but would likely lead to inefficient (though *maybe*
> >>>>>>> easier to reason about) code.
> >>>>>>>
> >>>>>>> Sure, the pipeline will be less efficient, because it would have to
> >>>>>>> buffer and sort the inputs. But at least it will produce correct
> >>>>>>> results in cases where updates to state are order-sensitive.
> >>>>>>>
> >>>>>>>    > Would it be roughly equivalent to GBK + FlatMap(lambda (key,
> >>>>>>> values): [(key, value) for value in values])?
> >>>>>>>
> >>>>>>> I'd say roughly yes, but the difference would be in the trigger. The
> >>>>>>> trigger should ideally fire as soon as the watermark (+lateness)
> >>>>>>> crosses the element with the lowest timestamp in the buffer. Although
> >>>>>>> this could be somehow emulated by a fixed trigger every X millis.
> >>>>>>>
> >>>>>>>    > Or is the underlying desire just to be able to hint to the
> >>>>>>> runner that the code may perform better (e.g. require less
> >>>>>>> resources) as skew is reduced (and hence to order by timestamp iff
> >>>>>>> it's cheap)?
> >>>>>>>
> >>>>>>> No, the sorting would have to be done in the streaming case as well.
> >>>>>>> That is an imperative of the unified model. I think it is possible
> >>>>>>> to sort by timestamp only in the batch case (and do it for *all*
> >>>>>>> batch stateful pardos without annotation), or to introduce an
> >>>>>>> annotation, but then make the same guarantees for the streaming case
> >>>>>>> as well.
> >>>>>>>
> >>>>>>> Jan
> >>>>>>>
> >>>>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
> >>>>>>>> On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>>>> Hi Robert,
> >>>>>>>>>
> >>>>>>>>> yes, I think you rephrased my point - although no *explicit*
> >>>>>>>>> guarantees of ordering are given in either mode, there is
> >>>>>>>>> *implicit* ordering in the streaming case that is due to the nature
> >>>>>>>>> of the processing - the difference between the watermark and the
> >>>>>>>>> timestamps of elements flowing through the pipeline is generally
> >>>>>>>>> low (too high a difference leads to the overbuffering problem), but
> >>>>>>>>> there is no such bound in batch.
> >>>>>>>> Fortunately, for batch, only the state for a single key needs to be
> >>>>>>>> preserved at a time, rather than the state for all keys across the
> >>>>>>>> range of skew. Of course if you have few or hot keys, one can still
> >>>>>>>> have issues (and this is not specific to StatefulDoFns).
> >>>>>>>>
> >>>>>>>>> As a result, I see a few possible solutions:
> >>>>>>>>>
> >>>>>>>>>      - the best and most natural seems to be an extension of the
> >>>>>>>>> model, so that it defines batch as not only "streaming pipeline
> >>>>>>>>> executed in batch fashion", but "pipeline with at least as good
> >>>>>>>>> runtime characteristics as in streaming case, executed in batch
> >>>>>>>>> fashion". I really don't think that there are any conflicts with
> >>>>>>>>> the current model, or that this could affect performance, because
> >>>>>>>>> the required sorting (as pointed out by Aljoscha) is very probably
> >>>>>>>>> already done during translation of stateful pardos. Also note that
> >>>>>>>>> this definition only affects user-defined stateful pardos
> >>>>>>>> I don't see batch vs. streaming as part of the model. One can have
> >>>>>>>> microbatch, or even a runner that alternates between different
> >>>>>>>> modes. The model describes what the valid outputs are given a
> >>>>>>>> (sometimes partial) set of inputs. It becomes really hard to define
> >>>>>>>> things like "as good runtime characteristics." Once you allow any
> >>>>>>>> out-of-orderedness, it is not very feasible to try and define (and
> >>>>>>>> more cheaply implement) an "upper bound" of acceptable
> >>>>>>>> out-of-orderedness.
> >>>>>>>>
> >>>>>>>> Pipelines that fail in the "worst case" batch scenario are likely
> >>>>>>>> to degrade poorly (possibly catastrophically) when the watermark
> >>>>>>>> falls behind in streaming mode as well.
> >>>>>>>>
> >>>>>>>>>      - another option would be to introduce an annotation for
> >>>>>>>>> DoFns (e.g. @RequiresStableTimeCharacteristics), which would result
> >>>>>>>>> in the sorting in the batch case - but - this extension would have
> >>>>>>>>> to ensure the sorting in streaming mode also - it would require a
> >>>>>>>>> definition of allowed lateness, and a trigger (essentially similar
> >>>>>>>>> to a window)
> >>>>>>>> This might be reasonable, implemented by default by buffering
> >>>>>>>> everything and releasing elements as the watermark (+lateness)
> >>>>>>>> advances, but would likely lead to inefficient (though *maybe*
> >>>>>>>> easier to reason about) code. Not sure about the semantics of
> >>>>>>>> triggering here, especially data-driven triggers. Would it be
> >>>>>>>> roughly equivalent to GBK + FlatMap(lambda (key, values): [(key,
> >>>>>>>> value) for value in values])?
> >>>>>>>>
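
As an aside on the "GBK + FlatMap" comparison, here is a minimal sketch of
that shape in plain Python rather than the Beam API (`group_by_key` and
`flatten_sorted` are hypothetical stand-ins, and values are assumed to be
`(timestamp, payload)` tuples). Note that the tuple-unpacking lambda in the
quoted snippet is Python 2 syntax, so the unpacking is done in a
comprehension here, and an explicit sort by timestamp is added because
grouping alone does not order the values:

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Timestamped = Tuple[int, str]  # (event timestamp, payload)

def group_by_key(pairs: Iterable[Tuple[str, Timestamped]]) -> Dict[str, List[Timestamped]]:
    """Collect all values per key, in arrival order."""
    grouped: Dict[str, List[Timestamped]] = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)

def flatten_sorted(grouped: Dict[str, List[Timestamped]]) -> List[Tuple[str, Timestamped]]:
    """Re-emit (key, value) pairs, each key's values sorted by timestamp."""
    return [
        (key, value)
        for key, values in grouped.items()
        for value in sorted(values, key=lambda tv: tv[0])
    ]

pairs = [("k1", (3, "c")), ("k2", (1, "x")), ("k1", (1, "a")), ("k1", (2, "b"))]
ordered = flatten_sorted(group_by_key(pairs))
```

This only orders values within one grouped batch per key; when that batch is
released is the triggering question raised above, which the sketch does not
address.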
> >>>>>>>> Or is the underlying desire just to be able to hint to the runner
> >>>>>>>> that the code may perform better (e.g. require less resources) as
> >>>>>>>> skew is reduced (and hence to order by timestamp iff it's cheap)?
> >>>>>>>>
> >>>>>>>>>      - the last option would be to introduce these "higher order
> >>>>>>>>> guarantees" in some extension DSL (e.g. Euphoria), but that seems
> >>>>>>>>> to be the worst option to me
> >>>>>>>>>
> >>>>>>>>> I see the first two options as about equally good, although the
> >>>>>>>>> latter one is probably more time consuming to implement. But it
> >>>>>>>>> would bring an additional feature to the streaming case as well.
> >>>>>>>>>
> >>>>>>>>> Thanks for any thoughts.
> >>>>>>>>>
> >>>>>>>>>      Jan
> >>>>>>>>>
> >>>>>>>>> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
> >>>>>>>>>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>>>>>> Hi Reuven,
> >>>>>>>>>>>
> >>>>>>>>>>>> How so? AFAIK stateful DoFns work just fine in batch runners.
> >>>>>>>>>>> Stateful ParDo works in batch as long as the logic inside the state works for absolutely unbounded out-of-orderness of elements. That basically (practically) can work only for cases where the order of input elements doesn't matter. But "state" can refer to "state machine", and any time you have a state machine involved, the ordering of elements would matter.
> >>>>>>>>>> No guarantees on order are provided in *either* streaming or
> >>>>>>>>>> batch mode by the model. However, it is the case that in order to
> >>>>>>>>>> make forward progress most streaming runners attempt to limit the
> >>>>>>>>>> amount of out-of-orderedness of elements (in terms of event time
> >>>>>>>>>> vs. processing time), which in turn could help cap the amount of
> >>>>>>>>>> state that must be held concurrently, whereas a batch runner may
> >>>>>>>>>> not allow any state to be safely discarded until the whole
> >>>>>>>>>> timeline from infinite past to infinite future has been observed.
> >>>>>>>>>>
> >>>>>>>>>> Also, as pointed out, state is not preserved "batch to batch" in
> >>>>>>>>>> batch mode.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <m...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>>>      batch semantics and streaming semantics differ only in that I can have GlobalWindow with default trigger on batch and cannot on stream
> >>>>>>>>>>> You can have a GlobalWindow in streaming with a default trigger.
> >>>>>>>>>>> You could define additional triggers that do early firings. And
> >>>>>>>>>>> you could even trigger the global window by advancing the
> >>>>>>>>>>> watermark to +inf.
> >>>>>>>>>> IIRC, as a pragmatic note, we prohibited global window with
> >>>>>>>>>> default trigger on unbounded PCollections in the SDK because this
> >>>>>>>>>> is more likely to be user error than an actual desire to have no
> >>>>>>>>>> output until drain. But it's semantically valid in the model.
>

1: https://beam.apache.org/documentation/runners/capability-matrix/
2: https://lists.apache.org/thread.html/7487750085413f11fbeb1e707fa8fc138a07c93d6a394cf7245371fc@%3Cdev.beam.apache.org%3E
3: https://lists.apache.org/thread.html/d0c24cb4d421e25980fee1855d1daa68d343700270ed962d642c8f99@%3Cdev.beam.apache.org%3E
