Hi Bhupesh,

I don't see anything stateful here as control tuples are delivered at the
end of the streaming window in which they were generated?

Also, control tuples would need to be broadcast to all partitions, and
therefore also joined downstream by the engine? (In addition to the case
you describe where different partitions generate control tuples, which all
need to be delivered downstream).
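
To make the end-of-window delivery concrete, here is a minimal sketch in plain Java. It is not engine code: ControlBufferSketch and its methods are hypothetical names that stand in for whatever the node would do internally, and delivering the buffered control tuples just before the end-window callback is an assumption about ordering. Control tuples from any upstream partition would simply be appended to the same buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not an Apex class: buffers custom control tuples
// and releases them only when the streaming window ends.
final class ControlBufferSketch {
  // User objects carried by control tuples seen in the current window.
  private final List<Object> pendingControl = new ArrayList<>();
  // Record of what the downstream operator would observe, in order.
  private final List<String> delivered = new ArrayList<>();

  // Data tuples are passed through immediately.
  void onData(Object tuple) {
    delivered.add("data:" + tuple);
  }

  // Control tuples are held back, whichever partition they came from.
  void onControl(Object userObject) {
    pendingControl.add(userObject);
  }

  // At end of window, flush every buffered control tuple, then signal
  // the end of the window (assumed ordering).
  void onEndWindow() {
    for (Object userObject : pendingControl) {
      delivered.add("control:" + userObject);
    }
    pendingControl.clear();
    delivered.add("endWindow");
  }

  List<String> delivered() {
    return delivered;
  }
}
```

Nothing in the buffer outlives the window, which is why I would not call this stateful.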

Thanks,
Thomas



On Fri, Dec 16, 2016 at 5:15 AM, Bhupesh Chawda <[email protected]>
wrote:

> As I understand from the discussion on the other thread, we want custom
> control tuples to behave like existing control tuples, such as begin
> window and end window.
>
> However the fact that we are allowing the user to bundle a user object
> inside the control tuple differentiates it from the existing control tuples
> and makes it a stateful tuple. This is what is delivered to the downstream
> operator during the call back.
>
> Perhaps we should add the following requirements for the custom control
> tuple behavior:
>
>    1. All control tuples generated from upstream partitions must be
>    delivered to the downstream operator. Since each control tuple has a
>    different user object bundled inside, the engine should make sure that
>    all of the control tuples are received by the downstream operator.
>    However, the callbacks would only happen after all data tuples have
>    been delivered to the operator.
>    2. The alternate behavior could be to behave similar to the existing
>    control tuples (like begin window and end window) and consider the
>    custom control tuple as stateless. Otherwise the question arises as to
>    which user object (from among all control tuples from upstream) is
>    passed on to the downstream operator.
>
> We can control the behavior via some attribute.
>
> Thoughts?
>
> ~ Bhupesh
>
>
>
>
> On Thu, Dec 15, 2016 at 2:13 PM, Bhupesh Chawda <[email protected]>
> wrote:
>
> > Hi All,
> >
> > Here are the initial interfaces:
> >
> > {code}
> >  // DefaultInputPort
> >   public void processControl(ControlTuple tuple)
> >   {
> >     // Default implementation, to avoid the need to implement it in all
> >     // implementations
> >   }
> > {code}
> >
> > {code}
> >  // DefaultOutputPort
> >   public void emitControl(ControlTuple tuple)
> >   {
> >   }
> > {code}
> >
> > We have the option of adding these methods to the interfaces InputPort
> > and OutputPort, but that would not be backward compatible and would also
> > not be consistent with the current implementation of basic data tuple
> > flow (as with process() and emit()).
> >
> > We also need to expose an interface / class for users to wrap their
> > object and emit it downstream. This should be part of the API.
> >
> > {code}
> > public class ControlTuple extends Tuple
> > {
> >   Object userObject;
> >
> >   public ControlTuple(long windowId, Object userObject)
> >   {
> >     //
> >   }
> > }
> > {code}
> >
> > The emitted tuples would traverse the same flow as other control
> > tuples. The plan is to intercept the control tuples in GenericNode and
> > use the Reservoir to emit the control tuples at the end of the window.
> >
> > GenericNode seems to be the best place to buffer incoming custom control
> > tuples without delivering them immediately to the operator port. Once the
> > end of the window is reached, we plan to use the reservoir sink to push
> > them to the port. This differs from the behavior of every other control
> > tuple, in that we are changing the order of tuples in the stream. The
> > custom control tuples will be buffered and not delivered to the ports
> > until the end of the window.
> > To accomplish this, we need a public method in SweepableReservoir
> > which allows putting a tuple back into the sink of the reservoir.
> >
> > ~ Bhupesh
> >
> >
>
