dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1566918622


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -775,6 +778,72 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a 
ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the 
classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the online upgrade is disabled.",
+                classicGroup.groupId());
+            return false;
+        } else if (!classicGroup.usesConsumerGroupProtocol()) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the group does not use the consumer embedded protocol.",
+                classicGroup.groupId());
+            return false;
+        } else if (classicGroup.size() > consumerGroupMaxSize) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the group size exceeds the consumer group maximum size.",
+                classicGroup.groupId());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a ConsumerGroup corresponding to the given classic group.
+     *
+     * @param classicGroup  The ClassicGroup to convert.
+     * @param records       The list of Records.
+     * @return  The created ConsumerGroup.
+     */
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, 
List<Record> records) {
+        // The upgrade is always triggered by a new member joining the classic 
group, which always results in
+        // updatedMember.subscribedTopicNames changing, the group epoch being 
bumped, and triggering a new rebalance.
+        // If the ClassicGroup is rebalancing, inform the awaiting consumers 
of another ongoing rebalance
+        // so that they will rejoin for the new rebalance.
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        classicGroup.createGroupTombstoneRecords(records);
+        ConsumerGroup consumerGroup;
+        try {
+            consumerGroup = ConsumerGroup.fromClassicGroup(
+                snapshotRegistry,
+                metrics,
+                classicGroup,
+                metadataImage.topics(),
+                log
+            );
+        } catch (SchemaException e) {
+            log.warn("Cannot upgrade the classic group " + 
classicGroup.groupId() + ": fail to parse " +

Review Comment:
   nit: `log.warn("Cannot upgrade the classic group " + classicGroup.groupId() 
+ " to consumer group because the embedded consumer protocol is malformed: " + 
e.getMessage() + ".", e);`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -775,6 +778,72 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a 
ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the 
classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the online upgrade is disabled.",
+                classicGroup.groupId());
+            return false;
+        } else if (!classicGroup.usesConsumerGroupProtocol()) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the group does not use the consumer embedded protocol.",
+                classicGroup.groupId());
+            return false;
+        } else if (classicGroup.size() > consumerGroupMaxSize) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the group size exceeds the consumer group maximum size.",
+                classicGroup.groupId());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a ConsumerGroup corresponding to the given classic group.
+     *
+     * @param classicGroup  The ClassicGroup to convert.
+     * @param records       The list of Records.
+     * @return  The created ConsumerGroup.

Review Comment:
   nit: There is an extra space after `return`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -775,6 +778,72 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a 
ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the 
classic group.

Review Comment:
   nit: `The...`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log

Review Comment:
   nit: It looks like `log` is not used anymore.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            // The new ConsumerGroupMember's assignedPartitions and 
targetAssignmentSet need to be the same
+            // in order to keep it stable. Thus, both of them are set to be 
classicGroupMember.assignment().
+            // If the consumer's real assigned partitions haven't been updated 
according to
+            // classicGroupMember.assignment(), it will retry the request.
+            ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment()));
+            Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(
+                
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get())));
+
+            ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                
.setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new 
Assignment(partitions));
+            consumerGroup.updateMember(newMember);
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given 
consumer group.
+     *
+     * @param records           The list to which the new records are added.
+     */
+    public void createConsumerGroupRecords(
+        List<Record> records
+    ) {
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), 
consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), 
groupEpoch()));
+
+        members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(
+                groupId(),
+                consumerGroupMemberId,
+                targetAssignment(consumerGroupMemberId).partitions()
+            ))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), 
groupEpoch()));
+
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), 
consumerGroupMember))
+        );
+    }
+
+    /**
+     * Converts the list of TopicPartition to a map of topic id and partition 
set.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = 
topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+                topicPartitionMap
+                    .computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+                    .add(topicPartition.partition());
+            }
+        });
+        return topicPartitionMap;
+    }
+
+    /**
+     * Checks whether at least one of the given protocols can be supported. A
+     * protocol can be supported if it is supported by all members that use the
+     * classic protocol.
+     *
+     * @param memberProtocolType  the member protocol type.
+     * @param memberProtocols     the set of protocol names.
+     *
+     * @return a boolean based on the condition mentioned above.
+     */
+    public boolean supportsClassicProtocols(String memberProtocolType, 
Set<String> memberProtocols) {
+        if (isEmpty()) {
+            return !memberProtocolType.isEmpty() && !memberProtocols.isEmpty();
+        } else {
+            return ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType) &&
+                memberProtocols.stream()
+                    .anyMatch(name -> 
classicProtocolMembersSupportedProtocols.getOrDefault(name, 0) == 
numClassicProtocolMembers());
+        }
+    }
+
+    /**
+     * @return The boolean indicating whether all the members use the classic 
protocol.

Review Comment:
   nit: `The` -> `A`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            // The new ConsumerGroupMember's assignedPartitions and 
targetAssignmentSet need to be the same
+            // in order to keep it stable. Thus, both of them are set to be 
classicGroupMember.assignment().
+            // If the consumer's real assigned partitions haven't been updated 
according to
+            // classicGroupMember.assignment(), it will retry the request.
+            ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment()));
+            Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(
+                
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get())));
+
+            ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                
.setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new 
Assignment(partitions));
+            consumerGroup.updateMember(newMember);
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given 
consumer group.
+     *
+     * @param records           The list to which the new records are added.
+     */
+    public void createConsumerGroupRecords(
+        List<Record> records
+    ) {
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), 
consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), 
groupEpoch()));
+
+        members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(
+                groupId(),
+                consumerGroupMemberId,
+                targetAssignment(consumerGroupMemberId).partitions()
+            ))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), 
groupEpoch()));
+
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), 
consumerGroupMember))
+        );
+    }
+
+    /**
+     * Converts the list of TopicPartition to a map of topic id and partition 
set.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = 
topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+                topicPartitionMap
+                    .computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+                    .add(topicPartition.partition());
+            }
+        });
+        return topicPartitionMap;
+    }
+
+    /**
+     * Checks whether at least one of the given protocols can be supported. A
+     * protocol can be supported if it is supported by all members that use the
+     * classic protocol.
+     *
+     * @param memberProtocolType  the member protocol type.
+     * @param memberProtocols     the set of protocol names.
+     *
+     * @return a boolean based on the condition mentioned above.
+     */
+    public boolean supportsClassicProtocols(String memberProtocolType, 
Set<String> memberProtocols) {
+        if (isEmpty()) {
+            return !memberProtocolType.isEmpty() && !memberProtocols.isEmpty();
+        } else {
+            return ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType) &&
+                memberProtocols.stream()
+                    .anyMatch(name -> 
classicProtocolMembersSupportedProtocols.getOrDefault(name, 0) == 
numClassicProtocolMembers());

Review Comment:
   I find this line hard to parse. I wonder if it would be better like this:
   
   ```
   return ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType) && 
memberProtocols.stream().anyMatch(
        name -> classicProtocolMembersSupportedProtocols.getOrDefault(name, 0) 
== numClassicProtocolMembers()
   );
   ```
   I leave it up to you to decide.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -827,6 +871,52 @@ private static void maybeUpdateServerAssignors(
         }
     }
 
+    /**
+     * Updates the number of the members that use the classic protocol.
+     *
+     * @param oldMember The old member.
+     * @param newMember The new member.
+     */
+    private void maybeUpdateNumClassicProtocolMembers(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        int delta = 0;
+        if (oldMember != null && oldMember.useClassicProtocol()) {
+            delta--;
+        }
+        if (newMember != null && newMember.useClassicProtocol()) {
+            delta++;
+        }
+        setNumClassicProtocolMembers(numClassicProtocolMembers() + delta);
+    }
+
+    /**
+     * Updates the supported protocol count of the members that use the 
classic protocol.
+     *
+     * @param oldMember The old member.
+     * @param newMember The new member.
+     */
+    private void maybeUpdateClassicProtocolMembersSupportedProtocols(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        if (oldMember != null && oldMember.useClassicProtocol()) {
+            oldMember.supportedClassicProtocols().ifPresent(protocols ->
+                protocols.forEach(protocol ->
+                    
classicProtocolMembersSupportedProtocols.compute(protocol.name(), 
ConsumerGroup::decValue)
+                )
+            );
+        }
+        if (newMember != null && newMember.useClassicProtocol()) {

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -827,6 +871,52 @@ private static void maybeUpdateServerAssignors(
         }
     }
 
+    /**
+     * Updates the number of the members that use the classic protocol.
+     *
+     * @param oldMember The old member.
+     * @param newMember The new member.
+     */
+    private void maybeUpdateNumClassicProtocolMembers(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        int delta = 0;
+        if (oldMember != null && oldMember.useClassicProtocol()) {
+            delta--;
+        }
+        if (newMember != null && newMember.useClassicProtocol()) {
+            delta++;
+        }
+        setNumClassicProtocolMembers(numClassicProtocolMembers() + delta);
+    }
+
+    /**
+     * Updates the supported protocol count of the members that use the 
classic protocol.
+     *
+     * @param oldMember The old member.
+     * @param newMember The new member.
+     */
+    private void maybeUpdateClassicProtocolMembersSupportedProtocols(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        if (oldMember != null && oldMember.useClassicProtocol()) {

Review Comment:
   nit: The `oldMember.useClassicProtocol()` condition is not necessary because 
`supportedClassicProtocols()` used below returns an empty optional if it does 
not.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -775,6 +778,72 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a 
ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the 
classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the online upgrade is disabled.",
+                classicGroup.groupId());
+            return false;
+        } else if (!classicGroup.usesConsumerGroupProtocol()) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the group does not use the consumer embedded protocol.",
+                classicGroup.groupId());
+            return false;
+        } else if (classicGroup.size() > consumerGroupMaxSize) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the group size exceeds the consumer group maximum size.",
+                classicGroup.groupId());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a ConsumerGroup corresponding to the given classic group.
+     *
+     * @param classicGroup  The ClassicGroup to convert.
+     * @param records       The list of Records.
+     * @return  The created ConsumerGroup.
+     */
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, 
List<Record> records) {
+        // The upgrade is always triggered by a new member joining the classic 
group, which always results in
+        // updatedMember.subscribedTopicNames changing, the group epoch being 
bumped, and triggering a new rebalance.
+        // If the ClassicGroup is rebalancing, inform the awaiting consumers 
of another ongoing rebalance
+        // so that they will rejoin for the new rebalance.
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        classicGroup.createGroupTombstoneRecords(records);
+        ConsumerGroup consumerGroup;

Review Comment:
   nit: Should we put an empty line before this one?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -775,6 +778,72 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a 
ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the 
classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the online upgrade is disabled.",
+                classicGroup.groupId());
+            return false;
+        } else if (!classicGroup.usesConsumerGroupProtocol()) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the group does not use the consumer embedded protocol.",
+                classicGroup.groupId());
+            return false;
+        } else if (classicGroup.size() > consumerGroupMaxSize) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the group size exceeds the consumer group maximum size.",
+                classicGroup.groupId());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a ConsumerGroup corresponding to the given classic group.
+     *
+     * @param classicGroup  The ClassicGroup to convert.
+     * @param records       The list of Records.
+     * @return  The created ConsumerGroup.
+     */
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, 
List<Record> records) {
+        // The upgrade is always triggered by a new member joining the classic 
group, which always results in
+        // updatedMember.subscribedTopicNames changing, the group epoch being 
bumped, and triggering a new rebalance.
+        // If the ClassicGroup is rebalancing, inform the awaiting consumers 
of another ongoing rebalance
+        // so that they will rejoin for the new rebalance.
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        classicGroup.createGroupTombstoneRecords(records);
+        ConsumerGroup consumerGroup;
+        try {
+            consumerGroup = ConsumerGroup.fromClassicGroup(
+                snapshotRegistry,
+                metrics,
+                classicGroup,
+                metadataImage.topics(),
+                log
+            );
+        } catch (SchemaException e) {
+            log.warn("Cannot upgrade the classic group " + 
classicGroup.groupId() + ": fail to parse " +
+                "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + 
":" + classicGroup.protocolName().get() + ".", e);
+
+            throw new GroupIdNotFoundException(String.format("Cannot upgrade 
the classic group %s: %s.",

Review Comment:
   nit: `Cannot upgrade the classic group " + classicGroup.groupId() + " to 
consumer group because the embedded consumer protocol is malformed`. I would 
not put the exception in the error here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            // The new ConsumerGroupMember's assignedPartitions and 
targetAssignmentSet need to be the same
+            // in order to keep it stable. Thus, both of them are set to be 
classicGroupMember.assignment().
+            // If the consumer's real assigned partitions haven't been updated 
according to
+            // classicGroupMember.assignment(), it will retry the request.
+            ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment()));
+            Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(
+                
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get())));
+
+            ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                
.setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new 
Assignment(partitions));
+            consumerGroup.updateMember(newMember);
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given 
consumer group.
+     *
+     * @param records           The list to which the new records are added.
+     */
+    public void createConsumerGroupRecords(
+        List<Record> records
+    ) {
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), 
consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), 
groupEpoch()));
+
+        members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(
+                groupId(),
+                consumerGroupMemberId,
+                targetAssignment(consumerGroupMemberId).partitions()
+            ))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), 
groupEpoch()));
+
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), 
consumerGroupMember))
+        );
+    }
+
+    /**
+     * Converts the list of TopicPartition to a map of topic id and partition 
set.

Review Comment:
   nit: `@return ...`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            // The new ConsumerGroupMember's assignedPartitions and 
targetAssignmentSet need to be the same
+            // in order to keep it stable. Thus, both of them are set to be 
classicGroupMember.assignment().
+            // If the consumer's real assigned partitions haven't been updated 
according to
+            // classicGroupMember.assignment(), it will retry the request.
+            ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment()));
+            Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(
+                
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get())));
+
+            ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                
.setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new 
Assignment(partitions));
+            consumerGroup.updateMember(newMember);
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given 
consumer group.
+     *
+     * @param records           The list to which the new records are added.

Review Comment:
   nit: We can remove the extra spaces after `records`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            // The new ConsumerGroupMember's assignedPartitions and 
targetAssignmentSet need to be the same
+            // in order to keep it stable. Thus, both of them are set to be 
classicGroupMember.assignment().
+            // If the consumer's real assigned partitions haven't been updated 
according to
+            // classicGroupMember.assignment(), it will retry the request.
+            ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment()));
+            Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(
+                
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get())));
+
+            ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                
.setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new 
Assignment(partitions));
+            consumerGroup.updateMember(newMember);
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given 
consumer group.
+     *
+     * @param records           The list to which the new records are added.
+     */
+    public void createConsumerGroupRecords(
+        List<Record> records
+    ) {
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), 
consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), 
groupEpoch()));
+
+        members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(
+                groupId(),
+                consumerGroupMemberId,
+                targetAssignment(consumerGroupMemberId).partitions()
+            ))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), 
groupEpoch()));
+
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), 
consumerGroupMember))
+        );
+    }
+
+    /**
+     * Converts the list of TopicPartition to a map of topic id and partition 
set.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = 
topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+                topicPartitionMap
+                    .computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+                    .add(topicPartition.partition());
+            }
+        });
+        return topicPartitionMap;
+    }
+
+    /**
+     * Checks whether at least one of the given protocols can be supported. A
+     * protocol can be supported if it is supported by all members that use the
+     * classic protocol.
+     *
+     * @param memberProtocolType  the member protocol type.
+     * @param memberProtocols     the set of protocol names.

Review Comment:
   nit: `The...`



##########
group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataValue.json:
##########
@@ -35,6 +35,20 @@
     { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", 
"default": -1,
       "about": "The rebalance timeout" },
     { "name": "ServerAssignor", "versions": "0+", "nullableVersions": "0+", 
"type": "string",
-      "about": "The server assignor to use; or null if not used." }
+      "about": "The server assignor to use; or null if not used." },
+    { "name": "ClassicMemberMetadata", "versions": "0+", "nullableVersions": 
"0+", "type": "ClassicMemberMetadata",

Review Comment:
   Should we add `"default": "null"` too?



##########
group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataValue.json:
##########
@@ -35,6 +35,20 @@
     { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", 
"default": -1,
       "about": "The rebalance timeout" },
     { "name": "ServerAssignor", "versions": "0+", "nullableVersions": "0+", 
"type": "string",
-      "about": "The server assignor to use; or null if not used." }
+      "about": "The server assignor to use; or null if not used." },
+    { "name": "ClassicMemberMetadata", "versions": "0+", "nullableVersions": 
"0+", "type": "ClassicMemberMetadata",
+      "about": "The classic member metadata which includes the supported 
protocols of the consumer if it uses the classic protocol. The metadata is null 
if the consumer uses the consumer protocol.",
+      "fields": [
+        { "name": "SupportedProtocols", "type": "[]ClassicProtocol", 
"versions": "0+", "taggedVersions": "0+", "tag": 0,

Review Comment:
   I think that we should rather put the tag on the `ClassicMemberMetadata` 
structure. Is there a reason why you did not put it there?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            // The new ConsumerGroupMember's assignedPartitions and 
targetAssignmentSet need to be the same
+            // in order to keep it stable. Thus, both of them are set to be 
classicGroupMember.assignment().
+            // If the consumer's real assigned partitions haven't been updated 
according to
+            // classicGroupMember.assignment(), it will retry the request.

Review Comment:
   Let's move this comment to right before `newMember`. I would say the 
following:
   
   > The target assignment and the assigned partitions of each member are set 
based on the last assignment of the classic group. All the members are put in 
the Stable state. If the classic group was in Preparing Rebalance or Completing 
Rebalance states, the classic members are asked to rejoin the group to 
re-trigger a rebalance or collect their assignments.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            // The new ConsumerGroupMember's assignedPartitions and 
targetAssignmentSet need to be the same
+            // in order to keep it stable. Thus, both of them are set to be 
classicGroupMember.assignment().
+            // If the consumer's real assigned partitions haven't been updated 
according to
+            // classicGroupMember.assignment(), it will retry the request.
+            ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment()));
+            Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(
+                
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get())));
+
+            ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                
.setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new 
Assignment(partitions));
+            consumerGroup.updateMember(newMember);
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given 
consumer group.
+     *
+     * @param records           The list to which the new records are added.
+     */
+    public void createConsumerGroupRecords(
+        List<Record> records
+    ) {
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), 
consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), 
groupEpoch()));
+
+        members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(
+                groupId(),
+                consumerGroupMemberId,
+                targetAssignment(consumerGroupMemberId).partitions()
+            ))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), 
groupEpoch()));
+
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), 
consumerGroupMember))
+        );
+    }
+
+    /**
+     * Converts the list of TopicPartition to a map of topic id and partition 
set.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = 
topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+                topicPartitionMap
+                    .computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+                    .add(topicPartition.partition());
+            }
+        });
+        return topicPartitionMap;
+    }
+
+    /**
+     * Checks whether at least one of the given protocols can be supported. A
+     * protocol can be supported if it is supported by all members that use the
+     * classic protocol.
+     *
+     * @param memberProtocolType  the member protocol type.
+     * @param memberProtocols     the set of protocol names.
+     *
+     * @return a boolean based on the condition mentioned above.

Review Comment:
   nit: `A boolean...`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to