dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1559052702


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.",
+                classicGroup.groupId(), consumerGroupMigrationPolicy);
+            return false;
+        } else if (classicGroup.isInState(DEAD)) {

Review Comment:
   Could this really happen? I would have thought that it would be automatically converted to Dead or equivalent to Empty.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -411,6 +432,20 @@ public int numMembers() {
         return members.size();
     }
 
+    /**
+     * @return The number of members that use the legacy protocol.
+     */
+    public int numLegacyProtocolMember() {
+        return (int) members.values().stream().filter(member -> member.useLegacyProtocol()).count();

Review Comment:
   It may be better to maintain this count in the group state instead of having to go through all members. Is it possible?
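For illustration, maintaining the count incrementally, as suggested above, could look like the following standalone sketch. `Member` and `Group` here are simplified stand-ins, not the actual coordinator classes (which would use the timeline collections and snapshot registry):

```java
import java.util.HashMap;
import java.util.Map;

class Member {
    final String memberId;
    final boolean usesLegacyProtocol;

    Member(String memberId, boolean usesLegacyProtocol) {
        this.memberId = memberId;
        this.usesLegacyProtocol = usesLegacyProtocol;
    }
}

class Group {
    private final Map<String, Member> members = new HashMap<>();
    // Maintained incrementally so reads are O(1) instead of streaming all members.
    private int numLegacyProtocolMembers = 0;

    void updateMember(Member newMember) {
        Member old = members.put(newMember.memberId, newMember);
        // Adjust the counter by the delta between the replaced and new member.
        if (old != null && old.usesLegacyProtocol) numLegacyProtocolMembers--;
        if (newMember.usesLegacyProtocol) numLegacyProtocolMembers++;
    }

    void removeMember(String memberId) {
        Member old = members.remove(memberId);
        if (old != null && old.usesLegacyProtocol) numLegacyProtocolMembers--;
    }

    int numLegacyProtocolMembers() {
        return numLegacyProtocolMembers;
    }
}
```

The key invariant is that every write path (add, replace, remove) adjusts the counter, so it always matches what the streaming count would return.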



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.",
+                classicGroup.groupId(), consumerGroupMigrationPolicy);
+            return false;
+        } else if (classicGroup.isInState(DEAD)) {
+            log.debug("Online upgrade is invalid because the classic group {} is in DEAD state.", classicGroup.groupId());
+            return false;
+        } else if (!classicGroup.usesConsumerGroupProtocol()) {
+            log.debug("Online upgrade is invalid because the classic group {} has protocol type {} and doesn't use the consumer group protocol.",

Review Comment:
   nit: `Cannot upgrade classic group {} to consumer group because the group does not use the consumer embedded protocol.`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.",
+                classicGroup.groupId(), consumerGroupMigrationPolicy);
+            return false;
+        } else if (classicGroup.isInState(DEAD)) {
+            log.debug("Online upgrade is invalid because the classic group {} is in DEAD state.", classicGroup.groupId());
+            return false;
+        } else if (!classicGroup.usesConsumerGroupProtocol()) {
+            log.debug("Online upgrade is invalid because the classic group {} has protocol type {} and doesn't use the consumer group protocol.",
+                classicGroup.groupId(), classicGroup.protocolType().orElse(""));
+            return false;
+        } else if (classicGroup.size() > consumerGroupMaxSize) {
+            log.debug("Online upgrade is invalid because the classic group {} size {} exceeds the consumer group maximum size {}.",

Review Comment:
   nit: Same idea.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.",

Review Comment:
   nit: `Cannot upgrade classic group {} to consumer group because the online upgrade is disabled.`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.",
+                classicGroup.groupId(), consumerGroupMigrationPolicy);
+            return false;
+        } else if (classicGroup.isInState(DEAD)) {
+            log.debug("Online upgrade is invalid because the classic group {} is in DEAD state.", classicGroup.groupId());
+            return false;
+        } else if (!classicGroup.usesConsumerGroupProtocol()) {
+            log.debug("Online upgrade is invalid because the classic group {} has protocol type {} and doesn't use the consumer group protocol.",
+                classicGroup.groupId(), classicGroup.protocolType().orElse(""));
+            return false;
+        } else if (classicGroup.size() > consumerGroupMaxSize) {
+            log.debug("Online upgrade is invalid because the classic group {} size {} exceeds the consumer group maximum size {}.",
+                classicGroup.groupId(), classicGroup.size(), consumerGroupMaxSize);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a ConsumerGroup corresponding to the given classic group.
+     *
+     * @param classicGroup  The ClassicGroup to convert.
+     * @param records       The list of Records.
+     * @return  The created ConsumerGroup.
+     */
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Record> records) {
+        // The upgrade is always triggered by a new member joining the classic group, which always results in
+        // updatedMember.subscribedTopicNames changing, the group epoch being bumped, and triggering a new rebalance.
+        // If the ClassicGroup is rebalancing, inform the awaiting consumers of another ongoing rebalance
+        // so that they will rejoin for the new rebalance.
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        classicGroup.createGroupTombstoneRecords(records);
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, logContext, classicGroup.groupId(), metrics);
+        consumerGroup.fromClassicGroup(classicGroup, records, metadataImage.topics());

Review Comment:
   nit: It may be better to have a static method - `ConsumerGroup.fromClassicGroup(....)` - which creates the `ConsumerGroup` and populates it from the classic group.
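For illustration, the static-factory shape suggested above could look like this simplified sketch. The types here are hypothetical stand-ins (real `ConsumerGroup` construction also needs the snapshot registry, log context, and metrics); the point is that creation and population happen in one step, so callers never see a half-initialized group:

```java
class ClassicGroup {
    final String groupId;
    final int generationId;

    ClassicGroup(String groupId, int generationId) {
        this.groupId = groupId;
        this.generationId = generationId;
    }
}

class ConsumerGroup {
    final String groupId;
    int groupEpoch;

    // Private constructor forces callers through the factory.
    private ConsumerGroup(String groupId) {
        this.groupId = groupId;
    }

    // Static factory: creates the ConsumerGroup and populates it
    // from the classic group in a single step.
    static ConsumerGroup fromClassicGroup(ClassicGroup classic) {
        ConsumerGroup group = new ConsumerGroup(classic.groupId);
        group.groupEpoch = classic.generationId;
        return group;
    }
}
```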



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+
+    /**
+     * Set the attributes of the consumer group according to a classic group.
+     * Add the records for creating and updating the consumer group.
+     *
+     * @param classicGroup      The converted classic group.
+     * @param records           The list to which the new records are added.
+     */
+    public void fromClassicGroup(
+        ClassicGroup classicGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) {
+        setGroupEpoch(classicGroup.generationId());
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), classicGroup.generationId()));
+        // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat.
+        records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap()));
+
+        setTargetAssignmentEpoch(classicGroup.generationId());
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), classicGroup.generationId()));
+
+        classicGroup.allMembers().forEach(member -> {
+            try {
+                ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+                Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+                ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata(classicGroup.protocolName().get())));
+
+                ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(member.memberId())
+                    .setMemberEpoch(classicGroup.generationId())
+                    .setPreviousMemberEpoch(classicGroup.generationId())
+                    .setInstanceId(member.groupInstanceId().orElse(null))
+                    .setRackId(subscription.rackId().orElse(null))
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setSubscribedTopicNames(subscription.topics())
+                    .setAssignedPartitions(partitions)
+                    .setSupportedProtocols(member.supportedProtocols())
+                    .build();
+                updateMember(newMember);
+
+                records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember));
+                records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember));
+                records.add(RecordHelpers.newTargetAssignmentRecord(groupId(), member.memberId(), partitions));
+            } catch (SchemaException e) {
+                log.warn("Failed to parse Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ":" +
+                    classicGroup.protocolName().get() + " of group " + groupId + ".", e);
+            }
+        });
+    }
+
+    /**
+     * Converts the list of TopicPartition to a map of topic id and partition set.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+                topicPartitionMap.computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+                    .add(topicPartition.partition());
+            }
+        });
+        return topicPartitionMap;
+    }
+
+    /**
+     * Checks whether at least one of the given protocols can be supported. A
+     * protocol can be supported if it is supported by all members that use the
+     * legacy protocol.
+     *
+     * @param memberProtocolType  the member protocol type.
+     * @param memberProtocols     the set of protocol names.
+     *
+     * @return a boolean based on the condition mentioned above.
+     */
+    public boolean supportsProtocols(String memberProtocolType, Set<String> memberProtocols) {

Review Comment:
   nit: The name is a bit confusing. Would `supportsClassicProtocols` be better?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+
+    /**
+     * Set the attributes of the consumer group according to a classic group.
+     * Add the records for creating and updating the consumer group.
+     *
+     * @param classicGroup      The converted classic group.
+     * @param records           The list to which the new records are added.
+     */
+    public void fromClassicGroup(
+        ClassicGroup classicGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) {
+        setGroupEpoch(classicGroup.generationId());
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), classicGroup.generationId()));
+        // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat.
+        records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap()));
+
+        setTargetAssignmentEpoch(classicGroup.generationId());
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), classicGroup.generationId()));
+
+        classicGroup.allMembers().forEach(member -> {
+            try {
+                ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+                Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+                ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata(classicGroup.protocolName().get())));
+
+                ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(member.memberId())
+                    .setMemberEpoch(classicGroup.generationId())
+                    .setPreviousMemberEpoch(classicGroup.generationId())
+                    .setInstanceId(member.groupInstanceId().orElse(null))
+                    .setRackId(subscription.rackId().orElse(null))
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setSubscribedTopicNames(subscription.topics())
+                    .setAssignedPartitions(partitions)
+                    .setSupportedProtocols(member.supportedProtocols())
+                    .build();
+                updateMember(newMember);
+
+                records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember));
+                records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember));
+                records.add(RecordHelpers.newTargetAssignmentRecord(groupId(), member.memberId(), partitions));
+            } catch (SchemaException e) {
+                log.warn("Failed to parse Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ":" +

Review Comment:
   nit: `Cannot upgrade the classic group to a consumer group because parsing the Consumer Protocol fails....`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+
+    /**
+     * Set the attributes of the consumer group according to a classic group.
+     * Add the records for creating and updating the consumer group.
+     *
+     * @param classicGroup      The converted classic group.
+     * @param records           The list to which the new records are added.
+     */
+    public void fromClassicGroup(
+        ClassicGroup classicGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) {
+        setGroupEpoch(classicGroup.generationId());
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), classicGroup.generationId()));
+        // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat.
+        records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap()));
+
+        setTargetAssignmentEpoch(classicGroup.generationId());
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), classicGroup.generationId()));
+
+        classicGroup.allMembers().forEach(member -> {
+            try {
+                ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+                Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+                ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata(classicGroup.protocolName().get())));
+
+                ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(member.memberId())
+                    .setMemberEpoch(classicGroup.generationId())
+                    .setPreviousMemberEpoch(classicGroup.generationId())
+                    .setInstanceId(member.groupInstanceId().orElse(null))
+                    .setRackId(subscription.rackId().orElse(null))
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setSubscribedTopicNames(subscription.topics())
+                    .setAssignedPartitions(partitions)
+                    .setSupportedProtocols(member.supportedProtocols())
+                    .build();
+                updateMember(newMember);
+
+                records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember));
+                records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember));
+                records.add(RecordHelpers.newTargetAssignmentRecord(groupId(), member.memberId(), partitions));
+            } catch (SchemaException e) {
+                log.warn("Failed to parse Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ":" +
+                    classicGroup.protocolName().get() + " of group " + groupId + ".", e);
+            }
+        });
+    }
+
+    /**
+     * Converts the list of TopicPartition to a map of topic id and partition set.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+                topicPartitionMap.computeIfAbsent(topicImage.id(), __ -> new HashSet<>())

Review Comment:
   nit: Let's add `.computeIfAbsent` on a new line too.
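For illustration, the grouping pattern with `.computeIfAbsent` on its own line looks like this self-contained sketch (generic `String` topic names here rather than the Kafka `Uuid`/`TopicsImage` types):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class PartitionGrouper {
    // Groups (topic, partition) pairs into a map of topic -> partition set,
    // with the .computeIfAbsent call placed on its own line as suggested.
    static Map<String, Set<Integer>> group(List<Map.Entry<String, Integer>> partitions) {
        Map<String, Set<Integer>> topicPartitionMap = new HashMap<>();
        partitions.forEach(tp ->
            topicPartitionMap
                .computeIfAbsent(tp.getKey(), __ -> new HashSet<>())
                .add(tp.getValue())
        );
        return topicPartitionMap;
    }
}
```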



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }
 
+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) &&
+            classicGroup.size() <= consumerGroupMaxSize;
+    }
+
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Record> records) {
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        createGroupTombstoneRecords(classicGroup, records);
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, classicGroup.groupId(), metrics);
+        classicGroup.convertToConsumerGroup(consumerGroup, records, metadataImage.topics());
+
+        consumerGroup.members().forEach((memberId, __) ->
+            scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId)

Review Comment:
   Understood. Let's add a comment about it too.



##########
group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataValue.json:
##########
@@ -35,6 +35,13 @@
     { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", 
"default": -1,
       "about": "The rebalance timeout" },
     { "name": "ServerAssignor", "versions": "0+", "nullableVersions": "0+", 
"type": "string",
-      "about": "The server assignor to use; or null if not used." }
+      "about": "The server assignor to use; or null if not used." },
+    { "name": "SupportedProtocols", "type": 
"[]ClassicJoinGroupRequestProtocol", "versions": "0+",
+      "about": "The list of protocols that the member supports if the consumer 
uses the legacy protocol.", "fields": [
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+        "about": "The protocol name." },
+      { "name": "Metadata", "type": "bytes", "versions": "0+",
+        "about": "The protocol metadata." }
+    ]}

Review Comment:
   I think that it may be better to introduce a struct field `ClassicMemberMetadata` which then contains the `SupportedProtocols` field. I would also use a nullable tagged field for `ClassicMemberMetadata` in order to make it optional. What do you think?
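If that direction is taken, a rough sketch of the layout could look like the following. The tag number, default, and nested type name here are purely illustrative, not final:

```json
{ "name": "ClassicMemberMetadata", "type": "ClassicMemberMetadata", "versions": "0+",
  "nullableVersions": "0+", "taggedVersions": "0+", "tag": 0, "default": "null",
  "about": "The classic member metadata if the member uses the classic protocol; null otherwise.",
  "fields": [
    { "name": "SupportedProtocols", "type": "[]ClassicProtocol", "versions": "0+",
      "about": "The list of protocols that the member supports.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The protocol name." },
      { "name": "Metadata", "type": "bytes", "versions": "0+",
        "about": "The protocol metadata." }
    ]}
  ]}
```

A nullable tagged field keeps the record compact for the common case where the member does not use the classic protocol.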



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -311,6 +337,11 @@ public ConsumerGroupMember build() {
      */
     private final Map<Uuid, Set<Integer>> partitionsPendingRevocation;
 
+    /**
+     * The list of supported protocols if the consumer uses the legacy protocol.
+     */
+    private final ConsumerGroupMemberMetadataValue.ClassicJoinGroupRequestProtocolCollection supportedProtocols;

Review Comment:
   nit: `supportedClassicProtocols`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+
+    /**
+     * Set the attributes of the consumer group according to a classic group.
+     * Add the records for creating and updating the consumer group.
+     *
+     * @param classicGroup      The converted classic group.
+     * @param records           The list to which the new records are added.
+     */
+    public void fromClassicGroup(

Review Comment:
   The order of the records is still incorrect. I'll let you go back to my previous comment about it. We can also discuss it offline if needed.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -180,12 +192,19 @@ public static class DeadlineAndEpoch {
      */
     private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
 
+    /**
+     * Map of protocol names to the number of members that use the legacy protocol and support them.
+     */
+    private final TimelineHashMap<String, Integer> legacyProtocolMembersSupportedProtocols;

Review Comment:
   I am not sure I fully understand why we need to maintain this. Could you elaborate a bit more?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -504,6 +544,10 @@ private static String lookupTopicNameById(
         }
     }
 
+    public boolean useLegacyProtocol() {

Review Comment:
   nit: useClassicProtocol?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+
+    /**
+     * Set the attributes of the consumer group according to a classic group.
+     * Add the records for creating and updating the consumer group.
+     *
+     * @param classicGroup      The converted classic group.
+     * @param records           The list to which the new records are added.
+     */
+    public void fromClassicGroup(
+        ClassicGroup classicGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) {
+        setGroupEpoch(classicGroup.generationId());
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), classicGroup.generationId()));
+        // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat.
+        records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap()));
+
+        setTargetAssignmentEpoch(classicGroup.generationId());
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), classicGroup.generationId()));
+
+        classicGroup.allMembers().forEach(member -> {

Review Comment:
   nit: `classicMember`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +776,58 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.debug("Online upgrade is invalid because the consumer group {} migration config is {} so online upgrade is not enabled.",

Review Comment:
   I also wonder if we should log those as `info`. This seems quite useful in production.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1055,93 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+
+    /**
+     * Set the attributes of the consumer group according to a classic group.
+     * Add the records for creating and updating the consumer group.
+     *
+     * @param classicGroup      The converted classic group.
+     * @param records           The list to which the new records are added.
+     */
+    public void fromClassicGroup(
+        ClassicGroup classicGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) {
+        setGroupEpoch(classicGroup.generationId());
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), classicGroup.generationId()));
+        // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat.
+        records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap()));
+
+        setTargetAssignmentEpoch(classicGroup.generationId());
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), classicGroup.generationId()));
+
+        classicGroup.allMembers().forEach(member -> {
+            try {
+                ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+                Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+                ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata(classicGroup.protocolName().get())));
+
+                ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(member.memberId())
+                    .setMemberEpoch(classicGroup.generationId())
+                    .setPreviousMemberEpoch(classicGroup.generationId())
+                    .setInstanceId(member.groupInstanceId().orElse(null))
+                    .setRackId(subscription.rackId().orElse(null))
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setSubscribedTopicNames(subscription.topics())
+                    .setAssignedPartitions(partitions)
+                    .setSupportedProtocols(member.supportedProtocols())
+                    .build();
+                updateMember(newMember);
+
+                records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember));
+                records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember));
+                records.add(RecordHelpers.newTargetAssignmentRecord(groupId(), member.memberId(), partitions));
+            } catch (SchemaException e) {
+                log.warn("Failed to parse Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ":" +
+                    classicGroup.protocolName().get() + " of group " + groupId + ".", e);

Review Comment:
   Shouldn't we throw an exception to fail the request too? It could be a `GroupIdNotFoundException` with an appropriate error message.
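For illustration, the rethrow pattern suggested above could look like the following standalone sketch. `SchemaException`, `GroupIdNotFoundException`, and `deserializeSubscription` here are simplified stand-ins for the Kafka classes, not the real implementations:

```java
import java.nio.ByteBuffer;

class SchemaException extends RuntimeException {
    SchemaException(String message) { super(message); }
}

class GroupIdNotFoundException extends RuntimeException {
    GroupIdNotFoundException(String message, Throwable cause) { super(message, cause); }
}

class UpgradeSketch {
    // Stand-in for the protocol parser: rejects buffers that are too short.
    static void deserializeSubscription(ByteBuffer buffer) {
        if (buffer.remaining() < 2) {
            throw new SchemaException("Buffer underflow while parsing subscription");
        }
    }

    // Rethrows parse failures so the whole upgrade request fails,
    // rather than logging and silently dropping the member.
    static void convertMember(String groupId, byte[] metadata) {
        try {
            deserializeSubscription(ByteBuffer.wrap(metadata));
        } catch (SchemaException e) {
            throw new GroupIdNotFoundException(
                "Cannot upgrade classic group " + groupId +
                " to a consumer group because parsing the Consumer Protocol failed.", e);
        }
    }
}
```

Wrapping the low-level parse error preserves the original cause while surfacing an error type the request path already knows how to map to a response.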


