Wouldn't it be more intuitive to control this with an attribute on the
input port?
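
For comparison, here is a minimal sketch of the two placements. Only the
name setPropagateControlTuples comes from the quoted mail; every class and
every other method below is a hypothetical stand-in, not the Apex API or the
code in the PR. It only illustrates where the switch would live, assuming
that control tuples emitted by the operator itself are never blocked.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: these classes are illustrative stand-ins, not Apex types.
class PropagationSketch
{
  /** Option from the quoted mail: a switch on the output port. */
  static class OutputPortSketch
  {
    private boolean propagateControlTuples = true; // propagate by default
    final List<Object> sent = new ArrayList<>();

    void setPropagateControlTuples(boolean propagate)
    {
      this.propagateControlTuples = propagate;
    }

    /** Control tuple that arrived from upstream; dropped when blocked. */
    void forwardFromUpstream(Object controlTuple)
    {
      if (propagateControlTuples) {
        sent.add(controlTuple);
      }
    }

    /** Control tuple emitted explicitly by the operator; always sent. */
    void emitControl(Object controlTuple)
    {
      sent.add(controlTuple);
    }
  }

  /** Alternative: the setting is an attribute of the input port. */
  static class InputPortSketch
  {
    final boolean propagateControlTuples; // assumed fixed at configuration time

    InputPortSketch(boolean propagateControlTuples)
    {
      this.propagateControlTuples = propagateControlTuples;
    }

    /** Forwards a received control tuple to every output only if allowed. */
    void receive(Object controlTuple, List<OutputPortSketch> outputs)
    {
      if (!propagateControlTuples) {
        return; // consumed here, nothing goes downstream
      }
      for (OutputPortSketch out : outputs) {
        out.forwardFromUpstream(controlTuple);
      }
    }
  }

  public static void main(String[] args)
  {
    OutputPortSketch out = new OutputPortSketch();
    out.setPropagateControlTuples(false);
    out.forwardFromUpstream("upstream-eof"); // blocked
    out.emitControl("own-eof");              // still sent
    System.out.println(out.sent);            // prints [own-eof]

    OutputPortSketch out2 = new OutputPortSketch();
    new InputPortSketch(false).receive("upstream-eof", Collections.singletonList(out2));
    System.out.println(out2.sent);           // prints []
  }
}
```

With the input-port variant the decision is made once, where the tuple
arrives, and applies to every output port of the operator.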


On Tue, Jan 3, 2017 at 11:06 PM, Bhupesh Chawda <bhup...@datatorrent.com>
wrote:

> Hi Pramod,
>
> I was thinking of a method setPropagateControlTuples(boolean propagate) on
> the output port of the operator.
> The operator could disable this in the code at any point of time.
> Note however that this is to block the propagation of control tuples from
> upstream. Any control tuples emitted explicitly by the operator would still
> be emitted and sent to the downstream operators.
>
> Please see
> https://github.com/apache/apex-core/pull/440/files#diff-
> 8aa0ca1a3e645fa60e9b376c118c00a3R68
> in the PR.
>
> ~ Bhupesh
>
> On Wed, Jan 4, 2017 at 6:53 AM, Pramod Immaneni <pra...@datatorrent.com>
> wrote:
>
> > 2 sounds good. Have you thought about what the method would look like?
> >
> > On Sat, Dec 31, 2016 at 8:29 PM, Bhupesh Chawda <bhup...@datatorrent.com>
> > wrote:
> >
> > > Yes, that makes sense.
> > > We have the following options:
> > > 1. Make the annotation false by default and force the user to forward
> > > the control tuples explicitly.
> > > 2. Annotation is true by default and the static way of blocking stays
> > > as it is. We provide another way of blocking programmatically, perhaps
> > > by means of another method call on the port.
> > >
> > > ~ Bhupesh
> > >
> > > On Dec 30, 2016 00:09, "Pramod Immaneni" <pra...@datatorrent.com>
> > > wrote:
> > >
> > > > Bhupesh,
> > > >
> > > > Annotation seems like a static way to stop propagation. Given that
> > > > these are programmatically generated, I would think the operators
> > > > should be able to stop them (consume without propagating)
> > > > programmatically as well.
> > > >
> > > > Thanks
> > > >
> > > > On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda
> > > > <bhup...@datatorrent.com> wrote:
> > > >
> > > > > Thanks Vlad, I am trying out the approach you mentioned regarding
> > > > > having another interface which allows sinks to put a control tuple.
> > > > >
> > > > > Regarding the delivery of control tuples, here is what I am planning
> > > > > to do:
> > > > > All the control tuples which are emitted in a particular window are
> > > > > delivered after all the data tuples have been delivered to the
> > > > > respective ports, but before the endWindow() call. The operator can
> > > > > then process the control tuples in that window and can do any
> > > > > finalization in the end window call. There will be no delivery of
> > > > > control tuples after endWindow() and before the next beginWindow()
> > > > > call.
> > > > >
> > > > > For handling the propagation of control tuples further in the DAG, we
> > > > > are planning to have an annotation on the Output Port of the operator
> > > > > which would be true by default.
> > > > > @OutputPortFieldAnnotation(propogateControlTuples = false).
> > > > >
> > > > > ~ Bhupesh
> > > > >
> > > > >
> > > > > On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <v.ro...@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > Custom control tuples are control tuples emitted by an operator
> > > > > > itself and not by the platform. Prior to the introduction of
> > > > > > custom control tuples, only the Apex engine itself put control
> > > > > > tuples into various sinks, so the engine created the necessary
> > > > > > Tuple objects with the corresponding type prior to calling
> > > > > > Sink.put().
> > > > > >
> > > > > > Not all sinks need to be changed. Only control tuple aware sinks
> > > > > > should provide such functionality. If there is a lot of code
> > > > > > duplication, please create an abstract class that other control
> > > > > > aware sinks will extend.
> > > > > >
> > > > > > Thank you,
> > > > > >
> > > > > > Vlad
> > > > > >
> > > > > >
> > > > > > On 12/23/16 06:24, Bhupesh Chawda wrote:
> > > > > >
> > > > > >> Hi Vlad,
> > > > > >>
> > > > > > >> Thanks for the pointer on delegating the wrapping of the user
> > > > > > >> tuple to the control port. I was trying this out today.
> > > > > > >> The problem I see is that if we introduce a putControlTuple()
> > > > > > >> method in Sink, then a lot of the existing sinks would change.
> > > > > > >> Also, the changes seemed redundant, as the existing control
> > > > > > >> tuples already use the put() method of sinks. So why do
> > > > > > >> something special for custom control tuples?
> > > > > > >>
> > > > > > >> The only aspect in which the custom control tuples are different
> > > > > > >> is that these will be generated by the user and will actually be
> > > > > > >> delivered to the ports in a different order. Perhaps we should
> > > > > > >> be able to use the existing flow. The only problem, as outlined
> > > > > > >> before, seems to be identification of the user tuple as a
> > > > > > >> control tuple.
> > > > > >>
> > > > > >> ~ Bhupesh
> > > > > >>
> > > > > >>
> > > > > > >> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov
> > > > > > >> <v.ro...@datatorrent.com> wrote:
> > > > > > >>
> > > > > > >>> Why is it necessary to wrap in the OutputPort? Can't it be
> > > > > > >>> delegated to a Sink by introducing a new putControlTuple method?
> > > > > >>>
> > > > > >>> Thank you,
> > > > > >>>
> > > > > >>> Vlad
> > > > > >>>
> > > > > >>>
> > > > > >>> On 12/21/16 22:10, Bhupesh Chawda wrote:
> > > > > >>>
> > > > > > >>>> Hi Vlad,
> > > > > > >>>>
> > > > > > >>>> The problem in using the Tuple class as the wrapper is that
> > > > > > >>>> the Ports belong to the API and we want to wrap the payload
> > > > > > >>>> object of the control tuple into the Tuple class, which is not
> > > > > > >>>> part of the API.
> > > > > > >>>>
> > > > > > >>>> The output port will just get the payload of the user control
> > > > > > >>>> tuple. For example, if the user emits a Long as a control
> > > > > > >>>> tuple, the payload object will just be a Long object.
> > > > > > >>>>
> > > > > > >>>> It is necessary to bundle this Long into some recognizable
> > > > > > >>>> object so that the BufferServerPublisher knows that this is a
> > > > > > >>>> control tuple and not a regular tuple, and serializes it
> > > > > > >>>> accordingly. It is therefore necessary that the tuple be part
> > > > > > >>>> of some known hierarchy so that it can be distinguished from
> > > > > > >>>> other payload tuples. Let us call this class
> > > > > > >>>> ControlTupleInterface. Note that this needs to be done before
> > > > > > >>>> the tuple is inserted into the sink, which is done in the port
> > > > > > >>>> objects. Once the tuple is inserted into the sink, it would
> > > > > > >>>> seem just like any other payload tuple and cannot be
> > > > > > >>>> distinguished.
> > > > > >>>> For this reason, I had something like the following in API:
> > > > > >>>>
> > > > > > >>>> package com.datatorrent.api;
> > > > > > >>>>
> > > > > > >>>> import java.util.UUID;
> > > > > > >>>>
> > > > > > >>>> public class ControlTupleInterface
> > > > > > >>>> {
> > > > > > >>>>     Object payload; // User control tuple payload. A Long, for example.
> > > > > > >>>>     UUID id;  // Unique id to de-duplicate in downstream operators
> > > > > > >>>> }
> > > > > >>>>
> > > > > > >>>> Regarding your suggestion on using the Tuple class as the
> > > > > > >>>> wrapper for the control tuple payload, let me mention the
> > > > > > >>>> current scenario flow to make the discussion easier:
> > > > > > >>>>
> > > > > > >>>> We have a Tuple class in buffer server which is responsible
> > > > > > >>>> for serializing the user control tuple bundling together a
> > > > > > >>>> message type: CUSTOM_CONTROL_TUPLE_VALUE.
> > > > > > >>>>
> > > > > > >>>> com.datatorrent.bufferserver.packet.Tuple
> > > > > > >>>> |-- com.datatorrent.bufferserver.packet.CustomControlTuple
> > > > > > >>>>
> > > > > > >>>> We have another Tuple class in Stram which helps the
> > > > > > >>>> BufferServerSubscriber to de-serialize the serialized tuples.
> > > > > > >>>> We should have CustomControlTuple in stram as follows:
> > > > > > >>>>
> > > > > > >>>> com.datatorrent.stram.tuple.Tuple
> > > > > > >>>> |-- com.datatorrent.stram.tuple.CustomControlTuple
> > > > > > >>>>
> > > > > > >>>> This will have a field for user control payload.
> > > > > >>>>
> > > > > > >>>> I think we should not expose the Tuple class in stram to the
> > > > > > >>>> API. That was the main reason I introduced another
> > > > > > >>>> class/interface ControlTupleInterface as described above.
> > > > > > >>>>
> > > > > > >>>> Regarding adding methods to DefaultInputPort and
> > > > > > >>>> DefaultOutputPort, I think error detection would not be early
> > > > > > >>>> enough if the control tuple is sent very late in the
> > > > > > >>>> processing :-)
> > > > > > >>>> Extending the ports to ControlTupleAware* should help in this
> > > > > > >>>> case. However, we still need to see if there are any downsides
> > > > > > >>>> to doing this.
> > > > > >>>>
> > > > > >>>> Thanks.
> > > > > >>>>
> > > > > >>>> ~ Bhupesh
> > > > > >>>>
> > > > > >>>>
> > > > > >>>>
> > > > > >>>>
> > > > > > >>>> On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov
> > > > > > >>>> <v.ro...@datatorrent.com> wrote:
> > > > > > >>>>
> > > > > > >>>>> Hi Bhupesh,
> > > > > > >>>>>
> > > > > > >>>>> it should not be a CustomWrapper. The wrapper object should
> > > > > > >>>>> be CustomControlTuple that extends Tuple. There is already
> > > > > > >>>>> code that checks for a Tuple instance. The "unWrap" name is
> > > > > > >>>>> misleading, IMO. It should be something like
> > > > > > >>>>> customControlTuple.getPayload() or
> > > > > > >>>>> customControlTuple.getAttachment(). In emitControl(), create
> > > > > > >>>>> a new CustomControlTuple using the provided payload as one of
> > > > > > >>>>> the arguments. It may still be good to use a common parent
> > > > > > >>>>> other than Object for the control tuple payload class
> > > > > > >>>>> hierarchy.
> > > > > > >>>>>
> > > > > > >>>>> I don't understand how adding more methods to the Default
> > > > > > >>>>> implementation will help with early error detection unless
> > > > > > >>>>> the application or operator that relies on the custom control
> > > > > > >>>>> tuple functionality explicitly checks for the platform
> > > > > > >>>>> version at run-time or tries to emit a control tuple just to
> > > > > > >>>>> check that such functionality is supported by the platform.
> > > > > >>>>>
> > > > > >>>>> Thank you,
> > > > > >>>>>
> > > > > >>>>> Vlad
> > > > > >>>>>
> > > > > >>>>> On 12/21/16 04:58, Bhupesh Chawda wrote:
> > > > > >>>>>
> > > > > > >>>>>> Hi Vlad.
> > > > > > >>>>>>
> > > > > > >>>>>> Yes, the API should not change. We can take an Object
> > > > > > >>>>>> instead, and later wrap it into the required class.
> > > > > > >>>>>>
> > > > > > >>>>>> Our InputPort.put and emitControl method would look
> > > > > > >>>>>> something like the following where we handle the wrapping
> > > > > > >>>>>> and unwrapping internally.
> > > > > >>>>>>
> > > > > > >>>>>> public void put(T tuple)
> > > > > > >>>>>> {
> > > > > > >>>>>>      if (tuple instanceof CustomWrapper) {
> > > > > > >>>>>>        // Control tuple: hand the unwrapped payload to the operator
> > > > > > >>>>>>        processControl(((CustomWrapper)tuple).unWrap());
> > > > > > >>>>>>      } else {
> > > > > > >>>>>>        process(tuple);
> > > > > > >>>>>>      }
> > > > > > >>>>>> }
> > > > > > >>>>>>
> > > > > > >>>>>> public void emitControl(Object tuple)
> > > > > > >>>>>> {
> > > > > > >>>>>>      sink.put(CustomWrapper.wrap(tuple));
> > > > > > >>>>>> }
> > > > > >>>>>>
> > > > > > >>>>>> Regarding the compatibility issue, I think we have two ways
> > > > > > >>>>>> of doing it:
> > > > > > >>>>>>
> > > > > > >>>>>>    1. Extend DefaultInputPort and DefaultOutputPort and
> > > > > > >>>>>>       create ControlAwareInput and ControlAwareOutput out of
> > > > > > >>>>>>       it. This might require us to additionally handle
> > > > > > >>>>>>       specific cases when non-compatible ports
> > > > > > >>>>>>       (ControlAwareOutput and DefaultInput, for example) are
> > > > > > >>>>>>       connected to each other in user apps.
> > > > > > >>>>>>    2. Add the additional methods in the existing Default
> > > > > > >>>>>>       implementations.
> > > > > > >>>>>>
> > > > > > >>>>>> IMO, both of these would help in early error detection.
> > > > > >>>>>>
> > > > > >>>>>> ~ Bhupesh
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > > >>>>>> On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov
> > > > > > >>>>>> <v.ro...@datatorrent.com> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>>> A wrapper class is required for the control tuples
> > > > > > >>>>>>> delivery, but the Port/Operator API should use the control
> > > > > > >>>>>>> tuple payload object only. Implementation of the wrapper
> > > > > > >>>>>>> class may change from version to version, but the API
> > > > > > >>>>>>> should not be affected by the change.
> > > > > > >>>>>>>
> > > > > > >>>>>>> I guess the assumption is that the default input and output
> > > > > > >>>>>>> ports will be extended to provide support for the control
> > > > > > >>>>>>> tuples. This may cause some backward compatibility issues.
> > > > > > >>>>>>> Consider a scenario where a newer version of Malhar that
> > > > > > >>>>>>> relies on an EOF control tuple is deployed into an older
> > > > > > >>>>>>> version of core that does not support control tuples. In
> > > > > > >>>>>>> such a scenario, an error will be raised only when an
> > > > > > >>>>>>> operator tries to emit the EOF control tuple at the end of
> > > > > > >>>>>>> a job. Introducing control tuple aware ports solves the
> > > > > > >>>>>>> early error detection. It will require some operators to be
> > > > > > >>>>>>> modified to use control tuple aware ports, but such a
> > > > > > >>>>>>> change may help to distinguish control tuple aware
> > > > > > >>>>>>> operators from their old versions.
> > > > > >>>>>>>
> > > > > >>>>>>> Vlad
> > > > > >>>>>>>
> > > > > >>>>>>> On 12/20/16 04:09, Bhupesh Chawda wrote:
> > > > > >>>>>>>
> > > > > > >>>>>>>> I investigated this and it seems better to have a wrapper
> > > > > > >>>>>>>> class for the user object.
> > > > > > >>>>>>>> This would serve 2 purposes:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>    1. Allow us to distinguish a custom control tuple from
> > > > > > >>>>>>>>       other payload tuples.
> > > > > > >>>>>>>>    2. For the same control tuple received from different
> > > > > > >>>>>>>>       upstream partitions, we would have some mechanism to
> > > > > > >>>>>>>>       distinguish between the two in order to identify
> > > > > > >>>>>>>>       duplicates.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Additionally, the wrapper class needs to be part of the
> > > > > > >>>>>>>> API as DefaultOutputPort needs to know about it before
> > > > > > >>>>>>>> putting it into the sink.
> > > > > > >>>>>>>> We can make sure that the user is not able to extend or
> > > > > > >>>>>>>> modify this class in any manner.
> > > > > >>>>>>>>
> > > > > >>>>>>>> ~ Bhupesh
> > > > > >>>>>>>>
> > > > > > >>>>>>>> On Mon, Dec 19, 2016 at 12:18 PM, David Yan
> > > > > > >>>>>>>> <david...@gmail.com> wrote:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> This C type parameter is going to fix the control tuple
> > > > > > >>>>>>>>> type at compile time and this is actually not what we
> > > > > > >>>>>>>>> want. Note that the operator may receive or emit multiple
> > > > > > >>>>>>>>> different control tuple types.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> David
> > > > > >>>>>>>>>
> > > > > > >>>>>>>>> On Dec 17, 2016 3:33 AM, "Tushar Gosavi"
> > > > > > >>>>>>>>> <tus...@datatorrent.com> wrote:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> We do not need to create an interface for data emitted
> > > > > > >>>>>>>>> through emitControl or processed through processControl.
> > > > > > >>>>>>>>> Internally we could wrap the user object in ControlTuple.
> > > > > > >>>>>>>>> You can add a type parameter for the control tuple object
> > > > > > >>>>>>>>> on ports.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> DefaultInputPort<D,C>
> > > > > > >>>>>>>>> D is the data type and C is the control tuple type for
> > > > > > >>>>>>>>> better error catching at compile phase.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> - Tushar.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > > >>>>>>>>> On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda
> > > > > > >>>>>>>>> <bhup...@datatorrent.com> wrote:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>> Agreed Vlad and David.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> I am just suggesting there should be a wrapper for the
> > > > > > >>>>>>>>>> user object. It can be a marker interface and we can
> > > > > > >>>>>>>>>> call it something else like "CustomControl".
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> The user object will be wrapped in another class
> > > > > > >>>>>>>>>> "ControlTuple" which traverses the BufferServer and will
> > > > > > >>>>>>>>>> perhaps be extended from the packet/Tuple class. This
> > > > > > >>>>>>>>>> class will not be exposed to the user.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> ~ Bhupesh
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov
> > > > > > >>>>>>>>>> <v.ro...@datatorrent.com> wrote:
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>> I agree with David. Payload of the control tuple is in
> > > > > > >>>>>>>>>>> the userObject and operators/ports don't need to be
> > > > > > >>>>>>>>>>> exposed to the implementation of the ControlTuple
> > > > > > >>>>>>>>>>> class. With the proposed interface operator developers
> > > > > > >>>>>>>>>>> are free to extend ControlTuple further and I don't
> > > > > > >>>>>>>>>>> think that such capability needs to be provided. The
> > > > > > >>>>>>>>>>> wrapping in the ControlTuple class is necessary and
> > > > > > >>>>>>>>>>> most likely ControlTuple needs to be extended from the
> > > > > > >>>>>>>>>>> buffer server Tuple. It may be good to have a common
> > > > > > >>>>>>>>>>> parent other than Object for all user payloads, but it
> > > > > > >>>>>>>>>>> may be a marker interface as well.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Thank you,
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Vlad
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> On 12/16/16 09:59, Bhupesh Chawda wrote:
> > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Hi David,
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Actually, I was thinking of another API class called
> > > > > > >>>>>>>>>>>> ControlTuple, different from the actual tuple class in
> > > > > > >>>>>>>>>>>> buffer server or stram.
> > > > > > >>>>>>>>>>>> This could serve as a way for the Buffer server
> > > > > > >>>>>>>>>>>> publisher to understand that it is a control tuple and
> > > > > > >>>>>>>>>>>> needs to be wrapped differently.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> ~ Bhupesh
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> On Dec 16, 2016 22:28, "David Yan" <
> david...@gmail.com>
> > > > > wrote:
> > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> {code}
> > > > > > >>>>>>>>>>>>       // DefaultInputPort
> > > > > > >>>>>>>>>>>>        public void processControl(ControlTuple tuple)
> > > > > > >>>>>>>>>>>>        {
> > > > > > >>>>>>>>>>>>          // Default Implementation to avoid need to
> > > > > > >>>>>>>>>>>>          // implement it in all implementations
> > > > > > >>>>>>>>>>>>        }
> > > > > > >>>>>>>>>>>> {code}
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> {code}
> > > > > > >>>>>>>>>>>>       // DefaultOutputPort
> > > > > > >>>>>>>>>>>>        public void emitControl(ControlTuple tuple)
> > > > > > >>>>>>>>>>>>        {
> > > > > > >>>>>>>>>>>>        }
> > > > > > >>>>>>>>>>>> {code}
> > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> I think we don't need to expose the ControlTuple class
> > > > > > >>>>>>>>>>>> to the operator developers because the window ID is
> > > > > > >>>>>>>>>>>> just the current window ID when these methods are
> > > > > > >>>>>>>>>>>> called. How about making them just Object? We also
> > > > > > >>>>>>>>>>>> need to provide a way for the user to specify a custom
> > > > > > >>>>>>>>>>>> serializer for the control tuple.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> David
> > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>>> On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda
> > > > > > >>>>>>>>>>>> <bhup...@datatorrent.com> wrote:
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Hi All,
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Here are the initial interfaces:
> > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> {code}
> > > > > > >>>>>>>>>>>>>       // DefaultInputPort
> > > > > > >>>>>>>>>>>>>        public void processControl(ControlTuple tuple)
> > > > > > >>>>>>>>>>>>>        {
> > > > > > >>>>>>>>>>>>>          // Default Implementation to avoid need to
> > > > > > >>>>>>>>>>>>>          // implement it in all implementations
> > > > > > >>>>>>>>>>>>>        }
> > > > > > >>>>>>>>>>>>> {code}
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> {code}
> > > > > > >>>>>>>>>>>>>       // DefaultOutputPort
> > > > > > >>>>>>>>>>>>>        public void emitControl(ControlTuple tuple)
> > > > > > >>>>>>>>>>>>>        {
> > > > > > >>>>>>>>>>>>>        }
> > > > > > >>>>>>>>>>>>> {code}
> > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> We have an option to add these methods to the
> > > > > > >>>>>>>>>>>>> interfaces - InputPort and OutputPort; but these
> > > > > > >>>>>>>>>>>>> would not be backward compatible and also not
> > > > > > >>>>>>>>>>>>> consistent with the current implementation of basic
> > > > > > >>>>>>>>>>>>> data tuple flow (as with process() and emit()).
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> We also need to expose an interface / class for users
> > > > > > >>>>>>>>>>>>> to wrap their object and emit downstream. This should
> > > > > > >>>>>>>>>>>>> be part of API.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> {code}
> > > > > >>>>>>>>>>>>> public class ControlTuple extends Tuple
> > > > > >>>>>>>>>>>>> {
> > > > > >>>>>>>>>>>>>        Object userObject;
> > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>        public ControlTuple(long windowId, Object userObject)
> > > > > >>>>>>>>>>>>>        {
> > > > > >>>>>>>>>>>>>          //
> > > > > >>>>>>>>>>>>>        }
> > > > > >>>>>>>>>>>>> }
> > > > > >>>>>>>>>>>>> {code}
> > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> The emitted tuples would traverse the same flow as
> > > > > > >>>>>>>>>>>>> with other control tuples. The plan is to intercept
> > > > > > >>>>>>>>>>>>> the control tuples in GenericNode and use the
> > > > > > >>>>>>>>>>>>> Reservoir to emit the control tuples at the end of
> > > > > > >>>>>>>>>>>>> the window.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> GenericNode seems to be the best place to buffer
> > > > > > >>>>>>>>>>>>> incoming custom control tuples without delivering
> > > > > > >>>>>>>>>>>>> them immediately to the operator port. Once the end
> > > > > > >>>>>>>>>>>>> of the window is reached, we plan to use the
> > > > > > >>>>>>>>>>>>> reservoir sink to push them to the port. This
> > > > > > >>>>>>>>>>>>> behavior differs from that of any other control
> > > > > > >>>>>>>>>>>>> tuple, because here we are changing the order of
> > > > > > >>>>>>>>>>>>> tuples in the stream. The custom control tuples will
> > > > > > >>>>>>>>>>>>> be buffered and not delivered to the ports until the
> > > > > > >>>>>>>>>>>>> end of the window.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> To accomplish this, we need to have a public method
> > > > > > >>>>>>>>>>>>> in SweepableReservoir which allows putting a tuple
> > > > > > >>>>>>>>>>>>> back in the sink of the reservoir.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>> ~ Bhupesh
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>
> > > > > >
> > > > >
> > > >
> > >
> >
>
