This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:

new fcb722dc883 KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group (#19790)

fcb722dc883 is described below

commit fcb722dc8833aad84493e3e91f7ed6997a629451
Author: Dongnuo Lyu <139248811+dongnuo...@users.noreply.github.com>
AuthorDate: Tue May 27 05:25:57 2025 -0400

KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group (#19790)

When a consumer protocol static member replaces an existing member in a classic group, it's not necessary to recompute the assignment. However, the assignment is recomputed anyway. In [ConsumerGroup.fromClassicGroup](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java#L1140), we don't set the group's subscriptionMetadata. Later in the consumer group heartbeat, we [call updateSubscriptionMetadata](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1748), which [notices that the group's subscriptionMetadata needs an update](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L2757) and bumps the group epoch. Since the group epoch is bumped, we [recompute the assignment](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1766). As a fix, this patch sets the subscriptionMetadata in ConsumerGroup.fromClassicGroup.
Reviewers: Sean Quah <sq...@confluent.io>, David Jacot <dja...@confluent.io> --- .../coordinator/group/GroupMetadataManager.java | 3 +- .../group/modern/consumer/ConsumerGroup.java | 15 ++++- .../group/GroupMetadataManagerTest.java | 66 +++++++++------------- .../group/modern/consumer/ConsumerGroupTest.java | 8 ++- 4 files changed, 49 insertions(+), 43 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 7a1c98bc9e1..0df0d09c57a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1355,7 +1355,8 @@ public class GroupMetadataManager { snapshotRegistry, metrics, classicGroup, - metadataImage.topics() + metadataImage.topics(), + metadataImage.cluster() ); } catch (SchemaException e) { log.warn("Cannot upgrade classic group " + classicGroup.groupId() + diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java index 66d7de9b5fb..2be03621213 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java @@ -43,6 +43,7 @@ import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.ModernGroup; import org.apache.kafka.coordinator.group.modern.ModernGroupMember; import org.apache.kafka.coordinator.group.modern.SubscriptionCount; +import org.apache.kafka.image.ClusterImage; import org.apache.kafka.image.TopicsImage; import org.apache.kafka.timeline.SnapshotRegistry; import 
org.apache.kafka.timeline.TimelineHashMap; @@ -1129,7 +1130,8 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> { * @param snapshotRegistry The SnapshotRegistry. * @param metrics The GroupCoordinatorMetricsShard. * @param classicGroup The converted classic group. - * @param topicsImage The TopicsImage for topic id and topic name conversion. + * @param topicsImage The current metadata for all available topics. + * @param clusterImage The current metadata for the Kafka cluster. * @return The created ConsumerGroup. * * @throws SchemaException if any member's subscription or assignment cannot be deserialized. @@ -1139,7 +1141,8 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> { SnapshotRegistry snapshotRegistry, GroupCoordinatorMetricsShard metrics, ClassicGroup classicGroup, - TopicsImage topicsImage + TopicsImage topicsImage, + ClusterImage clusterImage ) { String groupId = classicGroup.groupId(); ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); @@ -1195,6 +1198,12 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> { consumerGroup.updateMember(newMember); }); + consumerGroup.setSubscriptionMetadata(consumerGroup.computeSubscriptionMetadata( + consumerGroup.subscribedTopicNames(), + topicsImage, + clusterImage + )); + return consumerGroup; } @@ -1210,6 +1219,8 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> { records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId(), consumerGroupMember)) ); + records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId(), subscriptionMetadata())); + records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch(), 0)); members().forEach((consumerGroupMemberId, consumerGroupMember) -> diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index b2e3a3e4acc..bf6884a5336 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -10239,6 +10239,10 @@ public class GroupMetadataManagerTest { // Create the new consumer group with member 1. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1), + barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) + )), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0), @@ -10247,12 +10251,6 @@ public class GroupMetadataManagerTest { // Member 2 joins the new consumer group. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), - // The subscription metadata hasn't been updated during the conversion, so a new one is computed. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )), - // Newly joining member 2 bumps the group epoch. A new target assignment is computed. 
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, assignor.targetPartitions(memberId2)), @@ -10454,6 +10452,11 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), + barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) + )), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()), @@ -10466,12 +10469,6 @@ public class GroupMetadataManagerTest { // Member 3 joins the new consumer group. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3), - // The subscription metadata hasn't been updated during the conversion, so a new one is computed. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )), - // Newly joining member 3 bumps the group epoch. A new target assignment is computed. 
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), @@ -10659,7 +10656,7 @@ public class GroupMetadataManagerTest { ); group.transitionTo(PREPARING_REBALANCE); - group.transitionTo(COMPLETING_REBALANCE); + group.initNextGeneration(); group.transitionTo(STABLE); context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, assignments)); @@ -10681,8 +10678,8 @@ public class GroupMetadataManagerTest { ConsumerGroupMember expectedClassicMember = new ConsumerGroupMember.Builder(memberId) .setInstanceId(instanceId) - .setMemberEpoch(0) - .setPreviousMemberEpoch(0) + .setMemberEpoch(group.generationId()) + .setPreviousMemberEpoch(group.generationId()) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setSubscribedTopicNames(List.of(fooTopicName)) @@ -10718,7 +10715,7 @@ public class GroupMetadataManagerTest { .build(); ConsumerGroupMember expectedFinalConsumerMember = new ConsumerGroupMember.Builder(expectedReplacingConsumerMember) - .setMemberEpoch(1) + .setMemberEpoch(group.generationId()) .setServerAssignorName(NoOpPartitionAssignor.NAME) .setRebalanceTimeoutMs(5000) .setClassicMemberMetadata(null) @@ -10730,9 +10727,10 @@ public class GroupMetadataManagerTest { // Create the new consumer group with the static member. 
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedClassicMember), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, group.generationId(), 0), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, expectedClassicMember.assignedPartitions()), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, group.generationId()), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedClassicMember), // Remove the static member because the rejoining member replaces it. @@ -10745,17 +10743,10 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedReplacingConsumerMember), - // The static member rejoins the new consumer group. + // The static member rejoins the new consumer group with the same instance id and + // takes the assignment of the previous member. No new target assignment is computed. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedFinalConsumerMember), - // The subscription metadata hasn't been updated during the conversion, so a new one is computed. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))), - - // Newly joining static member bumps the group epoch. A new target assignment is computed. 
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), - // The newly created static member takes the assignment from the existing member. // Bump its member epoch and transition to STABLE. GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedFinalConsumerMember) @@ -10856,6 +10847,10 @@ public class GroupMetadataManagerTest { // Create the new consumer group with member 1. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1), + barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) + )), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), @@ -10864,12 +10859,6 @@ public class GroupMetadataManagerTest { // Member 2 joins the new consumer group. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), - // The subscription metadata hasn't been updated during the conversion, so a new one is computed. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )), - // Newly joining member 2 bumps the group epoch. A new target assignment is computed. 
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Map.of()), @@ -11241,6 +11230,11 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), + barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) + )), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()), @@ -11253,12 +11247,6 @@ public class GroupMetadataManagerTest { // Member 3 joins the new consumer group. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3), - // The subscription metadata hasn't been updated during the conversion, so a new one is computed. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )), - // Newly joining member 3 bumps the group epoch. A new target assignment is computed. 
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java index a6b91e5a83b..ccbed0cf9fd 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java @@ -1532,7 +1532,8 @@ public class ConsumerGroupTest { new SnapshotRegistry(logContext), mock(GroupCoordinatorMetricsShard.class), classicGroup, - metadataImage.topics() + metadataImage.topics(), + metadataImage.cluster() ); ConsumerGroup expectedConsumerGroup = new ConsumerGroup( @@ -1545,6 +1546,10 @@ public class ConsumerGroupTest { expectedConsumerGroup.updateTargetAssignment(memberId, new Assignment(mkAssignment( mkTopicAssignment(fooTopicId, 0) ))); + expectedConsumerGroup.setSubscriptionMetadata(Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1), + barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) + )); expectedConsumerGroup.updateMember(new ConsumerGroupMember.Builder(memberId) .setMemberEpoch(classicGroup.generationId()) .setState(MemberState.STABLE) @@ -1576,6 +1581,7 @@ public class ConsumerGroupTest { assertEquals(expectedConsumerGroup.groupEpoch(), consumerGroup.groupEpoch()); assertEquals(expectedConsumerGroup.state(), consumerGroup.state()); assertEquals(expectedConsumerGroup.preferredServerAssignor(), consumerGroup.preferredServerAssignor()); + assertEquals(Map.copyOf(expectedConsumerGroup.subscriptionMetadata()), Map.copyOf(consumerGroup.subscriptionMetadata())); assertEquals(expectedConsumerGroup.members(), consumerGroup.members()); }