+1 on Eugene's words - this shows how batch is conceptually a subset of a
streaming problem.
I also believe that Stas has a very good point on education - we have to
try to understand developers' current perspective and make the
transition to the Beam model as natural as possible for new users.
In addition to good documentation and examples, I think that
https://issues.apache.org/jira/browse/BEAM-849 is critical, as this is the
user's end-point to the behaviours discussed here, and so it should be:
* clear and concise - pipeline state at any point should be informative.
* well documented - documentation, examples, and use-cases (e.g., Eugene's
"poison pill").
* strict API for runners - echoing Stas' note on unified implementation for
portability.
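
To make the termination rule Eugene proposes (quoted below) concrete, here is
a toy sketch in plain Python. It is not the Beam API: every name in it
(pipeline_terminated, batch_watermark, source_watermark_after, the "STOP"
payload) is hypothetical and only models watermarks as numbers, with
+infinity meaning "no more data will ever arrive".

```python
import math

NEG_INF = -math.inf
POS_INF = math.inf


def pipeline_terminated(output_watermarks):
    """Toy rule: the pipeline has terminated once the output watermark of
    every PCollection has progressed to +infinity.

    output_watermarks is a hypothetical dict: collection name -> watermark.
    """
    return all(w == POS_INF for w in output_watermarks.values())


def batch_watermark(transform_finished):
    """Batch-mode view: -infinity until the producing transform has
    finished executing, +infinity afterwards."""
    return POS_INF if transform_finished else NEG_INF


def source_watermark_after(messages, poison_pill="STOP"):
    """Hypothetical unbounded source: its watermark follows the largest
    timestamp seen so far and jumps to +infinity on the poison pill.

    messages is an iterable of (timestamp, payload) pairs.
    """
    watermark = NEG_INF
    for timestamp, payload in messages:
        if payload == poison_pill:
            return POS_INF
        watermark = max(watermark, timestamp)
    return watermark
```

In this model a "streaming" source that sees the poison pill and a "batch"
transform that has finished look the same to pipeline_terminated, which is
the property I would like the user-facing pipeline state to expose.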

On Thu, Mar 2, 2017 at 8:49 PM Eugene Kirpichov
<kirpic...@google.com.invalid> wrote:

> OK, I'm glad everybody is in agreement on this. I raised this point because
> we've been discussing implementing this behavior in the Dataflow streaming
> runner, and I wanted to make sure that people are okay with it from a
> conceptual point of view before proceeding.
>
> On Thu, Mar 2, 2017 at 10:27 AM Kenneth Knowles <k...@google.com.invalid>
> wrote:
>
> Isn't this already the case? I think semantically it is an unavoidable
> conclusion, so certainly +1 to that.
>
> The DirectRunner and TestDataflowRunner both have this behavior already.
> I've always considered that a streaming job running forever is just [very]
> suboptimal shutdown latency :-)
>
> Some bits of the discussion on the ticket seem to surround whether or how
> to communicate this property in a generic way. Since a runner owns its
> PipelineResult it doesn't seem necessary.
>
> So is the bottom line just that you want to more strongly insist that
> runners really terminate in a timely manner? I'm +1 to that, too, for
> basically the reason Stas gives: In order to easily programmatically
> orchestrate Beam pipelines in a portable way, you do need to know whether
> the pipeline will finish without thinking about the specific runner and its
> options (as with our RunnableOnService tests).
>
> Kenn
>
> On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin <dhalp...@google.com.invalid>
> wrote:
>
> > Note that even "unbounded pipeline in a streaming runner".waitUntilFinish()
> > can return, e.g., if you cancel it or terminate it. It's totally reasonable
> > for users to want to understand and handle these cases.
> >
> > +1
> >
> > Dan
> >
> > On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
> > wrote:
> >
> > > +1
> > >
> > > Good idea !!
> > >
> > > Regards
> > > JB
> > >
> > >
> > > On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
> > >
> > >> Raising this onto the mailing list from
> > >> https://issues.apache.org/jira/browse/BEAM-849
> > >>
> > >> The issue came up: what does it mean for a pipeline to finish, in the Beam
> > >> model?
> > >>
> > >> Note that I am deliberately not talking about "batch" and "streaming"
> > >> pipelines, because this distinction does not exist in the model. Several
> > >> runners have batch/streaming *modes*, which implement the same semantics
> > >> (potentially different subsets: in batch mode typically a runner will
> > >> reject pipelines that have at least one unbounded PCollection) but in an
> > >> operationally different way. However we should define pipeline termination
> > >> at the level of the unified model, and then make sure that all runners in
> > >> all modes implement that properly.
> > >>
> > >> One natural way is to say "a pipeline terminates when the output
> > >> watermarks of all of its PCollections progress to +infinity". (Note: this
> > >> can be generalized, I guess, to partial executions of a pipeline: if
> > >> you're interested in the full contents of only some collections, then you
> > >> wait only until the watermarks of those collections progress to +infinity.)
> > >>
> > >> A typical "batch" runner mode does not implement watermarks - we can think
> > >> of it as assigning watermark -infinity to an output of a transform that
> > >> hasn't started executing yet, and +infinity to output of a transform that
> > >> has finished executing. This is consistent with how such runners implement
> > >> termination in practice.
> > >>
> > >> The Dataflow streaming runner additionally implements such termination for
> > >> the pipeline drain operation, which has 2 parts: 1) stop consuming input
> > >> from the sources, and 2) wait until all watermarks progress to +infinity.
> > >>
> > >> Let us fill the gap by making this part of the Beam model and declaring
> > >> that all runners should implement this behavior. This will give nice
> > >> properties, e.g.:
> > >> - A pipeline that has only bounded collections can be run by any runner in
> > >> any mode, with the same results and termination behavior. (This is actually
> > >> my motivating example for raising this issue: I was running the Splittable
> > >> DoFn tests
> > >> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
> > >> with the streaming Dataflow runner - these tests produce only bounded
> > >> collections - and noticed that they wouldn't terminate even though all
> > >> data was processed.)
> > >> - It will be possible to implement pipelines that stream data for a while
> > >> and then eventually successfully terminate based on some condition. E.g. a
> > >> pipeline that watches a continuously growing file until it is marked
> > >> read-only, or a pipeline that reads a Kafka topic partition until it
> > >> receives a "poison pill" message. This seems handy.
> > >>
> > >>
> > > --
> > > Jean-Baptiste Onofré
> > > jbono...@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> > >
> >
>
