Hi folks,

I was experimenting with DataStream#iterate, and what felt strange to 
me is that #iterate() does not terminate on its own when consuming a 
_finite_ stream.

I think this is awkward and unexpected. The only thing that "helped" was 
setting an arbitrary and meaningless timeout on iterate.

IMHO this should not be necessary (maybe send an internal "poison message" 
down the iteration stream to signal shutdown of the streaming task?).
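
To make the poison-message idea concrete, here is a minimal plain-Scala 
sketch. It does not use Flink and is not how Flink implements iterations; 
the names PoisonSketch, Msg, Item, Poison and runLoop are made up for 
illustration. A single queue stands in for the iteration head plus its 
feedback edge, and the marker is passed around until no feedback records 
are left in flight:

```scala
import scala.collection.mutable

object PoisonSketch {
  sealed trait Msg
  final case class Item(value: Long) extends Msg
  case object Poison extends Msg // hypothetical end-of-input marker

  // Single-threaded stand-in for an iteration head with a feedback edge.
  def runLoop(input: Seq[Long]): Vector[Char] = {
    val queue = mutable.Queue.empty[Msg]
    input.foreach(v => queue.enqueue(Item(v)))
    queue.enqueue(Poison) // the finite source is exhausted

    val out = Vector.newBuilder[Char]
    var running = true
    while (running) {
      queue.dequeue() match {
        case Item(v) if v > 0 =>
          queue.enqueue(Item(v - 1)) // feedback: filter(_ > 0).map(_ - 1)
          out += 'x'                 // output:   filter(_ > 0).map(_ => 'x')
        case Item(_) =>
          () // filtered out, nothing is fed back
        case Poison =>
          if (queue.isEmpty) running = false // nothing in flight: shut down
          else queue.enqueue(Poison)         // feedback pending: go around again
      }
    }
    out.result()
  }
}
```

For the input 1 to 4 this loop stops on its own after emitting ten 'x' 
characters, without any timeout. A real distributed implementation would 
additionally have to deal with parallel subtasks and records still in 
transit on the network, which this sketch ignores.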

example:

// ---------------------------------------------------
// terminates, but only because of a meaningless timeout
// ---------------------------------------------------
val iterationResult1 = env.generateSequence(1, 4).iterate(it => {
  // dump meaningless 'x' chars just to do anything
  (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
}, 1000, keepPartitioning = false)

iterationResult1.print()

// ---------------------------------------------------
// NEVER terminates
// ---------------------------------------------------
val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
  // dump meaningless 'y' chars just to do anything
  (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y'))
})
iterationResult2.print()

Can someone elaborate on this - should I file a ticket?

Regards
Peter
