[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17582706#comment-17582706 ]

Luke Chen edited comment on KAFKA-12495 at 8/22/22 3:08 AM:
------------------------------------------------------------

Sorry for the delay. My thoughts are below

 

> `I should note that I don't believe the proposal to use the scheduled 
> rebalance delay in between successive revocation rounds is safe, though`.  I 
> actually couldn't follow this comment. Today the protocol doesn't allow 2 
> successive rebalances happening by the same leader and Luke has been saying 
> all along to remove it. I think that's what his PR does as well.

 

Yes, my PR allows successive rebalances "without delay". But the code's
author, Konstantine, suggested we should keep a "delay" between each rebalance
(revocation) to avoid a rebalance storm. Chris doesn't agree that it would
cause a rebalance storm, and also notes that the delay might even slow down
the normal scale up/down pace. That's why Chris suggested, as a compromise, an
exponential backoff between successive revocation rounds. That is, with the
current "scheduled delay" method, each rebalance waits 5 minutes (the default
scheduled rebalance delay) before it is triggered, while the "exponential
backoff" approach speeds up the rebalance but keeps the concept of a "delayed"
rebalance.

 

> I think to be on the safer side, I would implement the exponential back off 
> logic starting from 0 -> scheduled rebalance delay.

 

Sounds good
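 To make that concrete, here's a minimal sketch of the backoff I have in mind
(hypothetical names, not the actual IncrementalCooperativeAssignor code):

{code:java}
// Sketch only (hypothetical names, not actual Connect code).
public class RevocationBackoff {
    // Delay before the next consecutive revocation round: grows exponentially
    // from 0 up to the configured scheduled.rebalance.max.delay.ms
    // (5 minutes by default).
    public static long nextDelayMs(int consecutiveRevocations, long scheduledMaxDelayMs) {
        if (consecutiveRevocations <= 0)
            return 0L;                                      // first round: revoke immediately
        if (consecutiveRevocations >= 20)
            return scheduledMaxDelayMs;                     // avoid shift overflow
        long delay = 1000L << (consecutiveRevocations - 1); // 1s, 2s, 4s, ...
        return Math.min(delay, scheduledMaxDelayMs);        // capped at the scheduled delay
    }
}
{code}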

 

> But we won't still do it while a delayed rebalance is active. It could be 
> counter based as well i.e we will tolerate w workers to be added before 
> triggering a rebalance. Complexity wise it seems the same to me but how do 
> the 2 compare?

 

Yes, I agree that complexity-wise they seem about the same. Again, this
approach keeps the original concept of the "rebalance delay" while also
speeding up the rebalance pace.
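For comparison, the counter-based variant might look roughly like this (again
just a sketch with made-up names):

{code:java}
// Hypothetical sketch of the counter-based alternative: instead of a
// time-based backoff, tolerate up to `w` newly joined workers and only
// trigger the next revocation round once that threshold is exceeded.
public class RevocationTrigger {
    private final int toleratedNewWorkers; // the "w" from the comment above
    private int newWorkersSinceLastRevocation = 0;

    public RevocationTrigger(int toleratedNewWorkers) {
        this.toleratedNewWorkers = toleratedNewWorkers;
    }

    // Called when a worker joins; returns true once we should revoke again.
    public boolean onWorkerJoined() {
        return ++newWorkersSinceLastRevocation > toleratedNewWorkers;
    }

    // Reset after a revocation round completes.
    public void onRevocationRound() {
        newWorkersSinceLastRevocation = 0;
    }
}
{code}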

 

[~sagarrao], I saw you created a KIP for the next generation Connect rebalance
protocol
[here|https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol].
 I had a quick scan, and it looks like it only integrates the concepts of
KIP-848 into Kafka Connect and keeps the Connect client-side assignor. I'm
wondering if we can move Connect's client assignor into the group coordinator,
like the consumer does. I don't think Connect needs a custom client assignor
the way Streams does, because Connect has no complicated "active"/"standby"
tasks like Streams has. If we can move the algorithm onto the group
coordinator, I think it'll make our lives easier. WDYT?

 

However, I think the new KIP still needs a lot of discussion. Before that
happens, we can still land an acceptable fix for this bug. Thank you.



> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12495
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12495
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Luke Chen
>            Assignee: Sagar Rao
>            Priority: Critical
>         Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement the incremental cooperative rebalance 
> algorithm based on KIP-415 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]).
>  However, we have a bad assumption in the algorithm's implementation: after 
> a revoking rebalance completes, the member (worker) count will be the same 
> as in the previous round of rebalance.
>  
> Let's take a look at the example in KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 is added after the 1st 
> rebalance completes and before the 2nd rebalance starts? Let's walk through 
> this example (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new
> // members, but we didn't revoke any more C/T in this round, which causes
> // an unbalanced distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we don't allow consecutive revocations across two consecutive 
> rebalances (under the same leader), we end up with this uneven distribution 
> in this situation. We should allow a consecutive rebalance to run another 
> round of revocation, so the revoked C/T can be assigned to the other members 
> (see the arithmetic sketched right after the expected sequence below).
> Expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new
> // members, **and also revoked some more C/T**
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to assign the newly revoked C/T to the other
> // members
> W1 rejoins with assignment: [AC0, AT1, AT2] 
> Rebalance is triggered 
> W2 joins with assignment: [AT4, AT5, BC0] 
> W3 joins with assignment: [BT1, BT2, BT3]
> W4 joins with assignment: [BT4, BT5]
> W1 becomes leader 
> W1 computes and sends assignments:
> // (final) We assigned all the previously revoked Connectors/Tasks to the
> // members
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: []) 
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) 
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: []) 
> W4(delay: 0, assigned: [BT4, BT5, AT3], revoked: [])
> {code}
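> To spell out the arithmetic behind the extra revocation round (an 
> illustrative sketch only, not the assignor's actual code): 12 
> connectors/tasks over 4 workers means a fair share of 3 each, so W1, still 
> holding 4, has to revoke 1 more (AT3):
> {code:java}
> // Illustration only: why W1 must revoke once more after W4 joins.
> int totalConnectorsAndTasks = 12;                   // AC0, AT1-AT5, BC0, BT1-BT5
> int workers = 4;                                    // W1, W2, W3, W4
> int fairShare = totalConnectorsAndTasks / workers;  // floor(12 / 4) = 3
> int w1Owned = 4;                                    // [AC0, AT1, AT2, AT3]
> int w1ToRevoke = Math.max(0, w1Owned - fairShare);  // 1 -> revoke AT3
> {code}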
> Note: the consumer's cooperative sticky assignor doesn't have this issue 
> since it re-computes the assignment in each round.
>  
> Note 2: this issue makes the KAFKA-12283 test flaky.


