+1 to what Thomas said: At +infinity all data is droppable so,
philosophically, leaving the pipeline up is just burning CPU. Since
TestStream is a ways off for most runners, we should make sure we have
tests for other sorts of unbounded-but-finite collections to track which
runners this works on.
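
A minimal sketch of the rule being discussed, in plain Java with no Beam
dependency. Everything here is hypothetical and for illustration only: the
class name, the use of Instant.MAX as a stand-in for the +infinity
watermark, and the simplified droppability check based on a window end plus
allowed lateness. It is not how any runner implements this.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of "done at +infinity"; not part of the Beam SDK.
final class WatermarkTerminationSketch {
    // Assumption: Instant.MAX stands in for the +infinity watermark.
    static final Instant POSITIVE_INFINITY = Instant.MAX;

    // A collection is "done" once its output watermark has reached +infinity.
    static boolean isCollectionDone(Instant watermark) {
        return !watermark.isBefore(POSITIVE_INFINITY);
    }

    // A pipeline is "done" once every one of its collections is done.
    static boolean isPipelineDone(List<Instant> watermarks) {
        return watermarks.stream().allMatch(WatermarkTerminationSketch::isCollectionDone);
    }

    // Simplified lateness check: data is droppable once the watermark has
    // passed windowEnd + allowedLateness. With a finite allowed lateness this
    // is always true when the watermark is at +infinity.
    static boolean isDroppable(Instant windowEnd, Duration allowedLateness, Instant watermark) {
        if (isCollectionDone(watermark)) {
            return true;
        }
        return windowEnd.plus(allowedLateness).isBefore(watermark);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2017-05-10T08:49:00Z");
        // One collection still has a finite watermark, so the pipeline is not done.
        System.out.println(isPipelineDone(Arrays.asList(POSITIVE_INFINITY, now)));
        // All watermarks are at +infinity, so the pipeline is done.
        System.out.println(isPipelineDone(Arrays.asList(POSITIVE_INFINITY, POSITIVE_INFINITY)));
        // Any finite allowed lateness is exceeded at +infinity.
        System.out.println(isDroppable(now, Duration.ofDays(365), POSITIVE_INFINITY));
    }
}
```

A test for an unbounded-but-finite collection would then amount to checking
that the runner reports completion once the equivalent of isPipelineDone
holds.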

+1 to Aljoscha: That behavior of waitUntilFinish(Duration) is correct. It is
expected that run() is mostly asynchronous and waitUntilFinish() is blocking.
Of course, the amount of work needed to get a pipeline to a detachable
state in run() varies from runner to runner.
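
To make the expected split concrete, here is a rough sketch in plain Java
(again no Beam dependency). The DetachedRunSketch class, its Result type,
and the latch used to simulate the job finishing are all made up for this
example. It only models the behavior described above: run() returns without
waiting, and a timed wait returns on timeout while leaving the job running.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of an asynchronous run() and a blocking timed wait.
final class DetachedRunSketch {
    enum State { RUNNING, DONE }

    static final class Result {
        private final CompletableFuture<State> job;

        Result(CompletableFuture<State> job) {
            this.job = job;
        }

        boolean isRunning() {
            return !job.isDone();
        }

        // Blocks for at most the timeout. Returns the final state, or null if
        // the timeout elapsed first. The job is NOT cancelled on timeout.
        State waitUntilFinish(Duration timeout) {
            try {
                return job.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                return null; // still running; the caller decides what to do next
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            } catch (ExecutionException e) {
                throw new IllegalStateException("job failed", e.getCause());
            }
        }
    }

    // Submits the "job" and returns immediately. The job finishes only when
    // finishSignal is counted down (standing in for the watermark reaching
    // +infinity or an explicit shutdown).
    static Result run(CountDownLatch finishSignal) {
        return new Result(CompletableFuture.supplyAsync(() -> {
            try {
                finishSignal.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return State.DONE;
        }));
    }

    public static void main(String[] args) {
        CountDownLatch finishSignal = new CountDownLatch(1);
        Result result = run(finishSignal);
        // Timed wait expires: null is returned and the job keeps running.
        System.out.println(result.waitUntilFinish(Duration.ofMillis(100)));
        System.out.println(result.isRunning());
        // Let the job complete, then wait again.
        finishSignal.countDown();
        System.out.println(result.waitUntilFinish(Duration.ofSeconds(5)));
    }
}
```

How much work happens before run() returns is exactly the part that varies
from runner to runner; the sketch assumes none.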

On Wed, May 10, 2017 at 8:49 AM, Thomas Groh <tg...@google.com.invalid>
wrote:

> I think that generally this is actually less of a big deal than suggested,
> for a pretty simple reason:
>
> All bounded pipelines are expected to terminate when they are complete.
> Almost all unbounded pipelines are expected to run until explicitly shut
> down.
>
> As a result, shutting down an unbounded pipeline when the watermark reaches
> positive infinity (in a production environment) will occur in practice
> extremely rarely. I'm comfortable saying that the model says that such a
> pipeline should terminate, and only support this in some runners.
>
> I'm also going to copy-paste why I think it's correct to shut down, from
> earlier in the thread:
>
> "I think it's a fair claim that a PCollection is "done" when its watermark
> reaches positive infinity, and then it's easy to claim that a Pipeline is
> "done" when all of its PCollections are done. Completion is an especially
> reasonable claim if we consider positive infinity to be an actual infinity
> - so long as allowed lateness is a finite value, elements that arrive
> whenever a watermark is at positive infinity will be "infinitely" late, and
> thus can be dropped by the runner."
>
>
> On Wed, May 10, 2017 at 2:26 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Hi,
> > A bit of clarification, the Flink Runner does not terminate a Job when the
> > timeout is reached in waitUntilFinish(Duration). When we reach the timeout
> > we simply return and keep the job running. I thought that was the expected
> > behaviour.
> >
> > Regarding job termination, I think it’s easy to change the Flink Runner to
> > terminate if the watermark reaches +Inf. We would simply set running to
> > false in the UnboundedSourceWrapper when the watermark reaches +Inf: [1]
> >
> > Best,
> > Aljoscha
> >
> > [1] https://github.com/apache/beam/blob/cec71028ff63c7e1b1565c013ae0e378608cb5f9/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L428-L428
> >
> > > On 10. May 2017, at 10:58, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> > >
> > > OK, now I understand: you are talking about waitUntilFinish(), whereas I
> > > was thinking about a simple run().
> > >
> > > IMHO the spark and flink behavior sounds like the most logical one for a
> > > streaming pipeline.
> > >
> > > Regards
> > > JB
> > >
> > > On 05/10/2017 10:20 AM, Etienne Chauchot wrote:
> > >> Hi everyone,
> > >>
> > >> I'm reopening this subject because, IMHO, it is important to unify
> > >> pipeline termination semantics in the model. Here are the differences I
> > >> have observed in streaming pipeline termination:
> > >>
> > >> - direct runner: when the output watermarks of all of its PCollections
> > >> progress to +infinity
> > >>
> > >> - apex runner: when the output watermarks of all of its PCollections
> > >> progress to +infinity
> > >>
> > >> - dataflow runner: when the output watermarks of all of its PCollections
> > >> progress to +infinity
> > >>
> > >> - spark runner: streaming pipelines do not terminate unless timeout is
> > >> set in pipelineResult.waitUntilFinish()
> > >>
> > >> - flink runner: streaming pipelines do not terminate unless timeout is
> > >> set in pipelineResult.waitUntilFinish() (thanks to Aljoscha for timeout
> > >> support PR
> > >> https://github.com/apache/beam/pull/2915#pullrequestreview-37090326)
> > >>
> > >>
> > >> => Is the direct/apex/dataflow behavior the correct "beam model" behavior?
> > >>
> > >>
> > >> I know that, at least for spark (mails in this thread), there is no easy
> > >> way to know that we're done reading a source, so it might be very
> > >> difficult (at least for this runner) to unify toward +infinity behavior
> > >> if it is chosen as the standard behavior.
> > >>
> > >> Best
> > >>
> > >> Etienne
> > >>
> > >> Le 18/04/2017 à 12:05, Etienne Chauchot a écrit :
> > >>> +1 on "runners really terminate in a timely manner to easily
> > programmatically
> > >>> orchestrate Beam pipelines in a portable way, you do need to know
> > whether
> > >>> the pipeline will finish without thinking about the specific runner
> > and its
> > >>> options"
> > >>>
> > >>> As an example, in Nexmark, we have streaming mode tests, and for the
> > >>> benchmark, we need all the queries to behave the same between runners
> > towards
> > >>> termination.
> > >>>
> > >>> For now, to get consistent behavior in this mode, we need to set a
> > >>> timeout (a bit random and flaky) on waitUntilFinish() for spark, but
> > >>> this timeout is not needed for the direct runner.
> > >>>
> > >>> Etienne
> > >>>
> > >>> Le 02/03/2017 à 19:27, Kenneth Knowles a écrit :
> > >>>> Isn't this already the case? I think semantically it is an
> unavoidable
> > >>>> conclusion, so certainly +1 to that.
> > >>>>
> > >>>> The DirectRunner and TestDataflowRunner both have this behavior
> > already.
> > >>>> I've always considered that a streaming job running forever is just
> > [very]
> > >>>> suboptimal shutdown latency :-)
> > >>>>
> > >>>> Some bits of the discussion on the ticket seem to surround whether
> or
> > how
> > >>>> to communicate this property in a generic way. Since a runner owns
> its
> > >>>> PipelineResult it doesn't seem necessary.
> > >>>>
> > >>>> So is the bottom line just that you want to more strongly insist
> that
> > >>>> runners really terminate in a timely manner? I'm +1 to that, too,
> for
> > >>>> basically the reason Stas gives: In order to easily programmatically
> > >>>> orchestrate Beam pipelines in a portable way, you do need to know
> > whether
> > >>>> the pipeline will finish without thinking about the specific runner
> > and its
> > >>>> options (as with our RunnableOnService tests).
> > >>>>
> > >>>> Kenn
> > >>>>
> > >>>> On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin
> > <dhalp...@google.com.invalid>
> > >>>> wrote:
> > >>>>
> > >>>>> Note that even "unbounded pipeline in a streaming runner".waitUntilFinish()
> > >>>>> can return, e.g., if you cancel it or terminate it. It's totally
> > >>>>> reasonable for users to want to understand and handle these cases.
> > >>>>>
> > >>>>> +1
> > >>>>>
> > >>>>> Dan
> > >>>>>
> > >>>>> On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <
> > j...@nanthrax.net>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> +1
> > >>>>>>
> > >>>>>> Good idea !!
> > >>>>>>
> > >>>>>> Regards
> > >>>>>> JB
> > >>>>>>
> > >>>>>>
> > >>>>>> On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
> > >>>>>>
> > >>>>>>> Raising this onto the mailing list from
> > >>>>>>> https://issues.apache.org/jira/browse/BEAM-849
> > >>>>>>>
> > >>>>>>> The issue came up: what does it mean for a pipeline to finish, in
> > >>>>>>> the Beam model?
> > >>>>>>>
> > >>>>>>> Note that I am deliberately not talking about "batch" and "streaming"
> > >>>>>>> pipelines, because this distinction does not exist in the model.
> > >>>>>>> Several runners have batch/streaming *modes*, which implement the same
> > >>>>>>> semantics (potentially different subsets: in batch mode typically a
> > >>>>>>> runner will reject pipelines that have at least one unbounded
> > >>>>>>> PCollection) but in an operationally different way. However we should
> > >>>>>>> define pipeline termination at the level of the unified model, and
> > >>>>>>> then make sure that all runners in all modes implement that properly.
> > >>>>>>>
> > >>>>>>> One natural way is to say "a pipeline terminates when the output
> > >>>>>>> watermarks of all of its PCollections progress to +infinity". (Note:
> > >>>>>>> this can be generalized, I guess, to having partial executions of a
> > >>>>>>> pipeline: if you're interested in the full contents of only some
> > >>>>>>> collections, then you wait until only the watermarks of those
> > >>>>>>> collections progress to infinity.)
> > >>>>>>>
> > >>>>>>> A typical "batch" runner mode does not implement watermarks - we can
> > >>>>>>> think of it as assigning watermark -infinity to an output of a
> > >>>>>>> transform that hasn't started executing yet, and +infinity to output
> > >>>>>>> of a transform that has finished executing. This is consistent with
> > >>>>>>> how such runners implement termination in practice.
> > >>>>>>>
> > >>>>>>> The Dataflow streaming runner additionally implements such
> > >>>>>>> termination for its pipeline drain operation, which has 2 parts:
> > >>>>>>> 1) stop consuming input from the sources, and 2) wait until all
> > >>>>>>> watermarks progress to infinity.
> > >>>>>>>
> > >>>>>>> Let us fill the gap by making this part of the Beam model and
> > >>>>>>> declaring that all runners should implement this behavior. This will
> > >>>>>>> give nice properties, e.g.:
> > >>>>>>> - A pipeline that has only bounded collections can be run by any
> > >>>>>>> runner in any mode, with the same results and termination behavior
> > >>>>>>> (this is actually my motivating example for raising this issue: I was
> > >>>>>>> running Splittable DoFn tests
> > >>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
> > >>>>>>> with the streaming Dataflow runner - these tests produce only bounded
> > >>>>>>> collections - and noticed that they wouldn't terminate even though
> > >>>>>>> all data was processed)
> > >>>>>>> - It will be possible to implement pipelines that stream data for a
> > >>>>>>> while and then eventually successfully terminate based on some
> > >>>>>>> condition. E.g. a pipeline that watches a continuously growing file
> > >>>>>>> until it is marked read-only, or a pipeline that reads a Kafka topic
> > >>>>>>> partition until it receives a "poison pill" message. This seems handy.
> > >>>>>>>
> > >>>>>>>
> > >>>>>> --
> > >>>>>> Jean-Baptiste Onofré
> > >>>>>> jbono...@apache.org
> > >>>>>> http://blog.nanthrax.net
> > >>>>>> Talend - http://www.talend.com
> > >>>>>>
> > >>>
> > >>
> > >
> >
> >
>
