I don't see how that would work. Suppose you have a file splitter and
multiple partitions of block readers. The "end of file" event cannot be
processed downstream until all block readers are done. I also think that
this is related to the batch demarcation discussion and there should be a
single generalized mechanism to support this.


On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <pra...@datatorrent.com>
wrote:

> Suppose I am processing data in a file and I want to do something at the
> end of a file at the output operator, I would send an end file control
> tuple and act on it when I receive it at the output. In a single window I
> may end up processing multiple files and if I don't have multiple ports and
> logical paths through the DAG (multiple partitions are ok). I can process
> end of each file immediately and also know what file was closed without
> sending extra identification information in the end file which I would need
> if I am collecting all of them and processing at the end of the window.
>
> On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <t...@apache.org> wrote:
>
> > The use cases listed in the original discussion don't call for option 2.
> It
> > seems to come with additional complexity and implementation cost.
> >
> > Can those in favor of option 2 please also provide the use case for it.
> >
> > Thanks,
> > Thomas
> >
> >
> > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <siy...@datatorrent.com>
> > wrote:
> >
> > > I will vote for approach 1.
> > >
> > > First of all that one sounds easier to do to me. And I think
> idempotency
> > is
> > > important. It may run at the cost of higher latency but I think it is
> ok
> > >
> > > And in addition, when in the future if users do need realtime control
> > tuple
> > > processing, we can always add the option on top of it.
> > >
> > > So I vote for 1
> > >
> > > Thanks,
> > > Siyuan
> > >
> > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi <p...@apache.org>
> > wrote:
> > >
> > > > As a rule of thumb in any real time operating system, control tuples
> > > should
> > > > always be handled using Priority Queues.
> > > >
> > > > We may try to control priorities by defining levels. And shall not
> > > > be delivered at window boundaries.
> > > >
> > > > In short, control tuples shall never be treated as any other tuples
> in
> > > real
> > > > time systems.
> > > >
> > > > On Thursday, November 3, 2016, David Yan <da...@datatorrent.com>
> > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I would like to renew the discussion of control tuples.
> > > > >
> > > > > Last time, we were in a debate about whether:
> > > > >
> > > > > 1) the platform should enforce that control tuples are delivered at
> > > > window
> > > > > boundaries only
> > > > >
> > > > > or:
> > > > >
> > > > > 2) the platform should deliver control tuples just as other tuples
> > and
> > > > it's
> > > > > the operator developers' choice whether to handle the control
> tuples
> > as
> > > > > they arrive or delay the processing till the next window boundary.
> > > > >
> > > > > To summarize the pros and cons:
> > > > >
> > > > > Approach 1: If processing control tuples results in changes of the
> > > > behavior
> > > > > of the operator, if idempotency needs to be preserved, the
> processing
> > > > must
> > > > > be done at window boundaries. This approach will save the operator
> > > > > developers headache to ensure that. However, this will take away
> the
> > > > > choices from the operator developer if they just need to process
> the
> > > > > control tuples as soon as possible.
> > > > >
> > > > > Approach 2: The operator has a chance to immediately process
> control
> > > > > tuples. This would be useful if latency is more valued than
> > > correctness.
> > > > > However, if this would open the possibility for operator developers
> > to
> > > > > shoot themselves in the foot. This is especially true if there are
> > > > multiple
> > > > > input ports. as there is no easy way to guarantee processing order
> > for
> > > > > multiple input ports.
> > > > >
> > > > > We would like to arrive to a consensus and close this discussion
> soon
> > > > this
> > > > > time so we can start the work on this important feature.
> > > > >
> > > > > Thanks!
> > > > >
> > > > > David
> > > > >
> > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov <
> > v.ro...@datatorrent.com
> > > > > <javascript:;>>
> > > > > wrote:
> > > > >
> > > > > > It is not clear how operator will emit custom control tuple at
> > window
> > > > > > boundaries. One way is to cache/accumulate control tuples in the
> > > > operator
> > > > > > output port till window closes (END_WINDOW is inserted into the
> > > output
> > > > > > sink) or only allow an operator to emit control tuples inside the
> > > > > > endWindow(). The later is a slight variation of the operator
> output
> > > > port
> > > > > > caching behavior with the only difference that now the operator
> > > itself
> > > > is
> > > > > > responsible for caching/accumulating control tuples. Note that in
> > > many
> > > > > > cases it will be necessary to postpone emitting payload tuples
> that
> > > > > > logically come after the custom control tuple till the next
> window
> > > > > begins.
> > > > > >
> > > > > > IMO, that too restrictive and in a case where input operator
> uses a
> > > > push
> > > > > > instead of a poll (for example, it provides an end point where
> > remote
> > > > > > agents may connect and publish/push data), control tuples may be
> > used
> > > > for
> > > > > > connect/disconnect/watermark broadcast to (partitioned)
> downstream
> > > > > > operators. In this case the platform just need to guarantee order
> > > > barrier
> > > > > > (any tuple emitted prior to a control tuple needs to be delivered
> > > prior
> > > > > to
> > > > > > the control tuple).
> > > > > >
> > > > > > Thank you,
> > > > > >
> > > > > > Vlad
> > > > > >
> > > > > >
> > > > > >
> > > > > > On 6/27/16 19:36, Amol Kekre wrote:
> > > > > >
> > > > > >> I agree with David. Allowing control tuples within a window
> (along
> > > > with
> > > > > >> data tuples) creates very dangerous situation where guarantees
> are
> > > > > >> impacted. It is much safer to enable control tuples
> (send/receive)
> > > at
> > > > > >> window boundaries (after END_WINDOW of window N, and before
> > > > BEGIN_WINDOW
> > > > > >> for window N+1). My take on David's list is
> > > > > >>
> > > > > >> 1. -> window boundaries -> Strong +1; there will be a big issue
> > with
> > > > > >> guarantees for operators with multiple ports. (see Thomas's
> > > response)
> > > > > >> 2. -> All downstream windows -> +1, but there are situations; a
> > > caveat
> > > > > >> could be "only to operators that implement control tuple
> > > > > >> interface/listeners", which could effectively translates to "all
> > > > > >> interested
> > > > > >> downstream operators"
> > > > > >> 3. Only Input operator can create control tuples -> -1; is
> > > restrictive
> > > > > >> even
> > > > > >> though most likely 95% of the time it will be input operators
> > > > > >>
> > > > > >> Thks,
> > > > > >> Amol
> > > > > >>
> > > > > >>
> > > > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <
> > > tho...@datatorrent.com
> > > > > <javascript:;>>
> > > > > >> wrote:
> > > > > >>
> > > > > >> The windowing we discuss here is in general event time based,
> > > arrival
> > > > > time
> > > > > >>> is a special case of it.
> > > > > >>>
> > > > > >>> I don't think state changes can be made independent of the
> > > streaming
> > > > > >>> window
> > > > > >>> boundary as it would prevent idempotent processing and
> > transitively
> > > > > >>> exactly
> > > > > >>> once. For that to work, tuples need to be presented to the
> > operator
> > > > in
> > > > > a
> > > > > >>> guaranteed order *within* the streaming window, which is not
> > > possible
> > > > > >>> with
> > > > > >>> multiple ports (and partitions).
> > > > > >>>
> > > > > >>> Thomas
> > > > > >>>
> > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan <
> > da...@datatorrent.com
> > > > > <javascript:;>>
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>> I think for session tracking, if the session boundaries are
> > allowed
> > > > to
> > > > > be
> > > > > >>>> not aligned with the streaming window boundaries, the user
> will
> > > > have a
> > > > > >>>>
> > > > > >>> much
> > > > > >>>
> > > > > >>>> bigger problem with idempotency. And in most cases, session
> > > tracking
> > > > > is
> > > > > >>>> event time based, not ingression time or processing time
> based,
> > so
> > > > > this
> > > > > >>>>
> > > > > >>> may
> > > > > >>>
> > > > > >>>> never be a problem. But if that ever happens, the user can
> > always
> > > > > alter
> > > > > >>>>
> > > > > >>> the
> > > > > >>>
> > > > > >>>> default 500ms width.
> > > > > >>>>
> > > > > >>>> David
> > > > > >>>>
> > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <
> > > > v.ro...@datatorrent.com
> > > > > <javascript:;>>
> > > > > >>>> wrote:
> > > > > >>>>
> > > > > >>>> Ability to send custom control tuples within window may be
> > useful,
> > > > for
> > > > > >>>>> example, for sessions tracking, where session boundaries are
> > not
> > > > > >>>>>
> > > > > >>>> aligned
> > > > > >>>
> > > > > >>>> with window boundaries and 500 ms latency is not acceptable
> for
> > an
> > > > > >>>>> application.
> > > > > >>>>>
> > > > > >>>>> Thank you,
> > > > > >>>>>
> > > > > >>>>> Vlad
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
> > > > > >>>>>
> > > > > >>>>> It should not matter from where the control tuple is
> triggered.
> > > It
> > > > > >>>>>>
> > > > > >>>>> will
> > > > > >>>
> > > > > >>>> be
> > > > > >>>>
> > > > > >>>>> good to have a generic mechanism to propagate it and other
> > things
> > > > can
> > > > > >>>>>>
> > > > > >>>>> be
> > > > > >>>
> > > > > >>>> accomplished outside the engine. For example, the new
> > > comprehensive
> > > > > >>>>>> support
> > > > > >>>>>> for windowing will all be in Malhar, nothing that the engine
> > > needs
> > > > > to
> > > > > >>>>>>
> > > > > >>>>> know
> > > > > >>>>
> > > > > >>>>> about it except that we need the control tuple for watermark
> > > > > >>>>>>
> > > > > >>>>> propagation
> > > > > >>>
> > > > > >>>> and idempotent processing.
> > > > > >>>>>>
> > > > > >>>>>> I also think the main difference to other tuples is the need
> > to
> > > > send
> > > > > >>>>>>
> > > > > >>>>> it
> > > > > >>>
> > > > > >>>> to
> > > > > >>>>
> > > > > >>>>> all partitions. Which is similar to checkpoint window tuples,
> > but
> > > > not
> > > > > >>>>>>
> > > > > >>>>> the
> > > > > >>>>
> > > > > >>>>> same. Here, we probably also need the ability for the user to
> > > > control
> > > > > >>>>>> whether such tuple should traverse the entire DAG or not.
> For
> > a
> > > > > batch
> > > > > >>>>>>
> > > > > >>>>> use
> > > > > >>>>
> > > > > >>>>> case, for example, we may want to send the end of file to the
> > > next
> > > > > >>>>>> operator, but not beyond, if the operator has asynchronous
> > > > > processing
> > > > > >>>>>> logic
> > > > > >>>>>> in it.
> > > > > >>>>>>
> > > > > >>>>>> For any logic to be idempotent, the control tuple needs to
> be
> > > > > >>>>>>
> > > > > >>>>> processed
> > > > > >>>
> > > > > >>>> at
> > > > > >>>>
> > > > > >>>>> a window boundary. Receiving the control tuple in the window
> > > > callback
> > > > > >>>>>> would
> > > > > >>>>>> avoid having to track extra state in the operator. I don't
> > think
> > > > > >>>>>>
> > > > > >>>>> that's
> > > > > >>>
> > > > > >>>> a
> > > > > >>>>
> > > > > >>>>> major issue, but what is the use case for processing a
> control
> > > > tuple
> > > > > >>>>>> within
> > > > > >>>>>> the window?
> > > > > >>>>>>
> > > > > >>>>>> Thomas
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni <
> > > > > >>>>>>
> > > > > >>>>> pra...@datatorrent.com <javascript:;>>
> > > > > >>>>
> > > > > >>>>> wrote:
> > > > > >>>>>>
> > > > > >>>>>> For the use cases you mentioned, I think 1) and 2) are more
> > > likely
> > > > > to
> > > > > >>>>>>
> > > > > >>>>>>> be controlled directly by the application, 3) and 4) are
> more
> > > > > likely
> > > > > >>>>>>> going to be triggered externally and directly handled by
> the
> > > > engine
> > > > > >>>>>>> and 3) is already being implemented that way
> (apexcore-163).
> > > > > >>>>>>>
> > > > > >>>>>>> The control tuples emitted by an operator would be sent to
> > all
> > > > > >>>>>>> downstream partitions isn't it and that would be the chief
> > > > > >>>>>>>
> > > > > >>>>>> distinction
> > > > > >>>
> > > > > >>>> compared to data (apart from the payload) which would get
> > > > partitioned
> > > > > >>>>>>> under normal circumstances. It would also be guaranteed
> that
> > > > > >>>>>>> downstream partitions will receive control tuples only
> after
> > > the
> > > > > data
> > > > > >>>>>>> that was sent before it so we could send it immediately
> when
> > it
> > > > is
> > > > > >>>>>>> emitted as opposed to window boundaries.
> > > > > >>>>>>>
> > > > > >>>>>>> However during unification it is important to know if these
> > > > control
> > > > > >>>>>>> tuples have been received from all upstream partitions
> before
> > > > > >>>>>>> proceeding with a control operation. One could wait till
> end
> > of
> > > > the
> > > > > >>>>>>> window but that introduces a delay however small, I would
> > like
> > > to
> > > > > add
> > > > > >>>>>>> to the proposal that the platform only hand over the
> control
> > > > tuple
> > > > > to
> > > > > >>>>>>> the unifier when it has been received from all upstream
> > > > partitions
> > > > > >>>>>>> much like how end window is processed but not wait till the
> > > > actual
> > > > > >>>>>>>
> > > > > >>>>>> end
> > > > > >>>
> > > > > >>>> of the window.
> > > > > >>>>>>>
> > > > > >>>>>>> Regd your concern about idempotency, we typically care
> about
> > > > > >>>>>>> idempotency at a window level and doing the above will
> still
> > > > allow
> > > > > >>>>>>>
> > > > > >>>>>> the
> > > > > >>>
> > > > > >>>> operators to preserve that easily.
> > > > > >>>>>>>
> > > > > >>>>>>> Thanks
> > > > > >>>>>>>
> > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David Yan <
> > da...@datatorrent.com
> > > > > <javascript:;>>
> > > > > >>>>>>>
> > > > > >>>>>> wrote:
> > > > > >>>
> > > > > >>>> Hi all,
> > > > > >>>>>>>>
> > > > > >>>>>>>> I would like to propose a new feature to the Apex core
> > engine
> > > --
> > > > > the
> > > > > >>>>>>>> support of custom control tuples. Currently, we have
> control
> > > > > tuples
> > > > > >>>>>>>>
> > > > > >>>>>>> such
> > > > > >>>>
> > > > > >>>>> as
> > > > > >>>>>>>
> > > > > >>>>>>> BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but we
> don't
> > > > have
> > > > > >>>>>>>>
> > > > > >>>>>>> the
> > > > > >>>
> > > > > >>>> support for applications to insert their own control tuples.
> The
> > > way
> > > > > >>>>>>>> currently to get around this is to use data tuples and
> have
> > a
> > > > > >>>>>>>>
> > > > > >>>>>>> separate
> > > > > >>>
> > > > > >>>> port
> > > > > >>>>>>>
> > > > > >>>>>>> for such tuples that sends tuples to all partitions of the
> > > > > >>>>>>>>
> > > > > >>>>>>> downstream
> > > > > >>>
> > > > > >>>> operators, which is not exactly developer friendly.
> > > > > >>>>>>>>
> > > > > >>>>>>>> We have already seen a number of use cases that can use
> this
> > > > > >>>>>>>>
> > > > > >>>>>>> feature:
> > > > > >>>
> > > > > >>>> 1) Batch support: We need to tell all operators of the
> physical
> > > DAG
> > > > > >>>>>>>>
> > > > > >>>>>>> when
> > > > > >>>>
> > > > > >>>>> a
> > > > > >>>>>>>
> > > > > >>>>>>> batch starts and ends, so the operators can do whatever
> that
> > is
> > > > > >>>>>>>>
> > > > > >>>>>>> needed
> > > > > >>>
> > > > > >>>> upon
> > > > > >>>>>>>
> > > > > >>>>>>> the start or the end of a batch.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 2) Watermark: To support the concepts of event time
> > windowing,
> > > > the
> > > > > >>>>>>>> watermark control tuple is needed to tell which windows
> > should
> > > > be
> > > > > >>>>>>>> considered late.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 3) Changing operator properties: We do have the support of
> > > > > changing
> > > > > >>>>>>>> operator properties on the fly, but with a custom control
> > > tuple,
> > > > > the
> > > > > >>>>>>>> command to change operator properties can be window
> aligned
> > > for
> > > > > all
> > > > > >>>>>>>> partitions and also across the DAG.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 4) Recording tuples: Like changing operator properties, we
> > do
> > > > have
> > > > > >>>>>>>>
> > > > > >>>>>>> this
> > > > > >>>>
> > > > > >>>>> support now but only at the individual physical operator
> level,
> > > and
> > > > > >>>>>>>>
> > > > > >>>>>>>> without
> > > > > >>>>>>>
> > > > > >>>>>>> control of which window to record tuples for. With a custom
> > > > control
> > > > > >>>>>>>>
> > > > > >>>>>>>> tuple,
> > > > > >>>>>>>
> > > > > >>>>>>> because a control tuple must belong to a window, all
> > operators
> > > in
> > > > > >>>>>>>>
> > > > > >>>>>>> the
> > > > > >>>
> > > > > >>>> DAG
> > > > > >>>>>>>> can start (and stop) recording for the same windows.
> > > > > >>>>>>>>
> > > > > >>>>>>>> I can think of two options to achieve this:
> > > > > >>>>>>>>
> > > > > >>>>>>>> 1) new custom control tuple type that takes user's
> > > serializable
> > > > > >>>>>>>>
> > > > > >>>>>>> object.
> > > > > >>>>
> > > > > >>>>> 2) piggy back the current BEGIN_WINDOW and END_WINDOW control
> > > > > >>>>>>>>
> > > > > >>>>>>> tuples.
> > > > > >>>
> > > > > >>>> Please provide your feedback. Thank you.
> > > > > >>>>>>>>
> > > > > >>>>>>>> David
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to