The control tuple would be delivered to the operator only after it has been
received from all upstream partitions, but other data from an upstream
partition would still be let through after its control tuple is received. We
don't necessarily have to block and do complete synchronization as with end
window. You are right that, in the example I mentioned, this would still
require some identification information to be sent in the control tuple, but
you save on waiting till end window and are still guaranteed that all data
sent before the control tuple is received before the control tuple itself.
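
To make the delivery rule concrete, here is a rough sketch of the bookkeeping
I have in mind. It is only an illustration: ControlTupleAligner, its methods
and the string id are made-up names, not existing Apex APIs, and it assumes
each upstream partition sends a given control tuple id at most once. Data
tuples are not tracked at all, so they keep flowing while a control tuple is
pending.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Apex): decides when a control tuple that
 * arrives from several upstream partitions may be handed to the operator.
 */
class ControlTupleAligner {
    private final int upstreamPartitions;
    // control tuple id -> partitions from which that tuple has been seen
    private final Map<String, BitSet> seen = new HashMap<>();

    ControlTupleAligner(int upstreamPartitions) {
        if (upstreamPartitions <= 0) {
            throw new IllegalArgumentException("need at least one partition");
        }
        this.upstreamPartitions = upstreamPartitions;
    }

    /**
     * Records the arrival of control tuple {@code id} from {@code partition}.
     * Returns true only when the tuple has now been seen from every upstream
     * partition, i.e. when it can be delivered to the operator.
     */
    boolean onControlTuple(int partition, String id) {
        if (partition < 0 || partition >= upstreamPartitions) {
            throw new IllegalArgumentException("unknown partition " + partition);
        }
        BitSet partitions =
            seen.computeIfAbsent(id, k -> new BitSet(upstreamPartitions));
        partitions.set(partition);
        if (partitions.cardinality() == upstreamPartitions) {
            seen.remove(id); // done with this id, free the bookkeeping
            return true;
        }
        return false;
    }

    /** Number of control tuples still waiting for some partition. */
    int pendingCount() {
        return seen.size();
    }
}
```

The id is the "identification information" mentioned above: with several
files in flight in one window, it is what lets the end of file-1 be matched
across partitions independently of file-2.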

On Thu, Nov 3, 2016 at 1:05 AM, Thomas Weise <t...@apache.org> wrote:

> I don't see how that would work. Suppose you have a file splitter and
> multiple partitions of block readers. The "end of file" event cannot be
> processed downstream until all block readers are done. I also think that
> this is related to the batch demarcation discussion and there should be a
> single generalized mechanism to support this.
>
>
> On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <pra...@datatorrent.com>
> wrote:
>
> > Suppose I am processing data in a file and I want to do something at the
> > end of a file at the output operator. I would send an end file control
> > tuple and act on it when I receive it at the output. In a single window I
> > may end up processing multiple files, and if I don't have multiple ports
> > and logical paths through the DAG (multiple partitions are ok), I can
> > process the end of each file immediately and also know what file was
> > closed without sending extra identification information in the end file
> > tuple, which I would need if I were collecting all of them and
> > processing them at the end of the window.
> >
> > On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <t...@apache.org> wrote:
> >
> > > The use cases listed in the original discussion don't call for option 2.
> > > It seems to come with additional complexity and implementation cost.
> > >
> > > Can those in favor of option 2 please also provide the use case for it?
> > >
> > > Thanks,
> > > Thomas
> > >
> > >
> > > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <siy...@datatorrent.com>
> > > wrote:
> > >
> > > > I will vote for approach 1.
> > > >
> > > > First of all, that one sounds easier to do to me. And I think
> > > > idempotency is important. It may come at the cost of higher latency,
> > > > but I think that is ok.
> > > >
> > > > In addition, if users do need realtime control tuple processing in
> > > > the future, we can always add the option on top of it.
> > > >
> > > > So I vote for 1
> > > >
> > > > Thanks,
> > > > Siyuan
> > > >
> > > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi <p...@apache.org>
> > > wrote:
> > > >
> > > > > As a rule of thumb in any real time operating system, control
> > > > > tuples should always be handled using Priority Queues.
> > > > >
> > > > > We may try to control priorities by defining levels. And they shall
> > > > > not be delivered at window boundaries.
> > > > >
> > > > > In short, control tuples shall never be treated as any other tuples
> > > > > in real time systems.
> > > > >
> > > > > On Thursday, November 3, 2016, David Yan <da...@datatorrent.com>
> > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I would like to renew the discussion of control tuples.
> > > > > >
> > > > > > Last time, we were in a debate about whether:
> > > > > >
> > > > > > 1) the platform should enforce that control tuples are delivered
> > > > > > at window boundaries only
> > > > > >
> > > > > > or:
> > > > > >
> > > > > > 2) the platform should deliver control tuples just as other tuples
> > > > > > and it's the operator developers' choice whether to handle the
> > > > > > control tuples as they arrive or delay the processing till the
> > > > > > next window boundary.
> > > > > >
> > > > > > To summarize the pros and cons:
> > > > > >
> > > > > > Approach 1: If processing control tuples results in changes to the
> > > > > > behavior of the operator and idempotency needs to be preserved,
> > > > > > the processing must be done at window boundaries. This approach
> > > > > > saves operator developers the headache of ensuring that. However,
> > > > > > it takes the choice away from operator developers who just need to
> > > > > > process the control tuples as soon as possible.
> > > > > >
> > > > > > Approach 2: The operator has a chance to process control tuples
> > > > > > immediately. This would be useful if latency is valued more than
> > > > > > correctness. However, this would open the possibility for operator
> > > > > > developers to shoot themselves in the foot. This is especially
> > > > > > true if there are multiple input ports, as there is no easy way to
> > > > > > guarantee processing order across multiple input ports.
> > > > > >
> > > > > > We would like to arrive at a consensus and close this discussion
> > > > > > soon this time so we can start the work on this important feature.
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > > > > David
> > > > > >
> > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov <v.ro...@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > It is not clear how an operator will emit a custom control tuple
> > > > > > > at window boundaries. One way is to cache/accumulate control
> > > > > > > tuples in the operator output port till the window closes
> > > > > > > (END_WINDOW is inserted into the output sink), or to only allow
> > > > > > > an operator to emit control tuples inside endWindow(). The
> > > > > > > latter is a slight variation of the operator output port caching
> > > > > > > behavior, with the only difference that now the operator itself
> > > > > > > is responsible for caching/accumulating control tuples. Note
> > > > > > > that in many cases it will be necessary to postpone emitting
> > > > > > > payload tuples that logically come after the custom control
> > > > > > > tuple till the next window begins.
> > > > > > >
> > > > > > > IMO, that is too restrictive. In a case where an input operator
> > > > > > > uses push instead of poll (for example, it provides an end point
> > > > > > > where remote agents may connect and publish/push data), control
> > > > > > > tuples may be used for connect/disconnect/watermark broadcast to
> > > > > > > (partitioned) downstream operators. In this case the platform
> > > > > > > just needs to guarantee an order barrier (any tuple emitted
> > > > > > > prior to a control tuple needs to be delivered prior to the
> > > > > > > control tuple).
> > > > > > >
> > > > > > > Thank you,
> > > > > > >
> > > > > > > Vlad
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On 6/27/16 19:36, Amol Kekre wrote:
> > > > > > >
> > > > > > > >> I agree with David. Allowing control tuples within a window
> > > > > > > >> (along with data tuples) creates a very dangerous situation
> > > > > > > >> where guarantees are impacted. It is much safer to enable
> > > > > > > >> control tuples (send/receive) at window boundaries (after
> > > > > > > >> END_WINDOW of window N, and before BEGIN_WINDOW for window
> > > > > > > >> N+1). My take on David's list is
> > > > > > > >>
> > > > > > > >> 1. -> window boundaries -> Strong +1; there will be a big
> > > > > > > >> issue with guarantees for operators with multiple ports (see
> > > > > > > >> Thomas's response).
> > > > > > > >> 2. -> All downstream windows -> +1, but there are situations;
> > > > > > > >> a caveat could be "only to operators that implement control
> > > > > > > >> tuple interface/listeners", which could effectively translate
> > > > > > > >> to "all interested downstream operators".
> > > > > > > >> 3. Only Input operator can create control tuples -> -1; this
> > > > > > > >> is restrictive even though most likely 95% of the time it
> > > > > > > >> will be input operators.
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > >> Amol
> > > > > > >>
> > > > > > >>
> > > > > > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <tho...@datatorrent.com>
> > > > > > > >> wrote:
> > > > > > >>
> > > > > > >> The windowing we discuss here is in general event time based,
> > > > arrival
> > > > > > time
> > > > > > >>> is a special case of it.
> > > > > > >>>
> > > > > > >>> I don't think state changes can be made independent of the
> > > > streaming
> > > > > > >>> window
> > > > > > >>> boundary as it would prevent idempotent processing and
> > > transitively
> > > > > > >>> exactly
> > > > > > >>> once. For that to work, tuples need to be presented to the
> > > operator
> > > > > in
> > > > > > a
> > > > > > >>> guaranteed order *within* the streaming window, which is not
> > > > possible
> > > > > > >>> with
> > > > > > >>> multiple ports (and partitions).
> > > > > > >>>
> > > > > > >>> Thomas
> > > > > > >>>
> > > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan <
> > > da...@datatorrent.com
> > > > > > <javascript:;>>
> > > > > > >>> wrote:
> > > > > > >>>
> > > > > > >>> I think for session tracking, if the session boundaries are
> > > allowed
> > > > > to
> > > > > > be
> > > > > > >>>> not aligned with the streaming window boundaries, the user
> > will
> > > > > have a
> > > > > > >>>>
> > > > > > >>> much
> > > > > > >>>
> > > > > > >>>> bigger problem with idempotency. And in most cases, session
> > > > tracking
> > > > > > is
> > > > > > >>>> event time based, not ingression time or processing time
> > based,
> > > so
> > > > > > this
> > > > > > >>>>
> > > > > > >>> may
> > > > > > >>>
> > > > > > >>>> never be a problem. But if that ever happens, the user can
> > > always
> > > > > > alter
> > > > > > >>>>
> > > > > > >>> the
> > > > > > >>>
> > > > > > >>>> default 500ms width.
> > > > > > >>>>
> > > > > > >>>> David
> > > > > > >>>>
> > > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <
> > > > > v.ro...@datatorrent.com
> > > > > > <javascript:;>>
> > > > > > >>>> wrote:
> > > > > > >>>>
> > > > > > >>>> Ability to send custom control tuples within window may be
> > > useful,
> > > > > for
> > > > > > >>>>> example, for sessions tracking, where session boundaries
> are
> > > not
> > > > > > >>>>>
> > > > > > >>>> aligned
> > > > > > >>>
> > > > > > >>>> with window boundaries and 500 ms latency is not acceptable
> > for
> > > an
> > > > > > >>>>> application.
> > > > > > >>>>>
> > > > > > >>>>> Thank you,
> > > > > > >>>>>
> > > > > > >>>>> Vlad
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
> > > > > > >>>>>
> > > > > > >>>>> It should not matter from where the control tuple is
> > triggered.
> > > > It
> > > > > > >>>>>>
> > > > > > >>>>> will
> > > > > > >>>
> > > > > > >>>> be
> > > > > > >>>>
> > > > > > >>>>> good to have a generic mechanism to propagate it and other
> > > things
> > > > > can
> > > > > > >>>>>>
> > > > > > >>>>> be
> > > > > > >>>
> > > > > > >>>> accomplished outside the engine. For example, the new
> > > > comprehensive
> > > > > > >>>>>> support
> > > > > > >>>>>> for windowing will all be in Malhar, nothing that the
> engine
> > > > needs
> > > > > > to
> > > > > > >>>>>>
> > > > > > >>>>> know
> > > > > > >>>>
> > > > > > >>>>> about it except that we need the control tuple for
> watermark
> > > > > > >>>>>>
> > > > > > >>>>> propagation
> > > > > > >>>
> > > > > > >>>> and idempotent processing.
> > > > > > >>>>>>
> > > > > > >>>>>> I also think the main difference to other tuples is the
> need
> > > to
> > > > > send
> > > > > > >>>>>>
> > > > > > >>>>> it
> > > > > > >>>
> > > > > > >>>> to
> > > > > > >>>>
> > > > > > >>>>> all partitions. Which is similar to checkpoint window
> tuples,
> > > but
> > > > > not
> > > > > > >>>>>>
> > > > > > >>>>> the
> > > > > > >>>>
> > > > > > >>>>> same. Here, we probably also need the ability for the user
> to
> > > > > control
> > > > > > >>>>>> whether such tuple should traverse the entire DAG or not.
> > For
> > > a
> > > > > > batch
> > > > > > >>>>>>
> > > > > > >>>>> use
> > > > > > >>>>
> > > > > > >>>>> case, for example, we may want to send the end of file to
> the
> > > > next
> > > > > > >>>>>> operator, but not beyond, if the operator has asynchronous
> > > > > > processing
> > > > > > >>>>>> logic
> > > > > > >>>>>> in it.
> > > > > > >>>>>>
> > > > > > >>>>>> For any logic to be idempotent, the control tuple needs to
> > be
> > > > > > >>>>>>
> > > > > > >>>>> processed
> > > > > > >>>
> > > > > > >>>> at
> > > > > > >>>>
> > > > > > >>>>> a window boundary. Receiving the control tuple in the
> window
> > > > > callback
> > > > > > >>>>>> would
> > > > > > >>>>>> avoid having to track extra state in the operator. I don't
> > > think
> > > > > > >>>>>>
> > > > > > >>>>> that's
> > > > > > >>>
> > > > > > >>>> a
> > > > > > >>>>
> > > > > > >>>>> major issue, but what is the use case for processing a
> > control
> > > > > tuple
> > > > > > >>>>>> within
> > > > > > >>>>>> the window?
> > > > > > >>>>>>
> > > > > > >>>>>> Thomas
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni <
> > > > > > >>>>>>
> > > > > > >>>>> pra...@datatorrent.com <javascript:;>>
> > > > > > >>>>
> > > > > > >>>>> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>> For the use cases you mentioned, I think 1) and 2) are
> more
> > > > likely
> > > > > > to
> > > > > > >>>>>>
> > > > > > >>>>>>> be controlled directly by the application, 3) and 4) are
> > more
> > > > > > likely
> > > > > > >>>>>>> going to be triggered externally and directly handled by
> > the
> > > > > engine
> > > > > > >>>>>>> and 3) is already being implemented that way
> > (apexcore-163).
> > > > > > >>>>>>>
> > > > > > >>>>>>> The control tuples emitted by an operator would be sent
> to
> > > all
> > > > > > >>>>>>> downstream partitions isn't it and that would be the
> chief
> > > > > > >>>>>>>
> > > > > > >>>>>> distinction
> > > > > > >>>
> > > > > > >>>> compared to data (apart from the payload) which would get
> > > > > partitioned
> > > > > > >>>>>>> under normal circumstances. It would also be guaranteed
> > that
> > > > > > >>>>>>> downstream partitions will receive control tuples only
> > after
> > > > the
> > > > > > data
> > > > > > >>>>>>> that was sent before it so we could send it immediately
> > when
> > > it
> > > > > is
> > > > > > >>>>>>> emitted as opposed to window boundaries.
> > > > > > >>>>>>>
> > > > > > >>>>>>> However during unification it is important to know if
> these
> > > > > control
> > > > > > >>>>>>> tuples have been received from all upstream partitions
> > before
> > > > > > >>>>>>> proceeding with a control operation. One could wait till
> > end
> > > of
> > > > > the
> > > > > > >>>>>>> window but that introduces a delay however small, I would
> > > like
> > > > to
> > > > > > add
> > > > > > >>>>>>> to the proposal that the platform only hand over the
> > control
> > > > > tuple
> > > > > > to
> > > > > > >>>>>>> the unifier when it has been received from all upstream
> > > > > partitions
> > > > > > >>>>>>> much like how end window is processed but not wait till
> the
> > > > > actual
> > > > > > >>>>>>>
> > > > > > >>>>>> end
> > > > > > >>>
> > > > > > >>>> of the window.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Regd your concern about idempotency, we typically care
> > about
> > > > > > >>>>>>> idempotency at a window level and doing the above will
> > still
> > > > > allow
> > > > > > >>>>>>>
> > > > > > >>>>>> the
> > > > > > >>>
> > > > > > >>>> operators to preserve that easily.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Thanks
> > > > > > >>>>>>>
> > > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David Yan <
> > > da...@datatorrent.com
> > > > > > <javascript:;>>
> > > > > > >>>>>>>
> > > > > > >>>>>> wrote:
> > > > > > >>>
> > > > > > >>>> Hi all,
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I would like to propose a new feature to the Apex core
> > > engine
> > > > --
> > > > > > the
> > > > > > >>>>>>>> support of custom control tuples. Currently, we have
> > control
> > > > > > tuples
> > > > > > >>>>>>>>
> > > > > > >>>>>>> such
> > > > > > >>>>
> > > > > > >>>>> as
> > > > > > >>>>>>>
> > > > > > >>>>>>> BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but we
> > don't
> > > > > have
> > > > > > >>>>>>>>
> > > > > > >>>>>>> the
> > > > > > >>>
> > > > > > >>>> support for applications to insert their own control tuples.
> > The
> > > > way
> > > > > > >>>>>>>> currently to get around this is to use data tuples and
> > have
> > > a
> > > > > > >>>>>>>>
> > > > > > >>>>>>> separate
> > > > > > >>>
> > > > > > >>>> port
> > > > > > >>>>>>>
> > > > > > >>>>>>> for such tuples that sends tuples to all partitions of
> the
> > > > > > >>>>>>>>
> > > > > > >>>>>>> downstream
> > > > > > >>>
> > > > > > >>>> operators, which is not exactly developer friendly.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> We have already seen a number of use cases that can use
> > this
> > > > > > >>>>>>>>
> > > > > > >>>>>>> feature:
> > > > > > >>>
> > > > > > >>>> 1) Batch support: We need to tell all operators of the
> > physical
> > > > DAG
> > > > > > >>>>>>>>
> > > > > > >>>>>>> when
> > > > > > >>>>
> > > > > > >>>>> a
> > > > > > >>>>>>>
> > > > > > >>>>>>> batch starts and ends, so the operators can do whatever
> > that
> > > is
> > > > > > >>>>>>>>
> > > > > > >>>>>>> needed
> > > > > > >>>
> > > > > > >>>> upon
> > > > > > >>>>>>>
> > > > > > >>>>>>> the start or the end of a batch.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> 2) Watermark: To support the concepts of event time
> > > windowing,
> > > > > the
> > > > > > >>>>>>>> watermark control tuple is needed to tell which windows
> > > should
> > > > > be
> > > > > > >>>>>>>> considered late.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> 3) Changing operator properties: We do have the support
> of
> > > > > > changing
> > > > > > >>>>>>>> operator properties on the fly, but with a custom
> control
> > > > tuple,
> > > > > > the
> > > > > > >>>>>>>> command to change operator properties can be window
> > aligned
> > > > for
> > > > > > all
> > > > > > >>>>>>>> partitions and also across the DAG.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> 4) Recording tuples: Like changing operator properties,
> we
> > > do
> > > > > have
> > > > > > >>>>>>>>
> > > > > > >>>>>>> this
> > > > > > >>>>
> > > > > > >>>>> support now but only at the individual physical operator
> > level,
> > > > and
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> without
> > > > > > >>>>>>>
> > > > > > >>>>>>> control of which window to record tuples for. With a
> custom
> > > > > control
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> tuple,
> > > > > > >>>>>>>
> > > > > > >>>>>>> because a control tuple must belong to a window, all
> > > operators
> > > > in
> > > > > > >>>>>>>>
> > > > > > >>>>>>> the
> > > > > > >>>
> > > > > > >>>> DAG
> > > > > > >>>>>>>> can start (and stop) recording for the same windows.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I can think of two options to achieve this:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> 1) new custom control tuple type that takes user's
> > > > serializable
> > > > > > >>>>>>>>
> > > > > > >>>>>>> object.
> > > > > > >>>>
> > > > > > >>>>> 2) piggy back the current BEGIN_WINDOW and END_WINDOW
> control
> > > > > > >>>>>>>>
> > > > > > >>>>>>> tuples.
> > > > > > >>>
> > > > > > >>>> Please provide your feedback. Thank you.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> David
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
