Bhupesh,

Annotation seems like a static way to stop propagation. Given that these are
programmatically generated, I would think the operators should be able to
stop (consume without propagating) programmatically as well.
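
To make the idea concrete, here is a rough sketch of one possible shape. All names in it (ControlTupleHandler, PropagationSketch, deliver) are hypothetical and exist only for illustration; nothing here is existing Apex API. The handler's return value tells the engine stand-in whether to forward the control payload downstream or to treat it as consumed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these types exist in the Apex API.
interface ControlTupleHandler {
  /** Returns true to let the payload continue downstream, false to consume it here. */
  boolean processControl(Object payload);
}

class PropagationSketch {
  /** Stand-in for the engine: forwards only the payloads the handler does not consume. */
  static List<Object> deliver(List<Object> controlPayloads, ControlTupleHandler handler) {
    List<Object> forwarded = new ArrayList<>();
    for (Object payload : controlPayloads) {
      if (handler.processControl(payload)) {
        forwarded.add(payload);
      }
    }
    return forwarded;
  }

  public static void main(String[] args) {
    // Consume Long payloads at this operator; let everything else propagate.
    ControlTupleHandler handler = payload -> !(payload instanceof Long);
    List<Object> in = new ArrayList<>();
    in.add(42L);
    in.add("EOF");
    System.out.println(deliver(in, handler)); // prints [EOF]
  }
}
```

The decision is made per tuple at run time, which a port-level annotation cannot express.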

Thanks

On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <[email protected]>
wrote:

> Thanks Vlad, I am trying out the approach you mentioned regarding having
> another interface which allows sinks to put a control tuple.
>
> Regarding the delivery of control tuples, here is what I am planning to do:
> All the control tuples which are emitted in a particular window are
> delivered after all the data tuples have been delivered to the respective
> ports, but before the endWindow() call. The operator can then process the
> control tuples in that window and can do any finalization in the end window
> call. There will be no delivery of control tuples after endWindow() and
> before the next beginWindow() call.
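
A small sketch of the ordering described above, as I read it. WindowOrderingSketch and its methods are made-up stand-ins rather than engine code: control payloads that arrive during a window are held back, handed over after the data tuples, and nothing is delivered after endWindow().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the per-window delivery logic; not engine code.
class WindowOrderingSketch {
  final List<String> log = new ArrayList<>();                  // what the operator observes
  private final List<Object> pendingControl = new ArrayList<>();

  void beginWindow() {
    log.add("beginWindow");
  }

  void onData(Object tuple) {
    log.add("data:" + tuple);                                  // data tuples go straight through
  }

  void onControl(Object payload) {
    pendingControl.add(payload);                               // held until the window closes
  }

  void endWindow() {
    for (Object payload : pendingControl) {
      log.add("control:" + payload);                           // after all data, before endWindow
    }
    pendingControl.clear();
    log.add("endWindow");
  }

  public static void main(String[] args) {
    WindowOrderingSketch w = new WindowOrderingSketch();
    w.beginWindow();
    w.onControl("EOF");                                        // arrives before the data tuples
    w.onData("a");
    w.onData("b");
    w.endWindow();
    System.out.println(w.log);
    // prints [beginWindow, data:a, data:b, control:EOF, endWindow]
  }
}
```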
>
> For handling the propagation of control tuples further in the dag, we are
> planning to have an annotation on the Output Port of the operator which
> would be true by default.
> @OutputPortFieldAnnotation(propogateControlTuples = false).
>
> ~ Bhupesh
>
>
> On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <[email protected]>
> wrote:
>
> > Custom control tuples are control tuples emitted by an operator itself
> and
> > not by the platform. Prior to the introduction of the custom control
> > tuples, only Apex engine itself puts control tuples into various sinks,
> so
> > the engine created necessary Tuple objects with the corresponding type
> > prior to calling Sink.put().
> >
> > Not all sinks need to be changed. Only control tuple aware sinks should
> > provide such functionality. In the case there is a lot of code
> duplication,
> > please create an abstract class, that other control aware sinks will
> extend
> > from.
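
For reference, the abstract-class suggestion could look roughly like the sketch below. SimpleSink is a simplified stand-in for the real Sink interface, and SketchControlTuple, ControlAwareSink, AbstractControlAwareSink and RecordingSink are hypothetical names used only for illustration. The wrapping lives in one place, and only sinks that opt in get the extra method.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a sink; the real Apex Sink interface is not reproduced here.
interface SimpleSink<T> {
  void put(T tuple);
}

// Hypothetical wrapper that marks a payload as a custom control tuple.
final class SketchControlTuple {
  private final Object payload;

  SketchControlTuple(Object payload) {
    this.payload = payload;
  }

  Object getPayload() {
    return payload;
  }
}

// Only control tuple aware sinks implement this; other sinks stay unchanged.
interface ControlAwareSink<T> extends SimpleSink<T> {
  void putControlTuple(Object payload);
}

// Shared wrapping logic, so concrete control aware sinks do not duplicate it.
abstract class AbstractControlAwareSink implements ControlAwareSink<Object> {
  @Override
  public void putControlTuple(Object payload) {
    put(new SketchControlTuple(payload));
  }
}

class RecordingSink extends AbstractControlAwareSink {
  final List<Object> received = new ArrayList<>();

  @Override
  public void put(Object tuple) {
    received.add(tuple);
  }

  public static void main(String[] args) {
    RecordingSink sink = new RecordingSink();
    sink.put("data");
    sink.putControlTuple(7L);
    System.out.println(sink.received.get(1) instanceof SketchControlTuple); // prints true
  }
}
```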
> >
> > Thank you,
> >
> > Vlad
> >
> >
> > On 12/23/16 06:24, Bhupesh Chawda wrote:
> >
> >> Hi Vlad,
> >>
> >> Thanks for the pointer on delegating the wrapping of the user tuple to
> the
> >> control port. I was trying this out today.
> >> The problem I see is that if we introduce a putControlTuple() method in
> >> Sink, then a lot of the existing sinks would change. Also the changes seemed
> >> redundant, as the existing control tuples already use the put() method
> of
> >> sinks. So why do something special for custom control tuples?
> >>
> >> The only aspect in which the custom control tuples are different is that
> >> these will be generated by the user and will actually be delivered to
> the
> >> ports in a different order. Perhaps we should be able to use the
> existing
> >> flow. The only problem, as outlined before, seems to be identification of
> >> the user tuple as a control tuple.
> >>
> >> ~ Bhupesh
> >>
> >>
> >> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <[email protected]>
> >> wrote:
> >>
> >> Why is it necessary to wrap in the OutputPort? Can't it be delegated to
> a
> >>> Sink by introducing new putControlTuple method?
> >>>
> >>> Thank you,
> >>>
> >>> Vlad
> >>>
> >>>
> >>> On 12/21/16 22:10, Bhupesh Chawda wrote:
> >>>
> >>> Hi Vlad,
> >>>>
> >>>> The problem in using the Tuple class as the wrapper is that the Ports
> >>>> belong to the API and we want to wrap the payload object of the
> control
> >>>> tuple into the Tuple class which is not part of the API.
> >>>>
> >>>> The output port will just get the payload of the user control tuple.
> For
> >>>> example, if the user emits a Long, as a control tuple, the payload
> >>>> object
> >>>> will just be a Long object.
> >>>>
> >>>> It is necessary to bundle this Long into some recognizable object so
> >>>> that
> >>>> the BufferServerPublisher knows that this is a Control tuple and not a
> >>>> regular tuple and serialize it accordingly. It is therefore necessary
> >>>> that
> >>>> the tuple be part of some known hierarchy so that it can be distinguished
> >>>> from
> >>>> other payload tuples. Let us call this class ControlTupleInterface.
> Note
> >>>> that this needs to be done before the tuple is inserted into the sink
> >>>> which
> >>>> is done in the port objects. Once the tuple is inserted into the sink,
> >>>> it
> >>>> would seem just like any other payload tuple and cannot be
> >>>> distinguished.
> >>>>
> >>>> For this reason, I had something like the following in API:
> >>>>
> >>>> package com.datatorrent.api;
> >>>>
> >>>> import java.util.UUID;
> >>>>
> >>>> public class ControlTupleInterface
> >>>> {
> >>>>     Object payload; // User control tuple payload, for example a Long.
> >>>>     UUID id;  // Unique id used to de-duplicate in downstream operators
> >>>> }
> >>>>
> >>>> Regarding your suggestion on using the Tuple class as the wrapper for
> >>>> the
> >>>> control tuple payload, let me mention the current scenario flow to
> make
> >>>> the
> >>>> discussion easier:
> >>>>
> >>>> We have a Tuple class in buffer server which is responsible for
> >>>> serializing
> >>>> the user control tuple bundling together a message type:
> >>>> CUSTOM_CONTROL_TUPLE_VALUE.
> >>>>
> >>>>
> >>>> com.datatorrent.bufferserver.packet.Tuple
> >>>> |-- com.datatorrent.bufferserver.packet.CustomControlTuple
> >>>>
> >>>> We have another Tuple class in Stram which helps the
> >>>> BufferServerSubscriber
> >>>> to de-serialize the serialized tuples. We should have
> CustomControlTuple
> >>>> in
> >>>> stram as follows:
> >>>>
> >>>>
> >>>> com.datatorrent.stram.tuple.Tuple
> >>>> |-- com.datatorrent.stram.tuple.CustomControlTuple
> >>>>
> >>>> This will have a field for user control payload.
> >>>>
> >>>> I think we should not expose the Tuple class in stram to the API. That
> >>>> was
> >>>> the main reason I introduced another class/interface
> >>>> ControlTupleInterface
> >>>> as described above.
> >>>>
> >>>> Regarding, adding methods to DefaultInputPort and DefaultOutputPort, I
> >>>> think error detection would not be early enough if the control tuple
> is
> >>>> sent very late in the processing :-)
> >>>> Extending the ports to ControlTupleAware* should help in this case.
> >>>> However, we still need to see if there are any downsides on doing
> this.
> >>>>
> >>>> Thanks.
> >>>>
> >>>> ~ Bhupesh
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov <[email protected]>
> >>>> wrote:
> >>>>
> >>>> Hi Bhupesh,
> >>>>
> >>>>> it should not be a CustomWrapper.  The wrapper object should be
> >>>>> CustomControlTuple that extends Tuple. There is already code that
> >>>>> checks
> >>>>> for Tuple instance. The "unWrap" name is misleading, IMO. It should
> be
> >>>>> something like customControlTuple.getPayload() or
> >>>>> customControlTuple.getAttachment(). In the emitControl(), create new
> >>>>> CustomControlTuple using provided payload as one of arguments. It may
> >>>>> still
> >>>>> be good to use common parent other than Object for control tuple
> >>>>> payload
> >>>>> class hierarchy.
> >>>>>
> >>>>> I don't understand how adding more methods to the Default
> >>>>> implementation
> >>>>> will help with early error detection unless application or operator
> >>>>> that
> >>>>> relies on the custom control tuple functionality explicitly checks
> for
> >>>>> the
> >>>>> platform version at run-time or tries to emit a control tuple just to
> >>>>> check
> >>>>> that such functionality is supported by the platform.
> >>>>>
> >>>>> Thank you,
> >>>>>
> >>>>> Vlad
> >>>>>
> >>>>> On 12/21/16 04:58, Bhupesh Chawda wrote:
> >>>>>
> >>>>> Hi Vlad.
> >>>>>
> >>>>>> Yes, the API should not change. We can take an Object instead, and
> >>>>>> later
> >>>>>> wrap it into the required class.
> >>>>>>
> >>>>>> Our InputPort.put and emitControl method would look something like
> the
> >>>>>> following where we handle the wrapping and unwrapping internally.
> >>>>>>
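> >>>>>> public void put(T tuple)
> >>>>>> {
> >>>>>>      if (tuple instanceof CustomWrapper) {
> >>>>>>        // Custom control tuple: unwrap and hand the payload to the operator
> >>>>>>        processControl(((CustomWrapper)tuple).unWrap());
> >>>>>>      } else {
> >>>>>>        process(tuple);
> >>>>>>      }
> >>>>>> }
> >>>>>>
> >>>>>> public void emitControl(Object tuple)
> >>>>>> {
> >>>>>>      // Wrap the payload so downstream can tell it apart from data tuples
> >>>>>>      sink.put(CustomWrapper.wrap(tuple));
> >>>>>> }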
> >>>>>>
> >>>>>> Regarding the compatibility issue, I think we have two ways of doing
> >>>>>> it:
> >>>>>>
> >>>>>>       1. Extend DefaultInputPort and DefaultOutputPort and create
> >>>>>>       ControlAwareInput and ControlAwareOutput out of it. This might
> >>>>>> require us
> >>>>>>       to additionally handle specific cases when non-compatible
> ports
> >>>>>>       (ControlAwareOutput and DefaultInput, for example) are
> >>>>>> connected to
> >>>>>> each
> >>>>>>       other in user apps.
> >>>>>>       2. Add the additional methods in the existing Default
> >>>>>> implementations.
> >>>>>>
> >>>>>>
> >>>>>> IMO, both of these would help in early error detection.
> >>>>>>
> >>>>>> ~ Bhupesh
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov <
> [email protected]>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> A wrapper class is required for the control tuples delivery, but
> >>>>>>> Port/Operator API should use Control Tuple payload object only.
> >>>>>>> Implementation of the wrapper class may change from version to
> >>>>>>> version, but API should not be affected by the change.
> >>>>>>>
> >>>>>>> I guess the assumption is that the default input and output ports will
> >>>>>>> be extended to provide support for the control tuples. This may cause
> >>>>>>> some backward compatibility issues. Consider a scenario where a newer
> >>>>>>> version of Malhar that relies on an EOF control tuple is deployed into
> >>>>>>> an older version of core that does not support control tuples. In such
> >>>>>>> a scenario, an error will be raised only when an operator tries to emit
> >>>>>>> the EOF control tuple at the end of a job. Introducing control tuple
> >>>>>>> aware ports solves the early error detection problem. It will require
> >>>>>>> some operators to be modified to use control tuple aware ports, but
> >>>>>>> such a change may help to distinguish control tuple aware operators
> >>>>>>> from their old versions.
> >>>>>>>
> >>>>>>> Vlad
> >>>>>>>
> >>>>>>> On 12/20/16 04:09, Bhupesh Chawda wrote:
> >>>>>>>
> >>>>>>>> I investigated this and it seems better to have a wrapper class for the
> >>>>>>>> user object.
> >>>>>>>> This would serve 2 purposes:
> >>>>>>>>
> >>>>>>>>        1. Allow us to distinguish a custom control tuple from other
> >>>>>>>>        payload tuples.
> >>>>>>>>        2. For the same control tuple received from different upstream
> >>>>>>>>        partitions, we would have some mechanism to distinguish between
> >>>>>>>>        the two in order to identify duplicates.
> >>>>>>>>
> >>>>>>>> Additionally, the wrapper class needs to be part of the API as
> >>>>>>>> DefaultOutputPort needs to know about it, before putting it into
> the
> >>>>>>>> sink.
> >>>>>>>> We can make sure that the user is not able to extend or modify
> this
> >>>>>>>> class
> >>>>>>>> in any manner.
> >>>>>>>>
> >>>>>>>> ~ Bhupesh
> >>>>>>>>
> >>>>>>>> On Mon, Dec 19, 2016 at 12:18 PM, David Yan <[email protected]>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> This C type parameter is going to fix the control tuple type at
> >>>>>>>>> compile time and this is actually not what we want. Note that the
> >>>>>>>>> operator may receive or emit multiple different control tuple types.
> >>>>>>>>>
> >>>>>>>>> David
> >>>>>>>>>
> >>>>>>>>> On Dec 17, 2016 3:33 AM, "Tushar Gosavi" <[email protected]
> >
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> We do not need to create an interface for data emitted through
> >>>>>>>>> emitControl or processed through processControl. Internally we
> >>>>>>>>> could
> >>>>>>>>> wrap the user object in ControlTuple. you can add type parameter
> >>>>>>>>> for
> >>>>>>>>> control tuple object on ports.
> >>>>>>>>>
> >>>>>>>>> DefaultInputPort<D,C>
> >>>>>>>>> D is the data type and C is the control tuple type for better
> error
> >>>>>>>>> catching at compile phase.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> - Tushar.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda <
> >>>>>>>>> [email protected]
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Agreed Vlad and David.
> >>>>>>>>>>
> >>>>>>>>>> I am just suggesting there should be a wrapper for the user object. It
> >>>>>>>>>> can be a marker interface and we can call it something else like
> >>>>>>>>>> "CustomControl".
> >>>>>>>>>>
> >>>>>>>>>> The user object will be wrapped in another class "ControlTuple" which
> >>>>>>>>>> traverses the BufferServer and will perhaps be extended from the
> >>>>>>>>>> packet/Tuple class. This class will not be exposed to the user.
> >>>>>>>>>>
> >>>>>>>>>> ~ Bhupesh
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov <
> >>>>>>>>>> [email protected]>
> >>>>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> I agree with David. Payload of the control tuple is in the userObject
> >>>>>>>>>>> and operators/ports don't need to be exposed to the implementation of
> >>>>>>>>>>> the ControlTuple class. With the proposed interface, operator
> >>>>>>>>>>> developers are free to extend ControlTuple further, and I don't think
> >>>>>>>>>>> that such capability needs to be provided. The wrapping in the
> >>>>>>>>>>> ControlTuple class is necessary, and most likely ControlTuple needs to
> >>>>>>>>>>> be extended from the buffer server Tuple. It may be good to have a
> >>>>>>>>>>> common parent other than Object for all user payloads, but it may be a
> >>>>>>>>>>> marker interface as well.
> >>>>>>>>>>>
> >>>>>>>>>>> Thank you,
> >>>>>>>>>>>
> >>>>>>>>>>> Vlad
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 12/16/16 09:59, Bhupesh Chawda wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi David,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Actually, I was thinking of another API class called ControlTuple,
> >>>>>>>>>>>> different from the actual tuple class in buffer server or stram.
> >>>>>>>>>>>> This could serve as a way for the Buffer server publisher to
> >>>>>>>>>>>> understand that it is a control tuple and needs to be wrapped
> >>>>>>>>>>>> differently.
> >>>>>>>>>>>>
> >>>>>>>>>>>> ~ Bhupesh
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Dec 16, 2016 22:28, "David Yan" <[email protected]>
> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>       // DefaultInputPort
> >>>>>>>>>>>>        public void processControl(ControlTuple tuple)
> >>>>>>>>>>>>        {
> >>>>>>>>>>>>          // Default implementation to avoid the need to implement
> >>>>>>>>>>>>          // it in all implementations
> >>>>>>>>>>>>        }
> >>>>>>>>>>>> {code}
> >>>>>>>>>>>>
> >>>>>>>>>>>> {code}
> >>>>>>>>>>>>       // DefaultOutputPort
> >>>>>>>>>>>>        public void emitControl(ControlTuple tuple)
> >>>>>>>>>>>>        {
> >>>>>>>>>>>>        }
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think we don't need to expose the ControlTuple class to the operator
> >>>>>>>>>>>> developers because the window ID is just the current window ID when
> >>>>>>>>>>>> these methods are called. How about making them just Object? We also
> >>>>>>>>>>>> need to provide a way for the user to specify a custom serializer for
> >>>>>>>>>>>> the control tuple.
> >>>>>>>>>>>>
> >>>>>>>>>>>> David
> >>>>>>>>>>
> >>>>>>>>>>> On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda <
> >>>>>>>>>>>>
> >>>>>>>>>>>> [email protected]
> >>>>>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>> Hi All,
> >>>>>>>>>>
> >>>>>>>>>>> Here are the initial interfaces:
> >>>>>>>>>>>>
> >>>>>>>>>>>> {code}
> >>>>>>>>>>>>>       // DefaultInputPort
> >>>>>>>>>>>>>        public void processControl(ControlTuple tuple)
> >>>>>>>>>>>>>        {
> >>>>>>>>>>>>>          // Default implementation to avoid the need to implement
> >>>>>>>>>>>>>          // it in all implementations
> >>>>>>>>>>>>>        }
> >>>>>>>>>>>>> {code}
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> {code}
> >>>>>>>>>>>>>       // DefaultOutputPort
> >>>>>>>>>>>>>        public void emitControl(ControlTuple tuple)
> >>>>>>>>>>>>>        {
> >>>>>>>>>>>>>        }
> >>>>>>>>>>>>> {code}
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> We have an option to add these methods to the interfaces InputPort and
> >>>>>>>>>>>>> OutputPort, but these would not be backward compatible and also not
> >>>>>>>>>>>>> consistent with the current implementation of basic data tuple flow
> >>>>>>>>>>>>> (as with process() and emit()).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> We also need to expose an interface / class for users to wrap their
> >>>>>>>>>>>>> object and emit downstream. This should be part of API.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> {code}
> >>>>>>>>>>>>> public class ControlTuple extends Tuple
> >>>>>>>>>>>>> {
> >>>>>>>>>>>>>        Object userObject;
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>        public ControlTuple(long windowId, Object userObject)
> >>>>>>>>>>>>>        {
> >>>>>>>>>>>>>          //
> >>>>>>>>>>>>>        }
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>> {code}
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The emitted tuples would traverse the same flow as with other control
> >>>>>>>>>>>>> tuples. The plan is to intercept the control tuples in GenericNode and
> >>>>>>>>>>>>> use the Reservoir to emit the control tuples at the end of the window.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> GenericNode seems to be the best place to buffer incoming custom control
> >>>>>>>>>>>>> tuples without delivering them immediately to the operator port. Once the
> >>>>>>>>>>>>> end of the window is reached, we plan to use the reservoir sink to push
> >>>>>>>>>>>>> them to the port. This differs from the behavior of any other control
> >>>>>>>>>>>>> tuple in that we are changing the order of tuples in the stream. The
> >>>>>>>>>>>>> custom control tuples will be buffered and not delivered to the ports
> >>>>>>>>>>>>> until the end of the window.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> To accomplish this, we need to have a public method in SweepableReservoir
> >>>>>>>>>>>>> which allows putting a tuple back in the sink of the reservoir.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> ~ Bhupesh
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >
>
