Hi Xingcan!

If a _finite_ stream emitted a special trailing "End-Of-Stream" message at its end, and that message travelled down the operator stream, wouldn't this let us end the iteration deterministically without needing a timeout?
Having an arbitrary timeout that must be longer than any single iteration step takes seems really awkward.

What do you think?

Best regards
Peter

> On 02.09.2017 at 17:16, Xingcan Cui <xingc...@gmail.com> wrote:
>
> Hi Peter,
>
> I just omitted the filter part. Sorry for that.
>
> Actually, as the Javadoc explains, a DataStream with an iteration will never
> terminate by default. That's because, in a stream environment with iteration,
> the operator never knows whether the feedback stream has reached its end
> (even though the data source has terminated, there may still be subsequent
> data it cannot know about), and that's why it needs a timeout value to make
> the judgement, just like many function calls over network connections. In
> other words, you know the feedback stream will be empty in the future, but
> the operator doesn't. Thus we give it a maximum waiting time for the next
> record.
>
> Internally, this mechanism is implemented via a blocking queue (the related
> code can be found here
> <https://github.com/apache/flink/blob/12b4185c6c09101b64e12a84c33dc4d28f95cff9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java#L80>).
>
> Hope everything is considered this time : )
>
> Best,
> Xingcan
>
> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl <peter.e...@gmx.net> wrote:
>
> > > On 02.09.2017 at 04:45, Xingcan Cui <xingc...@gmail.com> wrote:
> > >
> > > In your code, all the long values will have 1 subtracted and be sent
> > > back to the iterate operator, endlessly.
> >
> > Is this true? Shouldn't
> >
> > val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
> >   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump meaningless 'y' chars just to do anything
> > })
> > iterationResult2.print()
> >
> > produce the following _feedback_ streams?
> >
> > initial input to #iterate(): [1 2 3 4]
> >
> > iteration #1 : [1 2 3]
> > iteration #2 : [1 2]
> > iteration #3 : [1]
> > iteration #4 : [] => empty feedback stream => cause termination? (which
> > actually only happens when setting a timeout value)
> >
> > Best regards
> > Peter
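
For readers following the thread, the timeout mechanism Xingcan describes can be sketched in plain Scala. This is a simplified, single-threaded model under stated assumptions, not Flink's actual StreamIterationHead code: the names IterationTimeoutSketch, run, Result and waitMs are hypothetical, and the loop body hard-codes the filter(_ > 0) and map(_ - 1) steps from Peter's example. What it tries to show is that the reader of the queue can only conclude the feedback stream has ended once poll has waited waitMs without receiving a record.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object IterationTimeoutSketch {

  /** Hypothetical result type: every value fed back, and every output emitted. */
  final case class Result(fedBack: Vector[Long], outputs: Vector[Char])

  /**
   * Simplified model of an iteration head reading from a blocking feedback queue.
   * waitMs plays the role of the iteration timeout: if no record arrives within
   * waitMs, the head assumes the feedback stream has ended and stops.
   */
  def run(initial: Seq[Long], waitMs: Long): Result = {
    val feedback = new LinkedBlockingQueue[java.lang.Long]()
    initial.foreach(v => feedback.put(v))

    val fedBack = ArrayBuffer.empty[Long]
    val outputs = ArrayBuffer.empty[Char]

    var running = true
    while (running) {
      // poll returns null if the queue is still empty after waitMs
      val next = feedback.poll(waitMs, TimeUnit.MILLISECONDS)
      if (next == null) {
        // The head cannot tell "empty forever" from "empty right now";
        // the elapsed timeout is its only signal to terminate.
        running = false
      } else {
        val v: Long = next
        if (v > 0) {            // it.filter(_ > 0)
          feedback.put(v - 1)   // .map(_ - 1) goes to the feedback stream
          fedBack += (v - 1)
          outputs += 'y'        // .map(_ => 'y') goes to the output stream
        }
      }
    }
    Result(fedBack.toVector, outputs.toVector)
  }

  def main(args: Array[String]): Unit = {
    val result = run(Seq(1L, 2L, 3L, 4L), waitMs = 100L)
    println(s"fed back: ${result.fedBack.mkString(" ")}")
    println(s"outputs:  ${result.outputs.size}")
  }
}
```

With the input 1 to 4, this model emits ten 'y' records and then sits idle for waitMs before stopping. In this model the zeros produced by map(_ - 1) are fed back once before the filter drops them on the next pass.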