[
https://issues.apache.org/jira/browse/KAFKA-12477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377622#comment-17377622
]
Konstantine Karantasis commented on KAFKA-12477:
------------------------------------------------
[~ableegoldman] [~guozhang] Is this issue a blocker for 3.0? If not, I'd like
to postpone it to the next release.
> Smart rebalancing with dynamic protocol selection
> -------------------------------------------------
>
> Key: KAFKA-12477
> URL: https://issues.apache.org/jira/browse/KAFKA-12477
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: A. Sophie Blee-Goldman
> Assignee: A. Sophie Blee-Goldman
> Priority: Major
> Fix For: 3.0.0
>
>
> Users who want to upgrade their applications and enable the COOPERATIVE
> rebalancing protocol in their consumer apps are required to follow a double
> rolling bounce upgrade path. The reason for this is laid out in the [Consumer
> Upgrades|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-Consumer]
> section of KIP-429. Basically, the ConsumerCoordinator picks a rebalancing
> protocol in its constructor based on the list of supported partition
> assignors. The protocol is selected as the highest protocol that is commonly
> supported by all assignors in the list, and never changes after that.
> This is a bit unfortunate because it may end up using an older protocol even
> after every member in the group has been updated to support the newer
> protocol. After the first rolling bounce of the upgrade, all members will
> have two assignors: "cooperative-sticky" and "range" (or
> sticky/round-robin/etc). At this point the EAGER protocol will still be
> selected due to the presence of the "range" assignor, but it's the
> "cooperative-sticky" assignor that will ultimately be selected for use in
> rebalances if that assignor is preferred (i.e. positioned first in the list).
> The only reason for the second rolling bounce is to strip off the "range"
> assignor and allow the upgraded members to switch over to COOPERATIVE. We
> can't allow them to use cooperative rebalancing until everyone has been
> upgraded, but once they have it's safe to do so.
> And there is already a way for the client to detect that everyone is on the
> new bytecode: if the CooperativeStickyAssignor is selected by the group
> coordinator, then that means it is supported by all consumers in the group
> and therefore everyone must be upgraded.
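A minimal sketch of the static selection rule described above ("the highest protocol that is commonly supported by all assignors in the list"). The names RebalanceProtocol and ProtocolSelection are hypothetical and this is not the actual ConsumerCoordinator code. It assumes "cooperative-sticky" advertises both EAGER and COOPERATIVE while "range" advertises only EAGER, which is what the description implies.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the client's protocol enum.
// Declaration order doubles as rank: COOPERATIVE is "higher" than EAGER.
enum RebalanceProtocol { EAGER, COOPERATIVE }

final class ProtocolSelection {
    /** Returns the highest protocol supported by every assignor in the list. */
    static RebalanceProtocol highestCommon(List<Set<RebalanceProtocol>> supportedPerAssignor) {
        Set<RebalanceProtocol> common = EnumSet.allOf(RebalanceProtocol.class);
        for (Set<RebalanceProtocol> supported : supportedPerAssignor) {
            common.retainAll(supported); // keep only protocols every assignor supports
        }
        if (common.isEmpty()) {
            throw new IllegalArgumentException("assignors share no rebalance protocol");
        }
        // Enums compare by declaration order, so max() is the highest-ranked protocol.
        return Collections.max(common);
    }
}
```

With the two-assignor list from the first rolling bounce, highestCommon returns EAGER; once "range" is removed it returns COOPERATIVE, which is what the second bounce achieves today.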
> We may be able to save the second rolling bounce by dynamically updating the
> rebalancing protocol inside the ConsumerCoordinator as "the highest protocol
> supported by the assignor chosen by the group coordinator". This means we'll
> still be using EAGER at the first rebalance, since we of course need to wait
> for this initial rebalance to get the response from the group coordinator.
> But we should take the hint from the chosen assignor rather than dropping
> this information on the floor and sticking with the original protocol.
> Concrete Proposal:
> This assumes we will change the default assignor to ["cooperative-sticky",
> "range"] in KIP-726. It also acknowledges that users may attempt any kind of
> upgrade without reading the docs, and so we need to put in safeguards against
> data corruption rather than assume everyone will follow the safe upgrade path.
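For reference, a sketch of how an application could set the assignor list ["cooperative-sticky", "range"] explicitly today through the partition.assignment.strategy consumer config. The class AssignorConfigSketch is hypothetical, and only java.util.Properties is used so the snippet stands alone; the remaining consumer settings are omitted.

```java
import java.util.Properties;

// Hypothetical helper; only the assignor-related setting is shown.
final class AssignorConfigSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        // Order matters: the first entry is the member's preferred assignor.
        props.setProperty(
            "partition.assignment.strategy",
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
                + "org.apache.kafka.clients.consumer.RangeAssignor");
        return props;
    }
}
```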
> With this proposal,
> 1) New applications on 3.0 will enable cooperative rebalancing by default
> 2) Existing applications which don’t set an assignor can safely upgrade to
> 3.0 using a single rolling bounce with no extra steps, and will automatically
> transition to cooperative rebalancing
> 3) Existing applications which do set an assignor that uses EAGER can
> likewise upgrade their applications to COOPERATIVE with a single rolling
> bounce
> 4) Once on 3.0, applications can safely go back and forth between EAGER and
> COOPERATIVE
> 5) Applications can safely downgrade away from 3.0
> The high-level idea for dynamic protocol upgrades is that the group will
> leverage the assignor selected by the group coordinator to determine when
> it’s safe to upgrade to COOPERATIVE, and trigger a fail-safe to protect the
> group in case of rare events or user misconfiguration. The group coordinator
> selects the most preferred assignor that’s supported by all members of the
> group, so we know that all members will support COOPERATIVE once we receive
> the “cooperative-sticky” assignor after a rebalance. At this point, each
> member can upgrade its own protocol to COOPERATIVE. However, there may be
> situations in which an EAGER member joins the group even after the other
> members have upgraded to COOPERATIVE. For example, during a rolling upgrade,
> if the last remaining
> member on the old bytecode misses a rebalance, the other members will be
> allowed to upgrade to COOPERATIVE. If the old member rejoins and is chosen to
> be the group leader before it’s upgraded to 3.0, it won’t be aware that the
> other members of the group have not yet revoked their partitions when
> computing the assignment.
> Short Circuit:
> The risk of mixing the cooperative and eager rebalancing protocols is that a
> partition may be assigned to one member while it has yet to be revoked from
> its previous owner. The danger is that the new owner may begin processing and
> committing offsets for this partition while the previous owner is also
> committing offsets in its #onPartitionsRevoked callback, which is invoked at
> the end of the rebalance in the cooperative protocol. This can result in
> these consumers overwriting each other’s offsets and getting a corrupted view
> of the partition. Note that it’s not possible to commit during a rebalance,
> so we can protect against offset corruption by blocking further commits after
> we detect that the group leader may not understand COOPERATIVE, but before we
> invoke #onPartitionsRevoked. This is the “short-circuit” — if we detect that
> the group is in an unsafe state, we invoke #onPartitionsLost instead of
> #onPartitionsRevoked and explicitly prevent offsets from being committed on
> those revoked partitions.
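A rough sketch of the commit-blocking half of the short-circuit, under stated assumptions: CommitGuard and its methods are hypothetical, partitions are plain strings, and IllegalStateException stands in for the CommitFailedException the proposal mentions. It only illustrates that a commit for a partition revoked in an unsafe state is rejected rather than overwriting the offset.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not part of the consumer client.
final class CommitGuard {
    private final Set<String> blocked = new HashSet<>();
    private final Map<String, Long> committed = new HashMap<>();

    /** Called when partitions are revoked while the group is in an unsafe state. */
    void onUnsafeRevocation(Set<String> revokedPartitions) {
        blocked.addAll(revokedPartitions);
    }

    /** Records the offset unless the partition was revoked unsafely. */
    void commit(String partition, long offset) {
        if (blocked.contains(partition)) {
            // Stand-in for CommitFailedException in the real client.
            throw new IllegalStateException("commit blocked for revoked partition " + partition);
        }
        committed.put(partition, offset);
    }

    /** Last accepted offset for the partition, or null if none was committed. */
    Long committedOffset(String partition) {
        return committed.get(partition);
    }
}
```

The point of the guard is that the previous owner's last accepted offset stays intact, so the new owner's commits are not overwritten.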
> Consumer procedure:
> Upon startup, the consumer will initially select the highest
> commonly-supported protocol across its configured assignors. With
> ["cooperative-sticky", "range"], the initial protocol will be EAGER when the
> member first joins the group. Following a rebalance, each member will check
> the selected assignor. If the chosen assignor supports COOPERATIVE, the
> member can upgrade its protocol to COOPERATIVE and no further action
> is required. If the member is already on COOPERATIVE but the selected
> assignor does NOT support it, then we need to trigger the short-circuit. In
> this case we will invoke #onPartitionsLost instead of #onPartitionsRevoked,
> and set a flag to block any attempts at committing those partitions which
> have been revoked. If a commit is attempted, as may be the case if the user
> does not implement #onPartitionsLost (see KAFKA-12638), we will throw a
> CommitFailedException which will be bubbled up through poll() after
> completing the rebalance. The member will then downgrade its protocol to
> EAGER for the next rebalance.
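The consumer procedure above can be sketched as a small state machine. This is an illustration under assumptions, not the proposed patch: DynamicProtocolMember, RevocationCallback and onAssignorSelected are hypothetical names, and the member is assumed to start on EAGER as it would with ["cooperative-sticky", "range"].

```java
// Hypothetical stand-ins for the client's protocol enum and listener callbacks.
enum RebalanceProtocol { EAGER, COOPERATIVE }
enum RevocationCallback { ON_PARTITIONS_REVOKED, ON_PARTITIONS_LOST }

final class DynamicProtocolMember {
    // Initial protocol for ["cooperative-sticky", "range"]: highest common is EAGER.
    private RebalanceProtocol protocol = RebalanceProtocol.EAGER;

    RebalanceProtocol protocol() {
        return protocol;
    }

    /**
     * Applies the per-rebalance check and returns which callback should be
     * used for partitions revoked in this rebalance.
     */
    RevocationCallback onAssignorSelected(boolean selectedAssignorSupportsCooperative) {
        if (selectedAssignorSupportsCooperative) {
            // Every member supports the chosen assignor, so upgrading is safe.
            protocol = RebalanceProtocol.COOPERATIVE;
            return RevocationCallback.ON_PARTITIONS_REVOKED;
        }
        if (protocol == RebalanceProtocol.COOPERATIVE) {
            // Short-circuit: treat revoked partitions as lost (commits on them
            // are blocked) and fall back to EAGER for the next rebalance.
            protocol = RebalanceProtocol.EAGER;
            return RevocationCallback.ON_PARTITIONS_LOST;
        }
        // Already on EAGER with an EAGER-only assignor: nothing changes.
        return RevocationCallback.ON_PARTITIONS_REVOKED;
    }
}
```

Only the transition from COOPERATIVE to an EAGER-only assignor takes the ON_PARTITIONS_LOST path; every other transition leaves the normal revocation callback in place.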
--
This message was sent by Atlassian Jira
(v8.3.4#803005)