dajac commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2940086529


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -285,9 +285,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       if (useTopicIds) {
         offsetCommitRequest.data.topics.forEach { topic =>
-          if (topic.topicId != Uuid.ZERO_UUID) {
-            metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name))
-          }
+          metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name))
+        }
+      } else {
+        offsetCommitRequest.data.topics.forEach { topic =>
+          topic.setTopicId(metadataCache.getTopicId(topic.name))

Review Comment:
   At L316, we check whether the topic exists. Hence, if a topic is deleted, 
the check will return `UNKNOWN_TOPIC_OR_PARTITION` for the topic.
   
   I wonder whether we could have a race condition between this lookup and that 
check though, since the metadata could change between the two. We may want to 
combine both. For instance, we could replace `metadataCache.contains` with 
`metadataCache.getTopicId` and check whether the returned topic id is 
non-zero. Thoughts?
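
   A minimal sketch of that idea, under stated assumptions: `FakeMetadataCache`, 
`ZERO_ID`, and `Result` below are hypothetical stand-ins rather than Kafka 
classes, and the cache is assumed to return the zero id for an unknown topic. 
The point is only that a single read can serve as both the id lookup and the 
existence check.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: none of these types are Kafka classes.
class TopicIdLookupSketch {
    // Stand-in for Uuid.ZERO_UUID, assumed to mean "no such topic".
    static final UUID ZERO_ID = new UUID(0L, 0L);

    enum Result { OK, UNKNOWN_TOPIC_OR_PARTITION }

    // Hypothetical stand-in for the metadata cache.
    static final class FakeMetadataCache {
        private final Map<String, UUID> topicIds = new ConcurrentHashMap<>();

        void addTopic(String name, UUID id) {
            topicIds.put(name, id);
        }

        void deleteTopic(String name) {
            topicIds.remove(name);
        }

        // Assumed behavior: unknown topics map to the zero id.
        UUID getTopicId(String name) {
            return topicIds.getOrDefault(name, ZERO_ID);
        }
    }

    // Single read of the cache: the returned id is the only input
    // to the existence decision, so the two cannot disagree.
    static UUID resolveTopicId(FakeMetadataCache cache, String name) {
        return cache.getTopicId(name);
    }

    // Derive the existence answer from the id that was already resolved,
    // instead of asking the cache a second time.
    static Result check(UUID resolvedTopicId) {
        return ZERO_ID.equals(resolvedTopicId)
            ? Result.UNKNOWN_TOPIC_OR_PARTITION
            : Result.OK;
    }

    public static void main(String[] args) {
        FakeMetadataCache cache = new FakeMetadataCache();
        cache.addTopic("orders", UUID.randomUUID());

        UUID id = resolveTopicId(cache, "orders");
        // A deletion after the read does not change the decision made from `id`.
        cache.deleteTopic("orders");
        System.out.println(check(id));
        System.out.println(check(resolveTopicId(cache, "orders")));
    }
}
```

   With this shape, a topic deleted after the read is still handled 
consistently within the request, because the id and the existence answer come 
from the same snapshot.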



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +855,41 @@ private void validateMemberEpoch(
         }
     }
 
+    /**
+     * Creates a validator that checks if the received member epoch is valid for each partition's assignment epoch.
+     * A commit is rejected if the partition is not assigned to the member
+     * or if the received client-side epoch is older than the partition's assignment epoch (KIP-1251).
+     *
+     * @param member              The consumer whose assignments are being validated.
+     * @param receivedMemberEpoch The received member epoch.
+     * @return A validator for per-partition validation.
+     */
+    private CommitPartitionValidator createAssignmentEpochValidator(
+        ConsumerGroupMember member,
+        int receivedMemberEpoch
+    ) {
+        return (topicName, topicId, partitionId) -> {
+            // Search for the partition in the assigned partitions, then in partitions pending revocation.
+            Integer assignmentEpoch = member.assignmentEpoch(topicId, partitionId);
+            if (assignmentEpoch == null) {
+                assignmentEpoch = member.pendingRevocationEpoch(topicId, partitionId);
+            }
+
+            if (assignmentEpoch == null) {
+                throw new StaleMemberEpochException(String.format(
+                    "Partition %s-%d is not assigned or pending revocation for member.",
+                    topicName, partitionId));

Review Comment:
   nit: Could you align this with the style you used for the following one?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -624,6 +631,12 @@ public CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> commitOffs
                 .setName(topic.name());
             response.topics().add(topicResponse);
 
+            if (topic.topicId().equals(Uuid.ZERO_UUID)) {
+                if (validator != CommitPartitionValidator.NO_OP) {
+                    throw Errors.STALE_MEMBER_EPOCH.exception();
+                }
+            }

Review Comment:
   This should never happen here, so I am not sure whether we really want to 
be defensive.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +855,41 @@ private void validateMemberEpoch(
         }
     }
 
+    /**
+     * Creates a validator that checks if the received member epoch is valid for each partition's assignment epoch.
+     * A commit is rejected if the partition is not assigned to the member
+     * or if the received client-side epoch is older than the partition's assignment epoch (KIP-1251).
+     *
+     * @param member              The consumer whose assignments are being validated.
+     * @param receivedMemberEpoch The received member epoch.
+     * @return A validator for per-partition validation.
+     */
+    private CommitPartitionValidator createAssignmentEpochValidator(
+        ConsumerGroupMember member,
+        int receivedMemberEpoch
+    ) {
+        return (topicName, topicId, partitionId) -> {
+            // Search for the partition in the assigned partitions, then in partitions pending revocation.

Review Comment:
   Should we explicitly check whether `topicId` is the zero UUID here and 
return the generic error? By generic, I mean the one returned by 
`validateOffsetCommit` when the epoch is stale. It may be better than 
reporting that the partition is not assigned.
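
   A rough sketch of what that early check could look like, assuming 
stand-in types: `PartitionValidator`, `StaleMemberEpochException`, 
`GENERIC_MESSAGE`, and the map-based assignment lookup below are all 
hypothetical and only mirror the shape of the code under review. The generic 
message text is a placeholder, not the one `validateOffsetCommit` actually 
uses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: none of these types are Kafka classes.
class AssignmentEpochValidatorSketch {
    // Stand-in for Uuid.ZERO_UUID.
    static final UUID ZERO_ID = new UUID(0L, 0L);

    // Placeholder for the generic stale-epoch message (assumption).
    static final String GENERIC_MESSAGE = "The received member epoch is stale.";

    static final class StaleMemberEpochException extends RuntimeException {
        StaleMemberEpochException(String message) {
            super(message);
        }
    }

    @FunctionalInterface
    interface PartitionValidator {
        void validate(String topicName, UUID topicId, int partitionId);
    }

    // Builds the lookup key used by the hypothetical assignment map.
    static String key(UUID topicId, int partitionId) {
        return topicId + "-" + partitionId;
    }

    static PartitionValidator create(Map<String, Integer> assignmentEpochs, int receivedMemberEpoch) {
        return (topicName, topicId, partitionId) -> {
            // Proposed early exit: without a topic id the assignment cannot
            // be looked up, so report the generic stale-epoch error.
            if (ZERO_ID.equals(topicId)) {
                throw new StaleMemberEpochException(GENERIC_MESSAGE);
            }

            Integer assignmentEpoch = assignmentEpochs.get(key(topicId, partitionId));
            if (assignmentEpoch == null) {
                throw new StaleMemberEpochException(String.format(
                    "Partition %s-%d is not assigned or pending revocation for member.",
                    topicName, partitionId));
            }

            // Reject commits whose epoch is older than the assignment epoch.
            if (receivedMemberEpoch < assignmentEpoch) {
                throw new StaleMemberEpochException(GENERIC_MESSAGE);
            }
        };
    }

    public static void main(String[] args) {
        UUID topicId = UUID.randomUUID();
        Map<String, Integer> epochs = new HashMap<>();
        epochs.put(key(topicId, 0), 5);

        PartitionValidator validator = create(epochs, 5);
        validator.validate("orders", topicId, 0);
        try {
            validator.validate("orders", ZERO_ID, 0);
        } catch (StaleMemberEpochException e) {
            System.out.println(e.getMessage());
        }
    }
}
```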



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -670,11 +670,29 @@ public CommitPartitionValidator validateOffsetCommit(
         // the member should be using the OffsetCommit API version >= 9.
         if (!isTransactional && !member.useClassicProtocol() && apiVersion < 9) {
             throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
-                "by members using the modern group protocol");
+                "by members using the consumer group protocol");
         }
 
-        validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());
-        return CommitPartitionValidator.NO_OP;
+        // For members using the classic protocol, the epoch must match the last epoch sent
+        // in a heartbeat.
+        if (member.useClassicProtocol()) {
+            validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+            return CommitPartitionValidator.NO_OP;
+        }

Review Comment:
   @lucasbru Can't we relax the epoch validation for classic members too? I am 
not sure about it though.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +855,41 @@ private void validateMemberEpoch(
         }
     }
 
+    /**
+     * Creates a validator that checks if the received member epoch is valid for each partition's assignment epoch.
+     * A commit is rejected if the partition is not assigned to the member
+     * or if the received client-side epoch is older than the partition's assignment epoch (KIP-1251).
+     *
+     * @param member              The consumer whose assignments are being validated.
+     * @param receivedMemberEpoch The received member epoch.
+     * @return A validator for per-partition validation.
+     */
+    private CommitPartitionValidator createAssignmentEpochValidator(
+        ConsumerGroupMember member,
+        int receivedMemberEpoch
+    ) {
+        return (topicName, topicId, partitionId) -> {
+            // Search for the partition in the assigned partitions, then in partitions pending revocation.

Review Comment:
   I see that we have an explicit check for it on the transactional offset 
commit path, so it does not seem necessary to add it here too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
