Hi everyone,
I'm reopening this subject because, IMHO, it is important to unify
pipelines termination semantics in the model. Here are the differences I
have observed in streaming pipelines termination:
- direct runner: when the output watermarks of all of its PCollections
progress to +infinity
- apex runner: when the output watermarks of all of its PCollections
progress to +infinity
- dataflow runner: when the output watermarks of all of its PCollections
progress to +infinity
- spark runner: streaming pipelines do not terminate unless timeout is
set in pipelineResult.waitUntilFinish()
- flink runner: streaming pipelines do not terminate unless timeout is
set in pipelineResult.waitUntilFinish() (thanks to Aljoscha for timeout
support PR
https://github.com/apache/beam/pull/2915#pullrequestreview-37090326)
=> Is the direct/apex/dataflow behavior the correct "beam model" behavior?
I know that, at least for spark (mails in this thread), there is no easy
way to know that we're done reading a source, so it might be very
difficult (at least for this runner) to unify toward +infinity behavior
if it is chosen as the standard behavior.
Best
Etienne
Le 18/04/2017 à 12:05, Etienne Chauchot a écrit :
+1 on "runners really terminate in a timely manner to easily
programmatically orchestrate Beam pipelines in a portable way, you do
need to know whether
the pipeline will finish without thinking about the specific runner
and its options"
As an example, in Nexmark, we have streaming mode tests, and for the
benchmark, we need all the queries to behave the same between runners
towards termination.
For now, to have the consistent behavior, in this mode we need to set
a timeout (a bit random and flaky) on waitUntilFinish() for spark but
this timeout is not needed for direct runner.
Etienne
Le 02/03/2017 à 19:27, Kenneth Knowles a écrit :
Isn't this already the case? I think semantically it is an unavoidable
conclusion, so certainly +1 to that.
The DirectRunner and TestDataflowRunner both have this behavior already.
I've always considered that a streaming job running forever is just
[very]
suboptimal shutdown latency :-)
Some bits of the discussion on the ticket seem to surround whether or
how
to communicate this property in a generic way. Since a runner owns its
PipelineResult it doesn't seem necessary.
So is the bottom line just that you want to more strongly insist that
runners really terminate in a timely manner? I'm +1 to that, too, for
basically the reason Stas gives: In order to easily programmatically
orchestrate Beam pipelines in a portable way, you do need to know
whether
the pipeline will finish without thinking about the specific runner
and its
options (as with our RunnableOnService tests).
Kenn
On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin
<dhalp...@google.com.invalid>
wrote:
Note that even "unbounded pipeline in a streaming
runner".waitUntilFinish()
can return, e.g., if you cancel it or terminate it. It's totally
reasonable
for users to want to understand and handle these cases.
+1
Dan
On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:
+1
Good idea !!
Regards
JB
On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
Raising this onto the mailing list from
https://issues.apache.org/jira/browse/BEAM-849
The issue came up: what does it mean for a pipeline to finish, in the
Beam
model?
Note that I am deliberately not talking about "batch" and "streaming"
pipelines, because this distinction does not exist in the model.
Several
runners have batch/streaming *modes*, which implement the same
semantics
(potentially different subsets: in batch mode typically a runner will
reject pipelines that have at least one unbounded PCollection) but
in an
operationally different way. However we should define pipeline
termination
at the level of the unified model, and then make sure that all
runners
in
all modes implement that properly.
One natural way is to say "a pipeline terminates when the output
watermarks
of all of its PCollection's progress to +infinity". (Note: this
can be
generalized, I guess, to having partial executions of a pipeline: if
you're
interested in the full contents of only some collections, then you
wait
until only the watermarks of those collections progress to infinity)
A typical "batch" runner mode does not implement watermarks - we can
think
of it as assigning watermark -infinity to an output of a transform
that
hasn't started executing yet, and +infinity to output of a transform
that
has finished executing. This is consistent with how such runners
implement
termination in practice.
Dataflow streaming runner additionally implements such termination
for
pipeline drain operation: it has 2 parts: 1) stop consuming input
from
the
sources, and 2) wait until all watermarks progress to infinity.
Let us fill the gap by making this part of the Beam model and
declaring
that all runners should implement this behavior. This will give nice
properties, e.g.:
- A pipeline that has only bounded collections can be run by any
runner
in
any mode, with the same results and termination behavior (this is
actually
my motivating example for raising this issue is: I was running
Splittable
DoFn tests
<https://github.com/apache/beam/blob/master/sdks/java/core/
src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
with the streaming Dataflow runner - these tests produce only bounded
collections - and noticed that they wouldn't terminate even though
all
data
was processed)
- It will be possible to implement pipelines that stream data for a
while
and then eventually successfully terminate based on some condition.
E.g. a
pipeline that watches a continuously growing file until it is marked
read-only, or a pipeline that reads a Kafka topic partition until it
receives a "poison pill" message. This seems handy.
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com