dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580243392


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1092,6 +1097,86 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
         }
     }
 
+    /**
+     * Validates if the received classic member protocols are supported by the 
group.
+     *
+     * @param group         The ConsumerGroup.
+     * @param memberId      The joining member id.
+     * @param protocolType  The joining member protocol type.
+     * @param protocols     The joining member protocol collection.
+     */
+    private void throwIfClassicProtocolIsNotSupported(
+        ConsumerGroup group,
+        String memberId,
+        String protocolType,
+        JoinGroupRequestProtocolCollection protocols
+    ) {
+        if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {
+            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + 
memberId + "'s protocols are not supported.");
+        }
+    }
+
+    /**
+     * Deserialize the subscription in JoinGroupRequestProtocolCollection.
+     * All the protocols have the same subscription, so the method picks a 
random one.
+     *
+     * @param protocols The JoinGroupRequestProtocolCollection.
+     * @return The Subscription.
+     */
+    private static ConsumerPartitionAssignor.Subscription 
deserializeSubscription(
+        JoinGroupRequestProtocolCollection protocols
+    ) {
+        try {
+            return ConsumerProtocol.deserializeSubscription(
+                ByteBuffer.wrap(protocols.stream().findAny().get().metadata())
+            );
+        } catch (SchemaException e) {
+            throw new IllegalStateException("Malformed embedded consumer 
protocol.");
+        }
+    }
+
+    /**
+     * Validates the generation id and returns the owned partitions in the 
JoinGroupRequest to a consumer group.
+     *
+     * @param member        The joining member.
+     * @param subscription  The Subscription.
+     * @return The owned partitions if valid, otherwise return null.
+     */
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
validateGenerationIdAndGetOwnedPartition(
+        ConsumerGroupMember member,
+        ConsumerPartitionAssignor.Subscription subscription
+    ) {
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedPartitions =
+            toTopicPartitions(subscription.ownedPartitions(), 
metadataImage.topics());
+        if (subscription.generationId().isPresent() && 
subscription.generationId().get() == member.memberEpoch()) {
+            return ownedPartitions;
+        } else {
+            // If the generation id is not provided or doesn't match the 
member epoch, it's still safe to
+            // accept the ownedPartitions that is a subset of the assigned 
partition. Otherwise, set the
+            // ownedPartition to be null. When a new assignment is provided, 
the consumer will stop fetching
+            // from and revoke the partitions it does not own.
+            if (isSubset(ownedPartitions, member.assignedPartitions())) {
+                return ownedPartitions;
+            } else {
+                return null;
+            }
+        }
+    }
+
+    /**
+     * @return The ConsumerGroupHeartbeatRequestData.TopicPartitions list 
converted from the TopicPartitions list.
+     */
+    private static List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
toTopicPartitions(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        return ConsumerGroup.topicPartitionMapFromList(partitions, 
topicsImage).entrySet().stream().map(

Review Comment:
   I thought we needed to sort the partitions by topic anyway, so I don't quite 
see how they could be combined?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to