dongnuo123 commented on code in PR #15662: URL: https://github.com/apache/kafka/pull/15662#discussion_r1567506439
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup( ); return describedGroup; } + + /** + * Create a new consumer group according to the given classic group. + * + * @param snapshotRegistry The SnapshotRegistry. + * @param metrics The GroupCoordinatorMetricsShard. + * @param classicGroup The converted classic group. + * @param topicsImage The TopicsImage for topic id and topic name conversion. + * @param log The logger to use. + * @return The created ConsumerGroup. + */ + public static ConsumerGroup fromClassicGroup( + SnapshotRegistry snapshotRegistry, + GroupCoordinatorMetricsShard metrics, + ClassicGroup classicGroup, + TopicsImage topicsImage, + Logger log + ) { + String groupId = classicGroup.groupId(); + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); + consumerGroup.setGroupEpoch(classicGroup.generationId()); + consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId()); + + classicGroup.allMembers().forEach(classicGroupMember -> { + // The new ConsumerGroupMember's assignedPartitions and targetAssignmentSet need to be the same + // in order to keep it stable. Thus, both of them are set to be classicGroupMember.assignment(). + // If the consumer's real assigned partitions haven't been updated according to + // classicGroupMember.assignment(), it will retry the request.
+ ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment( + ByteBuffer.wrap(classicGroupMember.assignment())); + Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage); + + ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription( + ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get()))); + + ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(classicGroupMember.memberId()) + .setMemberEpoch(classicGroup.generationId()) + .setState(MemberState.STABLE) + .setPreviousMemberEpoch(classicGroup.generationId()) + .setInstanceId(classicGroupMember.groupInstanceId().orElse(null)) + .setRackId(subscription.rackId().orElse(null)) + .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs()) + .setClientId(classicGroupMember.clientId()) + .setClientHost(classicGroupMember.clientHost()) + .setSubscribedTopicNames(subscription.topics()) + .setAssignedPartitions(partitions) + .setSupportedClassicProtocols(classicGroupMember.supportedProtocols()) + .build(); + consumerGroup.updateTargetAssignment(newMember.memberId(), new Assignment(partitions)); + consumerGroup.updateMember(newMember); + }); + + return consumerGroup; + } + + /** + * Populate the record list with the records needed to create the given consumer group. + * + * @param records The list to which the new records are added. 
+ */ + public void createConsumerGroupRecords( + List<Record> records + ) { + members().forEach((__, consumerGroupMember) -> + records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), consumerGroupMember)) + ); + + records.add(RecordHelpers.newGroupEpochRecord(groupId(), groupEpoch())); + + members().forEach((consumerGroupMemberId, consumerGroupMember) -> + records.add(RecordHelpers.newTargetAssignmentRecord( + groupId(), + consumerGroupMemberId, + targetAssignment(consumerGroupMemberId).partitions() + )) + ); + + records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), groupEpoch())); + + members().forEach((__, consumerGroupMember) -> + records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), consumerGroupMember)) + ); + } + + /** + * Converts the list of TopicPartition to a map of topic id and partition set. + */ + private static Map<Uuid, Set<Integer>> topicPartitionMapFromList( + List<TopicPartition> partitions, + TopicsImage topicsImage + ) { + Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>(); + partitions.forEach(topicPartition -> { + TopicImage topicImage = topicsImage.getTopic(topicPartition.topic()); + if (topicImage != null) { + topicPartitionMap + .computeIfAbsent(topicImage.id(), __ -> new HashSet<>()) + .add(topicPartition.partition()); + } + }); + return topicPartitionMap; + } + + /** + * Checks whether at least one of the given protocols can be supported. A + * protocol can be supported if it is supported by all members that use the + * classic protocol. + * + * @param memberProtocolType the member protocol type. + * @param memberProtocols the set of protocol names. + * + * @return a boolean based on the condition mentioned above. 
+ */ + public boolean supportsClassicProtocols(String memberProtocolType, Set<String> memberProtocols) { + if (isEmpty()) { + return !memberProtocolType.isEmpty() && !memberProtocols.isEmpty(); + } else { + return ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType) && + memberProtocols.stream() + .anyMatch(name -> classicProtocolMembersSupportedProtocols.getOrDefault(name, 0) == numClassicProtocolMembers()); Review Comment: Let's still split `ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType)` and `memberProtocols.stream().anyMatch` onto two lines. I just changed it to: ``` return ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType) && memberProtocols.stream().anyMatch( name -> classicProtocolMembersSupportedProtocols.getOrDefault(name, 0) == numClassicProtocolMembers() ); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org