+1 on "runners really terminate in a timely manner": to easily programmatically orchestrate Beam pipelines in a portable way, you do need to know whether the pipeline will finish without thinking about the specific runner and its options.

As an example, Nexmark has streaming-mode tests, and for the benchmark we need all the queries to behave the same way across runners with respect to termination.

For now, to get consistent behavior in this mode, we need to set a timeout (somewhat arbitrary and flaky) on waitUntilFinish() for the Spark runner, whereas the direct runner needs no such timeout.

Etienne

On 02/03/2017 at 19:27, Kenneth Knowles wrote:
Isn't this already the case? I think semantically it is an unavoidable
conclusion, so certainly +1 to that.

The DirectRunner and TestDataflowRunner both have this behavior already.
I've always considered that a streaming job running forever is just [very]
suboptimal shutdown latency :-)

Some bits of the discussion on the ticket seem to surround whether or how
to communicate this property in a generic way. Since a runner owns its
PipelineResult it doesn't seem necessary.

So is the bottom line just that you want to more strongly insist that
runners really terminate in a timely manner? I'm +1 to that, too, for
basically the reason Stas gives: In order to easily programmatically
orchestrate Beam pipelines in a portable way, you do need to know whether
the pipeline will finish without thinking about the specific runner and its
options (as with our RunnableOnService tests).

Kenn

On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin <dhalp...@google.com.invalid>
wrote:

Note that even "unbounded pipeline in a streaming runner".waitUntilFinish()
can return, e.g., if you cancel it or terminate it. It's totally reasonable
for users to want to understand and handle these cases.

+1

Dan

On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

+1

Good idea !!

Regards
JB


On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:

Raising this onto the mailing list from
https://issues.apache.org/jira/browse/BEAM-849

The issue came up: what does it mean for a pipeline to finish, in the
Beam model?

Note that I am deliberately not talking about "batch" and "streaming"
pipelines, because this distinction does not exist in the model. Several
runners have batch/streaming *modes*, which implement the same semantics
(potentially different subsets: in batch mode typically a runner will
reject pipelines that have at least one unbounded PCollection) but in an
operationally different way. However, we should define pipeline
termination at the level of the unified model, and then make sure that
all runners in all modes implement that properly.

One natural way is to say "a pipeline terminates when the output
watermarks of all of its PCollections progress to +infinity". (Note:
this can be generalized, I guess, to having partial executions of a
pipeline: if you're interested in the full contents of only some
collections, then you wait until only the watermarks of those
collections progress to infinity.)
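
To make the definition above concrete, here is a minimal sketch in plain
Java. It is not Beam API: the class name, the method names, and the use
of Instant.MAX as a stand-in for +infinity are all assumptions made for
illustration.

```java
import java.time.Instant;
import java.util.Collection;
import java.util.Map;

class WatermarkTermination {
    // Assumption: Instant.MAX stands in for the "+infinity" watermark.
    static final Instant POSITIVE_INFINITY = Instant.MAX;

    /** True when the output watermark of every collection has reached +infinity. */
    static boolean isTerminated(Map<String, Instant> outputWatermarks) {
        return outputWatermarks.values().stream().allMatch(POSITIVE_INFINITY::equals);
    }

    /** Partial execution: only the named collections must reach +infinity. */
    static boolean isTerminatedFor(Map<String, Instant> outputWatermarks,
                                   Collection<String> collectionsOfInterest) {
        return collectionsOfInterest.stream()
                .allMatch(name -> POSITIVE_INFINITY.equals(outputWatermarks.get(name)));
    }
}
```

The second method corresponds to the generalization in the note: a
caller that only cares about some collections passes just their names.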

A typical "batch" runner mode does not implement watermarks - we can
think of it as assigning watermark -infinity to the output of a
transform that hasn't started executing yet, and +infinity to the output
of a transform that has finished executing. This is consistent with how
such runners implement termination in practice.
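
A rough sketch of that view of batch mode, again in plain Java rather
than Beam API. The enum, the method names, and Instant.MIN / Instant.MAX
as stand-ins for -infinity / +infinity are assumptions; only the two
states described above are modeled.

```java
import java.time.Instant;
import java.util.List;

class BatchWatermarks {
    // Only the two states described in the text are modeled here.
    enum TransformState { NOT_STARTED, FINISHED }

    /** Implied output watermark: -infinity before the transform runs, +infinity after. */
    static Instant outputWatermark(TransformState state) {
        return state == TransformState.FINISHED ? Instant.MAX : Instant.MIN;
    }

    /** A batch pipeline is done once every transform's implied watermark is +infinity. */
    static boolean isTerminated(List<TransformState> transforms) {
        return transforms.stream()
                .allMatch(s -> outputWatermark(s).equals(Instant.MAX));
    }
}
```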

The Dataflow streaming runner additionally implements such termination
for the pipeline drain operation, which has 2 parts: 1) stop consuming
input from the sources, and 2) wait until all watermarks progress to
infinity.
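
The two parts of drain can be pictured with a toy model like the one
below. This is not how Dataflow implements drain; the class, its fields
and its methods are hypothetical, and a single in-memory buffer stands
in for all in-flight data.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class DrainSketch {
    private final Deque<String> buffered = new ArrayDeque<>();
    private boolean acceptingInput = true;
    private boolean watermarkAtInfinity = false;

    /** Accepts an element from the source unless a drain has started. */
    boolean offer(String element) {
        if (!acceptingInput) {
            return false;
        }
        buffered.add(element);
        return true;
    }

    /** Runs both parts of a drain and returns how many buffered elements were processed. */
    int drain() {
        acceptingInput = false;          // part 1: stop consuming input from the sources
        int processed = 0;
        while (!buffered.isEmpty()) {    // part 2: finish everything already in flight
            buffered.poll();
            processed++;
        }
        watermarkAtInfinity = true;      // nothing left, so the watermark can reach +infinity
        return processed;
    }

    boolean isTerminated() {
        return watermarkAtInfinity;
    }
}
```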

Let us fill the gap by making this part of the Beam model and declaring
that all runners should implement this behavior. This will give nice
properties, e.g.:
- A pipeline that has only bounded collections can be run by any runner
in any mode, with the same results and termination behavior (this is
actually my motivating example for raising this issue: I was running
Splittable DoFn tests
<https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
with the streaming Dataflow runner - these tests produce only bounded
collections - and noticed that they wouldn't terminate even though all
data was processed).
- It will be possible to implement pipelines that stream data for a
while and then eventually successfully terminate based on some
condition. E.g. a pipeline that watches a continuously growing file
until it is marked read-only, or a pipeline that reads a Kafka topic
partition until it receives a "poison pill" message. This seems handy.
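
As an illustration of the "poison pill" case in the second bullet, here
is a small sketch in plain Java. It uses an Iterator in place of a real
Kafka consumer, and the marker value and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class PoisonPillReader {
    // Hypothetical marker value; a real pipeline would define its own convention.
    static final String POISON_PILL = "__POISON_PILL__";

    /**
     * Reads messages until the poison pill is seen and returns what was read before it.
     * In this toy version an exhausted iterator also ends the read; a real unbounded
     * source would keep waiting for more data instead.
     */
    static List<String> readUntilPoisonPill(Iterator<String> partition) {
        List<String> output = new ArrayList<>();
        while (partition.hasNext()) {
            String message = partition.next();
            if (POISON_PILL.equals(message)) {
                break; // the source is finished, so its watermark can advance to +infinity
            }
            output.add(message);
        }
        return output;
    }
}
```

Once the reader stops, nothing more will ever be emitted, which is the
condition under which the pipeline as a whole could terminate.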


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

