dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1567506439


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @param log               The logger to use.
+     * @return  The created ConsumerGroup.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            // The new ConsumerGroupMember's assignedPartitions and 
targetAssignmentSet need to be the same
+            // in order to keep it stable. Thus, both of them are set to be 
classicGroupMember.assignment().
+            // If the consumer's real assigned partitions haven't been updated 
according to
+            // classicGroupMember.assignment(), it will retry the request.
+            ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment()));
+            Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(
+                
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get())));
+
+            ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                
.setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new 
Assignment(partitions));
+            consumerGroup.updateMember(newMember);
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given 
consumer group.
+     *
+     * @param records           The list to which the new records are added.
+     */
+    public void createConsumerGroupRecords(
+        List<Record> records
+    ) {
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), 
consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), 
groupEpoch()));
+
+        members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(
+                groupId(),
+                consumerGroupMemberId,
+                targetAssignment(consumerGroupMemberId).partitions()
+            ))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), 
groupEpoch()));
+
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), 
consumerGroupMember))
+        );
+    }
+
+    /**
+     * Converts the list of TopicPartition to a map of topic id and partition 
set.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = 
topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+                topicPartitionMap
+                    .computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+                    .add(topicPartition.partition());
+            }
+        });
+        return topicPartitionMap;
+    }
+
+    /**
+     * Checks whether at least one of the given protocols can be supported. A
+     * protocol can be supported if it is supported by all members that use the
+     * classic protocol.
+     *
+     * @param memberProtocolType  the member protocol type.
+     * @param memberProtocols     the set of protocol names.
+     *
+     * @return a boolean based on the condition mentioned above.
+     */
+    public boolean supportsClassicProtocols(String memberProtocolType, 
Set<String> memberProtocols) {
+        if (isEmpty()) {
+            return !memberProtocolType.isEmpty() && !memberProtocols.isEmpty();
+        } else {
+            return ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType) &&
+                memberProtocols.stream()
+                    .anyMatch(name -> 
classicProtocolMembersSupportedProtocols.getOrDefault(name, 0) == 
numClassicProtocolMembers());

Review Comment:
   Let's still separate 
`ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType)` and 
`memberProtocols.stream().anyMatch` into two lines. I just changed it to:
   ```
   return ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType) &&
       memberProtocols.stream().anyMatch(
           name -> classicProtocolMembersSupportedProtocols.getOrDefault(name, 
0) == numClassicProtocolMembers()
       );
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to