+1 for rethinking the iterations in DataStream

However, wouldn't this proposal allow the definition of arbitrary loops
(e.g., nested loops) that are not well behaved afaik?

On Tue, Jul 7, 2015 at 4:12 PM, Stephan Ewen <se...@apache.org> wrote:

> I see that the newly proposed API makes some things easier to define.
>
> There is one source of confusion, though, in my opinion:
>
> The new API suggests that the data stream actually refers to the operator
> that created it.
> The "DataStream mapper = source.map(noOpMapper)" here refers to the map
> operator, not to the result of the map function.
>
> When adding the feedback input, you add the input to the stream before the
> stream that you call "addInput()" on. Here, the feedback  actually gets
> unioned with the source data stream, not with the result of the mapper.
> This seems very weird to me.
>
> What happens here:
>
> DataStream source = env.createStream(myKafkaConnector);
> DataStream mapper = source.map(noOpMapper)
> source.addInput(feedback)
>
> or here:
>
> DataStream source1 = env.createStream(myKafkaConnector);
> DataStream source2 = env.createStream(myKafkaConnector);
>
> DataStream joined =
> source1.keyBy(...).join(source2.keyBy(...)).onWindow(...);
> DataStream feedback = joined.map(someMapper);
> joined.addInput(feedback);
>
>
>
>
> On Tue, Jul 7, 2015 at 3:57 PM, Gyula Fóra <gyf...@apache.org> wrote:
>
> > Hey,
> >
> > Along with the suggested changes to the streaming API structure I think
> we
> > should also rework the "iteration" api. Currently the iteration api tries
> > to mimic the syntax of the batch API while the runtime behaviour is quite
> > different.
> >
> > What we create instead of iterations is really just cyclic streams (loops
> > in the streaming job), so the API should somehow be intuitive about this
> > behaviour.
> >
> > I suggest to remove the explicit iterate call and instead add a method to
> > the StreamOperators that allows to connect feedback inputs (create
> loops).
> > It would look like this:
> >
> > A mapper that does nothing but iterates over some filtered input:
> >
> > *Current API :*
> > DataStream source = ..
> > IterativeDataStream it = source.iterate()
> > DataStream mapper = it.map(noOpMapper)
> > DataStream feedback = mapper.filter(...)
> > it.closeWith(feedback)
> >
> > *Suggested API :*
> > DataStream source = ..
> > DataStream mapper = source.map(noOpMapper)
> > DataStream feedback = mapper.filter(...)
> > mapper.addInput(feedback)
> >
> > The suggested approach would let us define inputs to operators after they
> > are created and implicitly union them with the normal input. This is I
> > think a much clearer approach than what we have now.
> >
> > What do you think?
> >
> > Gyula
> >
>

Reply via email to