+1 to what Thomas said: at +infinity all data is droppable, so, philosophically, leaving the pipeline up is just burning CPU. Since TestStream is a ways off for most runners, we should make sure we have tests for other sorts of unbounded-but-finite collections to track which runners this works on.
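To make "unbounded-but-finite" concrete, here is a toy model I find useful (nothing below is Beam code; all the names are made up): a source that is polled like an unbounded source, but whose watermark jumps to +infinity once its finite backlog is drained, at which point a watermark-based runner can shut the pipeline down.

```java
// Toy illustration (not Beam code): an "unbounded-but-finite" source is one
// whose API is unbounded (you poll it forever) but whose watermark eventually
// reaches +infinity, modeled here as Long.MAX_VALUE. A runner that treats
// watermark == +infinity as "done" can terminate such a pipeline.
import java.util.ArrayDeque;
import java.util.Queue;

public class UnboundedButFinite {
    static final long WATERMARK_MAX = Long.MAX_VALUE; // stand-in for +infinity

    private final Queue<Long> backlog = new ArrayDeque<>();

    UnboundedButFinite(long... timestamps) {
        for (long t : timestamps) backlog.add(t);
    }

    /** Polls one element; returns null when nothing is available right now. */
    Long poll() {
        return backlog.poll();
    }

    /** Watermark: oldest pending timestamp while data remains, +infinity after. */
    long watermark() {
        Long head = backlog.peek();
        return head == null ? WATERMARK_MAX : head;
    }

    public static void main(String[] args) {
        UnboundedButFinite source = new UnboundedButFinite(10, 20, 30);
        // Drive the "runner" loop until the watermark reaches +infinity.
        while (source.watermark() != WATERMARK_MAX) {
            source.poll();
        }
        System.out.println("terminated"); // the pipeline can now shut down
    }
}
```

A cross-runner test along these lines would pin down exactly which runners actually observe the +infinity watermark and exit.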
+1 to Aljoscha: that behavior of waitUntilFinish(Duration) is correct. It is expected that run() is mostly asynchronous and waitUntilFinish() is blocking. Of course, the amount of work needed to get a pipeline to a detachable state in run() varies from runner to runner.

On Wed, May 10, 2017 at 8:49 AM, Thomas Groh <tg...@google.com.invalid> wrote:
> I think that generally this is actually less of a big deal than suggested,
> for a pretty simple reason:
>
> All bounded pipelines are expected to terminate when they are complete.
> Almost all unbounded pipelines are expected to run until explicitly shut
> down.
>
> As a result, shutting down an unbounded pipeline when the watermark
> reaches positive infinity (in a production environment) will occur in
> practice extremely rarely. I'm comfortable saying that the model says that
> such a pipeline should terminate, and only supporting this in some
> runners.
>
> I'm also going to copy-paste why I think it's correct to shut down, from
> earlier in the thread:
>
> "I think it's a fair claim that a PCollection is "done" when its watermark
> reaches positive infinity, and then it's easy to claim that a Pipeline is
> "done" when all of its PCollections are done. Completion is an especially
> reasonable claim if we consider positive infinity to be an actual infinity
> - so long as allowed lateness is a finite value, elements that arrive
> whenever a watermark is at positive infinity will be "infinitely" late,
> and thus can be dropped by the runner."
>
> On Wed, May 10, 2017 at 2:26 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Hi,
> > A bit of clarification: the Flink runner does not terminate a job when
> > the timeout is reached in waitUntilFinish(Duration). When we reach the
> > timeout we simply return and keep the job running. I thought that was
> > the expected behaviour.
> >
> > Regarding job termination, I think it's easy to change the Flink runner
> > to terminate if the watermark reaches +Inf.
> > We would simply set running to false in the UnboundedSourceWrapper when
> > the watermark reaches +Inf: [1]
> >
> > Best,
> > Aljoscha
> >
> > [1] https://github.com/apache/beam/blob/cec71028ff63c7e1b1565c013ae0e378608cb5f9/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L428-L428
> >
> > > On 10. May 2017, at 10:58, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> > >
> > > OK, now I understand: you are talking about waitUntilFinish(), whereas
> > > I was thinking about a simple run().
> > >
> > > IMHO Spark and Flink sound like the most logical behavior for a
> > > streaming pipeline.
> > >
> > > Regards
> > > JB
> > >
> > > On 05/10/2017 10:20 AM, Etienne Chauchot wrote:
> > >> Hi everyone,
> > >>
> > >> I'm reopening this subject because, IMHO, it is important to unify
> > >> pipeline termination semantics in the model.
> > >> Here are the differences I have observed in streaming pipeline
> > >> termination:
> > >>
> > >> - direct runner: terminates when the output watermarks of all of its
> > >> PCollections progress to +infinity
> > >>
> > >> - apex runner: terminates when the output watermarks of all of its
> > >> PCollections progress to +infinity
> > >>
> > >> - dataflow runner: terminates when the output watermarks of all of
> > >> its PCollections progress to +infinity
> > >>
> > >> - spark runner: streaming pipelines do not terminate unless a timeout
> > >> is set in pipelineResult.waitUntilFinish()
> > >>
> > >> - flink runner: streaming pipelines do not terminate unless a timeout
> > >> is set in pipelineResult.waitUntilFinish() (thanks to Aljoscha for
> > >> the timeout support PR
> > >> https://github.com/apache/beam/pull/2915#pullrequestreview-37090326)
> > >>
> > >> => Is the direct/apex/dataflow behavior the correct "Beam model"
> > >> behavior?
> > >>
> > >> I know that, at least for Spark (see the mails in this thread), there
> > >> is no easy way to know that we're done reading a source, so it might
> > >> be very difficult (at least for this runner) to unify toward the
> > >> +infinity behavior if it is chosen as the standard behavior.
> > >>
> > >> Best
> > >>
> > >> Etienne
> > >>
> > >> On 18/04/2017 at 12:05, Etienne Chauchot wrote:
> > >>> +1 on "runners really terminate in a timely manner to easily
> > >>> programmatically orchestrate Beam pipelines in a portable way, you
> > >>> do need to know whether the pipeline will finish without thinking
> > >>> about the specific runner and its options"
> > >>>
> > >>> As an example, in Nexmark, we have streaming mode tests, and for the
> > >>> benchmark we need all the queries to behave the same between runners
> > >>> towards termination.
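Interjecting with a sketch of what that list means for orchestration code today (the types below are made-up stand-ins, not Beam's actual PipelineResult API): on runners whose streaming pipelines never self-terminate, portable code ends up waiting with a bounded timeout and then cancelling explicitly.

```java
// Illustration only: a stand-in for runner result handling, showing the
// timeout-then-cancel workaround needed when a runner's streaming pipelines
// never self-terminate. None of this is Beam's actual API; the names just
// loosely mirror it.
public class TerminationWorkaround {
    enum State { RUNNING, DONE, CANCELLED }

    interface Result {
        /** Blocks up to timeoutMillis; returns the state reached. */
        State waitUntilFinish(long timeoutMillis);
        State cancel();
    }

    /** Portable shutdown: wait a bounded time, then force-cancel if needed. */
    static State finishOrCancel(Result result, long timeoutMillis) {
        State state = result.waitUntilFinish(timeoutMillis);
        if (state == State.RUNNING) {
            // The runner kept the job running past the timeout (the
            // Spark/Flink-style behavior above): cancel explicitly.
            state = result.cancel();
        }
        return state;
    }

    public static void main(String[] args) {
        // A fake streaming job that never finishes on its own.
        Result neverEnding = new Result() {
            public State waitUntilFinish(long timeoutMillis) { return State.RUNNING; }
            public State cancel() { return State.CANCELLED; }
        };
        System.out.println(finishOrCancel(neverEnding, 1000)); // CANCELLED
    }
}
```

If all runners adopted the +infinity convention, the cancel branch (and the arbitrary timeout it needs) would disappear from portable orchestration code.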
> > >>>
> > >>> For now, to have consistent behavior, in this mode we need to set a
> > >>> timeout (a bit arbitrary and flaky) on waitUntilFinish() for Spark,
> > >>> but this timeout is not needed for the direct runner.
> > >>>
> > >>> Etienne
> > >>>
> > >>> On 02/03/2017 at 19:27, Kenneth Knowles wrote:
> > >>>> Isn't this already the case? I think semantically it is an
> > >>>> unavoidable conclusion, so certainly +1 to that.
> > >>>>
> > >>>> The DirectRunner and TestDataflowRunner both have this behavior
> > >>>> already. I've always considered that a streaming job running
> > >>>> forever is just [very] suboptimal shutdown latency :-)
> > >>>>
> > >>>> Some bits of the discussion on the ticket seem to surround whether
> > >>>> or how to communicate this property in a generic way. Since a
> > >>>> runner owns its PipelineResult, it doesn't seem necessary.
> > >>>>
> > >>>> So is the bottom line just that you want to more strongly insist
> > >>>> that runners really terminate in a timely manner? I'm +1 to that,
> > >>>> too, for basically the reason Stas gives: in order to easily
> > >>>> programmatically orchestrate Beam pipelines in a portable way, you
> > >>>> do need to know whether the pipeline will finish without thinking
> > >>>> about the specific runner and its options (as with our
> > >>>> RunnableOnService tests).
> > >>>>
> > >>>> Kenn
> > >>>>
> > >>>> On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin
> > >>>> <dhalp...@google.com.invalid> wrote:
> > >>>>
> > >>>>> Note that even "unbounded pipeline in a streaming
> > >>>>> runner".waitUntilFinish() can return, e.g., if you cancel it or
> > >>>>> terminate it. It's totally reasonable for users to want to
> > >>>>> understand and handle these cases.
> > >>>>>
> > >>>>> +1
> > >>>>>
> > >>>>> Dan
> > >>>>>
> > >>>>> On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré
> > >>>>> <j...@nanthrax.net> wrote:
> > >>>>>
> > >>>>>> +1
> > >>>>>>
> > >>>>>> Good idea !!
> > >>>>>>
> > >>>>>> Regards
> > >>>>>> JB
> > >>>>>>
> > >>>>>> On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
> > >>>>>>
> > >>>>>>> Raising this onto the mailing list from
> > >>>>>>> https://issues.apache.org/jira/browse/BEAM-849
> > >>>>>>>
> > >>>>>>> The issue came up: what does it mean for a pipeline to finish,
> > >>>>>>> in the Beam model?
> > >>>>>>>
> > >>>>>>> Note that I am deliberately not talking about "batch" and
> > >>>>>>> "streaming" pipelines, because this distinction does not exist
> > >>>>>>> in the model. Several runners have batch/streaming *modes*,
> > >>>>>>> which implement the same semantics (potentially different
> > >>>>>>> subsets: in batch mode typically a runner will reject pipelines
> > >>>>>>> that have at least one unbounded PCollection) but in an
> > >>>>>>> operationally different way. However, we should define pipeline
> > >>>>>>> termination at the level of the unified model, and then make
> > >>>>>>> sure that all runners in all modes implement that properly.
> > >>>>>>>
> > >>>>>>> One natural way is to say "a pipeline terminates when the
> > >>>>>>> output watermarks of all of its PCollections progress to
> > >>>>>>> +infinity". (Note: this can be generalized, I guess, to having
> > >>>>>>> partial executions of a pipeline: if you're interested in the
> > >>>>>>> full contents of only some collections, then you wait until
> > >>>>>>> only the watermarks of those collections progress to infinity.)
> > >>>>>>>
> > >>>>>>> A typical "batch" runner mode does not implement watermarks -
> > >>>>>>> we can think of it as assigning watermark -infinity to the
> > >>>>>>> output of a transform that hasn't started executing yet, and
> > >>>>>>> +infinity to the output of a transform that has finished
> > >>>>>>> executing. This is consistent with how such runners implement
> > >>>>>>> termination in practice.
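A toy rendering of that batch-mode convention (my illustration, not actual runner code): model each transform's output watermark as -infinity before it runs and +infinity after it finishes, and declare the pipeline done when every output watermark is at +infinity.

```java
// Toy model of the batch watermark convention described above (not runner
// code): -infinity for a transform that hasn't started, +infinity once it
// has finished, and "pipeline done" when all outputs are at +infinity.
import java.util.HashMap;
import java.util.Map;

public class BatchWatermarks {
    static final long NEG_INF = Long.MIN_VALUE;
    static final long POS_INF = Long.MAX_VALUE;

    private final Map<String, Long> outputWatermarks = new HashMap<>();

    void addTransform(String name) {
        outputWatermarks.put(name, NEG_INF); // not started yet
    }

    void markFinished(String name) {
        outputWatermarks.put(name, POS_INF); // all output produced
    }

    /** Termination condition: every output watermark at +infinity. */
    boolean isDone() {
        return outputWatermarks.values().stream().allMatch(w -> w == POS_INF);
    }

    public static void main(String[] args) {
        BatchWatermarks p = new BatchWatermarks();
        p.addTransform("Read");
        p.addTransform("Count");
        System.out.println(p.isDone()); // false
        p.markFinished("Read");
        p.markFinished("Count");
        System.out.println(p.isDone()); // true
    }
}
```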
> > >>>>>>>
> > >>>>>>> The Dataflow streaming runner additionally implements such
> > >>>>>>> termination for the pipeline drain operation; it has two parts:
> > >>>>>>> 1) stop consuming input from the sources, and 2) wait until all
> > >>>>>>> watermarks progress to infinity.
> > >>>>>>>
> > >>>>>>> Let us fill the gap by making this part of the Beam model and
> > >>>>>>> declaring that all runners should implement this behavior. This
> > >>>>>>> will give nice properties, e.g.:
> > >>>>>>> - A pipeline that has only bounded collections can be run by
> > >>>>>>> any runner in any mode, with the same results and termination
> > >>>>>>> behavior. (This is actually my motivating example for raising
> > >>>>>>> this issue: I was running Splittable DoFn tests
> > >>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
> > >>>>>>> with the streaming Dataflow runner - these tests produce only
> > >>>>>>> bounded collections - and noticed that they wouldn't terminate
> > >>>>>>> even though all data was processed.)
> > >>>>>>> - It will be possible to implement pipelines that stream data
> > >>>>>>> for a while and then eventually successfully terminate based on
> > >>>>>>> some condition. E.g. a pipeline that watches a continuously
> > >>>>>>> growing file until it is marked read-only, or a pipeline that
> > >>>>>>> reads a Kafka topic partition until it receives a "poison pill"
> > >>>>>>> message. This seems handy.
> > >>>>>>
> > >>>>>> --
> > >>>>>> Jean-Baptiste Onofré
> > >>>>>> jbono...@apache.org
> > >>>>>> http://blog.nanthrax.net
> > >>>>>> Talend - http://www.talend.com
> > >
> > > --
> > > Jean-Baptiste Onofré
> > > jbono...@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
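For what it's worth, the "poison pill" idea at the end of Eugene's mail could be sketched like this (toy code, not KafkaIO; the sentinel name is invented): the reader advances its watermark to +infinity when it sees the pill, which under the proposed model is exactly what lets the runner terminate the pipeline.

```java
import java.util.Arrays;
import java.util.Iterator;

// Toy sketch of the "poison pill" idea (not KafkaIO): an unbounded reader
// that advances its watermark to +infinity once a sentinel message arrives,
// letting a watermark-based runner terminate the pipeline.
public class PoisonPillReader {
    static final String POISON_PILL = "__END__"; // invented sentinel value
    static final long POS_INF = Long.MAX_VALUE;

    private final Iterator<String> topic;
    private long watermark = Long.MIN_VALUE;

    PoisonPillReader(Iterator<String> topic) { this.topic = topic; }

    /** Reads one message, advancing the watermark to +infinity on the pill. */
    String advance() {
        if (!topic.hasNext()) return null;
        String msg = topic.next();
        if (POISON_PILL.equals(msg)) {
            watermark = POS_INF;
            return null; // the pill itself is not data
        }
        return msg;
    }

    long watermark() { return watermark; }

    public static void main(String[] args) {
        PoisonPillReader reader =
            new PoisonPillReader(Arrays.asList("a", "b", "__END__").iterator());
        while (reader.watermark() != POS_INF) {
            reader.advance();
        }
        System.out.println("watermark at +infinity; pipeline may terminate");
    }
}
```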