Hi Bhupesh,

We could have a marker interface with just one method, getType(), in
apex-api; our CustomControlTuple and the stram Tuple would both implement
this new interface.

interface ControlTuple {
  MessageType getType();
}

Add CUSTOM_TUPLE to the MessageType enum and move the enum into apex-api.

Our CustomControlTuple will implement ControlTuple with getType
returning CUSTOM_TUPLE, and Tuple in stram will implement the same
ControlTuple from the API, with getType returning its own message type.
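
A rough sketch of how this could fit together. The enum values shown are
only an illustrative subset, StramTuple is a stand-in name for the Tuple
class in stram, and the isControl() helper is hypothetical; none of this
is the actual code.

```java
// Illustrative subset only; the real MessageType enum has more values.
enum MessageType { PAYLOAD, BEGIN_WINDOW, END_WINDOW, CUSTOM_TUPLE }

// Proposed interface that would live in apex-api.
interface ControlTuple {
  MessageType getType();
}

// API-side wrapper around the user's control payload (a Long, for example).
final class CustomControlTuple implements ControlTuple {
  private final Object payload;

  CustomControlTuple(Object payload) {
    this.payload = payload;
  }

  Object getPayload() {
    return payload;
  }

  @Override
  public MessageType getType() {
    return MessageType.CUSTOM_TUPLE;
  }
}

// Stand-in for the stram Tuple: it reports whatever type it was created with.
class StramTuple implements ControlTuple {
  private final MessageType type;
  private final long windowId;

  StramTuple(MessageType type, long windowId) {
    this.type = type;
    this.windowId = windowId;
  }

  long getWindowId() {
    return windowId;
  }

  @Override
  public MessageType getType() {
    return type;
  }
}

final class ControlTupleSketch {
  // Hypothetical check: one instanceof covers both kinds of control tuple.
  static boolean isControl(Object o) {
    return o instanceof ControlTuple;
  }

  public static void main(String[] args) {
    Object[] stream = {
      new StramTuple(MessageType.BEGIN_WINDOW, 1L),
      "data",
      new CustomControlTuple(42L),
      new StramTuple(MessageType.END_WINDOW, 1L)
    };
    for (Object o : stream) {
      if (isControl(o)) {
        System.out.println("control: " + ((ControlTuple)o).getType());
      } else {
        System.out.println("data: " + o);
      }
    }
  }
}
```

With this shape, code that currently checks for a Tuple instance could check
for ControlTuple instead and branch on getType(), so the case where tuples
skip serialization would not need a separate check for the user payload class.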

Regards,
- Tushar.

On Thu, Dec 22, 2016 at 12:12 PM, Bhupesh Chawda
<[email protected]> wrote:
> There is another issue with the current approach which fails in case tuples
> skip serialization. To address this, we have the following options:
>
> One option is to have a common class in the API that is the parent of
> both the Tuple class in Stram and the custom control tuple payload. This
> would let us treat the control tuple payload as a Tuple.
> However, this might require us to move some things like MessageType into
> the API as well, and would also need a lot of refactoring in the existing
> Reservoir code.
>
> Another option is to specially handle the case of CustomControlTuple in all
> the Reservoir sweep() methods where we check for not only a Tuple class,
> but also for the user control tuple payload class and return it once we
> encounter it, thus identifying the user custom payload as a control tuple.
>
> Thoughts?
>
> ~ Bhupesh
>
>
>
>
>
> On Thu, Dec 22, 2016 at 11:40 AM, Bhupesh Chawda <[email protected]>
> wrote:
>
>> Hi Vlad,
>>
>> The problem in using the Tuple class as the wrapper is that the Ports
>> belong to the API and we want to wrap the payload object of the control
>> tuple into the Tuple class which is not part of the API.
>>
>> The output port will just get the payload of the user control tuple. For
>> example, if the user emits a Long, as a control tuple, the payload object
>> will just be a Long object.
>>
>> It is necessary to bundle this Long into some recognizable object so that
>> the BufferServerPublisher knows that this is a control tuple and not a
>> regular tuple, and serializes it accordingly. It is therefore necessary that
>> the tuple be part of some known hierarchy so that it can be distinguished
>> from other payload tuples. Let us call this class ControlTupleInterface. Note
>> that this needs to be done before the tuple is inserted into the sink which
>> is done in the port objects. Once the tuple is inserted into the sink, it
>> would seem just like any other payload tuple and cannot be distinguished.
>>
>> For this reason, I had something like the following in API:
>>
>> package com.datatorrent.api;
>> public class ControlTupleInterface
>> {
>>   Object payload; // User control tuple payload. A Long() for example.
>>   UUID id;  // Unique Id to de-duplicate in downstream operators
>> }
>>
>> Regarding your suggestion on using the Tuple class as the wrapper for the
>> control tuple payload, let me mention the current scenario flow to make the
>> discussion easier:
>>
>> We have a Tuple class in buffer server which is responsible for
>> serializing the user control tuple bundling together a message type:
>> CUSTOM_CONTROL_TUPLE_VALUE.
>>
>>
>> com.datatorrent.bufferserver.packet.Tuple
>>   |-- com.datatorrent.bufferserver.packet.CustomControlTuple
>>
>> We have another Tuple class in Stram which helps the
>> BufferServerSubscriber to de-serialize the serialized tuples. We should
>> have CustomControlTuple in stram as follows:
>>
>>
>> com.datatorrent.stram.tuple.Tuple
>>   |-- com.datatorrent.stram.tuple.CustomControlTuple
>>
>> This will have a field for the user control payload.
>>
>> I think we should not expose the Tuple class in stram to the API. That was
>> the main reason I introduced another class/interface ControlTupleInterface
>> as described above.
>>
>> Regarding adding methods to DefaultInputPort and DefaultOutputPort, I
>> think error detection would not be early enough if the control tuple is
>> sent very late in the processing :-)
>> Extending the ports to ControlTupleAware* should help in this case.
>> However, we still need to see if there are any downsides to doing this.
>>
>> Thanks.
>>
>> ~ Bhupesh
>>
>>
>>
>>
>> On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov <[email protected]>
>> wrote:
>>
>>> Hi Bhupesh,
>>>
>>> it should not be a CustomWrapper.  The wrapper object should be
>>> CustomControlTuple that extends Tuple. There is already code that checks
>>> for Tuple instance. The "unWrap" name is misleading, IMO. It should be
>>> something like customControlTuple.getPayload() or
>>> customControlTuple.getAttachment(). In the emitControl(), create new
>>> CustomControlTuple using provided payload as one of arguments. It may still
>>> be good to use common parent other than Object for control tuple payload
>>> class hierarchy.
>>>
>>> I don't understand how adding more methods to the Default implementation
>>> will help with early error detection unless application or operator that
>>> relies on the custom control tuple functionality explicitly checks for the
>>> platform version at run-time or tries to emit a control tuple just to check
>>> that such functionality is supported by the platform.
>>>
>>> Thank you,
>>>
>>> Vlad
>>>
>>> On 12/21/16 04:58, Bhupesh Chawda wrote:
>>>
>>>> Hi Vlad.
>>>>
>>>> Yes, the API should not change. We can take an Object instead, and later
>>>> wrap it into the required class.
>>>>
>>>> Our InputPort.put and emitControl method would look something like the
>>>> following where we handle the wrapping and unwrapping internally.
>>>>
>>>> public void put(T tuple)
>>>> {
>>>>    if (tuple instanceof CustomWrapper) {
>>>>      processControl(((CustomWrapper)tuple).unWrap());
>>>>    } else {
>>>>      process(tuple);
>>>>    }
>>>> }
>>>>
>>>> public void emitControl(Object tuple)
>>>> {
>>>>    sink.put(CustomWrapper.wrap(tuple));
>>>> }
>>>>
>>>> Regarding the compatibility issue, I think we have two ways of doing it:
>>>>
>>>>     1. Extend DefaultInputPort and DefaultOutputPort and create
>>>>     ControlAwareInput and ControlAwareOutput out of them. This might
>>>>     require us to additionally handle specific cases when non-compatible
>>>>     ports (ControlAwareOutput and DefaultInput, for example) are
>>>>     connected to each other in user apps.
>>>>     2. Add the additional methods in the existing Default
>>>>     implementations.
>>>>
>>>>
>>>> IMO, both of these would help in early error detection.
>>>>
>>>> ~ Bhupesh
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov <[email protected]>
>>>> wrote:
>>>>
>>>>> A wrapper class is required for the control tuples delivery, but
>>>>> Port/Operator API should use Control Tuple payload object only.
>>>>> Implementation of the wrapper class may change from version to version,
>>>>> but API should not be affected by the change.
>>>>>
>>>>> I guess, assumption is that default input and output port will be
>>>>> extended to provide support for the control tuples. This may cause some
>>>>> backward compatibility issues. Consider scenario when a newer version of
>>>>> Malhar that relies on EOF control tuple is deployed into older version
>>>>> of core that does not support control tuples. In such scenario, error
>>>>> will be raised only when an operator tries to emit EOF control tuple at
>>>>> the end of a job. Introducing control tuple aware ports solves the early
>>>>> error detection. It will require some operators to be modified to use
>>>>> control tuple aware ports, but such change may help to distinguish
>>>>> control tuple aware operators from their old versions.
>>>>>
>>>>> Vlad
>>>>>
>>>>> On 12/20/16 04:09, Bhupesh Chawda wrote:
>>>>>
>>>>>> I investigated this and it seems better to have a wrapper class for
>>>>>> the user object.
>>>>>> This would serve 2 purposes:
>>>>>>
>>>>>>      1. Allow us to distinguish a custom control tuple from other
>>>>>>      payload tuples.
>>>>>>      2. For the same control tuple received from different upstream
>>>>>>      partitions, we would have some mechanism to distinguish between
>>>>>>      the two in order to identify duplicates.
>>>>>>
>>>>>> Additionally, the wrapper class needs to be part of the API as
>>>>>> DefaultOutputPort needs to know about it, before putting it into the
>>>>>> sink. We can make sure that the user is not able to extend or modify
>>>>>> this class in any manner.
>>>>>>
>>>>>> ~ Bhupesh
>>>>>>
>>>>>> On Mon, Dec 19, 2016 at 12:18 PM, David Yan <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> This C type parameter is going to fix the control tuple type at
>>>>>>> compile time and this is actually not what we want. Note that the
>>>>>>> operator may receive or emit multiple different control tuple types.
>>>>>>>
>>>>>>> David
>>>>>>>
>>>>>>> On Dec 17, 2016 3:33 AM, "Tushar Gosavi" <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>> We do not need to create an interface for data emitted through
>>>>>>> emitControl or processed through processControl. Internally we could
>>>>>>> wrap the user object in ControlTuple. You can add a type parameter
>>>>>>> for the control tuple object on ports.
>>>>>>>
>>>>>>> DefaultInputPort<D,C>
>>>>>>> D is the data type and C is the control tuple type for better error
>>>>>>> catching at compile phase.
>>>>>>>
>>>>>>>
>>>>>>> - Tushar.
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda <
>>>>>>> [email protected]
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Agreed Vlad and David.
>>>>>>>> I am just suggesting there should be a wrapper for the user object.
>>>>>>>> It can be a marker interface and we can call it something else like
>>>>>>>> "CustomControl".
>>>>>>>>
>>>>>>>> The user object will be wrapped in another class "ControlTuple" which
>>>>>>>> traverses the BufferServer and will perhaps be extended from the
>>>>>>>> packet/Tuple class. This class will not be exposed to the user.
>>>>>>>>
>>>>>>>> ~ Bhupesh
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov <[email protected]
>>>>>>>> >
>>>>>>>>
>>>>>>>> wrote:
>>>>>>>
>>>>>>>>> I agree with David. Payload of the control tuple is in the
>>>>>>>>> userObject and operators/ports don't need to be exposed to the
>>>>>>>>> implementation of the ControlTuple class. With the proposed
>>>>>>>>> interface operators developers are free to extend ControlTuple
>>>>>>>>> further and I don't think that such capability needs to be
>>>>>>>>> provided. The wrapping in the ControlTuple class is necessary and
>>>>>>>>> most likely ControlTuple needs to be extended from the buffer
>>>>>>>>> server Tuple. It may be good to have a common parent other than
>>>>>>>>> Object for all user payloads, but it may be a marker interface as
>>>>>>>>> well.
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>>
>>>>>>>>> Vlad
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 12/16/16 09:59, Bhupesh Chawda wrote:
>>>>>>>>>
>>>>>>>>>> Hi David,
>>>>>>>>>>
>>>>>>>>>> Actually, I was thinking of another API class called ControlTuple,
>>>>>>>>>> different from the actual tuple class in buffer server or stram.
>>>>>>>>>> This could serve as a way for the Buffer server publisher to
>>>>>>>>>> understand that it is a control tuple and needs to be wrapped
>>>>>>>>>> differently.
>>>>>>>>>>
>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Dec 16, 2016 22:28, "David Yan" <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>     // DefaultInputPort
>>>>>>>>>>      public void processControl(ControlTuple tuple)
>>>>>>>>>>      {
>>>>>>>>>>        // Default implementation to avoid the need to implement it
>>>>>>>>>>        // in all implementations
>>>>>>>>>>      }
>>>>>>>>>> {code}
>>>>>>>>>>
>>>>>>>>>> {code}
>>>>>>>>>>     // DefaultOutputPort
>>>>>>>>>>      public void emitControl(ControlTuple tuple)
>>>>>>>>>>      {
>>>>>>>>>>      }
>>>>>>>>>>
>>>>>>>>>> I think we don't need to expose the ControlTuple class to the
>>>>>>>>>> operator developers because the window ID is just the current
>>>>>>>>>> window ID when these methods are called. How about making them just
>>>>>>>>>> Object? We also need to provide the way for the user to specify
>>>>>>>>>> custom serializer for the control tuple.
>>>>>>>>>>
>>>>>>>>>> David
>>>>>>>>>>
>>>>>>>>>> On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda <
>>>>>>>>>>
>>>>>>>>>> [email protected]
>>>>>>>>>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> Here are the initial interfaces:
>>>>>>>>>>>
>>>>>>>>>>> {code}
>>>>>>>>>>>     // DefaultInputPort
>>>>>>>>>>>      public void processControl(ControlTuple tuple)
>>>>>>>>>>>      {
>>>>>>>>>>>        // Default implementation to avoid the need to implement it
>>>>>>>>>>>        // in all implementations
>>>>>>>>>>>      }
>>>>>>>>>>> {code}
>>>>>>>>>>>
>>>>>>>>>>> {code}
>>>>>>>>>>>     // DefaultOutputPort
>>>>>>>>>>>      public void emitControl(ControlTuple tuple)
>>>>>>>>>>>      {
>>>>>>>>>>>      }
>>>>>>>>>>> {code}
>>>>>>>>>>>
>>>>>>>>>>> We have an option to add these methods to the interfaces -
>>>>>>>>>>> InputPort and OutputPort; but these would not be backward
>>>>>>>>>>> compatible and also not consistent with the current implementation
>>>>>>>>>>> of basic data tuple flow (as with process() and emit()).
>>>>>>>>>>>
>>>>>>>>>>> We also need to expose an interface / class for users to wrap
>>>>>>>>>>> their object and emit downstream. This should be part of API.
>>>>>>>>>>>
>>>>>>>>>>> {code}
>>>>>>>>>>> public class ControlTuple extends Tuple
>>>>>>>>>>> {
>>>>>>>>>>>      Object userObject;
>>>>>>>>>>>
>>>>>>>>>>>      public ControlTuple(long windowId, Object userObject)
>>>>>>>>>>>      {
>>>>>>>>>>>        //
>>>>>>>>>>>      }
>>>>>>>>>>> }
>>>>>>>>>>> {code}
>>>>>>>>>>>
>>>>>>>>>>> The emitted tuples would traverse the same flow as with other
>>>>>>>>>>> control tuples. The plan is to intercept the control tuples in
>>>>>>>>>>> GenericNode and use the Reservoir to emit the control tuples at
>>>>>>>>>>> the end of the window.
>>>>>>>>>>>
>>>>>>>>>>> GenericNode seems to be the best place to buffer incoming custom
>>>>>>>>>>> control tuples without delivering them immediately to the operator
>>>>>>>>>>> port. Once the end of the window is reached, we plan to use the
>>>>>>>>>>> reservoir sink to push them to the port. This is different
>>>>>>>>>>> behavior than any other control tuple where we are changing the
>>>>>>>>>>> order of tuples in the stream. The custom control tuples will be
>>>>>>>>>>> buffered and not delivered to the ports until the end of the
>>>>>>>>>>> window.
>>>>>>>>>>>
>>>>>>>>>>> To accomplish this, we need to have a public method in
>>>>>>>>>>> SweepableReservoir which allows putting a tuple back in the sink
>>>>>>>>>>> of the reservoir.
>>>>>>>>>>>
>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>
>>
