[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2022-05-11 Thread Manuel Garcia Cabrera (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17535020#comment-17535020
 ] 

Manuel Garcia Cabrera commented on KAFKA-12495:
---

I see two different issues when doing deployments with 2.6.2. In my deployments 
I add 5 Workers at once and then remove 5 other Workers at once (starting with 
11, so the cluster goes from 11 to 16 and then back to 11). I say "at once", but 
they don't really get started and removed at the same instant; there can be up 
to a 30-second difference between the first one being ready and the last one 
being ready. The problems I see are:
 # What's mentioned in this ticket when Workers are being added: there are more 
Workers in the second rebalance round than there were in the first round, which 
leads to unbalanced assignments.
 # When Workers are going away, we sometimes end up with one Worker holding ALL 
the assignments that were on the 5 Workers that went away. The Worker that gets 
all of these assignments is the last one that was started. When this happens, I 
see that in one generation that Worker had no assignments, and in the next one 
it does, but that generation also shows the delay that comes from waiting on 
the Workers that left ({{{}scheduled.rebalance.max.delay.ms{}}}). After that 
delay expires, all assignments from the 5 Workers that went away go onto that 
one Worker. My theory, from looking at the code, is that it may have become the 
only entry in {{candidateWorkersForReassignment}} at the time when it had no 
assignments, and then stayed that way even after we started waiting on the 
Workers that went away, even though this Worker had assignments by then (see 
the sketch after this list). Either way, I don't really understand 
{{{}candidateWorkersForReassignment{}}}: such workers would get all the 
assignments of the ones that went away, right? What if more Workers went away 
than there are Workers without assignments?

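To make my theory concrete, here is a rough sketch of the behavior I think I'm 
seeing. This is not the actual {{IncrementalCooperativeAssignor}} code, just a 
simplified illustration with made-up worker and task names:
{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified sketch of how a single "candidate" worker could end up with every
// lost assignment. NOT the real assignor; names and numbers are illustrative.
public class DelayedReassignmentSketch {

    public static void main(String[] args) {
        // Generation N: the last-started worker joined with nothing assigned yet.
        Map<String, List<String>> assignments = new LinkedHashMap<>();
        assignments.put("worker-1", new ArrayList<>(List.of("task-1", "task-2")));
        assignments.put("worker-2", new ArrayList<>(List.of("task-3", "task-4")));
        assignments.put("worker-new", new ArrayList<>()); // empty in this generation

        // Workers with no assignments are remembered as reassignment candidates.
        Set<String> candidateWorkersForReassignment = new HashSet<>();
        for (Map.Entry<String, List<String>> e : assignments.entrySet()) {
            if (e.getValue().isEmpty()) {
                candidateWorkersForReassignment.add(e.getKey());
            }
        }

        // Generation N+1: five workers leave and their tasks become "lost".
        List<String> lostAssignments =
                List.of("task-5", "task-6", "task-7", "task-8", "task-9");
        long scheduledRebalanceMaxDelayMs = 300_000L; // scheduled.rebalance.max.delay.ms

        // If the candidate set is never refreshed, then once the delay expires
        // ALL lost assignments land on the single remembered candidate, even if
        // it has picked up assignments in the meantime.
        for (String candidate : candidateWorkersForReassignment) {
            assignments.get(candidate).addAll(lostAssignments);
        }

        System.out.println("After " + scheduledRebalanceMaxDelayMs + " ms: " + assignments);
        // After 300000 ms: {worker-1=[task-1, task-2], worker-2=[task-3, task-4],
        //                   worker-new=[task-5, task-6, task-7, task-8, task-9]}
    }
}
{code}
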
As a note, I also looked at what happens during my deployments when everything 
ends up balanced, and I don't think it's working as it should even if the end 
result is balanced. I noticed that when doing this same deployment of adding 5 
and then removing 5, the assignor never waited on the 5 that left, as it should 
according to the configured {{{}scheduled.rebalance.max.delay.ms{}}}. Looking 
at the logs, the delay always shows up as 0 in this case (as opposed to the 
previous case, where it shows up as expected). I'm thinking there are race 
conditions here as well.

[jira] [Comment Edited] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2022-05-11 Thread Manuel Garcia Cabrera (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17535020#comment-17535020
 ] 

Manuel Garcia Cabrera edited comment on KAFKA-12495 at 5/11/22 5:44 PM:


I see two different issues when doing deployments with 2.6.2. In my deployments 
I add 5 Workers at once and then remove 5 other Workers at once (starting with 
11, so the cluster goes from 11 to 16 and then back to 11). I say "at once", but 
they don't really get started and removed at the same instant; there can be up 
to a 30-second difference between the first one being ready and the last one 
being ready. The problems I see are:
 # What's mentioned in this ticket when Workers are being added: there are more 
Workers in the second rebalance round than there were in the first round, which 
leads to unbalanced assignments.
 # When Workers are going away, we sometimes end up with one Worker holding ALL 
the assignments that were on the 5 Workers that went away. The Worker that gets 
all of these assignments is the last one that was started. When this happens, I 
see that in one generation that Worker had no assignments, and in the next one 
it does, but that generation also shows the delay that comes from waiting on 
the Workers that left ({{{}scheduled.rebalance.max.delay.ms{}}}). After that 
delay expires, all assignments from the 5 Workers that went away go onto that 
one Worker. My theory, from looking at the code, is that it may have become the 
only entry in {{candidateWorkersForReassignment}} at the time when it had no 
assignments, and then stayed that way even after we started waiting on the 
Workers that went away, even though this Worker had assignments by then. Either 
way, I don't really understand {{{}candidateWorkersForReassignment{}}}: such 
workers would get all the assignments of the ones that went away, right? What 
if more Workers went away than there are Workers without assignments? This is 
in fact what happens when I test removing 5 Workers and adding 1 Worker during 
the waiting period: everything goes onto that single new Worker (see the rough 
numbers after this list).

 
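To give a sense of scale, here is roughly the difference between a balanced 
outcome and what I observe; the worker counts match my deployment, but the task 
count is made up for illustration:
{code:java}
// Back-of-the-envelope comparison of a balanced outcome vs. what I observe.
// Worker counts match my deployment (11 -> 16 -> 11); the task count is made up.
public class BalancedSplitSketch {

    public static void main(String[] args) {
        int remainingWorkers = 11; // workers still in the group after 5 leave
        int newWorkers = 1;        // the worker added during the waiting period
        int lostTasks = 40;        // tasks owned by the 5 workers that left (illustrative)

        int totalWorkers = remainingWorkers + newWorkers;

        // Balanced: the lost tasks should be spread roughly evenly.
        int perWorkerFloor = lostTasks / totalWorkers;
        int remainder = lostTasks % totalWorkers;
        System.out.printf("Balanced: %d workers pick up %d tasks each, %d pick up %d each%n",
                remainder, perWorkerFloor + 1, totalWorkers - remainder, perWorkerFloor);

        // Observed: the single new worker gets all of them.
        System.out.printf("Observed: 1 worker picks up all %d tasks, the other %d pick up 0%n",
                lostTasks, totalWorkers - 1);
    }
}
{code}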

As a note, I also looked at what happens during my deployments when everything 
ends up balanced, and I don't think it's working as it should even if the end 
result is balanced. I noticed that when doing this same deployment of adding 5 
and then removing 5, the assignor never waited on the 5 that left, as it should 
according to the configured {{{}scheduled.rebalance.max.delay.ms{}}}. Looking 
at the logs, the delay always shows up as 0 in this case (as opposed to the 
previous case, where it shows up as expected). I'm thinking there are race 
conditions here as well.

 


[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2022-05-16 Thread Manuel Garcia Cabrera (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17537612#comment-17537612
 ] 

Manuel Garcia Cabrera commented on KAFKA-12495:
---

[~showuon] I'm short on time right now, so I won't be able to tackle this.

> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Luke Chen
>Priority: Critical
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
> based on KIP-415 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]).
> However, there is a bad assumption in the algorithm implementation: after the 
> revoking rebalance completes, the member (worker) count will be the same as in 
> the previous round of rebalance.
>  
> Let's take a look at the example in KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 is added after the 1st rebalance 
> completes and before the 2nd rebalance starts? Let's see what happens in this 
> example (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member, 
> // but we didn't revoke any more C/T in this round, which causes an unbalanced 
> // distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we don't allow revocations in two consecutive rebalances (under the 
> same leader), we get this uneven distribution in this situation. We should 
> allow a consecutive rebalance to perform another round of revocation, so the 
> revoked C/T can be reassigned to the other members in this case.
> expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member, 
> // **and also revoked some C/T**
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to assign the newly revoked C/T to the other 
> // members
> W1 rejoins with assignment: [AC0, AT1, AT2] 
> Rebalance is triggered 
> W2 joins with assignment: [AT4, AT5, BC0] 
> W3 joins with assignme