If we allow these flexibilities for control tuples, we are back to square one; the regular data tuple exists specifically for that.
When we talk about control tuples, I think it's safe to make these two assumptions:

1) The control tuple is always sent and handled at the streaming window boundary.
2) The control tuple is always sent to all downstream partitions.

And a third one, which is debatable:

3) The control tuple is (always?) sent from the input operator.

David

On Mon, Jun 27, 2016 at 2:43 PM, Chinmay Kolhatkar <[email protected]> wrote:

> I hope I'm not commenting too late on this thread.
>
> From the above discussion, it looks like the requirement is to have a custom tuple with the following two capabilities to influence the streaming engine:
> 1. When to send it (between windows / within a window)
> 2. Where to send it (all partitions, some partition(s), one partition, etc.)
>
> I think if we design the custom tuple keeping these two capabilities in mind, the user will have all the flexibility to make the right use of it as needed.
>
> Thoughts?
>
> - Chinmay.
>
> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <[email protected]> wrote:
>
> > The ability to send custom control tuples within a window may be useful, for example, for session tracking, where session boundaries are not aligned with window boundaries and 500 ms latency is not acceptable for an application.
> >
> > Thank you,
> >
> > Vlad
> >
> > On 6/25/16 10:52, Thomas Weise wrote:
> >
> >> It should not matter from where the control tuple is triggered. It will be good to have a generic mechanism to propagate it; other things can be accomplished outside the engine. For example, the new comprehensive support for windowing will all be in Malhar, and the engine needs to know nothing about it except that we need the control tuple for watermark propagation and idempotent processing.
> >>
> >> I also think the main difference from other tuples is the need to send it to all partitions, which is similar to checkpoint window tuples, but not the same. Here, we probably also need the ability for the user to control whether such a tuple should traverse the entire DAG or not. For a batch use case, for example, we may want to send the end-of-file signal to the next operator, but not beyond, if the operator has asynchronous processing logic in it.
> >>
> >> For any logic to be idempotent, the control tuple needs to be processed at a window boundary. Receiving the control tuple in the window callback would avoid having to track extra state in the operator. I don't think that's a major issue, but what is the use case for processing a control tuple within the window?
> >>
> >> Thomas
> >>
> >> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni <[email protected]> wrote:
> >>
> >>> For the use cases you mentioned, I think 1) and 2) are more likely to be controlled directly by the application, while 3) and 4) are more likely to be triggered externally and handled directly by the engine; 3) is already being implemented that way (apexcore-163).
> >>>
> >>> The control tuples emitted by an operator would be sent to all downstream partitions, wouldn't they? That would be the chief distinction compared to data (apart from the payload), which would get partitioned under normal circumstances. It would also be guaranteed that downstream partitions receive control tuples only after the data that was sent before them, so we could send a control tuple immediately when it is emitted as opposed to at window boundaries.
> >>> However, during unification it is important to know whether these control tuples have been received from all upstream partitions before proceeding with a control operation. One could wait till the end of the window, but that introduces a delay, however small. I would like to add to the proposal that the platform only hand over the control tuple to the unifier when it has been received from all upstream partitions, much like how end window is processed, but without waiting till the actual end of the window.
> >>>
> >>> Regarding your concern about idempotency, we typically care about idempotency at a window level, and doing the above will still allow the operators to preserve that easily.
> >>>
> >>> Thanks
> >>>
> >>> On Jun 24, 2016, at 11:22 AM, David Yan <[email protected]> wrote:
> >>>>
> >>>> Hi all,
> >>>>
> >>>> I would like to propose a new feature for the Apex core engine: support for custom control tuples. Currently, we have control tuples such as BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but we don't have support for applications to insert their own control tuples. The current way to get around this is to use data tuples and have a separate port for such tuples that sends tuples to all partitions of the downstream operators, which is not exactly developer friendly.
> >>>>
> >>>> We have already seen a number of use cases that can use this feature:
> >>>>
> >>>> 1) Batch support: We need to tell all operators of the physical DAG when a batch starts and ends, so the operators can do whatever is needed upon the start or the end of a batch.
> >>>>
> >>>> 2) Watermark: To support the concepts of event-time windowing, the watermark control tuple is needed to tell which windows should be considered late.
> >>>>
> >>>> 3) Changing operator properties: We do have support for changing operator properties on the fly, but with a custom control tuple, the command to change operator properties can be window-aligned across all partitions and also across the DAG.
> >>>>
> >>>> 4) Recording tuples: Like changing operator properties, we have this support now, but only at the individual physical operator level, and without control of which window to record tuples for. With a custom control tuple, because a control tuple must belong to a window, all operators in the DAG can start (and stop) recording for the same windows.
> >>>>
> >>>> I can think of two options to achieve this:
> >>>>
> >>>> 1) A new custom control tuple type that takes the user's serializable object.
> >>>>
> >>>> 2) Piggyback on the current BEGIN_WINDOW and END_WINDOW control tuples.
> >>>>
> >>>> Please provide your feedback. Thank you.
> >>>>
> >>>> David
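For illustration only, here is a minimal sketch of what option 1 above could look like, combining the two capabilities Chinmay lists (when and where to deliver) with the DAG-traversal control Thomas mentions. All of the names (CustomControlTuple, DeliveryTime, DeliveryScope) are hypothetical placeholders introduced for this sketch, not existing Apex APIs.

import java.io.Serializable;

// Hypothetical sketch: a control tuple type that wraps a user-supplied
// serializable payload ("option 1"), plus flags modeling when it is handled
// (window boundary vs. in-window) and how far it propagates downstream.
public class CustomControlTuple implements Serializable
{
  private static final long serialVersionUID = 1L;

  public enum DeliveryTime
  {
    WINDOW_BOUNDARY, // handled in the window callback; keeps idempotent processing simple
    IMMEDIATE        // delivered within the window, e.g. for session boundaries
  }

  public enum DeliveryScope
  {
    ALL_DOWNSTREAM,           // propagate to every downstream partition across the DAG
    IMMEDIATE_DOWNSTREAM_ONLY // stop at the next operator, e.g. end-of-file for async logic
  }

  private final Serializable payload; // user object, e.g. a watermark or batch/file marker
  private final DeliveryTime deliveryTime;
  private final DeliveryScope deliveryScope;

  public CustomControlTuple(Serializable payload, DeliveryTime deliveryTime, DeliveryScope deliveryScope)
  {
    this.payload = payload;
    this.deliveryTime = deliveryTime;
    this.deliveryScope = deliveryScope;
  }

  public Serializable getPayload()
  {
    return payload;
  }

  public DeliveryTime getDeliveryTime()
  {
    return deliveryTime;
  }

  public DeliveryScope getDeliveryScope()
  {
    return deliveryScope;
  }
}

A batch input operator could then, for example, emit new CustomControlTuple(endOfFileMarker, DeliveryTime.WINDOW_BOUNDARY, DeliveryScope.IMMEDIATE_DOWNSTREAM_ONLY), where endOfFileMarker is the application's own serializable object, so that the end-of-file signal reaches only the next operator, matching the batch scenario described above.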

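Also for illustration, a rough sketch of the unifier-side handling Pramod proposes: hand a control tuple to the unifier only once it has arrived from every upstream partition, similar to how END_WINDOW is handled, but without waiting for the actual end of the window. The class and method names are invented for this sketch and are not Apex APIs.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: track which upstream partitions have delivered a given
// control tuple and release it as soon as all of them have, rather than
// holding it until the window ends.
public class ControlTupleMerger
{
  private final int upstreamPartitionCount;
  // key: the control tuple (or its identity), value: partitions it has been seen from
  private final Map<Object, Set<Integer>> pending = new HashMap<>();

  public ControlTupleMerger(int upstreamPartitionCount)
  {
    this.upstreamPartitionCount = upstreamPartitionCount;
  }

  /**
   * Called when a control tuple arrives from one upstream partition.
   * Returns true when the same tuple has been seen from every upstream
   * partition, meaning it can be handed to the unifier immediately.
   */
  public boolean onControlTuple(Object controlTuple, int fromPartition)
  {
    Set<Integer> seen = pending.computeIfAbsent(controlTuple, k -> new HashSet<Integer>());
    seen.add(fromPartition);
    if (seen.size() == upstreamPartitionCount) {
      pending.remove(controlTuple);
      return true; // all upstream partitions have sent it; deliver to the unifier now
    }
    return false;  // still waiting on other upstream partitions
  }
}

Because idempotency is tracked at the window level, releasing the merged control tuple as soon as the last upstream copy arrives (instead of at END_WINDOW) still lets downstream operators preserve idempotency, which is the point Pramod makes about avoiding the end-of-window delay.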