Hey,

Along with the suggested changes to the streaming API structure, I think we
should also rework the "iteration" API. Currently it tries to mimic the
syntax of the batch API, while its runtime behaviour is quite different.

What we create instead of iterations is really just cyclic streams (loops
in the streaming job), so the API should make this behaviour obvious.

I suggest removing the explicit iterate call and instead adding a method to
the StreamOperators that lets us connect feedback inputs (create loops).
It would look like this:

Example: a mapper that does nothing except iterate over some filtered input:

*Current API:*
DataStream source = ..
// open the iteration explicitly
IterativeDataStream it = source.iterate();
DataStream mapper = it.map(noOpMapper);
DataStream feedback = mapper.filter(...);
// close the loop by feeding the filtered stream back
it.closeWith(feedback);

*Suggested API:*
DataStream source = ..
DataStream mapper = source.map(noOpMapper);
DataStream feedback = mapper.filter(...);
// connect the filtered stream as an extra input of the mapper (creates the loop)
mapper.addInput(feedback);

The suggested approach would let us define inputs to operators after they
are created and implicitly union them with the normal input. I think this
is a much clearer approach than what we have now.
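
To make the intended semantics a bit more concrete, here is a toy,
single-threaded sketch in plain Java. It is not the streaming API: the
Operator class, an addInput that takes a predicate on the operator's own
output, and the decrementing mapper are all made up for illustration (a
no-op mapper would loop forever here, so the mapper subtracts 1 to make the
loop terminate). It only shows the idea of attaching a feedback input after
the operator exists and unioning it with the normal input.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

class FeedbackSketch {

    /** Hypothetical stand-in for a stream operator; not a real API class. */
    static final class Operator<T> {
        private final Function<T, T> fn;
        // One queue models the union of the normal input and all feedback inputs.
        private final Deque<T> input = new ArrayDeque<>();
        private final List<Predicate<T>> feedbackFilters = new ArrayList<>();

        Operator(List<T> source, Function<T, T> fn) {
            this.fn = fn;
            this.input.addAll(source);
        }

        /** Toy analogue of the proposed addInput: feed filtered output back in. */
        void addInput(Predicate<T> feedbackFilter) {
            feedbackFilters.add(feedbackFilter);
        }

        /** Processes elements until no input (normal or feedback) remains. */
        List<T> run() {
            List<T> output = new ArrayList<>();
            while (!input.isEmpty()) {
                T result = fn.apply(input.poll());
                output.add(result);
                for (Predicate<T> filter : feedbackFilters) {
                    if (filter.test(result)) {
                        input.add(result); // element re-enters the loop
                    }
                }
            }
            return output;
        }
    }

    /** Builds source -> mapper with a feedback edge for values greater than 0. */
    static List<Integer> demo() {
        Operator<Integer> mapper = new Operator<>(List.of(3, 1), x -> x - 1);
        mapper.addInput(x -> x > 0); // the input is added after the operator is created
        return mapper.run();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that the loop is created purely by the addInput call on an already
existing operator; there is no separate "iterative" stream type involved.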

What do you think?

Gyula
