[ 
https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703797#comment-17703797
 ] 

Philip Nee commented on KAFKA-14016:
------------------------------------

Hey all, in particular, [~showuon] [~aiquestion] 

I've been watching this issue for a while and trying to understand it. I'm 
curious: what exactly do we want to achieve here? Is the goal to minimize 
partition movement as much as possible? For example, when all partitions are 
already correctly assigned and one consumer joins with an earlier generation, 
is the desired behavior that nothing is revoked?

 

I wrote a small test to demonstrate what I think we want:
{code:java}
public void testExpiredGenerationNoPartitionMovement() {
    Map<String, Integer> partitionsPerTopic = new HashMap<>();
    partitionsPerTopic.put(topic, 3);

    // consumer1 owns partition 0 and reports generation 2 (the latest).
    subscriptions.put(consumer1,
        buildSubscriptionV1(topics(topic), partitions(tp(topic, 0)), 2));
    // consumer2 owns partitions 1 and 2 but reports the earlier generation 1.
    subscriptions.put(consumer2,
        buildSubscriptionV1(topics(topic), partitions(tp(topic, 1), tp(topic, 2)), 1));

    Map<String, List<TopicPartition>> assignment =
        assignor.assign(partitionsPerTopic, subscriptions);
    // The existing ownership is already balanced, so nothing should move.
    assertEquals(partitions(tp(topic, 0)), assignment.get(consumer1));
    assertEquals(partitions(tp(topic, 1), tp(topic, 2)), assignment.get(consumer2));
    ...
}{code}
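
To make "nothing revoked" measurable outside the assignor test harness, here is a minimal sketch that compares what each member owned with what it was assigned and reports the partitions it would lose. The class and method names are hypothetical and not part of Kafka, and partitions are plain strings rather than TopicPartition so that the snippet stays self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka: reports the partitions each member loses.
final class RevocationCheck {

    // Returns, per member, the previously owned partitions that are absent from
    // the new assignment. Members that lose nothing are omitted from the result.
    static Map<String, List<String>> revoked(Map<String, List<String>> owned,
                                             Map<String, List<String>> assigned) {
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : owned.entrySet()) {
            List<String> lost = new ArrayList<>(entry.getValue());
            lost.removeAll(assigned.getOrDefault(entry.getKey(), Collections.emptyList()));
            if (!lost.isEmpty()) {
                result.put(entry.getKey(), lost);
            }
        }
        return result;
    }
}
```

With the ownership from the test above, an empty result map would correspond to the "no partition movement" outcome.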

> Revoke more partitions than expected in Cooperative rebalance
> -------------------------------------------------------------
>
>                 Key: KAFKA-14016
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14016
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.3.0
>            Reporter: Shawn Wang
>            Priority: Major
>              Labels: new-rebalance-should-fix
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some 
> consumers didn't reset generation and state after sync group failed with a 
> REBALANCE_IN_PROGRESS error.
> So we fixed it by resetting the generationId (but not the memberId) when sync 
> group fails with a REBALANCE_IN_PROGRESS error.
> But this change missed the reset part, so another change made in 
> https://issues.apache.org/jira/browse/KAFKA-13891 made this work.
> After applying this change, we found that sometimes consumers will revoke 
> almost 2/3 of the partitions with cooperative rebalancing enabled. This is 
> because if a consumer does a very quick re-join, other consumers will get 
> REBALANCE_IN_PROGRESS in syncGroup and revoke their partitions before 
> re-joining. Example:
>  # Consumers A1-A10 (ten consumers) joined and synced the group successfully 
> with generation 1.
>  # A new consumer B1 joined and started a rebalance.
>  # All consumers joined successfully, and then A1 needed to revoke a partition 
> to transfer it to B1.
>  # A1 did a very quick syncGroup and re-join, because it revoked a partition.
>  # A2-A10 didn't send syncGroup before A1 re-joined, so when they did send 
> syncGroup, they got REBALANCE_IN_PROGRESS.
>  # A2-A10 revoked their partitions and re-joined.
> So in this rebalance almost every partition is revoked, which greatly reduces 
> the benefit of cooperative rebalancing. 
> I think instead of "{*}resetStateAndRejoin{*} when 
> *RebalanceInProgressException* errors happen in {*}sync group{*}" we need 
> another way to fix it.
> Here is my proposal:
>  # Revert the change in https://issues.apache.org/jira/browse/KAFKA-13891.
>  # In the server coordinator's handleSyncGroup, when the generationId has been 
> checked and the group state is PreparingRebalance, we can send the assignment 
> along with the error code REBALANCE_IN_PROGRESS. (I think it's safe since we 
> verified the generation first.)
>  # When the client gets the REBALANCE_IN_PROGRESS error, it tries to apply the 
> assignment first and then sets rejoinNeeded = true to make it re-join 
> immediately.
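
The client side of the proposal above could be sketched roughly as follows. This is a simplified model and not Kafka's actual coordinator code: the class, enum, and method names are hypothetical, and only REBALANCE_IN_PROGRESS and rejoinNeeded come from the issue text. Keeping the owned partitions unchanged when no assignment accompanies the error is an assumption of this sketch, not something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the proposed client behaviour; not Kafka's real coordinator.
final class SyncGroupSketch {

    enum SyncError { NONE, REBALANCE_IN_PROGRESS }

    List<String> ownedPartitions = new ArrayList<>();
    boolean rejoinNeeded = false;

    // Handles a SyncGroup response that may carry an assignment together with an error.
    void onSyncGroupResponse(SyncError error, List<String> assignment) {
        if (error == SyncError.NONE) {
            ownedPartitions = new ArrayList<>(assignment);
            rejoinNeeded = false;
        } else if (error == SyncError.REBALANCE_IN_PROGRESS) {
            // Proposed step: apply the assignment first instead of resetting state,
            // so only partitions missing from the assignment are given up.
            if (assignment != null) {
                ownedPartitions = new ArrayList<>(assignment);
            }
            // Assumption: with no assignment attached, ownership is left as-is.
            // Then re-join immediately for the rebalance that is in progress.
            rejoinNeeded = true;
        }
    }
}
```

In this model a consumer such as A2 keeps every partition listed in the assignment and still re-joins promptly, rather than revoking everything it owns.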



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
