+1!

I think it's a very cool way to abstract away the batch vs. streaming
dissonance from the Beam model.

It does require that practitioners are *educated* to think this way as well.
I believe that nowadays the terms "batch" and "streaming" are so deeply
rooted that they play a key role in users' mental models. For example,
these terms are often employed to reason about whether
"pipeline.waitUntilFinish(...)" is expected to ever return (batch - yes,
streaming - not so much).
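
That mental model can be caricatured in a toy sketch (plain Java, not Beam API; the enum and method names are illustrative only): whether waitUntilFinish() is expected to return is keyed off the execution mode, not off anything about the pipeline itself.

```java
// Toy sketch of the traditional mental model: only batch executions
// are expected to finish on their own. Names here are hypothetical,
// not part of the Beam SDK.
enum Mode { BATCH, STREAMING }

final class MentalModel {
    // Under the batch/streaming mental model, waitUntilFinish() is
    // expected to return only when the runner executes in batch mode.
    static boolean expectedToReturn(Mode mode) {
        return mode == Mode.BATCH;
    }
}
```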

The approach Eugene advocates turns the question of whether an execution of
a pipeline ever finishes into a property of the pipeline (i.e., its
sources), instead of a property of the runner (i.e., whether it runs as
batch or streaming).
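
To make the contrast concrete, here is a toy sketch (again plain Java, not Beam API) of termination as a property of the pipeline: under this view a pipeline is expected to finish iff all of its sources are bounded, no matter which runner or mode executes it.

```java
import java.util.List;

// Hypothetical sketch: termination as a property of the pipeline's
// sources rather than of the runner's execution mode.
final class PipelineTermination {
    enum Source { BOUNDED, UNBOUNDED }

    // A pipeline is expected to finish iff every source is bounded.
    static boolean expectedToFinish(List<Source> sources) {
        return sources.stream().allMatch(s -> s == Source.BOUNDED);
    }
}
```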

This sounds like something that makes the world a better place, and may
help address some of the major points discussed in
https://issues.apache.org/jira/browse/BEAM-849.


On Thu, Mar 2, 2017 at 4:03 AM Thomas Groh <tg...@google.com.invalid> wrote:

> +1
>
> I think it's a fair claim that a PCollection is "done" when its watermark
> reaches positive infinity, and then it's easy to claim that a Pipeline is
> "done" when all of its PCollections are done. Completion is an especially
> reasonable claim if we consider positive infinity to be an actual infinity
> - so long as allowed lateness is a finite value, elements that arrive
> whenever a watermark is at positive infinity will be "infinitely" late, and
> thus can be dropped by the runner.
>
> As an aside, this is only about "finishing because the pipeline is
> complete" - it's unrelated to "finished because of an unrecoverable error"
> or similar reasons pipelines can stop running, yes?
>
> On Wed, Mar 1, 2017 at 5:54 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > Raising this onto the mailing list from
> > https://issues.apache.org/jira/browse/BEAM-849
> >
> > The issue came up: what does it mean for a pipeline to finish, in the
> Beam
> > model?
> >
> > Note that I am deliberately not talking about "batch" and "streaming"
> > pipelines, because this distinction does not exist in the model. Several
> > runners have batch/streaming *modes*, which implement the same semantics
> > (potentially different subsets: in batch mode typically a runner will
> > reject pipelines that have at least one unbounded PCollection) but in an
> > operationally different way. However, we should define pipeline
> termination
> > at the level of the unified model, and then make sure that all runners in
> > all modes implement that properly.
> >
> > One natural way is to say "a pipeline terminates when the output
> watermarks
> > of all of its PCollections progress to +infinity". (Note: this can be
> > generalized, I guess, to having partial executions of a pipeline: if
> you're
> > interested in the full contents of only some collections, then you wait
> > until only the watermarks of those collections progress to infinity)
> >
> > A typical "batch" runner mode does not implement watermarks - we can
> think
> > of it as assigning watermark -infinity to an output of a transform that
> > hasn't started executing yet, and +infinity to output of a transform that
> > has finished executing. This is consistent with how such runners
> implement
> > termination in practice.
> >
> > The Dataflow streaming runner additionally implements such termination for
> > the pipeline drain operation, which has two parts: 1) stop consuming input
> > from the sources, and 2) wait until all watermarks progress to infinity.
> >
> > Let us fill the gap by making this part of the Beam model and declaring
> > that all runners should implement this behavior. This will give nice
> > properties, e.g.:
> > - A pipeline that has only bounded collections can be run by any runner
> in
> > any mode, with the same results and termination behavior (this is actually
> > my motivating example for raising this issue: I was running Splittable
> > DoFn tests
> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
> > with the streaming Dataflow runner - these tests produce only bounded
> > collections - and noticed that they wouldn't terminate even though all
> data
> > was processed)
> > - It will be possible to implement pipelines that stream data for a while
> > and then eventually successfully terminate based on some condition. E.g.
> a
> > pipeline that watches a continuously growing file until it is marked
> > read-only, or a pipeline that reads a Kafka topic partition until it
> > receives a "poison pill" message. This seems handy.
> >
>
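
Eugene's watermark-based rule can be sketched minimally in plain Java (this is a model for discussion, not Beam API; Long.MIN_VALUE and Long.MAX_VALUE stand in for -infinity and +infinity):

```java
import java.util.Map;

// Minimal model of the watermark-based termination rule discussed in
// the quoted thread. Names and representation are illustrative only.
final class WatermarkModel {
    static final long NEG_INFINITY = Long.MIN_VALUE; // models -infinity
    static final long POS_INFINITY = Long.MAX_VALUE; // models +infinity

    // A pipeline terminates when the output watermark of every
    // PCollection has progressed to +infinity.
    static boolean pipelineDone(Map<String, Long> outputWatermarks) {
        return outputWatermarks.values().stream()
                .allMatch(w -> w == POS_INFINITY);
    }

    // The "batch mode" reading: a transform that has not started yet
    // has output watermark -infinity; one that has finished, +infinity.
    static long batchWatermark(boolean finished) {
        return finished ? POS_INFINITY : NEG_INFINITY;
    }
}
```

Under this model, a batch runner's termination behavior falls out as a special case: once every transform has finished, every output watermark is +infinity and the pipeline is done.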
