dongnuo123 commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1567506439
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup
asDescribedGroup(
);
return describedGroup;
}
+
+ /**
+ * Create a new consumer group according to the given classic group.
+ *
+ * @param snapshotRegistry The SnapshotRegistry.
+ * @param metrics The GroupCoordinatorMetricsShard.
+ * @param classicGroup The converted classic group.
+ * @param topicsImage The TopicsImage for topic id and topic name
conversion.
+ * @param log The logger to use.
+ * @return The created ConsumerGruop.
+ */
+ public static ConsumerGroup fromClassicGroup(
+ SnapshotRegistry snapshotRegistry,
+ GroupCoordinatorMetricsShard metrics,
+ ClassicGroup classicGroup,
+ TopicsImage topicsImage,
+ Logger log
+ ) {
+ String groupId = classicGroup.groupId();
+ ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry,
groupId, metrics);
+ consumerGroup.setGroupEpoch(classicGroup.generationId());
+ consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+ classicGroup.allMembers().forEach(classicGroupMember -> {
+ // The new ConsumerGroupMember's assignedPartitions and
targetAssignmentSet need to be the same
+ // in order to keep it stable. Thus, both of them are set to be
classicGroupMember.assignment().
+ // If the consumer's real assigned partitions haven't been updated
according to
+ // classicGroupMember.assignment(), it will retry the request.
+ ConsumerPartitionAssignor.Assignment assignment =
ConsumerProtocol.deserializeAssignment(
+ ByteBuffer.wrap(classicGroupMember.assignment()));
+ Map<Uuid, Set<Integer>> partitions =
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+ ConsumerPartitionAssignor.Subscription subscription =
ConsumerProtocol.deserializeSubscription(
+
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get())));
+
+ ConsumerGroupMember newMember = new
ConsumerGroupMember.Builder(classicGroupMember.memberId())
+ .setMemberEpoch(classicGroup.generationId())
+ .setState(MemberState.STABLE)
+ .setPreviousMemberEpoch(classicGroup.generationId())
+
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+ .setRackId(subscription.rackId().orElse(null))
+ .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+ .setClientId(classicGroupMember.clientId())
+ .setClientHost(classicGroupMember.clientHost())
+ .setSubscribedTopicNames(subscription.topics())
+ .setAssignedPartitions(partitions)
+
.setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+ .build();
+ consumerGroup.updateTargetAssignment(newMember.memberId(), new
Assignment(partitions));
+ consumerGroup.updateMember(newMember);
+ });
+
+ return consumerGroup;
+ }
+
+ /**
+ * Populate the record list with the records needed to create the given
consumer group.
+ *
+ * @param records The list to which the new records are added.
+ */
+ public void createConsumerGroupRecords(
+ List<Record> records
+ ) {
+ members().forEach((__, consumerGroupMember) ->
+ records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(),
consumerGroupMember))
+ );
+
+ records.add(RecordHelpers.newGroupEpochRecord(groupId(),
groupEpoch()));
+
+ members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+ records.add(RecordHelpers.newTargetAssignmentRecord(
+ groupId(),
+ consumerGroupMemberId,
+ targetAssignment(consumerGroupMemberId).partitions()
+ ))
+ );
+
+ records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(),
groupEpoch()));
+
+ members().forEach((__, consumerGroupMember) ->
+ records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(),
consumerGroupMember))
+ );
+ }
+
+ /**
+ * Converts the list of TopicPartition to a map of topic id and partition
set.
+ */
+ private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+ List<TopicPartition> partitions,
+ TopicsImage topicsImage
+ ) {
+ Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+ partitions.forEach(topicPartition -> {
+ TopicImage topicImage =
topicsImage.getTopic(topicPartition.topic());
+ if (topicImage != null) {
+ topicPartitionMap
+ .computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+ .add(topicPartition.partition());
+ }
+ });
+ return topicPartitionMap;
+ }
+
+ /**
+ * Checks whether at least one of the given protocols can be supported. A
+ * protocol can be supported if it is supported by all members that use the
+ * classic protocol.
+ *
+ * @param memberProtocolType the member protocol type.
+ * @param memberProtocols the set of protocol names.
+ *
+ * @return a boolean based on the condition mentioned above.
+ */
+ public boolean supportsClassicProtocols(String memberProtocolType,
Set<String> memberProtocols) {
+ if (isEmpty()) {
+ return !memberProtocolType.isEmpty() && !memberProtocols.isEmpty();
+ } else {
+ return ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType) &&
+ memberProtocols.stream()
+ .anyMatch(name ->
classicProtocolMembersSupportedProtocols.getOrDefault(name, 0) ==
numClassicProtocolMembers());
Review Comment:
Let's still separate
`ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType)` and
`memberProtocols.stream().anyMatch` to two lines. Just changed it to
```
return ConsumerProtocol.PROTOCOL_TYPE.equals(memberProtocolType) &&
memberProtocols.stream().anyMatch(
name -> classicProtocolMembersSupportedProtocols.getOrDefault(name,
0) == numClassicProtocolMembers()
);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]