Hey, Along with the suggested changes to the streaming API structure I think we should also rework the "iteration" api. Currently the iteration api tries to mimic the syntax of the batch API while the runtime behaviour is quite different.
What we create instead of iterations is really just cyclic streams (loops in the streaming job), so the API should somehow be intuitive about this behaviour. I suggest to remove the explicit iterate call and instead add a method to the StreamOperators that allows to connect feedback inputs (create loops). It would look like this: A mapper that does nothing but iterates over some filtered input: *Current API :* DataStream source = .. IterativeDataStream it = source.iterate() DataStream mapper = it.map(noOpMapper) DataStream feedback = mapper.filter(...) it.closeWith(feedback) *Suggested API :* DataStream source = .. DataStream mapper = source.map(noOpMapper) DataStream feedback = mapper.filter(...) mapper.addInput(feedback) The suggested approach would let us define inputs to operators after they are created and implicitly union them with the normal input. This is I think a much clearer approach than what we have now. What do you think? Gyula