This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6ef10a9bbe2 KAFKA-20272: Rebalance classic members only on new assignment (#21662)
6ef10a9bbe2 is described below
commit 6ef10a9bbe2c6ee637a0970f89eabcdaadb20438
Author: Sean Quah <[email protected]>
AuthorDate: Mon Mar 9 14:33:35 2026 +0000
KAFKA-20272: Rebalance classic members only on new assignment (#21662)
When assignment batching or offload are enabled, the target assignment
can lag behind the group epoch. We must only tell classic members in
mixed groups to rejoin when a newer target assignment is available.
Reviewers: David Jacot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 8 +-
.../group/GroupMetadataManagerTest.java | 278 ++++++++++++++++++++-
2 files changed, 276 insertions(+), 10 deletions(-)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 624d588149c..925befaeef9 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1850,11 +1850,11 @@ public class GroupMetadataManager {
ConsumerGroup group,
ConsumerGroupMember member
) {
- // If the group epoch is greater than the member epoch, there is a new
rebalance triggered and the member
+ // If the target assignment epoch is greater than the member epoch,
there is a new rebalance triggered and the member
// needs to rejoin to catch up. However, if the member is in
UNREVOKED_PARTITIONS state, it means the
// member has already rejoined, so it needs to first finish revoking
the partitions and the reconciliation,
// and then the next rejoin will be triggered automatically if needed.
- if (group.groupEpoch() > member.memberEpoch() &&
!member.state().equals(MemberState.UNREVOKED_PARTITIONS)) {
+ if (group.assignmentEpoch() > member.memberEpoch() &&
!member.state().equals(MemberState.UNREVOKED_PARTITIONS)) {
scheduleConsumerGroupJoinTimeoutIfAbsent(group.groupId(),
member.memberId(), member.rebalanceTimeoutMs());
throw Errors.REBALANCE_IN_PROGRESS.exception(
String.format("A new rebalance is triggered in group %s and
member %s should rejoin to catch up.",
@@ -7832,10 +7832,10 @@ public class GroupMetadataManager {
Errors error = Errors.NONE;
// The member should rejoin if any of the following conditions is met.
- // 1) The group epoch is bumped so the member need to rejoin to catch
up.
+ // 1) The target assignment epoch is bumped so the member needs to
rejoin to catch up.
// 2) The member needs to revoke some partitions and rejoin to
reconcile with the new epoch.
// 3) The member's partitions pending assignment are free, so it can
rejoin to get the complete assignment.
- if (member.memberEpoch() < group.groupEpoch() ||
+ if (member.memberEpoch() < group.assignmentEpoch() ||
member.state() == MemberState.UNREVOKED_PARTITIONS ||
(member.state() == MemberState.UNRELEASED_PARTITIONS &&
!group.waitingOnUnreleasedPartition(member))) {
error = Errors.REBALANCE_IN_PROGRESS;
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index e281ba3bb88..d0a961d2f14 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -15137,7 +15137,7 @@ public class GroupMetadataManagerTest {
);
// Consumer group with a member using the classic protocol.
- // The group epoch is greater than the member epoch.
+ // The target assignment epoch is greater than the member epoch.
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.DOWNGRADE.toString())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 11)
@@ -15149,7 +15149,8 @@ public class GroupMetadataManagerTest {
.setSupportedProtocols(protocols)
)
.setMemberEpoch(10)
- .build()))
+ .build())
+ .withAssignmentEpoch(11))
.build();
assertThrows(RebalanceInProgressException.class, () ->
context.sendClassicGroupSync(
@@ -15164,6 +15165,139 @@ public class GroupMetadataManagerTest {
context.assertJoinTimeout(groupId, memberId, 10000);
}
+ @Test
+ public void testClassicGroupSyncToConsumerGroupDuringAssignmentDelay() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols =
List.of(
+ new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ List.of(fooTopicName),
+ null,
+ List.of(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(fooTopicName, 1),
+ new TopicPartition(fooTopicName, 2)
+ )
+ ))))
+ );
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(5000)
+ .setSupportedProtocols(protocols)
+ )
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build();
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(barTopicName))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setAssignedPartitions(mkAssignment())
+ .build();
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ // Consumer group with two members.
+ // Member 1 uses the classic protocol and member 2 uses the consumer
protocol.
+ // Member 2 has just changed subscription from foo to bar and the new
assignment has not
+ // been computed yet.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 11)
+ .withMember(member1)
+ .withMember(member2)
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .withAssignmentEpoch(10)
+ .withMetadataHash(computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName,
metadataImage),
+ barTopicName, computeTopicHash(barTopicName, metadataImage)
+ ))))
+ .build();
+
+ // Member 1 is not told to rebalance yet.
+ assertDoesNotThrow(() -> {
+ GroupMetadataManagerTestContext.SyncResult syncResult =
context.sendClassicGroupSync(
+ new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(memberId1)
+ .withGenerationId(10)
+ .build()
+ );
+ syncResult.appendFuture.complete(null);
+ });
+
+ // Member 2 heartbeats and triggers a new assignment.
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+ memberId1, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )),
+ memberId2, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(barTopicId, 0, 1, 2)
+ ))
+ )));
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(10));
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(barTopicId)
+ .setPartitions(List.of(0, 1, 2))
+ ))),
+ result2.response()
+ );
+
+ // Member 1 is told to rebalance now that the new assignment is
available.
+ assertThrows(RebalanceInProgressException.class, () ->
context.sendClassicGroupSync(
+ new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(memberId1)
+ .withGenerationId(10)
+ .build())
+ );
+ }
+
@Test
public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession()
throws Exception {
String groupId = "group-id";
@@ -15241,7 +15375,7 @@ public class GroupMetadataManagerTest {
)))
);
- // Member 1 has a member epoch smaller than the group epoch.
+ // Member 1 has a member epoch smaller than the target assignment
epoch.
ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
.setRebalanceTimeoutMs(rebalanceTimeout)
.setClassicMemberMetadata(
@@ -15283,7 +15417,8 @@ public class GroupMetadataManagerTest {
.withMember(member1)
.withMember(member2)
.withMember(member3)
- .withAssignment(memberId3,
mkAssignment(mkTopicAssignment(barTopicId, 0, 1, 2))))
+ .withAssignment(memberId3,
mkAssignment(mkTopicAssignment(barTopicId, 0, 1, 2)))
+ .withAssignmentEpoch(10))
.build();
List.of(memberId1, memberId2, memberId3).forEach(memberId -> {
@@ -15397,6 +15532,136 @@ public class GroupMetadataManagerTest {
));
}
+ @Test
+ public void
testClassicGroupHeartbeatToConsumerGroupDuringAssignmentDelay() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols =
List.of(
+ new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ List.of(fooTopicName),
+ null,
+ List.of(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(fooTopicName, 1),
+ new TopicPartition(fooTopicName, 2)
+ )
+ ))))
+ );
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(5000)
+ .setSupportedProtocols(protocols)
+ )
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build();
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(barTopicName))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setAssignedPartitions(mkAssignment())
+ .build();
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ // Consumer group with two members.
+ // Member 1 uses the classic protocol and member 2 uses the consumer
protocol.
+ // Member 2 has just changed subscription from foo to bar and the new
assignment has not
+ // been computed yet.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 11)
+ .withMember(member1)
+ .withMember(member2)
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .withAssignmentEpoch(10)
+ .withMetadataHash(computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName,
metadataImage),
+ barTopicName, computeTopicHash(barTopicName, metadataImage)
+ ))))
+ .build();
+
+ // Member 1 is not told to rebalance yet.
+ HeartbeatResponseData heartbeatResponse1 =
context.sendClassicGroupHeartbeat(
+ new HeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setGenerationId(10)
+ ).response();
+ assertEquals(Errors.NONE.code(), heartbeatResponse1.errorCode());
+
+ // Member 2 heartbeats and triggers a new assignment.
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+ memberId1, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )),
+ memberId2, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(barTopicId, 0, 1, 2)
+ ))
+ )));
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(10));
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(barTopicId)
+ .setPartitions(List.of(0, 1, 2))
+ ))),
+ result.response()
+ );
+
+ // Member 1 is told to rebalance now that the new assignment is
available.
+ HeartbeatResponseData heartbeatResponse2 =
context.sendClassicGroupHeartbeat(
+ new HeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setGenerationId(10)
+ ).response();
+ assertEquals(Errors.REBALANCE_IN_PROGRESS.code(),
heartbeatResponse2.errorCode());
+ }
+
@Test
public void
testConsumerGroupMemberUsingClassicProtocolFencedWhenSessionTimeout() {
String groupId = "group-id";
@@ -15475,7 +15740,7 @@ public class GroupMetadataManagerTest {
)))
);
- // Consumer group with a member using the classic protocol whose
member epoch is smaller than the group epoch.
+ // Consumer group with a member using the classic protocol whose
member epoch is smaller than the target assignment epoch.
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId)
@@ -15486,7 +15751,8 @@ public class GroupMetadataManagerTest {
.setSupportedProtocols(protocols)
)
.setMemberEpoch(9)
- .build()))
+ .build())
+ .withAssignmentEpoch(10))
.build();
// Heartbeat to schedule the join timeout.