Thanks for the valuable design. Auto-detection would save us a great deal of
work. We have implemented a similar feature in our internal Flink version.
Here are the points I care about:

   1. For auto-detection, I wonder how the strategy decides to mark a node
   as blocked. Sometimes the bad node is hard to identify: for example, when
   the network is unreachable, either the upstream node or the downstream
   node may end up being blocked.
   2. I see that the strategy is applied on the JobMaster side. How about
   implementing similar logic in the ResourceManager? In session mode,
   multiple jobs can fail on the same bad node, and the node should be marked
   as blocked. If each job applies the strategy on its own, the node may not
   be marked as blocked when no single job's failure count exceeds the
   threshold.
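
To make point 2 concrete, here is a rough sketch of cluster-wide counting.
It is only an illustration under my own assumptions: NodeFailureTracker,
recordFailure and isBlocked are hypothetical names, not part of FLIP-224 or
of any Flink API, and a real strategy would also need time windows and
expiry.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical cluster-wide failure counter; not a Flink class. */
class NodeFailureTracker {
    private final int threshold;
    // nodeId -> task failures reported by any job in the session cluster
    private final Map<String, Integer> failuresPerNode = new HashMap<>();
    private final Set<String> blockedNodes = new HashSet<>();

    NodeFailureTracker(int threshold) {
        this.threshold = threshold;
    }

    /** Records one failure on the node; returns true if the node is now blocked. */
    synchronized boolean recordFailure(String nodeId) {
        int count = failuresPerNode.merge(nodeId, 1, Integer::sum);
        if (count >= threshold) {
            blockedNodes.add(nodeId);
        }
        return blockedNodes.contains(nodeId);
    }

    synchronized boolean isBlocked(String nodeId) {
        return blockedNodes.contains(nodeId);
    }
}
```

With a threshold of 3, three different jobs that each fail once on the same
node would block it here, while three per-job counters would each stay at 1.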


Zhu Zhu <reed...@gmail.com> wrote on Thu, May 5, 2022 at 23:35:

> Thank you for all your feedback!
>
> Besides the answers from Lijie, I'd like to share some of my thoughts:
> 1. Whether to enable automatic blocklisting
> Generally speaking, it is not a goal of FLIP-224.
> The automatic way should be something built upon the blocklist
> mechanism and well decoupled. It was designed to be a configurable
> blocklist strategy, but I think we can further decouple it by
> introducing an abnormal node detector, as Becket suggested, which just
> uses the blocklist mechanism once bad nodes are detected. However, it
> should be a separate FLIP with further dev discussions and feedback
> from users. I also agree with Becket that different users have different
> requirements, and we should listen to them.
>
> 2. Is it enough to just take away abnormal nodes externally
> My answer is no. As Lijie has mentioned, we need a way to avoid
> deploying tasks to temporarily hot nodes. In this case, users may just
> want to limit the load of the node and do not want to kill all the
> processes on it. Another case is the speculative execution[1] which
> may also leverage this feature to avoid starting mirror tasks on slow
> nodes.
>
> Thanks,
> Zhu
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+execution+for+Batch+Job
>
> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, May 5, 2022 at 15:56:
>
> >
> > Hi everyone,
> >
> >
> > Thanks for your feedback.
> >
> >
> > There's one detail that I'd like to re-emphasize here because it can
> > affect the value and design of the blocklist mechanism (perhaps I should
> > highlight it in the FLIP). We propose two actions in the FLIP:
> >
> > 1) MARK_BLOCKLISTED: Just mark the task manager or node as blocked.
> > Future slots should not be allocated from the blocked task manager or node.
> > But slots that are already allocated will not be affected. A typical
> > application scenario is to mitigate machine hotspots. In this case, we hope
> > that subsequent resource allocations will not be on the hot machine, but
> > tasks currently running on it should not be affected.
> >
> > 2) MARK_BLOCKLISTED_AND_EVACUATE_TASKS: Mark the task manager or node as
> > blocked, and evacuate all tasks on it. Evacuated tasks will be restarted on
> > non-blocked task managers.
> >
> > Of these two actions, the former better highlights the value of this
> > FLIP, because an external system cannot do that.
> >
> >
> > Regarding *Manually* and *Automatically*, I basically agree with @Becket
> > Qin: different users have different answers. Not all users’ deployment
> > environments have a special external system that can perform the anomaly
> > detection. In addition, adding pluggable/optional auto-detection doesn't
> > require much extra work on top of manual specification.
> >
> >
> > I will answer your other questions one by one.
> >
> >
> > @Yangze
> >
> > a) I think you are right, we do not need to expose the
> > `cluster.resource-blocklist.item.timeout-check-interval` to users.
> >
> > b) We can abstract the `notifyException` to a separate interface (maybe
> > BlocklistExceptionListener), and the ResourceManagerBlocklistHandler can
> > implement it in the future.
> >
> >
> > @Martijn
> >
> > a) I also think the manual blocking should be done by cluster operators.
> >
> > b) I think manual blocking makes sense, because in my experience, users
> > are often the first to notice machine problems (because of job failover
> > or delay), and they will contact cluster operators to solve them, or even
> > tell the cluster operators which machine is problematic. From this point
> > of view, the people who really need manual blocking are the users, and it
> > is just performed by the cluster operator.
> >
> >
> > @Chesnay
> >
> > We need to touch the logic of JM/SlotPool, because for MARK_BLOCKLISTED,
> > we need to know whether the slot is blocklisted when the task is
> > FINISHED/CANCELLED/FAILED. If so, SlotPool should release the slot
> > directly to avoid assigning other tasks (of this job) to it. If we only
> > maintain the blocklist information on the RM, JM needs to retrieve it by
> > RPC. I think the performance overhead of that is relatively large, so I
> > think it's worth maintaining the blocklist information on the JM side and
> > syncing it.
> >
> >
> > @Роман
> >
> > a) “Probably storing inside Zookeeper/Configmap might be helpful
> > here.” Can you explain it in detail? I don't fully understand that. In my
> > opinion, non-active and active are the same, and no special treatment is
> > required.
> >
> > b) I agree with you, the `endTimestamp` makes sense, I will add it to
> > the FLIP.
> >
> >
> > @Yang
> >
> > As mentioned above, AFAIK, the external system cannot support the
> > MARK_BLOCKLISTED action.
> >
> >
> > Looking forward to your further feedback.
> >
> >
> > Best,
> >
> > Lijie
> >
> >
> > Yang Wang <danrtsey...@gmail.com> wrote on Tue, May 3, 2022 at 21:09:
> >>
> >> Thanks Lijie and Zhu for creating the proposal.
> >>
> >> I want to share some thoughts about Flink cluster operations.
> >>
> >> In the production environment, the SRE (aka Site Reliability Engineer)
> >> already has many tools to detect unstable nodes, which could take the
> >> system logs/metrics into consideration.
> >> Then they use graceful decommission in YARN and taint in K8s to prevent
> >> new allocations on these unstable nodes.
> >> Finally, they will evict all the containers and pods running on these
> >> nodes.
> >> This mechanism also works for planned maintenance. So I am afraid this is
> >> not the typical use case for FLIP-224.
> >>
> >> If we only support blocking nodes manually, then I cannot see
> >> the obvious advantages compared with the current SRE approach (via *yarn
> >> rmadmin or kubectl taint*).
> >> At least, we need to have a pluggable component which could expose the
> >> potentially unstable nodes automatically and block them if enabled
> >> explicitly.
> >>
> >>
> >> Best,
> >> Yang
> >>
> >>
> >>
> >> Becket Qin <becket....@gmail.com> wrote on Mon, May 2, 2022 at 16:36:
> >>
> >> > Thanks for the proposal, Lijie.
> >> >
> >> > This is an interesting feature and discussion, and somewhat related to
> >> > the design principle about how people should operate Flink.
> >> >
> >> > I think there are three things involved in this FLIP.
> >> >      a) Detect and report the unstable node.
> >> >      b) Collect the information of the unstable node and form a
> >> >      blocklist.
> >> >      c) Take the action to block nodes.
> >> >
> >> > My two cents:
> >> >
> >> > 1. It looks like people all agree that Flink should have c). It is not
> >> > only useful for cases of node failures, but also handy for some planned
> >> > maintenance.
> >> >
> >> > 2. People have different opinions on b), i.e. who should be the brain
> >> > that makes the decision to block a node. I think this largely depends on
> >> > who we talk to. Different users would probably give different answers.
> >> > For people who do have a centralized node health management service,
> >> > letting Flink do just a) and c) would be preferred. So essentially Flink
> >> > would be one of the sources that may detect unstable nodes, report them
> >> > to that service, and then take the command from that service to block the
> >> > problematic nodes. On the other hand, for users who do not have such a
> >> > service, simply letting Flink be clever by itself to block the suspicious
> >> > nodes might be desired to ensure the jobs are running smoothly.
> >> >
> >> > So that indicates a) and b) here should be pluggable / optional.
> >> >
> >> > In light of this, maybe it would make sense to have something pluggable
> >> > like an UnstableNodeReporter which exposes unstable nodes actively. (A
> >> > more general interface would be JobInfoReporter<T>, which can be used to
> >> > report any information of type <T>. But I'll just keep the scope relevant
> >> > to this FLIP here.) Personally speaking, I think it is OK to have a
> >> > default implementation of a reporter which just tells Flink to take action
> >> > to block problematic nodes and also unblocks them after timeout.
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> >
> >> > On Mon, May 2, 2022 at 3:27 PM Роман Бойко <ro.v.bo...@gmail.com> wrote:
> >> >
> >> > > Thanks for the good initiative, Lijie and Zhu!
> >> > >
> >> > > If it's possible I'd like to participate in development.
> >> > >
> >> > > I agree with the 3rd point of Konstantin's reply - we should consider
> >> > > somehow moving the information about blocklisted nodes/TMs from the
> >> > > active ResourceManager to non-active ones. Probably storing inside
> >> > > Zookeeper/Configmap might be helpful here.
> >> > >
> >> > > And I agree with Martijn that a lot of organizations don't want to
> >> > > expose such an API to a cluster user group. But I think it's necessary
> >> > > to have a mechanism for unblocking the nodes/TMs anyway, to guard
> >> > > against incorrect automatic behaviour.
> >> > >
> >> > > And one more small suggestion - I think it would be better to extend
> >> > > the *BlocklistedItem* class with the *endTimestamp* field and fill it
> >> > > at item creation. This simple addition will allow us to:
> >> > >
> >> > >    - Provide the ability for users to set the exact time of blocklist
> >> > >    end through the REST API
> >> > >    - Not be tied to a single value of
> >> > >    *cluster.resource-blacklist.item.timeout*
> >> > >
> >> > >
> >> > > On Mon, 2 May 2022 at 14:17, Chesnay Schepler <ches...@apache.org> wrote:
> >> > >
> >> > > > I do share the concern about blurring the lines a bit.
> >> > > >
> >> > > > That said, I'd prefer to not have any auto-detection and only have an
> >> > > > opt-in mechanism to manually block processes/nodes. To me this sounds
> >> > > > yet again like one of those magical mechanisms that will rarely work
> >> > > > just right.
> >> > > > An external system can leverage way more information after all.
> >> > > >
> >> > > > Moreover, I'm quite concerned about the complexity of this proposal.
> >> > > > Tracking on both the RM/JM side; syncing between components;
> >> > > > adjustments to the slot and resource protocol.
> >> > > >
> >> > > > In a way it seems overly complicated.
> >> > > >
> >> > > > If we look at it purely from an active resource management
> >> > > > perspective, then there isn't really a need to touch the slot protocol
> >> > > > at all (or in fact anything in the JobMaster), because there isn't any
> >> > > > point in keeping around blocked TMs in the first place.
> >> > > > They'd just be idling, potentially being shut down after a while by the
> >> > > > RM because of it (unless we _also_ touch that logic).
> >> > > > Here the blocking of a process (be it by blocking the process or node)
> >> > > > is equivalent to shutting down the blocked process(es).
> >> > > > Once the block is lifted we can just spin it back up.
> >> > > >
> >> > > > And I do wonder whether we couldn't apply the same line of thinking to
> >> > > > standalone resource management.
> >> > > > Here being able to stop/restart a process/node manually should be a
> >> > > > core requirement for a Flink deployment anyway.
> >> > > >
> >> > > >
> >> > > > On 02/05/2022 08:49, Martijn Visser wrote:
> >> > > > > Hi everyone,
> >> > > > >
> >> > > > > Thanks for creating this FLIP. I can understand the problem and I see
> >> > > > > value in the automatic detection and blocklisting. I do have some
> >> > > > > concerns about the ability to manually specify resources to be
> >> > > > > blocked. I have two concerns:
> >> > > > >
> >> > > > > * Most organizations explicitly have a separation of concerns,
> >> > > > > meaning that there's a group that's responsible for managing a cluster
> >> > > > > and there's a user group that uses that cluster. With the introduction
> >> > > > > of this mechanism, the latter group can now influence the
> >> > > > > responsibility of the first group. So it is possible that someone from
> >> > > > > the user group blocks something, which causes an outage (which could
> >> > > > > result in paging mechanisms triggering etc.) that impacts the first
> >> > > > > group.
> >> > > > > * How big is the group of people who can go through the process of
> >> > > > > manually identifying a node that isn't behaving as it should? I do
> >> > > > > think this group is relatively limited. Does it then make sense to
> >> > > > > introduce such a feature, which would only be used by a really small
> >> > > > > group of Flink users? We still have to maintain, test and support such
> >> > > > > a feature.
> >> > > > >
> >> > > > > I'm +1 for the autodetection features, but I'm leaning towards not
> >> > > > > exposing this to the user group but having this available strictly for
> >> > > > > cluster operators. They could then also set up their
> >> > > > > paging/metrics/logging system to take this into account.
> >> > > > >
> >> > > > > Best regards,
> >> > > > >
> >> > > > > Martijn Visser
> >> > > > > https://twitter.com/MartijnVisser82
> >> > > > > https://github.com/MartijnVisser
> >> > > > >
> >> > > > >
> >> > > > > On Fri, 29 Apr 2022 at 09:39, Yangze Guo <karma...@gmail.com> wrote:
> >> > > > >
> >> > > > >> Thanks for driving this, Zhu and Lijie.
> >> > > > >>
> >> > > > >> +1 for the overall proposal. Just sharing my two cents here:
> >> > > > >>
> >> > > > >> - Why do we need to expose
> >> > > > >> cluster.resource-blacklist.item.timeout-check-interval to the user?
> >> > > > >> I think the semantics of `cluster.resource-blacklist.item.timeout`
> >> > > > >> are sufficient for the user. How the timeout mechanism is guaranteed
> >> > > > >> is Flink's internal implementation. I think it will be very confusing
> >> > > > >> and we do not need to expose it to users.
> >> > > > >>
> >> > > > >> - ResourceManager can notify `BlacklistHandler` of the exception of a
> >> > > > >> task manager as well.
> >> > > > >> For example, the slot allocation might fail in case the target task
> >> > > > >> manager is busy or has network jitter. I don't mean we need to cover
> >> > > > >> this case in this version, but we can also open a `notifyException`
> >> > > > >> in `ResourceManagerBlacklistHandler`.
> >> > > > >>
> >> > > > >> - Before we sync the blocklist to ResourceManager, will the slots of
> >> > > > >> a blocked task manager continue to be released and allocated?
> >> > > > >>
> >> > > > >> Best,
> >> > > > >> Yangze Guo
> >> > > > >>
> >> > > > >> On Thu, Apr 28, 2022 at 3:11 PM Lijie Wang <wangdachui9...@gmail.com> wrote:
> >> > > > >>> Hi Konstantin,
> >> > > > >>>
> >> > > > >>> Thanks for your feedback. I will respond to your 4 remarks:
> >> > > > >>>
> >> > > > >>>
> >> > > > >>> 1) Thanks for reminding me of the controversy. I think “BlockList”
> >> > > > >>> is good enough, and I will change it in the FLIP.
> >> > > > >>>
> >> > > > >>>
> >> > > > >>> 2) Your suggestion for the REST API is a good idea. Based on the
> >> > > > >>> above, I would change the REST API as follows:
> >> > > > >>>
> >> > > > >>> POST/GET <host>/blocklist/nodes
> >> > > > >>>
> >> > > > >>> POST/GET <host>/blocklist/taskmanagers
> >> > > > >>>
> >> > > > >>> DELETE <host>/blocklist/node/<identifier>
> >> > > > >>>
> >> > > > >>> DELETE <host>/blocklist/taskmanager/<identifier>
> >> > > > >>>
> >> > > > >>>
> >> > > > >>> 3) If a node is blocked/blocklisted, it means that all task managers
> >> > > > >>> on this node are blocklisted. All slots on these TMs are not
> >> > > > >>> available. This is actually a bit like the TMs being lost, but these
> >> > > > >>> TMs are not really lost: they are in an unavailable status, and they
> >> > > > >>> are still registered in this Flink cluster. They will be available
> >> > > > >>> again once the corresponding blocklist item is removed. This behavior
> >> > > > >>> is the same in active/non-active clusters. However, in active
> >> > > > >>> clusters, these TMs may be released due to idle timeouts.
> >> > > > >>>
> >> > > > >>>
> >> > > > >>> 4) For the item timeout, I prefer to keep it. The reasons are as
> >> > > > >>> follows:
> >> > > > >>> a) The timeout will not affect users adding or removing items via the
> >> > > > >>> REST API, and users can disable it by configuring it to Long.MAX_VALUE.
> >> > > > >>>
> >> > > > >>> b) Some node problems can recover after a period of time (such as
> >> > > > >>> machine hotspots), in which case users may prefer that Flink do this
> >> > > > >>> automatically instead of requiring the user to do it manually.
> >> > > > >>>
> >> > > > >>>
> >> > > > >>> Best,
> >> > > > >>>
> >> > > > >>> Lijie
> >> > > > >>>
> >> > > > >>> Konstantin Knauf <kna...@apache.org> wrote on Wed, Apr 27, 2022 at 19:23:
> >> > > > >>>
> >> > > > >>>> Hi Lijie,
> >> > > > >>>>
> >> > > > >>>> I think this makes sense, and +1 to only support manually blocking
> >> > > > >>>> taskmanagers and nodes. Maybe the different strategies can also be
> >> > > > >>>> maintained outside of Apache Flink.
> >> > > > >>>>
> >> > > > >>>> A few remarks:
> >> > > > >>>>
> >> > > > >>>> 1) Can we use another term than "blacklist" due to the controversy
> >> > > > >>>> around the term? [1] There was also a Jira ticket about this topic a
> >> > > > >>>> while back and there was generally a consensus to avoid the terms
> >> > > > >>>> blacklist & whitelist [2]. We could use "blocklist", "denylist" or
> >> > > > >>>> "quarantined".
> >> > > > >>>> 2) For the REST API, I'd prefer a slightly different design, as verbs
> >> > > > >>>> like add/remove are often considered an anti-pattern for REST APIs.
> >> > > > >>>> POST on a list item is generally the standard to add items. DELETE on
> >> > > > >>>> the individual resource is standard to remove an item.
> >> > > > >>>>
> >> > > > >>>> POST <host>/quarantine/items
> >> > > > >>>> DELETE <host>/quarantine/items/<itemidentifier>
> >> > > > >>>>
> >> > > > >>>> We could also consider separating taskmanagers and nodes in the REST
> >> > > > >>>> API (and internal data structures). Any opinion on this?
> >> > > > >>>>
> >> > > > >>>> POST/GET <host>/quarantine/nodes
> >> > > > >>>> POST/GET <host>/quarantine/taskmanager
> >> > > > >>>> DELETE <host>/quarantine/nodes/<identifier>
> >> > > > >>>> DELETE <host>/quarantine/taskmanager/<identifier>
> >> > > > >>>>
> >> > > > >>>> 3) How would blocking nodes behave with non-active resource managers,
> >> > > > >>>> i.e. standalone or reactive mode?
> >> > > > >>>>
> >> > > > >>>> 4) To keep the implementation even more minimal, do we need the
> >> > > > >>>> timeout behavior? If items are added/removed manually we could
> >> > > > >>>> delegate this to the user easily. In my opinion the timeout behavior
> >> > > > >>>> would better fit into specific strategies at a later point.
> >> > > > >>>>
> >> > > > >>>> Looking forward to your thoughts.
> >> > > > >>>>
> >> > > > >>>> Cheers and thank you,
> >> > > > >>>>
> >> > > > >>>> Konstantin
> >> > > > >>>>
> >> > > > >>>> [1]
> >> > > > >>>> https://en.wikipedia.org/wiki/Blacklist_(computing)#Controversy_over_use_of_the_term
> >> > > > >>>> [2] https://issues.apache.org/jira/browse/FLINK-18209
> >> > > > >>>>
> >> > > > >>>> On Wed, Apr 27, 2022 at 04:04, Lijie Wang <wangdachui9...@gmail.com> wrote:
> >> > > > >>>>
> >> > > > >>>>> Hi all,
> >> > > > >>>>>
> >> > > > >>>>> Flink job failures may happen due to cluster node issues
> >> > > > >>>>> (insufficient disk space, bad hardware, network abnormalities).
> >> > > > >>>>> Flink will take care of the failures and redeploy the tasks.
> >> > > > >>>>> However, due to data locality and limited resources, the new tasks
> >> > > > >>>>> are very likely to be redeployed to the same nodes, which will
> >> > > > >>>>> result in continuous task abnormalities and affect job progress.
> >> > > > >>>>>
> >> > > > >>>>> Currently, Flink users need to manually identify the problematic
> >> > > > >>>>> node and take it offline to solve this problem. But this approach
> >> > > > >>>>> has the following disadvantages:
> >> > > > >>>>>
> >> > > > >>>>> 1. Taking a node offline can be a heavy process. Users may need to
> >> > > > >>>>> contact cluster administrators to do this. The operation can even be
> >> > > > >>>>> dangerous and not allowed during some important business events.
> >> > > > >>>>>
> >> > > > >>>>> 2. Identifying and solving this kind of problem manually would be
> >> > > > >>>>> slow and a waste of human resources.
> >> > > > >>>>>
> >> > > > >>>>> To solve this problem, Zhu Zhu and I propose to introduce a
> >> > > > >>>>> blacklist mechanism for Flink to filter out problematic resources.
> >> > > > >>>>>
> >> > > > >>>>>
> >> > > > >>>>> You can find more details in FLIP-224[1]. Looking forward to your
> >> > > > >>>>> feedback.
> >> > > > >>>>>
> >> > > > >>>>> [1]
> >> > > > >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blacklist+Mechanism
> >> > > > >>>>>
> >> > > > >>>>> Best,
> >> > > > >>>>>
> >> > > > >>>>> Lijie
> >> > > > >>>>>
> >> > > >
> >> > > >
> >> > >
> >> >
>
