Good points. If we want to structured loops on streaming we will need to inject 
iteration counters. The question is if we really need structured iterations on 
plain data streams. Window iterations are must-have on the other hand...

Paris

> On 07 Jul 2015, at 16:43, Kostas Tzoumas <ktzou...@apache.org> wrote:
> 
> I see. Perhaps more important IMO is defining the semantics of stream loops
> with event time.
> 
> The reason I asked about nested is that Naiad and other designs used a
> multidimensional timestamp to capture loops: (outer loop counter, inner
> loop counter, timestamp). I assume that currently making sense of which
> iteration an element comes from is left to the user. Should we aim to
> change that with the API redesign?
> 
> 
> On Tue, Jul 7, 2015 at 4:30 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> 
>> Okay, I am fine with this approach as well I see the advantages. Then we
>> just need to find a suitable name for marking a "FeedbackPoint" :)
>> 
>> Stephan Ewen <se...@apache.org> ezt írta (időpont: 2015. júl. 7., K,
>> 16:28):
>> 
>>> In Aljoscha's approach, we would need a special mutable stream. We could
>> do
>>> it like this:
>>> 
>>> DataStream source = ...
>>> 
>>> FeedbackPoint pt = source.createFeedbackPoint();
>>> 
>>> DataStream mapper = pt .map(noOpMapper)
>>> DataStream feedback = mapper.filter(...)
>>> pt .addFeedbacl(feedback)
>>> 
>>> 
>>> It is basically like the current approach, with different names.
>>> 
>>> I actually like the current approach, because it is explicit where
>> streams
>>> could be altered in hind-sight (after their definition).
>>> 
>>> 
>>> On Tue, Jul 7, 2015 at 4:20 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>> 
>>>> @Aljoscha:
>>>> Yes, thats basically my point as well. This is what happens now too but
>>> we
>>>> give this mutable datastream a special name : IterativeDataStream
>>>> 
>>>> This can be handled in very different ways through the api, the goal
>>> would
>>>> be to make something easy to use. I am fine with what we have now
>>> because I
>>>> know how it works but it might confuse people to call it iterate.
>>>> 
>>>> Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2015. júl.
>> 7.,
>>>> K,
>>>> 16:18):
>>>> 
>>>>> I think it could work if we allowed a DataStream to be unioned after
>>>>> creation. For example:
>>>>> 
>>>>> DataStream source = ..
>>>>> DataStream mapper = source.map(noOpMapper)
>>>>> DataStream feedback = mapper.filter(...)
>>>>> source.union(feedback)
>>>>> 
>>>>> This would basically mean that a DataStream is mutable and can be
>>>> extended
>>>>> after creation with more streams.
>>>>> 
>>>>> On Tue, 7 Jul 2015 at 16:12 Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>> 
>>>>>> I think this would be good yes. I was just about to open an Issue
>> for
>>>>>> changing the Streaming Iteration API. :D
>>>>>> 
>>>>>> Then we should also make the implementation very straightforward
>> and
>>>>>> simple, right now, the implementation of the iterations is all over
>>> the
>>>>>> place.
>>>>>> 
>>>>>> On Tue, 7 Jul 2015 at 15:57 Gyula Fóra <gyf...@apache.org> wrote:
>>>>>> 
>>>>>>> Hey,
>>>>>>> 
>>>>>>> Along with the suggested changes to the streaming API structure I
>>>> think
>>>>> we
>>>>>>> should also rework the "iteration" api. Currently the iteration
>> api
>>>>> tries
>>>>>>> to mimic the syntax of the batch API while the runtime behaviour
>> is
>>>>> quite
>>>>>>> different.
>>>>>>> 
>>>>>>> What we create instead of iterations is really just cyclic streams
>>>>> (loops
>>>>>>> in the streaming job), so the API should somehow be intuitive
>> about
>>>> this
>>>>>>> behaviour.
>>>>>>> 
>>>>>>> I suggest to remove the explicit iterate call and instead add a
>>> method
>>>>> to
>>>>>>> the StreamOperators that allows to connect feedback inputs (create
>>>>> loops).
>>>>>>> It would look like this:
>>>>>>> 
>>>>>>> A mapper that does nothing but iterates over some filtered input:
>>>>>>> 
>>>>>>> *Current API :*
>>>>>>> DataStream source = ..
>>>>>>> IterativeDataStream it = source.iterate()
>>>>>>> DataStream mapper = it.map(noOpMapper)
>>>>>>> DataStream feedback = mapper.filter(...)
>>>>>>> it.closeWith(feedback)
>>>>>>> 
>>>>>>> *Suggested API :*
>>>>>>> DataStream source = ..
>>>>>>> DataStream mapper = source.map(noOpMapper)
>>>>>>> DataStream feedback = mapper.filter(...)
>>>>>>> mapper.addInput(feedback)
>>>>>>> 
>>>>>>> The suggested approach would let us define inputs to operators
>> after
>>>>> they
>>>>>>> are created and implicitly union them with the normal input. This
>>> is I
>>>>>>> think a much clearer approach than what we have now.
>>>>>>> 
>>>>>>> What do you think?
>>>>>>> 
>>>>>>> Gyula
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 

Reply via email to