This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6ef10a9bbe2 KAFKA-20272: Rebalance classic members only on new assignment (#21662)
6ef10a9bbe2 is described below

commit 6ef10a9bbe2c6ee637a0970f89eabcdaadb20438
Author: Sean Quah <[email protected]>
AuthorDate: Mon Mar 9 14:33:35 2026 +0000

    KAFKA-20272: Rebalance classic members only on new assignment (#21662)
    
    When assignment batching or offload are enabled, the target assignment
    can lag behind the group epoch. We must only tell classic members in
    mixed groups to rejoin when a newer target assignment is available.
    
    Reviewers: David Jacot <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |   8 +-
 .../group/GroupMetadataManagerTest.java            | 278 ++++++++++++++++++++-
 2 files changed, 276 insertions(+), 10 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 624d588149c..925befaeef9 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1850,11 +1850,11 @@ public class GroupMetadataManager {
         ConsumerGroup group,
         ConsumerGroupMember member
     ) {
-        // If the group epoch is greater than the member epoch, there is a new 
rebalance triggered and the member
+        // If the target assignment epoch is greater than the member epoch, 
there is a new rebalance triggered and the member
         // needs to rejoin to catch up. However, if the member is in 
UNREVOKED_PARTITIONS state, it means the
         // member has already rejoined, so it needs to first finish revoking 
the partitions and the reconciliation,
         // and then the next rejoin will be triggered automatically if needed.
-        if (group.groupEpoch() > member.memberEpoch() && 
!member.state().equals(MemberState.UNREVOKED_PARTITIONS)) {
+        if (group.assignmentEpoch() > member.memberEpoch() && 
!member.state().equals(MemberState.UNREVOKED_PARTITIONS)) {
             scheduleConsumerGroupJoinTimeoutIfAbsent(group.groupId(), 
member.memberId(), member.rebalanceTimeoutMs());
             throw Errors.REBALANCE_IN_PROGRESS.exception(
                 String.format("A new rebalance is triggered in group %s and 
member %s should rejoin to catch up.",
@@ -7832,10 +7832,10 @@ public class GroupMetadataManager {
 
         Errors error = Errors.NONE;
         // The member should rejoin if any of the following conditions is met.
-        // 1) The group epoch is bumped so the member need to rejoin to catch 
up.
+        // 1) The target assignment epoch is bumped so the member needs to 
rejoin to catch up.
         // 2) The member needs to revoke some partitions and rejoin to 
reconcile with the new epoch.
         // 3) The member's partitions pending assignment are free, so it can 
rejoin to get the complete assignment.
-        if (member.memberEpoch() < group.groupEpoch() ||
+        if (member.memberEpoch() < group.assignmentEpoch() ||
             member.state() == MemberState.UNREVOKED_PARTITIONS ||
             (member.state() == MemberState.UNRELEASED_PARTITIONS && 
!group.waitingOnUnreleasedPartition(member))) {
             error = Errors.REBALANCE_IN_PROGRESS;
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index e281ba3bb88..d0a961d2f14 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -15137,7 +15137,7 @@ public class GroupMetadataManagerTest {
         );
 
         // Consumer group with a member using the classic protocol.
-        // The group epoch is greater than the member epoch.
+        // The target assignment epoch is greater than the member epoch.
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.DOWNGRADE.toString())
             .withConsumerGroup(new ConsumerGroupBuilder(groupId, 11)
@@ -15149,7 +15149,8 @@ public class GroupMetadataManagerTest {
                             .setSupportedProtocols(protocols)
                     )
                     .setMemberEpoch(10)
-                    .build()))
+                    .build())
+                .withAssignmentEpoch(11))
             .build();
 
         assertThrows(RebalanceInProgressException.class, () -> 
context.sendClassicGroupSync(
@@ -15164,6 +15165,139 @@ public class GroupMetadataManagerTest {
         context.assertJoinTimeout(groupId, memberId, 10000);
     }
 
+    @Test
+    public void testClassicGroupSyncToConsumerGroupDuringAssignmentDelay() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = 
List.of(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                    List.of(fooTopicName),
+                    null,
+                    List.of(
+                        new TopicPartition(fooTopicName, 0),
+                        new TopicPartition(fooTopicName, 1),
+                        new TopicPartition(fooTopicName, 2)
+                    )
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    .setSupportedProtocols(protocols)
+            )
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of(barTopicName))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(mkAssignment())
+            .build();
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        // Consumer group with two members.
+        // Member 1 uses the classic protocol and member 2 uses the consumer 
protocol.
+        // Member 2 has just changed subscription from foo to bar and the new 
assignment has not
+        // been computed yet.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 11)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withAssignmentEpoch(10)
+                .withMetadataHash(computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, 
metadataImage),
+                    barTopicName, computeTopicHash(barTopicName, metadataImage)
+                ))))
+            .build();
+
+        // Member 1 is not told to rebalance yet.
+        assertDoesNotThrow(() -> {
+            GroupMetadataManagerTestContext.SyncResult syncResult = 
context.sendClassicGroupSync(
+                new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                    .withGroupId(groupId)
+                    .withMemberId(memberId1)
+                    .withGenerationId(10)
+                    .build()
+            );
+            syncResult.appendFuture.complete(null);
+        });
+
+        // Member 2 heartbeats and triggers a new assignment.
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+            memberId1, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            memberId2, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(barTopicId, 0, 1, 2)
+            ))
+        )));
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10));
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(barTopicId)
+                            .setPartitions(List.of(0, 1, 2))
+                    ))),
+            result2.response()
+        );
+
+        // Member 1 is told to rebalance now that the new assignment is 
available.
+        assertThrows(RebalanceInProgressException.class, () -> 
context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(memberId1)
+                .withGenerationId(10)
+                .build())
+        );
+    }
+
     @Test
     public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() 
throws Exception {
         String groupId = "group-id";
@@ -15241,7 +15375,7 @@ public class GroupMetadataManagerTest {
                 )))
         );
 
-        // Member 1 has a member epoch smaller than the group epoch.
+        // Member 1 has a member epoch smaller than the target assignment 
epoch.
         ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
             .setRebalanceTimeoutMs(rebalanceTimeout)
             .setClassicMemberMetadata(
@@ -15283,7 +15417,8 @@ public class GroupMetadataManagerTest {
                 .withMember(member1)
                 .withMember(member2)
                 .withMember(member3)
-                .withAssignment(memberId3, 
mkAssignment(mkTopicAssignment(barTopicId, 0, 1, 2))))
+                .withAssignment(memberId3, 
mkAssignment(mkTopicAssignment(barTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10))
             .build();
 
         List.of(memberId1, memberId2, memberId3).forEach(memberId -> {
@@ -15397,6 +15532,136 @@ public class GroupMetadataManagerTest {
         ));
     }
 
+    @Test
+    public void 
testClassicGroupHeartbeatToConsumerGroupDuringAssignmentDelay() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = 
List.of(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                    List.of(fooTopicName),
+                    null,
+                    List.of(
+                        new TopicPartition(fooTopicName, 0),
+                        new TopicPartition(fooTopicName, 1),
+                        new TopicPartition(fooTopicName, 2)
+                    )
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    .setSupportedProtocols(protocols)
+            )
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of(barTopicName))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(mkAssignment())
+            .build();
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        // Consumer group with two members.
+        // Member 1 uses the classic protocol and member 2 uses the consumer 
protocol.
+        // Member 2 has just changed subscription from foo to bar and the new 
assignment has not
+        // been computed yet.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 11)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withAssignmentEpoch(10)
+                .withMetadataHash(computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, 
metadataImage),
+                    barTopicName, computeTopicHash(barTopicName, metadataImage)
+                ))))
+            .build();
+
+        // Member 1 is not told to rebalance yet.
+        HeartbeatResponseData heartbeatResponse1 = 
context.sendClassicGroupHeartbeat(
+            new HeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId1)
+                .setGenerationId(10)
+        ).response();
+        assertEquals(Errors.NONE.code(), heartbeatResponse1.errorCode());
+
+        // Member 2 heartbeats and triggers a new assignment.
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+            memberId1, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            memberId2, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(barTopicId, 0, 1, 2)
+            ))
+        )));
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10));
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(barTopicId)
+                            .setPartitions(List.of(0, 1, 2))
+                    ))),
+            result.response()
+        );
+
+        // Member 1 is told to rebalance now that the new assignment is 
available.
+        HeartbeatResponseData heartbeatResponse2 = 
context.sendClassicGroupHeartbeat(
+            new HeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId1)
+                .setGenerationId(10)
+        ).response();
+        assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse2.errorCode());
+    }
+
     @Test
     public void 
testConsumerGroupMemberUsingClassicProtocolFencedWhenSessionTimeout() {
         String groupId = "group-id";
@@ -15475,7 +15740,7 @@ public class GroupMetadataManagerTest {
                 )))
         );
 
-        // Consumer group with a member using the classic protocol whose 
member epoch is smaller than the group epoch.
+        // Consumer group with a member using the classic protocol whose 
member epoch is smaller than the target assignment epoch.
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
                 .withMember(new ConsumerGroupMember.Builder(memberId)
@@ -15486,7 +15751,8 @@ public class GroupMetadataManagerTest {
                             .setSupportedProtocols(protocols)
                     )
                     .setMemberEpoch(9)
-                    .build()))
+                    .build())
+                .withAssignmentEpoch(10))
             .build();
 
         // Heartbeat to schedule the join timeout.

Reply via email to