Hi Bhupesh,

Since each input port has its own incoming control tuples, I would imagine
there would be an additional DefaultInputPort.processControl method that
operator developers can override.

If we go for option 1, my thinking is that the control tuples would always
be delivered at the next window boundary, even if the emit method is called
within a window.

David
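The Java sketch below shows the two ideas above together. It has an input port with a separate, overridable control callback. It also shows option 1 delivery, where control tuples received during a window reach that callback only after the window's data tuples. `ControlAwareInputPort`, `processControl` and `WindowBoundaryDispatcher` are hypothetical names used for illustration. They are not the Apex `DefaultInputPort` API, and the dispatcher only stands in for what the engine would do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical port type: ControlAwareInputPort and processControl are
// illustrative names, not the actual Apex DefaultInputPort API.
abstract class ControlAwareInputPort<T> {
  // Called for every data tuple, in arrival order.
  abstract void process(T tuple);

  // Called for every custom control tuple; ignores it unless overridden.
  void processControl(Object controlTuple) {
  }
}

// Simulates option 1 on the receiving side: control tuples that arrive during
// a window are held back and handed to processControl only after the window's
// data tuples have been processed.
class WindowBoundaryDispatcher<T> {
  private final ControlAwareInputPort<T> port;
  private final List<Object> pendingControl = new ArrayList<>();

  WindowBoundaryDispatcher(ControlAwareInputPort<T> port) {
    this.port = port;
  }

  void onData(T tuple) {
    port.process(tuple);
  }

  void onControl(Object controlTuple) {
    pendingControl.add(controlTuple); // deferred to the window boundary
  }

  void onEndWindow() {
    for (Object c : pendingControl) {
      port.processControl(c);
    }
    pendingControl.clear();
  }
}
```

Take the sequence data `a`, control `eof`, data `b`. The port sees `a`, then `b`, and only then `eof`. This is the loss of ordering between regular and control tuples raised in the quoted reply below.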

On Thu, Nov 3, 2016 at 1:46 AM, Bhupesh Chawda <bhup...@datatorrent.com>
wrote:

> I have a question regarding the callback for a control tuple. Will it be
> similar to the InputPort::process() method, something like
> InputPort::processControlTuple(t)? Or will it be a method of the operator,
> similar to beginWindow()?
>
> When we say that the control tuple will be delivered at the window boundary,
> does that mean all control tuples emitted in that window will be processed
> together at the end of the window? This would imply that there is no
> ordering between regular tuples and control tuples.
>
> I think we should get started with option 1 (control tuples at window
> boundaries), which seems to handle most of the use cases. For the cases that
> require option 2, we can always build on this.
>
> ~ Bhupesh
>
> On Thu, Nov 3, 2016 at 1:35 PM, Thomas Weise <t...@apache.org> wrote:
>
> > I don't see how that would work. Suppose you have a file splitter and
> > multiple partitions of block readers. The "end of file" event cannot be
> > processed downstream until all block readers are done. I also think that
> > this is related to the batch demarcation discussion and there should be a
> > single generalized mechanism to support this.
> >
> >
> > On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <pra...@datatorrent.com>
> > wrote:
> >
> > > Suppose I am processing data in a file and I want to do something at the
> > > end of the file at the output operator. I would send an end-of-file
> > > control tuple and act on it when I receive it at the output. In a single
> > > window I may end up processing multiple files. If I don't have multiple
> > > ports and logical paths through the DAG (multiple partitions are ok), I
> > > can process the end of each file immediately and also know which file was
> > > closed without sending extra identification information in the
> > > end-of-file tuple. I would need that information if I were collecting all
> > > of them and processing them at the end of the window.
> > >
> > > On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <t...@apache.org> wrote:
> > >
> > > > The use cases listed in the original discussion don't call for option 2.
> > > > It seems to come with additional complexity and implementation cost.
> > > >
> > > > Can those in favor of option 2 please also provide the use case for it.
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <siy...@datatorrent.com>
> > > > wrote:
> > > >
> > > > > I will vote for approach 1.
> > > > >
> > > > > First of all, that one sounds easier to do to me. And I think
> > > > > idempotency is important. It may come at the cost of higher latency,
> > > > > but I think it is ok.
> > > > >
> > > > > In addition, if users do need real-time control tuple processing in
> > > > > the future, we can always add the option on top of it.
> > > > >
> > > > > So I vote for 1.
> > > > >
> > > > > Thanks,
> > > > > Siyuan
> > > > >
> > > > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi <p...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > As a rule of thumb in any real-time operating system, control tuples
> > > > > > should always be handled using priority queues.
> > > > > >
> > > > > > We may try to control priorities by defining levels, and control
> > > > > > tuples shall not be delivered at window boundaries.
> > > > > >
> > > > > > In short, control tuples shall never be treated like any other tuples
> > > > > > in real-time systems.
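Below is a sketch of the priority-queue approach using `java.util.PriorityQueue`. It uses two hypothetical priority levels, with control before data. An arrival sequence number breaks ties, so tuples of the same level stay in FIFO order. The levels and class names are assumptions for illustration.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// A queued tuple with a priority level (lower = more urgent) and an arrival
// sequence number used to keep FIFO order within a level. Levels are hypothetical.
class PrioritizedTuple {
  static final int CONTROL = 0;
  static final int DATA = 1;

  final int level;
  final long seq;
  final Object payload;

  PrioritizedTuple(int level, long seq, Object payload) {
    this.level = level;
    this.seq = seq;
    this.payload = payload;
  }
}

class PriorityTupleQueue {
  private final PriorityQueue<PrioritizedTuple> queue = new PriorityQueue<>(
      Comparator.comparingInt((PrioritizedTuple t) -> t.level)
          .thenComparingLong(t -> t.seq));
  private long nextSeq = 0;

  void offer(int level, Object payload) {
    queue.add(new PrioritizedTuple(level, nextSeq++, payload));
  }

  // Returns the payload of the most urgent tuple, or null if the queue is empty.
  Object poll() {
    PrioritizedTuple t = queue.poll();
    return t == null ? null : t.payload;
  }
}
```

Note the trade-off: a control tuple overtakes data tuples that were emitted before it. That is the ordering guarantee the window-boundary and order-barrier proposals in this thread try to keep.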
> > > > > >
> > > > > > On Thursday, November 3, 2016, David Yan <da...@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I would like to renew the discussion of control tuples.
> > > > > > >
> > > > > > > Last time, we were in a debate about whether:
> > > > > > >
> > > > > > > 1) the platform should enforce that control tuples are delivered at
> > > > > > > window boundaries only
> > > > > > >
> > > > > > > or:
> > > > > > >
> > > > > > > 2) the platform should deliver control tuples just as other tuples,
> > > > > > > and it's the operator developers' choice whether to handle the
> > > > > > > control tuples as they arrive or delay the processing till the next
> > > > > > > window boundary.
> > > > > > >
> > > > > > > To summarize the pros and cons:
> > > > > > >
> > > > > > > Approach 1: If processing control tuples changes the behavior of
> > > > > > > the operator and idempotency needs to be preserved, the processing
> > > > > > > must be done at window boundaries. This approach will save operator
> > > > > > > developers the headache of ensuring that. However, it will take away
> > > > > > > the choice from operator developers who just need to process the
> > > > > > > control tuples as soon as possible.
> > > > > > >
> > > > > > > Approach 2: The operator has a chance to process control tuples
> > > > > > > immediately. This would be useful if latency is valued more than
> > > > > > > correctness. However, this would open the possibility for operator
> > > > > > > developers to shoot themselves in the foot. This is especially true
> > > > > > > if there are multiple input ports, as there is no easy way to
> > > > > > > guarantee processing order across multiple input ports.
> > > > > > >
> > > > > > > We would like to arrive at a consensus and close this discussion
> > > > > > > soon this time, so we can start the work on this important feature.
> > > > > > >
> > > > > > > Thanks!
> > > > > > >
> > > > > > > David
> > > > > > >
> > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov <v.ro...@datatorrent.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > It is not clear how an operator will emit a custom control tuple at
> > > > > > > > window boundaries. One way is to cache/accumulate control tuples in
> > > > > > > > the operator output port till the window closes (END_WINDOW is
> > > > > > > > inserted into the output sink). Another is to only allow an operator
> > > > > > > > to emit control tuples inside endWindow(). The latter is a slight
> > > > > > > > variation of the operator output port caching behavior, with the only
> > > > > > > > difference that now the operator itself is responsible for
> > > > > > > > caching/accumulating control tuples. Note that in many cases it will
> > > > > > > > be necessary to postpone emitting payload tuples that logically come
> > > > > > > > after the custom control tuple till the next window begins.
> > > > > > > >
> > > > > > > > IMO, that is too restrictive. Consider a case where an input operator
> > > > > > > > uses a push instead of a poll (for example, it provides an end point
> > > > > > > > where remote agents may connect and publish/push data). There,
> > > > > > > > control tuples may be used for connect/disconnect/watermark broadcast
> > > > > > > > to (partitioned) downstream operators. In this case the platform just
> > > > > > > > needs to guarantee an order barrier (any tuple emitted prior to a
> > > > > > > > control tuple needs to be delivered prior to the control tuple).
> > > > > > > >
> > > > > > > > Thank you,
> > > > > > > >
> > > > > > > > Vlad
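The Java sketch below contrasts the two emission policies described above. The first caches control tuples in the output port until the window closes. The second is an order barrier, where control tuples share the FIFO with data. `ControlEmittingPort` and its `sink` list are hypothetical stand-ins for an output port and its downstream stream. Flushing cached control tuples just before END_WINDOW, rather than just after it, is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical output port; `sink` stands in for the downstream stream.
class ControlEmittingPort {
  enum Policy { BUFFER_UNTIL_END_WINDOW, ORDER_BARRIER }

  private final Policy policy;
  private final List<String> cachedControl = new ArrayList<>();
  final List<String> sink = new ArrayList<>();

  ControlEmittingPort(Policy policy) {
    this.policy = policy;
  }

  void emit(String dataTuple) {
    sink.add(dataTuple);
  }

  void emitControl(String controlTuple) {
    if (policy == Policy.ORDER_BARRIER) {
      // Same FIFO as data: everything emitted earlier is delivered first,
      // everything emitted later is delivered after it.
      sink.add(controlTuple);
    } else {
      // Cached in the port until the window closes.
      cachedControl.add(controlTuple);
    }
  }

  void endWindow() {
    // Flush cached control tuples, then insert END_WINDOW into the sink.
    sink.addAll(cachedControl);
    cachedControl.clear();
    sink.add("END_WINDOW");
  }
}
```

For the emit sequence `d1`, control `c`, `d2`, the caching policy produces `d1, d2, c, END_WINDOW`. The order barrier produces `d1, c, d2, END_WINDOW`.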
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On 6/27/16 19:36, Amol Kekre wrote:
> > > > > > > >
> > > > > > > >> I agree with David. Allowing control tuples within a window (along
> > > > > > > >> with data tuples) creates a very dangerous situation where
> > > > > > > >> guarantees are impacted. It is much safer to enable control tuples
> > > > > > > >> (send/receive) at window boundaries (after END_WINDOW of window N,
> > > > > > > >> and before BEGIN_WINDOW for window N+1). My take on David's list is:
> > > > > > > >>
> > > > > > > >> 1. -> window boundaries -> Strong +1; there will be a big issue
> > > > > > > >>    with guarantees for operators with multiple ports (see Thomas's
> > > > > > > >>    response).
> > > > > > > >> 2. -> All downstream windows -> +1, but there are situations; a
> > > > > > > >>    caveat could be "only to operators that implement the control
> > > > > > > >>    tuple interface/listeners", which effectively translates to "all
> > > > > > > >>    interested downstream operators".
> > > > > > > >> 3. Only input operators can create control tuples -> -1; this is
> > > > > > > >>    restrictive, even though most likely 95% of the time it will be
> > > > > > > >>    input operators.
> > > > > > > >>
> > > > > > > >> Thks,
> > > > > > > >> Amol
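Below is a sketch of the caveat in item 2: control tuples are broadcast only to downstream operators that opt in through a listener interface. `ControlTupleListener` and `ControlBroadcaster` are hypothetical names, not existing Apex types.

```java
import java.util.List;

// Hypothetical opt-in interface for operators that want custom control tuples.
interface ControlTupleListener {
  void onControlTuple(Object controlTuple);
}

// Delivers a control tuple only to downstream operators that implement the
// listener interface; all other operators are skipped.
final class ControlBroadcaster {
  private ControlBroadcaster() {
  }

  // Returns the number of operators that received the control tuple.
  static int broadcast(List<?> downstreamOperators, Object controlTuple) {
    int delivered = 0;
    for (Object operator : downstreamOperators) {
      if (operator instanceof ControlTupleListener) {
        ((ControlTupleListener) operator).onControlTuple(controlTuple);
        delivered++;
      }
    }
    return delivered;
  }
}
```

With this design, "all downstream operators" and "all interested downstream operators" become the same set, which is the translation the item above describes.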
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <tho...@datatorrent.com>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >>> The windowing we discuss here is in general event time based,
> > > > > > > >>> arrival time is a special case of it.
> > > > > > > >>>
> > > > > > > >>> I don't think state changes can be made independent of the
> > > > > > > >>> streaming window boundary as it would prevent idempotent
> > > > > > > >>> processing and transitively exactly once. For that to work, tuples
> > > > > > > >>> need to be presented to the operator in a guaranteed order
> > > > > > > >>> *within* the streaming window, which is not possible with
> > > > > > > >>> multiple ports (and partitions).
> > > > > > > >>>
> > > > > > > >>> Thomas
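This Java sketch shows why arrival order across ports matters, using assumed toy semantics. One port carries values ("d:<n>"). A second port carries a control tuple that sets a new multiplier ("c:<n>"). The operator keeps a running sum. Two arrivals of the same window can contain the same tuples in the same per-port order but interleave the ports differently, as could happen when a window is reprocessed after a failure. `FactorSumOperator` is hypothetical.

```java
// Hypothetical operator: sum += factor * value for data tuples, while a
// control tuple changes the factor. processWindow receives one streaming
// window's tuples in the order they happened to arrive across both ports.
class FactorSumOperator {
  private final boolean deferControl;
  private long factor = 1;
  private Long pendingFactor; // control value waiting for the window boundary
  private long sum = 0;

  FactorSumOperator(boolean deferControl) {
    this.deferControl = deferControl;
  }

  void processWindow(String... arrivalOrder) {
    for (String tuple : arrivalOrder) {
      long value = Long.parseLong(tuple.substring(2));
      if (tuple.startsWith("d:")) {
        sum += factor * value;
      } else if (deferControl) {
        pendingFactor = value; // apply only at the window boundary
      } else {
        factor = value; // apply as soon as it arrives
      }
    }
    // End of the streaming window: apply any deferred control tuple.
    if (pendingFactor != null) {
      factor = pendingFactor;
      pendingFactor = null;
    }
  }

  long sum() {
    return sum;
  }
}
```

Feeding the arrival orders `d:1, c:10, d:2` and `d:1, d:2, c:10` to operators created with `deferControl = false` yields sums of 21 and 3. With `deferControl = true`, both yield 3, so a replay produces the same state regardless of interleaving.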
> > > > > > > >>>
> > > > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan <da...@datatorrent.com>
> > > > > > > >>> wrote:
> > > > > > > >>>
> > > > > > > >>>> I think for session tracking, if the session boundaries are
> > > > > > > >>>> allowed to be not aligned with the streaming window boundaries,
> > > > > > > >>>> the user will have a much bigger problem with idempotency. And in
> > > > > > > >>>> most cases, session tracking is event time based, not ingestion
> > > > > > > >>>> time or processing time based, so this may never be a problem.
> > > > > > > >>>> But if that ever happens, the user can always alter the default
> > > > > > > >>>> 500 ms streaming window width.
> > > > > > > >>>>
> > > > > > > >>>> David
> > > > > > > >>>>
> > > > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <v.ro...@datatorrent.com>
> > > > > > > >>>> wrote:
> > > > > > > >>>>
> > > > > > > >>>>> The ability to send custom control tuples within a window may be
> > > > > > > >>>>> useful, for example, for session tracking, where session
> > > > > > > >>>>> boundaries are not aligned with window boundaries and 500 ms
> > > > > > > >>>>> latency is not acceptable for an application.
> > > > > > > >>>>>
> > > > > > > >>>>> Thank you,
> > > > > > > >>>>>
> > > > > > > >>>>> Vlad
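The latency cost mentioned above can be made concrete. Assume fixed-width streaming windows aligned to time 0, which is a simplification. A control tuple emitted at time t and held until the end of its window waits w - (t mod w), which is at most the window width w. A small hypothetical Java helper:

```java
// Hypothetical helper: window-boundary delivery delay, assuming fixed-width
// streaming windows that start at time 0 and non-negative timestamps.
final class BoundaryLatency {
  private BoundaryLatency() {
  }

  // End of the streaming window containing emitMillis, i.e. the earliest time
  // a control tuple emitted then could be delivered under option 1.
  static long nextBoundary(long emitMillis, long windowWidthMillis) {
    return (emitMillis / windowWidthMillis + 1) * windowWidthMillis;
  }

  // Extra wait caused by holding the control tuple until that boundary;
  // always in the range (0, windowWidthMillis].
  static long addedDelay(long emitMillis, long windowWidthMillis) {
    return nextBoundary(emitMillis, windowWidthMillis) - emitMillis;
  }
}
```

With the default 500 ms width, a tuple emitted 1 ms into a window waits 499 ms. Shrinking the width, as suggested in the reply above, shrinks the worst case proportionally.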
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
> > > > > > > >>>>>
> > > > > > > >>>>>> It should not matter from where the control tuple is triggered.
> > > > > > > >>>>>> It will be good to have a generic mechanism to propagate it, and
> > > > > > > >>>>>> other things can be accomplished outside the engine. For example,
> > > > > > > >>>>>> the new comprehensive support for windowing will all be in
> > > > > > > >>>>>> Malhar; the engine does not need to know anything about it,
> > > > > > > >>>>>> except that we need the control tuple for watermark propagation
> > > > > > > >>>>>> and idempotent processing.
> > > > > > > >>>>>>
> > > > > > > >>>>>> I also think the main difference to other tuples is the need to
> > > > > > > >>>>>> send it to all partitions, which is similar to checkpoint window
> > > > > > > >>>>>> tuples, but not the same. Here, we probably also need the ability
> > > > > > > >>>>>> for the user to control whether such a tuple should traverse the
> > > > > > > >>>>>> entire DAG or not. For a batch use case, for example, we may want
> > > > > > > >>>>>> to send the end of file to the next operator, but not beyond, if
> > > > > > > >>>>>> the operator has asynchronous processing logic in it.
> > > > > > > >>>>>>
> > > > > > > >>>>>> For any logic to be idempotent, the control tuple needs to be
> > > > > > > >>>>>> processed at a window boundary. Receiving the control tuple in
> > > > > > > >>>>>> the window callback would avoid having to track extra state in
> > > > > > > >>>>>> the operator. I don't think that's a major issue, but what is
> > > > > > > >>>>>> the use case for processing a control tuple within the window?
> > > > > > > >>>>>>
> > > > > > > >>>>>> Thomas
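Below is a sketch of a control tuple that is broadcast to every partition of a logical operator and carries a hop limit. The limit lets the sender decide how far the tuple travels; a limit of 1 reaches only the next operator, as in the end-of-file example. The hop limit is one hypothetical way to express this, and `ScopedControlTuple` and `LogicalStage` are illustrative names. The sketch forwards one copy per logical stage, so it assumes the copies from upstream partitions have already been merged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical control tuple with a sender-chosen hop limit: hopsLeft = 1
// reaches only the next logical operator; a larger value travels further.
class ScopedControlTuple {
  final String payload;
  final int hopsLeft;

  ScopedControlTuple(String payload, int hopsLeft) {
    this.payload = payload;
    this.hopsLeft = hopsLeft;
  }
}

// Hypothetical logical operator with several physical partitions, linked to
// the next logical operator in a linear DAG.
class LogicalStage {
  final String name;
  final int partitions;
  LogicalStage next;
  final List<String> received = new ArrayList<>();

  LogicalStage(String name, int partitions) {
    this.name = name;
    this.partitions = partitions;
  }

  // Unlike data tuples, which are split across partitions, a control tuple is
  // delivered to every partition. One copy is then forwarded downstream if
  // the hop limit allows it.
  void deliver(ScopedControlTuple tuple) {
    for (int p = 0; p < partitions; p++) {
      received.add(name + "#" + p + ":" + tuple.payload);
    }
    if (next != null && tuple.hopsLeft > 1) {
      next.deliver(new ScopedControlTuple(tuple.payload, tuple.hopsLeft - 1));
    }
  }
}
```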
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni <pra...@datatorrent.com>
> > > > > > > >>>>>> wrote:
> > > > > > > >>>>>>
> > > > > > > >>>>>>> For the use cases you mentioned, I think 1) and 2) are more likely
> > > > > > > >>>>>>> to be controlled directly by the application, while 3) and 4) are
> > > > > > > >>>>>>> more likely going to be triggered externally and directly handled
> > > > > > > >>>>>>> by the engine, and 3) is already being implemented that way
> > > > > > > >>>>>>> (apexcore-163).
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> The control tuples emitted by an operator would be sent to all
> > > > > > > >>>>>>> downstream partitions, wouldn't they? That would be the chief
> > > > > > > >>>>>>> distinction compared to data (apart from the payload), which
> > > > > > > >>>>>>> would get partitioned under normal circumstances. It would also
> > > > > > > >>>>>>> be guaranteed that downstream partitions will receive control
> > > > > > > >>>>>>> tuples only after the data that was sent before them, so we could
> > > > > > > >>>>>>> send them immediately when they are emitted, as opposed to at
> > > > > > > >>>>>>> window boundaries.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> However, during unification it is important to know whether these
> > > > > > > >>>>>>> control tuples have been received from all upstream partitions
> > > > > > > >>>>>>> before proceeding with a control operation. One could wait till
> > > > > > > >>>>>>> the end of the window, but that introduces a delay, however
> > > > > > > >>>>>>> small. I would like to add to the proposal that the platform only
> > > > > > > >>>>>>> hand over the control tuple to the unifier when it has been
> > > > > > > >>>>>>> received from all upstream partitions, much like how end window
> > > > > > > >>>>>>> is processed, but without waiting till the actual end of the
> > > > > > > >>>>>>> window.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Regarding your concern about idempotency, we typically care about
> > > > > > > >>>>>>> idempotency at a window level, and doing the above will still
> > > > > > > >>>>>>> allow the operators to preserve that easily.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Thanks
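Below is a sketch of the proposed unifier behavior: a barrier that records which upstream partitions have delivered a given control tuple. It releases the tuple exactly once, as soon as the last partition's copy arrives, without waiting for END_WINDOW. The same check fits Thomas's file splitter example near the top of this thread, where end of file must wait for all block reader partitions. Keying control tuples by a string and identifying partitions by an integer are assumptions of this sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Releases a control tuple to the unifier only once every upstream partition
// has delivered it, analogous to END_WINDOW alignment but without waiting for
// the end of the window. All names are hypothetical.
class ControlTupleBarrier {
  private final int upstreamPartitions;
  private final Map<String, Set<Integer>> arrivedFrom = new HashMap<>();

  ControlTupleBarrier(int upstreamPartitions) {
    this.upstreamPartitions = upstreamPartitions;
  }

  // Records that partitionId delivered controlKey. Returns true exactly when
  // the last missing partition's copy arrives; duplicates are ignored.
  boolean arrive(String controlKey, int partitionId) {
    Set<Integer> partitions = arrivedFrom.computeIfAbsent(controlKey, k -> new HashSet<>());
    partitions.add(partitionId);
    if (partitions.size() == upstreamPartitions) {
      arrivedFrom.remove(controlKey);
      return true;
    }
    return false;
  }
}
```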
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David Yan <da...@datatorrent.com>
> > > > > > > >>>>>>> wrote:
> > > > > > > >>>
> > > > > > > >>>>>>>> Hi all,
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> I would like to propose a new feature to the Apex core engine:
> > > > > > > >>>>>>>> the support of custom control tuples. Currently, we have control
> > > > > > > >>>>>>>> tuples such as BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on,
> > > > > > > >>>>>>>> but we don't have the support for applications to insert their
> > > > > > > >>>>>>>> own control tuples. The current way to get around this is to use
> > > > > > > >>>>>>>> data tuples and have a separate port for such tuples that sends
> > > > > > > >>>>>>>> tuples to all partitions of the downstream operators, which is
> > > > > > > >>>>>>>> not exactly developer friendly.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> We have already seen a number of use cases that can use this
> > > > > > > >>>>>>>> feature:
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> 1) Batch support: We need to tell all operators of the physical
> > > > > > > >>>>>>>> DAG when a batch starts and ends, so the operators can do
> > > > > > > >>>>>>>> whatever is needed upon the start or the end of a batch.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> 2) Watermark: To support the concepts of event time windowing,
> > > > > > > >>>>>>>> the watermark control tuple is needed to tell which windows
> > > > > > > >>>>>>>> should be considered late.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> 3) Changing operator properties: We do have the support of
> > > > > > > >>>>>>>> changing operator properties on the fly, but with a custom
> > > > > > > >>>>>>>> control tuple, the command to change operator properties can be
> > > > > > > >>>>>>>> window aligned for all partitions and also across the DAG.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> 4) Recording tuples: Like changing operator properties, we do
> > > > > > > >>>>>>>> have this support now, but only at the individual physical
> > > > > > > >>>>>>>> operator level, and without control of which window to record
> > > > > > > >>>>>>>> tuples for. With a custom control tuple, because a control tuple
> > > > > > > >>>>>>>> must belong to a window, all operators in the DAG can start (and
> > > > > > > >>>>>>>> stop) recording for the same windows.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> I can think of two options to achieve this:
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> 1) a new custom control tuple type that takes the user's
> > > > > > > >>>>>>>> serializable object.
> > > > > > > >>>>>>>> 2) piggyback on the current BEGIN_WINDOW and END_WINDOW control
> > > > > > > >>>>>>>> tuples.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Please provide your feedback. Thank you.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> David
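Below is a sketch of option 1 from the proposal above: a generic control tuple that records the window it belongs to and wraps any user-supplied `Serializable` payload, here a watermark. `CustomControlTuple` and `Watermark` are hypothetical. Plain Java serialization is used only to show that such a payload survives a round trip. It does not suggest how the engine actually serializes tuples.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical generic control tuple: the window it belongs to plus an
// arbitrary user-supplied Serializable payload.
final class CustomControlTuple implements Serializable {
  private static final long serialVersionUID = 1L;

  final long windowId;
  final Serializable payload;

  CustomControlTuple(long windowId, Serializable payload) {
    this.windowId = windowId;
    this.payload = payload;
  }

  // Round-trips the tuple through plain Java serialization to show that the
  // payload survives it.
  static CustomControlTuple roundTrip(CustomControlTuple tuple) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(tuple);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (CustomControlTuple) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }
}

// Example user payload for use case 2 (watermarks); also hypothetical.
final class Watermark implements Serializable {
  private static final long serialVersionUID = 1L;

  final long timestampMillis;

  Watermark(long timestampMillis) {
    this.timestampMillis = timestampMillis;
  }
}
```

Because the payload type is left to the application, the same wrapper could carry a batch start/end marker or a property-change command for the other use cases listed.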
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
