Thanks for the reply. It is a good question. There are multiple options:

1. We can persist control signals in HighAvailabilityServices and replay them after failover.
2. We can simply tell users that control signals only take effect after they have been checkpointed.

On Tue, Jun 8, 2021 at 2:15 PM Steven Wu [via Apache Flink User Mailing List archive.] <ml+s2336050n44278...@n4.nabble.com> wrote:
>
> I can see the benefits of control flow. E.g., it might help the old (and inactive) FLIP-17 side input. I would suggest that we add more details on some of the potential use cases.
>
> Here is one mismatch with using control flow for dynamic config. Dynamic config is typically targeted/loaded by one specific operator, whereas control flow will propagate the dynamic config to all operators. This is not a problem per se.
>
> Regarding using the REST API (to the JobManager) for accepting control signals from external systems: where are we going to persist/checkpoint the signal? The JobManager can die before the control signal is propagated and checkpointed. Do we lose the control signal in this case?
>
>
> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <[hidden email]> wrote:
>
>> +1 on separating the effort into two steps:
>>
>> 1. Introduce a common control flow framework, with flexible interfaces for generating / reacting to control messages for various purposes.
>> 2. Features that leverage the control flow can then be worked on concurrently.
>>
>> Meanwhile, continuing to collect potential features that may leverage the control flow should be helpful. It provides good input for the control flow framework design and makes the framework general enough to cover the potential use cases.
>>
>> My suggestions on the next steps:
>>
>> 1. Allow more time for opinions to be heard and potential use cases to be collected.
>> 2. Draft a FLIP scoped to the common control flow framework.
>> 3. We probably need a PoC implementation to make sure the framework covers at least the following scenarios:
>>    1. Produce control events from arbitrary operators.
>>    2. Produce control events from the JobMaster.
>>    3. Consume control events in arbitrary operators downstream of where the events are produced.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <[hidden email]> wrote:
>>
>>> Many thanks to Jiangang for bringing this up, and many thanks for the discussion!
>>>
>>> I also agree with the summary by Xintong and Jing that control flow seems to be a common building block for many functionalities, and that a dynamic configuration framework is a representative application frequently requested by users. Regarding the control flow, we are currently also considering the design of iteration for flink-ml. As Xintong has pointed out, iteration also requires the control flow in cases like detecting global termination inside the iteration (in this case we need to broadcast an event through the iteration body to detect whether there are still records residing in it). And regarding whether to implement the dynamic configuration framework, I also agree with Xintong that the consistency guarantee is a point to consider; we might need to decide whether every operator must be guaranteed to receive the dynamic configuration.
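The following is a minimal sketch of option 1 from the reply at the top of this thread: persist control signals, then replay them after failover. It also speaks to Steven Wu's question about a JobManager dying before a signal is checkpointed.

Everything in it is hypothetical:

- An in-memory list stands in for storage backed by HighAvailabilityServices.
- `ControlSignalStore`, `ControlledOperator` and the sequence-number scheme are illustrative names, not Flink APIs.
- Each operator is assumed to checkpoint the sequence number of the last signal it applied, so that replay after restore is idempotent.

Option 2 (signals take effect only once checkpointed) is not shown.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

Signal = Tuple[int, str, str]  # (sequence number, config key, value)


@dataclass
class ControlSignalStore:
    """Hypothetical durable log; an in-memory list stands in for storage
    backed by HighAvailabilityServices."""
    log: List[Signal] = field(default_factory=list)

    def append(self, key: str, value: str) -> int:
        # In the intended flow, the REST handler calls this before acknowledging
        # the request, so a JobManager failure after the ack cannot lose it.
        seq = len(self.log) + 1
        self.log.append((seq, key, value))
        return seq

    def since(self, seq: int) -> List[Signal]:
        return [s for s in self.log if s[0] > seq]


@dataclass
class ControlledOperator:
    """Hypothetical operator whose config is driven by control signals."""
    config: Dict[str, str] = field(default_factory=dict)
    applied_seq: int = 0  # checkpointed together with the config

    def apply(self, seq: int, key: str, value: str) -> None:
        if seq <= self.applied_seq:
            return  # already applied: replay stays idempotent
        self.config[key] = value
        self.applied_seq = seq

    def snapshot(self) -> Tuple[Dict[str, str], int]:
        return dict(self.config), self.applied_seq

    @classmethod
    def restore(cls, snap: Tuple[Dict[str, str], int],
                store: ControlSignalStore) -> "ControlledOperator":
        op = cls(dict(snap[0]), snap[1])
        # Replay every persisted signal that is newer than the checkpoint.
        for seq, key, value in store.since(op.applied_seq):
            op.apply(seq, key, value)
        return op


if __name__ == "__main__":
    store = ControlSignalStore()
    op = ControlledOperator()
    op.apply(store.append("qps.limit", "1000"), "qps.limit", "1000")
    checkpoint = op.snapshot()
    store.append("log.level", "DEBUG")  # persisted, but the job fails before delivery
    recovered = ControlledOperator.restore(checkpoint, store)
    print(recovered.config)  # {'qps.limit': '1000', 'log.level': 'DEBUG'}
```

The sequence number plays the role that offsets play for replayable sources: the checkpoint records how far the operator got, and the durable log supplies the rest.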
>>>
>>> Best,
>>> Yun
>>>
>>>
>>>
>>> ------------------------------------------------------------------
>>> From: kai wang <[hidden email]>
>>> Date: 2021/06/08 11:52:12
>>> To: JING ZHANG <[hidden email]>
>>> Cc: 刘建刚 <[hidden email]>; Xintong Song [via Apache Flink User Mailing List archive.] <[hidden email]>; user <[hidden email]>; dev <[hidden email]>
>>> Subject: Re: Add control mode for flink
>>>
>>>
>>>
>>> I'm a big +1 for this feature.
>>>
>>> 1. Limit the input QPS.
>>> 2. Change the log level for debugging.
>>>
>>> In my team, we need both of the examples above.
>>>
>>> On Tue, Jun 8, 2021 at 11:18 AM JING ZHANG <[hidden email]> wrote:
>>>
>>>> Thanks Jiangang for bringing this up.
>>>> As mentioned in Jiangang's email, the `dynamic configuration framework` provides many useful functions at Kuaishou, because it can update job behavior without relaunching the job. These functions are very popular at Kuaishou, and we have also seen similar demands on the mailing list [1].
>>>>
>>>> I'm a big +1 for this feature.
>>>>
>>>> Thanks Xintong and Yun for the deep thoughts on this issue. I like the idea of introducing a control mode in Flink. It takes the original issue a big step closer to its essence, and it also opens up the possibility of more fantastic features, as mentioned in Xintong's and Jark's responses.
>>>> Based on this idea, there are at least two milestones for achieving the goals proposed by Jiangang:
>>>> (1) Build a common control flow framework in Flink.
>>>>     It focuses on control flow propagation, and on how to integrate the common control flow framework with existing mechanisms.
>>>> (2) Build a dynamic configuration framework which is exposed to users directly.
>>>>     The dynamic configuration framework can be seen as an application built on top of the underlying control flow framework.
>>>>     It focuses on the public API which receives configuration update requests from users. Besides, it is necessary to introduce an API protection mechanism to avoid job performance degradation caused by too many control events.
>>>>
>>>> I suggest splitting the whole design into two after we reach a consensus on whether to introduce this feature, because both sub-topics need careful design.
>>>>
>>>>
>>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html
>>>>
>>>> Best regards,
>>>> JING ZHANG
>>>>
>>>> On Tue, Jun 8, 2021 at 10:01 AM 刘建刚 <[hidden email]> wrote:
>>>>
>>>>> Thanks Xintong Song for the detailed supplement. Since Flink jobs are long-running, they are similar to many services, so interacting with or controlling them is a common desire. This was our initial thought when implementing the feature. In our internal Flink, many configs set in YAML can be adjusted dynamically to avoid restarting the job, for example:
>>>>>
>>>>> 1. Limit the input QPS.
>>>>> 2. Degrade the job by sampling and so on.
>>>>> 3. Reset the Kafka offset in certain cases.
>>>>> 4. Stop checkpointing in certain cases.
>>>>> 5. Control the consumption of historical data.
>>>>> 6. Change the log level for debugging.
>>>>>
>>>>>
>>>>> After deep discussion, we realized that a common control flow will benefit both users and developers. Dynamic config is just one of the use cases. The concrete design and implementation touch many components, like the JobMaster, network channels, operators and so on, which need deeper consideration and design.
>>>>>
>>>>> On Mon, Jun 7, 2021 at 2:52 PM Xintong Song [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
>>>>>
>>>>>> Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.
>>>>>>
>>>>>> I was part of the preliminary offline discussions before this proposal went public, so maybe I can help clarify things a bit.
>>>>>>
>>>>>> In short, although the phrase "control mode" might be a bit misleading, what we truly want to do, from my side, is to make the concept of "control flow" explicit and expose it to users.
>>>>>>
>>>>>> ## Background
>>>>>> Jiangang & his colleagues at Kuaishou maintain an internal version of Flink. One of their custom features allows dynamically changing operator behaviors via REST APIs. He's willing to contribute this feature to the community, and came to Yun Gao and me for suggestions. After discussion, we feel that the underlying question to be answered is how we model the control flow in Flink. Dynamically controlling jobs via the REST API can be one of the features built on top of the control flow, and there could be others.
>>>>>>
>>>>>> ## Control flow
>>>>>> Control flow refers to the communication channels for sending events/signals to/between tasks/operators that change Flink's behavior in a way that may or may not affect the computation logic. Typical control events/signals Flink currently has are watermarks and checkpoint barriers.
>>>>>>
>>>>>> In general, for modeling control flow, the following questions should be considered:
>>>>>> 1. Who (which component) is responsible for generating the control messages?
>>>>>> 2. Who (which component) is responsible for reacting to the messages?
>>>>>> 3. How do the messages propagate?
>>>>>> 4. When it comes to affecting the computation logic, how should the control flow work together with exactly-once consistency?
>>>>>>
>>>>>> 1) & 2) may vary depending on the use cases, while 3) & 4) probably share a lot in common. A unified control flow model would help deduplicate the common logic, allowing us to focus on the use-case-specific parts.
>>>>>>
>>>>>> E.g.,
>>>>>> - Watermarks: generated by source operators, handled by window operators.
>>>>>> - Checkpoint barriers: generated by the checkpoint coordinator, handled by all tasks.
>>>>>> - Dynamic controlling: generated by the JobMaster (in reaction to the REST command), handled by specific operators/UDFs.
>>>>>> - Operator-defined events: The following features are still in planning, but may potentially benefit from the control flow model. (Please correct me if I'm wrong, @Yun, @Jark)
>>>>>>   * Iteration: When a certain condition is met, we might want to signal downstream operators with an event.
>>>>>>   * Mini-batch assembling: Flink currently uses special watermarks to indicate the end of each mini-batch, which makes it tricky to deal with event-time-related computations.
>>>>>>   * Hive dimension table join: For periodically reloaded Hive tables, it would be helpful to have specific events signaling that reloading has finished.
>>>>>>   * Bootstrap dimension table join: This is similar to the previous one. In cases where we want to fully load the dimension table before starting to join it with the main stream, it would be helpful to have an event signaling the end of the bootstrap.
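The four questions and the examples above can be made concrete with a small, Flink-independent model. All names in it are hypothetical. In this sketch a control event:

- records who generated it (question 1);
- carries an optional target so that only the addressed operator reacts (question 2). This is one way to handle Steven Wu's point that dynamic config usually targets a single operator;
- is forwarded to every downstream operator (question 3).

Unlike checkpoint barriers, it deduplicates by event id at fan-in instead of aligning inputs, and it does not address question 4 (exactly-once).

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Set


@dataclass
class ControlEvent:
    event_id: int
    origin: str                    # Q1: who generated it, e.g. "JobMaster"
    kind: str                      # e.g. "config-update", "bootstrap-finished"
    payload: Dict[str, str]
    target: Optional[str] = None   # None: every operator with a handler reacts


Handler = Callable[["Operator", ControlEvent], None]


@dataclass
class Operator:
    name: str
    handlers: Dict[str, Handler] = field(default_factory=dict)
    downstream: List["Operator"] = field(default_factory=list)
    state: Dict[str, str] = field(default_factory=dict)
    seen: Set[int] = field(default_factory=set)

    def on_control_event(self, event: ControlEvent) -> None:
        if event.event_id in self.seen:
            return  # second copy arriving via another input (fan-in)
        self.seen.add(event.event_id)
        # Q2: react only if addressed and a handler is registered for this kind.
        handler = self.handlers.get(event.kind)
        if handler is not None and event.target in (None, self.name):
            handler(self, event)
        # Q3: forward every event downstream, the way barriers travel with the stream.
        for op in self.downstream:
            op.on_control_event(event)


def update_config(op: Operator, event: ControlEvent) -> None:
    op.state.update(event.payload)


def emit_from_job_master(sources: List[Operator], event: ControlEvent) -> None:
    """Hypothetical JobMaster entry point, e.g. triggered by a REST call."""
    for source in sources:
        source.on_control_event(event)


if __name__ == "__main__":
    source = Operator("source")
    filt = Operator("filter", handlers={"config-update": update_config})
    sink = Operator("sink", handlers={"config-update": update_config})
    source.downstream = [filt]
    filt.downstream = [sink]
    emit_from_job_master(
        [source],
        ControlEvent(1, "JobMaster", "config-update", {"threshold": "10"}, target="filter"))
    print(filt.state, sink.state)  # {'threshold': '10'} {}
```

Watermarks, barriers and the operator-defined events in the list differ mainly in the origin and in which operators register handlers. That is the part Xintong expects to vary per use case, while the forwarding logic is shared.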
>>>>>>
>>>>>> ## Dynamic REST controlling
>>>>>> Back to the specific feature that Jiangang proposed, I personally think it's quite convenient. Currently, to dynamically change the behavior of an operator, we need to set up a separate source for the control events and leverage broadcast state. Being able to send the events via REST APIs definitely improves usability.
>>>>>>
>>>>>> Leveraging a dynamic configuration framework is certainly one possible approach. The reasons we are in favor of introducing the control flow are:
>>>>>> - It benefits not only this specific dynamic controlling feature, but potentially other future features as well.
>>>>>> - AFAICS, it's non-trivial to make a 3rd-party dynamic configuration framework work together with Flink's consistency mechanism.
>>>>>>
>>>>>> Thank you~
>>>>>>
>>>>>> Xintong Song
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]> wrote:
>>>>>>
>>>>>>> Thank you for the reply. I have checked the post you mentioned. A dynamic config may be useful sometimes, but it is hard to keep data consistent in Flink; for example, what happens if the dynamic config takes effect during a failover? Since dynamic config is something users want, maybe Flink can support it in some way.
>>>>>>>
>>>>>>> For the control mode, dynamic config is just one of the control modes. In the Google doc, I have listed some other cases. For example, control events can be generated in operators or external services. Besides users' dynamic configs, the Flink system itself can support some common dynamic configuration, like QPS limits, checkpoint control and so on.
>>>>>>>
>>>>>>> The control mode structure needs a good design. Based on that, other control features can easily be added later, like changing the log level while the job is running. In the end, Flink will not just process data, but also interact with users to receive control events, like a service.
>>>>>>>
>>>>>>> On Fri, Jun 4, 2021 at 11:11 PM Steven Wu <[hidden email]> wrote:
>>>>>>>
>>>>>>>> I am not sure if we should solve this problem in Flink. This is more like a dynamic config problem that probably should be solved by some configuration framework. Here is one post from a Google search: https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>>>>>>>>
>>>>>>>> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> Flink jobs are always long-running. While a job is running, users may want to control it without stopping it. The reasons for control vary, for example:
>>>>>>>>>
>>>>>>>>> 1. Change the data processing logic, such as a filter condition.
>>>>>>>>> 2. Send trigger events to move progress forward.
>>>>>>>>> 3. Define some tools to degrade the job, such as limiting input QPS or sampling data.
>>>>>>>>> 4. Change the log level to debug a current problem.
>>>>>>>>>
>>>>>>>>> The common way to do this is to stop the job, make modifications and restart it, which may take a long time to recover. In some situations stopping a job is intolerable, for example when the job is related to money or important activities. So we need some technologies to control a running job without stopping it.
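Limiting input QPS, one of the degradation tools listed above, needs a limit that can change while the job keeps running. Below is a minimal sketch built on a few assumptions:

- It uses a token bucket whose rate is updated when a control event arrives. This technique is swapped in for illustration; the thread does not say how Kuaishou implements the limit.
- `AdjustableRateLimiter` is a hypothetical name, not a Flink API.
- The clock is injectable so the behavior can be tested without sleeping.

```python
import time
from typing import Callable


class AdjustableRateLimiter:
    """Token bucket whose rate can change while the job keeps running.

    Hypothetical helper, not a Flink API. `clock` is injectable so the
    behaviour can be tested without sleeping.
    """

    def __init__(self, qps: float, clock: Callable[[], float] = time.monotonic) -> None:
        if qps <= 0:
            raise ValueError("qps must be positive")
        self._clock = clock
        self._qps = qps
        self._tokens = qps  # burst capacity: one second's worth of records
        self._last = clock()

    def _refill(self) -> None:
        now = self._clock()
        self._tokens = min(self._qps, self._tokens + (now - self._last) * self._qps)
        self._last = now

    def set_qps(self, qps: float) -> None:
        """Apply a new limit, e.g. when a control event arrives."""
        if qps <= 0:
            raise ValueError("qps must be positive")
        self._refill()  # settle tokens earned at the old rate first
        self._qps = qps
        self._tokens = min(self._tokens, qps)  # shrink the burst if the limit dropped

    def try_acquire(self) -> bool:
        """Return True if one record may be emitted now."""
        self._refill()
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False


if __name__ == "__main__":
    now = [0.0]
    limiter = AdjustableRateLimiter(2, clock=lambda: now[0])
    print([limiter.try_acquire() for _ in range(3)])  # [True, True, False]
    now[0] = 1.0
    limiter.set_qps(5)  # a control event raises the limit
    now[0] = 2.0
    print(sum(limiter.try_acquire() for _ in range(10)))  # 5
```

A source would call `try_acquire()` before emitting each record. The same kind of bucket placed in front of the control endpoint is one possible form of the API protection mechanism JING ZHANG mentions.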
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> We propose to add a control mode to Flink. We first introduce a control mode based on the RESTful interface. It works in the following steps:
>>>>>>>>>
>>>>>>>>> 1. The user predefines some logic that supports config control, such as a filter condition.
>>>>>>>>> 2. Run the job.
>>>>>>>>> 3. If the user wants to change the job's running logic, just send a RESTful request with the corresponding config.
>>>>>>>>>
>>>>>>>>> Other control modes will also be considered in the future. For more details, please refer to the doc:
>>>>>>>>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>>>>>>>>> If the community likes the proposal, more discussion is needed and a more detailed design will be given later. Any suggestions and ideas are welcome.
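The three steps of the proposal can be sketched without any Flink or HTTP dependencies. The following parts are hypothetical:

- `DynamicThresholdFilter`, the `min_value` key and the JSON request shape.
- `handle_control_request`, which only stands in for the RESTful endpoint.

A real implementation would route the update through the JobMaster and the control flow discussed in this thread, and would have to answer the failover and consistency questions raised there.

```python
import json
from typing import Dict, Iterable, List


class DynamicThresholdFilter:
    """Step 1: user logic that supports config control (hypothetical).

    Keeps records whose value is >= min_value.
    """

    CONFIG_KEY = "min_value"  # hypothetical config key

    def __init__(self, min_value: int) -> None:
        self.min_value = min_value

    def on_config(self, config: Dict[str, object]) -> None:
        # Ignore keys this operator does not understand.
        if self.CONFIG_KEY in config:
            self.min_value = int(config[self.CONFIG_KEY])

    def process(self, records: Iterable[int]) -> List[int]:
        return [r for r in records if r >= self.min_value]


def handle_control_request(body: str, operators: Dict[str, DynamicThresholdFilter]) -> None:
    """Step 3: stand-in for the RESTful endpoint (no HTTP server here).

    Assumed request body: {"operator": "<name>", "config": {...}}
    """
    request = json.loads(body)
    operators[request["operator"]].on_config(request["config"])


if __name__ == "__main__":
    # Step 2: "run the job" with the initial config.
    ops = {"threshold-filter": DynamicThresholdFilter(min_value=10)}
    print(ops["threshold-filter"].process([5, 12, 30]))  # [12, 30]
    handle_control_request(
        '{"operator": "threshold-filter", "config": {"min_value": 20}}', ops)
    print(ops["threshold-filter"].process([5, 12, 30]))  # [30]
```

Addressing the request to a named operator mirrors Steven Wu's observation that a dynamic config usually targets one specific operator.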