Note that even an unbounded pipeline on a streaming runner can return from
waitUntilFinish(), e.g., if you cancel it or terminate it. It's entirely
reasonable for users to want to understand and handle these cases.
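
For example, a minimal sketch of handling the result with the Java SDK
(assuming an already-constructed Pipeline named pipeline; which terminal
states a given runner actually returns is runner-specific):

  PipelineResult result = pipeline.run();
  PipelineResult.State state = result.waitUntilFinish();
  switch (state) {
    case DONE:
      // All output watermarks reached +infinity; the pipeline finished.
      break;
    case CANCELLED:
      // The pipeline was cancelled, e.g. via result.cancel().
      break;
    case FAILED:
      // The pipeline terminated with an error.
      break;
    default:
      // Some runners may report other states, e.g. UNKNOWN.
      break;
  }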

+1

Dan

On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <[email protected]>
wrote:

> +1
>
> Good idea !!
>
> Regards
> JB
>
>
> On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
>
>> Raising this onto the mailing list from
>> https://issues.apache.org/jira/browse/BEAM-849
>>
>> The following issue came up: what does it mean, in the Beam model, for a
>> pipeline to finish?
>>
>> Note that I am deliberately not talking about "batch" and "streaming"
>> pipelines, because this distinction does not exist in the model. Several
>> runners have batch/streaming *modes*, which implement the same semantics
>> (potentially different subsets: in batch mode a runner will typically
>> reject pipelines that have at least one unbounded PCollection) but in an
>> operationally different way. However, we should define pipeline termination
>> at the level of the unified model, and then make sure that all runners in
>> all modes implement that properly.
>>
>> One natural way is to say "a pipeline terminates when the output watermarks
>> of all of its PCollections progress to +infinity". (Note: this can, I
>> guess, be generalized to partial executions of a pipeline: if you're
>> interested in the full contents of only some collections, then you wait
>> only until the watermarks of those collections progress to +infinity.)
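>>
>> To make the proposed condition concrete, here is a rough sketch in Java
>> (outputWatermarkOf() is a hypothetical accessor for a runner-reported
>> output watermark, not an existing API; BoundedWindow.TIMESTAMP_MAX_VALUE
>> is the SDK's stand-in for +infinity):
>>
>>   boolean pipelineHasTerminated(Collection<PCollection<?>> outputs) {
>>     for (PCollection<?> pc : outputs) {
>>       Instant watermark = outputWatermarkOf(pc); // hypothetical
>>       if (watermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>>         return false; // this watermark has not yet reached +infinity
>>       }
>>     }
>>     return true; // every output watermark is at +infinity
>>   }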
>>
>> A typical "batch" runner mode does not implement watermarks - we can think
>> of it as assigning watermark -infinity to the output of a transform that
>> hasn't started executing yet, and +infinity to the output of a transform
>> that has finished executing. This is consistent with how such runners
>> implement termination in practice.
>>
>> The Dataflow streaming runner additionally implements such termination for
>> the pipeline drain operation, which has two parts: 1) stop consuming input
>> from the sources, and 2) wait until all watermarks progress to +infinity.
>>
>> Let us fill the gap by making this part of the Beam model and declaring
>> that all runners should implement this behavior. This will give nice
>> properties, e.g.:
>> - A pipeline that has only bounded collections can be run by any runner in
>> any mode, with the same results and termination behavior (this is actually
>> my motivating example for raising this issue: I was running the Splittable
>> DoFn tests
>> <https://github.com/apache/beam/blob/master/sdks/java/core/
>> src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
>> with the streaming Dataflow runner - these tests produce only bounded
>> collections - and noticed that they wouldn't terminate even though all data
>> was processed)
>> - It will be possible to implement pipelines that stream data for a while
>> and then eventually successfully terminate based on some condition. E.g. a
>> pipeline that watches a continuously growing file until it is marked
>> read-only, or a pipeline that reads a Kafka topic partition until it
>> receives a "poison pill" message (see the sketch below). This seems handy.
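>>
>> As a rough sketch of the poison-pill case, in terms of a hypothetical
>> UnboundedSource.UnboundedReader over a Kafka topic partition: advance()
>> and getWatermark() are the real UnboundedReader methods, while
>> pollNextRecord(), isPoisonPill() and watermarkOfEmittedRecords() are
>> made-up helpers.
>>
>>   @Override
>>   public boolean advance() throws IOException {
>>     current = pollNextRecord(); // next record from the partition, or null
>>     if (current == null) {
>>       return false; // no data available right now
>>     }
>>     if (isPoisonPill(current)) {
>>       exhausted = true; // nothing further will ever be emitted
>>       return false;
>>     }
>>     return true;
>>   }
>>
>>   @Override
>>   public Instant getWatermark() {
>>     // Once the poison pill has been seen, report a +infinity watermark so
>>     // that, under the proposed semantics, the runner can consider this
>>     // collection (and everything downstream of it) complete.
>>     return exhausted
>>         ? BoundedWindow.TIMESTAMP_MAX_VALUE
>>         : watermarkOfEmittedRecords();
>>   }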
>>
>>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
