Hi All,

I have created a review only PR based on the discussion so far.
This will also help make the discussion easier and we can continue with the
review in parallel.

Here is the PR: https://github.com/apache/apex-core/pull/440

Please help review this. I am still working on documentation and tests.

~ Bhupesh

On Sun, Jan 1, 2017 at 9:59 AM, Bhupesh Chawda <bhup...@datatorrent.com>
wrote:

> Yes, that makes sense.
> We have following options:
> 1. Make the annotation false by default and force the user to forward the
> control tuples explicitly.
> 2. Annotation is true by default and static way of blocking stays as it
> is. We provide another way for blocking programmatically, perhaps by means
> of another method call on the port.
>
> ~ Bhupesh
>
> On Dec 30, 2016 00:09, "Pramod Immaneni" <pra...@datatorrent.com> wrote:
>
>> Bhupesh,
>>
>> Annotation seems like a static way to stop propagation. Give these are
>> programmatically generated I would think the operators should be able to
>> stop (consume without propagating) programmatically as well.
>>
>> Thanks
>>
>> On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <bhup...@datatorrent.com>
>> wrote:
>>
>> > Thanks Vlad, I am trying out the approach you mentioned regarding having
>> > another interface which allows sinks to put a control tuple.
>> >
>> > Regarding the delivery of control tuples, here is what I am planning to
>> do:
>> > All the control tuples which are emitted in a particular window are
>> > delivered after all the data tuples have been delivered to the
>> respective
>> > ports, but before the endWindow() call. The operator can then process
>> the
>> > control tuples in that window and can do any finalization in the end
>> window
>> > call. There will be no delivery of control tuples after endWindow() and
>> > before the next beginWindow() call.
>> >
>> > For handling the propagation of control tuples further in the dag, we
>> are
>> > planning to have an annotation on the Output Port of the operator which
>> > would be true by default.
>> > @OutputPortFieldAnnotation(propogateControlTuples = false).
>> >
>> > ~ Bhupesh
>> >
>> >
>> > On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <v.ro...@datatorrent.com>
>> > wrote:
>> >
>> > > Custom control tuples are control tuples emitted by an operator itself
>> > and
>> > > not by the platform. Prior to the introduction of the custom control
>> > > tuples, only Apex engine itself puts control tuples into various
>> sinks,
>> > so
>> > > the engine created necessary Tuple objects with the corresponding type
>> > > prior to calling Sink.put().
>> > >
>> > > Not all sinks need to be changed. Only control tuple aware sinks
>> should
>> > > provide such functionality. In the case there is a lot of code
>> > duplication,
>> > > please create an abstract class, that other control aware sinks will
>> > extend
>> > > from.
>> > >
>> > > Thank you,
>> > >
>> > > Vlad
>> > >
>> > >
>> > > On 12/23/16 06:24, Bhupesh Chawda wrote:
>> > >
>> > >> Hi Vlad,
>> > >>
>> > >> Thanks for the pointer on delegating the wrapping of the user tuple
>> to
>> > the
>> > >> control port. I was trying this out today.
>> > >> The problem I see us if we introduce a putControlTuple() method in
>> Sink,
>> > >> then a lot of the existing sinks would change. Also the changes
>> seemed
>> > >> redundant as, the existing control tuples already use the put()
>> method
>> > of
>> > >> sinks. So why do something special for custom control tuples?
>> > >>
>> > >> The only aspect in which the custom control tuples are different is
>> that
>> > >> these will be generated by the user and will actually be delivered to
>> > the
>> > >> ports in a different order. Perhaps we should be able to use the
>> > existing
>> > >> flow. The only problems as outlined before seem to be identification
>> of
>> > >> the
>> > >> user tuple as a control tuple.
>> > >>
>> > >> ~ Bhupesh
>> > >>
>> > >>
>> > >> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <
>> v.ro...@datatorrent.com>
>> > >> wrote:
>> > >>
>> > >> Why is it necessary to wrap in the OutputPort? Can't it be delegated
>> to
>> > a
>> > >>> Sink by introducing new putControlTuple method?
>> > >>>
>> > >>> Thank you,
>> > >>>
>> > >>> Vlad
>> > >>>
>> > >>>
>> > >>> On 12/21/16 22:10, Bhupesh Chawda wrote:
>> > >>>
>> > >>> Hi Vlad,
>> > >>>>
>> > >>>> The problem in using the Tuple class as the wrapper is that the
>> Ports
>> > >>>> belong to the API and we want to wrap the payload object of the
>> > control
>> > >>>> tuple into the Tuple class which is not part of the API.
>> > >>>>
>> > >>>> The output port will just get the payload of the user control
>> tuple.
>> > For
>> > >>>> example, if the user emits a Long, as a control tuple, the payload
>> > >>>> object
>> > >>>> will just be a Long object.
>> > >>>>
>> > >>>> It is necessary to bundle this Long into some recognizable object
>> so
>> > >>>> that
>> > >>>> the BufferServerPublisher knows that this is a Control tuple and
>> not a
>> > >>>> regular tuple and serialize it accordingly. It is therefore
>> necessary
>> > >>>> that
>> > >>>> the tuple be part of some known hierarchy so that can be
>> distinguished
>> > >>>> from
>> > >>>> other payload tuples. Let us call this class ControlTupleInterface.
>> > Note
>> > >>>> that this needs to be done before the tuple is inserted into the
>> sink
>> > >>>> which
>> > >>>> is done in the port objects. Once the tuple is inserted into the
>> sink,
>> > >>>> it
>> > >>>> would seem just like any other payload tuple and cannot be
>> > >>>> distinguished.
>> > >>>>
>> > >>>> For this reason, I had something like the following in API:
>> > >>>>
>> > >>>> package com.datatorrent.api;
>> > >>>> public class ControlTupleInterface
>> > >>>> {
>> > >>>>     Object payload; // User control tuple payload. A Long() for
>> > example.
>> > >>>>     UUID id;  // Unique Id to de-duplicate in downstream operators
>> > >>>> }
>> > >>>>
>> > >>>> Regarding your suggestion on using the Tuple class as the wrapper
>> for
>> > >>>> the
>> > >>>> control tuple payload, let me mention the current scenario flow to
>> > make
>> > >>>> the
>> > >>>> discussion easier:
>> > >>>>
>> > >>>> We have a Tuple class in buffer server which is responsible for
>> > >>>> serializing
>> > >>>> the user control tuple bundling together a message type:
>> > >>>> CUSTOM_CONTROL_TUPLE_VALUE.
>> > >>>>
>> > >>>>
>> > >>>> *com.datatorrent.bufferserver.packet.Tuple|--
>> > >>>> com.datatorrent.bufferserver.packet.CustomControlTuple*
>> > >>>> We have another Tuple class in Stram which helps the
>> > >>>> BufferServerSubscriber
>> > >>>> to de-serialize the serialized tuples. We should have
>> > CustomControlTuple
>> > >>>> in
>> > >>>> stram as follows:
>> > >>>>
>> > >>>>
>> > >>>> *com.datatorrent.stram.tuple.Tuple|--
>> > >>>> com.datatorrent.stram.tuple.CustomControlTuple*This will have a
>> field
>> > >>>> for
>> > >>>>
>> > >>>> user control payload.
>> > >>>>
>> > >>>> I think we should not expose the Tuple class in stram to the API.
>> That
>> > >>>> was
>> > >>>> the main reason I introduced another class/interface
>> > >>>> ControlTupleInterface
>> > >>>> as described above.
>> > >>>>
>> > >>>> Regarding, adding methods to DefaultInputPort and
>> DefaultOutputPort, I
>> > >>>> think error detection would not be early enough if the control
>> tuple
>> > is
>> > >>>> sent very late in the processing :-)
>> > >>>> Extending the ports to ControlTupleAware* should help in this case.
>> > >>>> However, we still need to see if there are any downsides on doing
>> > this.
>> > >>>>
>> > >>>> Thanks.
>> > >>>>
>> > >>>> ~ Bhupesh
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>> On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov <
>> v.ro...@datatorrent.com>
>> > >>>> wrote:
>> > >>>>
>> > >>>> Hi Bhupesh,
>> > >>>>
>> > >>>>> it should not be a CustomWrapper.  The wrapper object should be
>> > >>>>> CustomControlTuple that extends Tuple. There is already code that
>> > >>>>> checks
>> > >>>>> for Tuple instance. The "unWrap" name is misleading, IMO. It
>> should
>> > be
>> > >>>>> something like customControlTuple.getPayload() or
>> > >>>>> customControlTuple.getAttachment(). In the emitControl(), create
>> new
>> > >>>>> CustomControlTuple using provided payload as one of arguments. It
>> may
>> > >>>>> still
>> > >>>>> be good to use common parent other than Object for control tuple
>> > >>>>> payload
>> > >>>>> class hierarchy.
>> > >>>>>
>> > >>>>> I don't understand how adding more methods to the Default
>> > >>>>> implementation
>> > >>>>> will help with early error detection unless application or
>> operator
>> > >>>>> that
>> > >>>>> relies on the custom control tuple functionality explicitly checks
>> > for
>> > >>>>> the
>> > >>>>> platform version at run-time or tries to emit a control tuple
>> just to
>> > >>>>> check
>> > >>>>> that such functionality is supported by the platform.
>> > >>>>>
>> > >>>>> Thank you,
>> > >>>>>
>> > >>>>> Vlad
>> > >>>>>
>> > >>>>> On 12/21/16 04:58, Bhupesh Chawda wrote:
>> > >>>>>
>> > >>>>> Hi Vlad.
>> > >>>>>
>> > >>>>>> Yes, the API should not change. We can take an Object instead,
>> and
>> > >>>>>> later
>> > >>>>>> wrap it into the required class.
>> > >>>>>>
>> > >>>>>> Our InputPort.put and emitControl method would look something
>> like
>> > the
>> > >>>>>> following where we handle the wrapping and unwrapping internally.
>> > >>>>>>
>> > >>>>>> public void put(T tuple)
>> > >>>>>> {
>> > >>>>>>      if (tuple instanceof CustomWrapper) {
>> > >>>>>>        processControl(tuple.unWrap());
>> > >>>>>>      }  else {
>> > >>>>>>        process(tuple)
>> > >>>>>>      }
>> > >>>>>> }
>> > >>>>>>
>> > >>>>>> emitControl(Object tuple)
>> > >>>>>> {
>> > >>>>>>      sink.put(CustomWrapper.wrap(tuple));
>> > >>>>>> }
>> > >>>>>>
>> > >>>>>> Regarding the compatibility issue, I think we have two ways of
>> doing
>> > >>>>>> it:
>> > >>>>>>
>> > >>>>>>       1. Extend DefaultInputPort and DefaultOutputPort and create
>> > >>>>>>       ControlAwareInput and ControlAwareOutput out of it. This
>> might
>> > >>>>>> require us
>> > >>>>>>       to additionally handle specific cases when non-compatible
>> > ports
>> > >>>>>>       (ControlAwareOutput and DefaultInput, for example) are
>> > >>>>>> connected to
>> > >>>>>> each
>> > >>>>>>       other in user apps.
>> > >>>>>>       2. Add the additional methods in the existing Default
>> > >>>>>> implementations.
>> > >>>>>>
>> > >>>>>>
>> > >>>>>> IMO, both of these would help in early error detection.
>> > >>>>>>
>> > >>>>>> ~ Bhupesh
>> > >>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>> On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov <
>> > v.ro...@datatorrent.com>
>> > >>>>>> wrote:
>> > >>>>>>
>> > >>>>>> A wrapper class is required for the control tuples delivery, but
>> > >>>>>>
>> > >>>>>> Port/Operator API should use Control Tuple payload object only.
>> > >>>>>>> Implementation of the wrapper class may change from version to
>> > >>>>>>> version,
>> > >>>>>>> but
>> > >>>>>>> API should not be affected by the change.
>> > >>>>>>>
>> > >>>>>>> I guess, assumption is that default input and output port will
>> be
>> > >>>>>>> extended
>> > >>>>>>> to provide support for the control tuples. This may cause some
>> > >>>>>>> backward
>> > >>>>>>> compatibility issues. Consider scenario when a newer version of
>> > >>>>>>> Malhar
>> > >>>>>>> that
>> > >>>>>>> relies on EOF control tuple is deployed into older version of
>> core
>> > >>>>>>> that
>> > >>>>>>> does not support control tuples. In such scenario, error will be
>> > >>>>>>> raised
>> > >>>>>>> only when an operator tries to emit EOF control tuple at the end
>> > of a
>> > >>>>>>> job.
>> > >>>>>>> Introducing control tuple aware ports solve the early error
>> > >>>>>>> detection.
>> > >>>>>>> It
>> > >>>>>>> will require some operators to be modified to use control tuple
>> > aware
>> > >>>>>>> ports, but such change may help to distinguish control tuple
>> aware
>> > >>>>>>> operators from their old versions.
>> > >>>>>>>
>> > >>>>>>> Vlad
>> > >>>>>>>
>> > >>>>>>> On 12/20/16 04:09, Bhupesh Chawda wrote:
>> > >>>>>>>
>> > >>>>>>> I investigated this and seems like it is better to have a
>> wrapper
>> > >>>>>>> class
>> > >>>>>>>
>> > >>>>>>> for
>> > >>>>>>>> the user object.
>> > >>>>>>>> This would serve 2 purposes:
>> > >>>>>>>>
>> > >>>>>>>>        1. Allow us to distinguish a custom control tuple from
>> > other
>> > >>>>>>>> payload
>> > >>>>>>>>        tuples.
>> > >>>>>>>>        2. For the same control tuple received from different
>> > >>>>>>>> upstream
>> > >>>>>>>>
>> > >>>>>>>>        partitions, we would have some mechanism to distinguish
>> > >>>>>>>> between
>> > >>>>>>>> the
>> > >>>>>>>> two in
>> > >>>>>>>>        order to identify duplicates.
>> > >>>>>>>>
>> > >>>>>>>> Additionally, the wrapper class needs to be part of the API as
>> > >>>>>>>> DefaultOutputPort needs to know about it, before putting it
>> into
>> > the
>> > >>>>>>>> sink.
>> > >>>>>>>> We can make sure that the user is not able to extend or modify
>> > this
>> > >>>>>>>> class
>> > >>>>>>>> in any manner.
>> > >>>>>>>>
>> > >>>>>>>> ~ Bhupesh
>> > >>>>>>>>
>> > >>>>>>>> On Mon, Dec 19, 2016 at 12:18 PM, David Yan <
>> david...@gmail.com>
>> > >>>>>>>> wrote:
>> > >>>>>>>>
>> > >>>>>>>> This C type parameter is going to fix the control tuple type at
>> > >>>>>>>> compile
>> > >>>>>>>>
>> > >>>>>>>> time and this is actually not what we want. Note that the
>> operator
>> > >>>>>>>> may
>> > >>>>>>>>
>> > >>>>>>>>> receive or emit multiple different control tuple types.
>> > >>>>>>>>>
>> > >>>>>>>>> David
>> > >>>>>>>>>
>> > >>>>>>>>> On Dec 17, 2016 3:33 AM, "Tushar Gosavi" <
>> tus...@datatorrent.com
>> > >
>> > >>>>>>>>> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>> We do not need to create an interface for data emitted through
>> > >>>>>>>>> emitControl or processed through processControl. Internally we
>> > >>>>>>>>> could
>> > >>>>>>>>> wrap the user object in ControlTuple. you can add type
>> parameter
>> > >>>>>>>>> for
>> > >>>>>>>>> control tuple object on ports.
>> > >>>>>>>>>
>> > >>>>>>>>> DefaultInputPort<D,C>
>> > >>>>>>>>> D is the data type and C is the control tuple type for better
>> > error
>> > >>>>>>>>> catching at compile phase.
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>> - Tushar.
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>> On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda <
>> > >>>>>>>>> bhup...@datatorrent.com
>> > >>>>>>>>> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>> Agreed Vlad and David.
>> > >>>>>>>>>
>> > >>>>>>>>> I am just suggesting there should be a wrapper for the user
>> > object.
>> > >>>>>>>>>> It
>> > >>>>>>>>>>
>> > >>>>>>>>>> can
>> > >>>>>>>>>>
>> > >>>>>>>>>> be a marker interface and we can call it something else like
>> > >>>>>>>>>
>> > >>>>>>>>> "CustomControl".
>> > >>>>>>>>>>
>> > >>>>>>>>>> The user object will be wrapped in another class
>> "ControlTuple"
>> > >>>>>>>>>> which
>> > >>>>>>>>>> traverses the BufferServer and will perhaps be extended from
>> the
>> > >>>>>>>>>> packet/Tuple class. This class will not be exposed to the
>> user.
>> > >>>>>>>>>>
>> > >>>>>>>>>> ~ Bhupesh
>> > >>>>>>>>>>
>> > >>>>>>>>>>
>> > >>>>>>>>>> On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov <
>> > >>>>>>>>>> v.ro...@datatorrent.com>
>> > >>>>>>>>>>
>> > >>>>>>>>>> wrote:
>> > >>>>>>>>>>
>> > >>>>>>>>>> I agree with David. Payload of the control tuple is in the
>> > >>>>>>>>> userObject
>> > >>>>>>>>>
>> > >>>>>>>>> and
>> > >>>>>>>>>> operators/ports don't need to be exposed to the
>> implementation
>> > of
>> > >>>>>>>>>> the
>> > >>>>>>>>>>
>> > >>>>>>>>>> ControlTuple class. With the proposed interface operators
>> > >>>>>>>>>> developers
>> > >>>>>>>>>>
>> > >>>>>>>>>>> are
>> > >>>>>>>>>>> free to extend ControlTuple further and I don't think that
>> such
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> capability
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> needs to be provided. The wrapping in the ControlTuple
>> class is
>> > >>>>>>>>>> necessary
>> > >>>>>>>>>> and most likely ControlTuple needs to be extended from the
>> > buffer
>> > >>>>>>>>>> server
>> > >>>>>>>>>>
>> > >>>>>>>>>> Tuple. It may be good to have a common parent other than
>> Object
>> > >>>>>>>>>> for
>> > >>>>>>>>>>
>> > >>>>>>>>>>> all
>> > >>>>>>>>>>> user payloads, but it may be a marker interface as well.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Thank you,
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Vlad
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> On 12/16/16 09:59, Bhupesh Chawda wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Hi David,
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Actually, I was thinking of another API class called
>> > >>>>>>>>>>> ControlTuple,
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>> different from the actual tuple class in buffer server or
>> > stram.
>> > >>>>>>>>>>>> This could serve as a way for the Buffer server publisher
>> to
>> > >>>>>>>>>>>> understand
>> > >>>>>>>>>>>> that it is a control tuple and needs to be wrapped
>> > differently.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> ~ Bhupesh
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> On Dec 16, 2016 22:28, "David Yan" <david...@gmail.com>
>> > wrote:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>       // DefaultInputPort
>> > >>>>>>>>>>>>        public void processControl(ControlTuple tuple)
>> > >>>>>>>>>>>>        {
>> > >>>>>>>>>>>>          // Default Implementation to avoid need to
>> implement
>> > >>>>>>>>>>>> it in
>> > >>>>>>>>>>>> all
>> > >>>>>>>>>>>> implementations
>> > >>>>>>>>>>>>        }
>> > >>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>       // DefaultOutputPort
>> > >>>>>>>>>>>>        public void emitControl(ControlTuple tuple)
>> > >>>>>>>>>>>>        {
>> > >>>>>>>>>>>>        }
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> I think we don't need to expose the ControlTuple class to
>> the
>> > >>>>>>>>>>>> operator
>> > >>>>>>>>>>>> developers because the window ID is just the current
>> window ID
>> > >>>>>>>>>>>> when
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> these
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> methods are called. How about making them just Object? We
>> also
>> > >>>>>>>>>>>
>> > >>>>>>>>>> need to
>> > >>>>>>>>>>
>> > >>>>>>>>>> provide the way for the user to specify custom serializer for
>> > the
>> > >>>>>>>>>>
>> > >>>>>>>>>>> control
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> tuple.
>> > >>>>>>>>>>>
>> > >>>>>>>>>> David
>> > >>>>>>>>>>
>> > >>>>>>>>>>> On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda <
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> bhup...@datatorrent.com
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>> Hi All,
>> > >>>>>>>>>>
>> > >>>>>>>>>>> Here are the initial interfaces:
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>       // DefaultInputPort
>> > >>>>>>>>>>>>>        public void processControl(ControlTuple tuple)
>> > >>>>>>>>>>>>>        {
>> > >>>>>>>>>>>>>          // Default Implementation to avoid need to
>> implement
>> > >>>>>>>>>>>>> it
>> > >>>>>>>>>>>>> in
>> > >>>>>>>>>>>>> all
>> > >>>>>>>>>>>>> implementations
>> > >>>>>>>>>>>>>        }
>> > >>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>       // DefaultOutputPort
>> > >>>>>>>>>>>>>        public void emitControl(ControlTuple tuple)
>> > >>>>>>>>>>>>>        {
>> > >>>>>>>>>>>>>        }
>> > >>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> We have an option to add these methods to the interfaces -
>> > >>>>>>>>>>>>> InputPort
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> and
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> OutputPort; But these would not be backward compatible and
>> > also
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>> not
>> > >>>>>>>>>>> consistent with the current implementation of basic data
>> tuple
>> > >>>>>>>>>>> flow
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> (as
>> > >>>>>>>>>>>> with process() and emit()).
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>> We also need to expose an interface / class for users to
>> wrap
>> > >>>>>>>>>>> their
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> object
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> and emit downstream. This should be part of API.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>> public class ControlTuple extends Tuple
>> > >>>>>>>>>>>>> {
>> > >>>>>>>>>>>>>        Object userObject;
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>        public ControlTuple(long windowId, Object
>> userObject)
>> > >>>>>>>>>>>>>        {
>> > >>>>>>>>>>>>>          //
>> > >>>>>>>>>>>>>        }
>> > >>>>>>>>>>>>> }
>> > >>>>>>>>>>>>> {code}
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> The emitted tuples would traverse the same flow as with
>> other
>> > >>>>>>>>>>>>> control
>> > >>>>>>>>>>>>> tuples. The plan is to intercept the control tuples in
>> > >>>>>>>>>>>>> GenericNode
>> > >>>>>>>>>>>>> and
>> > >>>>>>>>>>>>> use
>> > >>>>>>>>>>>>> the Reservior to emit the control tuples at the end of the
>> > >>>>>>>>>>>>> window.
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> GenericNode seems to be the best place to buffer incoming
>> > >>>>>>>>>>>>> custom
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> control
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> tuples without delivering them immediately to the operator
>> > >>>>>>>>>>>> port.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>> Once
>> > >>>>>>>>>>> the
>> > >>>>>>>>>>> end of the window is reached, we plan to use the reservoir
>> sink
>> > >>>>>>>>>>> to
>> > >>>>>>>>>>> push
>> > >>>>>>>>>>> them to the port. This is different behavior than any other
>> > >>>>>>>>>>> control
>> > >>>>>>>>>>> tuple
>> > >>>>>>>>>>> where we are changing the order of tuples in the stream. The
>> > >>>>>>>>>>> custom
>> > >>>>>>>>>>> control
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> tuples will be buffered and not delivered to the ports until
>> > the
>> > >>>>>>>>>>>> end
>> > >>>>>>>>>>>> of
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>> the
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>> window.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> To accomplish this, we need to have a public method in
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>>>> SweepableReservoir
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>> which allows to put a tuple back in the sink of the
>> > reservoir.
>> > >>>>>>>>>>>>
>> > >>>>>>>>>>> ~ Bhupesh
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>
>> > >>>>>>>>>>>>>
>> > >>>>>>>>>>>>>
>> > >
>> >
>>
>

Reply via email to