Bhupesh, Annotation seems like a static way to stop propagation. Given these are programmatically generated I would think the operators should be able to stop (consume without propagating) programmatically as well.
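To make the suggestion concrete, here is a rough sketch only. It is not Apex API: SketchOperator, SketchNode and the boolean return value of processControl() are all made-up names and assumptions for illustration. It combines the delivery order you describe (data tuples first, then the buffered control tuples, then endWindow()) with a programmatic decision on whether each control tuple is propagated or consumed.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; every type here is hypothetical, not part of Apex.
public class ControlTupleDeliverySketch
{
  // Hypothetical operator callbacks; not the Apex Operator interface.
  interface SketchOperator
  {
    void beginWindow(long windowId);
    void process(Object tuple);
    // Assumption: return true to propagate the control tuple downstream,
    // false to consume it without propagating.
    boolean processControl(Object payload);
    void endWindow();
  }

  // Hypothetical stand-in for the node that drives a single operator.
  static class SketchNode
  {
    private final SketchOperator operator;
    private final List<Object> buffered = new ArrayList<>();
    private final List<Object> propagated = new ArrayList<>();

    SketchNode(SketchOperator operator)
    {
      this.operator = operator;
    }

    void beginWindow(long windowId)
    {
      operator.beginWindow(windowId);
    }

    void onData(Object tuple)
    {
      operator.process(tuple); // data tuples are delivered immediately
    }

    void onControl(Object payload)
    {
      buffered.add(payload); // control tuples are held until the window ends
    }

    void endWindow()
    {
      // Deliver buffered control tuples after all data, but before endWindow().
      for (Object payload : buffered) {
        if (operator.processControl(payload)) {
          propagated.add(payload); // the operator chose to propagate
        }
      }
      buffered.clear();
      operator.endWindow();
    }

    List<Object> propagated()
    {
      return propagated;
    }
  }

  // Runs one window and returns the order in which callbacks were invoked.
  public static List<String> demo()
  {
    final List<String> log = new ArrayList<>();
    SketchNode node = new SketchNode(new SketchOperator()
    {
      @Override public void beginWindow(long windowId) { log.add("begin:" + windowId); }
      @Override public void process(Object tuple) { log.add("data:" + tuple); }
      @Override public boolean processControl(Object payload)
      {
        log.add("control:" + payload);
        return !"SKIP".equals(payload); // consume "SKIP", propagate the rest
      }
      @Override public void endWindow() { log.add("end"); }
    });
    node.beginWindow(1);
    node.onData("a");
    node.onControl("EOF");
    node.onControl("SKIP");
    node.onData("b");
    node.endWindow();
    log.add("propagated:" + node.propagated());
    return log;
  }

  public static void main(String[] args)
  {
    System.out.println(demo());
  }
}
```

With something along these lines the annotation could still supply the default, while an operator that needs per-tuple control overrides the decision in code.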
Thanks

On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <[email protected]> wrote:

> Thanks Vlad, I am trying out the approach you mentioned regarding having
> another interface which allows sinks to put a control tuple.
>
> Regarding the delivery of control tuples, here is what I am planning to do:
> All the control tuples which are emitted in a particular window are
> delivered after all the data tuples have been delivered to the respective
> ports, but before the endWindow() call. The operator can then process the
> control tuples in that window and can do any finalization in the end window
> call. There will be no delivery of control tuples after endWindow() and
> before the next beginWindow() call.
>
> For handling the propagation of control tuples further in the dag, we are
> planning to have an annotation on the Output Port of the operator which
> would be true by default.
> @OutputPortFieldAnnotation(propogateControlTuples = false).
>
> ~ Bhupesh
>
>
> On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <[email protected]>
> wrote:
>
> > Custom control tuples are control tuples emitted by an operator itself and
> > not by the platform. Prior to the introduction of the custom control
> > tuples, only Apex engine itself puts control tuples into various sinks, so
> > the engine created necessary Tuple objects with the corresponding type
> > prior to calling Sink.put().
> >
> > Not all sinks need to be changed. Only control tuple aware sinks should
> > provide such functionality. In the case there is a lot of code duplication,
> > please create an abstract class, that other control aware sinks will extend
> > from.
> >
> > Thank you,
> >
> > Vlad
> >
> >
> > On 12/23/16 06:24, Bhupesh Chawda wrote:
> >
> >> Hi Vlad,
> >>
> >> Thanks for the pointer on delegating the wrapping of the user tuple to the
> >> control port. I was trying this out today.
> >> The problem I see is if we introduce a putControlTuple() method in Sink,
> >> then a lot of the existing sinks would change. Also the changes seemed
> >> redundant, as the existing control tuples already use the put() method of
> >> sinks. So why do something special for custom control tuples?
> >>
> >> The only aspect in which the custom control tuples are different is that
> >> these will be generated by the user and will actually be delivered to the
> >> ports in a different order. Perhaps we should be able to use the existing
> >> flow. The only problems as outlined before seem to be identification of the
> >> user tuple as a control tuple.
> >>
> >> ~ Bhupesh
> >>
> >>
> >> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <[email protected]>
> >> wrote:
> >>
> >>> Why is it necessary to wrap in the OutputPort? Can't it be delegated to a
> >>> Sink by introducing new putControlTuple method?
> >>>
> >>> Thank you,
> >>>
> >>> Vlad
> >>>
> >>>
> >>> On 12/21/16 22:10, Bhupesh Chawda wrote:
> >>>
> >>>> Hi Vlad,
> >>>>
> >>>> The problem in using the Tuple class as the wrapper is that the Ports
> >>>> belong to the API and we want to wrap the payload object of the control
> >>>> tuple into the Tuple class which is not part of the API.
> >>>>
> >>>> The output port will just get the payload of the user control tuple. For
> >>>> example, if the user emits a Long, as a control tuple, the payload object
> >>>> will just be a Long object.
> >>>>
> >>>> It is necessary to bundle this Long into some recognizable object so that
> >>>> the BufferServerPublisher knows that this is a Control tuple and not a
> >>>> regular tuple and serialize it accordingly. It is therefore necessary that
> >>>> the tuple be part of some known hierarchy so that it can be distinguished
> >>>> from other payload tuples. Let us call this class ControlTupleInterface.
> >>>> Note that this needs to be done before the tuple is inserted into the sink
> >>>> which is done in the port objects. Once the tuple is inserted into the sink,
> >>>> it would seem just like any other payload tuple and cannot be distinguished.
> >>>>
> >>>> For this reason, I had something like the following in API:
> >>>>
> >>>> package com.datatorrent.api;
> >>>> public class ControlTupleInterface
> >>>> {
> >>>>   Object payload; // User control tuple payload. A Long() for example.
> >>>>   UUID id; // Unique Id to de-duplicate in downstream operators
> >>>> }
> >>>>
> >>>> Regarding your suggestion on using the Tuple class as the wrapper for the
> >>>> control tuple payload, let me mention the current scenario flow to make the
> >>>> discussion easier:
> >>>>
> >>>> We have a Tuple class in buffer server which is responsible for serializing
> >>>> the user control tuple bundling together a message type:
> >>>> CUSTOM_CONTROL_TUPLE_VALUE.
> >>>>
> >>>> com.datatorrent.bufferserver.packet.Tuple
> >>>> |-- com.datatorrent.bufferserver.packet.CustomControlTuple
> >>>>
> >>>> We have another Tuple class in Stram which helps the BufferServerSubscriber
> >>>> to de-serialize the serialized tuples. We should have CustomControlTuple in
> >>>> stram as follows:
> >>>>
> >>>> com.datatorrent.stram.tuple.Tuple
> >>>> |-- com.datatorrent.stram.tuple.CustomControlTuple
> >>>>
> >>>> This will have a field for user control payload.
> >>>>
> >>>> I think we should not expose the Tuple class in stram to the API. That was
> >>>> the main reason I introduced another class/interface ControlTupleInterface
> >>>> as described above.
> >>>>
> >>>> Regarding adding methods to DefaultInputPort and DefaultOutputPort, I
> >>>> think error detection would not be early enough if the control tuple is
> >>>> sent very late in the processing :-)
> >>>> Extending the ports to ControlTupleAware* should help in this case.
> >>>> However, we still need to see if there are any downsides on doing this.
> >>>>
> >>>> Thanks.
> >>>>
> >>>> ~ Bhupesh
> >>>>
> >>>>
> >>>> On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov <[email protected]>
> >>>> wrote:
> >>>>
> >>>>> Hi Bhupesh,
> >>>>>
> >>>>> it should not be a CustomWrapper. The wrapper object should be
> >>>>> CustomControlTuple that extends Tuple. There is already code that checks
> >>>>> for Tuple instance. The "unWrap" name is misleading, IMO. It should be
> >>>>> something like customControlTuple.getPayload() or
> >>>>> customControlTuple.getAttachment(). In the emitControl(), create new
> >>>>> CustomControlTuple using provided payload as one of arguments. It may still
> >>>>> be good to use common parent other than Object for control tuple payload
> >>>>> class hierarchy.
> >>>>>
> >>>>> I don't understand how adding more methods to the Default implementation
> >>>>> will help with early error detection unless application or operator that
> >>>>> relies on the custom control tuple functionality explicitly checks for the
> >>>>> platform version at run-time or tries to emit a control tuple just to check
> >>>>> that such functionality is supported by the platform.
> >>>>>
> >>>>> Thank you,
> >>>>>
> >>>>> Vlad
> >>>>>
> >>>>> On 12/21/16 04:58, Bhupesh Chawda wrote:
> >>>>>
> >>>>>> Hi Vlad.
> >>>>>>
> >>>>>> Yes, the API should not change. We can take an Object instead, and later
> >>>>>> wrap it into the required class.
> >>>>>>
> >>>>>> Our InputPort.put and emitControl method would look something like the
> >>>>>> following where we handle the wrapping and unwrapping internally.
> >>>>>>
> >>>>>> public void put(T tuple)
> >>>>>> {
> >>>>>>   if (tuple instanceof CustomWrapper) {
> >>>>>>     processControl(((CustomWrapper)tuple).unWrap());
> >>>>>>   } else {
> >>>>>>     process(tuple);
> >>>>>>   }
> >>>>>> }
> >>>>>>
> >>>>>> public void emitControl(Object tuple)
> >>>>>> {
> >>>>>>   sink.put(CustomWrapper.wrap(tuple));
> >>>>>> }
> >>>>>>
> >>>>>> Regarding the compatibility issue, I think we have two ways of doing it:
> >>>>>>
> >>>>>> 1. Extend DefaultInputPort and DefaultOutputPort and create
> >>>>>>    ControlAwareInput and ControlAwareOutput out of it. This might require us
> >>>>>>    to additionally handle specific cases when non-compatible ports
> >>>>>>    (ControlAwareOutput and DefaultInput, for example) are connected to each
> >>>>>>    other in user apps.
> >>>>>> 2. Add the additional methods in the existing Default implementations.
> >>>>>>
> >>>>>> IMO, both of these would help in early error detection.
> >>>>>>
> >>>>>> ~ Bhupesh
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov <[email protected]>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> A wrapper class is required for the control tuples delivery, but
> >>>>>>> Port/Operator API should use Control Tuple payload object only.
> >>>>>>> Implementation of the wrapper class may change from version to version, but
> >>>>>>> API should not be affected by the change.
> >>>>>>>
> >>>>>>> I guess, assumption is that default input and output port will be extended
> >>>>>>> to provide support for the control tuples. This may cause some backward
> >>>>>>> compatibility issues.
> >>>>>>> Consider scenario when a newer version of Malhar that
> >>>>>>> relies on EOF control tuple is deployed into older version of core that
> >>>>>>> does not support control tuples. In such scenario, error will be raised
> >>>>>>> only when an operator tries to emit EOF control tuple at the end of a job.
> >>>>>>> Introducing control tuple aware ports solves the early error detection. It
> >>>>>>> will require some operators to be modified to use control tuple aware
> >>>>>>> ports, but such change may help to distinguish control tuple aware
> >>>>>>> operators from their old versions.
> >>>>>>>
> >>>>>>> Vlad
> >>>>>>>
> >>>>>>> On 12/20/16 04:09, Bhupesh Chawda wrote:
> >>>>>>>
> >>>>>>>> I investigated this and seems like it is better to have a wrapper class for
> >>>>>>>> the user object.
> >>>>>>>> This would serve 2 purposes:
> >>>>>>>>
> >>>>>>>> 1. Allow us to distinguish a custom control tuple from other payload
> >>>>>>>>    tuples.
> >>>>>>>> 2. For the same control tuple received from different upstream
> >>>>>>>>    partitions, we would have some mechanism to distinguish between the
> >>>>>>>>    two in order to identify duplicates.
> >>>>>>>>
> >>>>>>>> Additionally, the wrapper class needs to be part of the API as
> >>>>>>>> DefaultOutputPort needs to know about it, before putting it into the sink.
> >>>>>>>> We can make sure that the user is not able to extend or modify this class
> >>>>>>>> in any manner.
> >>>>>>>>
> >>>>>>>> ~ Bhupesh
> >>>>>>>>
> >>>>>>>> On Mon, Dec 19, 2016 at 12:18 PM, David Yan <[email protected]>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> This C type parameter is going to fix the control tuple type at compile
> >>>>>>>>> time and this is actually not what we want.
> >>>>>>>>> Note that the operator may
> >>>>>>>>> receive or emit multiple different control tuple types.
> >>>>>>>>>
> >>>>>>>>> David
> >>>>>>>>>
> >>>>>>>>> On Dec 17, 2016 3:33 AM, "Tushar Gosavi" <[email protected]>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> We do not need to create an interface for data emitted through
> >>>>>>>>> emitControl or processed through processControl. Internally we could
> >>>>>>>>> wrap the user object in ControlTuple. you can add type parameter for
> >>>>>>>>> control tuple object on ports.
> >>>>>>>>>
> >>>>>>>>> DefaultInputPort<D,C>
> >>>>>>>>> D is the data type and C is the control tuple type for better error
> >>>>>>>>> catching at compile phase.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> - Tushar.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda <[email protected]>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Agreed Vlad and David.
> >>>>>>>>>>
> >>>>>>>>>> I am just suggesting there should be a wrapper for the user object. It can
> >>>>>>>>>> be a marker interface and we can call it something else like
> >>>>>>>>>> "CustomControl".
> >>>>>>>>>>
> >>>>>>>>>> The user object will be wrapped in another class "ControlTuple" which
> >>>>>>>>>> traverses the BufferServer and will perhaps be extended from the
> >>>>>>>>>> packet/Tuple class. This class will not be exposed to the user.
> >>>>>>>>>>
> >>>>>>>>>> ~ Bhupesh
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov <[email protected]>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> I agree with David. Payload of the control tuple is in the userObject and
> >>>>>>>>>>> operators/ports don't need to be exposed to the implementation of the
> >>>>>>>>>>> ControlTuple class.
> >>>>>>>>>>> With the proposed interface operator developers are
> >>>>>>>>>>> free to extend ControlTuple further and I don't think that such capability
> >>>>>>>>>>> needs to be provided. The wrapping in the ControlTuple class is necessary
> >>>>>>>>>>> and most likely ControlTuple needs to be extended from the buffer server
> >>>>>>>>>>> Tuple. It may be good to have a common parent other than Object for all
> >>>>>>>>>>> user payloads, but it may be a marker interface as well.
> >>>>>>>>>>>
> >>>>>>>>>>> Thank you,
> >>>>>>>>>>>
> >>>>>>>>>>> Vlad
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 12/16/16 09:59, Bhupesh Chawda wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi David,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Actually, I was thinking of another API class called ControlTuple,
> >>>>>>>>>>>> different from the actual tuple class in buffer server or stram.
> >>>>>>>>>>>> This could serve as a way for the Buffer server publisher to understand
> >>>>>>>>>>>> that it is a control tuple and needs to be wrapped differently.
> >>>>>>>>>>>>
> >>>>>>>>>>>> ~ Bhupesh
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Dec 16, 2016 22:28, "David Yan" <[email protected]> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> // DefaultInputPort
> >>>>>>>>>>>> public void processControl(ControlTuple tuple)
> >>>>>>>>>>>> {
> >>>>>>>>>>>>   // Default Implementation to avoid need to implement it in all
> >>>>>>>>>>>>   // implementations
> >>>>>>>>>>>> }
> >>>>>>>>>>>> {code}
> >>>>>>>>>>>>
> >>>>>>>>>>>> {code}
> >>>>>>>>>>>> // DefaultOutputPort
> >>>>>>>>>>>> public void emitControl(ControlTuple tuple)
> >>>>>>>>>>>> {
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think we don't need to expose the ControlTuple class to the operator
> >>>>>>>>>>>> developers because the window ID is just the current window ID when these
> >>>>>>>>>>>> methods are called. How about making them just Object? We also need to
> >>>>>>>>>>>> provide the way for the user to specify custom serializer for the control
> >>>>>>>>>>>> tuple.
> >>>>>>>>>>>>
> >>>>>>>>>>>> David
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda <[email protected]>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi All,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Here are the initial interfaces:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> {code}
> >>>>>>>>>>>>> // DefaultInputPort
> >>>>>>>>>>>>> public void processControl(ControlTuple tuple)
> >>>>>>>>>>>>> {
> >>>>>>>>>>>>>   // Default Implementation to avoid need to implement it in all
> >>>>>>>>>>>>>   // implementations
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>> {code}
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> {code}
> >>>>>>>>>>>>> // DefaultOutputPort
> >>>>>>>>>>>>> public void emitControl(ControlTuple tuple)
> >>>>>>>>>>>>> {
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>> {code}
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> We have an option to add these methods to the interfaces - InputPort and
> >>>>>>>>>>>>> OutputPort; But these would not be backward compatible and also not
> >>>>>>>>>>>>> consistent with the current implementation of basic data tuple flow (as
> >>>>>>>>>>>>> with process() and emit()).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> We also need to expose an interface / class for users to wrap their object
> >>>>>>>>>>>>> and emit downstream. This should be part of API.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> {code}
> >>>>>>>>>>>>> public class ControlTuple extends Tuple
> >>>>>>>>>>>>> {
> >>>>>>>>>>>>>   Object userObject;
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>   public ControlTuple(long windowId, Object userObject)
> >>>>>>>>>>>>>   {
> >>>>>>>>>>>>>     //
> >>>>>>>>>>>>>   }
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>> {code}
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The emitted tuples would traverse the same flow as with other control
> >>>>>>>>>>>>> tuples.
> >>>>>>>>>>>>> The plan is to intercept the control tuples in GenericNode and use
> >>>>>>>>>>>>> the Reservoir to emit the control tuples at the end of the window.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> GenericNode seems to be the best place to buffer incoming custom control
> >>>>>>>>>>>>> tuples without delivering them immediately to the operator port. Once the
> >>>>>>>>>>>>> end of the window is reached, we plan to use the reservoir sink to push
> >>>>>>>>>>>>> them to the port. This is different behavior than any other control tuple
> >>>>>>>>>>>>> where we are changing the order of tuples in the stream. The custom control
> >>>>>>>>>>>>> tuples will be buffered and not delivered to the ports until the end of
> >>>>>>>>>>>>> the window.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> To accomplish this, we need to have a public method in SweepableReservoir
> >>>>>>>>>>>>> which allows to put a tuple back in the sink of the reservoir.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> ~ Bhupesh
