Hi folks,

I was experimenting with DataStream#iterate, and what struck me as strange is that #iterate() does not terminate on its own when consuming a _finite_ stream.
I think this is awkward and unexpected. The only thing that "helped" was setting an arbitrary and meaningless timeout on iterate. Imho this should not be necessary (maybe send an internal "poison message" down the iteration stream to signal shutdown of the streaming task?).

Example:

    // ---------------------------------------------------
    // terminates, but only because of a meaningless timeout
    // ---------------------------------------------------
    val iterationResult1 = env.generateSequence(1, 4).iterate(it => {
      // first element: feedback stream, second element: output stream
      (it.filter(_ > 0).map(_ - 1),
       it.filter(_ > 0).map(_ => 'x')) // dump meaningless 'x' chars just to do anything
    }, 1000, keepPartitioning = false)

    iterationResult1.print()

    // ---------------------------------------------------
    // NEVER terminates
    // ---------------------------------------------------
    val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
      (it.filter(_ > 0).map(_ - 1),
       it.filter(_ > 0).map(_ => 'y')) // dump meaningless 'y' chars just to do anything
    })

    iterationResult2.print()

Can someone elaborate on this? Should I file a ticket?

Regards
Peter
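
P.S. To make the "poison message" idea a bit more concrete, here is a rough single-threaded sketch in plain Scala. It is not Flink code and says nothing about how Flink is implemented internally: `Msg`, `Record`, `Poison` and `run` are names I made up, and the rule "keep recirculating the marker until one full pass produces no feedback" is just my assumption of how termination could be detected. A real distributed version would also have to deal with parallel subtasks and records still in flight.

```scala
import scala.collection.mutable

object PoisonPillSketch {
  // Hypothetical message types, invented for this sketch (not Flink API).
  sealed trait Msg
  final case class Record(value: Long) extends Msg
  case object Poison extends Msg

  // Simulates the iteration from the example: values > 0 are decremented
  // and fed back, and one 'x' is emitted for each such value.
  def run(input: Seq[Long]): Vector[Char] = {
    val queue = mutable.Queue.empty[Msg]
    input.foreach(v => queue.enqueue(Record(v)))
    queue.enqueue(Poison) // marks the end of the finite input

    val out = Vector.newBuilder[Char]
    var fedBackSincePoison = 0
    var done = false
    while (!done) {
      queue.dequeue() match {
        case Record(v) if v > 0 =>
          queue.enqueue(Record(v - 1)) // feedback edge
          fedBackSincePoison += 1
          out += 'x'                   // output edge
        case Record(_) =>
          () // filtered out, nothing is fed back or emitted
        case Poison =>
          if (fedBackSincePoison == 0) {
            done = true // a whole pass produced no feedback: safe to stop
          } else {
            fedBackSincePoison = 0
            queue.enqueue(Poison) // go around once more, behind the feedback
          }
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit =
    println(run(1L to 4L).length) // prints 10
}
```

For the input 1..4 this stops by itself after emitting 10 chars (1 + 2 + 3 + 4), without any timeout.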