Note that waitUntilFinish() can return even for an unbounded pipeline in a streaming runner, e.g., if you cancel or terminate it. It's entirely reasonable for users to want to understand and handle these cases.
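For illustration, a minimal sketch of handling whichever terminal state waitUntilFinish() reports, assuming the Java SDK's PipelineResult API (the helper method and the already-constructed pipeline are hypothetical):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;

// Hypothetical helper: runs an already-constructed pipeline and reacts to
// the terminal state that waitUntilFinish() reports.
static void runAndHandleTermination(Pipeline pipeline) {
  PipelineResult result = pipeline.run();
  // Blocks until the pipeline reaches a terminal state.
  PipelineResult.State state = result.waitUntilFinish();
  switch (state) {
    case DONE:
      System.out.println("Pipeline finished; output is complete.");
      break;
    case CANCELLED:
      System.out.println("Pipeline was cancelled before finishing.");
      break;
    case FAILED:
      System.out.println("Pipeline terminated with an error.");
      break;
    default:
      System.out.println("Other terminal state: " + state);
      break;
  }
}
```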
+1
Dan

On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <[email protected]> wrote:

> +1
>
> Good idea !!
>
> Regards
> JB
>
> On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
>
>> Raising this onto the mailing list from
>> https://issues.apache.org/jira/browse/BEAM-849
>>
>> The issue came up: what does it mean for a pipeline to finish, in the Beam
>> model?
>>
>> Note that I am deliberately not talking about "batch" and "streaming"
>> pipelines, because this distinction does not exist in the model. Several
>> runners have batch/streaming *modes*, which implement the same semantics
>> (potentially different subsets: in batch mode a runner will typically
>> reject pipelines that have at least one unbounded PCollection) but in an
>> operationally different way. However, we should define pipeline termination
>> at the level of the unified model, and then make sure that all runners in
>> all modes implement it properly.
>>
>> One natural way is to say "a pipeline terminates when the output watermarks
>> of all of its PCollections progress to +infinity". (Note: this can be
>> generalized, I guess, to partial executions of a pipeline: if you're
>> interested in the full contents of only some collections, then you wait
>> until only the watermarks of those collections progress to infinity.)
>>
>> A typical "batch" runner mode does not implement watermarks - we can think
>> of it as assigning watermark -infinity to the output of a transform that
>> hasn't started executing yet, and +infinity to the output of a transform
>> that has finished executing. This is consistent with how such runners
>> implement termination in practice.
>>
>> The Dataflow streaming runner additionally implements such termination for
>> the pipeline drain operation, which has two parts: 1) stop consuming input
>> from the sources, and 2) wait until all watermarks progress to infinity.
>>
>> Let us fill the gap by making this part of the Beam model and declaring
>> that all runners should implement this behavior. This will give nice
>> properties, e.g.:
>> - A pipeline that has only bounded collections can be run by any runner in
>> any mode, with the same results and termination behavior. (This is actually
>> my motivating example for raising this issue: I was running Splittable
>> DoFn tests
>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
>> with the streaming Dataflow runner - these tests produce only bounded
>> collections - and noticed that they wouldn't terminate even though all
>> data was processed.)
>> - It will be possible to implement pipelines that stream data for a while
>> and then eventually successfully terminate based on some condition. E.g. a
>> pipeline that watches a continuously growing file until it is marked
>> read-only, or a pipeline that reads a Kafka topic partition until it
>> receives a "poison pill" message. This seems handy.
>>
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
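To make the proposed rule concrete, here is a minimal runner-side sketch. The hasTerminated helper and the collection of per-PCollection output watermarks are hypothetical; only BoundedWindow.TIMESTAMP_MAX_VALUE (the Java SDK's "end of time") is real API. Under this rule, a batch-mode runner that assigns -infinity before a transform runs and +infinity afterwards satisfies the same check once every transform has finished.

```java
import java.util.Collection;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

// Sketch of the proposed termination rule: a pipeline has terminated once the
// output watermark of every PCollection has advanced to +infinity
// (BoundedWindow.TIMESTAMP_MAX_VALUE in the Java SDK).
static boolean hasTerminated(Collection<Instant> outputWatermarks) {
  for (Instant watermark : outputWatermarks) {
    if (watermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
      return false; // this collection may still receive data
    }
  }
  return true;
}
```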

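Something close to the second property can already be approximated today by bounding an unbounded read. A sketch assuming KafkaIO's Java API; the broker address, topic name, and read-time limit are made up for illustration, and a true "poison pill" condition would still need the model-level termination semantics proposed above:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class BoundedKafkaReadExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    p.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")   // made-up broker address
            .withTopic("events")                   // made-up topic name
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Turns the unbounded read into a bounded one, so the watermark
            // can reach +infinity and the pipeline can terminate.
            .withMaxReadTime(Duration.standardMinutes(10))
            .withoutMetadata());

    // With the proposed semantics, this returns once all watermarks hit +infinity.
    p.run().waitUntilFinish();
  }
}
```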