I am interested in working on the following subtask https://issues.apache.org/jira/browse/APEXCORE-581
Thanks

On Wed, Nov 30, 2016 at 2:07 PM David Yan <da...@datatorrent.com> wrote:

> I have created an umbrella ticket for control tuple support:
>
> https://issues.apache.org/jira/browse/APEXCORE-579
>
> Currently it has two subtasks. Please have a look at them and see whether I'm missing anything or if you have anything to add. You are welcome to add more subtasks or comment on the existing subtasks.
>
> We would like to kick start the implementation soon.
>
> Thanks!
>
> David
>
> On Mon, Nov 28, 2016 at 5:22 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>
> +1 for the plan.
>
> I would be interested in contributing to this feature.
>
> ~ Bhupesh
>
> On Nov 29, 2016 03:26, "Sandesh Hegde" <sand...@datatorrent.com> wrote:
>
> I am interested in contributing to this feature.
>
> On Mon, Nov 28, 2016 at 1:54 PM David Yan <da...@datatorrent.com> wrote:
>
> I think we should probably go ahead with option 1 since this works with most use cases and prevents developers from shooting themselves in the foot in terms of idempotency.
>
> We can have a configuration property that enables option 2 later if we have concrete use cases that call for it.
>
> Please share your thoughts if you think you don't agree with this plan. Also, please indicate if you're interested in contributing to this feature.
>
> David
>
> On Sun, Nov 27, 2016 at 9:02 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>
> It appears that option 1 is more favored due to unavailability of a use case which could use option 2.
>
> However, option 2 is problematic in specific cases, like presence of multiple input ports for example.
> In case of a linear DAG where control tuples are flowing in order with the data tuples, it should not be difficult to guarantee idempotency. For example, in cases where there could be multiple changes in the behavior of an operator during a single window, it should not wait until end window for these changes to take effect. Since we don't have a concrete use case right now, perhaps we do not want to go down that road. This feature should be available through a platform attribute (maybe at a later point in time) where the default is option 1.
>
> I think option 1 is suitable as a starting point in the implementation of this feature and we should proceed with it.
>
> ~ Bhupesh
>
> On Fri, Nov 11, 2016 at 12:59 AM, David Yan <da...@datatorrent.com> wrote:
>
> Good question Tushar. The callback should be called only once.
> The way to implement this is to keep a list of control tuple hashes for the given streaming window and only do the callback when the operator has not seen it before.
>
> Other thoughts?
>
> David
>
> On Thu, Nov 10, 2016 at 9:32 AM, Tushar Gosavi <tus...@datatorrent.com> wrote:
>
> Hi David,
>
> What would be the behaviour in the case where we have a DAG with the following operators? The number in brackets is the number of partitions, and X is NxM partitioning.
>
> A(1) X B(4) X C(2)
>
> If A sends a control tuple, it will be sent to all 4 partitions of B, and from each partition of B it goes to C, i.e. each partition of C will receive the same control tuple originating from A multiple times (the number of upstream partitions of C). In this case, will the callback function get called multiple times or just once?
>
> -Tushar.
>
> On Fri, Nov 4, 2016 at 12:14 AM, David Yan <da...@datatorrent.com> wrote:
>
> Hi Bhupesh,
>
> Since each input port has its own incoming control tuple, I would imagine there would be an additional DefaultInputPort.processControl method that operator developers can override.
> If we go for option 1, my thinking is that the control tuples would always be delivered at the next window boundary, even if the emit method is called within a window.
>
> David
>
> On Thu, Nov 3, 2016 at 1:46 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>
> I have a question regarding the callback for a control tuple. Will it be similar to the InputPort::process() method? Something like InputPort::processControlTuple(t)? Or will it be a method of the operator similar to beginWindow()?
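
For the A(1) X B(4) X C(2) case quoted above, the "call the callback only once" idea could look roughly like the sketch below. This is only an illustration, not engine code: `ControlTupleDeduper` and its methods are hypothetical names, and it assumes control tuples implement `equals`/`hashCode`, so a set of tuples stands in for the "list of control tuple hashes" mentioned in the thread.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical helper, not part of the Apex API: remembers which control
 * tuples were already delivered in the current streaming window so that the
 * operator callback fires once per distinct tuple.
 */
final class ControlTupleDeduper {
    // Control tuples seen in the current window (relies on equals/hashCode).
    private final Set<Object> seenInWindow = new HashSet<>();

    /** Returns true only the first time a given tuple is offered in this window. */
    boolean firstDelivery(Object controlTuple) {
        return seenInWindow.add(controlTuple);
    }

    /** Call at the window boundary so the next window starts with no history. */
    void endWindow() {
        seenInWindow.clear();
    }
}
```

A partition of C would call `firstDelivery` for each copy arriving from the B partitions and invoke the user callback only when it returns true.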
>
> When we say that the control tuple will be delivered at window boundary, does that mean all control tuples emitted in that window will be processed together at the end of the window? This would imply that there is no ordering among regular tuples and control tuples.
>
> I think we should get started with the option 1 - control tuples at window boundary, which seems to handle most of the use cases. For some cases which require option 2, we can always build on this.
>
> ~ Bhupesh
>
> On Thu, Nov 3, 2016 at 1:35 PM, Thomas Weise <t...@apache.org> wrote:
>
> I don't see how that would work. Suppose you have a file splitter and multiple partitions of block readers. The "end of file" event cannot be processed downstream until all block readers are done. I also think that this is related to the batch demarcation discussion and there should be a single generalized mechanism to support this.
>
> On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <pra...@datatorrent.com> wrote:
>
> Suppose I am processing data in a file and I want to do something at the end of a file at the output operator, I would send an end file control tuple and act on it when I receive it at the output.
> In a single window I may end up processing multiple files, and if I don't have multiple ports and logical paths through the DAG (multiple partitions are ok), I can process the end of each file immediately and also know what file was closed without sending extra identification information in the end file tuple, which I would need if I am collecting all of them and processing at the end of the window.
>
> On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <t...@apache.org> wrote:
>
> The use cases listed in the original discussion don't call for option 2. It seems to come with additional complexity and implementation cost.
>
> Can those in favor of option 2 please also provide the use case for it.
>
> Thanks,
> Thomas
>
> On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <siy...@datatorrent.com> wrote:
>
> I will vote for approach 1.
>
> First of all that one sounds easier to do to me. And I think idempotency is important.
> It may run at the cost of higher latency but I think it is ok.
>
> And in addition, if users do need realtime control tuple processing in the future, we can always add the option on top of it.
>
> So I vote for 1.
>
> Thanks,
> Siyuan
>
> On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi <p...@apache.org> wrote:
>
> As a rule of thumb in any real time operating system, control tuples should always be handled using Priority Queues.
>
> We may try to control priorities by defining levels. And they shall not be delivered at window boundaries.
>
> In short, control tuples shall never be treated as any other tuples in real time systems.
>
> On Thursday, November 3, 2016, David Yan <da...@datatorrent.com> wrote:
>
> Hi all,
>
> I would like to renew the discussion of control tuples.
>
> Last time, we were in a debate about whether:
>
> 1) the platform should enforce that control tuples are delivered at window boundaries only
>
> or:
>
> 2) the platform should deliver control tuples just as other tuples and it's the operator developers' choice whether to handle the control tuples as they arrive or delay the processing till the next window boundary.
>
> To summarize the pros and cons:
>
> Approach 1: If processing control tuples results in changes of the behavior of the operator and idempotency needs to be preserved, the processing must be done at window boundaries. This approach will save the operator developers the headache of ensuring that. However, this will take away the choice from operator developers if they just need to process the control tuples as soon as possible.
>
> Approach 2: The operator has a chance to immediately process control tuples. This would be useful if latency is more valued than correctness. However, this would open the possibility for operator developers to shoot themselves in the foot. This is especially true if there are multiple input ports, as there is no easy way to guarantee processing order for multiple input ports.
>
> We would like to arrive at a consensus and close this discussion soon this time so we can start the work on this important feature.
>
> Thanks!
>
> David
>
> On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
>
> It is not clear how an operator will emit a custom control tuple at window boundaries.
> One way is to cache/accumulate control tuples in the operator output port till the window closes (END_WINDOW is inserted into the output sink) or only allow an operator to emit control tuples inside endWindow(). The latter is a slight variation of the operator output port caching behavior, with the only difference that now the operator itself is responsible for caching/accumulating control tuples. Note that in many cases it will be necessary to postpone emitting payload tuples that logically come after the custom control tuple till the next window begins.
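
The output-port caching variant described above could be sketched as follows. This is a minimal illustration under stated assumptions: `WindowAlignedControlBuffer` is a hypothetical name, the `Consumer` stands in for whatever sink the engine would actually use, and nothing here reflects the real Apex port classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch, not Apex code: control tuples emitted during a window
 * are held back and forwarded only when the window closes.
 */
final class WindowAlignedControlBuffer<C> {
    private final List<C> pending = new ArrayList<>();
    private final Consumer<C> downstream; // stand-in for the real output sink

    WindowAlignedControlBuffer(Consumer<C> downstream) {
        this.downstream = downstream;
    }

    /** Called by the operator at any point inside the window; nothing is sent yet. */
    void emitControl(C controlTuple) {
        pending.add(controlTuple);
    }

    /** Called at the window boundary: flush in emission order, then reset. */
    void endWindow() {
        List<C> toSend = new ArrayList<>(pending);
        pending.clear();
        toSend.forEach(downstream);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The buffer preserves the order among control tuples but, as noted above, it says nothing about payload tuples that logically follow a control tuple within the same window.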
>
> IMO, that is too restrictive, and in a case where the input operator uses a push instead of a poll (for example, it provides an end point where remote agents may connect and publish/push data), control tuples may be used for connect/disconnect/watermark broadcast to (partitioned) downstream operators. In this case the platform just needs to guarantee an order barrier (any tuple emitted prior to a control tuple needs to be delivered prior to the control tuple).
>
> Thank you,
>
> Vlad
>
> On 6/27/16 19:36, Amol Kekre wrote:
>
> I agree with David. Allowing control tuples within a window (along with data tuples) creates a very dangerous situation where guarantees are impacted.
> It is much safer to enable control tuples (send/receive) at window boundaries (after END_WINDOW of window N, and before BEGIN_WINDOW for window N+1). My take on David's list is:
>
> 1. -> window boundaries -> Strong +1; there will be a big issue with guarantees for operators with multiple ports (see Thomas's response).
> 2. -> All downstream windows -> +1, but there are situations; a caveat could be "only to operators that implement control tuple interface/listeners", which could effectively translate to "all interested downstream operators".
> 3. Only Input operator can create control tuples -> -1; is restrictive even though most likely 95% of the time it will be input operators.
>
> Thanks,
> Amol
>
> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <tho...@datatorrent.com> wrote:
>
> The windowing we discuss here is in general event time based, arrival time is a special case of it.
>
> I don't think state changes can be made independent of the streaming window boundary as it would prevent idempotent processing and transitively exactly once. For that to work, tuples need to be presented to the operator in a guaranteed order *within* the streaming window, which is not possible with multiple ports (and partitions).
>
> Thomas
>
> On Mon, Jun 27, 2016 at 2:53 PM, David Yan <da...@datatorrent.com> wrote:
>
> I think for session tracking, if the session boundaries are allowed to be not aligned with the streaming window boundaries, the user will have a much bigger problem with idempotency. And in most cases, session tracking is event time based, not ingression time or processing time based, so this may never be a problem. But if that ever happens, the user can always alter the default 500ms width.
>
> David
>
> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
>
> Ability to send custom control tuples within window may be useful, for example, for sessions tracking, where session boundaries are not aligned with window boundaries and 500 ms latency is not acceptable for an application.
>
> Thank you,
>
> Vlad
>
> On 6/25/16 10:52, Thomas Weise wrote:
>
> It should not matter from where the control tuple is triggered.
> It will be good to have a generic mechanism to propagate it and other things can be accomplished outside the engine. For example, the new comprehensive support for windowing will all be in Malhar, nothing that the engine needs to know about it except that we need the control tuple for watermark propagation and idempotent processing.
>
> I also think the main difference to other tuples is the need to send it to all partitions.
> Which is similar to checkpoint window tuples, but not the same. Here, we probably also need the ability for the user to control whether such tuple should traverse the entire DAG or not. For a batch use case, for example, we may want to send the end of file to the next operator, but not beyond, if the operator has asynchronous processing logic in it.
>
> For any logic to be idempotent, the control tuple needs to be processed at a window boundary. Receiving the control tuple in the window callback would avoid having to track extra state in the operator.
> I don't think that's a major issue, but what is the use case for processing a control tuple within the window?
>
> Thomas
>
> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni <pra...@datatorrent.com> wrote:
>
> For the use cases you mentioned, I think 1) and 2) are more likely to be controlled directly by the application, 3) and 4) are more likely going to be triggered externally and directly handled by the engine and 3) is already being implemented that way (apexcore-163).
>
> The control tuples emitted by an operator would be sent to all downstream partitions isn't it and that would be the chief distinction compared to data (apart from the payload) which would get partitioned under normal circumstances. It would also be guaranteed that downstream partitions will receive control tuples only after the data that was sent before it so we could send it immediately when it is emitted as opposed to window boundaries.
>
> However during unification it is important to know if these control tuples have been received from all upstream partitions before proceeding with a control operation.
> One could wait till the end of the window, but that introduces a delay, however small. I would like to add to the proposal that the platform only hand over the control tuple to the unifier when it has been received from all upstream partitions, much like how end window is processed, but not wait till the actual end of the window.
>
> Regarding your concern about idempotency, we typically care about idempotency at a window level and doing the above will still allow the operators to preserve that easily.
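
The "hand over to the unifier only when received from all upstream partitions" proposal quoted above could be sketched as a small barrier. This is an illustration only: `ControlTupleBarrier`, the integer partition ids, and the `arrive` method are hypothetical, and control tuples are assumed to implement `equals`/`hashCode`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch, not Apex code: releases a control tuple only after
 * every upstream partition has delivered its copy.
 */
final class ControlTupleBarrier {
    private final int upstreamPartitions;
    // For each in-flight control tuple, the partitions that have delivered it.
    private final Map<Object, Set<Integer>> arrivals = new HashMap<>();

    ControlTupleBarrier(int upstreamPartitions) {
        this.upstreamPartitions = upstreamPartitions;
    }

    /**
     * Records one arrival. Returns true exactly when this arrival completes
     * the set, i.e. the tuple may now be handed to the unifier.
     */
    boolean arrive(Object controlTuple, int partitionId) {
        Set<Integer> seen = arrivals.computeIfAbsent(controlTuple, k -> new HashSet<>());
        seen.add(partitionId); // a repeat from the same partition changes nothing
        if (seen.size() == upstreamPartitions) {
            arrivals.remove(controlTuple);
            return true;
        }
        return false;
    }
}
```

With three upstream partitions, the first two arrivals of a given tuple return false and the third returns true, without waiting for the end of the window.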
>
> Thanks
>
> On Jun 24, 2016, at 11:22 AM, David Yan <da...@datatorrent.com> wrote:
>
> > Hi all,
> >
> > I would like to propose a new feature to the Apex core engine -- the
> > support of custom control tuples. Currently, we have control tuples
> > such as BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but we don't
> > have the support for applications to insert their own control tuples.
> > The way currently to get around this is to use data tuples and have a
> > separate port for such tuples that sends tuples to all partitions of
> > the downstream operators, which is not exactly developer friendly.
> >
> > We have already seen a number of use cases that can use this feature:
> >
> > 1) Batch support: We need to tell all operators of the physical DAG
> > when a batch starts and ends, so the operators can do whatever that is
> > needed upon the start or the end of a batch.
> >
> > 2) Watermark: To support the concepts of event time windowing, the
> > watermark control tuple is needed to tell which windows should be
> > considered late.
> >
> > 3) Changing operator properties: We do have the support of changing
> > operator properties on the fly, but with a custom control tuple, the
> > command to change operator properties can be window aligned for all
> > partitions and also across the DAG.
> >
> > 4) Recording tuples: Like changing operator properties, we do have
> > this support now but only at the individual physical operator level,
> > and without control of which window to record tuples for.
> > With a custom control tuple, because a control tuple must belong to a
> > window, all operators in the DAG can start (and stop) recording for
> > the same windows.
> >
> > I can think of two options to achieve this:
> >
> > 1) new custom control tuple type that takes user's serializable
> > object.
> > 2) piggy back the current BEGIN_WINDOW and END_WINDOW control tuples.
> >
> > Please provide your feedback. Thank you.
> >
> > David
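
For readers following the thread, option 1 (a new control tuple type that carries the user's serializable object) might look roughly like the sketch below. Everything in it is an assumption made for illustration: the class name `CustomControlTuple`, the `windowId` field, and the use of plain Java serialization are not taken from the Apex code base, which may well use a different codec.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical envelope for a user-defined control tuple (option 1).
// Not an Apex class; names and wire format are illustrative only.
final class CustomControlTuple implements Serializable {
  private static final long serialVersionUID = 1L;

  final long windowId;        // the window this control tuple belongs to
  final Serializable payload; // the user's own serializable object

  CustomControlTuple(long windowId, Serializable payload) {
    this.windowId = windowId;
    this.payload = payload;
  }

  // Encode with standard Java serialization (an assumption for this sketch).
  byte[] toBytes() {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(this);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Decode on the receiving side; checked exceptions are wrapped so callers
  // do not need their own try/catch.
  static CustomControlTuple fromBytes(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (CustomControlTuple) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

An operator could then emit something like `new CustomControlTuple(windowId, "END_BATCH")`, and each downstream partition would receive an equivalent copy after `fromBytes(tuple.toBytes())`.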