I think this is generally less of a big deal than suggested, for a pretty
simple reason:

All bounded pipelines are expected to terminate when they are complete.
Almost all unbounded pipelines are expected to run until explicitly shut
down.

As a result, shutting down an unbounded pipeline because its watermark has
reached positive infinity will occur extremely rarely in practice (in a
production environment). I'm comfortable saying that the model says such a
pipeline should terminate, while only some runners support this.

I'm also going to copy-paste why I think it's correct to shut down, from
earlier in the thread:

"I think it's a fair claim that a PCollection is "done" when its watermark
reaches positive infinity, and then it's easy to claim that a Pipeline is
"done" when all of its PCollections are done. Completion is an especially
reasonable claim if we consider positive infinity to be an actual infinity
- so long as allowed lateness is a finite value, elements that arrive
whenever a watermark is at positive infinity will be "infinitely" late, and
thus can be dropped by the runner."
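
The argument quoted above can be sketched in plain Java. This is only an illustration under stated assumptions, not how any runner implements it: the class and method names are hypothetical, watermarks are modelled as plain long millisecond values, and Long.MAX_VALUE stands in for positive infinity.

```java
import java.util.List;

public class TerminationSketch {
    // Assumption of this sketch: Long.MAX_VALUE models a watermark at +infinity.
    public static final long POSITIVE_INFINITY = Long.MAX_VALUE;

    // A PCollection is "done" once its watermark has reached +infinity.
    public static boolean isCollectionDone(long watermarkMillis) {
        return watermarkMillis == POSITIVE_INFINITY;
    }

    // A pipeline is "done" once every one of its PCollections is done.
    public static boolean isPipelineDone(List<Long> watermarksMillis) {
        return watermarksMillis.stream().allMatch(TerminationSketch::isCollectionDone);
    }

    // An element may be dropped when it is later than the allowed lateness.
    // With the watermark at +infinity, any finite allowed lateness is exceeded.
    // Finite inputs are assumed small enough that the addition cannot overflow.
    public static boolean isDroppablyLate(
            long elementTimestampMillis, long watermarkMillis, long allowedLatenessMillis) {
        if (watermarkMillis == POSITIVE_INFINITY) {
            return true; // "infinitely" late
        }
        return elementTimestampMillis + allowedLatenessMillis < watermarkMillis;
    }

    public static void main(String[] args) {
        // One collection is still at a finite watermark, so the pipeline is not done.
        System.out.println(isPipelineDone(List.of(POSITIVE_INFINITY, 5_000L)));
        // All collections are at +infinity, so the pipeline is done.
        System.out.println(isPipelineDone(List.of(POSITIVE_INFINITY, POSITIVE_INFINITY)));
        // Any element arriving after the watermark hit +infinity can be dropped.
        System.out.println(isDroppablyLate(100L, POSITIVE_INFINITY, 1_000L));
    }
}
```

A runner that cannot observe its watermarks this way would still need some other signal for completion, which is why this may only be supportable in some runners.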


On Wed, May 10, 2017 at 2:26 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> A bit of clarification, the Flink Runner does not terminate a Job when the
> timeout is reached in waitUntilFinish(Duration). When we reach the timeout
> we simply return and keep the job running. I thought that was the expected
> behaviour.
>
> Regarding job termination, I think it’s easy to change the Flink Runner to
> terminate if the watermark reaches +Inf. We would simply set running to
> false in the UnboundedSourceWrapper when the watermark reaches +Inf: [1]
>
> Best,
> Aljoscha
>
> [1] https://github.com/apache/beam/blob/cec71028ff63c7e1b1565c013ae0e378608cb5f9/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L428-L428
>
> > On 10. May 2017, at 10:58, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> >
> > OK, now I understand: you are talking about waitUntilFinish(), whereas I
> was thinking about a simple run().
> >
> > IMHO the Spark and Flink behavior sounds like the most logical one for a
> > streaming pipeline.
> >
> > Regards
> > JB
> >
> > On 05/10/2017 10:20 AM, Etienne Chauchot wrote:
> >> Hi everyone,
> >>
> >> I'm reopening this subject because, IMHO, it is important to unify pipeline
> >> termination semantics in the model. Here are the differences I have observed
> >> in streaming pipeline termination:
> >>
> >> - direct runner: when the output watermarks of all of its PCollections
> progress
> >> to +infinity
> >>
> >> - apex runner: when the output watermarks of all of its PCollections
> progress to
> >> +infinity
> >>
> >> - dataflow runner: when the output watermarks of all of its PCollections
> >> progress to +infinity
> >>
> >> - spark runner: streaming pipelines do not terminate unless timeout is
> set in
> >> pipelineResult.waitUntilFinish()
> >>
> >> - flink runner: streaming pipelines do not terminate unless timeout is
> set in
> >> pipelineResult.waitUntilFinish() (thanks to Aljoscha for timeout
> support PR
> >> https://github.com/apache/beam/pull/2915#pullrequestreview-37090326)
> >>
> >>
> >> => Is the direct/apex/dataflow behavior the correct "beam model"
> behavior?
> >>
> >>
> >> I know that, at least for spark (mails in this thread), there is no
> easy way to
> >> know that we're done reading a source, so it might be very difficult
> (at least
> >> for this runner) to unify toward +infinity behavior if it is chosen as
> the
> >> standard behavior.
> >>
> >> Best
> >>
> >> Etienne
> >>
> >> On 18/04/2017 at 12:05, Etienne Chauchot wrote:
> >>> +1 on "runners really terminate in a timely manner to easily
> programmatically
> >>> orchestrate Beam pipelines in a portable way, you do need to know
> whether
> >>> the pipeline will finish without thinking about the specific runner
> and its
> >>> options"
> >>>
> >>> As an example, in Nexmark, we have streaming mode tests, and for the
> >>> benchmark, we need all the queries to behave the same between runners
> towards
> >>> termination.
> >>>
> >>> For now, to have the consistent behavior, in this mode we need to set a
> >>> timeout (a bit random and flaky) on waitUntilFinish() for spark but
> this
> >>> timeout is not needed for direct runner.
> >>>
> >>> Etienne
> >>>
> >>> On 02/03/2017 at 19:27, Kenneth Knowles wrote:
> >>>> Isn't this already the case? I think semantically it is an unavoidable
> >>>> conclusion, so certainly +1 to that.
> >>>>
> >>>> The DirectRunner and TestDataflowRunner both have this behavior
> already.
> >>>> I've always considered that a streaming job running forever is just
> [very]
> >>>> suboptimal shutdown latency :-)
> >>>>
> >>>> Some bits of the discussion on the ticket seem to surround whether or
> how
> >>>> to communicate this property in a generic way. Since a runner owns its
> >>>> PipelineResult it doesn't seem necessary.
> >>>>
> >>>> So is the bottom line just that you want to more strongly insist that
> >>>> runners really terminate in a timely manner? I'm +1 to that, too, for
> >>>> basically the reason Stas gives: In order to easily programmatically
> >>>> orchestrate Beam pipelines in a portable way, you do need to know
> whether
> >>>> the pipeline will finish without thinking about the specific runner
> and its
> >>>> options (as with our RunnableOnService tests).
> >>>>
> >>>> Kenn
> >>>>
> >>>> On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin
> <dhalp...@google.com.invalid>
> >>>> wrote:
> >>>>
> >>>>> Note that even "unbounded pipeline in a streaming
> runner".waitUntilFinish()
> >>>>> can return, e.g., if you cancel it or terminate it. It's totally
> reasonable
> >>>>> for users to want to understand and handle these cases.
> >>>>>
> >>>>> +1
> >>>>>
> >>>>> Dan
> >>>>>
> >>>>> On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <
> j...@nanthrax.net>
> >>>>> wrote:
> >>>>>
> >>>>>> +1
> >>>>>>
> >>>>>> Good idea !!
> >>>>>>
> >>>>>> Regards
> >>>>>> JB
> >>>>>>
> >>>>>>
> >>>>>> On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
> >>>>>>
> >>>>>>> Raising this onto the mailing list from
> >>>>>>> https://issues.apache.org/jira/browse/BEAM-849
> >>>>>>>
> >>>>>>> The issue came up: what does it mean for a pipeline to finish, in
> the
> >>>>> Beam
> >>>>>>> model?
> >>>>>>>
> >>>>>>> Note that I am deliberately not talking about "batch" and
> "streaming"
> >>>>>>> pipelines, because this distinction does not exist in the model.
> Several
> >>>>>>> runners have batch/streaming *modes*, which implement the same
> semantics
> >>>>>>> (potentially different subsets: in batch mode typically a runner
> will
> >>>>>>> reject pipelines that have at least one unbounded PCollection) but
> in an
> >>>>>>> operationally different way. However we should define pipeline
> >>>>> termination
> >>>>>>> at the level of the unified model, and then make sure that all
> runners
> >>>>> in
> >>>>>>> all modes implement that properly.
> >>>>>>>
> >>>>>>> One natural way is to say "a pipeline terminates when the output
> >>>>>>> watermarks
> >>>>>>> of all of its PCollections progress to +infinity". (Note: this
> can be
> >>>>>>> generalized, I guess, to having partial executions of a pipeline:
> if
> >>>>>>> you're
> >>>>>>> interested in the full contents of only some collections, then you
> wait
> >>>>>>> until only the watermarks of those collections progress to
> infinity)
> >>>>>>>
> >>>>>>> A typical "batch" runner mode does not implement watermarks - we
> can
> >>>>> think
> >>>>>>> of it as assigning watermark -infinity to an output of a transform
> that
> >>>>>>> hasn't started executing yet, and +infinity to output of a
> transform
> >>>>> that
> >>>>>>> has finished executing. This is consistent with how such runners
> >>>>> implement
> >>>>>>> termination in practice.
> >>>>>>>
> >>>>>>> Dataflow streaming runner additionally implements such termination
> for
> >>>>>>> pipeline drain operation: it has 2 parts: 1) stop consuming input
> from
> >>>>> the
> >>>>>>> sources, and 2) wait until all watermarks progress to infinity.
> >>>>>>>
> >>>>>>> Let us fill the gap by making this part of the Beam model and
> declaring
> >>>>>>> that all runners should implement this behavior. This will give
> nice
> >>>>>>> properties, e.g.:
> >>>>>>> - A pipeline that has only bounded collections can be run by any
> runner
> >>>>> in
> >>>>>>> any mode, with the same results and termination behavior (this is
> >>>>> actually
> >>>>>>> my motivating example for raising this issue: I was running
> >>>>> Splittable
> >>>>>>> DoFn tests
> >>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/
> >>>>>>> src/test/java/org/apache/beam/sdk/transforms/
> SplittableDoFnTest.java>
> >>>>>>> with the streaming Dataflow runner - these tests produce only
> bounded
> >>>>>>> collections - and noticed that they wouldn't terminate even though
> all
> >>>>>>> data
> >>>>>>> was processed)
> >>>>>>> - It will be possible to implement pipelines that stream data for a
> >>>>> while
> >>>>>>> and then eventually successfully terminate based on some condition.
> >>>>> E.g. a
> >>>>>>> pipeline that watches a continuously growing file until it is
> marked
> >>>>>>> read-only, or a pipeline that reads a Kafka topic partition until
> it
> >>>>>>> receives a "poison pill" message. This seems handy.
> >>>>>>>
> >>>>>>>
> >>>>>> --
> >>>>>> Jean-Baptiste Onofré
> >>>>>> jbono...@apache.org
> >>>>>> http://blog.nanthrax.net
> >>>>>> Talend - http://www.talend.com
> >>>>>>
> >>>
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
>
>
