BEAM-593 is blocked by Flink issues:
- https://issues.apache.org/jira/browse/FLINK-2313: Change Streaming Driver Execution Model
- https://issues.apache.org/jira/browse/FLINK-4272: Create a JobClient for job control and monitoring
where the second is more or less a duplicate of the first. There is a pending
PR that is a bit dated, but this week I'll try and see if I can massage it
into shape for Flink 1.3, which should happen in about 1.5 months.

> On 18. Apr 2017, at 18:17, Ismaël Mejía <ieme...@gmail.com> wrote:
>
> +1 Having unified termination semantics for all runners is super important.
>
> Stas or Aviem, is it feasible to do this for the Spark runner, or is the
> timeout due to a technical limitation of Spark?
>
> Thomas Weise, Aljoscha, anything to say on this?
>
> Aljoscha, what is the current status for the Flink runner? Is there
> any progress towards BEAM-593?
>
>
> On Tue, Apr 18, 2017 at 5:05 PM, Stas Levin <stasle...@apache.org> wrote:
>> Ted, the timeout is needed mostly for testing purposes.
>> AFAIK there is no easy way to express the fact that a source is "done" in a
>> native Spark streaming application.
>> Moreover, the "native" Spark streaming flow can either "awaitTermination()"
>> or "awaitTerminationOrTimeout(...)". If you "awaitTermination", you are
>> blocked until the execution is either stopped or has failed, so if you wish
>> to stop the app sooner, say after a certain period of time,
>> "awaitTerminationOrTimeout(...)" may be the way to go.
>>
>> Using the unified approach discussed in this thread, when a source is
>> "done" (i.e. the watermark is +infinity) the app (e.g. the runner) would
>> gracefully stop.
>>
>>
>> On Tue, Apr 18, 2017 at 3:19 PM Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Why is the timeout needed for Spark?
>>>
>>> Thanks
>>>
>>>> On Apr 18, 2017, at 3:05 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
>>>>
>>>> +1 on "runners really terminate in a timely manner": to easily
>>>> programmatically orchestrate Beam pipelines in a portable way, you do
>>>> need to know whether the pipeline will finish without thinking about
>>>> the specific runner and its options.
>>>>
>>>> As an example, in Nexmark we have streaming-mode tests, and for the
>>>> benchmark we need all the queries to behave the same across runners
>>>> with respect to termination.
>>>>
>>>> For now, to get consistent behavior in this mode, we need to set a
>>>> timeout (a bit random and flaky) on waitUntilFinish() for Spark, but
>>>> this timeout is not needed for the direct runner.
>>>>
>>>> Etienne
>>>>
>>>>> On 02/03/2017 at 19:27, Kenneth Knowles wrote:
>>>>> Isn't this already the case? I think semantically it is an unavoidable
>>>>> conclusion, so certainly +1 to that.
>>>>>
>>>>> The DirectRunner and TestDataflowRunner both have this behavior already.
>>>>> I've always considered that a streaming job running forever is just [very]
>>>>> suboptimal shutdown latency :-)
>>>>>
>>>>> Some bits of the discussion on the ticket seem to surround whether or how
>>>>> to communicate this property in a generic way. Since a runner owns its
>>>>> PipelineResult, that doesn't seem necessary.
>>>>>
>>>>> So is the bottom line just that you want to more strongly insist that
>>>>> runners really terminate in a timely manner? I'm +1 to that, too, for
>>>>> basically the reason Stas gives: in order to easily programmatically
>>>>> orchestrate Beam pipelines in a portable way, you do need to know whether
>>>>> the pipeline will finish without thinking about the specific runner and
>>>>> its options (as with our RunnableOnService tests).
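The wait semantics discussed above (Spark's awaitTermination() vs awaitTerminationOrTimeout(...), and Beam's waitUntilFinish()) come down to two styles: block until a terminal state, or give up after a timeout. A minimal plain-Java sketch of the two styles, where JobHandle and the latch are illustrative stand-ins, not actual Beam or Spark classes:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Stand-in for a running job; the latch trips when the job reaches a
// terminal state (e.g. all output watermarks have advanced to +infinity).
class JobHandle {
    private final CountDownLatch done = new CountDownLatch(1);

    void markDone() {
        done.countDown();
    }

    // Blocks until the job terminates - potentially forever for an
    // unbounded job whose watermarks never reach +infinity.
    void awaitTermination() throws InterruptedException {
        done.await();
    }

    // Returns true if the job terminated within the timeout, false otherwise.
    boolean awaitTerminationOrTimeout(long timeoutMillis) throws InterruptedException {
        return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}

public class WaitDemo {
    public static void main(String[] args) throws InterruptedException {
        JobHandle job = new JobHandle();

        // Job not finished yet: the timed wait gives up and returns false.
        System.out.println(job.awaitTerminationOrTimeout(50)); // false

        // Once the job is "done", both wait styles return immediately.
        job.markDone();
        System.out.println(job.awaitTerminationOrTimeout(50)); // true
    }
}
```

The Spark-runner workaround described in the thread is the second method with an arbitrary timeoutMillis, which is exactly why it is flaky: the timeout has no relationship to when the pipeline actually finishes.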
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin <dhalp...@google.com.invalid> wrote:
>>>>>
>>>>>> Note that even "unbounded pipeline in a streaming runner".waitUntilFinish()
>>>>>> can return, e.g., if you cancel it or terminate it. It's totally reasonable
>>>>>> for users to want to understand and handle these cases.
>>>>>>
>>>>>> +1
>>>>>>
>>>>>> Dan
>>>>>>
>>>>>> On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>
>>>>>>> +1
>>>>>>>
>>>>>>> Good idea!
>>>>>>>
>>>>>>> Regards
>>>>>>> JB
>>>>>>>
>>>>>>>
>>>>>>>> On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
>>>>>>>>
>>>>>>>> Raising this onto the mailing list from
>>>>>>>> https://issues.apache.org/jira/browse/BEAM-849
>>>>>>>>
>>>>>>>> The issue came up: what does it mean for a pipeline to finish, in the
>>>>>>>> Beam model?
>>>>>>>>
>>>>>>>> Note that I am deliberately not talking about "batch" and "streaming"
>>>>>>>> pipelines, because this distinction does not exist in the model. Several
>>>>>>>> runners have batch/streaming *modes*, which implement the same semantics
>>>>>>>> (potentially on different subsets: in batch mode a runner will typically
>>>>>>>> reject pipelines that have at least one unbounded PCollection) but in an
>>>>>>>> operationally different way. However, we should define pipeline
>>>>>>>> termination at the level of the unified model, and then make sure that
>>>>>>>> all runners in all modes implement it properly.
>>>>>>>>
>>>>>>>> One natural way is to say "a pipeline terminates when the output
>>>>>>>> watermarks of all of its PCollections progress to +infinity".
>>>>>>>> (Note: this can be
>>>>>>>> generalized, I guess, to partial executions of a pipeline: if you're
>>>>>>>> interested in the full contents of only some collections, then you wait
>>>>>>>> until only the watermarks of those collections progress to +infinity.)
>>>>>>>>
>>>>>>>> A typical "batch" runner mode does not implement watermarks - we can
>>>>>>>> think of it as assigning watermark -infinity to the output of a transform
>>>>>>>> that hasn't started executing yet, and +infinity to the output of a
>>>>>>>> transform that has finished executing. This is consistent with how such
>>>>>>>> runners implement termination in practice.
>>>>>>>>
>>>>>>>> The Dataflow streaming runner additionally implements such termination
>>>>>>>> for the pipeline drain operation, which has 2 parts: 1) stop consuming
>>>>>>>> input from the sources, and 2) wait until all watermarks progress to
>>>>>>>> +infinity.
>>>>>>>>
>>>>>>>> Let us fill the gap by making this part of the Beam model and declaring
>>>>>>>> that all runners should implement this behavior. This will give nice
>>>>>>>> properties, e.g.:
>>>>>>>> - A pipeline that has only bounded collections can be run by any runner
>>>>>>>> in any mode, with the same results and termination behavior. (This is
>>>>>>>> actually my motivating example for raising this issue: I was running
>>>>>>>> Splittable DoFn tests
>>>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
>>>>>>>> with the streaming Dataflow runner - these tests produce only bounded
>>>>>>>> collections - and noticed that they wouldn't terminate even though all
>>>>>>>> data was processed.)
>>>>>>>> - It will be possible to implement pipelines that stream data for a
>>>>>>>> while and then eventually successfully terminate based on some
>>>>>>>> condition, e.g.
>>>>>>>> a pipeline that watches a continuously growing file until it is marked
>>>>>>>> read-only, or a pipeline that reads a Kafka topic partition until it
>>>>>>>> receives a "poison pill" message. This seems handy.
>>>>>>>
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> jbono...@apache.org
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
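The termination rule proposed in this thread - a pipeline is done when the output watermark of every collection has progressed to +infinity - can be sketched in a few lines of plain Java. WatermarkTracker and the Long.MIN_VALUE/MAX_VALUE sentinels are illustrative stand-ins, not Beam SDK classes (Beam uses bounded TIMESTAMP_MIN_VALUE/TIMESTAMP_MAX_VALUE constants and runner-internal watermark tracking):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed termination rule: the pipeline has terminated
// once the output watermark of every collection is +infinity.
// Long.MIN_VALUE / Long.MAX_VALUE stand in for -infinity / +infinity.
public class WatermarkTracker {
    private final Map<String, Long> watermarks = new HashMap<>();

    // "Batch" semantics: a transform that hasn't started executing yet
    // holds its output watermark at -infinity.
    public void register(String collection) {
        watermarks.put(collection, Long.MIN_VALUE);
    }

    public void advance(String collection, long watermark) {
        watermarks.put(collection, watermark);
    }

    // E.g. a Kafka source that sees its "poison pill" message would
    // advance its output watermark to +infinity.
    public void close(String collection) {
        watermarks.put(collection, Long.MAX_VALUE);
    }

    // The drain condition described above: done iff all watermarks
    // have reached +infinity.
    public boolean isTerminated() {
        return watermarks.values().stream().allMatch(w -> w == Long.MAX_VALUE);
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker();
        tracker.register("kafka-partition-0");
        tracker.advance("kafka-partition-0", 1_000L); // still streaming
        System.out.println(tracker.isTerminated());   // false
        tracker.close("kafka-partition-0");           // poison pill seen
        System.out.println(tracker.isTerminated());   // true
    }
}
```

Under this rule the batch/streaming distinction disappears: a "batch" run is just one where every watermark jumps straight from -infinity to +infinity as each transform finishes.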