dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1565866968


##########
group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataValue.json:
##########
@@ -35,6 +35,20 @@
     { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", 
"default": -1,
       "about": "The rebalance timeout" },
     { "name": "ServerAssignor", "versions": "0+", "nullableVersions": "0+", 
"type": "string",
-      "about": "The server assignor to use; or null if not used." }
+      "about": "The server assignor to use; or null if not used." },
+    { "name": "ClassicMemberMetadata", "versions": "0+", "nullableVersions": 
"0+", "type": "ClassicMemberMetadata",

Review Comment:
   Let's make it a tagged field so that we do not break the backward 
compatibility of the record. You can do it by adding `"taggedVersions": "0+", 
"tag": 0`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1090,177 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(
+                classicGroupMember.metadata(classicGroup.protocolName().get()),
+                log,
+                "group upgrade"
+            );
+            Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(subscription.ownedPartitions(), topicsImage);
+
+            ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                
.setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateMember(newMember);
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new 
Assignment(partitions));
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given 
consumer group.
+     *
+     * @param consumerGroup     The consumer group to create.
+     * @param records           The list to which the new records are added.
+     */
+    public static void createConsumerGroupRecords(
+        ConsumerGroup consumerGroup,
+        List<Record> records
+    ) {
+        String groupId = consumerGroup.groupId;
+
+        consumerGroup.members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId, 
consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId, 
consumerGroup.groupEpoch()));
+
+        consumerGroup.members().forEach((consumerGroupMemberId, 
consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(groupId, 
consumerGroupMemberId, 
consumerGroup.targetAssignment(consumerGroupMemberId).partitions()))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId, 
consumerGroup.groupEpoch()));
+
+        consumerGroup.members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId, 
consumerGroupMember))
+        );
+    }
+
+    /**
+     * Converts the list of TopicPartition to a map of topic id and partition 
set.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = 
topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+                topicPartitionMap
+                    .computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+                    .add(topicPartition.partition());
+            }
+        });
+        return topicPartitionMap;
+    }
+
+    /**
+     * Checks whether at least one of the given protocols can be supported. A
+     * protocol can be supported if it is supported by all members that use the
+     * classic protocol.
+     *
+     * @param memberProtocolType  the member protocol type.
+     * @param memberProtocols     the set of protocol names.
+     *
+     * @return a boolean based on the condition mentioned above.
+     */
+    public boolean supportsClassicProtocols(String memberProtocolType, 
Set<String> memberProtocols) {
+        if (isEmpty()) {
+            return !memberProtocolType.isEmpty() && !memberProtocols.isEmpty();
+        } else {
+            return ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType) &&
+                memberProtocols.stream()
+                    .anyMatch(name -> 
classicProtocolMembersSupportedProtocols.getOrDefault(name, 0) == 
numClassicProtocolMembers());
+        }
+    }
+
+    /**
+     * @return The boolean indicating whether all the members use the classic 
protocol.
+     */
+    public boolean allMembersUseClassicProtocol() {
+        return numClassicProtocolMembers() == members().size();
+    }
+
+    /**
+     * @param metadata      The metadata to deserialize.
+     * @param log           The log to use.
+     * @param reason        The reason for deserializing the assignment.
+     * @return  The deserialized assignment.
+     */
+    public static ConsumerPartitionAssignor.Assignment 
deserializeAssignment(byte[] metadata, Logger log, String reason) {
+        try {
+            return 
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(metadata));
+        } catch (SchemaException e) {
+            log.warn("Cannot parse the Consumer Protocol {} when deserializing 
the assignment for {}.",
+                ConsumerProtocol.PROTOCOL_TYPE, reason);
+            throw new GroupIdNotFoundException(String.format("Fail to 
deserialize the assignment when %s.", reason));
+        }
+    }
+
+    /**
+     * @param metadata      The metadata to deserialize.
+     * @param log           The log to use.
+     * @param reason        The reason for deserializing the subscription.
+     * @return  The deserialized subscription.
+     */
+    public static ConsumerPartitionAssignor.Subscription 
deserializeSubscription(byte[] metadata, Logger log, String reason) {
+        try {
+            return 
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(metadata));
+        } catch (SchemaException e) {
+            log.warn("Cannot parse the Consumer Protocol {} when deserializing 
the subscription for {}.",
+                ConsumerProtocol.PROTOCOL_TYPE, reason);
+            throw new GroupIdNotFoundException(String.format("Fail to 
deserialize the subscription when %s.", reason));

Review Comment:
   Ditto — same remark as for `deserializeAssignment`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1090,177 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(
+                classicGroupMember.metadata(classicGroup.protocolName().get()),
+                log,
+                "group upgrade"
+            );
+            Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(subscription.ownedPartitions(), topicsImage);
+
+            ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                
.setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateMember(newMember);
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new 
Assignment(partitions));
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given 
consumer group.
+     *
+     * @param consumerGroup     The consumer group to create.
+     * @param records           The list to which the new records are added.
+     */
+    public static void createConsumerGroupRecords(
+        ConsumerGroup consumerGroup,
+        List<Record> records
+    ) {
+        String groupId = consumerGroup.groupId;
+
+        consumerGroup.members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId, 
consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId, 
consumerGroup.groupEpoch()));
+
+        consumerGroup.members().forEach((consumerGroupMemberId, 
consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(groupId, 
consumerGroupMemberId, 
consumerGroup.targetAssignment(consumerGroupMemberId).partitions()))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId, 
consumerGroup.groupEpoch()));
+
+        consumerGroup.members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId, 
consumerGroupMember))
+        );
+    }
+
+    /**
+     * Converts the list of TopicPartition to a map of topic id and partition 
set.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = 
topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+                topicPartitionMap
+                    .computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+                    .add(topicPartition.partition());
+            }
+        });
+        return topicPartitionMap;
+    }
+
+    /**
+     * Checks whether at least one of the given protocols can be supported. A
+     * protocol can be supported if it is supported by all members that use the
+     * classic protocol.
+     *
+     * @param memberProtocolType  the member protocol type.
+     * @param memberProtocols     the set of protocol names.
+     *
+     * @return a boolean based on the condition mentioned above.
+     */
+    public boolean supportsClassicProtocols(String memberProtocolType, 
Set<String> memberProtocols) {
+        if (isEmpty()) {
+            return !memberProtocolType.isEmpty() && !memberProtocols.isEmpty();
+        } else {
+            return ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType) &&
+                memberProtocols.stream()
+                    .anyMatch(name -> 
classicProtocolMembersSupportedProtocols.getOrDefault(name, 0) == 
numClassicProtocolMembers());
+        }
+    }
+
+    /**
+     * @return The boolean indicating whether all the members use the classic 
protocol.
+     */
+    public boolean allMembersUseClassicProtocol() {
+        return numClassicProtocolMembers() == members().size();
+    }
+
+    /**
+     * @param metadata      The metadata to deserialize.
+     * @param log           The log to use.
+     * @param reason        The reason for deserializing the assignment.
+     * @return  The deserialized assignment.
+     */
+    public static ConsumerPartitionAssignor.Assignment 
deserializeAssignment(byte[] metadata, Logger log, String reason) {
+        try {
+            return 
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(metadata));
+        } catch (SchemaException e) {
+            log.warn("Cannot parse the Consumer Protocol {} when deserializing 
the assignment for {}.",
+                ConsumerProtocol.PROTOCOL_TYPE, reason);
+            throw new GroupIdNotFoundException(String.format("Fail to 
deserialize the assignment when %s.", reason));

Review Comment:
   I am not a fan of passing the `log` instance around like this. In this case, 
it may be better to throw an `IllegalStateException` with the appropriate error 
message. Then, we could catch it in `convertToConsumerGroup`, log it there, and 
convert it to `GroupIdNotFoundException`. What do you think?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -775,6 +776,63 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a 
ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the 
classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the online upgrade is disabled.",
+                classicGroup.groupId());
+            return false;
+        } else if (!classicGroup.usesConsumerGroupProtocol()) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the group does not use the consumer embedded protocol.",
+                classicGroup.groupId());
+            return false;
+        } else if (classicGroup.size() > consumerGroupMaxSize) {
+            log.info("Cannot upgrade classic group {} to consumer group 
because the group size exceeds the consumer group maximum size.",
+                classicGroup.groupId());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a ConsumerGroup corresponding to the given classic group.
+     *
+     * @param classicGroup  The ClassicGroup to convert.
+     * @param records       The list of Records.
+     * @return  The created ConsumerGroup.
+     */
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, 
List<Record> records) {
+        // The upgrade is always triggered by a new member joining the classic 
group, which always results in
+        // updatedMember.subscribedTopicNames changing, the group epoch being 
bumped, and triggering a new rebalance.
+        // If the ClassicGroup is rebalancing, inform the awaiting consumers 
of another ongoing rebalance
+        // so that they will rejoin for the new rebalance.
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        classicGroup.createGroupTombstoneRecords(records);
+        ConsumerGroup consumerGroup = ConsumerGroup.fromClassicGroup(
+            snapshotRegistry,
+            metrics,
+            classicGroup,
+            metadataImage.topics(),
+            log
+        );
+        ConsumerGroup.createConsumerGroupRecords(consumerGroup, records);

Review Comment:
   nit: Should `createConsumerGroupRecords` be a regular method instead of a 
static method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to