[ https://issues.apache.org/jira/browse/KAFKA-12477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-12477:
-------------------------------------------
    Description: 
Users who want to upgrade their applications and enable the COOPERATIVE 
rebalancing protocol in their consumer apps are required to follow a double 
rolling bounce upgrade path. The reason for this is laid out in the
[Consumer Upgrades|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-Consumer]
section of KIP-429. Basically, the ConsumerCoordinator picks a rebalancing
protocol in its constructor based on the list of configured partition assignors.
The protocol is selected as the highest protocol that is commonly supported by 
all assignors in the list, and never changes after that.

This is a bit unfortunate because it may end up using an older protocol even 
after every member in the group has been updated to support the newer protocol. 
After the first rolling bounce of the upgrade, all members will have two 
assignors: "cooperative-sticky" and "range" (or sticky/round-robin/etc). At 
this point the EAGER protocol will still be selected due to the presence of the 
"range" assignor, but it's the "cooperative-sticky" assignor that will 
ultimately be selected for use in rebalances if that assignor is preferred (i.e.
positioned first in the list). The only reason for the second rolling bounce is
to strip off the "range" assignor and allow the upgraded members to switch over
to COOPERATIVE. We can't allow them to use cooperative rebalancing until
everyone has been upgraded, but once they have, it's safe to do so.
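The static selection described above can be sketched in a few lines of Java. This is a simplified model, not the client's code: the Protocol enum, the SUPPORTED table and highestCommonProtocol are hypothetical names (in the real client the protocols are ConsumerPartitionAssignor.RebalanceProtocol values reported by each assignor's supportedProtocols()). The protocol is the highest one in the intersection of what every configured assignor supports, computed once:

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;

final class StaticProtocolSelection {
    // Declared in ascending order, so natural enum ordering means "higher is newer".
    enum Protocol { EAGER, COOPERATIVE }

    // Hypothetical table of the protocols each built-in assignor supports.
    static final Map<String, EnumSet<Protocol>> SUPPORTED = Map.of(
            "range", EnumSet.of(Protocol.EAGER),
            "roundrobin", EnumSet.of(Protocol.EAGER),
            "sticky", EnumSet.of(Protocol.EAGER),
            "cooperative-sticky", EnumSet.of(Protocol.EAGER, Protocol.COOPERATIVE));

    // Computed once from the configured assignor list and never revisited.
    static Protocol highestCommonProtocol(List<String> assignors) {
        EnumSet<Protocol> common = EnumSet.allOf(Protocol.class);
        for (String assignor : assignors) {
            EnumSet<Protocol> supported = SUPPORTED.get(assignor);
            if (supported == null) {
                throw new IllegalArgumentException("unknown assignor: " + assignor);
            }
            common.retainAll(supported);
        }
        if (common.isEmpty()) {
            throw new IllegalArgumentException("no protocol supported by all assignors");
        }
        return Collections.max(common);
    }
}
```

With List.of("cooperative-sticky", "range") this returns EAGER, while List.of("cooperative-sticky") returns COOPERATIVE; the second rolling bounce exists only to move from the first list to the second.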

There is already a way for the client to detect that everyone is on the new
bytecode: if the CooperativeStickyAssignor is selected by the group
coordinator, then it is supported by all consumers in the group, and
therefore everyone must have been upgraded.
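The coordinator-side choice that makes this inference possible can be sketched as follows. This is a simplified model rather than the broker's code: selectAssignor is a hypothetical name, each member is represented only by its ordered list of assignor names, only assignors that every member lists are candidates, each member votes for its most preferred candidate, and the candidate with the most votes wins (ties go to the candidate that received a vote first):

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class CoordinatorAssignorChoice {
    // memberPreferences: one ordered assignor list per group member, most preferred first.
    static String selectAssignor(List<List<String>> memberPreferences) {
        if (memberPreferences.isEmpty()) {
            throw new IllegalArgumentException("group has no members");
        }
        // Candidates are the assignors that every member supports.
        Set<String> candidates = new LinkedHashSet<>(memberPreferences.get(0));
        for (List<String> prefs : memberPreferences) {
            candidates.retainAll(prefs);
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no assignor is supported by all members");
        }
        // Each member votes for its most preferred candidate.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> prefs : memberPreferences) {
            for (String assignor : prefs) {
                if (candidates.contains(assignor)) {
                    votes.merge(assignor, 1, Integer::sum);
                    break;
                }
            }
        }
        // The candidate with the most votes wins.
        String winner = null;
        int best = 0;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (e.getValue() > best) {
                best = e.getValue();
                winner = e.getKey();
            }
        }
        return winner;
    }
}
```

If every member lists ["cooperative-sticky", "range"], the result is "cooperative-sticky"; if even one member still lists only ["range"], that is the only candidate and therefore the result. Receiving "cooperative-sticky" thus implies that every member has been upgraded.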

We may be able to save the second rolling bounce by dynamically updating the 
rebalancing protocol inside the ConsumerCoordinator as "the highest protocol 
supported by the assignor chosen by the group coordinator". This means we'll 
still be using EAGER at the first rebalance, since we of course need to wait 
for this initial rebalance to get the response from the group coordinator. But 
we should take the hint from the chosen assignor rather than dropping this 
information on the floor and sticking with the original protocol.
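A sketch of the proposed dynamic update, assuming the member learns the name of the chosen assignor once the rebalance completes. DynamicProtocolState, onAssignorSelected and the SUPPORTED table are hypothetical; the point is only that the protocol in use becomes the highest one supported by the chosen assignor instead of staying fixed from construction:

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;

final class DynamicProtocolState {
    enum Protocol { EAGER, COOPERATIVE }

    // Hypothetical table of protocols supported by each assignor.
    static final Map<String, EnumSet<Protocol>> SUPPORTED = Map.of(
            "range", EnumSet.of(Protocol.EAGER),
            "cooperative-sticky", EnumSet.of(Protocol.EAGER, Protocol.COOPERATIVE));

    private Protocol protocol;

    DynamicProtocolState(Protocol initial) {
        this.protocol = initial;
    }

    Protocol protocol() {
        return protocol;
    }

    // Take the hint from the assignor the group coordinator selected,
    // instead of keeping the protocol chosen at construction time.
    void onAssignorSelected(String chosenAssignor) {
        EnumSet<Protocol> supported = SUPPORTED.get(chosenAssignor);
        if (supported == null) {
            throw new IllegalArgumentException("unknown assignor: " + chosenAssignor);
        }
        protocol = Collections.max(supported);
    }
}
```

Starting from EAGER, onAssignorSelected("cooperative-sticky") moves the member to COOPERATIVE for its next rebalance; the first rebalance itself still runs with EAGER.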


Concrete Proposal:
This assumes we will change the default assignor to ["cooperative-sticky", 
"range"] in KIP-726. It also acknowledges that users may attempt any kind of 
upgrade without reading the docs, and so we need to put in safeguards against 
data corruption rather than assume everyone will follow the safe upgrade path.

With this proposal:
1) New applications on 3.0 will enable cooperative rebalancing by default
2) Existing applications which don’t set an assignor can safely upgrade to 3.0 
using a single rolling bounce with no extra steps, and will automatically 
transition to cooperative rebalancing
3) Existing applications which do set an assignor that uses EAGER can likewise 
upgrade their applications to COOPERATIVE with a single rolling bounce
4) Once on 3.0, applications can safely go back and forth between EAGER and 
COOPERATIVE
5) Applications can safely downgrade away from 3.0
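Written out explicitly, the default proposed here corresponds to the consumer configuration below. This is a sketch: partition.assignment.strategy and the two class names are real Kafka consumer settings, while the helper class and the group.id value are placeholders. Listing CooperativeStickyAssignor first makes it the preferred assignor, and keeping RangeAssignor lets the member stay compatible with members that have not yet been upgraded:

```java
import java.util.Properties;

final class ProposedDefaultAssignors {
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("group.id", "example-group"); // placeholder value
        // Preferred assignor first; RangeAssignor kept as the EAGER-only fallback.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
                        + "org.apache.kafka.clients.consumer.RangeAssignor");
        return props;
    }
}
```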

The high-level idea for dynamic protocol upgrades is that the group will 
leverage the assignor selected by the group coordinator to determine when it’s 
safe to upgrade to COOPERATIVE, and trigger a fail-safe to protect the group in 
case of rare events or user misconfiguration. The group coordinator selects the 
most preferred assignor that’s supported by all members of the group, so we 
know that all members will support COOPERATIVE once we receive the 
“cooperative-sticky” assignor after a rebalance. At this point, each member can
upgrade its own protocol to COOPERATIVE. However, an EAGER member may still
join the group after the other members have upgraded to COOPERATIVE. For
example, during a rolling upgrade, if the last remaining member on the old
bytecode misses a rebalance, the other members will be allowed to upgrade to
COOPERATIVE. If the old member then rejoins and is chosen as the group leader
before it has been upgraded to 3.0, it won’t be aware, when computing the
assignment, that the other members of the group have not yet revoked their
partitions.
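The rolling-upgrade hazard can be reproduced with a small hypothetical model. Everything here is illustrative: chooseAssignor simplifies the coordinator to "the first assignor in the first member's list that all members support", and highestFor hard-codes that only "cooperative-sticky" supports COOPERATIVE:

```java
import java.util.List;

final class RollingUpgradeHazard {
    enum Protocol { EAGER, COOPERATIVE }

    // Hypothetical: in this model only cooperative-sticky supports COOPERATIVE.
    static Protocol highestFor(String assignor) {
        return "cooperative-sticky".equals(assignor) ? Protocol.COOPERATIVE : Protocol.EAGER;
    }

    // Simplified choice: first assignor in the first member's list supported by all members.
    static String chooseAssignor(List<List<String>> members) {
        for (String candidate : members.get(0)) {
            boolean supportedByAll = true;
            for (List<String> prefs : members) {
                if (!prefs.contains(candidate)) {
                    supportedByAll = false;
                    break;
                }
            }
            if (supportedByAll) {
                return candidate;
            }
        }
        throw new IllegalStateException("no assignor is supported by all members");
    }

    // Unsafe: this member already switched to COOPERATIVE, but the group chose an EAGER-only assignor.
    static boolean unsafe(Protocol current, String chosen) {
        return current == Protocol.COOPERATIVE && highestFor(chosen) == Protocol.EAGER;
    }
}
```

With upgraded = ["cooperative-sticky", "range"] and old = ["range"], a rebalance that the old member misses yields "cooperative-sticky", so the upgraded members switch to COOPERATIVE. When the old member rejoins, the choice falls back to "range" and unsafe(COOPERATIVE, "range") is true, which is the case the Short Circuit section addresses.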

Short Circuit:
The risk of mixing the cooperative and eager rebalancing protocols is that a 
partition may be assigned to one member while it has yet to be revoked from its 
previous owner. The danger is that the new owner may begin processing and 
committing offsets for this partition while the previous owner is also 
committing offsets in its #onPartitionsRevoked callback, which is invoked at 
the end of the rebalance in the cooperative protocol. This can result in these 
consumers overwriting each other’s offsets and getting a corrupted view of the 
partition. Note that it’s not possible to commit during a rebalance, so we can 
protect against offset corruption by blocking further commits after we detect 
that the group leader may not understand COOPERATIVE, but before we invoke 
#onPartitionsRevoked. This is the “short-circuit” — if we detect that the group 
is in an unsafe state, we invoke #onPartitionsLost instead of 
#onPartitionsRevoked and explicitly prevent offsets from being committed on 
those revoked partitions.
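A minimal sketch of the commit block, using a hypothetical ShortCircuitSketch class in place of the real consumer internals. The user's rebalance callback is modeled as a plain function, and the important ordering is that revoked partitions are added to the blocked set before the callback runs, so a commit attempted from inside the callback is rejected. The proposal specifies a CommitFailedException; the sketch throws IllegalStateException to stay free of Kafka dependencies:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

final class ShortCircuitSketch {
    private final Set<String> commitBlocked = new HashSet<>();
    final Map<String, Long> committed = new HashMap<>();
    String lastCallback;

    void commit(String partition, long offset) {
        if (commitBlocked.contains(partition)) {
            throw new IllegalStateException("commit blocked for revoked partition " + partition);
        }
        committed.put(partition, offset);
    }

    // Invoked when this member gives up `revoked` at the end of a rebalance.
    void finishRevocation(Collection<String> revoked, boolean groupUnsafe,
                          Consumer<Collection<String>> userCallback) {
        if (groupUnsafe) {
            // Short-circuit: block commits first, then report the partitions as lost.
            commitBlocked.addAll(revoked);
            lastCallback = "onPartitionsLost";
        } else {
            // Normal cooperative path: the callback may still commit final offsets.
            lastCallback = "onPartitionsRevoked";
        }
        userCallback.accept(revoked);
    }
}
```

A callback that commits (as happens when an application does not override onPartitionsLost, whose default delegates to onPartitionsRevoked) succeeds when groupUnsafe is false and is rejected when it is true.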

Consumer procedure:
Upon startup, the consumer will initially select the highest commonly-supported 
protocol across its configured assignors. With ["cooperative-sticky", "range"],
the initial protocol will be EAGER when the member first joins the group.
Following a rebalance, each member will check the selected assignor. If the
chosen assignor supports COOPERATIVE, the member can upgrade its protocol to
COOPERATIVE and no further action is required. If the member is
already on COOPERATIVE but the selected assignor does NOT support it, then we 
need to trigger the short-circuit. In this case we will invoke 
#onPartitionsLost instead of #onPartitionsRevoked, and set a flag to block any 
attempts at committing those partitions which have been revoked. If a commit is 
attempted, as may be the case if the user does not implement #onPartitionsLost 
(see KAFKA-12638), we will throw a CommitFailedException which will be bubbled 
up through poll() after completing the rebalance. The member will then 
downgrade its protocol to EAGER for the next rebalance.
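The consumer procedure can be sketched as a small state machine. ConsumerProtocolSketch, its SUPPORTED table, onRebalanceComplete and the local CommitFailedException stand-in are hypothetical; the real exception is org.apache.kafka.clients.consumer.CommitFailedException, and in the proposal it reaches the application through poll() after the rebalance completes, whereas this sketch simply throws it from commit:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ConsumerProtocolSketch {
    enum Protocol { EAGER, COOPERATIVE }

    // Local stand-in so the sketch compiles without the Kafka client on the classpath.
    static final class CommitFailedException extends RuntimeException {
        CommitFailedException(String message) {
            super(message);
        }
    }

    // Hypothetical table of protocols supported by each assignor.
    private static final Map<String, EnumSet<Protocol>> SUPPORTED = Map.of(
            "range", EnumSet.of(Protocol.EAGER),
            "cooperative-sticky", EnumSet.of(Protocol.EAGER, Protocol.COOPERATIVE));

    private Protocol protocol;
    private final Set<String> commitBlocked = new HashSet<>();

    ConsumerProtocolSketch(List<String> configuredAssignors) {
        // Startup: highest protocol supported by every configured assignor.
        EnumSet<Protocol> common = EnumSet.allOf(Protocol.class);
        for (String assignor : configuredAssignors) {
            common.retainAll(supported(assignor));
        }
        if (common.isEmpty()) {
            throw new IllegalArgumentException("no common protocol");
        }
        protocol = Collections.max(common);
    }

    private static EnumSet<Protocol> supported(String assignor) {
        EnumSet<Protocol> s = SUPPORTED.get(assignor);
        if (s == null) {
            throw new IllegalArgumentException("unknown assignor: " + assignor);
        }
        return s;
    }

    Protocol protocol() {
        return protocol;
    }

    // Returns the name of the callback invoked for `revoked` after this rebalance.
    String onRebalanceComplete(String chosenAssignor, Collection<String> revoked) {
        if (supported(chosenAssignor).contains(Protocol.COOPERATIVE)) {
            protocol = Protocol.COOPERATIVE; // safe: every member supports it
            return "onPartitionsRevoked";
        }
        if (protocol == Protocol.COOPERATIVE) {
            // Short-circuit: block commits on the revoked partitions, report them
            // as lost, and downgrade to EAGER for the next rebalance.
            commitBlocked.addAll(revoked);
            protocol = Protocol.EAGER;
            return "onPartitionsLost";
        }
        return "onPartitionsRevoked"; // already EAGER; nothing changes
    }

    void commit(String partition) {
        if (commitBlocked.contains(partition)) {
            throw new CommitFailedException(
                    "partition " + partition + " was revoked in an unsafe rebalance");
        }
        // A real consumer would send the offset commit here.
    }
}
```

Starting from ["cooperative-sticky", "range"], the member begins on EAGER and moves to COOPERATIVE after a rebalance that selects "cooperative-sticky". If a later rebalance selects "range", it reports the revoked partitions as lost, rejects commits on them and falls back to EAGER.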



> Smart rebalancing with dynamic protocol selection
> -------------------------------------------------
>
>                 Key: KAFKA-12477
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12477
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>             Fix For: 3.0.0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
