[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Chen updated KAFKA-12495:
------------------------------
    Description: 
In Kafka Connect, we implement the incremental cooperative rebalance algorithm based on [KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. However, the implementation makes a bad assumption: after the revoking rebalance completes, the member (worker) count will be the same as in the previous round of rebalance.

Let's take a look at the example in KIP-415:
!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well for most cases. But what if W4 is added after the 1st rebalance completes and before the 2nd rebalance starts? Let's see this example (2 connectors and 10 tasks):

{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
W3 joins with assignment: []
// one more member joined
W4 joins with assignment: []
W1 becomes leader
W1 computes and sends assignments:
// We assigned all the previously revoked connectors/tasks to the new members, which causes an unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
W4(delay: 0, assigned: [BT4, BT5], revoked: [])
{code}

W1 ends up with 4 connectors/tasks while W4 has only 2, even though the 12 items could be split 3 per worker. We cannot just assign the revoked connectors/tasks to the new members in the 2nd round of rebalance. We should check whether we need one more round of revocation.

Note: The consumer's cooperative sticky assignor won't have this issue since it re-computes the assignment in each round.

        was:
In Kafka Connect, we implement the incremental cooperative rebalance algorithm based on [KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. However, the implementation makes a bad assumption: after the revoking rebalance completes, the member (worker) count will be the same as in the previous round of rebalance.

Let's take a look at the example in KIP-415:
!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well for most cases. But what if W3 leaves after the 1st rebalance completes and before the 2nd rebalance starts? What happens then?
Let's see this example (2 connectors and 10 tasks):

{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
// W3 is down
W3 doesn't join
W1 becomes leader
W1 computes and sends assignments:
// We assigned all the previously revoked connectors/tasks to the new member, which causes an unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5], revoked: [])
{code}

W2 ends up with 8 connectors/tasks and W1 with 4, instead of 6 each. We cannot assume that the member count stays the same right after revocation.

Note: The consumer's cooperative sticky assignor won't have this issue since it re-computes the assignment in each round.
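The revocation check suggested above can be sketched under simplifying assumptions. The Java below is a hypothetical model, not code from Kafka's `IncrementalCooperativeAssignor`: the class `RebalanceSketch` and the methods `assignRevoked` and `excessOverCeiling` are invented for illustration. It replays the 2nd rebalance round of both examples, giving each previously revoked connector/task to the least-loaded member present in the current round, then checks whether any worker still owns more than ceil(total / members) items.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the 2nd rebalance round in the examples above.
 * Not Kafka's IncrementalCooperativeAssignor; all names are invented for illustration.
 */
public class RebalanceSketch {

    /**
     * Hands each previously revoked connector/task to the member that currently
     * owns the fewest items, counting the members present in THIS round.
     * Assumes at least one member.
     */
    static Map<String, List<String>> assignRevoked(Map<String, List<String>> current,
                                                   List<String> revoked) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        current.forEach((member, owned) -> result.put(member, new ArrayList<>(owned)));
        for (String item : revoked) {
            String target = null;
            for (Map.Entry<String, List<String>> e : result.entrySet()) {
                // Strict '<' keeps the earliest member (insertion order) on ties.
                if (target == null || e.getValue().size() < result.get(target).size()) {
                    target = e.getKey();
                }
            }
            result.get(target).add(item);
        }
        return result;
    }

    /**
     * Returns, per member, how many items exceed ceil(total / members).
     * A non-empty result means one more round of revocation is needed.
     * Assumes at least one member.
     */
    static Map<String, Integer> excessOverCeiling(Map<String, List<String>> assignment) {
        int total = 0;
        for (List<String> owned : assignment.values()) {
            total += owned.size();
        }
        int members = assignment.size();
        int ceiling = (total + members - 1) / members; // integer ceil(total / members)
        Map<String, Integer> excess = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
            int over = e.getValue().size() - ceiling;
            if (over > 0) {
                excess.put(e.getKey(), over);
            }
        }
        return excess;
    }

    public static void main(String[] args) {
        List<String> revoked =
                List.of("AT4", "AT5", "BC0", "BT1", "BT2", "BT3", "BT4", "BT5");

        // W4 joins before the 2nd rebalance (current description).
        Map<String, List<String>> joined = new LinkedHashMap<>();
        joined.put("W1", List.of("AC0", "AT1", "AT2", "AT3"));
        joined.put("W2", List.of());
        joined.put("W3", List.of());
        joined.put("W4", List.of());
        Map<String, List<String>> afterJoin = assignRevoked(joined, revoked);
        System.out.println(afterJoin);                    // W1 keeps 4; W2/W3/W4 get 3/3/2
        System.out.println(excessOverCeiling(afterJoin)); // {W1=1}

        // W3 leaves before the 2nd rebalance (previous description).
        Map<String, List<String>> left = new LinkedHashMap<>();
        left.put("W1", List.of("AC0", "AT1", "AT2", "AT3"));
        left.put("W2", List.of());
        Map<String, List<String>> afterLeave = assignRevoked(left, revoked);
        System.out.println(afterLeave);                    // 6 items each
        System.out.println(excessOverCeiling(afterLeave)); // {}
    }
}
```

In this model, when W4 joins, W1 still owns 4 items while the ceiling is 12 / 4 = 3, so one more round of revocation (one item from W1) would be needed before W4 can reach 3. When W3 leaves, computing against the 2 remaining members gives 6 items each and no extra round. Least-loaded placement is a choice made for the sketch; the issue itself does not prescribe how the revoked items should be redistributed.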
> Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12495
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12495
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>         Attachments: image-2021-03-18-15-04-57-854.png, image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png

--
This message was sent by Atlassian Jira
(v8.3.4#803005)