+1 Having a unified termination semantics for all runners is super important.
Stas or Aviem, is it feasible to do this for the Spark runner, or is the
timeout due to a technical limitation of Spark?

Thomas Weise, Aljoscha, anything to say on this? Aljoscha, what is the
current status for the Flink runner? Is there any progress towards BEAM-593?

On Tue, Apr 18, 2017 at 5:05 PM, Stas Levin <[email protected]> wrote:

> Ted, the timeout is needed mostly for testing purposes.
> AFAIK there is no easy way to express the fact that a source is "done" in a
> Spark native streaming application.
> Moreover, the Spark streaming "native" flow can either "awaitTermination()"
> or "awaitTerminationOrTimeout(...)". If you "awaitTermination()" then you're
> blocked until the execution is either stopped or has failed, so if you wish
> to stop the app sooner, say after a certain period of time,
> "awaitTerminationOrTimeout(...)" may be the way to go.
>
> Using the unified approach discussed in this thread, when a source is
> "done" (i.e. the watermark is +Infinity) the app (e.g. runner) would
> gracefully stop.
>
> On Tue, Apr 18, 2017 at 3:19 PM Ted Yu <[email protected]> wrote:
>
>> Why is the timeout needed for Spark?
>>
>> Thanks
>>
>> > On Apr 18, 2017, at 3:05 AM, Etienne Chauchot <[email protected]> wrote:
>> >
>> > +1 on "runners really terminate in a timely manner to easily
>> > programmatically orchestrate Beam pipelines in a portable way, you do
>> > need to know whether the pipeline will finish without thinking about
>> > the specific runner and its options"
>> >
>> > As an example, in Nexmark, we have streaming-mode tests, and for the
>> > benchmark, we need all the queries to behave the same across runners
>> > with respect to termination.
>> >
>> > For now, to get this consistent behavior, in this mode we need to set a
>> > timeout (a bit arbitrary and flaky) on waitUntilFinish() for Spark, but
>> > this timeout is not needed for the direct runner.
>> >
>> > Etienne
>> >
>> >> On 02/03/2017 at 19:27, Kenneth Knowles wrote:
>> >> Isn't this already the case?
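The distinction Stas draws above between Spark's blocking "awaitTermination()" and the bounded "awaitTerminationOrTimeout(...)" can be sketched in miniature. This is a toy model, not the Spark (or Beam) API; `ToyStreamingContext` and its members are purely illustrative names:

```java
// Toy model of the two wait primitives discussed above.
// NOT the Spark API: all names here are illustrative assumptions.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ToyStreamingContext {
    private final CountDownLatch terminated = new CountDownLatch(1);

    /** Called when the execution is stopped or has failed. */
    void signalTermination() {
        terminated.countDown();
    }

    /** Blocks forever until the execution terminates. */
    void awaitTermination() {
        try {
            terminated.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Blocks for at most {@code timeoutMillis}. Returns true if the
     * execution terminated within the timeout, false if it timed out.
     */
    boolean awaitTerminationOrTimeout(long timeoutMillis) {
        try {
            return terminated.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
```

The timeout variant is what forces the "a bit arbitrary and flaky" timeout Etienne mentions: the caller must guess how long the job needs rather than being told when the source is done.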
>> >> I think semantically it is an unavoidable
>> >> conclusion, so certainly +1 to that.
>> >>
>> >> The DirectRunner and TestDataflowRunner both have this behavior already.
>> >> I've always considered that a streaming job running forever is just
>> >> [very] suboptimal shutdown latency :-)
>> >>
>> >> Some bits of the discussion on the ticket seem to surround whether or
>> >> how to communicate this property in a generic way. Since a runner owns
>> >> its PipelineResult, it doesn't seem necessary.
>> >>
>> >> So is the bottom line just that you want to more strongly insist that
>> >> runners really terminate in a timely manner? I'm +1 to that, too, for
>> >> basically the reason Stas gives: in order to easily programmatically
>> >> orchestrate Beam pipelines in a portable way, you do need to know
>> >> whether the pipeline will finish without thinking about the specific
>> >> runner and its options (as with our RunnableOnService tests).
>> >>
>> >> Kenn
>> >>
>> >> On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin <[email protected]> wrote:
>> >>
>> >>> Note that even "unbounded pipeline in a streaming
>> >>> runner".waitUntilFinish() can return, e.g., if you cancel it or
>> >>> terminate it. It's totally reasonable for users to want to understand
>> >>> and handle these cases.
>> >>>
>> >>> +1
>> >>>
>> >>> Dan
>> >>>
>> >>> On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <[email protected]> wrote:
>> >>>
>> >>>> +1
>> >>>>
>> >>>> Good idea !!
>> >>>>
>> >>>> Regards
>> >>>> JB
>> >>>>
>> >>>>> On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
>> >>>>>
>> >>>>> Raising this onto the mailing list from
>> >>>>> https://issues.apache.org/jira/browse/BEAM-849
>> >>>>>
>> >>>>> The issue came up: what does it mean for a pipeline to finish, in
>> >>>>> the Beam model?
>> >>>>>
>> >>>>> Note that I am deliberately not talking about "batch" and
>> >>>>> "streaming" pipelines, because this distinction does not exist in
>> >>>>> the model.
>> >>>>> Several runners have batch/streaming *modes*, which implement the
>> >>>>> same semantics (potentially different subsets: in batch mode a
>> >>>>> runner will typically reject pipelines that have at least one
>> >>>>> unbounded PCollection) but in an operationally different way.
>> >>>>> However, we should define pipeline termination at the level of the
>> >>>>> unified model, and then make sure that all runners in all modes
>> >>>>> implement it properly.
>> >>>>>
>> >>>>> One natural way is to say "a pipeline terminates when the output
>> >>>>> watermarks of all of its PCollections progress to +infinity".
>> >>>>> (Note: this can be generalized, I guess, to partial executions of a
>> >>>>> pipeline: if you're interested in the full contents of only some
>> >>>>> collections, then you wait until only the watermarks of those
>> >>>>> collections progress to infinity.)
>> >>>>>
>> >>>>> A typical "batch" runner mode does not implement watermarks - we
>> >>>>> can think of it as assigning watermark -infinity to the output of a
>> >>>>> transform that hasn't started executing yet, and +infinity to the
>> >>>>> output of a transform that has finished executing. This is
>> >>>>> consistent with how such runners implement termination in practice.
>> >>>>>
>> >>>>> The Dataflow streaming runner additionally implements such
>> >>>>> termination for the pipeline drain operation, which has two parts:
>> >>>>> 1) stop consuming input from the sources, and 2) wait until all
>> >>>>> watermarks progress to infinity.
>> >>>>>
>> >>>>> Let us fill the gap by making this part of the Beam model and
>> >>>>> declaring that all runners should implement this behavior.
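Eugene's definition above can be sketched as a toy model: a pipeline is "done" exactly when every output watermark has advanced to +infinity, and a batch run is the special case where each watermark jumps straight from -infinity to +infinity as its transform completes. This is illustrative only, not Beam's actual watermark API; `ToyPipeline` and its methods are assumed names:

```java
// Toy model of watermark-based termination (not the Beam API; names are
// illustrative). Long.MIN_VALUE stands for -infinity, Long.MAX_VALUE for
// +infinity.
import java.util.HashMap;
import java.util.Map;

class ToyPipeline {
    // Output watermark per PCollection, in event-time millis.
    private final Map<String, Long> watermarks = new HashMap<>();

    void addCollection(String name) {
        watermarks.put(name, Long.MIN_VALUE); // nothing has executed yet
    }

    /** Streaming mode: watermarks advance gradually and never move backwards. */
    void advanceWatermark(String name, long t) {
        watermarks.merge(name, t, Math::max);
    }

    /** Batch mode: a finished transform's output jumps straight to +infinity. */
    void finishCollection(String name) {
        watermarks.put(name, Long.MAX_VALUE);
    }

    /** The pipeline terminates when all output watermarks reach +infinity. */
    boolean isTerminated() {
        return watermarks.values().stream().allMatch(w -> w == Long.MAX_VALUE);
    }
}
```

Under this model the batch and streaming cases share one termination test, which is the unification the thread is arguing for.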
>> >>>>> This will give nice properties, e.g.:
>> >>>>> - A pipeline that has only bounded collections can be run by any
>> >>>>> runner in any mode, with the same results and termination behavior.
>> >>>>> (This is actually my motivating example for raising this issue: I
>> >>>>> was running Splittable DoFn tests
>> >>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
>> >>>>> with the streaming Dataflow runner - these tests produce only
>> >>>>> bounded collections - and noticed that they wouldn't terminate even
>> >>>>> though all data was processed.)
>> >>>>> - It will be possible to implement pipelines that stream data for a
>> >>>>> while and then eventually successfully terminate based on some
>> >>>>> condition. E.g. a pipeline that watches a continuously growing file
>> >>>>> until it is marked read-only, or a pipeline that reads a Kafka
>> >>>>> topic partition until it receives a "poison pill" message. This
>> >>>>> seems handy.
>> >>>>
>> >>>> --
>> >>>> Jean-Baptiste Onofré
>> >>>> [email protected]
>> >>>> http://blog.nanthrax.net
>> >>>> Talend - http://www.talend.com
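Eugene's last example - a source that terminates when it sees a "poison pill" message - can be sketched the same way: on reading the pill, the source advances its own watermark to +infinity, which is the signal a runner needs to let the whole pipeline terminate gracefully. Again a toy model, not the Beam or Kafka API; `ToyPoisonPillSource`, `POISON_PILL`, and `poll()` are all illustrative assumptions:

```java
// Toy sketch of "poison pill" termination (not the Beam or Kafka API;
// all names are illustrative).
import java.util.Iterator;
import java.util.List;

class ToyPoisonPillSource {
    static final String POISON_PILL = "__END__";

    private final Iterator<String> messages;
    private long watermark = Long.MIN_VALUE; // -infinity until data arrives
    private long position = 0;

    ToyPoisonPillSource(List<String> messages) {
        this.messages = messages.iterator();
    }

    /** Reads one message, or null; the poison pill marks the source done. */
    String poll() {
        if (isDone() || !messages.hasNext()) {
            return null;
        }
        String msg = messages.next();
        if (POISON_PILL.equals(msg)) {
            watermark = Long.MAX_VALUE; // "done": watermark jumps to +infinity
            return null;
        }
        watermark = ++position; // pretend each message advances event time
        return msg;
    }

    /** A runner may terminate the pipeline once every source reports done. */
    boolean isDone() {
        return watermark == Long.MAX_VALUE;
    }
}
```

Note that messages after the pill are deliberately never read: once the watermark is +infinity the source can emit nothing more without violating watermark semantics.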
