This is needed: batch start/end has semantics similar to window start/end from an operational and functional perspective.
Thks
Amol

*Join us at Apex Big Data World-San Jose <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*

On Tue, Feb 14, 2017 at 9:55 PM, Bhupesh Chawda <[email protected]> wrote:

> +1 for having an immediate delivery mechanism as well.
>
> I would suggest that the other delivery mechanism stay at the end of the window, to be consistent, as I think it may be difficult to determine the last arrival of the tuple.
>
> ~ Bhupesh

On Wed, Feb 15, 2017 at 7:04 AM, Pramod Immaneni <[email protected]> wrote:

> There have been some recent developments and discussions on the schema side (link below) that warrant a reconsideration of how control tuples get delivered.
>
> http://apache.markmail.org/search/?q=apex+list%3Aorg.apache.apex.dev+schema+discovery+support#query:apex%20list%3Aorg.apache.apex.dev%20schema%20discovery%20support+page:1+mid:oaji26y3xfozap5v+state:results
>
> What I would like to suggest is that we allow two delivery options for control tuples, which can be configured on a per-control-tuple basis. The first option is to deliver the control tuple to the operator when the first instance of the tuple arrives from any path. The second option is to deliver the control tuple when the last instance of the tuple arrives from all the paths, or at the end of the window if it is going to be difficult to determine the last arrival. The developer can choose the delivery option for the control tuple, preferably when the tuple is created. The first option will be useful for scenarios like schema propagation or begin file in batch use cases. The second option will be useful for tuples like end file or end batch in batch use cases.
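A minimal sketch of the two per-tuple delivery options Pramod describes, assuming every copy of a control tuple carries an id and the receiving side knows how many upstream paths feed it. `DeliveryOption` and `ControlTupleDeliveryTracker` are hypothetical names, not Apex API, and `LAST_ARRIVAL` here counts paths rather than falling back to the end of the window.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical delivery modes for a custom control tuple (not Apex API).
enum DeliveryOption {
  FIRST_ARRIVAL, // deliver as soon as the first copy arrives on any path
  LAST_ARRIVAL   // deliver once a copy has arrived on every upstream path
}

// Tracks copies of the same control tuple (identified by id) arriving over
// several upstream paths and decides when the operator should see it.
class ControlTupleDeliveryTracker {
  private final int upstreamPaths;
  private final Map<String, Set<Integer>> arrivals = new HashMap<>();

  ControlTupleDeliveryTracker(int upstreamPaths) {
    this.upstreamPaths = upstreamPaths;
  }

  /** Returns true at most once per tuple id: at the moment it should be delivered. */
  boolean onArrival(String tupleId, int pathIndex, DeliveryOption option) {
    Set<Integer> seenPaths = arrivals.computeIfAbsent(tupleId, id -> new HashSet<>());
    boolean firstCopy = seenPaths.isEmpty();
    boolean newPath = seenPaths.add(pathIndex);
    switch (option) {
      case FIRST_ARRIVAL:
        return firstCopy;
      case LAST_ARRIVAL:
        return newPath && seenPaths.size() == upstreamPaths;
      default:
        throw new IllegalArgumentException("Unknown option: " + option);
    }
  }
}
```

A real implementation would also need to evict ids after delivery, or fall back to end-of-window delivery as Bhupesh suggests when the number of paths is hard to determine.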
>
> Thanks

On Tue, Jan 10, 2017 at 12:27 PM, Bhupesh Chawda <[email protected]> wrote:

> Hi All,
>
> Based on some discussion, here is what is planned for the propagation feature for control tuples.
>
> The signature of the *processControl()* method in *ControlAwareDefaultInputPort*, which is implemented by the operator developer, will be as follows:
>
> *public abstract boolean processControl(UserDefinedControlTuple payload);*
>
> The boolean returned by the processControl() method indicates (to the engine) whether or not the operator is able to handle the control tuple and wants to take care of its propagation.
>
> - If the method returns true, indicating that it is able to handle the control tuple, the operator has to explicitly emit the control tuple to the output ports it wishes to propagate it to.
> - If the method returns false, indicating that it is not able to handle the control tuple, the engine propagates the control tuple to all output ports.
>
> The operator may also emit new control tuples in either case. Note that for ports that are not control aware, the control tuple is propagated by default.
>
> We don't need any output port annotations or operator-level attributes.
>
> ~ Bhupesh

On Mon, Jan 9, 2017 at 5:16 PM, Tushar Gosavi <[email protected]> wrote:

> On Sun, Jan 8, 2017 at 11:49 PM, Vlad Rozov <[email protected]> wrote:
> > +1 to manage propagation at an operator level. An operator is either control tuple aware and needs to manage how control tuples are routed from input ports to output ports, or it is not.
> > In the latter case, it does not matter how many input and output ports the operator has, and it is the Apex platform's responsibility to route control tuples. I don't see a use case where an operator that is not aware of a control tuple needs to manage one or more input ports (or, similarly, output ports) differently than others.
>
> The problem with giving the operator explicit control over the routing of custom tuples is: how does the operator developer know the control tuple requirements of downstream operators in an application? For example, in the following DAG
>
>     A -> B -> C
>
> - A is my custom source operator, which emits control tuples of type C and of a new type C1.
> - B is an operator from Malhar which handles control tuple C.
> - C is a custom output operator which handles C1.
>
> If B is managing control tuples, then it needs to remember to forward unhandled tuples on all output ports; otherwise it will block them for downstream operators that might need them. Also, if a new output port is added, B needs to send those tuples on the new output port as well. But in this case I can't simply extend B, as port objects are transient and mostly anonymous, so I cannot extend them to send control tuples on a new output port. In my opinion, we should let control tuples flow through the entire DAG from their source and let each operator in the path handle or ignore them as required, without blocking them.
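The *processControl()* contract quoted above (return true to take over propagation, false to let the engine forward the tuple to all output ports) can be sketched against Tushar's A -> B -> C example. All types here are hypothetical stand-ins, not the actual Apex classes: `ControlC` plays the tuple B understands and `ControlC1` the custom type only operator C cares about.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; the actual Apex API may differ.
interface UserDefinedControlTuple {}

final class ControlC implements UserDefinedControlTuple {}  // understood by B
final class ControlC1 implements UserDefinedControlTuple {} // only operator C cares about it

// Mirrors the proposed contract: true = operator handled it and propagates
// explicitly, false = engine propagates it to all output ports.
abstract class ControlAwareInputPortSketch {
  public abstract boolean processControl(UserDefinedControlTuple payload);
}

// Operator B: handles ControlC itself and knows nothing about ControlC1.
class OperatorB {
  final List<UserDefinedControlTuple> handled = new ArrayList<>();

  final ControlAwareInputPortSketch input = new ControlAwareInputPortSketch() {
    @Override
    public boolean processControl(UserDefinedControlTuple payload) {
      if (payload instanceof ControlC) {
        handled.add(payload); // consumed; B chooses not to forward it
        return true;
      }
      return false; // unknown type: let the engine forward it
    }
  };
}

// Engine-side dispatch: forwards unhandled control tuples to every output port.
class EngineSketch {
  static void deliver(ControlAwareInputPortSketch port, UserDefinedControlTuple tuple,
                      List<List<UserDefinedControlTuple>> outputPorts) {
    if (!port.processControl(tuple)) {
      for (List<UserDefinedControlTuple> out : outputPorts) {
        out.add(tuple);
      }
    }
  }
}
```

Because the forwarding loop lives in the engine, adding an output port to B does not require changing B's code, which is the maintenance problem raised above.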
> > In general, an operator is aware only of specific control tuple(s) (for example, end of batch or end of file), and for control tuples that it was not enabled for, the behavior should be exactly the same as if the operator were not control tuple aware, meaning that those control tuples should be propagated from input ports to output ports by the platform. There should be an ability to let the platform know which control tuples an operator is aware of and can handle. This can be done both by an API call and by an annotation.
>
> I think this will add overhead while developing applications. The operator developer needs to add code to handle a new control tuple and also needs to update the part of the code that registers the type with the engine. And the platform needs to perform a type check and deliver the tuples accordingly. Instead, the operator developer could check the type of the incoming tuple and handle it as required.
>
> - Tushar.
>
> > Thank you,
> >
> > Vlad

On 1/5/17 13:04, Bhupesh Chawda wrote:

> Agreed Thomas.
> I was referring to the persona of the operator developer. The user of the operator would not be doing anything related to the propagation of control tuples. Actually, the behavior of the operator with respect to the propagation of control tuples would be part of the operator documentation.
>
> Also, we are providing options for the developer to route the flow of control tuples in code during the development of the operator. The annotations would actually help achieve this in an easier way.
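Vlad's suggestion, that an operator declares which control tuple types it handles and the platform routes everything else, could look like the sketch below. The `@HandlesControlTuples` annotation, `ControlHandler` and `PlatformRouter` are hypothetical names; Tushar's alternative would drop the annotation and have the operator check the incoming type with `instanceof` instead.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical annotation: the control tuple types an operator handles itself.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface HandlesControlTuples {
  Class<?>[] value();
}

final class EndOfBatch {}   // a type the operator declares
final class SchemaUpdate {} // a type the operator has never heard of

interface ControlHandler {
  void handleControl(Object controlTuple);
}

@HandlesControlTuples(EndOfBatch.class)
class BatchAwareOperator implements ControlHandler {
  final List<Object> handled = new ArrayList<>();

  @Override
  public void handleControl(Object controlTuple) {
    handled.add(controlTuple); // consumed here; not forwarded
  }
}

// Hypothetical platform-side routing based on the declared types.
class PlatformRouter {
  static void route(ControlHandler operator, Object controlTuple, List<Object> downstream) {
    HandlesControlTuples declared =
        operator.getClass().getAnnotation(HandlesControlTuples.class);
    boolean handles = declared != null
        && Arrays.stream(declared.value()).anyMatch(type -> type.isInstance(controlTuple));
    if (handles) {
      operator.handleControl(controlTuple);
    } else {
      downstream.add(controlTuple); // not declared: behave as if the operator were unaware
    }
  }
}
```

The trade-off Tushar points out is visible here: every new type the operator handles must be added both to the handling code and to the annotation, and the router performs a reflective type check per tuple.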
>
> ~ Bhupesh

On Jan 5, 2017 21:40, "Thomas Weise" <[email protected]> wrote:

> I think it is important to be clear on the roles with regard to this functionality. The user of the operator should not have to do anything to get it to work. So while I suggested considering attributes earlier, there should not be any need for the user to set them. The operator needs to work as is.
>
> The persona concerned with the propagation of control tuples is the operator developer. I think the clear way for the operator developer to override the propagation behavior is in code, and if that is possible, there is no need for other mechanisms such as attributes or other port-level settings.
>
> Thomas

On Wed, Jan 4, 2017 at 10:20 PM, Bhupesh Chawda <[email protected]> wrote:

> I think we all agree on the use case for selective propagation. The question is where to have the control: at the operator level or at the port level.
>
> For this ability, we have the following options:
>
> 1. The operator disables propagation on selected output ports. Other output ports propagate by default.
> 2. The operator disables propagation for the entire operator (by means of an attribute). The operator developer explicitly emits the received control tuples on selected output ports.
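The two options above can be compared from the engine's point of view with a small sketch. `OutPort`, `OperatorPorts`, `propagateControl` and `implicitPropagation` are hypothetical names standing in for the per-port setting (option 1) and the operator-level attribute (option 2).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical output port with a per-port switch (option 1), on by default.
class OutPort {
  boolean propagateControl = true;
  final List<Object> received = new ArrayList<>();
}

// Hypothetical operator wrapper with an operator-level attribute (option 2).
class OperatorPorts {
  final Map<String, OutPort> ports = new LinkedHashMap<>();
  boolean implicitPropagation = true;

  OutPort addPort(String name) {
    OutPort port = new OutPort();
    ports.put(name, port);
    return port;
  }

  // What the engine does with a control tuple arriving from upstream.
  void propagateFromUpstream(Object controlTuple) {
    if (!implicitPropagation) {
      return; // option 2: operator code must emit explicitly on chosen ports
    }
    for (OutPort port : ports.values()) {
      if (port.propagateControl) { // option 1: skip ports that opted out
        port.received.add(controlTuple);
      }
    }
  }
}
```

With option 1 the developer flips one flag per port to opt out; with option 2 nothing is forwarded until operator code emits it.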
>
> If the decision is to completely block the propagation, then Option 2 is easier to use, as just an attribute needs to be set, as opposed to Option 1, where the user needs to set the annotation on each output port.
>
> However, if selective propagation is needed, Option 1 would just need the user to disable propagation on certain ports (the rest propagate by default), while Option 2 requires the user to explicitly emit the control tuples.
>
> ~ Bhupesh

On Thu, Jan 5, 2017 at 3:46 AM, Thomas Weise <[email protected]> wrote:

> Yes, I think that for any of these cases the operator developer will turn off implicit propagation for the operator and then write the code to route or create control tuples as needed.
>
> Thomas

On Wed, Jan 4, 2017 at 12:59 PM, Amol Kekre <[email protected]> wrote:

> I agree that by default the propagation must be implicit, i.e. if the operator does nothing, the control tuple propagates. I do think users should have control over deciding to "not propagate" or "create new", and in these cases they would need to do something explicit (override)?
>
> The following cases come to mind:
> 1. Sole consumer of a particular control signal (for example, end of file)
> 2. Creator of a particular control signal (start of file, or a signal to pause on something, etc.)
> 3. One port on a data pipeline and the other port on a meta-data pipeline
>
> In the above cases, emit will be decided on an output port. #1 is the only case where all output ports will disable the tuple; #2 and #3 will most likely be selective.
>
> Thks
> Amol

On Wed, Jan 4, 2017 at 12:25 PM, Thomas Weise <[email protected]> wrote:

> I think there is (1) implicit propagation, just like other control tuples, where the operator code isn't involved, and (2) the case where the operator developer wants to decide how control tuples are created or routed and will receive and emit them on the output ports as desired.
>
> I don't see a use case for hybrid approaches? Maybe propagation does not need to be tied to ports at all, maybe just an annotation at the operator level?
>
> Thomas

On Wed, Jan 4, 2017 at 10:59 AM, Bhupesh Chawda <[email protected]> wrote:

> Wouldn't having this on output ports give finer control over the propagation of control tuples?
> We might have an operator with two output ports, each feeding a different pipeline downstream.
We would be able to say that > > one > > > > >>>>> > > > > >>>>> pipeline > > > > >>>>>>> > > > > >>>>>>> gets the control tuples and the other doesn't. > > > > >>>>>>> > > > > >>>>>>> ~ Bhupesh > > > > >>>>>>> > > > > >>>>>>> > > > > >>>>>>> On Jan 4, 2017 11:55 PM, "Thomas Weise" <[email protected]> > > wrote: > > > > >>>>>>> > > > > >>>>>>> I'm referring to the operator that needs to make the decision > > to > > > > >>>>>> > > > > >>>>>> propagate > > > > >>>>>>> > > > > >>>>>>> or not. The tuples come from an input port, so it seems > > > > >> > > > > >> appropriate > > > > >>>> > > > > >>>> to > > > > >>>>>> > > > > >>>>>> say > > > > >>>>>>> > > > > >>>>>>> "don't propagate control tuples from this port". No matter > how > > > > >> > > > > >> many > > > > >>>>>> > > > > >>>>>> output > > > > >>>>>>> > > > > >>>>>>> ports there are. > > > > >>>>>>> > > > > >>>>>>> Output ports are there for an operator to emit new tuples, in > > the > > > > >>>> > > > > >>>> case > > > > >>>>>> > > > > >>>>>> you > > > > >>>>>>> > > > > >>>>>>> are discussing you don't emit new control tuples. > > > > >>>>>>> > > > > >>>>>>> Thomas > > > > >>>>>>> > > > > >>>>>>> > > > > >>>>>>> On Wed, Jan 4, 2017 at 9:39 AM, Bhupesh Chawda < > > > > >>>>> > > > > >>>>> [email protected]> > > > > >>>>>>> > > > > >>>>>>> wrote: > > > > >>>>>>> > > > > >>>>>>>> Hi Thomas, > > > > >>>>>>>> > > > > >>>>>>>> Are you suggesting an attribute on the input port for > > > > >> > > > > >> controlling > > > > >>>> > > > > >>>> the > > > > >>>>>>>> > > > > >>>>>>>> propagation of control tuples to downstream operators? > > > > >>>>>>>> I think it should be better to do it on the output port > since > > > > >> > > > > >> the > > > > >>>>>>> > > > > >>>>>>> decision > > > > >>>>>>>> > > > > >>>>>>>> to block the propagation will be made at the upstream > operator > > > > >>>> > > > > >>>> rather > > > > >>>>>>> > > > > >>>>>>> than > > > > >>>>>>>> > > > > >>>>>>>> at the downstream. 
> Also, we need another way of controlling the propagation at run time, and hence I was thinking about the method call on the output port, in addition to the annotation on the output port (which is the static way).
>
> Please correct me if I have misunderstood your question.
>
> ~ Bhupesh

On Wed, Jan 4, 2017 at 7:26 PM, Thomas Weise <[email protected]> wrote:

> Wouldn't it be more intuitive to control this with an attribute on the input port?

On Tue, Jan 3, 2017 at 11:06 PM, Bhupesh Chawda <[email protected]> wrote:

> Hi Pramod,
>
> I was thinking of a method setPropagateControlTuples(boolean propagate) on the output port of the operator.
> The operator could disable this in the code at any point in time.
>
> Note, however, that this is to block the propagation of control tuples from upstream.
> Any control tuples emitted explicitly by the operator would still be emitted and sent to the downstream operators.
>
> Please see https://github.com/apache/apex-core/pull/440/files#diff-8aa0ca1a3e645fa60e9b376c118c00a3R68 in the PR.
>
> ~ Bhupesh

On Wed, Jan 4, 2017 at 6:53 AM, Pramod Immaneni <[email protected]> wrote:

> 2 sounds good. Have you thought about what the method would look like?

On Sat, Dec 31, 2016 at 8:29 PM, Bhupesh Chawda <[email protected]> wrote:

> Yes, that makes sense.
> We have the following options:
> 1. Make the annotation false by default and force the user to forward the control tuples explicitly.
> 2. Keep the annotation true by default and leave the static way of blocking as it is.
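A minimal sketch of the run-time semantics Bhupesh describes for `setPropagateControlTuples(boolean propagate)`: the flag only gates control tuples arriving from upstream, while control tuples the operator emits itself always go out. Apart from that method name, everything here (`ControlAwareOutputPort`, `propagateFromUpstream`, `emitControl`) is hypothetical; the actual change is in the linked PR and may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical output port; only setPropagateControlTuples(boolean) comes from the thread.
class ControlAwareOutputPort {
  private boolean propagateControlTuples = true; // propagate by default
  final List<Object> sent = new ArrayList<>();

  // May be called from operator code at any point while the operator runs.
  void setPropagateControlTuples(boolean propagate) {
    this.propagateControlTuples = propagate;
  }

  // Engine path: control tuples arriving from upstream are gated by the flag.
  void propagateFromUpstream(Object controlTuple) {
    if (propagateControlTuples) {
      sent.add(controlTuple);
    }
  }

  // Operator path: control tuples the operator emits itself are never blocked.
  void emitControl(Object controlTuple) {
    sent.add(controlTuple);
  }
}
```

Compared with a static annotation, the setter lets the operator change its mind mid-stream, which is the programmatic control Pramod asks for.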
>
> We would provide another way to block programmatically, perhaps by means of another method call on the port.
>
> ~ Bhupesh

On Dec 30, 2016 00:09, "Pramod Immaneni" <[email protected]> wrote:

> Bhupesh,
>
> An annotation seems like a static way to stop propagation. Given that these are programmatically generated, I would think the operators should be able to stop them (consume without propagating) programmatically as well.
>
> Thanks

On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <[email protected]> wrote:

> Thanks Vlad, I am trying out the approach you mentioned regarding having another interface which allows sinks to put a control tuple.
>
> Regarding the delivery of control tuples, here is what I am planning to do:
> All the control tuples which are emitted in a particular window are delivered after all the data tuples have been delivered to the respective ports, but before the endWindow() call. The operator can then process the control tuples in that window and do any finalization in the endWindow() call. There will be no delivery of control tuples after endWindow() and before the next beginWindow() call.
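The planned ordering within a window can be sketched as follows, assuming control tuples are buffered per window and released just before `endWindow()`. `WindowedDelivery` is a hypothetical class that only records the order in which an operator would observe events; it is not the engine's actual delivery code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: data tuples are delivered as they arrive, control tuples
// are held and released after the window's last data tuple, before endWindow().
class WindowedDelivery {
  final List<String> log = new ArrayList<>();
  private final List<Object> pendingControl = new ArrayList<>();

  void beginWindow(long windowId) {
    log.add("beginWindow " + windowId);
  }

  void onData(Object tuple) {
    log.add("data " + tuple);
  }

  void onControl(Object controlTuple) {
    pendingControl.add(controlTuple); // defer until the window's data is done
  }

  void endWindow(long windowId) {
    for (Object c : pendingControl) {
      log.add("control " + c);
    }
    pendingControl.clear(); // nothing is delivered between endWindow() and the next beginWindow()
    log.add("endWindow " + windowId);
  }
}
```

For example, a window containing data `a`, control `EOF`, data `b` is observed as `a`, `b`, `EOF`, then `endWindow`, so the operator can finalize in `endWindow()` knowing it has seen both the data and the control tuples.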
>
> For handling the propagation of control tuples further in the DAG, we are planning to have an annotation attribute on the output port of the operator, which would be true by default and could be set to false to block propagation:
> @OutputPortFieldAnnotation(propogateControlTuples = false)
>
> ~ Bhupesh

On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <[email protected]> wrote:

> Custom control tuples are control tuples emitted by an operator itself and not by the platform.
> Prior to the introduction of custom control tuples, only the Apex engine itself put control tuples into the various sinks, so the engine created the necessary Tuple objects with the corresponding type prior to calling Sink.put().
>
> Not all sinks need to be changed. Only control-tuple-aware sinks should provide such functionality. If there is a lot of code duplication, please create an abstract class that the other control-aware sinks will extend.
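Vlad's suggestion can be sketched with a simplified sink hierarchy: only sinks that implement the extra control method change, and an abstract base class holds the shared wrapping logic. `SimpleSink`, `ControlAwareSink`, `AbstractControlAwareSink`, `ControlEnvelope` and `RecordingSink` are hypothetical names, and `putControlTuple` follows the method name used in this thread; none of this is the engine's actual `Sink` interface.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the engine's sink; the real Sink interface has more methods.
interface SimpleSink<T> {
  void put(T tuple);
}

// Hypothetical extension implemented only by control-tuple-aware sinks.
interface ControlAwareSink<T> extends SimpleSink<T> {
  void putControlTuple(Object controlPayload);
}

// Hypothetical wrapper marking a payload as a control tuple.
final class ControlEnvelope {
  final Object payload;

  ControlEnvelope(Object payload) {
    this.payload = payload;
  }
}

// Shared base class so control-aware sinks don't duplicate the wrapping logic.
abstract class AbstractControlAwareSink<T> implements ControlAwareSink<T> {
  @Override
  public void putControlTuple(Object controlPayload) {
    deliverControl(new ControlEnvelope(controlPayload));
  }

  protected abstract void deliverControl(ControlEnvelope envelope);
}

// Example concrete sink that records everything it receives.
class RecordingSink<T> extends AbstractControlAwareSink<T> {
  final List<Object> contents = new ArrayList<>();

  @Override
  public void put(T tuple) {
    contents.add(tuple);
  }

  @Override
  protected void deliverControl(ControlEnvelope envelope) {
    contents.add(envelope);
  }
}
```

Existing sinks that only implement `put()` are untouched; a sink opts in by extending the abstract class.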
>
> Thank you,
>
> Vlad

On 12/23/16 06:24, Bhupesh Chawda wrote:

> Hi Vlad,
>
> Thanks for the pointer on delegating the wrapping of the user tuple to the control port. I was trying this out today.
> The problem I see is that if we introduce a putControlTuple() method in Sink, then a lot of the existing sinks would change. Also, the changes seemed redundant, as the existing control tuples already use the put() method of sinks. So why do something special for custom control tuples?
>
> The only aspect in which custom control tuples differ is that they will be generated by the user and will actually be delivered to the ports in a different order. Perhaps we should be able to use the existing flow. The only problem, as outlined before, seems to be identifying the user tuple as a control tuple.
>
> ~ Bhupesh

On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <[email protected]> wrote:

> Why is it necessary to wrap in the OutputPort? Can't it be delegated to a Sink by introducing a new putControlTuple method?
>
> Thank you,
>
> Vlad

On 12/21/16 22:10, Bhupesh Chawda wrote:

> Hi Vlad,
>
> The problem with using the Tuple class as the wrapper is that the ports belong to the API, and we would want to wrap the payload object of the control tuple in the Tuple class, which is not part of the API.
>
> The output port will just get the payload of the user control tuple. For example, if the user emits a Long as a control tuple, the payload object will just be a Long object.
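Why the payload needs a recognizable wrapper can be shown with a small sketch: once a bare `Long` reaches a sink, nothing distinguishes it from a data tuple, whereas a wrapper type can be detected with `instanceof`. `ControlTupleWrapper`, `PublisherSketch` and `DedupingReceiver` are hypothetical; the `UUID` field mirrors the de-duplication id proposed for `ControlTupleInterface` in this thread.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical wrapper: a known type the publisher can recognize, plus an id for de-duplication.
final class ControlTupleWrapper {
  final Object payload;
  final UUID id;

  ControlTupleWrapper(Object payload, UUID id) {
    this.payload = payload;
    this.id = id;
  }
}

// Hypothetical publisher-side check: only the wrapper type marks a control tuple.
class PublisherSketch {
  static String classify(Object tuple) {
    return (tuple instanceof ControlTupleWrapper) ? "CONTROL" : "DATA";
  }
}

// Hypothetical downstream receiver that drops copies arriving over other paths.
class DedupingReceiver {
  private final Set<UUID> seen = new HashSet<>();

  boolean accept(ControlTupleWrapper tuple) {
    return seen.add(tuple.id); // false if this id was already delivered
  }
}
```

A bare `5L` classifies as data, while the same value inside the wrapper classifies as control, and a second copy carrying the same id is rejected downstream.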
>
> It is necessary to bundle this Long into some recognizable object so that the BufferServerPublisher knows that this is a control tuple and not a regular tuple, and serializes it accordingly. It is therefore necessary that the tuple be part of some known hierarchy so that it can be distinguished from other payload tuples. Let us call this class ControlTupleInterface. Note that this needs to be done before the tuple is inserted into the sink, which happens in the port objects.
> Once the tuple is inserted into the sink, it would look just like any other payload tuple and could not be distinguished.
>
> For this reason, I had something like the following in the API:
>
>     package com.datatorrent.api;
>
>     import java.util.UUID;
>
>     public class ControlTupleInterface
>     {
>       Object payload; // User control tuple payload, e.g. a Long
>       UUID id;        // Unique id used to de-duplicate in downstream operators
>     }
>
> Regarding your suggestion on using the Tuple class as the wrapper for
