+1

I think it's a fair claim that a PCollection is "done" when its watermark
reaches positive infinity, and then it's easy to claim that a Pipeline is
"done" when all of its PCollections are done. Completion is an especially
reasonable claim if we consider positive infinity to be an actual infinity
- so long as allowed lateness is a finite value, elements that arrive
while a watermark is at positive infinity will be "infinitely" late, and
thus can be dropped by the runner.

As an aside, this is only about "finishing because the pipeline is
complete" - it's unrelated to "finished because of an unrecoverable error"
or similar reasons pipelines can stop running, yes?

On Wed, Mar 1, 2017 at 5:54 PM, Eugene Kirpichov <
[email protected]> wrote:

> Raising this onto the mailing list from
> https://issues.apache.org/jira/browse/BEAM-849
>
> The issue came up: what does it mean for a pipeline to finish, in the Beam
> model?
>
> Note that I am deliberately not talking about "batch" and "streaming"
> pipelines, because this distinction does not exist in the model. Several
> runners have batch/streaming *modes*, which implement the same semantics
> (potentially different subsets: in batch mode typically a runner will
> reject pipelines that have at least one unbounded PCollection) but in an
> operationally different way. However we should define pipeline termination
> at the level of the unified model, and then make sure that all runners in
> all modes implement that properly.
>
> One natural way is to say "a pipeline terminates when the output watermarks
> of all of its PCollections progress to +infinity". (Note: this can be
> generalized, I guess, to having partial executions of a pipeline: if you're
> interested in the full contents of only some collections, then you wait
> until only the watermarks of those collections progress to infinity)
>
> A typical "batch" runner mode does not implement watermarks - we can think
> of it as assigning watermark -infinity to an output of a transform that
> hasn't started executing yet, and +infinity to output of a transform that
> has finished executing. This is consistent with how such runners implement
> termination in practice.
>
> The Dataflow streaming runner additionally implements such termination for
> the pipeline drain operation, which has two parts: 1) stop consuming input
> from the sources, and 2) wait until all watermarks progress to infinity.
>
> Let us fill the gap by making this part of the Beam model and declaring
> that all runners should implement this behavior. This will give nice
> properties, e.g.:
> - A pipeline that has only bounded collections can be run by any runner in
> any mode, with the same results and termination behavior (this is actually
> my motivating example for raising this issue: I was running Splittable
> DoFn tests
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
> with the streaming Dataflow runner - these tests produce only bounded
> collections - and noticed that they wouldn't terminate even though all data
> was processed)
> - It will be possible to implement pipelines that stream data for a while
> and then eventually successfully terminate based on some condition. E.g. a
> pipeline that watches a continuously growing file until it is marked
> read-only, or a pipeline that reads a Kafka topic partition until it
> receives a "poison pill" message. This seems handy.
>

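For what it's worth, the proposed termination condition is easy to state as a model-level sketch (hypothetical names, not the Beam API): a pipeline is done when every PCollection's output watermark has advanced to +infinity, and a batch-mode runner can be modeled as jumping each watermark from -infinity straight to +infinity.

```python
import math

def pipeline_done(output_watermarks):
    """Hypothetical model check: the pipeline terminates when the output
    watermark of every PCollection has progressed to +infinity."""
    return all(wm == math.inf for wm in output_watermarks.values())

# Batch-mode view: -infinity means "transform not started", +infinity
# means "transform finished executing".
watermarks = {"read": -math.inf, "parse": -math.inf}
assert not pipeline_done(watermarks)

watermarks = {"read": math.inf, "parse": math.inf}
assert pipeline_done(watermarks)
```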