Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.

I was part of the preliminary offline discussions before this proposal went
public. So maybe I can help clarify things a bit.

In short, although the phrase "control mode" might be a bit misleading, what
we really want to do, from my side, is to make the concept of "control flow"
explicit and expose it to users.

## Background
Jiangang & his colleagues at Kuaishou maintain an internal version of
Flink. One of their custom features is the ability to change operator
behavior dynamically via the REST API. Jiangang is willing to contribute
this feature to the community, and came to Yun Gao and me for suggestions.
After discussion, we feel that the underlying question to be answered is how
we should model the control flow in Flink. Dynamically controlling jobs via
the REST API can be one of the features built on top of the control flow,
and there could be others.

## Control flow
Control flow refers to the communication channels for sending
events/signals to/between tasks/operators, where those events/signals change
Flink's behavior in a way that may or may not affect the computation logic.
Typical control events/signals Flink currently has are watermarks and
checkpoint barriers.

In general, for modeling control flow, the following questions should be
considered.
1. Who (which component) is responsible for generating the control messages?
2. Who (which component) is responsible for reacting to the messages?
3. How do the messages propagate?
4. When it comes to affecting the computation logic, how should the
control flow work together with exactly-once consistency?

1) & 2) may vary depending on the use cases, while 3) & 4) probably share
many things in common. A unified control flow model would help deduplicate
the common logic, allowing us to focus on the use-case-specific parts.

E.g.,
- Watermarks: generated by source operators, handled by window operators.
- Checkpoint barriers: generated by the checkpoint coordinator, handled by
all tasks.
- Dynamic control: generated by the JobMaster (in reaction to a REST
command), handled by specific operators/UDFs.
- Operator defined events: The following features are still in planning,
but may potentially benefit from the control flow model. (Please correct me
if I'm wrong, @Yun, @Jark)
  * Iteration: When a certain condition is met, we might want to signal
downstream operators with an event
  * Mini-batch assembling: Flink currently uses special watermarks for
indicating the end of each mini-batch, which makes it tricky to deal with
event time related computations.
  * Hive dimension table join: For periodically reloaded Hive tables, it
would be helpful to have specific events signaling that a reload has
finished.
  * Bootstrap dimension table join: This is similar to the previous one. In
cases where we want to fully load the dimension table before starting to
join it with the main stream, it would be helpful to have an event signaling
that the bootstrap has finished.
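
To make the four questions a bit more concrete, here is a minimal Java sketch. It is not Flink API: all names (`StreamElement`, `SetThreshold`, `FilterOperator`, `run`, `exampleChannel`) are made up for illustration, and it assumes that control events travel in-band with the data through a single ordered channel, which is only one possible answer to question 3.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model: data records and control events share one ordered channel. */
final class ControlFlowSketch {

    /** Marker type for anything travelling through the channel. */
    interface StreamElement {}

    /** An ordinary data record. */
    static final class DataRecord implements StreamElement {
        final int value;
        DataRecord(int value) { this.value = value; }
    }

    /** A hypothetical control event that replaces the filter threshold. */
    static final class SetThreshold implements StreamElement {
        final int threshold;
        SetThreshold(int threshold) { this.threshold = threshold; }
    }

    /** The reacting component (question 2): a filter whose threshold can change at runtime. */
    static final class FilterOperator {
        private int threshold;
        private final List<Integer> output = new ArrayList<>();

        FilterOperator(int initialThreshold) { this.threshold = initialThreshold; }

        void process(StreamElement element) {
            if (element instanceof SetThreshold) {
                // Control path: change behavior, emit nothing.
                threshold = ((SetThreshold) element).threshold;
            } else if (element instanceof DataRecord) {
                // Data path: apply whatever threshold is in effect at this position.
                int value = ((DataRecord) element).value;
                if (value >= threshold) {
                    output.add(value);
                }
            }
        }

        List<Integer> output() { return output; }
    }

    /** Propagation (question 3): elements are delivered strictly in channel order. */
    static List<Integer> run(int initialThreshold, List<StreamElement> channel) {
        FilterOperator operator = new FilterOperator(initialThreshold);
        for (StreamElement element : channel) {
            operator.process(element);
        }
        return operator.output();
    }

    /** A fixed example channel: the control event sits between the second and third record. */
    static List<StreamElement> exampleChannel() {
        return Arrays.<StreamElement>asList(
                new DataRecord(1), new DataRecord(5),
                new SetThreshold(4),
                new DataRecord(2), new DataRecord(7));
    }
}
```

With an initial threshold of 0, `ControlFlowSketch.run(0, ControlFlowSketch.exampleChannel())` yields `[1, 5, 7]`: the first two records pass under the old threshold, and only 7 passes after the control event. Because the event has a fixed position relative to the records, replaying the same channel gives the same output, which is the kind of property question 4 is about.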

## Dynamic REST controlling
Back to the specific feature that Jiangang proposed, I personally think
it's quite convenient. Currently, to dynamically change the behavior of an
operator, we need to set up a separate source for the control events and
leverage broadcast state. Being able to send the events via REST APIs
definitely improves the usability.
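
As a rough illustration of what the REST-driven path could look like from the operator's point of view, here is a small self-contained Java sketch. Everything in it is hypothetical (`ControlDispatcherSketch`, `ControlListener`, `KeywordFilterSubtask`, the `"keyword"` key); it is not the proposed design or any existing Flink interface. It only shows the shape of the idea: a component standing in for the JobMaster fans a key/value update out to every parallel subtask of one operator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the JobMaster side of a REST-triggered control event. */
final class ControlDispatcherSketch {

    /** Implemented by operators/UDFs that accept runtime configuration changes. */
    interface ControlListener {
        void onControl(String key, String value);
    }

    /** One parallel subtask of a filter operator with a runtime-adjustable keyword. */
    static final class KeywordFilterSubtask implements ControlListener {
        private String keyword;

        KeywordFilterSubtask(String initialKeyword) { this.keyword = initialKeyword; }

        @Override
        public void onControl(String key, String value) {
            // Only react to the setting this operator knows about.
            if ("keyword".equals(key)) {
                keyword = value;
            }
        }

        boolean accept(String record) { return record.contains(keyword); }
    }

    // Registered subtasks, grouped by a (hypothetical) operator id.
    private final Map<String, List<ControlListener>> subtasksByOperator = new HashMap<>();

    void register(String operatorId, ControlListener subtask) {
        subtasksByOperator.computeIfAbsent(operatorId, id -> new ArrayList<>()).add(subtask);
    }

    /** Delivers the update to every subtask of the operator; returns how many were notified. */
    int dispatch(String operatorId, String key, String value) {
        List<ControlListener> subtasks = subtasksByOperator.getOrDefault(
                operatorId, Collections.<ControlListener>emptyList());
        for (ControlListener subtask : subtasks) {
            subtask.onControl(key, value);
        }
        return subtasks.size();
    }
}
```

Note that this sketch deliberately ignores the hard part: it says nothing about where the update falls relative to in-flight records or checkpoints, so on its own it gives no consistency guarantee across failover.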

Leveraging a dynamic configuration framework is certainly one possible
approach. We are in favor of introducing the control flow for two reasons:
- It benefits not only this specific dynamic control feature, but
potentially other future features as well.
- AFAICS, it's non-trivial to make a 3rd-party dynamic configuration
framework work together with Flink's consistency mechanism.

Thank you~

Xintong Song



On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <liujiangangp...@gmail.com> wrote:

> Thank you for the reply. I have checked the post you mentioned. Dynamic
> config can be useful sometimes, but it is hard to keep data consistent in
> Flink: for example, what if the dynamic config takes effect during a
> failover? Since users do want dynamic config, maybe Flink can support it
> in some way.
>
> For the control mode, dynamic config is just one of the control modes. In
> the Google doc, I have listed some other cases. For example, control events
> can be generated in operators or external services. Besides users' dynamic
> config, Flink itself can support some common dynamic configuration, like
> QPS limits, checkpoint control and so on.
>
> The control mode structure needs a good design. Based on that, other
> control features can be added easily later, like changing the log level
> while the job is running. In the end, Flink will not just process data, but
> also interact with users and receive control events, like a service.
>
> Steven Wu <stevenz...@gmail.com> wrote on Fri, Jun 4, 2021 at 11:11 PM:
>
>> I am not sure if we should solve this problem in Flink. This is more like
>> a dynamic config problem that probably should be solved by some
>> configuration framework. Here is one post from a Google search:
>> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>>
>> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <liujiangangp...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>>       Flink jobs are always long-running. While a job is running, users
>>> may want to control it without stopping it. The reasons for doing so
>>> vary, for example:
>>>
>>>    1. Change the data processing logic, such as a filter condition.
>>>    2. Send trigger events to move the progress forward.
>>>    3. Define some tools to degrade the job, such as limiting input QPS or
>>>    sampling data.
>>>    4. Change the log level to debug a current problem.
>>>
>>>       The common way to do this is to stop the job, make the modifications
>>> and start the job again. It may take a long time to recover. In some
>>> situations, stopping jobs is intolerable, for example, when the job is
>>> related to money or important activities. So we need some way to control
>>> the running job without stopping it.
>>>
>>>
>>> We propose adding a control mode to Flink. A control mode based on the
>>> RESTful interface is introduced first. It works in these steps:
>>>
>>>
>>>    1. The user can predefine some logic which supports config control,
>>>    such as filter condition.
>>>    2. Run the job.
>>>    3. If the user wants to change the job's running logic, just send a
>>>    RESTful request with the corresponding config.
>>>
>>> Other control modes will also be considered in the future. More details
>>> can be found in the doc:
>>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>>> If the community likes the proposal, more discussion is needed and a more
>>> detailed design will be given later. Any suggestions and ideas are welcome.
>>>
>>>
