BEAM-593 is blocked by Flink issues:

- https://issues.apache.org/jira/browse/FLINK-2313: Change Streaming Driver 
Execution Model
- https://issues.apache.org/jira/browse/FLINK-4272: Create a JobClient for job 
control and monitoring

where the second is kind of a duplicate of the first one.

There is a pending PR that is a bit dated, but this week I’ll try to see if I 
can massage it into shape for Flink 1.3, which should be released in about 1.5 
months.

> On 18. Apr 2017, at 18:17, Ismaël Mejía <ieme...@gmail.com> wrote:
> 
> +1 Having a unified termination semantics for all runners is super important.
> 
> Stas or Aviem, is it feasible to do this for the Spark runner, or is the
> timeout due to a technical limitation of Spark?
> 
> Thomas Weise, Aljoscha anything to say on this?
> 
> Aljoscha, what is the current status for the Flink runner? Is there
> any progress towards BEAM-593?
> 
> 
> On Tue, Apr 18, 2017 at 5:05 PM, Stas Levin <stasle...@apache.org> wrote:
>> Ted, the timeout is needed mostly for testing purposes.
>> AFAIK there is no easy way to express the fact a source is "done" in a
>> Spark native streaming application.
>> Moreover, the Spark streaming "native" flow can either "awaitTermination()"
>> or "awaitTerminationOrTimeout(...)". If you "awaitTermination" then you're
>> blocked until the execution is either stopped or has failed, so if you wish
>> to stop the app sooner, say after a certain period of time,
>> "awaitTerminationOrTimeout(...)" may be the way to go.
>> 
>> Using the unified approach discussed in this thread, when a source is
>> "done" (i.e. the watermark is +Infinity) the app (e.g. runner) would
>> gracefully stop.
>> 
>> 
>> 
>> On Tue, Apr 18, 2017 at 3:19 PM Ted Yu <yuzhih...@gmail.com> wrote:
>> 
>>> Why is the timeout needed for Spark ?
>>> 
>>> Thanks
>>> 
>>>> On Apr 18, 2017, at 3:05 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
>>>> 
>>>> +1 on "runners really terminate in a timely manner to easily
>>>> programmatically orchestrate Beam pipelines in a portable way, you do need
>>>> to know whether the pipeline will finish without thinking about the
>>>> specific runner and its options"
>>>> 
>>>> As an example, in Nexmark, we have streaming mode tests, and for the
>>>> benchmark, we need all the queries to behave the same across runners
>>>> with respect to termination.
>>>> 
>>>> For now, to get consistent behavior in this mode, we need to set a
>>>> timeout (a bit random and flaky) on waitUntilFinish() for Spark, but this
>>>> timeout is not needed for the direct runner.
>>>> 
>>>> Etienne
>>>> 
>>>>> On 02/03/2017 at 19:27, Kenneth Knowles wrote:
>>>>> Isn't this already the case? I think semantically it is an unavoidable
>>>>> conclusion, so certainly +1 to that.
>>>>> 
>>>>> The DirectRunner and TestDataflowRunner both have this behavior already.
>>>>> I've always considered that a streaming job running forever is just
>>>>> [very] suboptimal shutdown latency :-)
>>>>> 
>>>>> Some bits of the discussion on the ticket seem to surround whether or how
>>>>> to communicate this property in a generic way. Since a runner owns its
>>>>> PipelineResult it doesn't seem necessary.
>>>>> 
>>>>> So is the bottom line just that you want to more strongly insist that
>>>>> runners really terminate in a timely manner? I'm +1 to that, too, for
>>>>> basically the reason Stas gives: In order to easily programmatically
>>>>> orchestrate Beam pipelines in a portable way, you do need to know whether
>>>>> the pipeline will finish without thinking about the specific runner and
>>>>> its options (as with our RunnableOnService tests).
>>>>> 
>>>>> Kenn
>>>>> 
>>>>> On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin <dhalp...@google.com.invalid> wrote:
>>>>> 
>>>>>> Note that even "unbounded pipeline in a streaming runner".waitUntilFinish()
>>>>>> can return, e.g., if you cancel it or terminate it. It's totally reasonable
>>>>>> for users to want to understand and handle these cases.
>>>>>> 
>>>>>> +1
>>>>>> 
>>>>>> Dan
>>>>>> 
>>>>>> On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
>>>>>> wrote:
>>>>>> 
>>>>>>> +1
>>>>>>> 
>>>>>>> Good idea !!
>>>>>>> 
>>>>>>> Regards
>>>>>>> JB
>>>>>>> 
>>>>>>> 
>>>>>>>> On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
>>>>>>>> 
>>>>>>>> Raising this onto the mailing list from
>>>>>>>> https://issues.apache.org/jira/browse/BEAM-849
>>>>>>>> 
>>>>>>>> The issue came up: what does it mean for a pipeline to finish, in the
>>>>>>>> Beam model?
>>>>>>>> 
>>>>>>>> Note that I am deliberately not talking about "batch" and "streaming"
>>>>>>>> pipelines, because this distinction does not exist in the model. Several
>>>>>>>> runners have batch/streaming *modes*, which implement the same semantics
>>>>>>>> (potentially different subsets: in batch mode typically a runner will
>>>>>>>> reject pipelines that have at least one unbounded PCollection) but in an
>>>>>>>> operationally different way. However we should define pipeline termination
>>>>>>>> at the level of the unified model, and then make sure that all runners in
>>>>>>>> all modes implement that properly.
>>>>>>>> 
>>>>>>>> One natural way is to say "a pipeline terminates when the output
>>>>>>>> watermarks of all of its PCollections progress to +infinity". (Note: this
>>>>>>>> can be generalized, I guess, to having partial executions of a pipeline:
>>>>>>>> if you're interested in the full contents of only some collections, then
>>>>>>>> you wait until only the watermarks of those collections progress to
>>>>>>>> +infinity.)
>>>>>>>> 
>>>>>>>> A typical "batch" runner mode does not implement watermarks - we can think
>>>>>>>> of it as assigning watermark -infinity to an output of a transform that
>>>>>>>> hasn't started executing yet, and +infinity to output of a transform that
>>>>>>>> has finished executing. This is consistent with how such runners implement
>>>>>>>> termination in practice.
>>>>>>>> 
>>>>>>>> The Dataflow streaming runner additionally implements such termination for
>>>>>>>> the pipeline drain operation, which has 2 parts: 1) stop consuming input
>>>>>>>> from the sources, and 2) wait until all watermarks progress to +infinity.
>>>>>>>> 
>>>>>>>> Let us fill the gap by making this part of the Beam model and declaring
>>>>>>>> that all runners should implement this behavior. This will give nice
>>>>>>>> properties, e.g.:
>>>>>>>> - A pipeline that has only bounded collections can be run by any runner in
>>>>>>>> any mode, with the same results and termination behavior (this is actually
>>>>>>>> my motivating example for raising this issue: I was running Splittable
>>>>>>>> DoFn tests
>>>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
>>>>>>>> with the streaming Dataflow runner - these tests produce only bounded
>>>>>>>> collections - and noticed that they wouldn't terminate even though all
>>>>>>>> data was processed)
>>>>>>>> - It will be possible to implement pipelines that stream data for a while
>>>>>>>> and then eventually successfully terminate based on some condition. E.g. a
>>>>>>>> pipeline that watches a continuously growing file until it is marked
>>>>>>>> read-only, or a pipeline that reads a Kafka topic partition until it
>>>>>>>> receives a "poison pill" message. This seems handy.
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> jbono...@apache.org
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
>>>> 
>>> 
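
As a rough illustration of the termination rule proposed in the original
message quoted above, here is a minimal Java sketch. It is not Beam API: the
class TerminationSketch, its methods, and the use of Long.MIN_VALUE /
Long.MAX_VALUE as stand-ins for -infinity / +infinity are all assumptions made
for this example. It models the batch-mode view (a transform's output sits at
-infinity until the transform finishes, then jumps to +infinity) and the check
that a pipeline is done once every collection's output watermark has reached
+infinity.

```java
import java.util.Map;

/** Hypothetical model of the proposed termination rule; not part of Beam. */
final class TerminationSketch {
    /** Stand-ins for watermark -infinity and +infinity (an assumption of this sketch). */
    static final long NEG_INF = Long.MIN_VALUE;
    static final long POS_INF = Long.MAX_VALUE;

    private TerminationSketch() {}

    /**
     * Batch-mode view from the proposal: the output of a transform that has
     * not finished is treated as -infinity, a finished one as +infinity.
     */
    static long batchWatermark(boolean transformFinished) {
        return transformFinished ? POS_INF : NEG_INF;
    }

    /**
     * The pipeline counts as terminated when the output watermark of every
     * collection in the map has progressed to +infinity.
     */
    static boolean isTerminated(Map<String, Long> outputWatermarks) {
        for (long watermark : outputWatermarks.values()) {
            if (watermark != POS_INF) {
                return false; // at least one collection may still receive data
            }
        }
        return true;
    }
}
```

For the partial-execution variant mentioned in the proposal, a caller could
pass in only the watermarks of the collections it is interested in.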
