This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new e76213e1824 KAFKA-19546: Rebalance should be triggered by subscription
change during group protocol downgrade (#20581)
e76213e1824 is described below
commit e76213e182435505c6760fb7630b67a881ac5f74
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Fri Sep 26 05:00:23 2025 -0400
KAFKA-19546: Rebalance should be triggered by subscription change during
group protocol downgrade (#20581)
Cherry-pick KAFKA-19546 to 4.1.
During online downgrade, when a static member using the consumer
protocol which is also the last member using the consumer protocol is
replaced by another static member using the classic protocol with the
same instance id, the latter will take the assignment of the former and
an online downgrade will be triggered.
In the current implementation, if the replacing static member has a
different subscription, no rebalance will be triggered when the
downgrade happens. The patch checks whether the static member has
changed subscription and triggers a rebalance when it does.
Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 104 ++++++++++-------
.../group/GroupMetadataManagerTest.java | 129 +++++++++++++--------
2 files changed, 144 insertions(+), 89 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 080fa265221..96d4fc08697 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1252,16 +1252,19 @@ public class GroupMetadataManager {
/**
* Creates a ClassicGroup corresponding to the given ConsumerGroup.
*
- * @param consumerGroup The converted ConsumerGroup.
- * @param leavingMembers The leaving member(s) that triggered the
downgrade validation.
- * @param joiningMember The newly joined member if the downgrade is
triggered by static member replacement.
- * When not null, must have an instanceId that
matches an existing member.
- * @param records The record list to which the conversion
records are added.
+ * @param consumerGroup The converted ConsumerGroup.
+ * @param leavingMembers The leaving member(s) that triggered
the downgrade validation.
+ * @param joiningMember The newly joined member if the
downgrade is triggered by static member replacement.
+ * When not null, must have an instanceId
that matches the replaced member.
+ * @param hasSubscriptionChanged The boolean indicating whether the
joining member has a different subscription
+ * from the replaced member. Only used
when joiningMember is set.
+ * @param records The record list to which the
conversion records are added.
*/
private void convertToClassicGroup(
ConsumerGroup consumerGroup,
Set<ConsumerGroupMember> leavingMembers,
ConsumerGroupMember joiningMember,
+ boolean hasSubscriptionChanged,
List<CoordinatorRecord> records
) {
if (joiningMember == null) {
@@ -1302,9 +1305,12 @@ public class GroupMetadataManager {
classicGroup.allMembers().forEach(member ->
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
- // If the downgrade is triggered by a member leaving the group, a
rebalance should be triggered.
+ // If the downgrade is triggered by a member leaving the group or a
static
+ // member replacement with a different subscription, a rebalance
should be triggered.
if (joiningMember == null) {
- prepareRebalance(classicGroup, String.format("Downgrade group %s
from consumer to classic.", classicGroup.groupId()));
+ prepareRebalance(classicGroup, String.format("Downgrade group %s
from consumer to classic for member leaving.", classicGroup.groupId()));
+ } else if (hasSubscriptionChanged) {
+ prepareRebalance(classicGroup, String.format("Downgrade group %s
from consumer to classic for static member replacement with different
subscription.", classicGroup.groupId()));
}
log.info("[GroupId {}] Converted the consumer group to a classic
group.", consumerGroup.groupId());
@@ -2397,6 +2403,10 @@ public class GroupMetadataManager {
);
}
+ ConsumerGroupMember existingStaticMemberOrNull =
group.staticMember(request.groupInstanceId());
+ boolean downgrade = existingStaticMemberOrNull != null &&
+ validateOnlineDowngradeWithReplacedMember(group,
existingStaticMemberOrNull);
+
int groupEpoch = group.groupEpoch();
SubscriptionType subscriptionType = group.subscriptionType();
final ConsumerProtocolSubscription subscription =
deserializeSubscription(protocols);
@@ -2443,49 +2453,61 @@ public class GroupMetadataManager {
subscriptionType = result.subscriptionType;
}
- // 2. Update the target assignment if the group epoch is larger than
the target assignment epoch. The delta between
- // the existing and the new target assignment is persisted to the
partition.
- final int targetAssignmentEpoch;
- final Assignment targetAssignment;
+ if (downgrade) {
+ // 2. If the static member subscription hasn't changed, reconcile
the member's assignment with the existing
+ // assignment if the member is not fully reconciled yet. If the
static member subscription has changed, a
+ // rebalance will be triggered during downgrade anyway so we can
skip the reconciliation.
+ if (!bumpGroupEpoch) {
+ updatedMember = maybeReconcile(
+ groupId,
+ updatedMember,
+ group::currentPartitionEpoch,
+ group.assignmentEpoch(),
+ group.targetAssignment(updatedMember.memberId(),
updatedMember.instanceId()),
+ toTopicPartitions(subscription.ownedPartitions(),
metadataImage.topics()),
+ records
+ );
+ }
- if (groupEpoch > group.assignmentEpoch()) {
- targetAssignment = updateTargetAssignment(
+ // 3. Downgrade the consumer group.
+ convertToClassicGroup(
group,
- groupEpoch,
- member,
+ Set.of(),
updatedMember,
- subscriptionType,
+ bumpGroupEpoch,
records
);
- targetAssignmentEpoch = groupEpoch;
} else {
- targetAssignmentEpoch = group.assignmentEpoch();
- targetAssignment =
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
+ // If no downgrade is triggered.
- }
+ // 2. Update the target assignment if the group epoch is larger
than the target assignment epoch.
+ // The delta between the existing and the new target assignment is
persisted to the partition.
+ final int targetAssignmentEpoch;
+ final Assignment targetAssignment;
- // 3. Reconcile the member's assignment with the target assignment if
the member is not
- // fully reconciled yet.
- updatedMember = maybeReconcile(
- groupId,
- updatedMember,
- group::currentPartitionEpoch,
- targetAssignmentEpoch,
- targetAssignment,
- toTopicPartitions(subscription.ownedPartitions(),
metadataImage.topics()),
- records
- );
+ if (groupEpoch > group.assignmentEpoch()) {
+ targetAssignment = updateTargetAssignment(
+ group,
+ groupEpoch,
+ member,
+ updatedMember,
+ subscriptionType,
+ records
+ );
+ targetAssignmentEpoch = groupEpoch;
+ } else {
+ targetAssignmentEpoch = group.assignmentEpoch();
+ targetAssignment =
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
+ }
- // 4. Maybe downgrade the consumer group if the last static member
using the
- // consumer protocol is replaced by the joining static member.
- ConsumerGroupMember existingStaticMemberOrNull =
group.staticMember(request.groupInstanceId());
- boolean downgrade = existingStaticMemberOrNull != null &&
- validateOnlineDowngradeWithReplacedMember(group,
existingStaticMemberOrNull);
- if (downgrade) {
- convertToClassicGroup(
- group,
- Set.of(),
+ // 3. Reconcile the member's assignment with the target assignment
if the member is not fully reconciled yet.
+ updatedMember = maybeReconcile(
+ groupId,
updatedMember,
+ group::currentPartitionEpoch,
+ targetAssignmentEpoch,
+ targetAssignment,
+ toTopicPartitions(subscription.ownedPartitions(),
metadataImage.topics()),
records
);
}
@@ -4058,7 +4080,7 @@ public class GroupMetadataManager {
List<CoordinatorRecord> records = new ArrayList<>();
if (validateOnlineDowngradeWithFencedMembers(group, members)) {
- convertToClassicGroup(group, members, null, records);
+ convertToClassicGroup(group, members, null, false, records);
return new CoordinatorResult<>(records, response, null, false);
} else {
for (ConsumerGroupMember member : members) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index bc5afd7704f..880c7cbbc59 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -12451,8 +12451,11 @@ public class GroupMetadataManagerTest {
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
}
- @Test
- public void
testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember() throws
ExecutionException, InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void
testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember(
+ boolean isSubscriptionChanged
+ ) throws ExecutionException, InterruptedException {
String groupId = "group-id";
String memberId1 = Uuid.randomUuid().toString();
String oldMemberId2 = Uuid.randomUuid().toString();
@@ -12463,11 +12466,9 @@ public class GroupMetadataManagerTest {
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
- MockPartitionAssignor assignor = new MockPartitionAssignor("range");
-
List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 =
List.of(
new ConsumerGroupMemberMetadataValue.ClassicProtocol()
- .setName("range")
+ .setName(NoOpPartitionAssignor.NAME)
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
List.of(fooTopicName, barTopicName),
null,
@@ -12487,8 +12488,8 @@ public class GroupMetadataManagerTest {
.setPreviousMemberEpoch(9)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
- .setSubscribedTopicNames(List.of("foo", "bar"))
- .setServerAssignorName("range")
+ .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+ .setServerAssignorName(NoOpPartitionAssignor.NAME)
.setRebalanceTimeoutMs(45000)
.setClassicMemberMetadata(
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
@@ -12506,8 +12507,8 @@ public class GroupMetadataManagerTest {
.setPreviousMemberEpoch(9)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
- .setSubscribedTopicNames(List.of("foo"))
- .setServerAssignorName("range")
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName(NoOpPartitionAssignor.NAME)
.setRebalanceTimeoutMs(45000)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
@@ -12518,12 +12519,14 @@ public class GroupMetadataManagerTest {
.addTopic(barTopicId, barTopicName, 2)
.addRacks()
.build();
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+ long barTopicHash = computeTopicHash(barTopicName, metadataImage);
// Consumer group with two members.
// Member 1 uses the classic protocol and static member 2 uses the
consumer protocol.
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.DOWNGRADE.toString())
-
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new
NoOpPartitionAssignor()))
.withMetadataImage(metadataImage)
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(member1)
@@ -12543,12 +12546,19 @@ public class GroupMetadataManagerTest {
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
10);
// A new member using classic protocol with the same instance id
joins, scheduling the downgrade.
+ byte[] protocolsMetadata2 =
Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ isSubscriptionChanged ? List.of(fooTopicName, barTopicName) :
List.of(fooTopicName))));
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols2 =
+ new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+ protocols2.add(new JoinGroupRequestProtocol()
+ .setName(NoOpPartitionAssignor.NAME)
+ .setMetadata(protocolsMetadata2));
JoinGroupRequestData joinRequest = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
.withGroupId(groupId)
.withMemberId(UNKNOWN_MEMBER_ID)
.withGroupInstanceId(instanceId)
.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
- .withDefaultProtocolTypeAndProtocols()
+ .withProtocols(protocols2)
.build();
GroupMetadataManagerTestContext.JoinResult result =
context.sendClassicGroupJoin(joinRequest);
result.appendFuture.complete(null);
@@ -12560,14 +12570,15 @@ public class GroupMetadataManagerTest {
.build();
ConsumerGroupMember expectedNewClassicMember2 = new
ConsumerGroupMember.Builder(oldMember2, newMemberId2)
.setPreviousMemberEpoch(0)
+ .setMemberEpoch(isSubscriptionChanged ? 11 : 10)
+ .setSubscribedTopicNames(isSubscriptionChanged ?
List.of(fooTopicName, barTopicName) : List.of(fooTopicName))
.setRebalanceTimeoutMs(joinRequest.rebalanceTimeoutMs())
.setClassicMemberMetadata(
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSessionTimeoutMs(joinRequest.sessionTimeoutMs())
.setSupportedProtocols(List.of(new
ConsumerGroupMemberMetadataValue.ClassicProtocol()
- .setName("range")
-
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
- List.of(fooTopicName)))))))
+ .setName(NoOpPartitionAssignor.NAME)
+ .setMetadata(protocolsMetadata2)))
).build();
byte[] assignment1 =
Utils.toArray(ConsumerProtocol.serializeAssignment(new
ConsumerPartitionAssignor.Assignment(List.of(
@@ -12594,7 +12605,7 @@ public class GroupMetadataManagerTest {
context.time,
10,
Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
- Optional.of("range"),
+ Optional.of(NoOpPartitionAssignor.NAME),
Optional.of(memberId1),
Optional.of(context.time.milliseconds())
);
@@ -12630,42 +12641,60 @@ public class GroupMetadataManagerTest {
assertTrue(Set.of(memberId1, newMemberId2).contains(leader));
expectedClassicGroup.setLeaderId(Optional.of(leader));
- assertUnorderedRecordsEquals(
- List.of(
- // Remove the existing member 2 that uses the consumer
protocol.
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
oldMemberId2)),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
oldMemberId2)),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
oldMemberId2)),
+ List<List<CoordinatorRecord>> replacingRecords = List.of(
+ // Remove the existing member 2 that uses the consumer protocol.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
oldMemberId2)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
oldMemberId2)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
oldMemberId2)),
- // Create the new member 2 that uses the consumer protocol.
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedNewConsumerMember2)),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId2, expectedNewConsumerMember2.assignedPartitions())),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedNewConsumerMember2)),
+ // Create the new member 2 that uses the consumer protocol.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedNewConsumerMember2)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId2, expectedNewConsumerMember2.assignedPartitions())),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedNewConsumerMember2))
+ );
- // Update the new member 2 to the member that uses classic
protocol.
+ List<List<CoordinatorRecord>> memberUpdateRecords;
+ if (isSubscriptionChanged) {
+ memberUpdateRecords = List.of(
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedNewClassicMember2)),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedNewClassicMember2)),
-
- // Remove member 1, member 2 and the consumer group.
- List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
newMemberId2)
- ),
- List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
newMemberId2)
- ),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId)),
- List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
newMemberId2)
- ),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId)),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11,
computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash,
+ barTopicName, barTopicHash
+ ))))
+ );
+ } else {
+ memberUpdateRecords = List.of(
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedNewClassicMember2)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedNewClassicMember2))
+ );
+ }
- // Create the classic group.
-
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments))
+ List<List<CoordinatorRecord>> downgradeRecords = List.of(
+ // Remove member 1, member 2 and the consumer group.
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
newMemberId2)
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
newMemberId2)
),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId)),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
newMemberId2)
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)),
+
+ // Create the classic group.
+
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments))
+ );
+
+ assertUnorderedRecordsEquals(
+ Stream.of(replacingRecords, memberUpdateRecords, downgradeRecords)
+ .flatMap(List::stream)
+ .collect(Collectors.toList()),
result.records
);
@@ -12675,9 +12704,13 @@ public class GroupMetadataManagerTest {
);
assertNotNull(heartbeatTimeout);
- // No rebalance is triggered.
+ // If the subscription is changed, a rebalance is triggered.
ClassicGroup classicGroup =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
- assertTrue(classicGroup.isInState(STABLE));
+ if (isSubscriptionChanged) {
+ assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
+ } else {
+ assertTrue(classicGroup.isInState(STABLE));
+ }
}
@Test