I think this is generally less of a big deal than suggested, for a pretty simple reason:
All bounded pipelines are expected to terminate when they are complete. Almost all unbounded pipelines are expected to run until explicitly shut down. As a result, shutting down an unbounded pipeline because its watermark reaches positive infinity will, in a production environment, happen extremely rarely in practice. I'm comfortable saying that the model says such a pipeline should terminate, and with only some runners supporting this.

I'm also going to copy-paste why I think it's correct to shut down, from earlier in the thread: "I think it's a fair claim that a PCollection is "done" when its watermark reaches positive infinity, and then it's easy to claim that a Pipeline is "done" when all of its PCollections are done. Completion is an especially reasonable claim if we consider positive infinity to be an actual infinity - so long as allowed lateness is a finite value, elements that arrive while a watermark is at positive infinity will be "infinitely" late, and thus can be dropped by the runner."

On Wed, May 10, 2017 at 2:26 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> A bit of clarification: the Flink Runner does not terminate a job when the timeout is reached in waitUntilFinish(Duration). When we reach the timeout we simply return and keep the job running. I thought that was the expected behaviour.
>
> Regarding job termination, I think it's easy to change the Flink Runner to terminate if the watermark reaches +Inf. We would simply set running to false in the UnboundedSourceWrapper when the watermark reaches +Inf: [1]
>
> Best,
> Aljoscha
>
> [1] https://github.com/apache/beam/blob/cec71028ff63c7e1b1565c013ae0e378608cb5f9/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L428-L428
>
> > On 10. May 2017, at 10:58, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> >
> > OK, now I understand: you are talking about waitUntilFinish(), whereas I was thinking about a simple run().
> >
> > IMHO the Spark and Flink behavior sounds like the most logical one for a streaming pipeline.
> >
> > Regards
> > JB
> >
> > On 05/10/2017 10:20 AM, Etienne Chauchot wrote:
> >> Hi everyone,
> >>
> >> I'm reopening this subject because, IMHO, it is important to unify pipeline termination semantics in the model. Here are the differences I have observed in streaming pipeline termination:
> >>
> >> - direct runner: terminates when the output watermarks of all of its PCollections progress to +infinity
> >>
> >> - apex runner: terminates when the output watermarks of all of its PCollections progress to +infinity
> >>
> >> - dataflow runner: terminates when the output watermarks of all of its PCollections progress to +infinity
> >>
> >> - spark runner: streaming pipelines do not terminate unless a timeout is set in pipelineResult.waitUntilFinish()
> >>
> >> - flink runner: streaming pipelines do not terminate unless a timeout is set in pipelineResult.waitUntilFinish() (thanks to Aljoscha for the timeout support PR https://github.com/apache/beam/pull/2915#pullrequestreview-37090326)
> >>
> >> => Is the direct/apex/dataflow behavior the correct "beam model" behavior?
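(For concreteness, this divergence shows up in orchestration code along the lines of the minimal sketch below. This is not code from the thread: the options handling and the ten-minute timeout are just placeholders, chosen to illustrate the portability problem.)

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.joda.time.Duration;

    public class TerminationSketch {
      public static void main(String[] args) throws Exception {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);
        // ... build a pipeline whose sources eventually advance their watermarks to +infinity ...

        PipelineResult result = p.run();

        // On the direct/apex/dataflow runners this returns once all output watermarks
        // reach +infinity; on the spark/flink runners (as described above) it only
        // returns when the timeout elapses, even though the pipeline is logically "done".
        PipelineResult.State state = result.waitUntilFinish(Duration.standardMinutes(10));

        // waitUntilFinish(Duration) returns null when the timeout elapses before the
        // pipeline reaches a terminal state, so portable orchestration code has to
        // special-case that instead of simply relying on termination.
        if (state == null) {
          result.cancel();
        }
      }
    }
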
> >>
> >> I know that, at least for Spark (see the mails earlier in this thread), there is no easy way to know that we're done reading a source, so it might be very difficult (at least for this runner) to unify toward the +infinity behavior if it is chosen as the standard behavior.
> >>
> >> Best
> >>
> >> Etienne
> >>
> >> Le 18/04/2017 à 12:05, Etienne Chauchot a écrit :
> >>> +1 on "runners really terminate in a timely manner to easily programmatically orchestrate Beam pipelines in a portable way, you do need to know whether the pipeline will finish without thinking about the specific runner and its options"
> >>>
> >>> As an example, in Nexmark we have streaming mode tests, and for the benchmark we need all the queries to behave the same way across runners with respect to termination.
> >>>
> >>> For now, to get consistent behavior in this mode, we need to set a timeout (a bit arbitrary and flaky) on waitUntilFinish() for Spark, but this timeout is not needed for the direct runner.
> >>>
> >>> Etienne
> >>>
> >>> Le 02/03/2017 à 19:27, Kenneth Knowles a écrit :
> >>>> Isn't this already the case? I think semantically it is an unavoidable conclusion, so certainly +1 to that.
> >>>>
> >>>> The DirectRunner and TestDataflowRunner both have this behavior already. I've always considered that a streaming job running forever is just [very] suboptimal shutdown latency :-)
> >>>>
> >>>> Some bits of the discussion on the ticket seem to surround whether or how to communicate this property in a generic way. Since a runner owns its PipelineResult, it doesn't seem necessary.
> >>>>
> >>>> So is the bottom line just that you want to more strongly insist that runners really terminate in a timely manner? I'm +1 to that, too, for basically the reason Stas gives: in order to easily programmatically orchestrate Beam pipelines in a portable way, you do need to know whether the pipeline will finish without thinking about the specific runner and its options (as with our RunnableOnService tests).
> >>>>
> >>>> Kenn
> >>>>
> >>>> On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin <dhalp...@google.com.invalid> wrote:
> >>>>
> >>>>> Note that even "unbounded pipeline in a streaming runner".waitUntilFinish() can return, e.g., if you cancel it or terminate it. It's totally reasonable for users to want to understand and handle these cases.
> >>>>>
> >>>>> +1
> >>>>>
> >>>>> Dan
> >>>>>
> >>>>> On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> >>>>>
> >>>>>> +1
> >>>>>>
> >>>>>> Good idea!
> >>>>>>
> >>>>>> Regards
> >>>>>> JB
> >>>>>>
> >>>>>> On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
> >>>>>>
> >>>>>>> Raising this onto the mailing list from https://issues.apache.org/jira/browse/BEAM-849
> >>>>>>>
> >>>>>>> The issue came up: what does it mean for a pipeline to finish, in the Beam model?
> >>>>>>>
> >>>>>>> Note that I am deliberately not talking about "batch" and "streaming" pipelines, because this distinction does not exist in the model. Several runners have batch/streaming *modes*, which implement the same semantics (potentially different subsets: in batch mode a runner will typically reject pipelines that have at least one unbounded PCollection) but in an operationally different way.
> >>>>>>> However, we should define pipeline termination at the level of the unified model, and then make sure that all runners in all modes implement that properly.
> >>>>>>>
> >>>>>>> One natural way is to say "a pipeline terminates when the output watermarks of all of its PCollections progress to +infinity". (Note: this can be generalized, I guess, to partial executions of a pipeline: if you're interested in the full contents of only some collections, then you wait until only the watermarks of those collections progress to infinity.)
> >>>>>>>
> >>>>>>> A typical "batch" runner mode does not implement watermarks - we can think of it as assigning watermark -infinity to the output of a transform that hasn't started executing yet, and +infinity to the output of a transform that has finished executing. This is consistent with how such runners implement termination in practice.
> >>>>>>>
> >>>>>>> The Dataflow streaming runner additionally implements such termination for the pipeline drain operation, which has two parts: 1) stop consuming input from the sources, and 2) wait until all watermarks progress to infinity.
> >>>>>>>
> >>>>>>> Let us fill the gap by making this part of the Beam model and declaring that all runners should implement this behavior. This will give nice properties, e.g.:
> >>>>>>> - A pipeline that has only bounded collections can be run by any runner in any mode, with the same results and termination behavior (this is actually my motivating example for raising this issue: I was running the Splittable DoFn tests <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java> with the streaming Dataflow runner - these tests produce only bounded collections - and noticed that they wouldn't terminate even though all data was processed).
> >>>>>>> - It will be possible to implement pipelines that stream data for a while and then eventually successfully terminate based on some condition, e.g. a pipeline that watches a continuously growing file until it is marked read-only, or a pipeline that reads a Kafka topic partition until it receives a "poison pill" message. This seems handy.
> >>>>>>
> >>>>>> --
> >>>>>> Jean-Baptiste Onofré
> >>>>>> jbono...@apache.org
> >>>>>> http://blog.nanthrax.net
> >>>>>> Talend - http://www.talend.com
>
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
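To make the last example above concrete: in the Beam Java SDK, an unbounded source can signal this kind of self-termination by advancing its watermark to the end of time once the terminating condition is observed. Below is a minimal, hypothetical sketch of just the watermark-related reader methods (the poison-pill detection and the record type are placeholders, not an existing connector), showing how the semantics proposed in the thread would let a runner shut the pipeline down.

    import org.apache.beam.sdk.io.UnboundedSource;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    // Hypothetical sketch: once the "poison pill" record is seen, the reader stops
    // returning records and reports a watermark of TIMESTAMP_MAX_VALUE (+infinity).
    // Under the proposed semantics, a runner would then consider the source's output
    // PCollection "done" and could terminate the pipeline.
    abstract class PoisonPillReader extends UnboundedSource.UnboundedReader<String> {
      private boolean sawPoisonPill = false;
      private Instant lastRecordTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;

      @Override
      public boolean advance() {
        if (sawPoisonPill) {
          return false; // no further records will ever become available
        }
        // ... read the next record from the underlying partition here (details omitted):
        // set sawPoisonPill = true and return false when the pill is encountered,
        // otherwise update lastRecordTimestamp and return true.
        throw new UnsupportedOperationException("record reading omitted from this sketch");
      }

      @Override
      public Instant getWatermark() {
        // Advancing to +infinity declares the source done: with a finite allowed
        // lateness, anything arriving afterwards would be droppably late anyway.
        return sawPoisonPill ? BoundedWindow.TIMESTAMP_MAX_VALUE : lastRecordTimestamp;
      }
    }

The same pattern would cover the "watch a file until it is marked read-only" example: the reader holds the watermark at the timestamp of the last emitted element until the terminating condition holds, then jumps it to +infinity.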