+1 I think it's a fair claim that a PCollection is "done" when its watermark reaches positive infinity, and then it's easy to claim that a Pipeline is "done" when all of its PCollections are done. Completion is an especially reasonable claim if we consider positive infinity to be an actual infinity - so long as allowed lateness is a finite value, any element that arrives once a watermark is at positive infinity will be "infinitely" late, and can thus be dropped by the runner.
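[Editor's note: the "infinitely late" argument above can be sketched in a few lines. This is a hypothetical illustration, not Beam's actual API; the function names and the allowed-lateness value are made up for the example.]

```python
import math

# Hypothetical sketch of the claim above (not Beam's real API):
# a collection is "done" once its watermark is at +infinity, and with
# any finite allowed lateness, every element arriving after that point
# is infinitely late and therefore droppable.

ALLOWED_LATENESS = 60.0  # seconds; any finite value gives the same conclusion


def is_done(watermark: float) -> bool:
    """A PCollection is done when its watermark has reached +infinity."""
    return watermark == math.inf


def is_droppable(element_timestamp: float, watermark: float,
                 allowed_lateness: float = ALLOWED_LATENESS) -> bool:
    """An element may be dropped when it is behind the watermark by more
    than the allowed lateness."""
    return element_timestamp < watermark - allowed_lateness


# Once the watermark is +infinity, any finite-timestamped element is
# infinitely late, so it is always droppable:
assert is_done(math.inf)
assert is_droppable(1e18, math.inf)    # even a huge finite timestamp
assert not is_droppable(100.0, 120.0)  # still within allowed lateness
```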
As an aside, this is only about "finishing because the pipeline is complete" - it's unrelated to "finishing because of an unrecoverable error" or similar reasons pipelines can stop running, yes?

On Wed, Mar 1, 2017 at 5:54 PM, Eugene Kirpichov < [email protected]> wrote:

> Raising this onto the mailing list from
> https://issues.apache.org/jira/browse/BEAM-849
>
> The issue came up: what does it mean for a pipeline to finish, in the
> Beam model?
>
> Note that I am deliberately not talking about "batch" and "streaming"
> pipelines, because this distinction does not exist in the model. Several
> runners have batch/streaming *modes*, which implement the same semantics
> (potentially different subsets: in batch mode a runner will typically
> reject pipelines that have at least one unbounded PCollection) but in an
> operationally different way. However, we should define pipeline
> termination at the level of the unified model, and then make sure that
> all runners in all modes implement it properly.
>
> One natural way is to say "a pipeline terminates when the output
> watermarks of all of its PCollections progress to +infinity". (Note:
> this can be generalized, I guess, to partial executions of a pipeline:
> if you're interested in the full contents of only some collections, then
> you wait until only the watermarks of those collections progress to
> infinity.)
>
> A typical "batch" runner mode does not implement watermarks - we can
> think of it as assigning watermark -infinity to the output of a
> transform that hasn't started executing yet, and +infinity to the output
> of a transform that has finished executing. This is consistent with how
> such runners implement termination in practice.
>
> The Dataflow streaming runner additionally implements such termination
> for the pipeline drain operation, which has two parts: 1) stop consuming
> input from the sources, and 2) wait until all watermarks progress to
> infinity.
>
> Let us fill the gap by making this part of the Beam model and declaring
> that all runners should implement this behavior. This will give nice
> properties, e.g.:
> - A pipeline that has only bounded collections can be run by any runner
> in any mode, with the same results and termination behavior. (This is
> actually my motivating example for raising this issue: I was running
> Splittable DoFn tests
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
> with the streaming Dataflow runner - these tests produce only bounded
> collections - and noticed that they wouldn't terminate even though all
> data was processed.)
> - It will be possible to implement pipelines that stream data for a
> while and then eventually terminate successfully based on some
> condition. E.g. a pipeline that watches a continuously growing file
> until it is marked read-only, or a pipeline that reads a Kafka topic
> partition until it receives a "poison pill" message. This seems handy.
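[Editor's note: the termination rule proposed in the quoted email - including the batch-mode view of watermarks as flipping from -infinity to +infinity, and the partial-execution generalization - can be modeled in a few lines. This is a hypothetical sketch, not a real runner; the function and collection names are invented for illustration.]

```python
import math

# Hypothetical model of the proposed rule (not a real Beam runner):
# a pipeline (or a partial execution of it) terminates when the output
# watermarks of all watched collections have progressed to +infinity.
# A batch-mode runner fits the same rule if we treat each transform's
# output watermark as -infinity before it runs and +infinity after.


def pipeline_done(watermarks: dict, watched=None) -> bool:
    """True when every watched collection's watermark is at +infinity.
    With watched=None, all collections are watched (full execution)."""
    names = watched if watched is not None else watermarks.keys()
    return all(watermarks[name] == math.inf for name in names)


# Batch mode: watermarks flip from -inf to +inf as transforms finish.
wm = {"read": -math.inf, "parse": -math.inf}
assert not pipeline_done(wm)
wm["read"] = math.inf   # source transform finished
wm["parse"] = math.inf  # downstream transform finished
assert pipeline_done(wm)

# Partial execution: wait only on the collections you care about.
wm2 = {"main": math.inf, "debug": -math.inf}
assert pipeline_done(wm2, watched={"main"})
assert not pipeline_done(wm2)
```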
