+1 for having an immediate delivery mechanism as well. I would suggest that the other delivery mechanism stays at end of window, to be consistent, as I think it may be difficult to determine the last arrival of the tuple.
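To make the two delivery options concrete, here is a minimal sketch. It is not Apex code: the `Delivery` enum, the `ControlTuple` class, and the `onArrival()`/`onEndWindow()` hooks are all hypothetical names used only for illustration. It assumes that copies of the same control tuple arriving over different paths share an id and arrive within the same window.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ControlDeliverySketch {
    /** Hypothetical per-tuple delivery option; not an Apex API. */
    enum Delivery { FIRST_ARRIVAL, END_WINDOW }

    /** Hypothetical control tuple: an id shared by all copies, plus its option. */
    static final class ControlTuple {
        final String id;
        final Delivery delivery;

        ControlTuple(String id, Delivery delivery) {
            this.id = id;
            this.delivery = delivery;
        }
    }

    private final Set<String> seen = new HashSet<>();       // ids already received in this window
    private final List<String> pending = new ArrayList<>(); // ids held back until end of window
    final List<String> delivered = new ArrayList<>();       // stands in for calls into the operator

    /** Called for every copy of a control tuple arriving on any upstream path. */
    void onArrival(ControlTuple t) {
        if (!seen.add(t.id)) {
            return; // duplicate copy from another path, already accounted for
        }
        if (t.delivery == Delivery.FIRST_ARRIVAL) {
            delivered.add(t.id); // option 1: deliver immediately
        } else {
            pending.add(t.id);   // option 2: hold until end of window
        }
    }

    /** Called once the window closes; releases everything that was held back. */
    void onEndWindow() {
        delivered.addAll(pending);
        pending.clear();
        seen.clear();
    }

    public static void main(String[] args) {
        ControlDeliverySketch op = new ControlDeliverySketch();
        op.onArrival(new ControlTuple("endFile", Delivery.END_WINDOW));
        op.onArrival(new ControlTuple("schema", Delivery.FIRST_ARRIVAL));
        op.onArrival(new ControlTuple("schema", Delivery.FIRST_ARRIVAL)); // copy from a second path
        System.out.println(op.delivered); // prints [schema]
        op.onEndWindow();
        System.out.println(op.delivered); // prints [schema, endFile]
    }
}
```

Holding the second kind of tuple until the end of the window avoids having to detect the last arrival across all paths.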
~ Bhupesh

On Wed, Feb 15, 2017 at 7:04 AM, Pramod Immaneni <[email protected]> wrote:

> There have been some recent developments and discussions on the schema side
> (link below) that warrant a reconsideration of how control tuples get
> delivered.
>
> http://apache.markmail.org/search/?q=apex+list%3Aorg.apache.apex.dev+schema+discovery+support#query:apex%20list%3Aorg.apache.apex.dev%20schema%20discovery%20support+page:1+mid:oaji26y3xfozap5v+state:results
>
> What I would like to suggest is that we allow two delivery options for
> control tuples which can be configured on a per control tuple basis. First
> is to deliver control tuple to the operator when the first instance of the
> tuple arrives from any path. Second option is to deliver the control tuple
> when the last instance of the tuple arrives from all the paths or at the
> end window if it is going to be difficult to determine the last arrival.
> The developer can choose the delivery option for the control tuple
> preferably when the tuple is created. The first option will be useful for
> scenarios like schema propagation or begin file in case of batch cases. The
> second option will be useful for tuples like end file or end batch in batch
> use cases.
>
> Thanks
>
> On Tue, Jan 10, 2017 at 12:27 PM, Bhupesh Chawda <[email protected]> wrote:
>
> > Hi All,
> >
> > Based on some discussion here is what is planned for the propagation
> > feature for control tuples.
> >
> > The signature of the *processControl()* method in
> > *ControlAwareDefaultInputPort* which is implemented by the operator
> > developer will be as follows:
> >
> > *public abstract boolean processControl(UserDefinedControlTuple payload);*
> >
> > The boolean returned by the processControl() method indicates (to the
> > engine) whether or not the operator is able to handle the control tuple and
> > wants to take care of the propagation of the control tuple.
> >
> >    - If the method returns true - indicating it is able to handle the
> >    control tuple, the operator has to explicitly emit the control tuples to
> >    the output ports it wishes to propagate to.
> >
> >    - If the method returns false - indicating it is not able to handle the
> >    control tuple, the control tuple will be propagated by the engine to all
> >    output ports.
> >
> > The operator may even emit new control tuples in either of the cases.
> > Note that for ports that are not control aware, the control tuple is
> > propagated by default.
> >
> > We don't need any output port annotations or operator level attributes.
> >
> > ~ Bhupesh
> >
> > On Mon, Jan 9, 2017 at 5:16 PM, Tushar Gosavi <[email protected]> wrote:
> >
> > > On Sun, Jan 8, 2017 at 11:49 PM, Vlad Rozov <[email protected]> wrote:
> > > > +1 to manage propagation at an operator level. An operator is either
> > > > control tuple aware and needs to manage how control tuples are routed
> > > > from input ports to output ports or it is not. In the latter case it
> > > > does not matter how many input and output ports the operator has and
> > > > it is the Apex platform responsibility to route control tuples. I
> > > > don't see a use case where an operator that is not aware of a control
> > > > tuple needs to manage one or more input ports (or similar output
> > > > ports) differently than others.
> > >
> > > The problem with giving explicit control to operator for routing of
> > > custom tuples is how does the operator developer know about control
> > > tuple requirement for downstream operators in an application. For
> > > example in following DAG
> > > A -> B -> C
> > > A - is my custom source operator which emits a new control tuple type C1 and C.
> > > B - is operator from malhar which handles control tuple C.
> > > C - is custom output operator which handles C1.
> > >
> > > If B is managing control tuples, then it needs to remember to forward
> > > unhandled tuples on all output ports, else it will block the tuples
> > > for downstream operators which might need them. Also, if a new output
> > > port is added then B needs to send those tuples on the new output
> > > port as well. But in this case I can't simply extend B, as port
> > > objects are transient and mostly anonymous; I cannot extend these to
> > > send control tuples on the new output port. In my opinion we should
> > > let the control tuples flow through the entire DAG from their source
> > > and let each operator in the path handle/ignore them as required
> > > without blocking them.
> > >
> > > > In general, an operator is aware only of specific control tuple(s)
> > > > (for example end of batch or end of file) and for control tuples
> > > > that it was not enabled for, the behavior should be exactly the same
> > > > as if the operator is not control tuple aware, meaning that those
> > > > control tuples should be propagated from input ports to output ports
> > > > by the platform. There should be an ability to let the platform know
> > > > what control tuples an operator is aware of and can handle. This can
> > > > be done both by API call and an annotation.
> > >
> > > I think this will add overhead while developing applications. The
> > > operator developer needs to add code to handle the new control tuple
> > > and also needs to update the part of the code that registers the type
> > > with the engine. And the platform needs to perform a type check and
> > > deliver the tuples accordingly. Instead the operator developer could
> > > check the type of the incoming tuple and handle it as required.
> > >
> > > - Tushar.
> > >
> > > > Thank you,
> > > >
> > > > Vlad
> > > >
> > > > On 1/5/17 13:04, Bhupesh Chawda wrote:
> > > >>
> > > >> Agreed Thomas.
> > > >> I was referring to the persona of the operator developer.
> > > >> The user of the operator would not be doing anything related to the
> > > >> propagation of control tuples. Actually, the behavior of the operator
> > > >> wrt. propagation of control tuples would be part of the operator
> > > >> documentation.
> > > >>
> > > >> Also, we are providing options for the developer to route the flow of
> > > >> control tuples in code during the development of the operator. The
> > > >> annotations would actually help achieve it in an easier way.
> > > >>
> > > >> ~ Bhupesh
> > > >>
> > > >> On Jan 5, 2017 21:40, "Thomas Weise" <[email protected]> wrote:
> > > >>
> > > >> I think it is important to be clear on the roles with regard to this
> > > >> functionality. The user of the operator should not have to do anything
> > > >> to get it to work. So while I suggested to consider attributes earlier,
> > > >> there should not be any need for the user to set those. The operator
> > > >> needs to work as is.
> > > >>
> > > >> The persona concerned with propagation of control tuples is the
> > > >> operator developer. I think the clear way for the operator developer to
> > > >> override the propagation behavior is in code and if that is possible
> > > >> there is no need for other things such as attributes or other port
> > > >> level settings.
> > > >>
> > > >> Thomas
> > > >>
> > > >> On Wed, Jan 4, 2017 at 10:20 PM, Bhupesh Chawda <[email protected]> wrote:
> > > >>
> > > >>> I think we all agree on the use case for selective propagation. The
> > > >>> question is about where to have the control - at the operator level
> > > >>> or at the port level.
> > > >>>
> > > >>> For this ability, we have the following options:
> > > >>>
> > > >>>    1. Operator disables the propagation on selected output ports.
> > > >>>    Other output ports propagate by default.
> > > >>>    2. Operator disables propagation for the entire operator (by means
> > > >>>    of an attribute). Operator developer explicitly emits the received
> > > >>>    control tuples on selected output ports.
> > > >>>
> > > >>> If the decision is to completely block the propagation, then Option 2
> > > >>> is easier to use as just an attribute needs to be set, as opposed to
> > > >>> Option 1 where user needs to set the annotation on each output port.
> > > >>>
> > > >>> However, if selective propagation is needed, Option 1 would just need
> > > >>> the user to disable propagation on certain ports; rest are propagated
> > > >>> by default, while Option 2 requires the user to explicitly emit the
> > > >>> control tuples.
> > > >>>
> > > >>> ~ Bhupesh
> > > >>>
> > > >>> On Thu, Jan 5, 2017 at 3:46 AM, Thomas Weise <[email protected]> wrote:
> > > >>>
> > > >>>> Yes, I think that for any of these cases the operator developer will
> > > >>>> turn off implicit propagation for the operator and then write the
> > > >>>> code to route or create control tuples as needed.
> > > >>>>
> > > >>>> Thomas
> > > >>>>
> > > >>>> On Wed, Jan 4, 2017 at 12:59 PM, Amol Kekre <[email protected]> wrote:
> > > >>>>>
> > > >>>>> I agree that by default the propagation must be implicit, i.e. if
> > > >>>>> the operator does nothing, the control tuple propagates. I do think
> > > >>>>> users should have control on deciding to "not propagate" or "create
> > > >>>>> new" and in these cases they would need to do something explicit
> > > >>>>> (override)?
> > > >>>>>
> > > >>>>> The following cases come to mind
> > > >>>>> 1. Sole consumer of a particular control signal (for example end of
> > > >>>>> file)
> > > >>>>> 2. Creator of a particular control signal (start of file, or a
> > > >>>>> signal to pause on something etc.)
> > > >>>>> 3. One port on a data pipeline and other port for meta-data pipeline
> > > >>>>>
> > > >>>>> In the above cases emit will be decided on an output port. #1 is
> > > >>>>> only place where all output ports will disable the tuple, #2 and #3
> > > >>>>> most likely will be selective.
> > > >>>>>
> > > >>>>> Thks
> > > >>>>> Amol
> > > >>>>>
> > > >>>>> On Wed, Jan 4, 2017 at 12:25 PM, Thomas Weise <[email protected]> wrote:
> > > >>>>>
> > > >>>>>> I think there is (1) implicit propagation just like other control
> > > >>>>>> tuples where the operator code isn't involved and (2) where the
> > > >>>>>> operator developer wants to decide how control tuples are created
> > > >>>>>> or routed and will receive and emit them on the output ports as
> > > >>>>>> desired.
> > > >>>>>>
> > > >>>>>> I don't see a use case for hybrid approaches? Maybe propagation
> > > >>>>>> does not need to be tied to ports at all, maybe just by annotation
> > > >>>>>> at the operator level?
> > > >>>>>>
> > > >>>>>> Thomas
> > > >>>>>>
> > > >>>>>> On Wed, Jan 4, 2017 at 10:59 AM, Bhupesh Chawda <[email protected]> wrote:
> > > >>>>>>
> > > >>>>>>> Wouldn't having this with output ports give a finer control on the
> > > >>>>>>> propagation of control tuples?
> > > >>>>>>> We might have an operator with two output ports each of which
> > > >>>>>>> creates two different pipelines downstream.
> > > >>>>>>> We would be able to say that one pipeline gets the control tuples
> > > >>>>>>> and the other doesn't.
> > > >>>>>>>
> > > >>>>>>> ~ Bhupesh
> > > >>>>>>>
> > > >>>>>>> On Jan 4, 2017 11:55 PM, "Thomas Weise" <[email protected]> wrote:
> > > >>>>>>>
> > > >>>>>>> I'm referring to the operator that needs to make the decision to
> > > >>>>>>> propagate or not. The tuples come from an input port, so it seems
> > > >>>>>>> appropriate to say "don't propagate control tuples from this
> > > >>>>>>> port". No matter how many output ports there are.
> > > >>>>>>>
> > > >>>>>>> Output ports are there for an operator to emit new tuples, in the
> > > >>>>>>> case you are discussing you don't emit new control tuples.
> > > >>>>>>>
> > > >>>>>>> Thomas
> > > >>>>>>>
> > > >>>>>>> On Wed, Jan 4, 2017 at 9:39 AM, Bhupesh Chawda <[email protected]> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi Thomas,
> > > >>>>>>>>
> > > >>>>>>>> Are you suggesting an attribute on the input port for controlling
> > > >>>>>>>> the propagation of control tuples to downstream operators?
> > > >>>>>>>> I think it should be better to do it on the output port since the
> > > >>>>>>>> decision to block the propagation will be made at the upstream
> > > >>>>>>>> operator rather than at the downstream.
> > > >>>>>>>> Also, we need another way of controlling the propagation at run
> > > >>>>>>>> time and hence I was thinking about the method call on the output
> > > >>>>>>>> port, in addition to the annotation on the output port (which is
> > > >>>>>>>> the static way).
> > > >>>>>>>>
> > > >>>>>>>> Please correct me if I have misunderstood your question.
> > > >>>>>>>>
> > > >>>>>>>> ~ Bhupesh
> > > >>>>>>>>
> > > >>>>>>>> On Wed, Jan 4, 2017 at 7:26 PM, Thomas Weise <[email protected]> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> Wouldn't it be more intuitive to control this with an attribute
> > > >>>>>>>>> on the input port?
> > > >>>>>>>>>
> > > >>>>>>>>> On Tue, Jan 3, 2017 at 11:06 PM, Bhupesh Chawda <[email protected]> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hi Pramod,
> > > >>>>>>>>>>
> > > >>>>>>>>>> I was thinking of a method
> > > >>>>>>>>>> setPropagateControlTuples(boolean propagate) on the output port
> > > >>>>>>>>>> of the operator.
> > > >>>>>>>>>> The operator could disable this in the code at any point of
> > > >>>>>>>>>> time.
> > > >>>>>>>>>> Note however that this is to block the propagation of control
> > > >>>>>>>>>> tuples from upstream. Any control tuples emitted explicitly by
> > > >>>>>>>>>> the operator would still be emitted and sent to the downstream
> > > >>>>>>>>>> operators.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Please see
> > > >>>>>>>>>> https://github.com/apache/apex-core/pull/440/files#diff-8aa0ca1a3e645fa60e9b376c118c00a3R68
> > > >>>>>>>>>> in the PR.
> > > >>>>>>>>>>
> > > >>>>>>>>>> ~ Bhupesh
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Wed, Jan 4, 2017 at 6:53 AM, Pramod Immaneni <[email protected]> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> 2 sounds good. Have you thought about what the method would
> > > >>>>>>>>>>> look like.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Sat, Dec 31, 2016 at 8:29 PM, Bhupesh Chawda <[email protected]> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Yes, that makes sense.
> > > >>>>>>>>>>>> We have following options:
> > > >>>>>>>>>>>> 1. Make the annotation false by default and force the user to
> > > >>>>>>>>>>>> forward the control tuples explicitly.
> > > >>>>>>>>>>>> 2. Annotation is true by default and static way of blocking
> > > >>>>>>>>>>>> stays as it is. We provide another way for blocking
> > > >>>>>>>>>>>> programmatically, perhaps by means of another method call on
> > > >>>>>>>>>>>> the port.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> ~ Bhupesh
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Dec 30, 2016 00:09, "Pramod Immaneni" <[email protected]> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Bhupesh,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Annotation seems like a static way to stop propagation. Given
> > > >>>>>>>>>>>>> these are programmatically generated I would think the
> > > >>>>>>>>>>>>> operators should be able to stop (consume without
> > > >>>>>>>>>>>>> propagating) programmatically as well.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <[email protected]> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks Vlad, I am trying out the approach you mentioned
> > > >>>>>>>>>>>>>> regarding having another interface which allows sinks to put
> > > >>>>>>>>>>>>>> a control tuple.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Regarding the delivery of control tuples, here is what I am
> > > >>>>>>>>>>>>>> planning to do:
> > > >>>>>>>>>>>>>> All the control tuples which are emitted in a particular
> > > >>>>>>>>>>>>>> window are delivered after all the data tuples have been
> > > >>>>>>>>>>>>>> delivered to the respective ports, but before the
> > > >>>>>>>>>>>>>> endWindow() call. The operator can then process the control
> > > >>>>>>>>>>>>>> tuples in that window and can do any finalization in the end
> > > >>>>>>>>>>>>>> window call. There will be no delivery of control tuples
> > > >>>>>>>>>>>>>> after endWindow() and before the next beginWindow() call.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> For handling the propagation of control tuples further in
> > > >>>>>>>>>>>>>> the dag, we are planning to have an annotation on the Output
> > > >>>>>>>>>>>>>> Port of the operator which would be true by default.
> > > >>>>>>>>>>>>>> @OutputPortFieldAnnotation(propogateControlTuples = false).
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> ~ Bhupesh
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <[email protected]> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Custom control tuples are control tuples emitted by an
> > > >>>>>>>>>>>>>>> operator itself and not by the platform. Prior to the
> > > >>>>>>>>>>>>>>> introduction of the custom control tuples, only Apex engine
> > > >>>>>>>>>>>>>>> itself puts control tuples into various sinks, so the
> > > >>>>>>>>>>>>>>> engine created necessary Tuple objects with the
> > > >>>>>>>>>>>>>>> corresponding type prior to calling Sink.put().
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Not all sinks need to be changed. Only control tuple aware
> > > >>>>>>>>>>>>>>> sinks should provide such functionality. In the case there
> > > >>>>>>>>>>>>>>> is a lot of code duplication, please create an abstract
> > > >>>>>>>>>>>>>>> class, that other control aware sinks will extend from.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Thank you,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Vlad
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On 12/23/16 06:24, Bhupesh Chawda wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Hi Vlad,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Thanks for the pointer on delegating the wrapping of the
> > > >>>>>>>>>>>>>>>> user tuple to the control port. I was trying this out
> > > >>>>>>>>>>>>>>>> today.
> > > >>>>>>>>>>>>>>>> The problem I see is if we introduce a putControlTuple()
> > > >>>>>>>>>>>>>>>> method in Sink, then a lot of the existing sinks would
> > > >>>>>>>>>>>>>>>> change.
> > > >>>>>>>>>>>>>>>> Also the changes seemed redundant as, the existing
> > > >>>>>>>>>>>>>>>> control tuples already use the put() method of sinks. So
> > > >>>>>>>>>>>>>>>> why do something special for custom control tuples?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> The only aspect in which the custom control tuples are
> > > >>>>>>>>>>>>>>>> different is that these will be generated by the user and
> > > >>>>>>>>>>>>>>>> will actually be delivered to the ports in a different
> > > >>>>>>>>>>>>>>>> order. Perhaps we should be able to use the existing
> > > >>>>>>>>>>>>>>>> flow. The only problems as outlined before seem to be
> > > >>>>>>>>>>>>>>>> identification of the user tuple as a control tuple.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> ~ Bhupesh
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <[email protected]> wrote:
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Why is it necessary to wrap in the OutputPort? Can't it
> > > >>>>>>>>>>>>>>>>> be delegated to a Sink by introducing new
> > > >>>>>>>>>>>>>>>>> putControlTuple method?
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Thank you,
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Vlad
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> On 12/21/16 22:10, Bhupesh Chawda wrote:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Hi Vlad,
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> The problem in using the Tuple class as the wrapper is
> > > >>>>>>>>>>>>>>>>>> that the Ports belong to the API and we want to wrap
> > > >>>>>>>>>>>>>>>>>> the payload object of the control tuple into the Tuple
> > > >>>>>>>>>>>>>>>>>> class which is not part of the API.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> The output port will just get the payload of the user
> > > >>>>>>>>>>>>>>>>>> control tuple.
> > > >>>>>>>>>>>>>>>>>> For example, if the user emits a Long, as a control
> > > >>>>>>>>>>>>>>>>>> tuple, the payload object will just be a Long object.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> It is necessary to bundle this Long into some
> > > >>>>>>>>>>>>>>>>>> recognizable object so that the BufferServerPublisher
> > > >>>>>>>>>>>>>>>>>> knows that this is a Control tuple and not a regular
> > > >>>>>>>>>>>>>>>>>> tuple and serialize it accordingly. It is therefore
> > > >>>>>>>>>>>>>>>>>> necessary that the tuple be part of some known
> > > >>>>>>>>>>>>>>>>>> hierarchy so that it can be distinguished from other
> > > >>>>>>>>>>>>>>>>>> payload tuples. Let us call this class
> > > >>>>>>>>>>>>>>>>>> ControlTupleInterface. Note that this needs to be done
> > > >>>>>>>>>>>>>>>>>> before the tuple is inserted into the sink which is
> > > >>>>>>>>>>>>>>>>>> done in the port objects.
> > > >>>>>>>>>>>>>>>>>> Once the tuple is inserted into the sink, it would seem
> > > >>>>>>>>>>>>>>>>>> just like any other payload tuple and cannot be
> > > >>>>>>>>>>>>>>>>>> distinguished.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> For this reason, I had something like the following in
> > > >>>>>>>>>>>>>>>>>> API:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> package com.datatorrent.api;
> > > >>>>>>>>>>>>>>>>>> public class ControlTupleInterface
> > > >>>>>>>>>>>>>>>>>> {
> > > >>>>>>>>>>>>>>>>>>   Object payload; // User control tuple payload. A Long() for example.
> > > >>>>>>>>>>>>>>>>>>   UUID id; // Unique Id to de-duplicate in downstream operators
> > > >>>>>>>>>>>>>>>>>> }
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Regarding your suggestion on using the Tuple class as
> > > >>>>>>>>>>>>>>>>>> the wrapper for
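
The wrapper idea in the oldest message above can be sketched as a small standalone program. This is only an illustration under assumptions: `ControlTupleInterface` mirrors the two fields quoted in the thread, while the constructor, `isControl()`, and the `Deduplicator` helper are hypothetical and not part of the Apex API.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class ControlTupleWrapperSketch {
    /** Mirrors the wrapper quoted in the thread: a user payload plus a unique id. */
    static final class ControlTupleInterface {
        final Object payload; // user control tuple payload, e.g. a Long
        final UUID id;        // unique id used to de-duplicate downstream

        ControlTupleInterface(Object payload) {
            this.payload = payload;
            this.id = UUID.randomUUID();
        }
    }

    /** Hypothetical check: wrapped objects are control tuples, anything else is data. */
    static boolean isControl(Object tuple) {
        return tuple instanceof ControlTupleInterface;
    }

    /** Hypothetical downstream de-duplication keyed on the wrapper id. */
    static final class Deduplicator {
        private final Set<UUID> seen = new HashSet<>();

        /** Returns true only the first time a given control tuple is seen. */
        boolean firstTime(ControlTupleInterface t) {
            return seen.add(t.id);
        }
    }

    public static void main(String[] args) {
        Object data = 42L;                                              // plain payload tuple
        ControlTupleInterface control = new ControlTupleInterface(42L); // same value, wrapped
        Deduplicator dedup = new Deduplicator();

        System.out.println(isControl(data));          // prints false
        System.out.println(isControl(control));       // prints true
        System.out.println(dedup.firstTime(control)); // prints true
        System.out.println(dedup.firstTime(control)); // prints false
    }
}
```

The point of the wrapper is that a bare Long carries no marker; the type check only works because the wrapping happens before the object reaches the sink.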
