Good question, Tushar. The callback should be called only once.
The way to implement this is to keep a list of control tuple hashes for the
given streaming window and invoke the callback only when the operator has not
seen that control tuple before.
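A minimal Java sketch of the per-window de-duplication described above, assuming control tuples implement `equals()`/`hashCode()` consistently. `ControlTupleDeduplicator` is a hypothetical name, not part of Apex. It keeps the tuples themselves rather than bare hashes, so a hash collision cannot drop a distinct tuple.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical helper (not an Apex API): fires the operator's control-tuple
// callback at most once per streaming window, however many upstream
// partitions deliver a copy of the same tuple.
class ControlTupleDeduplicator<T> {
  // Tuples already seen in the current window. HashSet compares by
  // hashCode() and then equals(), so a hash collision cannot suppress a
  // different tuple.
  private final Set<T> seenInWindow = new HashSet<>();
  private final Consumer<T> callback;

  ControlTupleDeduplicator(Consumer<T> callback) {
    this.callback = callback;
  }

  // Call at BEGIN_WINDOW: the "seen" set only covers one streaming window.
  void beginWindow() {
    seenInWindow.clear();
  }

  // Call for every copy received on any upstream path. Returns true if this
  // was the first copy in the window and the callback fired.
  boolean onControlTuple(T tuple) {
    if (seenInWindow.add(tuple)) {
      callback.accept(tuple);
      return true;
    }
    return false;
  }
}
```

In Tushar's A(1) X B(4) X C(2) example, each partition of C would pass all four copies of A's tuple to `onControlTuple`, and only the first would reach the callback.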

Other thoughts?

David

On Thu, Nov 10, 2016 at 9:32 AM, Tushar Gosavi <tus...@datatorrent.com> wrote:

> Hi David,
>
> What would the behaviour be in a case where we have a DAG with the
> following operators? The number in brackets is the number of partitions,
> and X is NxM partitioning.
> A(1) X B(4) X C(2)
>
> If A sends a control tuple, it will be sent to all 4 partitions of B, and
> from each partition of B it goes to C, i.e. each partition of C will
> receive the same control tuple originating from A multiple times (once per
> upstream partition of C, so 4 times here). In this case, will the callback
> function get called multiple times or just once?
>
> -Tushar.
>
>
> On Fri, Nov 4, 2016 at 12:14 AM, David Yan <da...@datatorrent.com> wrote:
> > Hi Bhupesh,
> >
> > Since each input port has its own incoming control tuples, I would
> > imagine there would be an additional DefaultInputPort.processControl
> > method that operator developers can override.
> > If we go for option 1, my thinking is that the control tuples would
> > always be delivered at the next window boundary, even if the emit method
> > is called within a window.
> >
> > David
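To make the proposed shape concrete, here is a Java sketch of an input port with a `processControl` hook and of option 1 delivery, where control tuples received during a window are held back until the window boundary. `SketchInputPort` and `WindowBoundaryDelivery` are stand-ins invented for illustration; `DefaultInputPort.processControl` is only a proposal in this thread, and its real signature may differ.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical port API standing in for the proposed
// DefaultInputPort.processControl; not the actual Apex classes.
abstract class SketchInputPort<T> {
  public abstract void process(T tuple);

  // Default: ignore control tuples; operators override this to react.
  public void processControl(Object controlTuple) {}
}

// Stand-in for the engine side of option 1: control tuples arriving within a
// window are queued and handed to the port only at the window boundary.
class WindowBoundaryDelivery<T> {
  private final SketchInputPort<T> port;
  private final Queue<Object> pendingControl = new ArrayDeque<>();

  WindowBoundaryDelivery(SketchInputPort<T> port) {
    this.port = port;
  }

  // Data tuples are delivered as they arrive.
  void onData(T tuple) {
    port.process(tuple);
  }

  // Control tuples are buffered, even if emitted mid-window.
  void onControl(Object controlTuple) {
    pendingControl.add(controlTuple);
  }

  // Called after END_WINDOW of window N, before BEGIN_WINDOW of N+1.
  void onWindowBoundary() {
    while (!pendingControl.isEmpty()) {
      port.processControl(pendingControl.poll());
    }
  }
}
```

In this sketch, a control tuple emitted between two data tuples reaches the port after both, which is the loss of ordering Bhupesh asks about in the message quoted below.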
> >
> > On Thu, Nov 3, 2016 at 1:46 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> >
> >> I have a question regarding the callback for a control tuple. Will it be
> >> similar to the InputPort::process() method, something like
> >> InputPort::processControlTuple(t)? Or will it be a method of the
> >> operator, similar to beginWindow()?
> >>
> >> When we say that the control tuple will be delivered at the window
> >> boundary, does that mean all control tuples emitted in that window will
> >> be processed together at the end of the window? This would imply that
> >> there is no ordering among regular tuples and control tuples.
> >>
> >> I think we should get started with option 1 (control tuples at the window
> >> boundary), which seems to handle most of the use cases. For the cases
> >> that require option 2, we can always build on this.
> >>
> >> ~ Bhupesh
> >>
> >> On Thu, Nov 3, 2016 at 1:35 PM, Thomas Weise <t...@apache.org> wrote:
> >>
> >> > I don't see how that would work. Suppose you have a file splitter and
> >> > multiple partitions of block readers. The "end of file" event cannot be
> >> > processed downstream until all block readers are done. I also think
> >> > that this is related to the batch demarcation discussion, and there
> >> > should be a single generalized mechanism to support this.
> >> >
> >> >
> >> > On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <pra...@datatorrent.com> wrote:
> >> >
> >> > > Suppose I am processing data in a file and I want to do something at
> >> > > the end of the file at the output operator. I would send an
> >> > > end-of-file control tuple and act on it when I receive it at the
> >> > > output. In a single window I may end up processing multiple files. If
> >> > > I don't have multiple ports and logical paths through the DAG
> >> > > (multiple partitions are ok), I can process the end of each file
> >> > > immediately and also know which file was closed, without sending the
> >> > > extra identification information in the end-of-file tuple that I
> >> > > would need if I were collecting all of them and processing them at
> >> > > the end of the window.
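A short Java sketch of Pramod's scenario, assuming data records and end-of-file control tuples reach the output operator in emission order on a single logical path. `FileClosingOutput` is a hypothetical operator; because ordering is preserved, the end-of-file tuple needs no file name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical output operator for option 2 delivery: data and end-of-file
// control tuples arrive interleaved in emission order.
class FileClosingOutput {
  private String currentFile;                       // file whose records are being written
  final List<String> closedFiles = new ArrayList<>();

  void processData(String fileName, String record) {
    currentFile = fileName;                         // the record would be written here
  }

  // The end-of-file tuple carries no file name: ordering tells us which
  // file it belongs to.
  void processEndOfFile() {
    if (currentFile != null) {
      closedFiles.add(currentFile);
      currentFile = null;
    }
  }
}
```

If both end-of-file tuples were instead collected and delivered together at the window boundary, the operator would see two anonymous markers after all records and could not tell which file each one closes without an identifier in the tuple.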
> >> > >
> >> > > On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <t...@apache.org> wrote:
> >> > >
> >> > > > The use cases listed in the original discussion don't call for
> >> > > > option 2. It seems to come with additional complexity and
> >> > > > implementation cost.
> >> > > >
> >> > > > Can those in favor of option 2 please also provide the use case for
> >> > > > it?
> >> > > >
> >> > > > Thanks,
> >> > > > Thomas
> >> > > >
> >> > > >
> >> > > > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <siy...@datatorrent.com> wrote:
> >> > > >
> >> > > > > I will vote for approach 1.
> >> > > > >
> >> > > > > First of all, that one sounds easier to do to me. And I think
> >> > > > > idempotency is important. It may come at the cost of higher
> >> > > > > latency, but I think that is ok.
> >> > > > >
> >> > > > > In addition, if users do need real-time control tuple processing
> >> > > > > in the future, we can always add the option on top of it.
> >> > > > >
> >> > > > > So I vote for 1.
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Siyuan
> >> > > > >
> >> > > > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi <p...@apache.org> wrote:
> >> > > > >
> >> > > > > > As a rule of thumb in any real-time operating system, control
> >> > > > > > tuples should always be handled using priority queues.
> >> > > > > >
> >> > > > > > We may try to control priorities by defining levels, and control
> >> > > > > > tuples shall not be delivered at window boundaries.
> >> > > > > >
> >> > > > > > In short, control tuples shall never be treated like any other
> >> > > > > > tuples in real-time systems.
> >> > > > > >
> >> > > > > > On Thursday, November 3, 2016, David Yan <da...@datatorrent.com> wrote:
> >> > > > > >
> >> > > > > > > Hi all,
> >> > > > > > >
> >> > > > > > > I would like to renew the discussion of control tuples.
> >> > > > > > >
> >> > > > > > > Last time, we were in a debate about whether:
> >> > > > > > >
> >> > > > > > > 1) the platform should enforce that control tuples are
> >> > > > > > > delivered at window boundaries only
> >> > > > > > >
> >> > > > > > > or:
> >> > > > > > >
> >> > > > > > > 2) the platform should deliver control tuples just as other
> >> > > > > > > tuples, and it's the operator developers' choice whether to
> >> > > > > > > handle the control tuples as they arrive or delay the
> >> > > > > > > processing till the next window boundary.
> >> > > > > > >
> >> > > > > > > To summarize the pros and cons:
> >> > > > > > >
> >> > > > > > > Approach 1: If processing control tuples changes the behavior
> >> > > > > > > of the operator and idempotency needs to be preserved, the
> >> > > > > > > processing must be done at window boundaries. This approach
> >> > > > > > > saves operator developers the headache of ensuring that.
> >> > > > > > > However, it takes the choice away from operator developers who
> >> > > > > > > just need to process the control tuples as soon as possible.
> >> > > > > > >
> >> > > > > > > Approach 2: The operator has a chance to process control tuples
> >> > > > > > > immediately. This would be useful if latency is valued more
> >> > > > > > > than correctness. However, this would open the possibility for
> >> > > > > > > operator developers to shoot themselves in the foot. This is
> >> > > > > > > especially true if there are multiple input ports, as there is
> >> > > > > > > no easy way to guarantee processing order across multiple
> >> > > > > > > input ports.
> >> > > > > > >
> >> > > > > > > We would like to arrive at a consensus and close this
> >> > > > > > > discussion soon this time, so we can start the work on this
> >> > > > > > > important feature.
> >> > > > > > >
> >> > > > > > > Thanks!
> >> > > > > > >
> >> > > > > > > David
> >> > > > > > >
> >> > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
> >> > > > > > >
> >> > > > > > > > It is not clear how an operator will emit a custom control
> >> > > > > > > > tuple at window boundaries. One way is to cache/accumulate
> >> > > > > > > > control tuples in the operator output port till the window
> >> > > > > > > > closes (END_WINDOW is inserted into the output sink), or to
> >> > > > > > > > only allow an operator to emit control tuples inside
> >> > > > > > > > endWindow(). The latter is a slight variation of the output
> >> > > > > > > > port caching behavior, with the only difference that now the
> >> > > > > > > > operator itself is responsible for caching/accumulating
> >> > > > > > > > control tuples. Note that in many cases it will be necessary
> >> > > > > > > > to postpone emitting payload tuples that logically come after
> >> > > > > > > > the custom control tuple till the next window begins.
> >> > > > > > > >
> >> > > > > > > > IMO, that is too restrictive. In a case where an input
> >> > > > > > > > operator uses push instead of poll (for example, it provides
> >> > > > > > > > an end point where remote agents may connect and publish/push
> >> > > > > > > > data), control tuples may be used for
> >> > > > > > > > connect/disconnect/watermark broadcast to (partitioned)
> >> > > > > > > > downstream operators. In this case the platform just needs to
> >> > > > > > > > guarantee an order barrier (any tuple emitted prior to a
> >> > > > > > > > control tuple needs to be delivered prior to the control
> >> > > > > > > > tuple).
> >> > > > > > > >
> >> > > > > > > > Thank you,
> >> > > > > > > >
> >> > > > > > > > Vlad
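Vlad's two alternatives can be compared with a small Java sketch: an output port that caches control tuples until END_WINDOW, and one that only enforces the order barrier by writing to a FIFO sink. `SketchOutputPort` and the list-based sink are illustrative assumptions, not Apex classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical output port; "sink" is a plain list standing in for the
// downstream stream, which delivers in FIFO order.
class SketchOutputPort {
  enum Mode { CACHE_UNTIL_END_WINDOW, ORDER_BARRIER }

  private final Mode mode;
  private final List<String> sink;
  private final List<String> cachedControl = new ArrayList<>();

  SketchOutputPort(Mode mode, List<String> sink) {
    this.mode = mode;
    this.sink = sink;
  }

  void emit(String dataTuple) {
    sink.add(dataTuple);
  }

  void emitControl(String controlTuple) {
    if (mode == Mode.CACHE_UNTIL_END_WINDOW) {
      cachedControl.add(controlTuple);  // held back until the window closes
    } else {
      sink.add(controlTuple);           // FIFO: everything emitted earlier is already ahead of it
    }
  }

  void endWindow(long windowId) {
    sink.add("END_WINDOW " + windowId);
    sink.addAll(cachedControl);         // between END_WINDOW N and BEGIN_WINDOW N+1
    cachedControl.clear();
  }
}
```

In the caching mode, a data tuple `d2` emitted after a `disconnect` control tuple reaches the sink before it, which is why payload tuples would sometimes have to be postponed to the next window.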
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > On 6/27/16 19:36, Amol Kekre wrote:
> >> > > > > > > >
> >> > > > > > > >> I agree with David. Allowing control tuples within a window
> >> > > > > > > >> (along with data tuples) creates a very dangerous situation
> >> > > > > > > >> where guarantees are impacted. It is much safer to enable
> >> > > > > > > >> control tuples (send/receive) at window boundaries (after
> >> > > > > > > >> END_WINDOW of window N, and before BEGIN_WINDOW for window
> >> > > > > > > >> N+1). My take on David's list is:
> >> > > > > > > >>
> >> > > > > > > >> 1. -> window boundaries -> Strong +1; there will be a big
> >> > > > > > > >> issue with guarantees for operators with multiple ports (see
> >> > > > > > > >> Thomas's response).
> >> > > > > > > >> 2. -> All downstream windows -> +1, but there are situations;
> >> > > > > > > >> a caveat could be "only to operators that implement control
> >> > > > > > > >> tuple interface/listeners", which could effectively translate
> >> > > > > > > >> to "all interested downstream operators".
> >> > > > > > > >> 3. Only Input operator can create control tuples -> -1; this
> >> > > > > > > >> is restrictive even though most likely 95% of the time it
> >> > > > > > > >> will be input operators.
> >> > > > > > > >>
> >> > > > > > > >> Thks,
> >> > > > > > > >> Amol
> >> > > > > > > >>
> >> > > > > > > >>
> >> > > > > > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <tho...@datatorrent.com> wrote:
> >> > > > > > > >>
> >> > > > > > > >>> The windowing we discuss here is in general event time
> >> > > > > > > >>> based; arrival time is a special case of it.
> >> > > > > > > >>>
> >> > > > > > > >>> I don't think state changes can be made independent of the
> >> > > > > > > >>> streaming window boundary, as that would prevent idempotent
> >> > > > > > > >>> processing and, transitively, exactly-once. For that to
> >> > > > > > > >>> work, tuples would need to be presented to the operator in a
> >> > > > > > > >>> guaranteed order *within* the streaming window, which is not
> >> > > > > > > >>> possible with multiple ports (and partitions).
> >> > > > > > > >>>
> >> > > > > > > >>> Thomas
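Thomas's point about multiple ports can be shown with a toy Java operator (hypothetical, not from the thread): port 1 delivers numbers to be summed and port 2 delivers a control tuple that changes a multiplier. Since the engine does not fix where the port 2 tuple lands among the port 1 tuples, the sketch runs the same window with different interleavings.

```java
// Toy two-port operator; "MultiplyingSum" and its methods are illustrative.
class MultiplyingSum {
  private final boolean deferControl;   // true = option 1, false = option 2
  private int multiplier = 1;
  private Integer pendingMultiplier;    // only used when control is deferred
  long windowSum;

  MultiplyingSum(boolean deferControl) {
    this.deferControl = deferControl;
  }

  void beginWindow() {
    windowSum = 0;
  }

  // Port 1: data tuples.
  void processData(int value) {
    windowSum += (long) value * multiplier;
  }

  // Port 2: control tuple carrying a new multiplier.
  void processControl(int newMultiplier) {
    if (deferControl) {
      pendingMultiplier = newMultiplier;  // option 1: applied at the window boundary
    } else {
      multiplier = newMultiplier;         // option 2: applied immediately
    }
  }

  // A deferred control tuple takes effect from window N+1 onwards.
  void endWindow() {
    if (pendingMultiplier != null) {
      multiplier = pendingMultiplier;
      pendingMultiplier = null;
    }
  }

  // Runs one window; the control tuple (multiplier 10) arrives just before
  // data[controlPosition], modelling one possible cross-port interleaving.
  static long runWindow(boolean deferControl, int controlPosition, int... data) {
    MultiplyingSum op = new MultiplyingSum(deferControl);
    op.beginWindow();
    for (int i = 0; i < data.length; i++) {
      if (i == controlPosition) {
        op.processControl(10);
      }
      op.processData(data[i]);
    }
    op.endWindow();
    return op.windowSum;
  }
}
```

For data 1, 2, 3, applying the control tuple immediately (option 2) yields 51 or 33 depending on the interleaving, so a replay after failure may not reproduce the original output. Deferred to the window boundary (option 1), both interleavings yield 6, and the new multiplier applies from the next window.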
> >> > > > > > > >>>
> >> > > > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan <da...@datatorrent.com> wrote:
> >> > > > > > > >>>
> >> > > > > > > >>>> I think for session tracking, if the session boundaries are
> >> > > > > > > >>>> allowed to be not aligned with the streaming window
> >> > > > > > > >>>> boundaries, the user will have a much bigger problem with
> >> > > > > > > >>>> idempotency. And in most cases, session tracking is event
> >> > > > > > > >>>> time based, not ingestion time or processing time based, so
> >> > > > > > > >>>> this may never be a problem. But if that ever happens, the
> >> > > > > > > >>>> user can always alter the default 500 ms streaming window
> >> > > > > > > >>>> width.
> >> > > > > > > >>>>
> >> > > > > > > >>>> David
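The latency trade-off here is simple arithmetic: under option 1, a control tuple emitted partway through a streaming window waits until that window ends. A Java sketch, assuming windows of fixed width aligned to time 0 (the engine's actual alignment may differ); `WindowBoundaryDelay` is a hypothetical helper.

```java
// Extra latency a control tuple incurs when it is held until the end of the
// current streaming window. Assumes windows are aligned to time 0.
final class WindowBoundaryDelay {
  private WindowBoundaryDelay() {}

  static long delayUntilBoundaryMillis(long emitTimeMillis, long windowWidthMillis) {
    if (windowWidthMillis <= 0) {
      throw new IllegalArgumentException("window width must be positive");
    }
    // Position of the emit time inside its window, in [0, width).
    long offsetInWindow = Math.floorMod(emitTimeMillis, windowWidthMillis);
    return windowWidthMillis - offsetInWindow;
  }
}
```

With the default 500 ms width, a tuple emitted 120 ms into a window waits another 380 ms; reducing the width to 100 ms bounds the wait at 100 ms.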
> >> > > > > > > >>>>
> >> > > > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
> >> > > > > > > >>>>
> >> > > > > > > >>>>> The ability to send custom control tuples within a window
> >> > > > > > > >>>>> may be useful, for example, for session tracking, where
> >> > > > > > > >>>>> session boundaries are not aligned with window boundaries
> >> > > > > > > >>>>> and 500 ms latency is not acceptable for an application.
> >> > > > > > > >>>>>
> >> > > > > > > >>>>> Thank you,
> >> > > > > > > >>>>>
> >> > > > > > > >>>>> Vlad
> >> > > > > > > >>>>>
> >> > > > > > > >>>>>
> >> > > > > > > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
> >> > > > > > > >>>>>
> >> > > > > > > >>>>>> It should not matter from where the control tuple is
> >> > > > > > > >>>>>> triggered. It will be good to have a generic mechanism to
> >> > > > > > > >>>>>> propagate it, and other things can be accomplished outside
> >> > > > > > > >>>>>> the engine. For example, the new comprehensive support for
> >> > > > > > > >>>>>> windowing will all be in Malhar; the engine does not need
> >> > > > > > > >>>>>> to know anything about it, except that we need the control
> >> > > > > > > >>>>>> tuple for watermark propagation and idempotent processing.
> >> > > > > > > >>>>>>
> >> > > > > > > >>>>>> I also think the main difference from other tuples is the
> >> > > > > > > >>>>>> need to send it to all partitions, which is similar to
> >> > > > > > > >>>>>> checkpoint window tuples, but not the same. Here, we
> >> > > > > > > >>>>>> probably also need the ability for the user to control
> >> > > > > > > >>>>>> whether such a tuple should traverse the entire DAG or not.
> >> > > > > > > >>>>>> For a batch use case, for example, we may want to send the
> >> > > > > > > >>>>>> end of file to the next operator, but not beyond, if the
> >> > > > > > > >>>>>> operator has asynchronous processing logic in it.
> >> > > > > > > >>>>>>
> >> > > > > > > >>>>>> For any logic to be idempotent, the control tuple needs to
> >> > > > > > > >>>>>> be processed at a window boundary. Receiving the control
> >> > > > > > > >>>>>> tuple in the window callback would avoid having to track
> >> > > > > > > >>>>>> extra state in the operator. I don't think that's a major
> >> > > > > > > >>>>>> issue, but what is the use case for processing a control
> >> > > > > > > >>>>>> tuple within the window?
> >> > > > > > > >>>>>>
> >> > > > > > > >>>>>> Thomas
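One way to express the scope control Thomas describes is a hop limit carried by the control tuple. This Java sketch is an assumption, not a proposed Apex API: `remainingHops` counts the operators that will still receive the tuple, with `Integer.MAX_VALUE` meaning the entire DAG.

```java
import java.io.Serializable;

// Hypothetical control tuple with a propagation scope.
final class ScopedControlTuple implements Serializable {
  private static final long serialVersionUID = 1L;

  final Serializable payload;
  final int remainingHops;  // operators that will still receive this tuple

  ScopedControlTuple(Serializable payload, int remainingHops) {
    this.payload = payload;
    this.remainingHops = remainingHops;
  }

  // What the receiving operator would pass downstream, or null if the
  // tuple stops at this operator.
  ScopedControlTuple forward() {
    if (remainingHops == Integer.MAX_VALUE) {
      return this;  // unlimited: traverse the whole DAG
    }
    return remainingHops > 1 ? new ScopedControlTuple(payload, remainingHops - 1) : null;
  }
}
```

An end-of-file tuple created with one hop would reach the next operator and stop there, matching the batch example above.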
> >> > > > > > > >>>>>>
> >> > > > > > > >>>>>>
> >> > > > > > > >>>>>>
> >> > > > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni <pra...@datatorrent.com> wrote:
> >> > > > > > > >>>>>>
> >> > > > > > > >>>>>>> For the use cases you mentioned, I think 1) and 2) are more
> >> > > > > > > >>>>>>> likely to be controlled directly by the application, while
> >> > > > > > > >>>>>>> 3) and 4) are more likely going to be triggered externally
> >> > > > > > > >>>>>>> and directly handled by the engine, and 3) is already being
> >> > > > > > > >>>>>>> implemented that way (apexcore-163).
> >> > > > > > > >>>>>>>
> >> > > > > > > >>>>>>> The control tuples emitted by an operator would be sent to
> >> > > > > > > >>>>>>> all downstream partitions, wouldn't they? That would be the
> >> > > > > > > >>>>>>> chief distinction compared to data (apart from the
> >> > > > > > > >>>>>>> payload), which would get partitioned under normal
> >> > > > > > > >>>>>>> circumstances. It would also be guaranteed that downstream
> >> > > > > > > >>>>>>> partitions receive control tuples only after the data that
> >> > > > > > > >>>>>>> was sent before them, so we could send them immediately
> >> > > > > > > >>>>>>> when they are emitted, as opposed to at window boundaries.
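The distinction Pramod draws (data tuples are partitioned, control tuples go to every downstream partition, and per-channel FIFO order provides the ordering guarantee) fits in a few lines of Java. `PartitioningEmitter` is a hypothetical name, and hash partitioning stands in for whatever partitioner the stream actually uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical emitter: each downstream partition has a FIFO channel.
class PartitioningEmitter {
  final List<List<String>> channels = new ArrayList<>();

  PartitioningEmitter(int downstreamPartitions) {
    for (int i = 0; i < downstreamPartitions; i++) {
      channels.add(new ArrayList<>());
    }
  }

  // Data goes to exactly one partition, chosen by hash.
  void emitData(String tuple) {
    int partition = Math.floorMod(tuple.hashCode(), channels.size());
    channels.get(partition).add(tuple);
  }

  // Control tuples are broadcast; on each channel they queue behind any data
  // emitted earlier, so no separate window alignment is needed for ordering.
  void emitControl(String controlTuple) {
    for (List<String> channel : channels) {
      channel.add(controlTuple);
    }
  }
}
```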
> >> > > > > > > >>>>>>>
> >> > > > > > > >>>>>>> However, during unification it is important to know whether
> >> > > > > > > >>>>>>> these control tuples have been received from all upstream
> >> > > > > > > >>>>>>> partitions before proceeding with a control operation. One
> >> > > > > > > >>>>>>> could wait till the end of the window, but that introduces
> >> > > > > > > >>>>>>> a delay, however small. I would like to add to the proposal
> >> > > > > > > >>>>>>> that the platform only hand over the control tuple to the
> >> > > > > > > >>>>>>> unifier when it has been received from all upstream
> >> > > > > > > >>>>>>> partitions, much like how end window is processed, but
> >> > > > > > > >>>>>>> without waiting till the actual end of the window.
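A Java sketch of the alignment Pramod proposes for unifiers, which would equally cover Thomas's end-of-file example with several block reader partitions. `ControlTupleBarrier` and the string control-tuple id are assumptions for illustration: the control tuple is released once every upstream partition has sent it, without waiting for END_WINDOW.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical barrier: releases a control tuple only after all upstream
// partitions have delivered their copy of it.
class ControlTupleBarrier {
  private final int upstreamPartitions;
  // For each pending control tuple id, the upstream partitions seen so far.
  private final Map<String, Set<Integer>> arrivals = new HashMap<>();

  ControlTupleBarrier(int upstreamPartitions) {
    this.upstreamPartitions = upstreamPartitions;
  }

  // Returns true exactly when the last missing upstream copy arrives, i.e.
  // when the unifier should process the control tuple.
  boolean onArrival(String controlId, int upstreamPartition) {
    Set<Integer> seen = arrivals.computeIfAbsent(controlId, k -> new HashSet<>());
    seen.add(upstreamPartition);  // duplicates from one partition are ignored
    if (seen.size() == upstreamPartitions) {
      arrivals.remove(controlId);
      return true;
    }
    return false;
  }
}
```

This differs from acting on the first copy seen: here the last copy releases the tuple, so downstream work starts only after all upstream partitions have finished.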
> >> > > > > > > >>>>>>>
> >> > > > > > > >>>>>>> Regarding your concern about idempotency, we typically care
> >> > > > > > > >>>>>>> about idempotency at a window level, and doing the above
> >> > > > > > > >>>>>>> will still allow the operators to preserve that easily.
> >> > > > > > > >>>>>>>
> >> > > > > > > >>>>>>> Thanks
> >> > > > > > > >>>>>>>
> >> > > > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David Yan <da...@datatorrent.com> wrote:
> >> > > > > > > >>>>>>>
> >> > > > > > > >>>>>>>> Hi all,
> >> > > > > > > >>>>>>>>
> >> > > > > > > >>>>>>>> I would like to propose a new feature to the Apex core
> >> > > > > > > >>>>>>>> engine: support for custom control tuples. Currently, we
> >> > > > > > > >>>>>>>> have control tuples such as BEGIN_WINDOW, END_WINDOW,
> >> > > > > > > >>>>>>>> CHECKPOINT, and so on, but we don't have support for
> >> > > > > > > >>>>>>>> applications to insert their own control tuples. The
> >> > > > > > > >>>>>>>> current way to get around this is to use data tuples and
> >> > > > > > > >>>>>>>> have a separate port for such tuples that sends tuples to
> >> > > > > > > >>>>>>>> all partitions of the downstream operators, which is not
> >> > > > > > > >>>>>>>> exactly developer friendly.
> >> > > > > > > >>>>>>>>
> >> > > > > > > >>>>>>>> We have already seen a number of use cases that can use
> >> > > > > > > >>>>>>>> this feature:
> >> > > > > > > >>>>>>>>
> >> > > > > > > >>>>>>>> 1) Batch support: We need to tell all operators of the
> >> > > > > > > >>>>>>>> physical DAG when a batch starts and ends, so the
> >> > > > > > > >>>>>>>> operators can do whatever is needed upon the start or the
> >> > > > > > > >>>>>>>> end of a batch.
> >> > > > > > > >>>>>>>>
> >> > > > > > > >>>>>>>> 2) Watermark: To support the concepts of event time
> >> > > > > > > >>>>>>>> windowing, the watermark control tuple is needed to tell
> >> > > > > > > >>>>>>>> which windows should be considered late.
> >> > > > > > > >>>>>>>>
> >> > > > > > > >>>>>>>> 3) Changing operator properties: We do have support for
> >> > > > > > > >>>>>>>> changing operator properties on the fly, but with a
> >> > > > > > > >>>>>>>> custom control tuple, the command to change operator
> >> > > > > > > >>>>>>>> properties can be window aligned for all partitions and
> >> > > > > > > >>>>>>>> also across the DAG.
> >> > > > > > > >>>>>>>>
> >> > > > > > > >>>>>>>> 4) Recording tuples: Like changing operator properties,
> >> > > > > > > >>>>>>>> we do have this support now, but only at the individual
> >> > > > > > > >>>>>>>> physical operator level, and without control of which
> >> > > > > > > >>>>>>>> window to record tuples for. With a custom control tuple,
> >> > > > > > > >>>>>>>> because a control tuple must belong to a window, all
> >> > > > > > > >>>>>>>> operators in the DAG can start (and stop) recording for
> >> > > > > > > >>>>>>>> the same windows.
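For the watermark use case, the lateness check itself is small. A Java sketch, assuming fixed-size event-time windows and the common convention that a watermark W means no more tuples with event time below W are expected; `EventTimeWindows` is a hypothetical helper, not part of Malhar.

```java
// Hypothetical event-time helper: fixed-size windows [start, start + size).
final class EventTimeWindows {
  private EventTimeWindows() {}

  // Start of the fixed-size event-time window containing the timestamp.
  static long windowStart(long eventTimeMillis, long sizeMillis) {
    return eventTimeMillis - Math.floorMod(eventTimeMillis, sizeMillis);
  }

  // A tuple is late if the watermark has already passed the end of its
  // window, i.e. that window has been declared complete.
  static boolean isLate(long eventTimeMillis, long sizeMillis, long watermarkMillis) {
    long windowEnd = windowStart(eventTimeMillis, sizeMillis) + sizeMillis;
    return windowEnd <= watermarkMillis;
  }
}
```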
> >> > > > > > > >>>>>>>>
> >> > > > > > > >>>>>>>> I can think of two options to achieve this:
> >> > > > > > > >>>>>>>>
> >> > > > > > > >>>>>>>> 1) A new custom control tuple type that takes the user's
> >> > > > > > > >>>>>>>> serializable object.
> >> > > > > > > >>>>>>>> 2) Piggyback on the current BEGIN_WINDOW and END_WINDOW
> >> > > > > > > >>>>>>>> control tuples.
> >> > > > > > > >>>>>>>>
> >> > > > > > > >>>>>>>> Please provide your feedback. Thank you.
> >> > > > > > > >>>>>>>>
> >> > > > > > > >>>>>>>> David
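Option 1 could look roughly like the following Java class wrapping the user's serializable object. The name `CustomControlTuple` and the `windowId` field are hypothetical; the window id reflects the statement above that a control tuple must belong to a window, and `equals`/`hashCode` make copies of one tuple comparable, which per-window de-duplication would rely on.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical custom control tuple carrying a user-supplied payload.
final class CustomControlTuple implements Serializable {
  private static final long serialVersionUID = 1L;

  private final long windowId;              // streaming window the tuple belongs to
  private final Serializable userPayload;   // the application's own object

  CustomControlTuple(long windowId, Serializable userPayload) {
    this.windowId = windowId;
    this.userPayload = Objects.requireNonNull(userPayload, "userPayload");
  }

  long getWindowId() {
    return windowId;
  }

  Serializable getUserPayload() {
    return userPayload;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof CustomControlTuple)) {
      return false;
    }
    CustomControlTuple other = (CustomControlTuple) o;
    return windowId == other.windowId && userPayload.equals(other.userPayload);
  }

  @Override
  public int hashCode() {
    return Objects.hash(windowId, userPayload);
  }
}
```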