+1! I think it's a very cool way to abstract away the batch vs. streaming dissonance from the Beam model.
It does require that practitioners are *educated* to think this way as well. I believe that nowadays the terms "batch" and "streaming" are so deeply rooted that they play a key role in users' mental models. For example, these terms are often employed to reason about whether "pipeline.waitUntilFinish(...)" is expected to ever return (batch - yes, streaming - not so much). The approach Eugene advocates turns the question of whether an execution of a pipeline ever finishes into a property of the pipeline (i.e., its sources), instead of a property of the runner (i.e., whether it runs as batch or streaming). This sounds like something that makes the world a better place, and may help address some of the major points discussed in https://issues.apache.org/jira/browse/BEAM-849.

On Thu, Mar 2, 2017 at 4:03 AM Thomas Groh <tg...@google.com.invalid> wrote:

> +1
>
> I think it's a fair claim that a PCollection is "done" when its watermark
> reaches positive infinity, and then it's easy to claim that a Pipeline is
> "done" when all of its PCollections are done. Completion is an especially
> reasonable claim if we consider positive infinity to be an actual infinity
> - so long as allowed lateness is a finite value, elements that arrive
> while a watermark is at positive infinity will be "infinitely" late, and
> thus can be dropped by the runner.
>
> As an aside, this is only about "finishing because the pipeline is
> complete" - it's unrelated to "finished because of an unrecoverable error"
> or similar reasons pipelines can stop running, yes?
>
> On Wed, Mar 1, 2017 at 5:54 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > Raising this onto the mailing list from
> > https://issues.apache.org/jira/browse/BEAM-849
> >
> > The issue came up: what does it mean for a pipeline to finish, in the
> > Beam model?
> >
> > Note that I am deliberately not talking about "batch" and "streaming"
> > pipelines, because this distinction does not exist in the model.
> > Several runners have batch/streaming *modes*, which implement the same
> > semantics (potentially different subsets: in batch mode a runner will
> > typically reject pipelines that have at least one unbounded PCollection),
> > but in an operationally different way. However, we should define
> > pipeline termination at the level of the unified model, and then make
> > sure that all runners in all modes implement it properly.
> >
> > One natural way is to say "a pipeline terminates when the output
> > watermarks of all of its PCollections progress to +infinity". (Note:
> > this can be generalized, I guess, to partial executions of a pipeline:
> > if you're interested in the full contents of only some collections,
> > then you wait until only the watermarks of those collections progress
> > to infinity.)
> >
> > A typical "batch" runner mode does not implement watermarks - we can
> > think of it as assigning watermark -infinity to the output of a
> > transform that hasn't started executing yet, and +infinity to the
> > output of a transform that has finished executing. This is consistent
> > with how such runners implement termination in practice.
> >
> > The Dataflow streaming runner additionally implements such termination
> > for the pipeline drain operation, which has 2 parts: 1) stop consuming
> > input from the sources, and 2) wait until all watermarks progress to
> > infinity.
> >
> > Let us fill the gap by making this part of the Beam model and declaring
> > that all runners should implement this behavior.
> > This will give nice properties, e.g.:
> >
> > - A pipeline that has only bounded collections can be run by any runner
> > in any mode, with the same results and termination behavior. (This is
> > actually my motivating example for raising this issue: I was running the
> > Splittable DoFn tests
> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
> > with the streaming Dataflow runner - these tests produce only bounded
> > collections - and noticed that they wouldn't terminate even though all
> > data was processed.)
> >
> > - It will be possible to implement pipelines that stream data for a
> > while and then eventually successfully terminate based on some
> > condition. E.g. a pipeline that watches a continuously growing file
> > until it is marked read-only, or a pipeline that reads a Kafka topic
> > partition until it receives a "poison pill" message. This seems handy.
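To make the proposed criterion concrete, here is a minimal, self-contained Java sketch of "a pipeline terminates when the output watermarks of all of its PCollections progress to +infinity", including the batch-mode view of a watermark jumping from -infinity to +infinity when the producing transform finishes. This is a toy model, not the actual Beam or runner API; the class and method names (`TerminationSketch`, `finishTransformProducing`, etc.) are invented for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the termination rule discussed in this thread: a pipeline
// is "done" when the output watermark of every PCollection has progressed
// to +infinity. Not real Beam API; names are hypothetical.
public class TerminationSketch {

    // Watermarks as millis-since-epoch; Long.MIN_VALUE / Long.MAX_VALUE
    // stand in for -infinity / +infinity.
    static final long NEG_INF = Long.MIN_VALUE;
    static final long POS_INF = Long.MAX_VALUE;

    // Output watermark per (named) PCollection.
    private final Map<String, Long> watermarks = new HashMap<>();

    void addCollection(String name) {
        // Before any processing, a batch-style runner can be modeled as
        // holding every output watermark at -infinity.
        watermarks.put(name, NEG_INF);
    }

    void advance(String name, long watermark) {
        // Watermarks only move forward, never backward.
        watermarks.merge(name, watermark, Math::max);
    }

    void finishTransformProducing(String name) {
        // A batch runner "finishing" the producing transform is modeled as
        // jumping that collection's watermark straight to +infinity.
        advance(name, POS_INF);
    }

    // The proposed model-level definition of pipeline termination.
    boolean isDone() {
        return watermarks.values().stream().allMatch(w -> w == POS_INF);
    }

    public static void main(String[] args) {
        TerminationSketch p = new TerminationSketch();
        for (String c : List.of("input", "parsed", "output")) {
            p.addCollection(c);
        }
        System.out.println(p.isDone()); // false: nothing has run yet

        p.finishTransformProducing("input");
        p.finishTransformProducing("parsed");
        System.out.println(p.isDone()); // false: "output" is still behind

        p.finishTransformProducing("output");
        System.out.println(p.isDone()); // true: all watermarks at +infinity
    }
}
```

Under this model, a runner's "batch" or "streaming" mode only changes *how* the watermarks advance, not the termination condition itself, which is exactly the unification the thread argues for.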