I think it could work if we allowed a DataStream to be unioned after creation. For example:
DataStream source = ..
DataStream mapper = source.map(noOpMapper)
DataStream feedback = mapper.filter(...)
source.union(feedback)

This would basically mean that a DataStream is mutable and can be extended after creation with more streams.

On Tue, 7 Jul 2015 at 16:12 Aljoscha Krettek <aljos...@apache.org> wrote:

> I think this would be good yes. I was just about to open an Issue for
> changing the Streaming Iteration API. :D
>
> Then we should also make the implementation very straightforward and
> simple, right now, the implementation of the iterations is all over the
> place.
>
> On Tue, 7 Jul 2015 at 15:57 Gyula Fóra <gyf...@apache.org> wrote:
>
>> Hey,
>>
>> Along with the suggested changes to the streaming API structure I think we
>> should also rework the "iteration" api. Currently the iteration api tries
>> to mimic the syntax of the batch API while the runtime behaviour is quite
>> different.
>>
>> What we create instead of iterations is really just cyclic streams (loops
>> in the streaming job), so the API should somehow be intuitive about this
>> behaviour.
>>
>> I suggest to remove the explicit iterate call and instead add a method to
>> the StreamOperators that allows to connect feedback inputs (create loops).
>> It would look like this:
>>
>> A mapper that does nothing but iterates over some filtered input:
>>
>> *Current API :*
>> DataStream source = ..
>> IterativeDataStream it = source.iterate()
>> DataStream mapper = it.map(noOpMapper)
>> DataStream feedback = mapper.filter(...)
>> it.closeWith(feedback)
>>
>> *Suggested API :*
>> DataStream source = ..
>> DataStream mapper = source.map(noOpMapper)
>> DataStream feedback = mapper.filter(...)
>> mapper.addInput(feedback)
>>
>> The suggested approach would let us define inputs to operators after they
>> are created and implicitly union them with the normal input. This is I
>> think a much clearer approach than what we have now.
>>
>> What do you think?
>>
>> Gyula
>>
>
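
To make the "mutable DataStream" idea a bit more concrete, here is a rough toy model of what a union after creation would mean for the job graph. This is only a sketch and not the actual DataStream implementation: the names MutableStreamSketch, Node, then, union and feedsInto are all made up for illustration. A node simply keeps a list of inputs that can still grow after downstream operators have been attached to it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a stream graph; hypothetical names, not the Flink API. */
public class MutableStreamSketch {

    /** A graph node whose input list can still be extended after creation. */
    static final class Node {
        final String name;
        final List<Node> inputs = new ArrayList<>();

        Node(String name, Node... initialInputs) {
            this.name = name;
            for (Node in : initialInputs) {
                inputs.add(in);
            }
        }

        /** Attaches a new downstream operator that reads from this node. */
        Node then(String opName) {
            return new Node(opName, this);
        }

        /** Mutating union: existing consumers of this node now also see `other`. */
        Node union(Node other) {
            inputs.add(other);
            return this;
        }
    }

    /** Returns true if `upstream` is reachable from `downstream` by following inputs. */
    static boolean feedsInto(Node upstream, Node downstream) {
        Deque<Node> stack = new ArrayDeque<>();
        Set<Node> seen = new HashSet<>();
        stack.push(downstream);
        while (!stack.isEmpty()) {
            Node current = stack.pop();
            for (Node in : current.inputs) {
                if (in == upstream) {
                    return true;
                }
                if (seen.add(in)) {
                    stack.push(in);
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Node source = new Node("source");
        Node mapper = source.then("map");
        Node feedback = mapper.then("filter");

        // Before the union, the mapper does not receive the feedback stream.
        System.out.println(feedsInto(feedback, mapper));

        // Extending the source after creation closes the loop.
        source.union(feedback);
        System.out.println(feedsInto(feedback, mapper));
    }
}
```

The point of the sketch is that the loop is created without a separate iterate/closeWith pair: the mapper was built against source before the union, yet it ends up consuming the feedback because the node it reads from was extended in place.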