This is needed, the batch start-end have similar semantics as start-end
window from operational/functional perspective.

Thks
Amol


*Join us at Apex Big Data World-San Jose
<http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
[image: http://www.apexbigdata.com/san-jose-register.html]
<http://www.apexbigdata.com/san-jose-register.html>

On Tue, Feb 14, 2017 at 9:55 PM, Bhupesh Chawda <[email protected]>
wrote:

> +1 for having an immediate delivery mechanism as well.
>
> I would suggest that the other delivery mechanism stays at end of window,
> to be consistent, as I think it may be difficult to determine the last
> arrival of the tuple.
>
> ~ Bhupesh
>
> On Wed, Feb 15, 2017 at 7:04 AM, Pramod Immaneni <[email protected]>
> wrote:
>
> > There have been some recent developments and discussions on the schema
> side
> > (link below) that warrant a reconsideration of how control tuples get
> > delivered.
> >
> > http://apache.markmail.org/search/?q=apex+list%3Aorg.
> > apache.apex.dev+schema+discovery+support#query:apex%
> > 20list%3Aorg.apache.apex.dev%20schema%20discovery%20support+page:1+mid:
> > oaji26y3xfozap5v+state:results
> >
> > What I would like to suggest is that we allow two delivery options for
> > control tuples which can be configured on a per control tuple basis.
> First
> > is to deliver control tuple to the operator when the first instance of
> the
> > tuple arrives from any path. Second option is to deliver the control
> tuple
> > when the last instance of the tuple arrives from all the paths or at the
> > end window if it is going to be difficult to determine the last arrival.
> > The developer can choose the delivery option for the control tuple
> > preferably when the tuple is created. The first option will be useful for
> > scenarios like schema propagation or begin file in case of batch cases.
> The
> > second option will be useful for tuples like end file or end batch in
> batch
> > use cases.
> >
> > Thanks
> >
> > On Tue, Jan 10, 2017 at 12:27 PM, Bhupesh Chawda <
> [email protected]>
> > wrote:
> >
> > > Hi All,
> > >
> > > Based on some discussion here is what is planned for the propagation
> > > feature for control tuples.
> > >
> > > The signature of the *processControl()* method in
> > > *ControlAwareDefaultInputPort* which is implemented by the operator
> > > developer will be as follows:
> > >
> > > *public abstract boolean processControl(UserDefinedControlTuple
> > payload);*
> > >
> > > The boolean returned by the processControl() method indicates (to the
> > > engine) whether or not the operator is able to handle the control tuple
> > and
> > > wants to take care of the propagation of the control tuple.
> > >
> > >    - If the method returns true - indicating it is able to handle the
> > >    control tuple, the operator has to explicitly emit the control
> tuples
> > to
> > >    the output ports it wishes to propagate to.
> > >
> > >
> > >    - If the method returns false - indicating it is not able to handle
> > the
> > >    control tuple, the control tuple will be propagated by the engine to
> > all
> > >    output ports.
> > >
> > > The operator may even emit new control tuples in either of the cases.
> > > Note that for ports that are not control aware, the control tuple is
> > > propagated by default.
> > >
> > > We don't need any output port annotations or operator level attributes.
> > >
> > > ~ Bhupesh
> > >
> > >
> > > On Mon, Jan 9, 2017 at 5:16 PM, Tushar Gosavi <[email protected]>
> > > wrote:
> > >
> > > > On Sun, Jan 8, 2017 at 11:49 PM, Vlad Rozov <[email protected]
> >
> > > > wrote:
> > > > > +1 to manage propagation at an operator level. An operator is
> either
> > > > control
> > > > > tuple aware and needs to manage how control tuples are routed from
> > > input
> > > > > ports to output ports or it is not. In the later case it does not
> > > matter
> > > > how
> > > > > many input and output ports the operator has and it is the Apex
> > > platform
> > > > > responsibility to route control tuples. I don't see a use case
> where
> > an
> > > > > operator that is not aware of a control tuple needs to manage one
> or
> > > more
> > > > > input ports (or similar output ports) differently than others.
> > > > >
> > > >
> > > > The problem with giving explicit control to operator for routing of
> > > > custom tuples is how does the operator
> > > > developer knows about control tuple requirement for downstream
> > > > operators in an application. For example in following DAG
> > > > A -> B -> C
> > > > A - is my custom source operator which emits a new control tuple type
> > C1
> > > > and C.
> > > > B - is operator from malhar which handle control tuple C.
> > > > C - is custom output operator which handles C1.
> > > >
> > > > If B is managing control tuples, then it needs to remember to foward
> > > > unhandled tuples on all output port, else it will block
> > > > the tuples for downstream operator which might need them, also if new
> > > > output port is added then B needs to send that tuples
> > > > on the new output port also. But In this case I can't simply extend B
> > > > as port objects are transient and mostly anonymous,
> > > > I can not extend these to send control tuples on new output port. In
> > > > my opinion we should let the control tuple flow through
> > > > entire DAG from their source and let each operator in the path to
> > > > handle/ignore them as required without blocking them.
> > > >
> > > >
> > > > > In general, an operator is aware only of a specific control
> tuple(s)
> > > (for
> > > > > example end of batch or end of file) and for a control tuples that
> it
> > > was
> > > > > not enabled for, the behavior should be exactly the same as if the
> > > > operator
> > > > > is not control tuple aware, meaning that those control tuples
> should
> > be
> > > > > propagated from input ports to output ports by the platform. There
> > > > should be
> > > > > an ability to let the platform know what control tuples an operator
> > is
> > > > aware
> > > > > of and can handle. This can be done both by API call and an
> > annotation.
> > > > >
> > > >
> > > > I think this will add overhead while developing applications.
> Operator
> > > > developer needs to add code to handle new control tuple also
> > > > need to update the part of code to register the type with engine. And
> > > > platfoms needs to perform type check and develiver the tuples
> > > > accordingly. Instead operator developer could check the type of
> > > > incoming tuple and handle it as required.
> > > >
> > > > - Tushar.
> > > >
> > > >
> > > > > Thank you,
> > > > >
> > > > > Vlad
> > > > >
> > > > >
> > > > > On 1/5/17 13:04, Bhupesh Chawda wrote:
> > > > >>
> > > > >> Agreed Thomas.
> > > > >> I was referring to the persona of the operator developer. The user
> > of
> > > > the
> > > > >> operator would not be doing anything related to the propagation of
> > > > control
> > > > >> tuples. Actually, the behavior of the operator wrt. propagation of
> > > > control
> > > > >> tuples would be part of the operator documentation.
> > > > >>
> > > > >> Also, we are providing options for the developer to route the flow
> > of
> > > > >> control tuples in code during the development of the operator. The
> > > > >> annotations would actually help achieve it in a easier way.
> > > > >>
> > > > >> ~ Bhupesh
> > > > >>
> > > > >> On Jan 5, 2017 21:40, "Thomas Weise" <[email protected]> wrote:
> > > > >>
> > > > >> I think it is important to be clear on the roles with regard to
> this
> > > > >> functionality. The user of the operator should not have to do
> > anything
> > > > to
> > > > >> get it to work. So while I suggested to consider attributes
> earlier,
> > > > there
> > > > >> should not be any need for the user to set those. The operator
> needs
> > > to
> > > > >> work as is.
> > > > >>
> > > > >> The persona concerned with propagation of control tuples is the
> > > operator
> > > > >> developer. I think the clear way for the operator developer to
> > > override
> > > > >> the
> > > > >> propagation behavior is in code and if that is possible there is
> no
> > > need
> > > > >> for other things such as attributes or other port level settings.
> > > > >>
> > > > >> Thomas
> > > > >>
> > > > >>
> > > > >> On Wed, Jan 4, 2017 at 10:20 PM, Bhupesh Chawda <
> > > > [email protected]>
> > > > >> wrote:
> > > > >>
> > > > >>> I think we all agree on the use case for selective propagation.
> The
> > > > >>> question is about where to have the control - at the operator
> level
> > > or
> > > > at
> > > > >>> the port level.
> > > > >>>
> > > > >>> For this ability, we have the following options:
> > > > >>>
> > > > >>>     1. Operator disables the propagation on selected output
> ports.
> > > > Other
> > > > >>>     output ports propagate by default.
> > > > >>>     2. Operator disables propagation for the entire operator (by
> > > means
> > > > of
> > > > >>
> > > > >> an
> > > > >>>
> > > > >>>     attribute). Operator developer explicitly emits the received
> > > > control
> > > > >>> tuples
> > > > >>>     on selected output ports.
> > > > >>>
> > > > >>> If the decision is to completely block the propagation, then
> > Option 2
> > > > is
> > > > >>> easier to use as just an attribute needs to be set, as opposed to
> > > > Option
> > > > >>> 1
> > > > >>> where user needs to set the annotation on each output port.
> > > > >>>
> > > > >>> However, if selective propagation is needed, Option 1 would just
> > need
> > > > the
> > > > >>> user to disable propagation on certain ports; rest are propagated
> > by
> > > > >>> default, while Option 2 requires the user to explicitly emit the
> > > > control
> > > > >>> tuples.
> > > > >>> ~ Bhupesh
> > > > >>>
> > > > >>>
> > > > >>> On Thu, Jan 5, 2017 at 3:46 AM, Thomas Weise <[email protected]>
> > wrote:
> > > > >>>
> > > > >>>> Yes, I think that for any of these cases the operator developer
> > will
> > > > >>
> > > > >> turn
> > > > >>>>
> > > > >>>> of implicit propagation for the operator and then write the code
> > to
> > > > >>
> > > > >> route
> > > > >>>>
> > > > >>>> or create control tuples as needed.
> > > > >>>>
> > > > >>>> Thomas
> > > > >>>>
> > > > >>>> On Wed, Jan 4, 2017 at 12:59 PM, Amol Kekre <
> [email protected]
> > >
> > > > >>>
> > > > >>> wrote:
> > > > >>>>>
> > > > >>>>> I agree that by default the propagation must be implicit, i.e.
> if
> > > the
> > > > >>>>> operator does nothing, the control tuple propagates. I do think
> > > users
> > > > >>>>> should have control on deciding to "not propagate" or "create
> > new"
> > > > and
> > > > >>>
> > > > >>> in
> > > > >>>>>
> > > > >>>>> these cases they would need to do something explicit
> (override)?
> > > > >>>>>
> > > > >>>>> The following cases come to mind
> > > > >>>>> 1. Sole consumer of a particular control signal (for example
> end
> > of
> > > > >>>
> > > > >>> file)
> > > > >>>>>
> > > > >>>>> 2. Creator of a particular control signal (start of file, or a
> > > signal
> > > > >>>
> > > > >>> to
> > > > >>>>>
> > > > >>>>> pause on something etc.)
> > > > >>>>> 3. One port on a data pipeline and other port for meta-data
> > > pipeline
> > > > >>>>>
> > > > >>>>> In the above cases emit will be decided on an output port. #1
> is
> > > only
> > > > >>>>
> > > > >>>> place
> > > > >>>>>
> > > > >>>>> where all output ports will disable the tuple, #2 and #3 most
> > > likely
> > > > >>>
> > > > >>> will
> > > > >>>>>
> > > > >>>>> be selective.
> > > > >>>>>
> > > > >>>>> Thks
> > > > >>>>> Amol
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On Wed, Jan 4, 2017 at 12:25 PM, Thomas Weise <[email protected]>
> > > > wrote:
> > > > >>>>>
> > > > >>>>>> I think there is (1) implicit propagation just like other
> > control
> > > > >>>>
> > > > >>>> tuples
> > > > >>>>>>
> > > > >>>>>> where the operator code isn't involved and (2) where the
> > operator
> > > > >>>>>
> > > > >>>>> developer
> > > > >>>>>>
> > > > >>>>>> wants to decide how control tuples are created or routed and
> > will
> > > > >>>>
> > > > >>>> receive
> > > > >>>>>>
> > > > >>>>>> and emit them on the output ports as desired.
> > > > >>>>>>
> > > > >>>>>> I don't see a use case for hybrid approaches? Maybe
> propagation
> > > does
> > > > >>>>
> > > > >>>> not
> > > > >>>>>>
> > > > >>>>>> need to be tied to ports at all, maybe just by annotation at
> the
> > > > >>>>
> > > > >>>> operator
> > > > >>>>>>
> > > > >>>>>> level?
> > > > >>>>>>
> > > > >>>>>> Thomas
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On Wed, Jan 4, 2017 at 10:59 AM, Bhupesh Chawda <
> > > > >>>>
> > > > >>>> [email protected]
> > > > >>>>>>
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Wouldn't having this with output ports give a finer control
> on
> > > the
> > > > >>>>>>> propagation of control tuples?
> > > > >>>>>>> We might have an operator with two output ports each of which
> > > > >>>
> > > > >>> creates
> > > > >>>>>
> > > > >>>>> two
> > > > >>>>>>>
> > > > >>>>>>> different pipelines downstream. We would be able to say that
> > one
> > > > >>>>>
> > > > >>>>> pipeline
> > > > >>>>>>>
> > > > >>>>>>> gets the control tuples and the other doesn't.
> > > > >>>>>>>
> > > > >>>>>>> ~ Bhupesh
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On Jan 4, 2017 11:55 PM, "Thomas Weise" <[email protected]>
> > wrote:
> > > > >>>>>>>
> > > > >>>>>>> I'm referring to the operator that needs to make the decision
> > to
> > > > >>>>>>
> > > > >>>>>> propagate
> > > > >>>>>>>
> > > > >>>>>>> or not. The tuples come from an input port, so it seems
> > > > >>
> > > > >> appropriate
> > > > >>>>
> > > > >>>> to
> > > > >>>>>>
> > > > >>>>>> say
> > > > >>>>>>>
> > > > >>>>>>> "don't propagate control tuples from this port". No matter
> how
> > > > >>
> > > > >> many
> > > > >>>>>>
> > > > >>>>>> output
> > > > >>>>>>>
> > > > >>>>>>> ports there are.
> > > > >>>>>>>
> > > > >>>>>>> Output ports are there for an operator to emit new tuples, in
> > the
> > > > >>>>
> > > > >>>> case
> > > > >>>>>>
> > > > >>>>>> you
> > > > >>>>>>>
> > > > >>>>>>> are discussing you don't emit new control tuples.
> > > > >>>>>>>
> > > > >>>>>>> Thomas
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On Wed, Jan 4, 2017 at 9:39 AM, Bhupesh Chawda <
> > > > >>>>>
> > > > >>>>> [email protected]>
> > > > >>>>>>>
> > > > >>>>>>> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Thomas,
> > > > >>>>>>>>
> > > > >>>>>>>> Are you suggesting an attribute on the input port for
> > > > >>
> > > > >> controlling
> > > > >>>>
> > > > >>>> the
> > > > >>>>>>>>
> > > > >>>>>>>> propagation of control tuples to downstream operators?
> > > > >>>>>>>> I think it should be better to do it on the output port
> since
> > > > >>
> > > > >> the
> > > > >>>>>>>
> > > > >>>>>>> decision
> > > > >>>>>>>>
> > > > >>>>>>>> to block the propagation will be made at the upstream
> operator
> > > > >>>>
> > > > >>>> rather
> > > > >>>>>>>
> > > > >>>>>>> than
> > > > >>>>>>>>
> > > > >>>>>>>> at the downstream.
> > > > >>>>>>>> Also, we need another way of controlling the propagation at
> > run
> > > > >>>>
> > > > >>>> time
> > > > >>>>>>
> > > > >>>>>> and
> > > > >>>>>>>>
> > > > >>>>>>>> hence I was thinking about the method call on the output
> port,
> > > > >>
> > > > >> in
> > > > >>>>>>>
> > > > >>>>>>> addition
> > > > >>>>>>>>
> > > > >>>>>>>> to the annotation on the output port (which is the static
> > way).
> > > > >>>>>>>>
> > > > >>>>>>>> Please correct me if I have misunderstood your question.
> > > > >>>>>>>>
> > > > >>>>>>>> ~ Bhupesh
> > > > >>>>>>>>
> > > > >>>>>>>> On Wed, Jan 4, 2017 at 7:26 PM, Thomas Weise <
> [email protected]>
> > > > >>>>
> > > > >>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>> Wouldn't it be more intuitive to control this with an
> > > > >>
> > > > >> attribute
> > > > >>>>
> > > > >>>> on
> > > > >>>>>>
> > > > >>>>>> the
> > > > >>>>>>>>>
> > > > >>>>>>>>> input port?
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Tue, Jan 3, 2017 at 11:06 PM, Bhupesh Chawda <
> > > > >>>>>>>
> > > > >>>>>>> [email protected]
> > > > >>>>>>>>>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Hi Pramod,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I was thinking of a method setPropagateControlTuples(
> > boolean
> > > > >>>>>>>>
> > > > >>>>>>>> propagate)
> > > > >>>>>>>>>
> > > > >>>>>>>>> on
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> the output port of the operator.
> > > > >>>>>>>>>> The operator could disable this in the code at any point
> of
> > > > >>>>
> > > > >>>> time.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Note however that this is to block the propagation of
> > > > >>
> > > > >> control
> > > > >>>>>>
> > > > >>>>>> tuples
> > > > >>>>>>>>
> > > > >>>>>>>> from
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> upstream. Any control tuples emitted explicitly by the
> > > > >>>
> > > > >>> operator
> > > > >>>>>>
> > > > >>>>>> would
> > > > >>>>>>>>>
> > > > >>>>>>>>> still
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> be emitted and sent to the downstream operators.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Please see
> > > > >>>>>>>>>> https://github.com/apache/apex-core/pull/440/files#diff-
> > > > >>>>>>>>>> 8aa0ca1a3e645fa60e9b376c118c00a3R68
> > > > >>>>>>>>>> in the PR.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> ~ Bhupesh
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Wed, Jan 4, 2017 at 6:53 AM, Pramod Immaneni <
> > > > >>>>>>>>
> > > > >>>>>>>> [email protected]>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> 2 sounds good. Have you thought about what the method
> > > > >>
> > > > >> would
> > > > >>>>>
> > > > >>>>> look
> > > > >>>>>>>>
> > > > >>>>>>>> like.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Sat, Dec 31, 2016 at 8:29 PM, Bhupesh Chawda <
> > > > >>>>>>>>>
> > > > >>>>>>>>> [email protected]
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Yes, that makes sense.
> > > > >>>>>>>>>>>> We have following options:
> > > > >>>>>>>>>>>> 1. Make the annotation false by default and force the
> > > > >>>
> > > > >>> user
> > > > >>>>
> > > > >>>> to
> > > > >>>>>>>>
> > > > >>>>>>>> forward
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> control tuples explicitly.
> > > > >>>>>>>>>>>> 2. Annotation is true by default and static way of
> > > > >>>
> > > > >>> blocking
> > > > >>>>>>
> > > > >>>>>> stays
> > > > >>>>>>>>
> > > > >>>>>>>> as
> > > > >>>>>>>>>
> > > > >>>>>>>>> it
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> is.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> We provide another way for blocking programmatically,
> > > > >>>>
> > > > >>>> perhaps
> > > > >>>>>>
> > > > >>>>>> by
> > > > >>>>>>>>>
> > > > >>>>>>>>> means
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> of
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> another method call on the port.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> ~ Bhupesh
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Dec 30, 2016 00:09, "Pramod Immaneni" <
> > > > >>>>>>
> > > > >>>>>> [email protected]
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Bhupesh,
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Annotation seems like a static way to stop
> > > > >>
> > > > >> propagation.
> > > > >>>>>
> > > > >>>>> Give
> > > > >>>>>>>>
> > > > >>>>>>>> these
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> are
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> programmatically generated I would think the operators
> > > > >>>>>
> > > > >>>>> should
> > > > >>>>>>>
> > > > >>>>>>> be
> > > > >>>>>>>>>
> > > > >>>>>>>>> able
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> to
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> stop (consume without propagating) programmatically as
> > > > >>>>>
> > > > >>>>> well.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> [email protected]
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks Vlad, I am trying out the approach you
> > > > >>>
> > > > >>> mentioned
> > > > >>>>>>>>
> > > > >>>>>>>> regarding
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> having
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> another interface which allows sinks to put a
> > > > >>
> > > > >> control
> > > > >>>>>>
> > > > >>>>>> tuple.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Regarding the delivery of control tuples, here is
> > > > >>>
> > > > >>> what
> > > > >>>>
> > > > >>>> I
> > > > >>>>>
> > > > >>>>> am
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> planning
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> to
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> do:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> All the control tuples which are emitted in a
> > > > >>>>
> > > > >>>> particular
> > > > >>>>>>>
> > > > >>>>>>> window
> > > > >>>>>>>>>
> > > > >>>>>>>>> are
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> delivered after all the data tuples have been
> > > > >>>
> > > > >>> delivered
> > > > >>>>>
> > > > >>>>> to
> > > > >>>>>>>
> > > > >>>>>>> the
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> respective
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> ports, but before the endWindow() call. The operator
> > > > >>>>
> > > > >>>> can
> > > > >>>>>>
> > > > >>>>>> then
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> process
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> control tuples in that window and can do any
> > > > >>>>
> > > > >>>> finalization
> > > > >>>>>>
> > > > >>>>>> in
> > > > >>>>>>>>
> > > > >>>>>>>> the
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> end
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> window
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> call. There will be no delivery of control tuples
> > > > >>>
> > > > >>> after
> > > > >>>>>>>>>
> > > > >>>>>>>>> endWindow()
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> before the next beginWindow() call.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> For handling the propagation of control tuples
> > > > >>>
> > > > >>> further
> > > > >>>>
> > > > >>>> in
> > > > >>>>>>
> > > > >>>>>> the
> > > > >>>>>>>>>
> > > > >>>>>>>>> dag,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> we
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> are
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> planning to have an annotation on the Output Port of
> > > > >>>>
> > > > >>>> the
> > > > >>>>>>>>
> > > > >>>>>>>> operator
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> which
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> would be true by default.
> > > > >>>>>>>>>>>>>> @OutputPortFieldAnnotation(propogateControlTuples =
> > > > >>>>>>
> > > > >>>>>> false).
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> ~ Bhupesh
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> [email protected]
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Custom control tuples are control tuples emitted
> > > > >>
> > > > >> by
> > > > >>>>
> > > > >>>> an
> > > > >>>>>>>>
> > > > >>>>>>>> operator
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> itself
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> not by the platform. Prior to the introduction of
> > > > >>>
> > > > >>> the
> > > > >>>>>>>
> > > > >>>>>>> custom
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> control
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> tuples, only Apex engine itself puts control
> > > > >>
> > > > >> tuples
> > > > >>>>>
> > > > >>>>> into
> > > > >>>>>>>>>
> > > > >>>>>>>>> various
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> sinks,
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> so
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> the engine created necessary Tuple objects with
> > > > >>
> > > > >> the
> > > > >>>>>>>>>
> > > > >>>>>>>>> corresponding
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> type
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> prior to calling Sink.put().
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Not all sinks need to be changed. Only control
> > > > >>>
> > > > >>> tuple
> > > > >>>>>>
> > > > >>>>>> aware
> > > > >>>>>>>>>
> > > > >>>>>>>>> sinks
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> should
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> provide such functionality. In the case there is a
> > > > >>>>
> > > > >>>> lot
> > > > >>>>>
> > > > >>>>> of
> > > > >>>>>>>>
> > > > >>>>>>>> code
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> duplication,
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> please create an abstract class, that other
> > > > >>
> > > > >> control
> > > > >>>>>
> > > > >>>>> aware
> > > > >>>>>>>>
> > > > >>>>>>>> sinks
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> will
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> extend
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> from.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Thank you,
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Vlad
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> On 12/23/16 06:24, Bhupesh Chawda wrote:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Hi Vlad,
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Thanks for the pointer on delegating the wrapping
> > > > >>>
> > > > >>> of
> > > > >>>>>
> > > > >>>>> the
> > > > >>>>>>>>
> > > > >>>>>>>> user
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> tuple
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> to
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> control port. I was trying this out today.
> > > > >>>>>>>>>>>>>>>> The problem I see us if we introduce a
> > > > >>>>>
> > > > >>>>> putControlTuple()
> > > > >>>>>>>>>
> > > > >>>>>>>>> method
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> in
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Sink,
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> then a lot of the existing sinks would change.
> > > > >>>
> > > > >>> Also
> > > > >>>>>
> > > > >>>>> the
> > > > >>>>>>>>>
> > > > >>>>>>>>> changes
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> seemed
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> redundant as, the existing control tuples already
> > > > >>>>
> > > > >>>> use
> > > > >>>>>>
> > > > >>>>>> the
> > > > >>>>>>>>>
> > > > >>>>>>>>> put()
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> method
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> of
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> sinks. So why do something special for custom
> > > > >>>>
> > > > >>>> control
> > > > >>>>>>>>
> > > > >>>>>>>> tuples?
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> The only aspect in which the custom control
> > > > >>
> > > > >> tuples
> > > > >>>>
> > > > >>>> are
> > > > >>>>>>>>>
> > > > >>>>>>>>> different
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> is
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> that
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> these will be generated by the user and will
> > > > >>>>
> > > > >>>> actually
> > > > >>>>>
> > > > >>>>> be
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> delivered
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> to
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> ports in a different order. Perhaps we should be
> > > > >>>>
> > > > >>>> able
> > > > >>>>>
> > > > >>>>> to
> > > > >>>>>>>
> > > > >>>>>>> use
> > > > >>>>>>>>>
> > > > >>>>>>>>> the
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> existing
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> flow. The only problems as outlined before seem
> > > > >>
> > > > >> to
> > > > >>>>
> > > > >>>> be
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> identification
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> of
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>> user tuple as a control tuple.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> ~ Bhupesh
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> [email protected]
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Why is it necessary to wrap in the OutputPort?
> > > > >>>
> > > > >>> Can't
> > > > >>>>>
> > > > >>>>> it
> > > > >>>>>>
> > > > >>>>>> be
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> delegated
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> to
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> a
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Sink by introducing new putControlTuple method?
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Thank you,
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Vlad
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> On 12/21/16 22:10, Bhupesh Chawda wrote:
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Hi Vlad,
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> The problem in using the Tuple class as the
> > > > >>>>
> > > > >>>> wrapper
> > > > >>>>>
> > > > >>>>> is
> > > > >>>>>>>>
> > > > >>>>>>>> that
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Ports
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> belong to the API and we want to wrap the
> > > > >>>
> > > > >>> payload
> > > > >>>>>>
> > > > >>>>>> object
> > > > >>>>>>>>
> > > > >>>>>>>> of
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> control
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> tuple into the Tuple class which is not part of
> > > > >>>>
> > > > >>>> the
> > > > >>>>>>
> > > > >>>>>> API.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> The output port will just get the payload of
> > > > >>
> > > > >> the
> > > > >>>>>
> > > > >>>>> user
> > > > >>>>>>>>>
> > > > >>>>>>>>> control
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> tuple.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> For
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> example, if the user emits a Long, as a control
> > > > >>>>>
> > > > >>>>> tuple,
> > > > >>>>>>>
> > > > >>>>>>> the
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> payload
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> object
> > > > >>>>>>>>>>>>>>>>>> will just be a Long object.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> It is necessary to bundle this Long into some
> > > > >>>>>>>
> > > > >>>>>>> recognizable
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> object
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> so
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> that
> > > > >>>>>>>>>>>>>>>>>> the BufferServerPublisher knows that this is a
> > > > >>>>>
> > > > >>>>> Control
> > > > >>>>>>>>
> > > > >>>>>>>> tuple
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> and
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> not a
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> regular tuple and serialize it accordingly. It
> > > > >>>
> > > > >>> is
> > > > >>>>>>>>
> > > > >>>>>>>> therefore
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> necessary
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> that
> > > > >>>>>>>>>>>>>>>>>> the tuple be part of some known hierarchy so
> > > > >>>
> > > > >>> that
> > > > >>>>>
> > > > >>>>> can
> > > > >>>>>>
> > > > >>>>>> be
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> distinguished
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> from
> > > > >>>>>>>>>>>>>>>>>> other payload tuples. Let us call this class
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> ControlTupleInterface.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Note
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> that this needs to be done before the tuple is
> > > > >>>>>>
> > > > >>>>>> inserted
> > > > >>>>>>>>
> > > > >>>>>>>> into
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> sink
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> which
> > > > >>>>>>>>>>>>>>>>>> is done in the port objects. Once the tuple is
> > > > >>>>>>
> > > > >>>>>> inserted
> > > > >>>>>>>>
> > > > >>>>>>>> into
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> sink,
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> it
> > > > >>>>>>>>>>>>>>>>>> would seem just like any other payload tuple
> > > > >>
> > > > >> and
> > > > >>>>>>
> > > > >>>>>> cannot
> > > > >>>>>>>
> > > > >>>>>>> be
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> distinguished.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> For this reason, I had something like the
> > > > >>>>
> > > > >>>> following
> > > > >>>>>
> > > > >>>>> in
> > > > >>>>>>>>
> > > > >>>>>>>> API:
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> package com.datatorrent.api;
> > > > >>>>>>>>>>>>>>>>>> public class ControlTupleInterface
> > > > >>>>>>>>>>>>>>>>>> {
> > > > >>>>>>>>>>>>>>>>>>      Object payload; // User control tuple
> > > > >>>>
> > > > >>>> payload. A
> > > > >>>>>>>>
> > > > >>>>>>>> Long()
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> for
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> example.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>      UUID id;  // Unique Id to de-duplicate in
> > > > >>>>>>
> > > > >>>>>> downstream
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> operators
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> }
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Regarding your suggestion on using the Tuple
> > > > >>>
> > > > >>> class
> > > > >>>>>
> > > > >>>>> as
> > > > >>>>>>>
> > > > >>>>>>> the
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> wrapper
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> for
> > > >
> > >
> >
>

Reply via email to