big +1 for this feature,

   1. Reset kafka offset in certain cases.
   2. Stop checkpoint in certain cases.
   3. Change log level for debug.


刘建刚 <liujiangangp...@gmail.com> 于2021年6月11日周五 下午12:17写道:

>     Thanks for all the discussions and suggestions. Since the topic has
> been discussed for about a week, it is time to have a conclusion and new
> ideas are welcomed at the same time.
>     First, the topic starts with use cases in restful interface. The
> restful interface supported many useful interactions with users, for
> example as follows. It is an easy way to control the job compared with
> broadcast api.
>
>    1. Change data processing’ logic by dynamic configs, such as filter
>    condition.
>    2. Define some tools to control the job, such as QPS limit, sampling,
>    change log level and so on.
>
>     Second, we broaden the topic to control flow in order to support all
> kinds of control events besides the above user cases. There is a strong
> demand to support custom (broadcast) events for iteration, SQL control
> events and so on. As Xintong Song said, the key to the control flow lies as
> follows:
>
>    1. Who (which component) is responsible for generating the control
>    messages? It may be the jobmaster by some ways, the inner operator and so
>    on.
>    2. Who (which component) is responsible for reacting to the messages.
>    3. How do the messages propagate? Flink should support sending control
>    messages by channels.
>    4. When it comes to affecting the computation logics, how should the
>    control flow work together with the exact-once consistency.  To use the
>    checkpoint mechanism, control messages flowing from source to down tasks
>    may be a good idea.
>
>     Third, a common and flexible control flow design requires good design
> and implementation as a base. Future features and existing features should
> both be considered. For future features, a common restful interface is
> first needed to support dynamic configs. For existing features, There exist
> checkpoint barriers, watermark and latency marker. They have some special
> behaviors but also share a lot in common. The common logic should be
> considered but maybe they should remain unchanged until the control flow is
> stable.
>     Some other problems as follows:
>
>    1. How to persist the control signals when the jobmaster fails? An
>    idea is to persist control signals in HighAvailabilityServices and replay
>    them after failover. The restful request should be non-blocking.
>    2. Should all the operators receive the control messages? All
>    operators should have the ability to receive upper operators' control
>    messages but maybe not process them. If we want to persist the control
>    message state, all the subtasks belonging to one operator should have the
>    same control events in order to rescale easily.
>
>     For the next step, I will draft a FLIP with the scope of common
> control flow framework. More discussions, ideas and problems are still
> welcome.
>
> Thank you~
>
> Jiangang Liu
>
>
>
>
>
>
>
> Xintong Song <tonysong...@gmail.com> 于2021年6月9日周三 下午12:01写道:
>
>> >
>> > 2. There are two kinds of existing special elements, special stream
>> > records (e.g. watermarks) and events (e.g. checkpoint barrier). They all
>> > flow through the whole DAG, but events needs to be acknowledged by
>> > downstream and can overtake records, while stream records are not). So
>> I’m
>> > wondering if we plan to unify the two approaches in the new control flow
>> > (as Xintong mentioned both in the previous mails)?
>> >
>>
>> TBH, I don't really know yet. We feel that the control flow is a
>> non-trivial topic and it would be better to bring it up publicly as early
>> as possible, while the concrete plan is still on the way.
>>
>> Personally, I'm leaning towards not touching the existing watermarks and
>> checkpoint barriers in the first step.
>> - I'd expect the control flow to be introduced as an experimental feature
>> that takes time to stabilize. It would be better that the existing
>> important features like checkpointing and watermarks stay unaffected.
>> - Checkpoint barriers are a little different, as other control messages
>> somehow rely on it to achieve exactly once consistency. Without the
>> concrete design, I'm not entirely sure whether it can be properly modeled
>> as a special case of general control messages.
>> - Watermarks are probably similar to the other control messages. However,
>> it's already exposed to users as public APIs. If we want to migrate it to
>> the new control flow, we'd be very careful not to break any compatibility.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Jun 9, 2021 at 11:30 AM Steven Wu <stevenz...@gmail.com> wrote:
>>
>> > > producing control events from JobMaster is similar to triggering a
>> > savepoint.
>> >
>> > Paul, here is what I see the difference. Upon job or jobmanager
>> recovery,
>> > we don't need to recover and replay the savepoint trigger signal.
>> >
>> > On Tue, Jun 8, 2021 at 8:20 PM Paul Lam <paullin3...@gmail.com> wrote:
>> >
>> >> +1 for this feature. Setting up a separate control stream is too much
>> for
>> >> many use cases, it would very helpful if users can leverage the
>> built-in
>> >> control flow of Flink.
>> >>
>> >> My 2 cents:
>> >> 1. @Steven IMHO, producing control events from JobMaster is similar to
>> >> triggering a savepoint. The REST api is non-blocking, and users should
>> poll
>> >> the results to confirm the operation is succeeded. If something goes
>> wrong,
>> >> it’s user’s responsibility to retry.
>> >> 2. There are two kinds of existing special elements, special stream
>> >> records (e.g. watermarks) and events (e.g. checkpoint barrier). They
>> all
>> >> flow through the whole DAG, but events needs to be acknowledged by
>> >> downstream and can overtake records, while stream records are not). So
>> I’m
>> >> wondering if we plan to unify the two approaches in the new control
>> flow
>> >> (as Xintong mentioned both in the previous mails)?
>> >>
>> >> Best,
>> >> Paul Lam
>> >>
>> >> 2021年6月8日 14:08,Steven Wu <stevenz...@gmail.com> 写道:
>> >>
>> >>
>> >> I can see the benefits of control flow. E.g., it might help the old
>> (and
>> >> inactive) FLIP-17 side input. I would suggest that we add more details
>> of
>> >> some of the potential use cases.
>> >>
>> >> Here is one mismatch with using control flow for dynamic config.
>> Dynamic
>> >> config is typically targeted/loaded by one specific operator. Control
>> flow
>> >> will propagate the dynamic config to all operators. not a problem per
>> se
>> >>
>> >> Regarding using the REST api (to jobmanager) for accepting control
>> >> signals from external system, where are we going to persist/checkpoint
>> the
>> >> signal? jobmanager can die before the control signal is propagated and
>> >> checkpointed. Did we lose the control signal in this case?
>> >>
>> >>
>> >> On Mon, Jun 7, 2021 at 11:05 PM Xintong Song <tonysong...@gmail.com>
>> >> wrote:
>> >>
>> >>> +1 on separating the effort into two steps:
>> >>>
>> >>>    1. Introduce a common control flow framework, with flexible
>> >>>    interfaces for generating / reacting to control messages for
>> various
>> >>>    purposes.
>> >>>    2. Features that leverating the control flow can be worked on
>> >>>    concurrently
>> >>>
>> >>> Meantime, keeping collecting potential features that may leverage the
>> >>> control flow should be helpful. It provides good inputs for the
>> control
>> >>> flow framework design, to make the framework common enough to cover
>> the
>> >>> potential use cases.
>> >>>
>> >>> My suggestions on the next steps:
>> >>>
>> >>>    1. Allow more time for opinions to be heard and potential use cases
>> >>>    to be collected
>> >>>    2. Draft a FLIP with the scope of common control flow framework
>> >>>    3. We probably need a poc implementation to make sure the framework
>> >>>    covers at least the following scenarios
>> >>>       1. Produce control events from arbitrary operators
>> >>>       2. Produce control events from JobMaster
>> >>>       3. Consume control events from arbitrary operators downstream
>> >>>       where the events are produced
>> >>>
>> >>>
>> >>> Thank you~
>> >>> Xintong Song
>> >>>
>> >>>
>> >>>
>> >>> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao <yungao...@aliyun.com> wrote:
>> >>>
>> >>>> Very thanks Jiangang for bringing this up and very thanks for the
>> >>>> discussion!
>> >>>>
>> >>>> I also agree with the summarization by Xintong and Jing that control
>> >>>> flow seems to be
>> >>>> a common buidling block for many functionalities and dynamic
>> >>>> configuration framework
>> >>>> is a representative application that frequently required by users.
>> >>>> Regarding the control flow,
>> >>>> currently we are also considering the design of iteration for the
>> >>>> flink-ml, and as Xintong has pointed
>> >>>> out, it also required the control flow in cases like detection global
>> >>>> termination inside the iteration
>> >>>>  (in this case we need to broadcast an event through the iteration
>> >>>> body to detect if there are still
>> >>>> records reside in the iteration body). And regarding  whether to
>> >>>> implement the dynamic configuration
>> >>>> framework, I also agree with Xintong that the consistency guarantee
>> >>>> would be a point to consider, we
>> >>>> might consider if we need to ensure every operator could receive the
>> >>>> dynamic configuration.
>> >>>>
>> >>>> Best,
>> >>>> Yun
>> >>>>
>> >>>>
>> >>>>
>> >>>> ------------------------------------------------------------------
>> >>>> Sender:kai wang<yiduwang...@gmail.com>
>> >>>> Date:2021/06/08 11:52:12
>> >>>> Recipient:JING ZHANG<beyond1...@gmail.com>
>> >>>> Cc:刘建刚<liujiangangp...@gmail.com>; Xintong Song [via Apache Flink
>> User
>> >>>> Mailing List archive.]<ml+s2336050n44245...@n4.nabble.com>; user<
>> >>>> u...@flink.apache.org>; dev<dev@flink.apache.org>
>> >>>> Theme:Re: Add control mode for flink
>> >>>>
>> >>>>
>> >>>>
>> >>>> I'm big +1 for this feature.
>> >>>>
>> >>>>    1. Limit the input qps.
>> >>>>    2. Change log level for debug.
>> >>>>
>> >>>> in my team, the two examples above are needed
>> >>>>
>> >>>> JING ZHANG <beyond1...@gmail.com> 于2021年6月8日周二 上午11:18写道:
>> >>>>
>> >>>>> Thanks Jiangang for bringing this up.
>> >>>>> As mentioned in Jiangang's email, `dynamic configuration framework`
>> >>>>> provides many useful functions in Kuaishou, because it could update
>> job
>> >>>>> behavior without relaunching the job. The functions are very
>> popular in
>> >>>>> Kuaishou, we also see similar demands in maillist [1].
>> >>>>>
>> >>>>> I'm big +1 for this feature.
>> >>>>>
>> >>>>> Thanks Xintong and Yun for deep thoughts about the issue. I like the
>> >>>>> idea about introducing control mode in Flink.
>> >>>>> It takes the original issue a big step closer to essence which also
>> >>>>> provides the possibility for more fantastic features as mentioned in
>> >>>>> Xintong and Jark's response.
>> >>>>> Based on the idea, there are at least two milestones to achieve the
>> >>>>> goals which were proposed by Jiangang:
>> >>>>> (1) Build a common control flow framework in Flink.
>> >>>>>      It focuses on control flow propagation. And, how to integrate
>> the
>> >>>>> common control flow framework with existing mechanisms.
>> >>>>> (2) Builds a dynamic configuration framework which is exposed to
>> users
>> >>>>> directly.
>> >>>>>      We could see dynamic configuration framework is a top
>> application
>> >>>>> on the underlying control flow framework.
>> >>>>>      It focuses on the Public API which receives configuration
>> >>>>> updating requests from users. Besides, it is necessary to introduce
>> an API
>> >>>>> protection mechanism to avoid job performance degradation caused by
>> too
>> >>>>> many control events.
>> >>>>>
>> >>>>> I suggest splitting the whole design into two after we reach a
>> >>>>> consensus on whether to introduce this feature because these two
>> sub-topic
>> >>>>> all need careful design.
>> >>>>>
>> >>>>>
>> >>>>> [
>> >>>>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html
>> >>>>> ]
>> >>>>>
>> >>>>> Best regards,
>> >>>>> JING ZHANG
>> >>>>>
>> >>>>> 刘建刚 <liujiangangp...@gmail.com> 于2021年6月8日周二 上午10:01写道:
>> >>>>>
>> >>>>>> Thanks Xintong Song for the detailed supplement. Since flink is
>> >>>>>> long-running, it is similar to many services. So interacting with
>> it or
>> >>>>>> controlling it is a common desire. This was our initial thought
>> when
>> >>>>>> implementing the feature. In our inner flink, many configs used in
>> yaml can
>> >>>>>> be adjusted by dynamic to avoid restarting the job, for examples
>> as follow:
>> >>>>>>
>> >>>>>>    1. Limit the input qps.
>> >>>>>>    2. Degrade the job by sampling and so on.
>> >>>>>>    3. Reset kafka offset in certain cases.
>> >>>>>>    4. Stop checkpoint in certain cases.
>> >>>>>>    5. Control the history consuming.
>> >>>>>>    6. Change log level for debug.
>> >>>>>>
>> >>>>>>
>> >>>>>> After deep discussion, we realize that a common control flow
>> >>>>>> will benefit both users and developers. Dynamic config is just one
>> of the
>> >>>>>> use cases. For the concrete design and implementation, it relates
>> with many
>> >>>>>> components, like jobmaster, network channel, operators and so on,
>> which
>> >>>>>> needs deeper consideration and design.
>> >>>>>>
>> >>>>>> Xintong Song [via Apache Flink User Mailing List archive.] <
>> >>>>>> ml+s2336050n44245...@n4.nabble.com> 于2021年6月7日周一 下午2:52写道:
>> >>>>>>
>> >>>>>>> Thanks Jiangang for bringing this up, and Steven & Peter for the
>> >>>>>>> feedback.
>> >>>>>>>
>> >>>>>>> I was part of the preliminary offline discussions before this
>> >>>>>>> proposal went public. So maybe I can help clarify things a bit.
>> >>>>>>>
>> >>>>>>> In short, despite the phrase "control mode" might be a bit
>> >>>>>>> misleading, what we truly want to do from my side is to make the
>> concept of
>> >>>>>>> "control flow" explicit and expose it to users.
>> >>>>>>>
>> >>>>>>> ## Background
>> >>>>>>> Jiangang & his colleagues at Kuaishou maintain an internal version
>> >>>>>>> of Flink. One of their custom features is allowing dynamically
>> changing
>> >>>>>>> operator behaviors via the REST APIs. He's willing to contribute
>> this
>> >>>>>>> feature to the community, and came to Yun Gao and me for
>> suggestions. After
>> >>>>>>> discussion, we feel that the underlying question to be answered
>> is how do
>> >>>>>>> we model the control flow in Flink. Dynamically controlling jobs
>> via REST
>> >>>>>>> API can be one of the features built on top of the control flow,
>> and there
>> >>>>>>> could be others.
>> >>>>>>>
>> >>>>>>> ## Control flow
>> >>>>>>> Control flow refers to the communication channels for sending
>> >>>>>>> events/signals to/between tasks/operators, that changes Flink's
>> behavior in
>> >>>>>>> a way that may or may not affect the computation logic. Typical
>> control
>> >>>>>>> events/signals Flink currently has are watermarks and checkpoint
>> barriers.
>> >>>>>>>
>> >>>>>>> In general, for modeling control flow, the following questions
>> >>>>>>> should be considered.
>> >>>>>>> 1. Who (which component) is responsible for generating the control
>> >>>>>>> messages?
>> >>>>>>> 2. Who (which component) is responsible for reacting to the
>> messages.
>> >>>>>>> 3. How do the messages propagate?
>> >>>>>>> 4. When it comes to affecting the computation logics, how should
>> the
>> >>>>>>> control flow work together with the exact-once consistency.
>> >>>>>>>
>> >>>>>>> 1) & 2) may vary depending on the use cases, while 3) & 4)
>> probably
>> >>>>>>> share many things in common. A unified control flow model would
>> help
>> >>>>>>> deduplicate the common logics, allowing us to focus on the use
>> case
>> >>>>>>> specific parts.
>> >>>>>>>
>> >>>>>>> E.g.,
>> >>>>>>> - Watermarks: generated by source operators, handled by window
>> >>>>>>> operators.
>> >>>>>>> - Checkpoint barrier: generated by the checkpoint coordinator,
>> >>>>>>> handled by all tasks
>> >>>>>>> - Dynamic controlling: generated by JobMaster (in reaction to the
>> >>>>>>> REST command), handled by specific operators/UDFs
>> >>>>>>> - Operator defined events: The following features are still in
>> >>>>>>> planning, but may potentially benefit from the control flow
>> model. (Please
>> >>>>>>> correct me if I'm wrong, @Yun, @Jark)
>> >>>>>>>   * Iteration: When a certain condition is met, we might want to
>> >>>>>>> signal downstream operators with an event
>> >>>>>>>   * Mini-batch assembling: Flink currently uses special watermarks
>> >>>>>>> for indicating the end of each mini-batch, which makes it tricky
>> to deal
>> >>>>>>> with event time related computations.
>> >>>>>>>   * Hive dimension table join: For periodically reloaded hive
>> >>>>>>> tables, it would be helpful to have specific events signaling
>> that a
>> >>>>>>> reloading is finished.
>> >>>>>>>   * Bootstrap dimension table join: This is similar to the
>> previous
>> >>>>>>> one. In cases where we want to fully load the dimension table
>> before
>> >>>>>>> starting joining the mainstream, it would be helpful to have an
>> event
>> >>>>>>> signaling the finishing of the bootstrap.
>> >>>>>>>
>> >>>>>>> ## Dynamic REST controlling
>> >>>>>>> Back to the specific feature that Jiangang proposed, I personally
>> >>>>>>> think it's quite convenient. Currently, to dynamically change the
>> behavior
>> >>>>>>> of an operator, we need to set up a separate source for the
>> control events
>> >>>>>>> and leverage broadcast state. Being able to send the events via
>> REST APIs
>> >>>>>>> definitely improves the usability.
>> >>>>>>>
>> >>>>>>> Leveraging dynamic configuration frameworks is for sure one
>> possible
>> >>>>>>> approach. The reason we are in favor of introducing the control
>> flow is
>> >>>>>>> that:
>> >>>>>>> - It benefits not only this specific dynamic controlling feature,
>> >>>>>>> but potentially other future features as well.
>> >>>>>>> - AFAICS, it's non-trivial to make a 3rd-party dynamic
>> configuration
>> >>>>>>> framework work together with Flink's consistency mechanism.
>> >>>>>>>
>> >>>>>>> Thank you~
>> >>>>>>> Xintong Song
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]
>> >>>>>>> <http://user/SendEmail.jtp?type=node&node=44245&i=0>> wrote:
>> >>>>>>>
>> >>>>>>>> Thank you for the reply. I have checked the post you mentioned.
>> The
>> >>>>>>>> dynamic config may be useful sometimes. But it is hard to keep
>> data
>> >>>>>>>> consistent in flink, for example, what if the dynamic config
>> will take
>> >>>>>>>> effect when failover. Since dynamic config is a desire for
>> users, maybe
>> >>>>>>>> flink can support it in some way.
>> >>>>>>>>
>> >>>>>>>> For the control mode, dynamic config is just one of the control
>> >>>>>>>> modes. In the google doc, I have list some other cases. For
>> example,
>> >>>>>>>> control events are generated in operators or external services.
>> Besides
>> >>>>>>>> user's dynamic config, flink system can support some common
>> dynamic
>> >>>>>>>> configuration, like qps limit, checkpoint control and so on.
>> >>>>>>>>
>> >>>>>>>> It needs good design to handle the control mode structure. Based
>> on
>> >>>>>>>> that, other control features can be added easily later, like
>> changing log
>> >>>>>>>> level when job is running. In the end, flink will not just
>> process data,
>> >>>>>>>> but also interact with users to receive control events like a
>> service.
>> >>>>>>>>
>> >>>>>>>> Steven Wu <[hidden email]
>> >>>>>>>> <http://user/SendEmail.jtp?type=node&node=44245&i=1>>
>> 于2021年6月4日周五
>> >>>>>>>> 下午11:11写道:
>> >>>>>>>>
>> >>>>>>>>> I am not sure if we should solve this problem in Flink. This is
>> >>>>>>>>> more like a dynamic config problem that probably should be
>> solved by some
>> >>>>>>>>> configuration framework. Here is one post from google search:
>> >>>>>>>>>
>> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 <[hidden email]
>> >>>>>>>>> <http://user/SendEmail.jtp?type=node&node=44245&i=2>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Hi everyone,
>> >>>>>>>>>>       Flink jobs are always long-running. When the job is
>> >>>>>>>>>> running, users may want to control the job but not stop it.
>> The control
>> >>>>>>>>>> reasons can be different as following:
>> >>>>>>>>>>
>> >>>>>>>>>>    1. Change data processing’ logic, such as filter condition.
>> >>>>>>>>>>    2. Send trigger events to make the progress forward.
>> >>>>>>>>>>    3. Define some tools to degrade the job, such as limit input
>> >>>>>>>>>>    qps, sampling data.
>> >>>>>>>>>>    4. Change log level to debug current problem.
>> >>>>>>>>>>
>> >>>>>>>>>>       The common way to do this is to stop the job, do
>> >>>>>>>>>> modifications and start the job. It may take a long time to
>> recover. In
>> >>>>>>>>>> some situations, stopping jobs is intolerable, for example,
>> the job is
>> >>>>>>>>>> related to money or important activities.So we need some
>> >>>>>>>>>> technologies to control the running job without stopping the
>> job.
>> >>>>>>>>>>
>> >>>>>>>>>> We propose to add control mode for flink. A control mode based
>> on
>> >>>>>>>>>> the restful interface is first introduced. It works by these
>> steps:
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>    1. The user can predefine some logic which supports config
>> >>>>>>>>>>    control, such as filter condition.
>> >>>>>>>>>>    2. Run the job.
>> >>>>>>>>>>    3. If the user wants to change the job's running logic, just
>> >>>>>>>>>>    send a restful request with the responding config.
>> >>>>>>>>>>
>> >>>>>>>>>> Other control modes will also be considered in the future. More
>> >>>>>>>>>> introduction can refer to the doc
>> >>>>>>>>>>
>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>> >>>>>>>>>> . If the community likes the proposal, more discussion is
>> needed and a more
>> >>>>>>>>>> detailed design will be given later. Any suggestions and ideas
>> are welcome.
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>
>> >>>>>>> ------------------------------
>> >>>>>>> If you reply to this email, your message will be added to the
>> >>>>>>> discussion below:
>> >>>>>>>
>> >>>>>>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Add-control-mode-for-flink-tp44203p44245.html
>> >>>>>>> To start a new topic under Apache Flink User Mailing List
>> archive.,
>> >>>>>>> email ml+s2336050n1...@n4.nabble.com
>> >>>>>>> To unsubscribe from Apache Flink User Mailing List archive., click
>> >>>>>>> here
>> >>>>>>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=bGl1amlhbmdhbmdwZW5nQGdtYWlsLmNvbXwxfC0xMTYwNzM3MjI=
>> >
>> >>>>>>> .
>> >>>>>>> NAML
>> >>>>>>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml
>> >
>> >>>>>>>
>> >>>>>>
>> >>>>
>> >>
>>
>

Reply via email to