Hi Peter,

Let me try to explain this.

As you shown in the examples, the iterate method takes a function, which
"split" the initial stream
into two separate streams, i.e., initialStream => (stream1, stream2). The
stream2 works as the output
stream, whose results will be emitted to the successor operators (PrintSink
in your example), while
the stream1 works as a feedback stream, whose results will be resent to the
iterate operator.

In your codes, all the the long values will subtract 1 and be sent back to
the iterate operator, endlessly.
Try replacing your first map function to (_ + 1) and you'll see the
infinite results. For more information,
you can refer to this
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#iterations>
or
read the javadoc.

Hope that helps.

Best,
Xingcan

On Fri, Sep 1, 2017 at 5:29 PM, Peter Ertl <peter.e...@gmx.net> wrote:

> Hi folks,
>
> I was doing some experiments with DataStream#iterate and what felt strange
> to me is the fact that #iterate() does not terminate on it's own when
> consuming a _finite_ stream.
>
> I think this is awkward und unexpected. Only thing that "helped" was
> setting an arbitrary and meaningless timeout on iterate.
>
> Imho this should not be necessary (maybe sent an internal "poison message"
> downward the iteration stream to signal shutdown of the streaming task?)
>
> example:
>
> // ---------------------------------------------------
>
> // does terminate by introducing a meaningless timeout
> // ---------------------------------------------------
> val iterationResult1 = env.generateSequence(1, 4).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x')) // dump 
> meaningless 'x' chars just to do anything
> }, 1000, keepPartitioning = false)
>
> iterationResult1.print()
>
> // ---------------------------------------------------
> // does NEVER terminate
> // ---------------------------------------------------
> val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump 
> meaningless 'y' chars just to do anything
> })
> iterationResult2.print()
>
>
> Can someone elaborate on this - should I file a ticket?
>
> Regards
> Peter
>

Reply via email to