Very thanks Jiangang for bringing this up and very thanks for the discussion! 

I also agree with the summarization by Xintong and Jing that control flow seems 
to be
a common buidling block for many functionalities and dynamic configuration 
framework
is a representative application that frequently required by users. Regarding 
the control flow, 
currently we are also considering the design of iteration for the flink-ml, and 
as Xintong has pointed
out, it also required the control flow in cases like detection global 
termination inside the iteration
 (in this case we need to broadcast an event through the iteration body to 
detect if there are still 
records reside in the iteration body). And regarding  whether to implement the 
dynamic configuration 
framework, I also agree with Xintong that the consistency guarantee would be a 
point to consider, we 
might consider if we need to ensure every operator could receive the dynamic 
configuration. 

Best,
Yun



------------------------------------------------------------------
Sender:kai wang<yiduwang...@gmail.com>
Date:2021/06/08 11:52:12
Recipient:JING ZHANG<beyond1...@gmail.com>
Cc:刘建刚<liujiangangp...@gmail.com>; Xintong Song [via Apache Flink User Mailing 
List archive.]<ml+s2336050n44245...@n4.nabble.com>; 
user<u...@flink.apache.org>; dev<dev@flink.apache.org>
Theme:Re: Add control mode for flink



I'm big +1 for this feature. 

Limit the input qps.
Change log level for debug.
in my team, the two examples above are needed
JING ZHANG <beyond1...@gmail.com> 于2021年6月8日周二 上午11:18写道:

Thanks Jiangang for bringing this up. 
As mentioned in Jiangang's email, `dynamic configuration framework` provides 
many useful functions in Kuaishou, because it could update job behavior without 
relaunching the job. The functions are very popular in Kuaishou, we also see 
similar demands in maillist [1].

I'm big +1 for this feature.

Thanks Xintong and Yun for deep thoughts about the issue. I like the idea about 
introducing control mode in Flink. 
It takes the original issue a big step closer to essence which also provides 
the possibility for more fantastic features as mentioned in Xintong and Jark's 
response.
Based on the idea, there are at least two milestones to achieve the goals which 
were proposed by Jiangang:
(1) Build a common control flow framework in Flink. 
     It focuses on control flow propagation. And, how to integrate the common 
control flow framework with existing mechanisms.
(2) Builds a dynamic configuration framework which is exposed to users 
directly. 
     We could see dynamic configuration framework is a top application on the 
underlying control flow framework. 
     It focuses on the Public API which receives configuration updating 
requests from users. Besides, it is necessary to introduce an API protection 
mechanism to avoid job performance degradation caused by too many control 
events.

I suggest splitting the whole design into two after we reach a consensus on 
whether to introduce this feature because these two sub-topic all need careful 
design.


[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html]

Best regards,
JING ZHANG
刘建刚 <liujiangangp...@gmail.com> 于2021年6月8日周二 上午10:01写道:

Thanks Xintong Song for the detailed supplement. Since flink is long-running, 
it is similar to many services. So interacting with it or controlling it is a 
common desire. This was our initial thought when implementing the feature. In 
our inner flink, many configs used in yaml can be adjusted by dynamic to avoid 
restarting the job, for examples as follow:

Limit the input qps.
Degrade the job by sampling and so on.
Reset kafka offset in certain cases.
Stop checkpoint in certain cases.
Control the history consuming.
Change log level for debug.

After deep discussion, we realize that a common control flow will benefit both 
users and developers. Dynamic config is just one of the use cases. For the 
concrete design and implementation, it relates with many components, like 
jobmaster, network channel, operators and so on, which needs deeper 
consideration and design. 
Xintong Song [via Apache Flink User Mailing List archive.] 
<ml+s2336050n44245...@n4.nabble.com> 于2021年6月7日周一 下午2:52写道:

Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.

I was part of the preliminary offline discussions before this proposal went 
public. So maybe I can help clarify things a bit.

In short, despite the phrase "control mode" might be a bit misleading, what we 
truly want to do from my side is to make the concept of "control flow" explicit 
and expose it to users.

## Background
Jiangang & his colleagues at Kuaishou maintain an internal version of Flink. 
One of their custom features is allowing dynamically changing operator 
behaviors via the REST APIs. He's willing to contribute this feature to the 
community, and came to Yun Gao and me for suggestions. After discussion, we 
feel that the underlying question to be answered is how do we model the control 
flow in Flink. Dynamically controlling jobs via REST API can be one of the 
features built on top of the control flow, and there could be others.

## Control flow
Control flow refers to the communication channels for sending events/signals 
to/between tasks/operators, that changes Flink's behavior in a way that may or 
may not affect the computation logic. Typical control events/signals Flink 
currently has are watermarks and checkpoint barriers. 

In general, for modeling control flow, the following questions should be 
considered.
1. Who (which component) is responsible for generating the control messages?
2. Who (which component) is responsible for reacting to the messages.
3. How do the messages propagate?
4. When it comes to affecting the computation logics, how should the control 
flow work together with the exact-once consistency. 

1) & 2) may vary depending on the use cases, while 3) & 4) probably share many 
things in common. A unified control flow model would help deduplicate the 
common logics, allowing us to focus on the use case specific parts.

E.g.,
- Watermarks: generated by source operators, handled by window operators.
- Checkpoint barrier: generated by the checkpoint coordinator, handled by all 
tasks
- Dynamic controlling: generated by JobMaster (in reaction to the REST 
command), handled by specific operators/UDFs
- Operator defined events: The following features are still in planning, but 
may potentially benefit from the control flow model. (Please correct me if I'm 
wrong, @Yun, @Jark)
  * Iteration: When a certain condition is met, we might want to signal 
downstream operators with an event
  * Mini-batch assembling: Flink currently uses special watermarks for 
indicating the end of each mini-batch, which makes it tricky to deal with event 
time related computations.
  * Hive dimension table join: For periodically reloaded hive tables, it would 
be helpful to have specific events signaling that a reloading is finished.
  * Bootstrap dimension table join: This is similar to the previous one. In 
cases where we want to fully load the dimension table before starting joining 
the mainstream, it would be helpful to have an event signaling the finishing of 
the bootstrap.

## Dynamic REST controlling
Back to the specific feature that Jiangang proposed, I personally think it's 
quite convenient. Currently, to dynamically change the behavior of an operator, 
we need to set up a separate source for the control events and leverage 
broadcast state. Being able to send the events via REST APIs definitely 
improves the usability.

Leveraging dynamic configuration frameworks is for sure one possible approach. 
The reason we are in favor of introducing the control flow is that:
- It benefits not only this specific dynamic controlling feature, but 
potentially other future features as well.
- AFAICS, it's non-trivial to make a 3rd-party dynamic configuration framework 
work together with Flink's consistency mechanism.

Thank you~
Xintong Song


On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]> wrote:

Thank you for the reply. I have checked the post you mentioned. The dynamic 
config may be useful sometimes. But it is hard to keep data consistent in 
flink, for example, what if the dynamic config will take effect when failover. 
Since dynamic config is a desire for users, maybe flink can support it in some 
way.

For the control mode, dynamic config is just one of the control modes. In the 
google doc, I have list some other cases. For example, control events are 
generated in operators or external services. Besides user's dynamic config, 
flink system can support some common dynamic configuration, like qps limit, 
checkpoint control and so on.

It needs good design to handle the control mode structure. Based on that, other 
control features can be added easily later, like changing log level when job is 
running. In the end, flink will not just process data, but also interact with 
users to receive control events like a service.
Steven Wu <[hidden email]> 于2021年6月4日周五 下午11:11写道:

I am not sure if we should solve this problem in Flink. This is more like a 
dynamic config problem that probably should be solved by some configuration 
framework. Here is one post from google search: 
https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <[hidden email]> wrote:

Hi everyone,

      Flink jobs are always long-running. When the job is running, users may 
want to control the job but not stop it. The control reasons can be different 
as following:
Change data processing’ logic, such as filter condition.
Send trigger events to make the progress forward.
Define some tools to degrade the job, such as limit input qps, sampling data.
Change log level to debug current problem.
      The common way to do this is to stop the job, do modifications and start 
the job. It may take a long time to recover. In some situations, stopping jobs 
is intolerable, for example, the job is related to money or important 
activities.So we need some technologies to control the running job without 
stopping the job.

 We propose to add control mode for flink. A control mode based on the restful 
interface is first introduced. It works by these steps:

The user can predefine some logic which supports config control, such as filter 
condition.
Run the job.
If the user wants to change the job's running logic, just send a restful 
request with the responding config.
 Other control modes will also be considered in the future. More introduction 
can refer to the doc 
https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
 . If the community likes the proposal, more discussion is needed and a more 
detailed design will be given later. Any suggestions and ideas are welcome.



If you reply to this email, your message will be added to the discussion below: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Add-control-mode-for-flink-tp44203p44245.html
 
 To start a new topic under Apache Flink User Mailing List archive., email 
ml+s2336050n1...@n4.nabble.com 
 To unsubscribe from Apache Flink User Mailing List archive., click here.
NAML    

Reply via email to