Hi David,

What would be the behaviour in the case where we have a DAG with the following operators? The number in brackets is the number of partitions, and X is NxM partitioning.

A(1) X B(4) X C(2)
If A sends a control tuple, it will be sent to all 4 partitions of B, and from each partition of B it goes on to C, i.e. each partition of C will receive the same control tuple originating from A multiple times (once per upstream partition of C). In this case, will the callback function get called multiple times or just once?

-Tushar.

On Fri, Nov 4, 2016 at 12:14 AM, David Yan <da...@datatorrent.com> wrote:

> Hi Bhupesh,
>
> Since each input port has its own incoming control tuple, I would imagine
> there would be an additional DefaultInputPort.processControl method that
> operator developers can override.
> If we go for option 1, my thinking is that the control tuples would always
> be delivered at the next window boundary, even if the emit method is called
> within a window.
>
> David
>
> On Thu, Nov 3, 2016 at 1:46 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> wrote:
>
>> I have a question regarding the callback for a control tuple. Will it be
>> similar to InputPort::process() method? Something like
>> InputPort::processControlTuple(t)? Or will it be a method of the operator
>> similar to beginWindow()?
>>
>> When we say that the control tuple will be delivered at window boundary,
>> does that mean all control tuples emitted in that window will be processed
>> together at the end of the window? This would imply that there is no
>> ordering among regular tuples and control tuples.
>>
>> I think we should get started with the option 1 - control tuples at window
>> boundary, which seems to handle most of the use cases. For some cases which
>> require option 2, we can always build on this.
>>
>> ~ Bhupesh
>>
>> On Thu, Nov 3, 2016 at 1:35 PM, Thomas Weise <t...@apache.org> wrote:
>>
>> > I don't see how that would work. Suppose you have a file splitter and
>> > multiple partitions of block readers. The "end of file" event cannot be
>> > processed downstream until all block readers are done.
>> > I also think that this is related to the batch demarcation discussion
>> > and there should be a single generalized mechanism to support this.
>> >
>> > On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <pra...@datatorrent.com>
>> > wrote:
>> >
>> > > Suppose I am processing data in a file and I want to do something at the
>> > > end of a file at the output operator, I would send an end file control
>> > > tuple and act on it when I receive it at the output. In a single window I
>> > > may end up processing multiple files, and if I don't have multiple ports
>> > > and logical paths through the DAG (multiple partitions are ok), I can
>> > > process the end of each file immediately and also know what file was
>> > > closed without sending extra identification information in the end file,
>> > > which I would need if I am collecting all of them and processing at the
>> > > end of the window.
>> > >
>> > > On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <t...@apache.org> wrote:
>> > >
>> > > > The use cases listed in the original discussion don't call for option 2.
>> > > > It seems to come with additional complexity and implementation cost.
>> > > >
>> > > > Can those in favor of option 2 please also provide the use case for it.
>> > > >
>> > > > Thanks,
>> > > > Thomas
>> > > >
>> > > > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <siy...@datatorrent.com>
>> > > > wrote:
>> > > >
>> > > > > I will vote for approach 1.
>> > > > >
>> > > > > First of all that one sounds easier to do to me. And I think idempotency
>> > > > > is important. It may run at the cost of higher latency but I think it
>> > > > > is ok
>> > > > >
>> > > > > And in addition, when in the future if users do need realtime control
>> > > > > tuple processing, we can always add the option on top of it.
>> > > > >
>> > > > > So I vote for 1
>> > > > >
>> > > > > Thanks,
>> > > > > Siyuan
>> > > > >
>> > > > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi <p...@apache.org>
>> > > > > wrote:
>> > > > >
>> > > > > > As a rule of thumb in any real time operating system, control tuples
>> > > > > > should always be handled using Priority Queues.
>> > > > > >
>> > > > > > We may try to control priorities by defining levels, and control
>> > > > > > tuples shall not be delivered at window boundaries.
>> > > > > >
>> > > > > > In short, control tuples shall never be treated as any other tuples
>> > > > > > in real time systems.
>> > > > > >
>> > > > > > On Thursday, November 3, 2016, David Yan <da...@datatorrent.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Hi all,
>> > > > > > >
>> > > > > > > I would like to renew the discussion of control tuples.
>> > > > > > >
>> > > > > > > Last time, we were in a debate about whether:
>> > > > > > >
>> > > > > > > 1) the platform should enforce that control tuples are delivered at
>> > > > > > > window boundaries only
>> > > > > > >
>> > > > > > > or:
>> > > > > > >
>> > > > > > > 2) the platform should deliver control tuples just as other tuples
>> > > > > > > and it's the operator developers' choice whether to handle the
>> > > > > > > control tuples as they arrive or delay the processing till the next
>> > > > > > > window boundary.
>> > > > > > >
>> > > > > > > To summarize the pros and cons:
>> > > > > > >
>> > > > > > > Approach 1: If processing control tuples results in changes of the
>> > > > > > > behavior of the operator and idempotency needs to be preserved, the
>> > > > > > > processing must be done at window boundaries. This approach will
>> > > > > > > save operator developers the headache of ensuring that.
>> > > > > > > However, this will take away the choice from operator developers if
>> > > > > > > they just need to process the control tuples as soon as possible.
>> > > > > > >
>> > > > > > > Approach 2: The operator has a chance to immediately process control
>> > > > > > > tuples. This would be useful if latency is more valued than
>> > > > > > > correctness. However, this would open the possibility for operator
>> > > > > > > developers to shoot themselves in the foot. This is especially true
>> > > > > > > if there are multiple input ports, as there is no easy way to
>> > > > > > > guarantee processing order for multiple input ports.
>> > > > > > >
>> > > > > > > We would like to arrive at a consensus and close this discussion
>> > > > > > > soon this time so we can start the work on this important feature.
>> > > > > > >
>> > > > > > > Thanks!
>> > > > > > >
>> > > > > > > David
>> > > > > > >
>> > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov <v.ro...@datatorrent.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > It is not clear how an operator will emit a custom control tuple
>> > > > > > > > at window boundaries. One way is to cache/accumulate control
>> > > > > > > > tuples in the operator output port till the window closes
>> > > > > > > > (END_WINDOW is inserted into the output sink) or only allow an
>> > > > > > > > operator to emit control tuples inside the endWindow(). The latter
>> > > > > > > > is a slight variation of the operator output port caching behavior
>> > > > > > > > with the only difference that now the operator itself is
>> > > > > > > > responsible for caching/accumulating control tuples.
>> > > > > > > > Note that in many cases it will be necessary to postpone emitting
>> > > > > > > > payload tuples that logically come after the custom control tuple
>> > > > > > > > till the next window begins.
>> > > > > > > >
>> > > > > > > > IMO, that is too restrictive, and in a case where an input
>> > > > > > > > operator uses a push instead of a poll (for example, it provides
>> > > > > > > > an end point where remote agents may connect and publish/push
>> > > > > > > > data), control tuples may be used for connect/disconnect/watermark
>> > > > > > > > broadcast to (partitioned) downstream operators. In this case the
>> > > > > > > > platform just needs to guarantee an order barrier (any tuple
>> > > > > > > > emitted prior to a control tuple needs to be delivered prior to
>> > > > > > > > the control tuple).
>> > > > > > > >
>> > > > > > > > Thank you,
>> > > > > > > >
>> > > > > > > > Vlad
>> > > > > > > >
>> > > > > > > > On 6/27/16 19:36, Amol Kekre wrote:
>> > > > > > > >
>> > > > > > > >> I agree with David. Allowing control tuples within a window
>> > > > > > > >> (along with data tuples) creates a very dangerous situation
>> > > > > > > >> where guarantees are impacted. It is much safer to enable
>> > > > > > > >> control tuples (send/receive) at window boundaries (after
>> > > > > > > >> END_WINDOW of window N, and before BEGIN_WINDOW for window N+1).
>> > > > > > > >> My take on David's list is
>> > > > > > > >>
>> > > > > > > >> 1. -> window boundaries -> Strong +1; there will be a big issue
>> > > > > > > >> with guarantees for operators with multiple ports. (see Thomas's
>> > > > > > > >> response)
>> > > > > > > >> 2. -> All downstream windows -> +1, but there are situations; a
>> > > > > > > >> caveat could be "only to operators that implement control tuple
>> > > > > > > >> interface/listeners", which could effectively translate to "all
>> > > > > > > >> interested downstream operators"
>> > > > > > > >> 3. Only Input operator can create control tuples -> -1; is
>> > > > > > > >> restrictive even though most likely 95% of the time it will be
>> > > > > > > >> input operators
>> > > > > > > >>
>> > > > > > > >> Thks,
>> > > > > > > >> Amol
>> > > > > > > >>
>> > > > > > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <tho...@datatorrent.com>
>> > > > > > > >> wrote:
>> > > > > > > >>
>> > > > > > > >>> The windowing we discuss here is in general event time based,
>> > > > > > > >>> arrival time is a special case of it.
>> > > > > > > >>>
>> > > > > > > >>> I don't think state changes can be made independent of the
>> > > > > > > >>> streaming window boundary as it would prevent idempotent
>> > > > > > > >>> processing and transitively exactly once. For that to work,
>> > > > > > > >>> tuples need to be presented to the operator in a guaranteed
>> > > > > > > >>> order *within* the streaming window, which is not possible with
>> > > > > > > >>> multiple ports (and partitions).
>> > > > > > > > >>>
>> > > > > > > > >>> Thomas
>> > > > > > > > >>>
>> > > > > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan <da...@datatorrent.com>
>> > > > > > > > >>> wrote:
>> > > > > > > > >>>
>> > > > > > > > >>>> I think for session tracking, if the session boundaries are
>> > > > > > > > >>>> allowed to be not aligned with the streaming window boundaries,
>> > > > > > > > >>>> the user will have a much bigger problem with idempotency. And
>> > > > > > > > >>>> in most cases, session tracking is event time based, not
>> > > > > > > > >>>> ingression time or processing time based, so this may never be
>> > > > > > > > >>>> a problem. But if that ever happens, the user can always alter
>> > > > > > > > >>>> the default 500ms width.
>> > > > > > > > >>>>
>> > > > > > > > >>>> David
>> > > > > > > > >>>>
>> > > > > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <v.ro...@datatorrent.com>
>> > > > > > > > >>>> wrote:
>> > > > > > > > >>>>
>> > > > > > > > >>>>> Ability to send custom control tuples within window may be
>> > > > > > > > >>>>> useful, for example, for sessions tracking, where session
>> > > > > > > > >>>>> boundaries are not aligned with window boundaries and 500 ms
>> > > > > > > > >>>>> latency is not acceptable for an application.
>> > > > > > > > >>>>>
>> > > > > > > > >>>>> Thank you,
>> > > > > > > > >>>>>
>> > > > > > > > >>>>> Vlad
>> > > > > > > > >>>>>
>> > > > > > > > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
>> > > > > > > > >>>>>
>> > > > > > > > >>>>>> It should not matter from where the control tuple is
>> > > > > > > > >>>>>> triggered. It will be good to have a generic mechanism to
>> > > > > > > > >>>>>> propagate it and other things can be accomplished outside the
>> > > > > > > > >>>>>> engine. For example, the new comprehensive support for
>> > > > > > > > >>>>>> windowing will all be in Malhar, nothing that the engine needs
>> > > > > > > > >>>>>> to know about it except that we need the control tuple for
>> > > > > > > > >>>>>> watermark propagation and idempotent processing.
>> > > > > > > > >>>>>>
>> > > > > > > > >>>>>> I also think the main difference to other tuples is the need
>> > > > > > > > >>>>>> to send it to all partitions. Which is similar to checkpoint
>> > > > > > > > >>>>>> window tuples, but not the same. Here, we probably also need
>> > > > > > > > >>>>>> the ability for the user to control whether such tuple should
>> > > > > > > > >>>>>> traverse the entire DAG or not.
>> > > > > > > > >>>>>> For a batch use case, for example, we may want to send the end
>> > > > > > > > >>>>>> of file to the next operator, but not beyond, if the operator
>> > > > > > > > >>>>>> has asynchronous processing logic in it.
>> > > > > > > > >>>>>>
>> > > > > > > > >>>>>> For any logic to be idempotent, the control tuple needs to be
>> > > > > > > > >>>>>> processed at a window boundary. Receiving the control tuple in
>> > > > > > > > >>>>>> the window callback would avoid having to track extra state in
>> > > > > > > > >>>>>> the operator. I don't think that's a major issue, but what is
>> > > > > > > > >>>>>> the use case for processing a control tuple within the window?
>> > > > > > > > >>>>>>
>> > > > > > > > >>>>>> Thomas
>> > > > > > > > >>>>>>
>> > > > > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni
>> > > > > > > > >>>>>> <pra...@datatorrent.com> wrote:
>> > > > > > > > >>>>>>
>> > > > > > > > >>>>>>> For the use cases you mentioned, I think 1) and 2) are more
>> > > > > > > > >>>>>>> likely to be controlled directly by the application, 3) and
>> > > > > > > > >>>>>>> 4) are more likely going to be triggered externally and
>> > > > > > > > >>>>>>> directly handled by the engine and 3) is already being
>> > > > > > > > >>>>>>> implemented that way (apexcore-163).
>> > > > > > > > >>>>>>>
>> > > > > > > > >>>>>>> The control tuples emitted by an operator would be sent to
>> > > > > > > > >>>>>>> all downstream partitions isn't it and that would be the
>> > > > > > > > >>>>>>> chief distinction compared to data (apart from the payload)
>> > > > > > > > >>>>>>> which would get partitioned under normal circumstances. It
>> > > > > > > > >>>>>>> would also be guaranteed that downstream partitions will
>> > > > > > > > >>>>>>> receive control tuples only after the data that was sent
>> > > > > > > > >>>>>>> before it so we could send it immediately when it is emitted
>> > > > > > > > >>>>>>> as opposed to window boundaries.
>> > > > > > > > >>>>>>>
>> > > > > > > > >>>>>>> However during unification it is important to know if these
>> > > > > > > > >>>>>>> control tuples have been received from all upstream
>> > > > > > > > >>>>>>> partitions before proceeding with a control operation. One
>> > > > > > > > >>>>>>> could wait till end of the window but that introduces a delay
>> > > > > > > > >>>>>>> however small, I would like to add to the proposal that the
>> > > > > > > > >>>>>>> platform only hand over the control tuple to the unifier when
>> > > > > > > > >>>>>>> it has been received from all upstream partitions much like
>> > > > > > > > >>>>>>> how end window is processed but not wait till the actual end
>> > > > > > > > >>>>>>> of the window.
>> > > > > > > > >>>>>>>
>> > > > > > > > >>>>>>> Regd your concern about idempotency, we typically care about
>> > > > > > > > >>>>>>> idempotency at a window level and doing the above will still
>> > > > > > > > >>>>>>> allow the operators to preserve that easily.
>> > > > > > > > >>>>>>>
>> > > > > > > > >>>>>>> Thanks
>> > > > > > > > >>>>>>>
>> > > > > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David Yan <da...@datatorrent.com>
>> > > > > > > > >>>>>>> wrote:
>> > > > > > > > >>>>>>>
>> > > > > > > > >>>>>>>> Hi all,
>> > > > > > > > >>>>>>>>
>> > > > > > > > >>>>>>>> I would like to propose a new feature to the Apex core
>> > > > > > > > >>>>>>>> engine -- the support of custom control tuples. Currently,
>> > > > > > > > >>>>>>>> we have control tuples such as BEGIN_WINDOW, END_WINDOW,
>> > > > > > > > >>>>>>>> CHECKPOINT, and so on, but we don't have the support for
>> > > > > > > > >>>>>>>> applications to insert their own control tuples. The way
>> > > > > > > > >>>>>>>> currently to get around this is to use data tuples and have
>> > > > > > > > >>>>>>>> a separate port for such tuples that sends tuples to all
>> > > > > > > > >>>>>>>> partitions of the downstream operators, which is not exactly
>> > > > > > > > >>>>>>>> developer friendly.
>> > > > > > > > >>>>>>>>
>> > > > > > > > >>>>>>>> We have already seen a number of use cases that can use this
>> > > > > > > > >>>>>>>> feature:
>> > > > > > > > >>>>>>>>
>> > > > > > > > >>>>>>>> 1) Batch support: We need to tell all operators of the
>> > > > > > > > >>>>>>>> physical DAG when a batch starts and ends, so the operators
>> > > > > > > > >>>>>>>> can do whatever that is needed upon the start or the end of
>> > > > > > > > >>>>>>>> a batch.
>> > > > > > > > >>>>>>>>
>> > > > > > > > >>>>>>>> 2) Watermark: To support the concepts of event time
>> > > > > > > > >>>>>>>> windowing, the watermark control tuple is needed to tell
>> > > > > > > > >>>>>>>> which windows should be considered late.
>> > > > > > > > >>>>>>>>
>> > > > > > > > >>>>>>>> 3) Changing operator properties: We do have the support of
>> > > > > > > > >>>>>>>> changing operator properties on the fly, but with a custom
>> > > > > > > > >>>>>>>> control tuple, the command to change operator properties can
>> > > > > > > > >>>>>>>> be window aligned for all partitions and also across the DAG.
>> > > > > > > > >>>>>>>>
>> > > > > > > > >>>>>>>> 4) Recording tuples: Like changing operator properties, we
>> > > > > > > > >>>>>>>> do have this support now but only at the individual physical
>> > > > > > > > >>>>>>>> operator level, and without control of which window to
>> > > > > > > > >>>>>>>> record tuples for.
>> > > > > > > > >>>>>>>> With a custom control tuple, because a control tuple must
>> > > > > > > > >>>>>>>> belong to a window, all operators in the DAG can start (and
>> > > > > > > > >>>>>>>> stop) recording for the same windows.
>> > > > > > > > >>>>>>>>
>> > > > > > > > >>>>>>>> I can think of two options to achieve this:
>> > > > > > > > >>>>>>>>
>> > > > > > > > >>>>>>>> 1) new custom control tuple type that takes user's
>> > > > > > > > >>>>>>>> serializable object.
>> > > > > > > > >>>>>>>>
>> > > > > > > > >>>>>>>> 2) piggy back the current BEGIN_WINDOW and END_WINDOW
>> > > > > > > > >>>>>>>> control tuples.
>> > > > > > > > >>>>>>>>
>> > > > > > > > >>>>>>>> Please provide your feedback. Thank you.
>> > > > > > > > >>>>>>>>
>> > > > > > > > >>>>>>>> David
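
As a rough illustration of the fan-out question at the top of this thread (A(1) X B(4) X C(2)), and of the suggestion to hand a control tuple over only once it has arrived from all upstream partitions, here is a minimal Java sketch. It is not Apex code: ControlTupleSketch, Partition, receive, endWindow and run are hypothetical names made up for this example, and firing the callback exactly once per distinct tuple at the window boundary is an assumption, not something the thread has decided.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch only; none of these names are part of the Apex API. */
public class ControlTupleSketch {

    /** One downstream partition that buffers control tuples until the window closes. */
    static final class Partition {
        private final int upstreamCount;
        // Copies seen so far, keyed by control tuple payload.
        private final Map<String, Integer> copies = new HashMap<>();
        // Stands in for invocations of the user's control tuple callback.
        final List<String> delivered = new ArrayList<>();

        Partition(int upstreamCount) {
            this.upstreamCount = upstreamCount;
        }

        /** Called once per copy arriving from any upstream partition. */
        void receive(String controlTuple) {
            copies.merge(controlTuple, 1, Integer::sum);
        }

        int copiesSeen(String controlTuple) {
            return copies.getOrDefault(controlTuple, 0);
        }

        /** At the window boundary, deliver each fully received tuple exactly once. */
        void endWindow() {
            copies.entrySet().removeIf(e -> {
                if (e.getValue() < upstreamCount) {
                    return false;            // still waiting for some upstream partition
                }
                delivered.add(e.getKey());   // assumed behaviour: one callback per tuple
                return true;
            });
        }
    }

    /** Broadcasts one control tuple from A through every B partition to every C partition. */
    static List<Partition> run(int bPartitions, int cPartitions, String controlTuple) {
        List<Partition> c = new ArrayList<>();
        for (int i = 0; i < cPartitions; i++) {
            c.add(new Partition(bPartitions));
        }
        for (int b = 0; b < bPartitions; b++) {   // A -> each partition of B
            for (Partition p : c) {               // each partition of B -> each partition of C
                p.receive(controlTuple);
            }
        }
        return c;
    }

    public static void main(String[] args) {
        for (Partition p : run(4, 2, "END_OF_FILE")) {
            System.out.println("copies before boundary: " + p.copiesSeen("END_OF_FILE"));
            p.endWindow();
            System.out.println("callbacks fired: " + p.delivered.size());
        }
    }
}
```

With 4 partitions of B and 2 of C, each partition of C counts four copies of the tuple before the boundary and records a single delivery after endWindow(). Whether the platform should collapse the copies like this or call the callback once per copy is the open question raised above.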