There have been some recent developments and discussions on the schema side
(link below) that warrant a reconsideration of how control tuples get
delivered.

http://apache.markmail.org/search/?q=apex+list%3Aorg.apache.apex.dev+schema+discovery+support#query:apex%20list%3Aorg.apache.apex.dev%20schema%20discovery%20support+page:1+mid:oaji26y3xfozap5v+state:results

What I would like to suggest is that we allow two delivery options for
control tuples, configurable on a per-control-tuple basis. The first
option is to deliver the control tuple to the operator when the first
instance of the tuple arrives from any path. The second option is to
deliver the control tuple when the last instance of the tuple has arrived
from all the paths, or at the end window if it is difficult to determine
the last arrival. The developer can choose the delivery option for the
control tuple, preferably when the tuple is created. The first option
will be useful for scenarios like schema propagation or begin file in
batch use cases. The second option will be useful for tuples like end
file or end batch in batch use cases.
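
To make the two options concrete, here is a rough sketch in Java. It is
only an illustration under assumptions of mine: DeliveryMode,
ControlTupleGate and the string tuple ids are hypothetical names and not
existing Apex API, and the sketch assumes the number of upstream paths is
known to the receiving operator. It counts arrivals per control tuple
within a streaming window and falls back to delivery at end window for
tuples that never saw their last arrival.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical per-tuple delivery options (not part of the Apex API). */
enum DeliveryMode { FIRST_ARRIVAL, LAST_ARRIVAL }

/** Hypothetical helper that decides when a control tuple reaches the operator. */
final class ControlTupleGate {
  private final int upstreamPaths;
  // Number of copies of each control tuple seen in the current window.
  private final Map<String, Integer> arrivals = new HashMap<>();
  // LAST_ARRIVAL tuples still waiting for copies from some paths.
  private final Set<String> awaitingLast = new LinkedHashSet<>();

  ControlTupleGate(int upstreamPaths) {
    if (upstreamPaths < 1) {
      throw new IllegalArgumentException("upstreamPaths must be >= 1");
    }
    this.upstreamPaths = upstreamPaths;
  }

  /** Returns true if this arrival should be delivered to the operator now. */
  boolean onArrival(String tupleId, DeliveryMode mode) {
    int seen = arrivals.merge(tupleId, 1, Integer::sum);
    if (mode == DeliveryMode.FIRST_ARRIVAL) {
      return seen == 1; // deliver once, ignore later copies
    }
    if (seen == upstreamPaths) {
      awaitingLast.remove(tupleId);
      return true; // the copy from the last path has arrived
    }
    if (seen < upstreamPaths) {
      awaitingLast.add(tupleId);
    }
    return false;
  }

  /** At end window: returns tuples to deliver as a fallback and resets state. */
  List<String> endWindow() {
    List<String> fallback = new ArrayList<>(awaitingLast);
    arrivals.clear();
    awaitingLast.clear();
    return fallback;
  }
}
```

With three upstream paths, a "schema" tuple in FIRST_ARRIVAL mode would be
delivered on its first copy only, while an "endFile" tuple in LAST_ARRIVAL
mode would be delivered on its third copy, or at end window if fewer than
three copies arrive.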

Thanks

On Sat, Dec 3, 2016 at 1:12 AM, David Yan <da...@datatorrent.com> wrote:

> Bhupesh, Sandesh, Tushar:
>
> Thanks for volunteering. This task probably needs all three of you to work
> closely together.
>
> The subtasks so far are:
>
> https://issues.apache.org/jira/browse/APEXCORE-580
> https://issues.apache.org/jira/browse/APEXCORE-581
>
> Please first review the subtasks and see whether anything is missing and
> add your thoughts to the tickets if you have any preliminary idea how to
> implement them.
>
> By the way, I think APEXCORE-581 is more involved and it might be a good
> idea to split that up further. It also makes sense since there are three of
> you.
>
> David
>
>
> On Thu, Dec 1, 2016 at 3:41 AM, Tushar Gosavi <tus...@datatorrent.com>
> wrote:
>
> > I am also interested in working on this feature.
> >
> > - Tushar.
> >
> >
> > On Thu, Dec 1, 2016 at 10:27 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> > wrote:
> > > I would like to work on
> > > https://issues.apache.org/jira/browse/APEXCORE-580.
> > >
> > > ~ Bhupesh
> > >
> > > On Thu, Dec 1, 2016 at 5:42 AM, Sandesh Hegde <sand...@datatorrent.com>
> > > wrote:
> > >
> > >> I am interested in working on the following subtask
> > >>
> > >> https://issues.apache.org/jira/browse/APEXCORE-581
> > >>
> > >> Thanks
> > >>
> > >>
> > >> On Wed, Nov 30, 2016 at 2:07 PM David Yan <da...@datatorrent.com>
> > >> wrote:
> > >>
> > >> > I have created an umbrella ticket for control tuple support:
> > >> >
> > >> > https://issues.apache.org/jira/browse/APEXCORE-579
> > >> >
> > >> > Currently it has two subtasks. Please have a look at them and see whether
> > >> > I'm missing anything or if you have anything to add. You are welcome to add
> > >> > more subtasks or comment on the existing subtasks.
> > >> >
> > >> > We would like to kick start the implementation soon.
> > >> >
> > >> > Thanks!
> > >> >
> > >> > David
> > >> >
> > >> > On Mon, Nov 28, 2016 at 5:22 PM, Bhupesh Chawda <bhup...@datatorrent.com>
> > >> > wrote:
> > >> >
> > >> > > +1 for the plan.
> > >> > >
> > >> > > I would be interested in contributing to this feature.
> > >> > >
> > >> > > ~ Bhupesh
> > >> > >
> > >> > > On Nov 29, 2016 03:26, "Sandesh Hegde" <sand...@datatorrent.com>
> > >> > > wrote:
> > >> > >
> > >> > > > I am interested in contributing to this feature.
> > >> > > >
> > >> > > > On Mon, Nov 28, 2016 at 1:54 PM David Yan <da...@datatorrent.com>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > I think we should probably go ahead with option 1 since this works
> > >> > > > > with most use cases and prevents developers from shooting themselves
> > >> > > > > in the foot in terms of idempotency.
> > >> > > > >
> > >> > > > > We can have a configuration property that enables option 2 later if
> > >> > > > > we have concrete use cases that call for it.
> > >> > > > >
> > >> > > > > Please share your thoughts if you think you don't agree with this
> > >> > > > > plan. Also, please indicate if you're interested in contributing to
> > >> > > > > this feature.
> > >> > > > >
> > >> > > > > David
> > >> > > > >
> > >> > > > > On Sun, Nov 27, 2016 at 9:02 PM, Bhupesh Chawda <bhup...@datatorrent.com>
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > It appears that option 1 is more favored due to unavailability of a
> > >> > > > > > use case which could use option 2.
> > >> > > > > >
> > >> > > > > > However, option 2 is problematic in specific cases, like presence of
> > >> > > > > > multiple input ports for example. In case of a linear DAG where
> > >> > > > > > control tuples are flowing in order with the data tuples, it should
> > >> > > > > > not be difficult to guarantee idempotency. For example, in cases
> > >> > > > > > where there could be multiple changes in behavior of an operator
> > >> > > > > > during a single window, it should not wait until end window for
> > >> > > > > > these changes to take effect. Since we don't have a concrete use
> > >> > > > > > case right now, perhaps we do not want to go down that road. This
> > >> > > > > > feature should be available through a platform attribute (maybe at
> > >> > > > > > a later point in time) where the default is option 1.
> > >> > > > > >
> > >> > > > > > I think option 1 is suitable for a starting point in the
> > >> > > > > > implementation of this feature and we should proceed with it.
> > >> > > > > >
> > >> > > > > > ~ Bhupesh
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Fri, Nov 11, 2016 at 12:59 AM, David Yan <da...@datatorrent.com>
> > >> > > > > > wrote:
> > >> > > > > >
> > >> > > > > > > Good question Tushar. The callback should be called only once.
> > >> > > > > > > The way to implement this is to keep a list of control tuple
> > >> > > > > > > hashes for the given streaming window and only do the callback
> > >> > > > > > > when the operator has not seen it before.
> > >> > > > > > >
> > >> > > > > > > Other thoughts?
> > >> > > > > > >
> > >> > > > > > > David
> > >> > > > > > >
> > >> > > > > > > On Thu, Nov 10, 2016 at 9:32 AM, Tushar Gosavi <tus...@datatorrent.com>
> > >> > > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > Hi David,
> > >> > > > > > > >
> > >> > > > > > > > What would be the behaviour in the case where we have a DAG
> > >> > > > > > > > with the following operators? The number in brackets is the
> > >> > > > > > > > number of partitions, and X is NxM partitioning.
> > >> > > > > > > > A(1) X B(4) X C(2)
> > >> > > > > > > >
> > >> > > > > > > > If A sends a control tuple, it will be sent to all 4
> > >> > > > > > > > partitions of B, and from each partition of B it goes to C,
> > >> > > > > > > > i.e. each partition of C will receive the same control tuple
> > >> > > > > > > > originated from A multiple times (number of upstream
> > >> > > > > > > > partitions of C). In this case, will the callback function
> > >> > > > > > > > get called multiple times or just once?
> > >> > > > > > > >
> > >> > > > > > > > -Tushar.
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > On Fri, Nov 4, 2016 at 12:14 AM, David Yan <da...@datatorrent.com>
> > >> > > > > > > > wrote:
> > >> > > > > > > > > Hi Bhupesh,
> > >> > > > > > > > >
> > >> > > > > > > > > Since each input port has its own incoming control tuple, I
> > >> > > > > > > > > would imagine there would be an additional
> > >> > > > > > > > > DefaultInputPort.processControl method that operator
> > >> > > > > > > > > developers can override.
> > >> > > > > > > > > If we go for option 1, my thinking is that the control
> > >> > > > > > > > > tuples would always be delivered at the next window
> > >> > > > > > > > > boundary, even if the emit method is called within a window.
> > >> > > > > > > > >
> > >> > > > > > > > > David
> > >> > > > > > > > >
> > >> > > > > > > > > On Thu, Nov 3, 2016 at 1:46 AM, Bhupesh Chawda <
> > >> > > > > > > bhup...@datatorrent.com>
> > >> > > > > > > > > wrote:
> > >> > > > > > > > >
> > >> > > > > > > > >> I have a question regarding the callback for a
> control
> > >> > tuple.
> > >> > > > Will
> > >> > > > > > it
> > >> > > > > > > be
> > >> > > > > > > > >> similar to InputPort::process() method? Something
> like
> > >> > > > > > > > >> InputPort::processControlTuple(t)
> > >> > > > > > > > >> ? Or will it be a method of the operator similar to
> > >> > > > beginWindow()?
> > >> > > > > > > > >>
> > >> > > > > > > > >> When we say that the control tuple will be delivered
> at
> > >> > window
> > >> > > > > > > boundary,
> > >> > > > > > > > >> does that mean all control tuples emitted in that
> > window
> > >> > will
> > >> > > be
> > >> > > > > > > > processed
> > >> > > > > > > > >> together at the end of the window? This would imply
> > that
> > >> > there
> > >> > > > is
> > >> > > > > no
> > >> > > > > > > > >> ordering among regular tuples and control tuples.
> > >> > > > > > > > >>
> > >> > > > > > > > >> I think we should get started with the option 1 -
> > control
> > >> > > tuples
> > >> > > > > at
> > >> > > > > > > > window
> > >> > > > > > > > >> boundary, which seems to handle most of the use
> cases.
> > For
> > >> > > some
> > >> > > > > > cases
> > >> > > > > > > > which
> > >> > > > > > > > >> require option 2, we can always build on this.
> > >> > > > > > > > >>
> > >> > > > > > > > >> ~ Bhupesh
> > >> > > > > > > > >>
> > >> > > > > > > > >> On Thu, Nov 3, 2016 at 1:35 PM, Thomas Weise <
> > >> > t...@apache.org>
> > >> > > > > > wrote:
> > >> > > > > > > > >>
> > >> > > > > > > > >> > I don't see how that would work. Suppose you have a
> > file
> > >> > > > > splitter
> > >> > > > > > > and
> > >> > > > > > > > >> > multiple partitions of block readers. The "end of
> > file"
> > >> > > event
> > >> > > > > > cannot
> > >> > > > > > > > be
> > >> > > > > > > > >> > processed downstream until all block readers are
> > done. I
> > >> > > also
> > >> > > > > > think
> > >> > > > > > > > that
> > >> > > > > > > > >> > this is related to the batch demarcation discussion
> > and
> > >> > > there
> > >> > > > > > should
> > >> > > > > > > > be a
> > >> > > > > > > > >> > single generalized mechanism to support this.
> > >> > > > > > > > >> >
> > >> > > > > > > > >> >
> > >> > > > > > > > >> > On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <
> > >> > > > > > > > pra...@datatorrent.com
> > >> > > > > > > > >> >
> > >> > > > > > > > >> > wrote:
> > >> > > > > > > > >> >
> > >> > > > > > > > >> > > Suppose I am processing data in a file and I want
> > to
> > >> do
> > >> > > > > > something
> > >> > > > > > > at
> > >> > > > > > > > >> the
> > >> > > > > > > > >> > > end of a file at the output operator, I would
> send
> > an
> > >> > end
> > >> > > > file
> > >> > > > > > > > control
> > >> > > > > > > > >> > > tuple and act on it when I receive it at the
> > output.
> > >> In
> > >> > a
> > >> > > > > single
> > >> > > > > > > > >> window I
> > >> > > > > > > > >> > > may end up processing multiple files and if I
> don't
> > >> have
> > >> > > > > > multiple
> > >> > > > > > > > ports
> > >> > > > > > > > >> > and
> > >> > > > > > > > >> > > logical paths through the DAG (multiple
> partitions
> > are
> > >> > > ok).
> > >> > > > I
> > >> > > > > > can
> > >> > > > > > > > >> process
> > >> > > > > > > > >> > > end of each file immediately and also know what
> > file
> > >> was
> > >> > > > > closed
> > >> > > > > > > > without
> > >> > > > > > > > >> > > sending extra identification information in the
> end
> > >> file
> > >> > > > > which I
> > >> > > > > > > > would
> > >> > > > > > > > >> > need
> > >> > > > > > > > >> > > if I am collecting all of them and processing at
> > the
> > >> end
> > >> > > of
> > >> > > > > the
> > >> > > > > > > > window.
> > >> > > > > > > > >> > >
> > >> > > > > > > > >> > > On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <
> > >> > > > t...@apache.org>
> > >> > > > > > > > wrote:
> > >> > > > > > > > >> > >
> > >> > > > > > > > >> > > > The use cases listed in the original discussion
> > >> don't
> > >> > > call
> > >> > > > > for
> > >> > > > > > > > option
> > >> > > > > > > > >> > 2.
> > >> > > > > > > > >> > > It
> > >> > > > > > > > >> > > > seems to come with additional complexity and
> > >> > > > implementation
> > >> > > > > > > cost.
> > >> > > > > > > > >> > > >
> > >> > > > > > > > >> > > > Can those in favor of option 2 please also
> > provide
> > >> the
> > >> > > use
> > >> > > > > > case
> > >> > > > > > > > for
> > >> > > > > > > > >> it.
> > >> > > > > > > > >> > > >
> > >> > > > > > > > >> > > > Thanks,
> > >> > > > > > > > >> > > > Thomas
> > >> > > > > > > > >> > > >
> > >> > > > > > > > >> > > >
> > >> > > > > > > > >> > > > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <
> > >> > > > > > > > siy...@datatorrent.com>
> > >> > > > > > > > >> > > > wrote:
> > >> > > > > > > > >> > > >
> > >> > > > > > > > >> > > > > I will vote for approach 1.
> > >> > > > > > > > >> > > > >
> > >> > > > > > > > >> > > > > First of all that one sounds easier to do to
> > me.
> > >> > And I
> > >> > > > > think
> > >> > > > > > > > >> > > idempotency
> > >> > > > > > > > >> > > > is
> > >> > > > > > > > >> > > > > important. It may run at the cost of higher
> > >> latency
> > >> > > but
> > >> > > > I
> > >> > > > > > > think
> > >> > > > > > > > it
> > >> > > > > > > > >> is
> > >> > > > > > > > >> > > ok
> > >> > > > > > > > >> > > > >
> > >> > > > > > > > >> > > > > And in addition, when in the future if users
> do
> > >> need
> > >> > > > > > realtime
> > >> > > > > > > > >> control
> > >> > > > > > > > >> > > > tuple
> > >> > > > > > > > >> > > > > processing, we can always add the option on
> > top of
> > >> > it.
> > >> > > > > > > > >> > > > >
> > >> > > > > > > > >> > > > > So I vote for 1
> > >> > > > > > > > >> > > > >
> > >> > > > > > > > >> > > > > Thanks,
> > >> > > > > > > > >> > > > > Siyuan
> > >> > > > > > > > >> > > > >
> > >> > > > > > > > >> > > > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A.
> > Dalvi <
> > >> > > > > > > > p...@apache.org>
> > >> > > > > > > > >> > > > wrote:
> > >> > > > > > > > >> > > > >
> > >> > > > > > > > >> > > > > > As a rule of thumb in any real time
> operating
> > >> > > system,
> > >> > > > > > > control
> > >> > > > > > > > >> > tuples
> > >> > > > > > > > >> > > > > should
> > >> > > > > > > > >> > > > > > always be handled using Priority Queues.
> > >> > > > > > > > >> > > > > >
> > >> > > > > > > > >> > > > > > We may try to control priorities by
> defining
> > >> > levels.
> > >> > > > And
> > >> > > > > > > shall
> > >> > > > > > > > >> not
> > >> > > > > > > > >> > > > > > be delivered at window boundaries.
> > >> > > > > > > > >> > > > > >
> > >> > > > > > > > >> > > > > > In short, control tuples shall never be
> > treated
> > >> as
> > >> > > any
> > >> > > > > > other
> > >> > > > > > > > >> tuples
> > >> > > > > > > > >> > > in
> > >> > > > > > > > >> > > > > real
> > >> > > > > > > > >> > > > > > time systems.
> > >> > > > > > > > >> > > > > >
> > >> > > > > > > > >> > > > > > On Thursday, November 3, 2016, David Yan <
> > >> > > > > > > > da...@datatorrent.com>
> > >> > > > > > > > >> > > > wrote:
> > >> > > > > > > > >> > > > > >
> > >> > > > > > > > >> > > > > > > Hi all,
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > I would like to renew the discussion of
> > >> control
> > >> > > > > tuples.
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > Last time, we were in a debate about
> > whether:
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > 1) the platform should enforce that
> control
> > >> > tuples
> > >> > > > are
> > >> > > > > > > > >> delivered
> > >> > > > > > > > >> > at
> > >> > > > > > > > >> > > > > > window
> > >> > > > > > > > >> > > > > > > boundaries only
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > or:
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > 2) the platform should deliver control
> > tuples
> > >> > just
> > >> > > > as
> > >> > > > > > > other
> > >> > > > > > > > >> > tuples
> > >> > > > > > > > >> > > > and
> > >> > > > > > > > >> > > > > > it's
> > >> > > > > > > > >> > > > > > > the operator developers' choice whether
> to
> > >> > handle
> > >> > > > the
> > >> > > > > > > > control
> > >> > > > > > > > >> > > tuples
> > >> > > > > > > > >> > > > as
> > >> > > > > > > > >> > > > > > > they arrive or delay the processing till
> > the
> > >> > next
> > >> > > > > window
> > >> > > > > > > > >> > boundary.
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > To summarize the pros and cons:
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > Approach 1: If processing control tuples results in changes to
> > >> > > > > > > > >> > > > > > > the behavior of the operator and idempotency needs to be
> > >> > > > > > > > >> > > > > > > preserved, the processing must be done at window boundaries.
> > >> > > > > > > > >> > > > > > > This approach saves operator developers the headache of
> > >> > > > > > > > >> > > > > > > ensuring that. However, it takes the choice away from operator
> > >> > > > > > > > >> > > > > > > developers who just need to process the control tuples as soon
> > >> > > > > > > > >> > > > > > > as possible.
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > Approach 2: The operator has a chance to process control
> > >> > > > > > > > >> > > > > > > tuples immediately. This would be useful if latency is valued
> > >> > > > > > > > >> > > > > > > more than correctness. However, this would open the
> > >> > > > > > > > >> > > > > > > possibility for operator developers to shoot themselves in the
> > >> > > > > > > > >> > > > > > > foot. This is especially true if there are multiple input
> > >> > > > > > > > >> > > > > > > ports, as there is no easy way to guarantee processing order
> > >> > > > > > > > >> > > > > > > across multiple input ports.
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > We would like to arrive to a consensus
> and
> > >> close
> > >> > > > this
> > >> > > > > > > > >> discussion
> > >> > > > > > > > >> > > soon
> > >> > > > > > > > >> > > > > > this
> > >> > > > > > > > >> > > > > > > time so we can start the work on this
> > >> important
> > >> > > > > feature.
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > Thanks!
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > David
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad
> > Rozov <
> > >> > > > > > > > >> > > > v.ro...@datatorrent.com
> > >> > > > > > > > >> > > > > > > <javascript:;>>
> > >> > > > > > > > >> > > > > > > wrote:
> > >> > > > > > > > >> > > > > > >
> > >> > > > > > > > >> > > > > > > > It is not clear how operator will emit
> > >> custom
> > >> > > > > control
> > >> > > > > > > > tuple
> > >> > > > > > > > >> at
> > >> > > > > > > > >> > > > window
> > >> > > > > > > > >> > > > > > > > boundaries. One way is to
> > cache/accumulate
> > >> > > control
> > >> > > > > > > tuples
> > >> > > > > > > > in
> > >> > > > > > > > >> > the
> > >> > > > > > > > >> > > > > > operator
> > >> > > > > > > > >> > > > > > > > output port till window closes
> > (END_WINDOW
> > >> is
> > >> > > > > inserted
> > >> > > > > > > > into
> > >> > > > > > > > >> the
> > >> > > > > > > > >> > > > > output
> > >> > > > > > > > >> > > > > > > > sink) or only allow an operator to emit
> > >> > control
> > >> > > > > tuples
> > >> > > > > > > > inside
> > >> > > > > > > > >> > the
> > >> > > > > > > > >> > > > > > > > endWindow(). The later is a slight
> > variation
> > >> > of
> > >> > > > the
> > >> > > > > > > > operator
> > >> > > > > > > > >> > > output
> > >> > > > > > > > >> > > > > > port
> > >> > > > > > > > >> > > > > > > > caching behavior with the only
> difference
> > >> that
> > >> > > now
> > >> > > > > the
> > >> > > > > > > > >> operator
> > >> > > > > > > > >> > > > > itself
> > >> > > > > > > > >> > > > > > is
> > >> > > > > > > > >> > > > > > > > responsible for caching/accumulating
> > control
> > >> > > > tuples.
> > >> > > > > > > Note
> > >> > > > > > > > >> that
> > >> > > > > > > > >> > in
> > >> > > > > > > > >> > > > > many
> > >> > > > > > > > >> > > > > > > > cases it will be necessary to postpone
> > >> > emitting
> > >> > > > > > payload
> > >> > > > > > > > >> tuples
> > >> > > > > > > > >> > > that
> > >> > > > > > > > >> > > > > > > > logically come after the custom control
> > >> tuple
> > >> > > till
> > >> > > > > the
> > >> > > > > > > > next
> > >> > > > > > > > >> > > window
> > >> > > > > > > > >> > > > > > > begins.
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > > > IMO, that too restrictive and in a case
> > >> where
> > >> > > > input
> > >> > > > > > > > operator
> > >> > > > > > > > >> > > uses a
> > >> > > > > > > > >> > > > > > push
> > >> > > > > > > > >> > > > > > > > instead of a poll (for example, it
> > provides
> > >> an
> > >> > > end
> > >> > > > > > point
> > >> > > > > > > > >> where
> > >> > > > > > > > >> > > > remote
> > >> > > > > > > > >> > > > > > > > agents may connect and publish/push
> > data),
> > >> > > control
> > >> > > > > > > tuples
> > >> > > > > > > > may
> > >> > > > > > > > >> > be
> > >> > > > > > > > >> > > > used
> > >> > > > > > > > >> > > > > > for
> > >> > > > > > > > >> > > > > > > > connect/disconnect/watermark broadcast
> to
> > >> > > > > > (partitioned)
> > >> > > > > > > > >> > > downstream
> > >> > > > > > > > >> > > > > > > > operators. In this case the platform
> just
> > >> need
> > >> > > to
> > >> > > > > > > > guarantee
> > >> > > > > > > > >> > order
> > >> > > > > > > > >> > > > > > barrier
> > >> > > > > > > > >> > > > > > > > (any tuple emitted prior to a control
> > tuple
> > >> > > needs
> > >> > > > to
> > >> > > > > > be
> > >> > > > > > > > >> > delivered
> > >> > > > > > > > >> > > > > prior
> > >> > > > > > > > >> > > > > > > to
> > >> > > > > > > > >> > > > > > > > the control tuple).
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > > > Thank you,
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > > > Vlad
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > > > On 6/27/16 19:36, Amol Kekre wrote:
> > >> > > > > > > > >> > > > > > > >
> > >> > > > > > > > >> > > > > > > >> I agree with David. Allowing control
> > tuples
> > >> > > > within
> > >> > > > > a
> > >> > > > > > > > window
> > >> > > > > > > > >> > > (along
> > >> > > > > > > > >> > > > > > with
> > >> > > > > > > > >> > > > > > > >> data tuples) creates very dangerous
> > >> situation
> > >> > > > where
> > >> > > > > > > > >> guarantees
> > >> > > > > > > > >> > > are
> > >> > > > > > > > >> > > > > > > >> impacted. It is much safer to enable
> > >> control
> > >> > > > tuples
> > >> > > > > > > > >> > > (send/receive)
> > >> > > > > > > > >> > > > > at
> > >> > > > > > > > >> > > > > > > >> window boundaries (after END_WINDOW of
> > >> window
> > >> > > N,
> > >> > > > > and
> > >> > > > > > > > before
> > >> > > > > > > > >> > > > > > BEGIN_WINDOW
> > >> > > > > > > > >> > > > > > > >> for window N+1). My take on David's
> > list is
> > >> > > > > > > > >> > > > > > > >>
> > >> > > > > > > > >> > > > > > > >> 1. -> window boundaries -> Strong +1;
> > there
> > >> > > will
> > >> > > > > be a
> > >> > > > > > > big
> > >> > > > > > > > >> > issue
> > >> > > > > > > > >> > > > with
> > >> > > > > > > > >> > > > > > > >> guarantees for operators with multiple
> > >> ports.
> > >> > > > (see
> > >> > > > > > > > Thomas's
> > >> > > > > > > > >> > > > > response)
> > >> > > > > > > > >> > > > > > > >> 2. -> All downstream windows -> +1,
> but
> > >> there
> > >> > > are
> > >> > > > > > > > >> situations;
> > >> > > > > > > > >> > a
> > >> > > > > > > > >> > > > > caveat
> > >> > > > > > > > >> > > > > > > >> could be "only to operators that
> > implement
> > >> > > > control
> > >> > > > > > > tuple
> > >> > > > > > > > >> > > > > > > >> interface/listeners", which could
> > >> effectively
> > >> > > > > > > translates
> > >> > > > > > > > to
> > >> > > > > > > > >> > "all
> > >> > > > > > > > >> > > > > > > >> interested
> > >> > > > > > > > >> > > > > > > >> downstream operators"
> > >> > > > > > > > >> > > > > > > >> 3. Only Input operator can create
> > control
> > >> > > tuples
> > >> > > > ->
> > >> > > > > > -1;
> > >> > > > > > > > is
> > >> > > > > > > > >> > > > > restrictive
> > >> > > > > > > > >> > > > > > > >> even
> > >> > > > > > > > >> > > > > > > >> though most likely 95% of the time it
> > will
> > >> be
> > >> > > > input
> > >> > > > > > > > >> operators
> > >> > > > > > > > >> > > > > > > >>
> > >> > > > > > > > >> > > > > > > >> Thks,
> > >> > > > > > > > >> > > > > > > >> Amol
> > >> > > > > > > > >> > > > > > > >>
> > >> > > > > > > > >> > > > > > > >>
> > >> > > > > > > > >> > > > > > > >> On Mon, Jun 27, 2016 at 4:37 PM,
> Thomas
> > >> > Weise <
> > >> > > > > > > > >> > > > > tho...@datatorrent.com
> > >> > > > > > > > >> > > > > > > <javascript:;>>
> > >> > > > > > > > >> > > > > > > >> wrote:
> > >> > > > > > > > >> > > > > > > >>
> > >> > > > > > > > >> > > > > > > >> The windowing we discuss here is in
> > general
> > >> > > event
> > >> > > > > > time
> > >> > > > > > > > >> based,
> > >> > > > > > > > >> > > > > arrival
> > >> > > > > > > > >> > > > > > > time
> > >> > > > > > > > >> > > > > > > >>> is a special case of it.
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>> I don't think state changes can be
> made
> > >> > > > > independent
> > >> > > > > > of
> > >> > > > > > > > the
> > >> > > > > > > > >> > > > > streaming
> > >> > > > > > > > >> > > > > > > >>> window
> > >> > > > > > > > >> > > > > > > >>> boundary as it would prevent
> idempotent
> > >> > > > processing
> > >> > > > > > and
> > >> > > > > > > > >> > > > transitively
> > >> > > > > > > > >> > > > > > > >>> exactly
> > >> > > > > > > > >> > > > > > > >>> once. For that to work, tuples need
> to
> > be
> > >> > > > > presented
> > >> > > > > > to
> > >> > > > > > > > the
> > >> > > > > > > > >> > > > operator
> > >> > > > > > > > >> > > > > > in
> > >> > > > > > > > >> > > > > > > a
> > >> > > > > > > > >> > > > > > > >>> guaranteed order *within* the
> streaming
> > >> > > window,
> > >> > > > > > which
> > >> > > > > > > is
> > >> > > > > > > > >> not
> > >> > > > > > > > >> > > > > possible
> > >> > > > > > > > >> > > > > > > >>> with
> > >> > > > > > > > >> > > > > > > >>> multiple ports (and partitions).
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>> Thomas
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM,
> David
> > >> Yan <
> > >> > > > > > > > >> > > > da...@datatorrent.com
> > >> > > > > > > > >> > > > > > > <javascript:;>>
> > >> > > > > > > > >> > > > > > > >>> wrote:
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>> I think for session tracking, if the
> > >> session
> > >> > > > > > > boundaries
> > >> > > > > > > > are
> > >> > > > > > > > >> > > > allowed
> > >> > > > > > > > >> > > > > > to
> > >> > > > > > > > >> > > > > > > be
> > >> > > > > > > > >> > > > > > > >>>> not aligned with the streaming
> window
> > >> > > > boundaries,
> > >> > > > > > the
> > >> > > > > > > > user
> > >> > > > > > > > >> > > will
> > >> > > > > > > > >> > > > > > have a
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>> much
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> bigger problem with idempotency. And
> > in
> > >> > most
> > >> > > > > cases,
> > >> > > > > > > > >> session
> > >> > > > > > > > >> > > > > tracking
> > >> > > > > > > > >> > > > > > > is
> > >> > > > > > > > >> > > > > > > >>>> event time based, not ingression
> time
> > or
> > >> > > > > processing
> > >> > > > > > > > time
> > >> > > > > > > > >> > > based,
> > >> > > > > > > > >> > > > so
> > >> > > > > > > > >> > > > > > > this
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>> may
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> never be a problem. But if that ever
> > >> > happens,
> > >> > > > the
> > >> > > > > > > user
> > >> > > > > > > > can
> > >> > > > > > > > >> > > > always
> > >> > > > > > > > >> > > > > > > alter
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>> the
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> default 500ms width.
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>> David
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM,
> Vlad
> > >> > Rozov <
> > >> > > > > > > > >> > > > > > v.ro...@datatorrent.com
> > >> > > > > > > > >> > > > > > > <javascript:;>>
> > >> > > > > > > > >> > > > > > > >>>> wrote:
> > >> > > > > > > > >> > > > > > > >>>>
> > >> > > > > > > > >> > > > > > > >>>> Ability to send custom control
> tuples
> > >> > within
> > >> > > > > window
> > >> > > > > > > > may be
> > >> > > > > > > > >> > > > useful,
> > >> > > > > > > > >> > > > > > for
> > >> > > > > > > > >> > > > > > > >>>>> example, for sessions tracking,
> where
> > >> > > session
> > >> > > > > > > > boundaries
> > >> > > > > > > > >> > are
> > >> > > > > > > > >> > > > not
> > >> > > > > > > > >> > > > > > > >>>>>
> > >> > > > > > > > >> > > > > > > >>>> aligned
> > >> > > > > > > > >> > > > > > > >>>
> > >> > > > > > > > >> > > > > > > >>>> with window boundaries and 500 ms
> > latency
> > >> > is
> > >> > > > not
> > >> > > > > > > > >> acceptable
> > >> > > > > > > > >> > > for
> > >> > > > > > > > >> > > > an
> > >> > > > > > > > >> > > > > > > >>>>> application.
> > >> > > > > > > > >> > > > > > > >>>>>
> > >> > > > > > > > >> > > > > > > >>>>> Thank you,
> > >> > > > > > > > >> > > > > > > >>>>>
> > >> > > > > > > > >> > > > > > > >>>>> Vlad
> > >> > > > > > > > >> > > > > > > >>>>>
> > >> > > > > > > > >> > > > > > > >>>>>
> > >> > > > > > > > >> > > > > > > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
> > >> > > > > > > > >> > > > > > > >>>>>
> > >> > > > > > > > >> > > > > > > >>>>>> It should not matter from where the control tuple is triggered. It
> > >> > > > > > > > >> > > > > > > >>>>>> will be good to have a generic mechanism to propagate it and other
> > >> > > > > > > > >> > > > > > > >>>>>> things can be accomplished outside the engine. For example, the new
> > >> > > > > > > > >> > > > > > > >>>>>> comprehensive support for windowing will all be in Malhar, nothing
> > >> > > > > > > > >> > > > > > > >>>>>> that the engine needs to know about it except that we need the
> > >> > > > > > > > >> > > > > > > >>>>>> control tuple for watermark propagation and idempotent processing.
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>> I also think the main difference to other tuples is the need to
> > >> > > > > > > > >> > > > > > > >>>>>> send it to all partitions. Which is similar to checkpoint window
> > >> > > > > > > > >> > > > > > > >>>>>> tuples, but not the same. Here, we probably also need the ability
> > >> > > > > > > > >> > > > > > > >>>>>> for the user to control whether such tuple should traverse the
> > >> > > > > > > > >> > > > > > > >>>>>> entire DAG or not. For a batch use case, for example, we may want
> > >> > > > > > > > >> > > > > > > >>>>>> to send the end of file to the next operator, but not beyond, if
> > >> > > > > > > > >> > > > > > > >>>>>> the operator has asynchronous processing logic in it.
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>> For any logic to be idempotent, the control tuple needs to be
> > >> > > > > > > > >> > > > > > > >>>>>> processed at a window boundary. Receiving the control tuple in the
> > >> > > > > > > > >> > > > > > > >>>>>> window callback would avoid having to track extra state in the
> > >> > > > > > > > >> > > > > > > >>>>>> operator. I don't think that's a major issue, but what is the use
> > >> > > > > > > > >> > > > > > > >>>>>> case for processing a control tuple within the window?
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>> Thomas
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni
> > >> > > > > > > > >> > > > > > > >>>>>> <pra...@datatorrent.com> wrote:
> > >> > > > > > > > >> > > > > > > >>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> For the use cases you mentioned, I think 1) and 2) are more likely
> > >> > > > > > > > >> > > > > > > >>>>>>> to be controlled directly by the application, 3) and 4) are more
> > >> > > > > > > > >> > > > > > > >>>>>>> likely going to be triggered externally and directly handled by
> > >> > > > > > > > >> > > > > > > >>>>>>> the engine and 3) is already being implemented that way
> > >> > > > > > > > >> > > > > > > >>>>>>> (apexcore-163).
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> The control tuples emitted by an operator would be sent to all
> > >> > > > > > > > >> > > > > > > >>>>>>> downstream partitions isn't it and that would be the chief
> > >> > > > > > > > >> > > > > > > >>>>>>> distinction compared to data (apart from the payload) which would
> > >> > > > > > > > >> > > > > > > >>>>>>> get partitioned under normal circumstances. It would also be
> > >> > > > > > > > >> > > > > > > >>>>>>> guaranteed that downstream partitions will receive control tuples
> > >> > > > > > > > >> > > > > > > >>>>>>> only after the data that was sent before it so we could send it
> > >> > > > > > > > >> > > > > > > >>>>>>> immediately when it is emitted as opposed to window boundaries.
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> However during unification it is important to know if these
> > >> > > > > > > > >> > > > > > > >>>>>>> control tuples have been received from all upstream partitions
> > >> > > > > > > > >> > > > > > > >>>>>>> before proceeding with a control operation. One could wait till
> > >> > > > > > > > >> > > > > > > >>>>>>> end of the window but that introduces a delay however small, I
> > >> > > > > > > > >> > > > > > > >>>>>>> would like to add to the proposal that the platform only hand over
> > >> > > > > > > > >> > > > > > > >>>>>>> the control tuple to the unifier when it has been received from
> > >> > > > > > > > >> > > > > > > >>>>>>> all upstream partitions much like how end window is processed but
> > >> > > > > > > > >> > > > > > > >>>>>>> not wait till the actual end of the window.
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> Regd your concern about idempotency, we typically care about
> > >> > > > > > > > >> > > > > > > >>>>>>> idempotency at a window level and doing the above will still allow
> > >> > > > > > > > >> > > > > > > >>>>>>> the operators to preserve that easily.
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> Thanks
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David Yan <da...@datatorrent.com>
> > >> > > > > > > > >> > > > > > > >>>>>>> wrote:
> > >> > > > > > > > >> > > > > > > >>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> Hi all,
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> I would like to propose a new feature to the Apex core engine --
> > >> > > > > > > > >> > > > > > > >>>>>>>> the support of custom control tuples. Currently, we have control
> > >> > > > > > > > >> > > > > > > >>>>>>>> tuples such as BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on,
> > >> > > > > > > > >> > > > > > > >>>>>>>> but we don't have the support for applications to insert their
> > >> > > > > > > > >> > > > > > > >>>>>>>> own control tuples. The way currently to get around this is to
> > >> > > > > > > > >> > > > > > > >>>>>>>> use data tuples and have a separate port for such tuples that
> > >> > > > > > > > >> > > > > > > >>>>>>>> sends tuples to all partitions of the downstream operators, which
> > >> > > > > > > > >> > > > > > > >>>>>>>> is not exactly developer friendly.
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> We have already seen a number of use cases that can use this
> > >> > > > > > > > >> > > > > > > >>>>>>>> feature:
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> 1) Batch support: We need to tell all operators of the physical
> > >> > > > > > > > >> > > > > > > >>>>>>>> DAG when a batch starts and ends, so the operators can do
> > >> > > > > > > > >> > > > > > > >>>>>>>> whatever that is needed upon the start or the end of a batch.
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> 2) Watermark: To support the concepts of event time windowing,
> > >> > > > > > > > >> > > > > > > >>>>>>>> the watermark control tuple is needed to tell which windows
> > >> > > > > > > > >> > > > > > > >>>>>>>> should be considered late.
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> 3) Changing operator properties: We do have the support of
> > >> > > > > > > > >> > > > > > > >>>>>>>> changing operator properties on the fly, but with a custom
> > >> > > > > > > > >> > > > > > > >>>>>>>> control tuple, the command to change operator properties can be
> > >> > > > > > > > >> > > > > > > >>>>>>>> window aligned for all partitions and also across the DAG.
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> 4) Recording tuples: Like changing operator properties, we do
> > >> > > > > > > > >> > > > > > > >>>>>>>> have this support now but only at the individual physical
> > >> > > > > > > > >> > > > > > > >>>>>>>> operator level, and without control of which window to record
> > >> > > > > > > > >> > > > > > > >>>>>>>> tuples for. With a custom control tuple, because a control tuple
> > >> > > > > > > > >> > > > > > > >>>>>>>> must belong to a window, all operators in the DAG can start (and
> > >> > > > > > > > >> > > > > > > >>>>>>>> stop) recording for the same windows.
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> I can think of two options to achieve this:
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> 1) new custom control tuple type that takes user's serializable
> > >> > > > > > > > >> > > > > > > >>>>>>>> object.
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> 2) piggy back the current BEGIN_WINDOW and END_WINDOW control
> > >> > > > > > > > >> > > > > > > >>>>>>>> tuples.
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> Please provide your feedback. Thank you.
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>> David
> > >> > > > > > > > >> > > > > > > >>>>>>>>
> > >> > > > > > > > >> > > > > > > >>>>>>>>
