Sorry to pick up this discussion again after a long time. @Yun Gao, could you share the PoC if possible?
Best Yun Tang On 2022/12/08 10:20:21 Piotr Nowojski wrote: > Sounds good to me as well! > > Best, > Piotrek > > czw., 8 gru 2022 o 09:53 Dawid Wysakowicz <dwysakow...@apache.org> > napisał(a): > > > Sounds like a good plan to me. > > On 08/12/2022 08:58, Yun Gao wrote: > > > > Hi Dawid, > > > > Very thanks for the discussion and sorry for the delayed response > > since I was hesitated on some points. > > > > But as a whole, with some more thought, first I agree with that adding > > the trigger() / cancle() methods to some kind of timer object is not > > necessary > > for us to achieve the exactly-once for the operators. We could follow the > > direction of "modifying the implementation of the operators" to achieve the > > same target. > > > > But continue to think with this direction, it now looks to me it is also > > not > > needed to add the callback to the timer services: > > 1. For InternalTimerService, the operators could just call > > `InternalTimerService > > #forEachProcessingTimer()` on finish to handle the pending timers. > > 2. For the timers registered to the underlying ProcessingTimerService, at > > least in > > the currently listed scenarios, the operators itself knows what is the > > remaining work > > (e.g., the FileWriter knows if it has in-progress file to flush). > > > > Operators could handle the remaining timers in finish() method. > > > > Then the only interface we need to consider is that added to the > > ProcessFunction. The > > current interface also looks ok to me. > > > > If you think the above option works, I could first have a PoC that > > demonstrate it is sufficient > > to only modify the operator implementation to handling the remaining > > workers properly on > > finish(). If there are new issues I'll post here and we could have some > > more discussion. > > > > Best, > > Yun Gao > > > > > > ------------------Original Mail ------------------ > > *Sender:*Dawid Wysakowicz <dwysakow...@apache.org> > > <dwysakow...@apache.org> > > *Send Date:*Fri Dec 2 21:21:25 2022 > > *Recipients:*Dev <dev@flink.apache.org> <dev@flink.apache.org> > > *Subject:*Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers > > on Job Termination > > > >> Ad. 1 > >> > >> I'd start with ProcessingTimerService as that's the only public > >> interface. It is exposed in the Sink V2 interface. In this scenario it > >> would be the Sink interface that need to extend from a EOFTimersHandler. I > >> believe it would be hard to pass it from there to the ProcessingTimeService > >> as it is passed from the outside e.g. in the ProcessingTimeServiceAware. > >> For that reason I'd go with a registration method in that interface. > >> > >> In ProcessFunction I'd go with a mixin approach, so a ProcessFunction can > >> extend from EOFTimersHandler. I'd do that because ProcessFunction does not > >> have an init/open method where we could register the handler. > >> > >> On operator level I'd have a registration method in InternalTimerService. > >> I believe that's the only way to handle the above ProcessFunction aproach. > >> E.g. in KeyedProcessOperator you need to check if the UDF extend from the > >> interface not the operator itself. > >> > >> Ad. 2 > >> > >> I'd go with > >> > >> *(Keyed)ProcessFunction:* > >> > >> interface EOFTimersHandler { > >> > >> void handleProcessingTimer(long timestamp, Context); > >> > >> } > >> > >> interface Context { > >> public abstract <X> void output(OutputTag<X> outputTag, X value); > >> > >> public abstract K getCurrentKey(); > >> > >> // we can extend it for waitFor later > >> > >> } > >> > >> *ProcessingTimeService: * > >> > >> interface EOFTimersHandler { > >> > >> void handleProcessingTimer(long timestamp, Context); > >> > >> } > >> > >> interface Context { > >> > >> // we can extend it for waitFor later > >> > >> } > >> > >> *InternalTimeService:* > >> > >> interface EOFTimersHandler { > >> > >> void handleProcessingTimer(InternalTimer<K,N> timer Context); > >> > >> } > >> > >> interface Context { > >> > >> // we can extend it for waitFor later > >> > >> } > >> > >> Personally I'd not try to unify those places too much. They have also > >> different visibilities (public/internal), have access to different set of > >> metadata (key/namespace). > >> > >> > >> Ad 3. > >> > >> I don't like the having the trigger/cancel methods, because: > >> > >> 1. I don't like the back and forth between system and UDF > >> > >> 2. Yes, the biggest issue I have is with the possibility with registering > >> new timers. I am trying to be on the safe side here. I don't like the idea > >> of dropping them, because it is again making assumptions what users do with > >> those timers. What if they e.g. emit counter if it reached certain > >> threshold? We'd need an additional flag in the method that is the final > >> timer. My sentiment is that we're making it questionably easier to trigger > >> a timer for the cost of openning up for unforeseen problems with follow up > >> registration. > >> > >> Best, > >> > >> Dawid > >> On 30/11/2022 12:13, Yun Gao wrote: > >> > >> Hi Dawid, PiotrVery thanks for the discussion!As a whole I think we are > >> already consistent with the callback option, and I don't think I opposed > >> that we could modify the current internal implementation. But from my side > >> it is still not clear what the actual interfaces are proposing. Let me > >> first try to summarize that a bit:1) Which object does the handlers > >> register on?It seems there are two options, one is to timer services > >> (InternalTimerService/ ProcessingTimerService or some equivalent things > >> after refactoring), the otherone is as a lifecycle of the operator. I'm > >> now tending to the latter one, how do you think on this part?2) What is > >> the interface of the handler?Option 1 is that interface SomeHandlerName { > >> void processingTimer(Timer timer);}class Timer { long getTimestamp(); void > >> trigger(); void cancel(); // Other actions if required. }But it seems > >> there is controversy on whether to add actions to the timer class. If > >> without that, with my understanding the interfaces of the Option 2 > >> areinterface SomeHandlerName { void processTimer(Timer timer); }interface > >> KeyedSomeHandlerName<KEY, NAMESPACE> { void > >> processKeyedTimer(KeyedTimer<KEY, NAMESPACE> timer, Context ctx); }class > >> Timer { long getTimestamp();}class KeyedTimer<KEY, NAMESPACE> extends > >> Timer { KEY getKey(); NAMESPACE getNamespace();}void Context {void > >> executeAtScheduledTime(Consumer<timer> handler);}As Piotr has pointed out, > >> if we could eliminate the logic of namespace, we could thenremove the > >> namespace related type parameter and method from the interfaces.Do I > >> understand right?Besides, I'm still fully got the reason that why we > >> should not add the actions to the timer class, in consideration that it > >> seems in most cases users could implement their logical with simply > >> calling timer.trigger() (I think the repeat registration is indeed a > >> problem, but I think we could ignore the timers registered during > >> termination). Could you further enlighten me a bit on this part?Best,Yun > >> Gao------------------------------------------------------------------From:Piotr > >> Nowojski <pnowoj...@apache.org> <pnowoj...@apache.org>Send Time:2022 Nov. > >> 30 (Wed.) 17:10To:dev <dev@flink.apache.org> > >> <dev@flink.apache.org>Subject:Re: [DISCUSS] FLIP-269: Properly Handling > >> the Processing Timers on Job TerminationHi,I have a couple of > >> remarks.First a general one. For me the important part in the design of > >> this API ishow to expose this to Flink users in public interfaces. > >> NamelyProcessFunction and StreamOperator. InternalTimerService is an > >> internalclass, so we can change it and break it as needed in the > >> future.For registering a handler like proposed by Dawid:interface > >> SomeHandlerName { void onTimer(/* whatever type it is */ timer, Context > >> ctx ) { }}makes sense to me. For the InternalTimerService I think it > >> doesn't mattertoo much what we do. We could provide a similar interface as > >> for theProcessFunction/StreamOperator, it doesn't have to be the same one. > >> On thecontrary, I think it shouldn't be the same, as part of this effort > >> weshouldn't be exposing the concept of `Namespaces` to the public facing > >> API.Re the "waitFor". Theoretically I see arguments why users might want > >> to usethis, but I'm also not convinced whether that's necessary in > >> practice. Iwould be +1 either way. First version can be without this > >> functionality andwe can add it later (given that we designed a good place > >> to add it in thefuture, like the `Context` proposed by Dawid). But I'm > >> also fine adding itnow if others are insisting.Best,Piotrekśr., 30 lis > >> 2022 o 09:18 Dawid Wysakowicz <dwysakow...@apache.org> > >> <dwysakow...@apache.org>napisał(a): > >> > >> WindowOperator is not implemented by users. I can see that > >> forInternalTimerService we'll needinterface PendingTimerProcessor<KEY, > >> NAMESPACE> {void onTimer(InternalTimer<KEY, NAMESPACE> timer) > >> {doHandleTimer(timer);}I don't see a problem with that.As you said > >> ProcessingTimeService is a user facing interface andcompletely unrelated > >> to the InternalTimerService. I don't see a reasonwhy we'd need to unify > >> those.As for the waitFor behaviour. Personally, I have not been convinced > >> itis necessary. Maybe it's just my lack of vision, but I can't think of > >> ascenario where I'd use it. Still if we need it, I'd go for something > >> like:void onTimer(/* whatever type it is */ timer, Context ctx ) > >> {}interface Context {void executeAtScheduledTime(Consumer<timer> > >> handler);}That way you have independent simple interfaces that need to > >> work onlyin a single well defined scenario and you don't need to match > >> aninterface to multiple different cases.Best,DawidOn 30/11/2022 07:27, Yun > >> Gao wrote: > >> > >> Hi Dawid,Thanks for the comments!As a whole I'm also open to the API and I > >> also prefer to use simplebut flexible interfaces, but it still looks there > >> are some problem tojust let users to implement the termination > >> actions.Let's take the WindowOperator as an example. As seen in [1],in the > >> timer processing logic it needs to acquire the key / namespaceinformation > >> bound to the timer (which is only supported by the > >> > >> InternalTimerService). > >> > >> Thus if we want users to implement the same logic on termination, we > >> > >> either let users > >> > >> to trigger the timer handler directly or we also allows users to access > >> > >> these piece of > >> > >> information. If we go with the later direction, we might need to provide > >> > >> interfaces like > >> > >> interface PendingTimerProcessor<KEY, NAMESPACE> {void onTimer(Timer<KEY, > >> NAMESPACE> timer) {doHandleTimer(timer);}}class Timer<KEY, NAMESPACE> > >> {long getTimestamp();KEY getKey();NAMESPACE getNamespace();}Then we'll > >> have the issue that since we need the interface to handle > >> > >> both of cases of > >> > >> InternalTimerSerivce and raw ProcessTimeService, the later do not have > >> > >> key and > >> > >> namespace information attached, and its also be a bit inconsistency for > >> > >> users to have to set > >> > >> the KEY and NAMESPACE types.Besides, it looks to me that if we want to > >> implement behaviors like > >> > >> waiting for, it might > >> > >> be not simply reuse the time handler time, then it requires every > >> > >> operator authors to > >> > >> re-implement such waiting logics. > >> > >> Moreover it still have the downside that if you call back to the > >> > >> `onTimer` method after > >> > >> `trigger` you have access to the Context which lets you register new > >> > >> timers. > >> > >> I think we could simply drop the timers registered during we start > >> > >> processing the pending timers > >> > >> on termination. Logically there should be no new data after termination. > >> > >> I think I am not convinced to these arguments. First of all I'm afraid > >> > >> there is no clear distinction > >> > >> in that area what is runtime and what is not. I always found > >> > >> `AbstracStreamOperator(*)` actually part > >> > >> of runtime or Flink's internals and thus I don't find > >> > >> `InternalTimerService` a utility, but a vital part > >> > >> of the system. Let's be honest it is impossible to implement an > >> > >> operator without extending from > >> > >> `AbstractStreamOperator*`.What would be the problem with having a > >> > >> proper implementation in > >> > >> `InternalTimerService`? Can't we do it like this?: > >> > >> I think the original paragraph is only explanation to that the interface > >> > >> is harder to support if we > >> > >> allows the users to implement the arbitrary logic. But since now we are > >> > >> at the page with the callback > >> > >> option, users could always be allowed to implement arbitrary logic no > >> > >> matter we support timer.trigger() > >> > >> or not, thus I think now there is no divergence on this point. I also > >> > >> believe in we'll finally have some logic > >> > >> similar to the proposed one that drain all the times and process > >> it.Best,Yun Gao[1] > >> > >> https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488 > >> > >> <https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488 > >> > > >> <https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488><https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488 > >> > >> <https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488 > >> > > >> <https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488> > >> > >> ------------------------------------------------------------------From:Dawid > >> Wysakowicz <dwysakow...@apache.org> <dwysakow...@apache.org>Send > >> Time:2022 Nov. 28 (Mon.) 23:33To:dev <dev@flink.apache.org> > >> <dev@flink.apache.org>Subject:Re: [DISCUSS] FLIP-269: Properly Handling > >> the Processing Timers > >> > >> on Job Termination > >> > >> Do we really need to have separate methods for > >> > >> triggering/waiting/cancelling. To me it sounds rather counterintuitive. > >> Whycan't users just execute whatever they want in the handler itself > >> insteadof additional back and forth with the system? Moreover it still > >> have thedownside that if you call back to the `onTimer` method after > >> `trigger` youhave access to the Context which lets you register new timers. > >> > >> I find following approach much simpler:void onTimer(...) > >> {doHandleTimer(timestamp);}void processPendingTimer(...) {// > >> triggerdoHandleTimer(timestamp);// for cancel, simply do nothing...}Sorry > >> I might not make it very clear here. I think the difficulty with > >> > >> supported setting the currentKey is a special issue for the > >> callbackoptions (no matter what the interface is) since it allows users to > >> executelogic other than the one registered with the timers. The complexity > >> comesfrom that currently we have two level of TimerServices: > >> TheProcessingTimerService (there is no key) and InternalTimerService > >> (withkey). Currently only ProcessingTimerService is exposed to the runtime > >> andInternalTimerService is much more a utility to implement the operator. > >> Thenwith the current code, the runtime could only access > >> toProcessingTimerService on termination. > >> > >> I think I am not convinced to these arguments. First of all I'm afraid > >> > >> there is no clear distinction in that area what is runtime and what is > >> not.I always found `AbstracStreamOperator(*)` actually part of runtime > >> orFlink's internals and thus I don't find `InternalTimerService` a > >> utility,but a vital part of the system. Let's be honest it is impossible > >> toimplement an operator without extending from `AbstractStreamOperator*`. > >> > >> What would be the problem with having a proper implementation in > >> > >> `InternalTimerService`? Can't we do it like this?: > >> > >> AbstractStreamOperator#finish() > >> {internalTimerService.finish();}InternalTimerService#finish() {while > >> ((timer = processingTimeTimersQueue.peek()) != null) > >> {keyContext.setCurrentKey(timer.getKey());processingTimeTimersQueue.poll();onEndOfInputHandler.processPendingTimer(timer);}}If > >> we only executes some predefined actions, we do not need to worry > >> > >> about the implementation of InternalTimerService and just execute > >> theregistered timers. But if we allow users to execute arbitrary logic, > >> weneed to be also aware of the InternalTimerServices and parse the key > >> fromthe timers stored in it. I think we should always have method to > >> overcomethis issue, but to support the callback options would be more > >> complex. > >> > >> I am not sure, having "predefined actions" would be good enough that we > >> > >> do not need to set a key. As a user I'd anyhow expect the proper key to > >> beset in processPendingTimer. > >> > >> Best,DawidOn 24/11/2022 08:51, Yun Gao wrote:Hi Piotr / Divye, Very thanks > >> for the discussion! First IMO it seems we > >> > >> have reached the consensus on the high-level API: Most operators > >> shouldusually have only one reasonable action to the pending timers > >> ontermination, thus we could let the operators to implement its own > >> actionswith the low-level interface provided. The only exception is > >> theProcessFunction, with which users might register customized timers, > >> thususers might also defines the actions on termination (If I > >> havemisunderstandings here, please correct me). For the low-level API, I > >> couldget the benefits with the callback options: since in most cases an > >> operatorhas only one action to all the timers, its a waste for us to store > >> the sameflag for all the timers, also with a lot of code / state format > >> changes.But since it is enough for most users to simply trigger / cacnel > >> thetimers, it would be redundant for users to implement the logic twice. > >> Thusperhaps we might combine the benefits of the two options: We might > >> have aseparate interface public interface TimerHandlersOnTermination { > >> voidprocessPendingTimer(Timer timer, long currentTime); } public class > >> Timer {long getRegisteredTimestamp(); void trigger(); void waitFor(); > >> voidcancel(); } Then if an operator have implemented > >> theTimerHandlersOnTermination interface, on termination we could > >> callprocessPendingTimer(xx) for every pending timers. Users might > >> simplytrigger / waitFor / cancel it, or execute some other logics if > >> needed. Thenfor the ProcessFunction we might have a similar interface > >> toprocessPendingTimer, except we might need to provide Context / Collector > >> tothe ProcessFunction. Do you think this would be a good direction? > >> Also@Piotr I don't see a problem here. Interface doesn't have to reflect > >> that,only the runtime must set the correct key context before executing > >> thehandler dealing with the processing time timers at the end of > >> input/time.Sorry I might not make it very clear here. I think the > >> difficulty withsupported setting the currentKey is a special issue for the > >> callbackoptions (no matter what the interface is) since it allows users to > >> executelogic other than the one registered with the timers. The complexity > >> comesfrom that currently we have two level of TimerServices: > >> TheProcessingTimerService (there is no key) and InternalTimerService > >> (withkey). Currently only ProcessingTimerService is exposed to the runtime > >> andInternalTimerService is much more a utility to implement the operator. > >> Thenwith the current code, the runtime could only access > >> toProcessingTimerService on termination. If we only executes some > >> predefinedactions, we do not need to worry about the implementation > >> ofInternalTimerService and just execute the registered timers. But if > >> weallow users to execute arbitrary logic, we need to be also aware of > >> theInternalTimerServices and parse the key from the timers stored in it. > >> Ithink we should always have method to overcome this issue, but to > >> supportthe callback options would be more complex. Best, Yun > >> Gao------------------------------------------------------------------From:Divye > >> Kapoor <dkap...@pinterest.com.INVALID> <dkap...@pinterest.com.INVALID> > >> <mailto:dkap...@pinterest.com.INVALID > <dkap...@pinterest.com.INVALID> > >> Send Time:2022 Nov. 24 (Thu.) 08:50To:dev <dev@flink.apache.org> > >> <dev@flink.apache.org> <mailto:dev@flink.apache.org > > >> <dev@flink.apache.org> Cc:XenonDevelopment Team <xenon-...@pinterest.com> > >> <xenon-...@pinterest.com> <mailto:xenon-...@pinterest.com > >> <xenon-...@pinterest.com> > >> > >> Subject:Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing > >> > >> Timers on Job Termination Sounds good. Looks like we're on the same > >> page.Thanks! Divye On Wed, Nov 23, 2022 at 2:41 AM Piotr Nowojski > >> <pnowoj...@apache.org> <mailto:pnowoj...@apache.org > > >> <pnowoj...@apache.org> wrote: Hi Divye Ithink we are mostly on the same > >> page. Just to clarify/rephrase: One thingto think about - on EOF “trigger > >> immediately” will mean that theasynchronous wait timeout timers will also > >> fire - which is undesirable Ididn't mean to fire all timers immediately in > >> all of the built-inoperators. Just that each built-in operator can have a > >> hard coded way(without a way for users to change it) to handle those > >> timers. Windowedoperators would trigger the lingering timers (flush > >> outputs),AsyncWaitOperator could just ignore them. The same way users > >> could registerEOF timer handlers in the ProcessFunction as Dawid > >> Wysakowicz proposed, we(as flink developers) could use the same mechanism > >> to implement anybehaviour we want for the built-in operators. There should > >> be no need toadd any separate mechanism. Best, Piotrek śr., 23 lis 2022 o > >> 08:21 DivyeKapoor <dkap...@pinterest.com.invalid> > >> <dkap...@pinterest.com.invalid> <mailto:dkap...@pinterest.com.invalid > > >> <dkap...@pinterest.com.invalid> napisał(a): Thanks Yun/Piotrek, Somebrief > >> comments inline below. On Tue, Nov 22, 2022 at 1:37 AM Piotr > >> Nowojski<pnowoj...@apache.org> <pnowoj...@apache.org> > >> <mailto:pnowoj...@apache.org > <pnowoj...@apache.org> wrote: Hi, All inall > >> I would agree with Dawid's proposal. +1 We can add the flexibility ofhow > >> to deal with the timers in the low level API via adding a handler - > >> ifsomeone needs to customize it, he will always have a workaround. Note > >> aftergiving it more thought, I agree that registering some handlers is > >> betterthan overloading the register timer method and modifying the timer's > >> state.+1. At the same time, we can force the most sensible semantic that > >> we thinkfor the couple of built-in operators, which should be > >> prettystraightforward (either ignore the timers, or fire them at once). I > >> agreethere might be some edge cases, that theoretically user might want to > >> waitfor the timer to fire naturally, but: 1. I'm not sure how common > >> inpractice this will be. If not at all, then why should we be > >> complicatingthe API/system? That’s fair. However, the specifics are very > >> importanthere. One thing to think about - on EOF “trigger immediately” > >> will meanthat the asynchronous wait timeout timers will also fire - which > >> isundesirable (because they are racing with the last async call). > >> However,the issue is cleanly resolved by waiting for the timer to be > >> canceled whenthe last event is processed. (“Wait for” case). Ignoring the > >> timer has theleast justification. Registering the handler as per Dawid’s > >> proposal andhaving that handler unregister the timers on EOF makes best > >> sense. Thissolution also unifies the trigger immediately case as that > >> handler canreregister the timers for early termination. The proposal: 1. > >> Operatorreceives EOF 2. EOF timer handler triggers 3. EOF handler adjusts > >> theregistered timers for early trigger or ignore. If wait-for behavior > >> isdesired, timers are not changed. This is controlled in client code. > >> 4.Operator waits for all timers to drain/trigger. (“Always”). There is > >> nospecial handling for ignore/early trigger. 5. Operator allows job > >> toproceed with shutdown. The only api change needed is an EOF handler. > >> Theother agreement we need is that “Wait for” is the desired behavior > >> inprocessing time and that processing time is fundamentally different > >> fromevent time in this respect. (I have changed my thinking since the > >> lastmail). 2. We can always expand the API in the future, and let the > >> useroverride the default built-in behaviour of the operators via some > >> setter onthe stream transformation (`SingleOutputStreamOperator`), or via > >> somecustom API DSL style in each of the operators separately. This is > >> notrequired. See above. Re forcing the same semantics for processing > >> timetimers as for event time ones - this is tempting, but indeed I see > >> apossibility that users need to adhere to some external constraints > >> whenusing processing time. +1. As above, we should consider the 2 > >> casesfundamentally different in this area. Re: Yun - b) Another issue is > >> thatwhat if users use timers with different termination actions in the > >> sameoperator / UDF? For example, users use some kind of timeout (like > >> throwsexception if some thing not happen after some other thing), and also > >> somekind of window aggregation logic. In this case, without additional > >> tags,users might not be able to distinguish which timer should be canceled > >> andwhich time should be triggered ? as above. The EOF handler makes > >> thechoice. 4. How could these scenarios adjust their APIs ? From the > >> currentlisted scenarios, I'm more tend to that as @Dawid pointed out, > >> there mightbe only one expected behavior for each scenario, thus it does > >> not seems toneed to allow users to adjust the behavior. Thus @Divye may I > >> have a doubleconfirmation currently do we have explicit scenarios that is > >> expected tochange the different behaviors for the same scenario? Wait-for > >> behavior isprobably the only expected behavior and any alterations should > >> be from theEOF handler managing the registered timers. Besides @Divye from > >> the listedscenarios, I have another concern for global configuration is > >> that for onejob, different operators seems to still might have different > >> expectedbehaviors. For example, A job using both Window operator > >> andAsyncWaitOperator might have different requirements for timers > >> ontermination? Thank you for raising this case. This changed my > >> thinking.Based on your point, we should try and align on the “Wait-for” > >> with EOFhandler proposal. I’m withdrawing the “single-runtime-config” > >> proposal.Best, Divye > >> > >> >