dajac commented on code in PR #15662: URL: https://github.com/apache/kafka/pull/15662#discussion_r1559052702
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -761,6 +776,58 @@ public ClassicGroup classicGroup( } } + /** + * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request. + * + * @param classicGroup A ClassicGroup. + * @return the boolean indicating whether it's valid to online upgrade the classic group. + */ + private boolean validateOnlineUpgrade(ClassicGroup classicGroup) { + if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) { + log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.", + classicGroup.groupId(), consumerGroupMigrationPolicy); + return false; + } else if (classicGroup.isInState(DEAD)) { Review Comment: Could this really happen? I would have thought that it would be automatically converted as Dead or equivalent to Empty. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -411,6 +432,20 @@ public int numMembers() { return members.size(); } + /** + * @return The number of members that use the legacy protocol. + */ + public int numLegacyProtocolMember() { + return (int) members.values().stream().filter(member -> member.useLegacyProtocol()).count(); Review Comment: It may be better to maintain this count in the group state instead of having to go through all members. Is it possible? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -761,6 +776,58 @@ public ClassicGroup classicGroup( } } + /** + * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request. + * + * @param classicGroup A ClassicGroup. + * @return the boolean indicating whether it's valid to online upgrade the classic group. 
+ */ + private boolean validateOnlineUpgrade(ClassicGroup classicGroup) { + if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) { + log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.", + classicGroup.groupId(), consumerGroupMigrationPolicy); + return false; + } else if (classicGroup.isInState(DEAD)) { + log.debug("Online upgrade is invalid because the classic group {} is in DEAD state.", classicGroup.groupId()); + return false; + } else if (!classicGroup.usesConsumerGroupProtocol()) { + log.debug("Online upgrade is invalid because the classic group {} has protocol type {} and doesn't use the consumer group protocol.", Review Comment: nit: `Cannot upgrade classic group {} to consumer group because the group does not use the consumer embedded protocol.` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -761,6 +776,58 @@ public ClassicGroup classicGroup( } } + /** + * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request. + * + * @param classicGroup A ClassicGroup. + * @return the boolean indicating whether it's valid to online upgrade the classic group. 
+ */ + private boolean validateOnlineUpgrade(ClassicGroup classicGroup) { + if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) { + log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.", + classicGroup.groupId(), consumerGroupMigrationPolicy); + return false; + } else if (classicGroup.isInState(DEAD)) { + log.debug("Online upgrade is invalid because the classic group {} is in DEAD state.", classicGroup.groupId()); + return false; + } else if (!classicGroup.usesConsumerGroupProtocol()) { + log.debug("Online upgrade is invalid because the classic group {} has protocol type {} and doesn't use the consumer group protocol.", + classicGroup.groupId(), classicGroup.protocolType().orElse("")); + return false; + } else if (classicGroup.size() > consumerGroupMaxSize) { + log.debug("Online upgrade is invalid because the classic group {} size {} exceeds the consumer group maximum size {}.", Review Comment: nit: Same idea. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -761,6 +776,58 @@ public ClassicGroup classicGroup( } } + /** + * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request. + * + * @param classicGroup A ClassicGroup. + * @return the boolean indicating whether it's valid to online upgrade the classic group. 
+ */ + private boolean validateOnlineUpgrade(ClassicGroup classicGroup) { + if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) { + log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.", Review Comment: nit: `Cannot upgrade classic group {} to consumer group because the online upgrade is disabled.` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -761,6 +776,58 @@ public ClassicGroup classicGroup( } } + /** + * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request. + * + * @param classicGroup A ClassicGroup. + * @return the boolean indicating whether it's valid to online upgrade the classic group. + */ + private boolean validateOnlineUpgrade(ClassicGroup classicGroup) { + if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) { + log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.", + classicGroup.groupId(), consumerGroupMigrationPolicy); + return false; + } else if (classicGroup.isInState(DEAD)) { + log.debug("Online upgrade is invalid because the classic group {} is in DEAD state.", classicGroup.groupId()); + return false; + } else if (!classicGroup.usesConsumerGroupProtocol()) { + log.debug("Online upgrade is invalid because the classic group {} has protocol type {} and doesn't use the consumer group protocol.", + classicGroup.groupId(), classicGroup.protocolType().orElse("")); + return false; + } else if (classicGroup.size() > consumerGroupMaxSize) { + log.debug("Online upgrade is invalid because the classic group {} size {} exceeds the consumer group maximum size {}.", + classicGroup.groupId(), classicGroup.size(), consumerGroupMaxSize); + return false; + } + return true; + } + + /** + * Creates a ConsumerGroup corresponding to the given classic group. + * + * @param classicGroup The ClassicGroup to convert. 
+ * @param records The list of Records. + * @return The created ConsumerGroup. + */ + ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Record> records) { + // The upgrade is always triggered by a new member joining the classic group, which always results in + // updatedMember.subscribedTopicNames changing, the group epoch being bumped, and triggering a new rebalance. + // If the ClassicGroup is rebalancing, inform the awaiting consumers of another ongoing rebalance + // so that they will rejoin for the new rebalance. + classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS); + classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS); + + classicGroup.createGroupTombstoneRecords(records); + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, logContext, classicGroup.groupId(), metrics); + consumerGroup.fromClassicGroup(classicGroup, records, metadataImage.topics()); Review Comment: nit: It may be better to have a static method - `ConsumerGroup.fromClassicGroup(....)` - which creates the `ConsumerGroup` and populates it from the classic group. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup( ); return describedGroup; } + + + /** + * Set the attributes of the consumer group according to a classic group. + * Add the records for creating and updating the consumer group. + * + * @param classicGroup The converted classic group. + * @param records The list to which the new records are added. + */ + public void fromClassicGroup( + ClassicGroup classicGroup, + List<Record> records, + TopicsImage topicsImage + ) { + setGroupEpoch(classicGroup.generationId()); + records.add(RecordHelpers.newGroupEpochRecord(groupId(), classicGroup.generationId())); + // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat. 
+ records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap())); + + setTargetAssignmentEpoch(classicGroup.generationId()); + records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), classicGroup.generationId())); + + classicGroup.allMembers().forEach(member -> { + try { + ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment())); + Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage); + ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata(classicGroup.protocolName().get()))); + + ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(member.memberId()) + .setMemberEpoch(classicGroup.generationId()) + .setPreviousMemberEpoch(classicGroup.generationId()) + .setInstanceId(member.groupInstanceId().orElse(null)) + .setRackId(subscription.rackId().orElse(null)) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setSubscribedTopicNames(subscription.topics()) + .setAssignedPartitions(partitions) + .setSupportedProtocols(member.supportedProtocols()) + .build(); + updateMember(newMember); + + records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember)); + records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember)); + records.add(RecordHelpers.newTargetAssignmentRecord(groupId(), member.memberId(), partitions)); + } catch (SchemaException e) { + log.warn("Failed to parse Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ":" + + classicGroup.protocolName().get() + " of group " + groupId + ".", e); + } + }); + } + + /** + * Converts the list of TopicPartition to a map of topic id and partition set. 
+ */ + private static Map<Uuid, Set<Integer>> topicPartitionMapFromList( + List<TopicPartition> partitions, + TopicsImage topicsImage + ) { + Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>(); + partitions.forEach(topicPartition -> { + TopicImage topicImage = topicsImage.getTopic(topicPartition.topic()); + if (topicImage != null) { + topicPartitionMap.computeIfAbsent(topicImage.id(), __ -> new HashSet<>()) + .add(topicPartition.partition()); + } + }); + return topicPartitionMap; + } + + /** + * Checks whether at least one of the given protocols can be supported. A + * protocol can be supported if it is supported by all members that use the + * legacy protocol. + * + * @param memberProtocolType the member protocol type. + * @param memberProtocols the set of protocol names. + * + * @return a boolean based on the condition mentioned above. + */ + public boolean supportsProtocols(String memberProtocolType, Set<String> memberProtocols) { Review Comment: nit: The name is a bit confusing. Would `supportsClassicProtocols` be better? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup( ); return describedGroup; } + + + /** + * Set the attributes of the consumer group according to a classic group. + * Add the records for creating and updating the consumer group. + * + * @param classicGroup The converted classic group. + * @param records The list to which the new records are added. + */ + public void fromClassicGroup( + ClassicGroup classicGroup, + List<Record> records, + TopicsImage topicsImage + ) { + setGroupEpoch(classicGroup.generationId()); + records.add(RecordHelpers.newGroupEpochRecord(groupId(), classicGroup.generationId())); + // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat. 
+ records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap())); + + setTargetAssignmentEpoch(classicGroup.generationId()); + records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), classicGroup.generationId())); + + classicGroup.allMembers().forEach(member -> { + try { + ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment())); + Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage); + ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata(classicGroup.protocolName().get()))); + + ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(member.memberId()) + .setMemberEpoch(classicGroup.generationId()) + .setPreviousMemberEpoch(classicGroup.generationId()) + .setInstanceId(member.groupInstanceId().orElse(null)) + .setRackId(subscription.rackId().orElse(null)) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setSubscribedTopicNames(subscription.topics()) + .setAssignedPartitions(partitions) + .setSupportedProtocols(member.supportedProtocols()) + .build(); + updateMember(newMember); + + records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember)); + records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember)); + records.add(RecordHelpers.newTargetAssignmentRecord(groupId(), member.memberId(), partitions)); + } catch (SchemaException e) { + log.warn("Failed to parse Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ":" + Review Comment: nit: `Cannot upgrade the classic group to a consumer group because parsing the Consumer Protocol fails....`. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup( ); return describedGroup; } + + + /** + * Set the attributes of the consumer group according to a classic group. + * Add the records for creating and updating the consumer group. + * + * @param classicGroup The converted classic group. + * @param records The list to which the new records are added. + */ + public void fromClassicGroup( + ClassicGroup classicGroup, + List<Record> records, + TopicsImage topicsImage + ) { + setGroupEpoch(classicGroup.generationId()); + records.add(RecordHelpers.newGroupEpochRecord(groupId(), classicGroup.generationId())); + // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat. + records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap())); + + setTargetAssignmentEpoch(classicGroup.generationId()); + records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), classicGroup.generationId())); + + classicGroup.allMembers().forEach(member -> { + try { + ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment())); + Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage); + ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata(classicGroup.protocolName().get()))); + + ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(member.memberId()) + .setMemberEpoch(classicGroup.generationId()) + .setPreviousMemberEpoch(classicGroup.generationId()) + .setInstanceId(member.groupInstanceId().orElse(null)) + .setRackId(subscription.rackId().orElse(null)) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + 
.setSubscribedTopicNames(subscription.topics()) + .setAssignedPartitions(partitions) + .setSupportedProtocols(member.supportedProtocols()) + .build(); + updateMember(newMember); + + records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember)); + records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember)); + records.add(RecordHelpers.newTargetAssignmentRecord(groupId(), member.memberId(), partitions)); + } catch (SchemaException e) { + log.warn("Failed to parse Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ":" + + classicGroup.protocolName().get() + " of group " + groupId + ".", e); + } + }); + } + + /** + * Converts the list of TopicPartition to a map of topic id and partition set. + */ + private static Map<Uuid, Set<Integer>> topicPartitionMapFromList( + List<TopicPartition> partitions, + TopicsImage topicsImage + ) { + Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>(); + partitions.forEach(topicPartition -> { + TopicImage topicImage = topicsImage.getTopic(topicPartition.topic()); + if (topicImage != null) { + topicPartitionMap.computeIfAbsent(topicImage.id(), __ -> new HashSet<>()) Review Comment: nit: Let's add `.computeIfAbsent` on a new line too. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -761,6 +777,31 @@ public ClassicGroup classicGroup( } } + public boolean validateOnlineUpgrade(ClassicGroup classicGroup) { + return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) && + !classicGroup.isInState(DEAD) && + ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) && + classicGroup.size() <= consumerGroupMaxSize; + } + + ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Record> records) { + classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS); + classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS); + + createGroupTombstoneRecords(classicGroup, records); + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, classicGroup.groupId(), metrics); + classicGroup.convertToConsumerGroup(consumerGroup, records, metadataImage.topics()); + + consumerGroup.members().forEach((memberId, __) -> + scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId) Review Comment: Understood. Let's add a comment about it too. ########## group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataValue.json: ########## @@ -35,6 +35,13 @@ { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1, "about": "The rebalance timeout" }, { "name": "ServerAssignor", "versions": "0+", "nullableVersions": "0+", "type": "string", - "about": "The server assignor to use; or null if not used." } + "about": "The server assignor to use; or null if not used." }, + { "name": "SupportedProtocols", "type": "[]ClassicJoinGroupRequestProtocol", "versions": "0+", + "about": "The list of protocols that the member supports if the consumer uses the legacy protocol.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, + "about": "The protocol name." 
}, + { "name": "Metadata", "type": "bytes", "versions": "0+", + "about": "The protocol metadata." } + ]} Review Comment: I think that it may be better to introduce a struct field `ClassicMemberMetadata` which then contains the `SupportedProtocols` field. I would also use a nullable tagged field for `ClassicMemberMetadata` in order to make it optional. What do you think? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ########## @@ -311,6 +337,11 @@ public ConsumerGroupMember build() { */ private final Map<Uuid, Set<Integer>> partitionsPendingRevocation; + /** + * The list of supported protocols if the consumer uses the legacy protocol. + */ + private final ConsumerGroupMemberMetadataValue.ClassicJoinGroupRequestProtocolCollection supportedProtocols; Review Comment: nit: `supportedClassicProtocols` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup( ); return describedGroup; } + + + /** + * Set the attributes of the consumer group according to a classic group. + * Add the records for creating and updating the consumer group. + * + * @param classicGroup The converted classic group. + * @param records The list to which the new records are added. + */ + public void fromClassicGroup( Review Comment: The order of the records is still incorrect. I'll let you go back to my previous comment about it. We can also discuss it offline if needed. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -180,12 +192,19 @@ public static class DeadlineAndEpoch { */ private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY; + /** + * Map of protocol names to the number of members that use legacy protocol and support them. 
+ */ + private final TimelineHashMap<String, Integer> legacyProtocolMembersSupportedProtocols; Review Comment: I am not sure I fully understand why we need to maintain this. Could you elaborate a bit more? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ########## @@ -504,6 +544,10 @@ private static String lookupTopicNameById( } } + public boolean useLegacyProtocol() { Review Comment: nit: useClassicProtocol? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup( ); return describedGroup; } + + + /** + * Set the attributes of the consumer group according to a classic group. + * Add the records for creating and updating the consumer group. + * + * @param classicGroup The converted classic group. + * @param records The list to which the new records are added. + */ + public void fromClassicGroup( + ClassicGroup classicGroup, + List<Record> records, + TopicsImage topicsImage + ) { + setGroupEpoch(classicGroup.generationId()); + records.add(RecordHelpers.newGroupEpochRecord(groupId(), classicGroup.generationId())); + // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat. + records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap())); + + setTargetAssignmentEpoch(classicGroup.generationId()); + records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), classicGroup.generationId())); + + classicGroup.allMembers().forEach(member -> { Review Comment: nit: `classicMember`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -761,6 +776,58 @@ public ClassicGroup classicGroup( } } + /** + * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request. 
+ * + * @param classicGroup A ClassicGroup. + * @return the boolean indicating whether it's valid to online upgrade the classic group. + */ + private boolean validateOnlineUpgrade(ClassicGroup classicGroup) { + if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) { + log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.", Review Comment: I also wonder if we should log those as `info`. This seems quite useful in production. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup( ); return describedGroup; } + + + /** + * Set the attributes of the consumer group according to a classic group. + * Add the records for creating and updating the consumer group. + * + * @param classicGroup The converted classic group. + * @param records The list to which the new records are added. + */ + public void fromClassicGroup( + ClassicGroup classicGroup, + List<Record> records, + TopicsImage topicsImage + ) { + setGroupEpoch(classicGroup.generationId()); + records.add(RecordHelpers.newGroupEpochRecord(groupId(), classicGroup.generationId())); + // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat. 
+ records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap())); + + setTargetAssignmentEpoch(classicGroup.generationId()); + records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), classicGroup.generationId())); + + classicGroup.allMembers().forEach(member -> { + try { + ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment())); + Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage); + ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata(classicGroup.protocolName().get()))); + + ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(member.memberId()) + .setMemberEpoch(classicGroup.generationId()) + .setPreviousMemberEpoch(classicGroup.generationId()) + .setInstanceId(member.groupInstanceId().orElse(null)) + .setRackId(subscription.rackId().orElse(null)) + .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) + .setClientId(member.clientId()) + .setClientHost(member.clientHost()) + .setSubscribedTopicNames(subscription.topics()) + .setAssignedPartitions(partitions) + .setSupportedProtocols(member.supportedProtocols()) + .build(); + updateMember(newMember); + + records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember)); + records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember)); + records.add(RecordHelpers.newTargetAssignmentRecord(groupId(), member.memberId(), partitions)); + } catch (SchemaException e) { + log.warn("Failed to parse Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ":" + + classicGroup.protocolName().get() + " of group " + groupId + ".", e); Review Comment: Shouldn't we throw an exception to fail the request too? It could be a `GroupIdNotFoundException` with an appropriate error message. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org