@Aljoscha: Yes, that's basically my point as well. This is what happens now too, but we give this mutable DataStream a special name: IterativeDataStream.
This can be handled in very different ways through the API; the goal would be to make something easy to use. I am fine with what we have now because I know how it works, but calling it "iterate" might confuse people.

Aljoscha Krettek <[email protected]> wrote (on Tue, 7 Jul 2015, 16:18):

> I think it could work if we allowed a DataStream to be unioned after
> creation. For example:
>
> DataStream source = ..
> DataStream mapper = source.map(noOpMapper)
> DataStream feedback = mapper.filter(...)
> source.union(feedback)
>
> This would basically mean that a DataStream is mutable and can be extended
> after creation with more streams.
>
> On Tue, 7 Jul 2015 at 16:12 Aljoscha Krettek <[email protected]> wrote:
>
> > I think this would be good yes. I was just about to open an Issue for
> > changing the Streaming Iteration API. :D
> >
> > Then we should also make the implementation very straightforward and
> > simple, right now, the implementation of the iterations is all over the
> > place.
> >
> > On Tue, 7 Jul 2015 at 15:57 Gyula Fóra <[email protected]> wrote:
> >
> >> Hey,
> >>
> >> Along with the suggested changes to the streaming API structure I think we
> >> should also rework the "iteration" api. Currently the iteration api tries
> >> to mimic the syntax of the batch API while the runtime behaviour is quite
> >> different.
> >>
> >> What we create instead of iterations is really just cyclic streams (loops
> >> in the streaming job), so the API should somehow be intuitive about this
> >> behaviour.
> >>
> >> I suggest to remove the explicit iterate call and instead add a method to
> >> the StreamOperators that allows to connect feedback inputs (create loops).
> >> It would look like this:
> >>
> >> A mapper that does nothing but iterates over some filtered input:
> >>
> >> *Current API :*
> >> DataStream source = ..
> >> IterativeDataStream it = source.iterate()
> >> DataStream mapper = it.map(noOpMapper)
> >> DataStream feedback = mapper.filter(...)
> >> it.closeWith(feedback)
> >>
> >> *Suggested API :*
> >> DataStream source = ..
> >> DataStream mapper = source.map(noOpMapper)
> >> DataStream feedback = mapper.filter(...)
> >> mapper.addInput(feedback)
> >>
> >> The suggested approach would let us define inputs to operators after they
> >> are created and implicitly union them with the normal input. This is I
> >> think a much clearer approach than what we have now.
> >>
> >> What do you think?
> >>
> >> Gyula
> >>
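To make the "mutable stream" idea from this thread concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink code. `ToyStream`, `push`, and `emitted` are hypothetical names invented for illustration; only `addInput` is taken from the suggested API in the thread, and its real semantics might differ. The sketch assumes elements are pushed synchronously and that the feedback filter eventually rejects every element, so the loop terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

public class FeedbackLoopSketch {

    /** Toy stand-in for a stream operator (hypothetical, not a Flink class). */
    static final class ToyStream<T> {
        private final Function<T, Optional<T>> fn;               // per-element logic
        private final List<ToyStream<T>> downstream = new ArrayList<>();
        private final List<T> emitted = new ArrayList<>();       // everything this node produced

        private ToyStream(Function<T, Optional<T>> fn) {
            this.fn = fn;
        }

        /** A source simply forwards whatever is pushed into it. */
        static <T> ToyStream<T> source() {
            return new ToyStream<T>(Optional::of);
        }

        ToyStream<T> map(UnaryOperator<T> f) {
            ToyStream<T> next = new ToyStream<T>(x -> Optional.of(f.apply(x)));
            downstream.add(next);
            return next;
        }

        ToyStream<T> filter(Predicate<T> p) {
            ToyStream<T> next =
                new ToyStream<T>(x -> p.test(x) ? Optional.<T>of(x) : Optional.<T>empty());
            downstream.add(next);
            return next;
        }

        /** Connects an extra input after creation; this is what closes the loop. */
        void addInput(ToyStream<T> feedback) {
            feedback.downstream.add(this);
        }

        /** Processes one element and forwards the result, if any, downstream. */
        void push(T value) {
            Optional<T> out = fn.apply(value);
            if (out.isPresent()) {
                emitted.add(out.get());
                for (ToyStream<T> d : downstream) {
                    d.push(out.get());
                }
            }
        }

        List<T> emitted() {
            return emitted;
        }
    }

    /** Builds source -> mapper -> filter, feeds the filter back into the mapper. */
    static List<Integer> run(int start) {
        ToyStream<Integer> source = ToyStream.source();
        ToyStream<Integer> mapper = source.map(x -> x - 1);
        ToyStream<Integer> feedback = mapper.filter(x -> x > 0);
        mapper.addInput(feedback);   // the loop is declared after the mapper exists
        source.push(start);
        return mapper.emitted();
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [2, 1, 0]
    }
}
```

In this toy the mapper sees its normal input and the feedback input through the same `push` path, which is the "implicit union" described in the thread; no separate iteration type is needed to express the cycle.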
