[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-12495:
------------------------------
    Description: 
In Kafka Connect, we implement incremental cooperative rebalance algorithm 
based on KIP-415 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
 However, we have a bad assumption in the algorithm implementation, which is: 
after revoking rebalance completed, the member(worker) count will be the same 
as the previous round of reblance.

 

Let's take a look at the example in the KIP-415:

!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well for most cases. But what if W4 added after 1st rebalance 
completed and before 2nd rebalance started? Let's see what will happened? Let's 
see this example: (we'll use 10 tasks here):

 
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
W3 joins with assignment: []

// one more member joined
W4 joins with assignment: []
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previous revoked Connectors/Tasks to the new member, 
which cause unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
W2(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
W2(delay: 0, assigned: [BT4, BT5], revoked: [])
{code}
We cannot just assign the revoked C/T to the new members in the 2nd round of 
rebalance. We should check if we need one more round of revocation.

 

Note: The consumer's cooperative sticky assignor won't have this issue since we 
re-compute the assignment in each round.

  was:
In Kafka Connect, we implement incremental cooperative rebalance algorithm 
based on KIP-415 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
 However, we have a bad assumption in the algorithm implementation, which is: 
after revoking rebalance completed, the member(worker) count will be the same 
as the previous round of reblance.

 

Let's take a look at the example in the KIP-415:

!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well for most cases. But what if W3 left after 1st rebalance completed 
and before 2nd rebalance started? Let's see what will happened? Let's see this 
example: (we'll use 10 tasks here):

 
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
// W3 is down
W3 doesn't join
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previous revoked Connectors/Tasks to the new member, 
which cause unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: [])
{code}
We cannot assume the member count after keeps the same right after revocation.

 

Note: The consumer's cooperative sticky assignor won't have this issue since we 
re-compute the assignment in each round.


> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12495
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12495
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>         Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement incremental cooperative rebalance algorithm 
> based on KIP-415 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
>  However, we have a bad assumption in the algorithm implementation, which is: 
> after revoking rebalance completed, the member(worker) count will be the same 
> as the previous round of reblance.
>  
> Let's take a look at the example in the KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 added after 1st rebalance 
> completed and before 2nd rebalance started? Let's see what will happened? 
> Let's see this example: (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previous revoked Connectors/Tasks to the new member, 
> which cause unbalanced distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W2(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W2(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> We cannot just assign the revoked C/T to the new members in the 2nd round of 
> rebalance. We should check if we need one more round of revocation.
>  
> Note: The consumer's cooperative sticky assignor won't have this issue since 
> we re-compute the assignment in each round.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to