Ted, the timeout is needed mostly for testing purposes.
AFAIK there is no easy way to express the fact a source is "done" in a
Spark native streaming application.
Moreover, the Spark streaming "native" flow can either "awaitTermination()"
or "awaitTerminationOrTimeout(...)". If you "awaitTermination" then you're
blocked until the execution is either stopped or has failed, so if you wish
to stop the app sooner, say after a certain period of time,
"awaitTerminationOrTimeout(...)" may be the way to go.

Using the unified approach discussed in this thread, when a source is
"done" (i.e. the watermark is +Infinity) the app (e.g. runner) would
gracefully stop.



On Tue, Apr 18, 2017 at 3:19 PM Ted Yu <yuzhih...@gmail.com> wrote:

> Why is the timeout needed for Spark ?
>
> Thanks
>
> > On Apr 18, 2017, at 3:05 AM, Etienne Chauchot <echauc...@gmail.com>
> wrote:
> >
> > +1 on "runners really terminate in a timely manner to easily
> programmatically orchestrate Beam pipelines in a portable way, you do need
> to know whether
> > the pipeline will finish without thinking about the specific runner and
> its options"
> >
> > As an example, in Nexmark, we have streaming mode tests, and for the
> benchmark, we need all the queries to behave the same between runners
> towards termination.
> >
> > For now, to have the consistent behavior, in this mode we need to set a
> timeout (a bit random and flaky) on waitUntilFinish() for spark but this
> timeout is not needed for direct runner.
> >
> > Etienne
> >
> >> Le 02/03/2017 à 19:27, Kenneth Knowles a écrit :
> >> Isn't this already the case? I think semantically it is an unavoidable
> >> conclusion, so certainly +1 to that.
> >>
> >> The DirectRunner and TestDataflowRunner both have this behavior already.
> >> I've always considered that a streaming job running forever is just
> [very]
> >> suboptimal shutdown latency :-)
> >>
> >> Some bits of the discussion on the ticket seem to surround whether or
> how
> >> to communicate this property in a generic way. Since a runner owns its
> >> PipelineResult it doesn't seem necessary.
> >>
> >> So is the bottom line just that you want to more strongly insist that
> >> runners really terminate in a timely manner? I'm +1 to that, too, for
> >> basically the reason Stas gives: In order to easily programmatically
> >> orchestrate Beam pipelines in a portable way, you do need to know
> whether
> >> the pipeline will finish without thinking about the specific runner and
> its
> >> options (as with our RunnableOnService tests).
> >>
> >> Kenn
> >>
> >> On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin
> <dhalp...@google.com.invalid>
> >> wrote:
> >>
> >>> Note that even "unbounded pipeline in a streaming
> runner".waitUntilFinish()
> >>> can return, e.g., if you cancel it or terminate it. It's totally
> reasonable
> >>> for users to want to understand and handle these cases.
> >>>
> >>> +1
> >>>
> >>> Dan
> >>>
> >>> On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
> >>> wrote:
> >>>
> >>>> +1
> >>>>
> >>>> Good idea !!
> >>>>
> >>>> Regards
> >>>> JB
> >>>>
> >>>>
> >>>>> On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
> >>>>>
> >>>>> Raising this onto the mailing list from
> >>>>> https://issues.apache.org/jira/browse/BEAM-849
> >>>>>
> >>>>> The issue came up: what does it mean for a pipeline to finish, in the
> >>> Beam
> >>>>> model?
> >>>>>
> >>>>> Note that I am deliberately not talking about "batch" and "streaming"
> >>>>> pipelines, because this distinction does not exist in the model.
> Several
> >>>>> runners have batch/streaming *modes*, which implement the same
> semantics
> >>>>> (potentially different subsets: in batch mode typically a runner will
> >>>>> reject pipelines that have at least one unbounded PCollection) but
> in an
> >>>>> operationally different way. However we should define pipeline
> >>> termination
> >>>>> at the level of the unified model, and then make sure that all
> runners
> >>> in
> >>>>> all modes implement that properly.
> >>>>>
> >>>>> One natural way is to say "a pipeline terminates when the output
> >>>>> watermarks
> >>>>> of all of its PCollection's progress to +infinity". (Note: this can
> be
> >>>>> generalized, I guess, to having partial executions of a pipeline: if
> >>>>> you're
> >>>>> interested in the full contents of only some collections, then you
> wait
> >>>>> until only the watermarks of those collections progress to infinity)
> >>>>>
> >>>>> A typical "batch" runner mode does not implement watermarks - we can
> >>> think
> >>>>> of it as assigning watermark -infinity to an output of a transform
> that
> >>>>> hasn't started executing yet, and +infinity to output of a transform
> >>> that
> >>>>> has finished executing. This is consistent with how such runners
> >>> implement
> >>>>> termination in practice.
> >>>>>
> >>>>> Dataflow streaming runner additionally implements such termination
> for
> >>>>> pipeline drain operation: it has 2 parts: 1) stop consuming input
> from
> >>> the
> >>>>> sources, and 2) wait until all watermarks progress to infinity.
> >>>>>
> >>>>> Let us fill the gap by making this part of the Beam model and
> declaring
> >>>>> that all runners should implement this behavior. This will give nice
> >>>>> properties, e.g.:
> >>>>> - A pipeline that has only bounded collections can be run by any
> runner
> >>> in
> >>>>> any mode, with the same results and termination behavior (this is
> >>> actually
> >>>>> my motivating example for raising this issue is: I was running
> >>> Splittable
> >>>>> DoFn tests
> >>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/
> >>>>> src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
> >>>>> with the streaming Dataflow runner - these tests produce only bounded
> >>>>> collections - and noticed that they wouldn't terminate even though
> all
> >>>>> data
> >>>>> was processed)
> >>>>> - It will be possible to implement pipelines that stream data for a
> >>> while
> >>>>> and then eventually successfully terminate based on some condition.
> >>> E.g. a
> >>>>> pipeline that watches a continuously growing file until it is marked
> >>>>> read-only, or a pipeline that reads a Kafka topic partition until it
> >>>>> receives a "poison pill" message. This seems handy.
> >>>> --
> >>>> Jean-Baptiste Onofré
> >>>> jbono...@apache.org
> >>>> http://blog.nanthrax.net
> >>>> Talend - http://www.talend.com
> >
>

Reply via email to