This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4ea29da3ca1 KAFKA-20273: Rebalance on classic group downgrade with
stale assignment (#21663)
4ea29da3ca1 is described below
commit 4ea29da3ca1776f6870905eed015ef5751e4866c
Author: Sean Quah <[email protected]>
AuthorDate: Mon Mar 9 14:43:22 2026 +0000
KAFKA-20273: Rebalance on classic group downgrade with stale assignment
(#21663)
When assignment batching or offload is enabled, the target assignment
can lag behind member subscriptions. When downgrading a consumer group
to a classic group, we must check if the target assignment is stale and
enter the PREPARING_REBALANCE state if needed. Downgrades triggered by a
member leaving the group already trigger a rebalance, so there is no
change on that path.
Reviewers: David Jacot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 32 +++---
.../group/GroupMetadataManagerTest.java | 113 +++++++++++++++++++++
2 files changed, 133 insertions(+), 12 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 925befaeef9..a03e8900555 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1263,15 +1263,16 @@ public class GroupMetadataManager {
* @param leavingMembers The leaving member(s) that triggered
the downgrade validation.
* @param joiningMember The newly joined member if the
downgrade is triggered by static member replacement.
* When not null, must have an instanceId
that matches the replaced member.
- * @param hasSubscriptionChanged The boolean indicating whether the
joining member has a different subscription
- * from the replaced member. Only used
when joiningMember is set.
+ * @param rebalance Whether to trigger a rebalance after
the downgrade.
+ * @param rebalanceReason The reason for the rebalance, if
{@code rebalance} is {@code true}.
* @param records The record list to which the
conversion records are added.
*/
private void convertToClassicGroup(
ConsumerGroup consumerGroup,
Set<ConsumerGroupMember> leavingMembers,
ConsumerGroupMember joiningMember,
- boolean hasSubscriptionChanged,
+ boolean rebalance,
+ String rebalanceReason,
List<CoordinatorRecord> records
) {
if (joiningMember == null) {
@@ -1312,12 +1313,8 @@ public class GroupMetadataManager {
classicGroup.allMembers().forEach(member ->
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
- // If the downgrade is triggered by a member leaving the group or a
static
- // member replacement with a different subscription, a rebalance
should be triggered.
- if (joiningMember == null) {
- prepareRebalance(classicGroup, String.format("Downgrade group %s
from consumer to classic for member leaving.", classicGroup.groupId()));
- } else if (hasSubscriptionChanged) {
- prepareRebalance(classicGroup, String.format("Downgrade group %s
from consumer to classic for static member replacement with different
subscription.", classicGroup.groupId()));
+ if (rebalance) {
+ prepareRebalance(classicGroup, rebalanceReason);
}
log.info("[GroupId {}] Converted the consumer group to a classic
group.", consumerGroup.groupId());
@@ -2585,7 +2582,8 @@ public class GroupMetadataManager {
// 2. If the static member subscription hasn't changed, reconcile
the member's assignment with the existing
// assignment if the member is not fully reconciled yet. If the
static member subscription has changed, a
// rebalance will be triggered during downgrade anyway so we can
skip the reconciliation.
- if (!bumpGroupEpoch) {
+ boolean rebalance = group.assignmentEpoch() < groupEpoch;
+ if (!rebalance) {
updatedMember = maybeReconcile(
groupId,
updatedMember,
@@ -2604,7 +2602,10 @@ public class GroupMetadataManager {
group,
Set.of(),
updatedMember,
- bumpGroupEpoch,
+ rebalance,
+ rebalance ?
+ String.format("Downgrade group %s from consumer to classic
with stale assignment.", group.groupId()) :
+ null,
records
);
} else {
@@ -4209,7 +4210,14 @@ public class GroupMetadataManager {
List<CoordinatorRecord> records = new ArrayList<>();
if (validateOnlineDowngradeWithFencedMembers(group, members)) {
- convertToClassicGroup(group, members, null, false, records);
+ convertToClassicGroup(
+ group,
+ members,
+ null,
+ true,
+ String.format("Downgrade group %s from consumer to classic for
member leaving.", group.groupId()),
+ records
+ );
return new CoordinatorResult<>(records, response, null, false);
} else {
for (ConsumerGroupMember member : members) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index d0a961d2f14..e4bf1c40a10 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -13567,6 +13567,119 @@ public class GroupMetadataManagerTest {
}
}
+ @Test
+ public void
testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMemberWhenTargetAssignmentIsStale()
throws ExecutionException, InterruptedException {
+ String groupId = "group-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String oldMemberId2 = Uuid.randomUuid().toString();
+ String instanceId = "instance-id";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 =
List.of(
+ new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName(NoOpPartitionAssignor.NAME)
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ List.of(fooTopicName, barTopicName),
+ null,
+ List.of(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(fooTopicName, 1),
+ new TopicPartition(fooTopicName, 2),
+ new TopicPartition(barTopicName, 0),
+ new TopicPartition(barTopicName, 1)
+ )
+ ))))
+ );
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+ .setServerAssignorName(NoOpPartitionAssignor.NAME)
+ .setRebalanceTimeoutMs(45000)
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(5000)
+ .setSupportedProtocols(protocols1)
+ )
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .build();
+ ConsumerGroupMember oldMember2 = new
ConsumerGroupMember.Builder(oldMemberId2)
+ .setInstanceId(instanceId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+ .setServerAssignorName(NoOpPartitionAssignor.NAME)
+ .setRebalanceTimeoutMs(45000)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .build();
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 2)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ // Consumer group with two members.
+ // Member 1 uses the classic protocol and static member 2 uses the
consumer protocol.
+ // Member 2 has just changed subscription from foo to bar and the new
assignment has not
+ // been computed yet.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.DOWNGRADE.toString())
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new
NoOpPartitionAssignor()))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 11)
+ .withMember(member1)
+ .withMember(oldMember2)
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .withAssignment(oldMemberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .withAssignmentEpoch(10)
+ .withMetadataHash(computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName,
metadataImage),
+ barTopicName, computeTopicHash(barTopicName, metadataImage)
+ ))))
+ .build();
+
+ // A new member using classic protocol with the same instance id
joins, scheduling the downgrade.
+ byte[] protocolsMetadata2 =
Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ List.of(fooTopicName, barTopicName))));
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols2 =
+ new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+ protocols2.add(new JoinGroupRequestProtocol()
+ .setName(NoOpPartitionAssignor.NAME)
+ .setMetadata(protocolsMetadata2));
+ JoinGroupRequestData joinRequest = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withGroupInstanceId(instanceId)
+ .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .withProtocols(protocols2)
+ .build();
+ GroupMetadataManagerTestContext.JoinResult result =
context.sendClassicGroupJoin(joinRequest);
+ result.appendFuture.complete(null);
+ result.joinFuture.get();
+
+ // A rebalance is triggered.
+ ClassicGroup classicGroup =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+ assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
+ }
+
@Test
public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
String groupId = "group-id";