It appears that option 1 is favored, mainly because nobody has a concrete
use case that requires option 2.

That said, option 2 is problematic only in specific cases, such as operators
with multiple input ports. In a linear DAG, where control tuples flow in
order with the data tuples, it should not be difficult to guarantee
idempotency. One example is an operator whose behavior may change several
times during a single window; those changes should not have to wait until
end window to take effect. Since we don't have a concrete use case right
now, perhaps we do not want to go down that road yet. The choice between the
two options should be exposed through a platform attribute (maybe at a later
point in time), with option 1 as the default.
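
To make the two delivery modes concrete, here is a rough sketch of how such
an attribute could switch between them. This is not the Apex API: the class
name, the boolean flag standing in for the platform attribute, and the two
entry points are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not an Apex class. Buffers control tuples that arrive
// inside a window and hands them to the operator only at the window boundary
// (option 1), unless immediate delivery (option 2) is switched on.
class WindowBoundaryControlBuffer<C> {
  private final List<C> pending = new ArrayList<>();
  private final Consumer<C> callback;
  // Stand-in for the proposed platform attribute; false means option 1.
  private final boolean deliverImmediately;

  WindowBoundaryControlBuffer(Consumer<C> callback, boolean deliverImmediately) {
    this.callback = callback;
    this.deliverImmediately = deliverImmediately;
  }

  // Assumed to be called by the engine when a control tuple arrives mid-window.
  void onControlTuple(C tuple) {
    if (deliverImmediately) {
      callback.accept(tuple);   // option 2: in order with the data tuples
    } else {
      pending.add(tuple);       // option 1: hold until the window boundary
    }
  }

  // Assumed to be called after END_WINDOW of window N and before
  // BEGIN_WINDOW of window N+1.
  void onWindowBoundary() {
    for (C tuple : pending) {
      callback.accept(tuple);
    }
    pending.clear();
  }
}
```

With the flag left at false, several control tuples emitted within one
window are all seen by the operator together at the boundary, in arrival
order.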

I think option 1 is a suitable starting point for the implementation of
this feature and we should proceed with it.
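
On Tushar's question below about the same control tuple reaching a
partition of C once per upstream partition, the per-window bookkeeping
David describes might look roughly like the following. It is only a sketch
under assumptions: the class and method names are hypothetical, and it keeps
the tuples themselves in a HashSet (relying on their equals/hashCode)
instead of an explicit list of hashes.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch, not an Apex class. Invokes the operator callback at
// most once per distinct control tuple within a streaming window.
class ControlTupleDeduplicator<C> {
  private final Set<C> seenInWindow = new HashSet<>();
  private final Consumer<C> callback;

  ControlTupleDeduplicator(Consumer<C> callback) {
    this.callback = callback;
  }

  // Assumed to be called at the start of every streaming window.
  void beginWindow() {
    seenInWindow.clear();
  }

  // Returns true if the callback was invoked, false for a duplicate.
  boolean offer(C tuple) {
    if (seenInWindow.add(tuple)) {
      callback.accept(tuple);
      return true;
    }
    return false;
  }
}
```

In the A(1) X B(4) X C(2) case, each partition of C would be offered the
same tuple four times in a window, and only the first offer would reach the
callback.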

~ Bhupesh



On Fri, Nov 11, 2016 at 12:59 AM, David Yan <da...@datatorrent.com> wrote:

> Good question Tushar. The callback should be called only once.
> The way to implement this is to keep a list of control tuple hashes for the
> given streaming window and only do the callback when the operator has not
> seen it before.
>
> Other thoughts?
>
> David
>
> On Thu, Nov 10, 2016 at 9:32 AM, Tushar Gosavi <tus...@datatorrent.com>
> wrote:
>
> > Hi David,
> >
> > What would be the behaviour in the case where we have a DAG with the
> > following operators? The number in brackets is the number of
> > partitions, and X is NxM partitioning.
> > A(1) X B(4) X C(2)
> >
> > If A sends a control tuple, it will be sent to all 4 partitions of B,
> > and from each partition of B it goes to C, i.e. each partition of C
> > will receive the same control tuple originated from A multiple times
> > (number of upstream partitions of C). In this case will the callback
> > function get called multiple times or just once?
> >
> > -Tushar.
> >
> >
> > On Fri, Nov 4, 2016 at 12:14 AM, David Yan <da...@datatorrent.com>
> wrote:
> > > Hi Bhupesh,
> > >
> > > Since each input port has its own incoming control tuple, I would
> imagine
> > > there would be an additional DefaultInputPort.processControl method
> that
> > > operator developers can override.
> > > If we go for option 1, my thinking is that the control tuples would
> > always
> > > be delivered at the next window boundary, even if the emit method is
> > called
> > > within a window.
> > >
> > > David
> > >
> > > On Thu, Nov 3, 2016 at 1:46 AM, Bhupesh Chawda <
> bhup...@datatorrent.com>
> > > wrote:
> > >
> > >> I have a question regarding the callback for a control tuple. Will it
> be
> > >> similar to InputPort::process() method? Something like
> > >> InputPort::processControlTuple(t)
> > >> ? Or will it be a method of the operator similar to beginWindow()?
> > >>
> > >> When we say that the control tuple will be delivered at window
> boundary,
> > >> does that mean all control tuples emitted in that window will be
> > processed
> > >> together at the end of the window? This would imply that there is no
> > >> ordering among regular tuples and control tuples.
> > >>
> > >> I think we should get started with the option 1 - control tuples at
> > window
> > >> boundary, which seems to handle most of the use cases. For some cases
> > which
> > >> require option 2, we can always build on this.
> > >>
> > >> ~ Bhupesh
> > >>
> > >> On Thu, Nov 3, 2016 at 1:35 PM, Thomas Weise <t...@apache.org> wrote:
> > >>
> > >> > I don't see how that would work. Suppose you have a file splitter
> and
> > >> > multiple partitions of block readers. The "end of file" event cannot
> > be
> > >> > processed downstream until all block readers are done. I also think
> > that
> > >> > this is related to the batch demarcation discussion and there should
> > be a
> > >> > single generalized mechanism to support this.
> > >> >
> > >> >
> > >> > On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <
> > pra...@datatorrent.com
> > >> >
> > >> > wrote:
> > >> >
> > >> > > Suppose I am processing data in a file and I want to do something
> at
> > >> the
> > >> > > end of a file at the output operator, I would send an end file
> > control
> > >> > > tuple and act on it when I receive it at the output. In a single
> > >> window I
> > >> > > may end up processing multiple files and if I don't have multiple
> > ports
> > >> > and
> > >> > > logical paths through the DAG (multiple partitions are ok). I can
> > >> process
> > >> > > end of each file immediately and also know what file was closed
> > without
> > >> > > sending extra identification information in the end file which I
> > would
> > >> > need
> > >> > > if I am collecting all of them and processing at the end of the
> > window.
> > >> > >
> > >> > > On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <t...@apache.org>
> > wrote:
> > >> > >
> > >> > > > The use cases listed in the original discussion don't call for
> > option
> > >> > 2.
> > >> > > It
> > >> > > > seems to come with additional complexity and implementation
> cost.
> > >> > > >
> > >> > > > Can those in favor of option 2 please also provide the use case
> > for
> > >> it.
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Thomas
> > >> > > >
> > >> > > >
> > >> > > > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <
> > siy...@datatorrent.com>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > I will vote for approach 1.
> > >> > > > >
> > >> > > > > First of all that one sounds easier to do to me. And I think
> > >> > > idempotency
> > >> > > > is
> > >> > > > > important. It may run at the cost of higher latency but I
> think
> > it
> > >> is
> > >> > > ok
> > >> > > > >
> > >> > > > > In addition, if in the future users do need realtime
> > >> control
> > >> > > > tuple
> > >> > > > > processing, we can always add the option on top of it.
> > >> > > > >
> > >> > > > > So I vote for 1
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > > Siyuan
> > >> > > > >
> > >> > > > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi <
> > p...@apache.org>
> > >> > > > wrote:
> > >> > > > >
> > >> > > > > > As a rule of thumb in any real time operating system,
> control
> > >> > tuples
> > >> > > > > should
> > >> > > > > > always be handled using Priority Queues.
> > >> > > > > >
> > >> > > > > > We may try to control priorities by defining levels. And
> shall
> > >> not
> > >> > > > > > be delivered at window boundaries.
> > >> > > > > >
> > >> > > > > > In short, control tuples shall never be treated as any other
> > >> tuples
> > >> > > in
> > >> > > > > real
> > >> > > > > > time systems.
> > >> > > > > >
> > >> > > > > > On Thursday, November 3, 2016, David Yan <
> > da...@datatorrent.com>
> > >> > > > wrote:
> > >> > > > > >
> > >> > > > > > > Hi all,
> > >> > > > > > >
> > >> > > > > > > I would like to renew the discussion of control tuples.
> > >> > > > > > >
> > >> > > > > > > Last time, we were in a debate about whether:
> > >> > > > > > >
> > >> > > > > > > 1) the platform should enforce that control tuples are
> > >> delivered
> > >> > at
> > >> > > > > > window
> > >> > > > > > > boundaries only
> > >> > > > > > >
> > >> > > > > > > or:
> > >> > > > > > >
> > >> > > > > > > 2) the platform should deliver control tuples just as
> other
> > >> > tuples
> > >> > > > and
> > >> > > > > > it's
> > >> > > > > > > the operator developers' choice whether to handle the
> > control
> > >> > > tuples
> > >> > > > as
> > >> > > > > > > they arrive or delay the processing till the next window
> > >> > boundary.
> > >> > > > > > >
> > >> > > > > > > To summarize the pros and cons:
> > >> > > > > > >
> > >> > > > > > > Approach 1: If processing control tuples results in
> changes
> > of
> > >> > the
> > >> > > > > > behavior
> > >> > > > > > > of the operator, if idempotency needs to be preserved, the
> > >> > > processing
> > >> > > > > > must
> > >> > > > > > > be done at window boundaries. This approach will save the
> > >> > operator
> > >> > > > > > > developers the headache of ensuring that. However, this will
> take
> > >> away
> > >> > > the
> > >> > > > > > > choices from the operator developer if they just need to
> > >> process
> > >> > > the
> > >> > > > > > > control tuples as soon as possible.
> > >> > > > > > >
> > >> > > > > > > Approach 2: The operator has a chance to immediately
> process
> > >> > > control
> > >> > > > > > > tuples. This would be useful if latency is more valued
> than
> > >> > > > > correctness.
> > >> > > > > > > However, this would open the possibility for operator
> > >> > developers
> > >> > > > to
> > >> > > > > > > shoot themselves in the foot. This is especially true if
> > there
> > >> > are
> > >> > > > > > multiple
> > >> > > > > > > input ports, as there is no easy way to guarantee
> processing
> > >> > order
> > >> > > > for
> > >> > > > > > > multiple input ports.
> > >> > > > > > >
> > >> > > > > > > We would like to arrive at a consensus and close this
> > >> discussion
> > >> > > soon
> > >> > > > > > this
> > >> > > > > > > time so we can start the work on this important feature.
> > >> > > > > > >
> > >> > > > > > > Thanks!
> > >> > > > > > >
> > >> > > > > > > David
> > >> > > > > > >
> > >> > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov <
> > >> > > > v.ro...@datatorrent.com>
> > >> > > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > It is not clear how operator will emit custom control
> > tuple
> > >> at
> > >> > > > window
> > >> > > > > > > > boundaries. One way is to cache/accumulate control
> tuples
> > in
> > >> > the
> > >> > > > > > operator
> > >> > > > > > > > output port till window closes (END_WINDOW is inserted
> > into
> > >> the
> > >> > > > > output
> > >> > > > > > > > sink) or only allow an operator to emit control tuples
> > inside
> > >> > the
> > >> > > > > > > > endWindow(). The latter is a slight variation of the
> > operator
> > >> > > output
> > >> > > > > > port
> > >> > > > > > > > caching behavior with the only difference that now the
> > >> operator
> > >> > > > > itself
> > >> > > > > > is
> > >> > > > > > > > responsible for caching/accumulating control tuples.
> Note
> > >> that
> > >> > in
> > >> > > > > many
> > >> > > > > > > > cases it will be necessary to postpone emitting payload
> > >> tuples
> > >> > > that
> > >> > > > > > > > logically come after the custom control tuple till the
> > next
> > >> > > window
> > >> > > > > > > begins.
> > >> > > > > > > >
> > >> > > > > > > > IMO, that is too restrictive and in a case where input
> > operator
> > >> > > uses a
> > >> > > > > > push
> > >> > > > > > > > instead of a poll (for example, it provides an end point
> > >> where
> > >> > > > remote
> > >> > > > > > > > agents may connect and publish/push data), control
> tuples
> > may
> > >> > be
> > >> > > > used
> > >> > > > > > for
> > >> > > > > > > > connect/disconnect/watermark broadcast to (partitioned)
> > >> > > downstream
> > >> > > > > > > > operators. In this case the platform just needs to
> > guarantee
> > >> > order
> > >> > > > > > barrier
> > >> > > > > > > > (any tuple emitted prior to a control tuple needs to be
> > >> > delivered
> > >> > > > > prior
> > >> > > > > > > to
> > >> > > > > > > > the control tuple).
> > >> > > > > > > >
> > >> > > > > > > > Thank you,
> > >> > > > > > > >
> > >> > > > > > > > Vlad
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > On 6/27/16 19:36, Amol Kekre wrote:
> > >> > > > > > > >
> > >> > > > > > > >> I agree with David. Allowing control tuples within a
> > window
> > >> > > (along
> > >> > > > > > with
> > >> > > > > > > >> data tuples) creates very dangerous situation where
> > >> guarantees
> > >> > > are
> > >> > > > > > > >> impacted. It is much safer to enable control tuples
> > >> > > (send/receive)
> > >> > > > > at
> > >> > > > > > > >> window boundaries (after END_WINDOW of window N, and
> > before
> > >> > > > > > BEGIN_WINDOW
> > >> > > > > > > >> for window N+1). My take on David's list is
> > >> > > > > > > >>
> > >> > > > > > > >> 1. -> window boundaries -> Strong +1; there will be a
> big
> > >> > issue
> > >> > > > with
> > >> > > > > > > >> guarantees for operators with multiple ports. (see
> > Thomas's
> > >> > > > > response)
> > >> > > > > > > >> 2. -> All downstream windows -> +1, but there are
> > >> situations;
> > >> > a
> > >> > > > > caveat
> > >> > > > > > > >> could be "only to operators that implement control
> tuple
> > >> > > > > > > >> interface/listeners", which could effectively
> translates
> > to
> > >> > "all
> > >> > > > > > > >> interested
> > >> > > > > > > >> downstream operators"
> > >> > > > > > > >> 3. Only Input operator can create control tuples -> -1;
> > is
> > >> > > > > restrictive
> > >> > > > > > > >> even
> > >> > > > > > > >> though most likely 95% of the time it will be input
> > >> operators
> > >> > > > > > > >>
> > >> > > > > > > >> Thks,
> > >> > > > > > > >> Amol
> > >> > > > > > > >>
> > >> > > > > > > >>
> > >> > > > > > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <
> > >> > > > > tho...@datatorrent.com>
> > >> > > > > > > >> wrote:
> > >> > > > > > > >>
> > >> > > > > > > >> The windowing we discuss here is in general event time
> > >> based,
> > >> > > > > arrival
> > >> > > > > > > time
> > >> > > > > > > >>> is a special case of it.
> > >> > > > > > > >>>
> > >> > > > > > > >>> I don't think state changes can be made independent of
> > the
> > >> > > > > streaming
> > >> > > > > > > >>> window
> > >> > > > > > > >>> boundary as it would prevent idempotent processing and
> > >> > > > transitively
> > >> > > > > > > >>> exactly
> > >> > > > > > > >>> once. For that to work, tuples need to be presented to
> > the
> > >> > > > operator
> > >> > > > > > in
> > >> > > > > > > a
> > >> > > > > > > >>> guaranteed order *within* the streaming window, which
> is
> > >> not
> > >> > > > > possible
> > >> > > > > > > >>> with
> > >> > > > > > > >>> multiple ports (and partitions).
> > >> > > > > > > >>>
> > >> > > > > > > >>> Thomas
> > >> > > > > > > >>>
> > >> > > > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan <
> > >> > > > da...@datatorrent.com>
> > >> > > > > > > >>> wrote:
> > >> > > > > > > >>>
> > >> > > > > > > >>> I think for session tracking, if the session
> boundaries
> > are
> > >> > > > allowed
> > >> > > > > > to
> > >> > > > > > > be
> > >> > > > > > > >>>> not aligned with the streaming window boundaries, the
> > user
> > >> > > will
> > >> > > > > > have a
> > >> > > > > > > >>>>
> > >> > > > > > > >>> much
> > >> > > > > > > >>>
> > >> > > > > > > >>>> bigger problem with idempotency. And in most cases,
> > >> session
> > >> > > > > tracking
> > >> > > > > > > is
> > >> > > > > > > >>>> event time based, not ingression time or processing
> > time
> > >> > > based,
> > >> > > > so
> > >> > > > > > > this
> > >> > > > > > > >>>>
> > >> > > > > > > >>> may
> > >> > > > > > > >>>
> > >> > > > > > > >>>> never be a problem. But if that ever happens, the
> user
> > can
> > >> > > > always
> > >> > > > > > > alter
> > >> > > > > > > >>>>
> > >> > > > > > > >>> the
> > >> > > > > > > >>>
> > >> > > > > > > >>>> default 500ms width.
> > >> > > > > > > >>>>
> > >> > > > > > > >>>> David
> > >> > > > > > > >>>>
> > >> > > > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <
> > >> > > > > > v.ro...@datatorrent.com>
> > >> > > > > > > >>>> wrote:
> > >> > > > > > > >>>>
> > >> > > > > > > >>>> Ability to send custom control tuples within window
> > may be
> > >> > > > useful,
> > >> > > > > > for
> > >> > > > > > > >>>>> example, for sessions tracking, where session
> > boundaries
> > >> > are
> > >> > > > not
> > >> > > > > > > >>>>>
> > >> > > > > > > >>>> aligned
> > >> > > > > > > >>>
> > >> > > > > > > >>>> with window boundaries and 500 ms latency is not
> > >> acceptable
> > >> > > for
> > >> > > > an
> > >> > > > > > > >>>>> application.
> > >> > > > > > > >>>>>
> > >> > > > > > > >>>>> Thank you,
> > >> > > > > > > >>>>>
> > >> > > > > > > >>>>> Vlad
> > >> > > > > > > >>>>>
> > >> > > > > > > >>>>>
> > >> > > > > > > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
> > >> > > > > > > >>>>>
> > >> > > > > > > >>>>> It should not matter from where the control tuple is
> > >> > > triggered.
> > >> > > > > It
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>> will
> > >> > > > > > > >>>
> > >> > > > > > > >>>> be
> > >> > > > > > > >>>>
> > >> > > > > > > >>>>> good to have a generic mechanism to propagate it and
> > >> other
> > >> > > > things
> > >> > > > > > can
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>> be
> > >> > > > > > > >>>
> > >> > > > > > > >>>> accomplished outside the engine. For example, the new
> > >> > > > > comprehensive
> > >> > > > > > > >>>>>> support
> > >> > > > > > > >>>>>> for windowing will all be in Malhar, nothing that
> the
> > >> > engine
> > >> > > > > needs
> > >> > > > > > > to
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>> know
> > >> > > > > > > >>>>
> > >> > > > > > > >>>>> about it except that we need the control tuple for
> > >> > watermark
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>> propagation
> > >> > > > > > > >>>
> > >> > > > > > > >>>> and idempotent processing.
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>>> I also think the main difference to other tuples is
> > the
> > >> > need
> > >> > > > to
> > >> > > > > > send
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>> it
> > >> > > > > > > >>>
> > >> > > > > > > >>>> to
> > >> > > > > > > >>>>
> > >> > > > > > > >>>>> all partitions. Which is similar to checkpoint
> window
> > >> > tuples,
> > >> > > > but
> > >> > > > > > not
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>> the
> > >> > > > > > > >>>>
> > >> > > > > > > >>>>> same. Here, we probably also need the ability for
> the
> > >> user
> > >> > to
> > >> > > > > > control
> > >> > > > > > > >>>>>> whether such tuple should traverse the entire DAG
> or
> > >> not.
> > >> > > For
> > >> > > > a
> > >> > > > > > > batch
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>> use
> > >> > > > > > > >>>>
> > >> > > > > > > >>>>> case, for example, we may want to send the end of
> > file to
> > >> > the
> > >> > > > > next
> > >> > > > > > > >>>>>> operator, but not beyond, if the operator has
> > >> asynchronous
> > >> > > > > > > processing
> > >> > > > > > > >>>>>> logic
> > >> > > > > > > >>>>>> in it.
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>>> For any logic to be idempotent, the control tuple
> > needs
> > >> to
> > >> > > be
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>> processed
> > >> > > > > > > >>>
> > >> > > > > > > >>>> at
> > >> > > > > > > >>>>
> > >> > > > > > > >>>>> a window boundary. Receiving the control tuple in
> the
> > >> > window
> > >> > > > > > callback
> > >> > > > > > > >>>>>> would
> > >> > > > > > > >>>>>> avoid having to track extra state in the operator.
> I
> > >> don't
> > >> > > > think
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>> that's
> > >> > > > > > > >>>
> > >> > > > > > > >>>> a
> > >> > > > > > > >>>>
> > >> > > > > > > >>>>> major issue, but what is the use case for
> processing a
> > >> > > control
> > >> > > > > > tuple
> > >> > > > > > > >>>>>> within
> > >> > > > > > > >>>>>> the window?
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>>> Thomas
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni <
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>> pra...@datatorrent.com>
> > >> > > > > > > >>>>
> > >> > > > > > > >>>>> wrote:
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>>> For the use cases you mentioned, I think 1) and 2)
> > are
> > >> > more
> > >> > > > > likely
> > >> > > > > > > to
> > >> > > > > > > >>>>>>
> > >> > > > > > > >>>>>>> be controlled directly by the application, 3) and
> 4)
> > >> are
> > >> > > more
> > >> > > > > > > likely
> > >> > > > > > > >>>>>>> going to be triggered externally and directly
> > handled
> > >> by
> > >> > > the
> > >> > > > > > engine
> > >> > > > > > > >>>>>>> and 3) is already being implemented that way
> > >> > > (apexcore-163).
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>>> The control tuples emitted by an operator would be
> > sent
> > >> > to
> > >> > > > all
> > >> > > > > > > >>>>>>> downstream partitions isn't it and that would be
> the
> > >> > chief
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>> distinction
> > >> > > > > > > >>>
> > >> > > > > > > >>>> compared to data (apart from the payload) which would
> > get
> > >> > > > > > partitioned
> > >> > > > > > > >>>>>>> under normal circumstances. It would also be
> > guaranteed
> > >> > > that
> > >> > > > > > > >>>>>>> downstream partitions will receive control tuples
> > only
> > >> > > after
> > >> > > > > the
> > >> > > > > > > data
> > >> > > > > > > >>>>>>> that was sent before it so we could send it
> > immediately
> > >> > > when
> > >> > > > it
> > >> > > > > > is
> > >> > > > > > > >>>>>>> emitted as opposed to window boundaries.
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>>> However during unification it is important to know
> > if
> > >> > these
> > >> > > > > > control
> > >> > > > > > > >>>>>>> tuples have been received from all upstream
> > partitions
> > >> > > before
> > >> > > > > > > >>>>>>> proceeding with a control operation. One could
> wait
> > >> till
> > >> > > end
> > >> > > > of
> > >> > > > > > the
> > >> > > > > > > >>>>>>> window but that introduces a delay however small,
> I
> > >> would
> > >> > > > like
> > >> > > > > to
> > >> > > > > > > add
> > >> > > > > > > >>>>>>> to the proposal that the platform only hand over
> the
> > >> > > control
> > >> > > > > > tuple
> > >> > > > > > > to
> > >> > > > > > > >>>>>>> the unifier when it has been received from all
> > upstream
> > >> > > > > > partitions
> > >> > > > > > > >>>>>>> much like how end window is processed but not wait
> > till
> > >> > the
> > >> > > > > > actual
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>> end
> > >> > > > > > > >>>
> > >> > > > > > > >>>> of the window.
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>>> Regd your concern about idempotency, we typically
> > care
> > >> > > about
> > >> > > > > > > >>>>>>> idempotency at a window level and doing the above
> > will
> > >> > > still
> > >> > > > > > allow
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>> the
> > >> > > > > > > >>>
> > >> > > > > > > >>>> operators to preserve that easily.
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>>> Thanks
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David Yan <
> > >> > > > da...@datatorrent.com>
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>> wrote:
> > >> > > > > > > >>>
> > >> > > > > > > >>>> Hi all,
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>>> I would like to propose a new feature to the Apex
> > core
> > >> > > > engine
> > >> > > > > --
> > >> > > > > > > the
> > >> > > > > > > >>>>>>>> support of custom control tuples. Currently, we
> > have
> > >> > > control
> > >> > > > > > > tuples
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>> such
> > >> > > > > > > >>>>
> > >> > > > > > > >>>>> as
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>>> BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on,
> > but we
> > >> > > don't
> > >> > > > > > have
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>> the
> > >> > > > > > > >>>
> > >> > > > > > > >>>> support for applications to insert their own control
> > >> tuples.
> > >> > > The
> > >> > > > > way
> > >> > > > > > > >>>>>>>> currently to get around this is to use data
> tuples
> > and
> > >> > > have
> > >> > > > a
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>> separate
> > >> > > > > > > >>>
> > >> > > > > > > >>>> port
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>>> for such tuples that sends tuples to all
> partitions
> > of
> > >> > the
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>> downstream
> > >> > > > > > > >>>
> > >> > > > > > > >>>> operators, which is not exactly developer friendly.
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>>> We have already seen a number of use cases that
> can
> > >> use
> > >> > > this
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>> feature:
> > >> > > > > > > >>>
> > >> > > > > > > >>>> 1) Batch support: We need to tell all operators of
> the
> > >> > > physical
> > >> > > > > DAG
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>> when
> > >> > > > > > > >>>>
> > >> > > > > > > >>>>> a
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>>> batch starts and ends, so the operators can do
> > whatever
> > >> > > that
> > >> > > > is
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>> needed
> > >> > > > > > > >>>
> > >> > > > > > > >>>> upon
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>>> the start or the end of a batch.
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>>> 2) Watermark: To support the concepts of event
> time
> > >> > > > windowing,
> > >> > > > > > the
> > >> > > > > > > >>>>>>>> watermark control tuple is needed to tell which
> > >> windows
> > >> > > > should
> > >> > > > > > be
> > >> > > > > > > >>>>>>>> considered late.
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>>> 3) Changing operator properties: We do have the
> > >> support
> > >> > of
> > >> > > > > > > changing
> > >> > > > > > > >>>>>>>> operator properties on the fly, but with a custom
> > >> > control
> > >> > > > > tuple,
> > >> > > > > > > the
> > >> > > > > > > >>>>>>>> command to change operator properties can be
> window
> > >> > > aligned
> > >> > > > > for
> > >> > > > > > > all
> > >> > > > > > > >>>>>>>> partitions and also across the DAG.
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>>> 4) Recording tuples: Like changing operator
> > >> properties,
> > >> > we
> > >> > > > do
> > >> > > > > > have
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>> this
> > >> > > > > > > >>>>
> > >> > > > > > > >>>>> support now but only at the individual physical
> > operator
> > >> > > level,
> > >> > > > > and
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>>> without
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>>> control of which window to record tuples for.
> With a
> > >> > custom
> > >> > > > > > control
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>>> tuple,
> > >> > > > > > > >>>>>>>
> > >> > > > > > > >>>>>>> because a control tuple must belong to a window,
> all
> > >> > > > operators
> > >> > > > > in
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>> the
> > >> > > > > > > >>>
> > >> > > > > > > >>>> DAG
> > >> > > > > > > >>>>>>>> can start (and stop) recording for the same
> > windows.
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>>> I can think of two options to achieve this:
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>>> 1) new custom control tuple type that takes
> user's
> > >> > > > > serializable
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>> object.
> > >> > > > > > > >>>>
> > >> > > > > > > >>>>> 2) piggy back the current BEGIN_WINDOW and
> END_WINDOW
> > >> > control
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>> tuples.
> > >> > > > > > > >>>
> > >> > > > > > > >>>> Please provide your feedback. Thank you.
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>>> David
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >>>>>>>>
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
>
