@Thomas, @Aljoscha, @Kenneth, @JB, I was indeed talking about whether or
not waitUntilFinish(duration) is blocking on an unbounded-but-finite
collection.
Thanks for your comments, they make a lot of sense.
Etienne
On 10/05/2017 at 19:15, Kenneth Knowles wrote:
+1 to what Thomas said: At +infinity all data is droppable so,
philosophically, leaving the pipeline up is just burning CPU. Since
TestStream is a ways off for most runners, we should make sure we have
tests for other sorts of unbounded-but-finite collections to track which
runners this works on.
+1 to Aljoscha: That behavior of waitUntilFinish(Duration) is correct. It is expected that run() is mostly asynchronous and waitUntilFinish() is blocking. Of course, the amount of work needed to get a pipeline to a detachable state in run() varies from runner to runner.
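The run()/waitUntilFinish() contract described here can be sketched with a toy model (the class and method names below are illustrative stand-ins, not Beam's actual PipelineResult API): run() submits the job and returns a handle immediately; waitUntilFinish() blocks until a terminal state; waitUntilFinish(timeout) blocks for at most the timeout and returns null while the job itself keeps running, which matches the Flink behavior discussed below.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model of the run()/waitUntilFinish() contract. Illustrative only,
// not Beam's real PipelineResult.
class ToyPipelineResult {
    private final CompletableFuture<String> terminalState;

    ToyPipelineResult(CompletableFuture<String> terminalState) {
        this.terminalState = terminalState;
    }

    // Blocks until the job reaches a terminal state.
    String waitUntilFinish() {
        return terminalState.join();
    }

    // Blocks for at most timeoutMillis; returns null on timeout while the
    // job itself keeps running (as the Flink runner does).
    String waitUntilFinish(long timeoutMillis) {
        try {
            return terminalState.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

public class ToyRunner {
    // run() is mostly asynchronous: it starts the job and returns a handle.
    static ToyPipelineResult run(long jobDurationMillis) {
        CompletableFuture<String> state = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(jobDurationMillis);
            } catch (InterruptedException ignored) {
            }
            state.complete("DONE");
        }).start();
        return new ToyPipelineResult(state);
    }

    public static void main(String[] args) {
        ToyPipelineResult result = run(500);
        // Short timeout: returns null, the job keeps running.
        System.out.println(result.waitUntilFinish(10));
        // Unbounded wait: blocks until the job finishes.
        System.out.println(result.waitUntilFinish()); // DONE
    }
}
```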
On Wed, May 10, 2017 at 8:49 AM, Thomas Groh <[email protected]>
wrote:
I think that generally this is actually less of a big deal than suggested,
for a pretty simple reason:
All bounded pipelines are expected to terminate when they are complete.
Almost all unbounded pipelines are expected to run until explicitly shut
down.
As a result, shutting down an unbounded pipeline when the watermark reaches
positive infinity (in a production environment) will occur in practice
extremely rarely. I'm comfortable saying that the model says that such a
pipeline should terminate, and only support this in some runners.
I'm also going to copy-paste why I think it's correct to shut down, from
earlier in the thread:
"I think it's a fair claim that a PCollection is "done" when its watermark reaches positive infinity, and then it's easy to claim that a Pipeline is
"done" when all of its PCollections are done. Completion is an especially
reasonable claim if we consider positive infinity to be an actual infinity
- so long as allowed lateness is a finite value, elements that arrive
whenever a watermark is at positive infinity will be "infinitely" late, and
thus can be dropped by the runner."
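This argument can be sketched as a tiny self-contained model (illustrative names only, not Beam API): with the watermark modeled as a long and +infinity as Long.MAX_VALUE, any finite allowed lateness makes every element arriving at +infinity droppable, so a pipeline whose output watermarks have all reached +infinity can safely be declared done.

```java
// Toy model of "done at +infinity": once the watermark is at +infinity
// (modeled as Long.MAX_VALUE), any finite allowed lateness makes every
// newly arriving element droppable. Names are illustrative, not Beam API.
public class WatermarkModel {
    static final long POSITIVE_INFINITY = Long.MAX_VALUE;

    // A PCollection is "done" once its output watermark reaches +infinity.
    static boolean isDone(long outputWatermark) {
        return outputWatermark == POSITIVE_INFINITY;
    }

    // A pipeline is "done" when all of its PCollections are done.
    static boolean pipelineDone(long[] outputWatermarks) {
        for (long w : outputWatermarks) {
            if (!isDone(w)) {
                return false;
            }
        }
        return true;
    }

    // An element is droppable if it is behind the watermark by more than
    // the (finite) allowed lateness. At +infinity, everything is.
    static boolean droppable(long elementTimestamp, long watermark, long allowedLateness) {
        return watermark == POSITIVE_INFINITY
            || elementTimestamp < watermark - allowedLateness;
    }

    public static void main(String[] args) {
        long[] watermarks = {POSITIVE_INFINITY, POSITIVE_INFINITY};
        System.out.println(pipelineDone(watermarks));                 // true
        System.out.println(droppable(42L, POSITIVE_INFINITY, 1000L)); // true
    }
}
```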
On Wed, May 10, 2017 at 2:26 AM, Aljoscha Krettek <[email protected]>
wrote:
Hi,
A bit of clarification: the Flink Runner does not terminate a job when the timeout is reached in waitUntilFinish(Duration). When we reach the timeout we simply return and keep the job running. I thought that was the expected behaviour.
Regarding job termination, I think it’s easy to change the Flink Runner to terminate if the watermark reaches +Inf. We would simply set running to false in the UnboundedSourceWrapper when the watermark reaches +Inf: [1]
Best,
Aljoscha
[1] https://github.com/apache/beam/blob/cec71028ff63c7e1b1565c013ae0e378608cb5f9/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L428-L428
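The proposed change might look roughly like the following simplified stand-in (not the actual UnboundedSourceWrapper code): the source's main loop flips its running flag as soon as the watermark reaches +infinity, so the job shuts down instead of idling forever.

```java
// Rough sketch of the termination condition described above: a source
// emit loop that stops once the watermark reaches +infinity. This is a
// simplified stand-in, not the real Flink UnboundedSourceWrapper.
public class SourceLoopSketch {
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // Simulates the emit loop over a sequence of watermark updates and
    // returns how many iterations ran before the source stopped.
    static int runLoop(long[] watermarkUpdates) {
        boolean running = true;
        int iterations = 0;
        while (running && iterations < watermarkUpdates.length) {
            long currentWatermark = watermarkUpdates[iterations];
            iterations++;
            // ... emit elements and the watermark downstream ...
            if (currentWatermark >= MAX_WATERMARK) {
                running = false; // the proposed termination condition
            }
        }
        return iterations;
    }

    public static void main(String[] args) {
        // The watermark reaches +infinity on the third update, so the
        // loop stops there and never sees the fourth.
        System.out.println(runLoop(new long[]{10L, 20L, Long.MAX_VALUE, 30L})); // 3
    }
}
```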
On 10. May 2017, at 10:58, Jean-Baptiste Onofré <[email protected]>
wrote:
OK, now I understand: you are talking about waitUntilFinish(), whereas I was thinking about a simple run().
IMHO the Spark and Flink behavior sounds like the most logical one for a streaming pipeline.
Regards
JB
On 05/10/2017 10:20 AM, Etienne Chauchot wrote:
Hi everyone,
I'm reopening this subject because, IMHO, it is important to unify pipeline termination semantics in the model. Here are the differences I have observed in streaming pipeline termination:
- direct runner: terminates when the output watermarks of all of its PCollections progress to +infinity
- apex runner: terminates when the output watermarks of all of its PCollections progress to +infinity
- dataflow runner: terminates when the output watermarks of all of its PCollections progress to +infinity
- spark runner: streaming pipelines do not terminate unless a timeout is set in pipelineResult.waitUntilFinish()
- flink runner: streaming pipelines do not terminate unless a timeout is set in pipelineResult.waitUntilFinish() (thanks to Aljoscha for the timeout support PR https://github.com/apache/beam/pull/2915#pullrequestreview-37090326)
=> Is the direct/apex/dataflow behavior the correct "beam model" behavior?
I know that, at least for spark (mails in this thread), there is no easy way to know that we're done reading a source, so it might be very difficult (at least for this runner) to unify toward the +infinity behavior if it is chosen as the standard behavior.
Best
Etienne
On 18/04/2017 at 12:05, Etienne Chauchot wrote:
+1 on "runners really terminate in a timely manner to easily programmatically orchestrate Beam pipelines in a portable way, you do need to know whether the pipeline will finish without thinking about the specific runner and its options"
As an example, in Nexmark, we have streaming-mode tests, and for the benchmark we need all the queries to behave the same across runners with respect to termination.
For now, to get this consistent behavior, in this mode we need to set a timeout (a bit arbitrary and flaky) on waitUntilFinish() for spark, but this timeout is not needed for the direct runner.
Etienne
On 02/03/2017 at 19:27, Kenneth Knowles wrote:
Isn't this already the case? I think semantically it is an unavoidable conclusion, so certainly +1 to that.
The DirectRunner and TestDataflowRunner both have this behavior already. I've always considered that a streaming job running forever is just [very] suboptimal shutdown latency :-)
Some bits of the discussion on the ticket seem to surround whether or how to communicate this property in a generic way. Since a runner owns its PipelineResult it doesn't seem necessary.
So is the bottom line just that you want to more strongly insist that runners really terminate in a timely manner? I'm +1 to that, too, for basically the reason Stas gives: in order to easily programmatically orchestrate Beam pipelines in a portable way, you do need to know whether the pipeline will finish without thinking about the specific runner and its options (as with our RunnableOnService tests).
Kenn
On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin <[email protected]> wrote:
Note that even "unbounded pipeline in a streaming runner".waitUntilFinish() can return, e.g., if you cancel it or terminate it. It's totally reasonable for users to want to understand and handle these cases.
+1
Dan
On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <[email protected]> wrote:
+1
Good idea !!
Regards
JB
On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
Raising this onto the mailing list from
https://issues.apache.org/jira/browse/BEAM-849
The issue came up: what does it mean for a pipeline to finish, in the Beam model?
Note that I am deliberately not talking about "batch" and "streaming" pipelines, because this distinction does not exist in the model. Several runners have batch/streaming *modes*, which implement the same semantics (potentially different subsets: in batch mode typically a runner will reject pipelines that have at least one unbounded PCollection) but in an operationally different way. However we should define pipeline termination at the level of the unified model, and then make sure that all runners in all modes implement that properly.
One natural way is to say "a pipeline terminates when the output watermarks of all of its PCollections progress to +infinity". (Note: this can be generalized, I guess, to having partial executions of a pipeline: if you're interested in the full contents of only some collections, then you wait until only the watermarks of those collections progress to infinity.)
A typical "batch" runner mode does not implement watermarks - we can think of it as assigning watermark -infinity to the output of a transform that hasn't started executing yet, and +infinity to the output of a transform that has finished executing. This is consistent with how such runners implement termination in practice.
The Dataflow streaming runner additionally implements such termination for the pipeline drain operation, which has 2 parts: 1) stop consuming input from the sources, and 2) wait until all watermarks progress to infinity.
Let us fill the gap by making this part of the Beam model and declaring that all runners should implement this behavior. This will give nice properties, e.g.:
- A pipeline that has only bounded collections can be run by any runner in any mode, with the same results and termination behavior (this is actually my motivating example for raising this issue: I was running Splittable DoFn tests <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java> with the streaming Dataflow runner - these tests produce only bounded collections - and noticed that they wouldn't terminate even though all data was processed)
- It will be possible to implement pipelines that stream data for a while and then eventually successfully terminate based on some condition. E.g. a pipeline that watches a continuously growing file until it is marked read-only, or a pipeline that reads a Kafka topic partition until it receives a "poison pill" message. This seems handy.
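The "poison pill" idea might look roughly like this self-contained sketch (illustrative names, not a real Beam UnboundedSource): the source consumes messages until a sentinel arrives, then pushes its watermark to +infinity so that, under the proposed semantics, the pipeline terminates.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of a "poison pill" source: it consumes messages until a sentinel
// arrives, then declares itself done by advancing its watermark to
// +infinity. Illustrative only, not a real Beam UnboundedSource.
public class PoisonPillSource {
    static final String POISON_PILL = "__END__";
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private long watermark = Long.MIN_VALUE;

    long getWatermark() {
        return watermark;
    }

    // Consumes messages in order; stops at the poison pill and pushes the
    // watermark to +infinity, since nothing later can ever be on time.
    int consume(List<String> messages) {
        int processed = 0;
        for (String msg : messages) {
            if (POISON_PILL.equals(msg)) {
                watermark = MAX_WATERMARK;
                break;
            }
            processed++;
            // ... hand msg to the pipeline, advance the watermark normally ...
        }
        return processed;
    }

    public static void main(String[] args) {
        PoisonPillSource source = new PoisonPillSource();
        int n = source.consume(Arrays.asList("a", "b", "__END__", "c"));
        System.out.println(n);                                       // 2
        System.out.println(source.getWatermark() == Long.MAX_VALUE); // true
    }
}
```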
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com