+1. To help integrate this, we can start by adding `ValidatesRunner` tests under a new category, run them only on runners that already adhere to the rules mentioned, and eventually enable them on all runners.
On Fri, Mar 3, 2017 at 12:46 AM Amit Sela <amitsel...@gmail.com> wrote:

> +1 on Eugene's words - this shows how batch is conceptually a subset of a
> streaming problem.
> I also believe that Stas has a very good point on education - we have to
> try and understand developers' current perspective and try to make the
> transition to the Beam model as natural as possible for new users.
> In addition to good documentation and examples, I think that
> https://issues.apache.org/jira/browse/BEAM-849 is critical, as this is the
> user's end-point to the behaviours discussed here, and so it should be:
> * clear and concise - pipeline state at any point should be informative.
> * well documented - documentation, examples, and use-cases (e.g., Eugene's
> "poison pill").
> * strict API for runners - joining Stas' note on unified implementation for
> portability.
>
> On Thu, Mar 2, 2017 at 8:49 PM Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
>
> > OK, I'm glad everybody is in agreement on this. I raised this point because
> > we've been discussing implementing this behavior in the Dataflow streaming
> > runner, and I wanted to make sure that people are okay with it from a
> > conceptual point of view before proceeding.
> >
> > On Thu, Mar 2, 2017 at 10:27 AM Kenneth Knowles <k...@google.com.invalid>
> > wrote:
> >
> > Isn't this already the case? I think semantically it is an unavoidable
> > conclusion, so certainly +1 to that.
> >
> > The DirectRunner and TestDataflowRunner both have this behavior already.
> > I've always considered that a streaming job running forever is just [very]
> > suboptimal shutdown latency :-)
> >
> > Some bits of the discussion on the ticket seem to surround whether or how
> > to communicate this property in a generic way. Since a runner owns its
> > PipelineResult it doesn't seem necessary.
> >
> > So is the bottom line just that you want to more strongly insist that
> > runners really terminate in a timely manner?
> > I'm +1 to that, too, for basically the reason Stas gives: In order to
> > easily programmatically orchestrate Beam pipelines in a portable way, you
> > do need to know whether the pipeline will finish without thinking about
> > the specific runner and its options (as with our RunnableOnService tests).
> >
> > Kenn
> >
> > On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin <dhalp...@google.com.invalid>
> > wrote:
> >
> > > Note that even "unbounded pipeline in a streaming runner".waitUntilFinish()
> > > can return, e.g., if you cancel it or terminate it. It's totally reasonable
> > > for users to want to understand and handle these cases.
> > >
> > > +1
> > >
> > > Dan
> > >
> > > On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
> > > wrote:
> > >
> > > > +1
> > > >
> > > > Good idea !!
> > > >
> > > > Regards
> > > > JB
> > > >
> > > >
> > > > On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
> > > >
> > > >> Raising this onto the mailing list from
> > > >> https://issues.apache.org/jira/browse/BEAM-849
> > > >>
> > > >> The issue came up: what does it mean for a pipeline to finish, in the Beam
> > > >> model?
> > > >>
> > > >> Note that I am deliberately not talking about "batch" and "streaming"
> > > >> pipelines, because this distinction does not exist in the model. Several
> > > >> runners have batch/streaming *modes*, which implement the same semantics
> > > >> (potentially different subsets: in batch mode typically a runner will
> > > >> reject pipelines that have at least one unbounded PCollection) but in an
> > > >> operationally different way. However we should define pipeline termination
> > > >> at the level of the unified model, and then make sure that all runners in
> > > >> all modes implement that properly.
> > > >>
> > > >> One natural way is to say "a pipeline terminates when the output watermarks
> > > >> of all of its PCollections progress to +infinity".
> > > >> (Note: this can be generalized, I guess, to having partial executions of a
> > > >> pipeline: if you're interested in the full contents of only some
> > > >> collections, then you wait until only the watermarks of those collections
> > > >> progress to infinity)
> > > >>
> > > >> A typical "batch" runner mode does not implement watermarks - we can think
> > > >> of it as assigning watermark -infinity to an output of a transform that
> > > >> hasn't started executing yet, and +infinity to output of a transform that
> > > >> has finished executing. This is consistent with how such runners implement
> > > >> termination in practice.
> > > >>
> > > >> Dataflow streaming runner additionally implements such termination for
> > > >> pipeline drain operation: it has 2 parts: 1) stop consuming input from the
> > > >> sources, and 2) wait until all watermarks progress to infinity.
> > > >>
> > > >> Let us fill the gap by making this part of the Beam model and declaring
> > > >> that all runners should implement this behavior. This will give nice
> > > >> properties, e.g.:
> > > >> - A pipeline that has only bounded collections can be run by any runner in
> > > >> any mode, with the same results and termination behavior (this is actually
> > > >> my motivating example for raising this issue: I was running Splittable
> > > >> DoFn tests
> > > >> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
> > > >> with the streaming Dataflow runner - these tests produce only bounded
> > > >> collections - and noticed that they wouldn't terminate even though all
> > > >> data was processed)
> > > >> - It will be possible to implement pipelines that stream data for a while
> > > >> and then eventually successfully terminate based on some condition. E.g.
> > > >> a pipeline that watches a continuously growing file until it is marked
> > > >> read-only, or a pipeline that reads a Kafka topic partition until it
> > > >> receives a "poison pill" message. This seems handy.
> > > >>
> > > >>
> > > >
> > > > --
> > > > Jean-Baptiste Onofré
> > > > jbono...@apache.org
> > > > http://blog.nanthrax.net
> > > > Talend - http://www.talend.com
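
For reference, here is a minimal sketch of the termination rule proposed in the thread, written in plain Python rather than against the Beam SDK. Everything in it is hypothetical and for illustration only: `Collection`, `propagate`, `pipeline_terminated`, `finish_batch_transform` and `drain` are not Beam APIs. It also assumes a strong simplification, namely that a derived collection's output watermark is just the minimum of its inputs' watermarks (no buffered data, timers or holds).

```python
import math
from dataclasses import dataclass, field

NEG_INF = -math.inf  # output of a transform that has not started executing
POS_INF = math.inf   # output of a transform that has finished executing


@dataclass
class Collection:
    """Hypothetical stand-in for a PCollection: a name, its inputs, a watermark."""
    name: str
    inputs: list = field(default_factory=list)
    watermark: float = NEG_INF


def propagate(collections):
    """Advance derived watermarks; `collections` must be in topological order.

    Simplifying assumption: output watermark = min of the input watermarks.
    """
    for c in collections:
        if c.inputs:
            c.watermark = min(i.watermark for i in c.inputs)


def pipeline_terminated(collections, of_interest=None):
    """True once the watermark of every (selected) collection is +infinity.

    Passing `of_interest` models the "partial execution" generalization:
    only the named collections have to reach +infinity.
    """
    selected = [c for c in collections
                if of_interest is None or c.name in of_interest]
    return all(c.watermark == POS_INF for c in selected)


def finish_batch_transform(collection):
    """Batch-mode view: a finished transform's output jumps to +infinity."""
    collection.watermark = POS_INF


def drain(collections):
    """Drain in two parts, as described in the thread.

    1) Stop consuming input: modelled by moving every source collection
       (one without inputs) to +infinity.
    2) Wait for all watermarks to progress: modelled by one propagation pass.
    """
    for c in collections:
        if not c.inputs:
            c.watermark = POS_INF
    propagate(collections)
```

Under these assumptions a "poison pill" source behaves the same way as a drain: once the source decides it is done and its watermark moves to +infinity, everything downstream follows and `pipeline_terminated` becomes true.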