Hi Xingcan!

If a _finite_ stream emitted a special trailing "End-Of-Stream" message at 
its end, one that flows down the operator chain, wouldn't that let us end 
the iteration deterministically, without needing a timeout?

Having an arbitrary timeout that must be longer than any iteration step takes 
seems really awkward.
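
To make the contrast concrete, here is a minimal sketch of the two termination 
strategies on a plain blocking queue. This is not Flink code: FeedbackMessage, 
Record, EndOfStream, drainWithTimeout and drainUntilMarker are hypothetical 
names made up for illustration, and the sketch assumes the marker is enqueued 
only after the last feedback record.

```scala
import java.util.concurrent.{BlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

// Hypothetical message type for a feedback channel (not a Flink API).
sealed trait FeedbackMessage
final case class Record(value: Long) extends FeedbackMessage
case object EndOfStream extends FeedbackMessage

object FeedbackLoopSketch {

  // Timeout-based termination: stop once no message arrives within timeoutMillis.
  def drainWithTimeout(queue: BlockingQueue[FeedbackMessage],
                       timeoutMillis: Long): List[Long] = {
    val seen = ListBuffer.empty[Long]
    var running = true
    while (running) {
      // poll returns null if the waiting time elapses without an element
      queue.poll(timeoutMillis, TimeUnit.MILLISECONDS) match {
        case Record(v)   => seen += v
        case EndOfStream => running = false
        case null        => running = false // timed out: assume the stream ended
      }
    }
    seen.toList
  }

  // Marker-based termination: block until the explicit end marker shows up.
  def drainUntilMarker(queue: BlockingQueue[FeedbackMessage]): List[Long] = {
    val seen = ListBuffer.empty[Long]
    var running = true
    while (running) {
      queue.take() match {
        case Record(v)   => seen += v
        case EndOfStream => running = false
      }
    }
    seen.toList
  }
}
```

The first variant has to guess a waiting time, while the second one stops 
exactly when the marker arrives. The sketch leaves open how a real iteration 
would know that no records are still circulating in the loop when the marker 
is emitted.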

What do you think?

Best regards
Peter


> On 02.09.2017 at 17:16, Xingcan Cui <xingc...@gmail.com 
> <mailto:xingc...@gmail.com>> wrote:
> 
> Hi Peter,
> 
> I had simply overlooked the filter part. Sorry for that.
> 
> Actually, as the Javadoc explains, a DataStream with an iteration never 
> terminates by default. In a streaming environment with iteration, the 
> operator can never know whether the feedback stream has reached its end: 
> even after the data source has terminated, more feedback data may still 
> arrive. That is why it needs a timeout value to make that judgement, just 
> like many function calls on network connections. In other words, you know 
> the feedback stream will be empty in the future, but the operator doesn't. 
> Thus we give it a maximum waiting time for the next record.
> 
> Internally, this mechanism is implemented via a blocking queue (the related 
> code can be found here 
> <https://github.com/apache/flink/blob/12b4185c6c09101b64e12a84c33dc4d28f95cff9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java#L80>).
> 
> Hope everything is considered this time : )
> 
> Best,
> Xingcan
> 
> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl <peter.e...@gmx.net 
> <mailto:peter.e...@gmx.net>> wrote:
> 
>> On 02.09.2017 at 04:45, Xingcan Cui <xingc...@gmail.com 
>> <mailto:xingc...@gmail.com>> wrote:
>> 
>> In your code, all the long values will have 1 subtracted and be sent back 
>> to the iterate operator, endlessly.
> 
> 
> Is this true? shouldn't
>   val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>     // dump meaningless 'y' chars just to do anything
>     (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y'))
>   })
>   iterationResult2.print()
> 
> produce the following _feedback_ streams?
> 
> initial input to #iterate(): [1 2 3 4]
> 
> iteration #1 : [1 2 3]
> iteration #2 : [1 2]
> iteration #3 : [1]
> iteration #4 : []  => empty feedback stream => cause termination? (which 
> actually only happens when setting a timeout value)
> 
> Best regards
> Peter


