Copilot commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2907771733


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -630,10 +631,18 @@ public CoordinatorResult<OffsetCommitResponseData, 
CoordinatorRecord> commitOffs
                         .setPartitionIndex(partition.partitionIndex())
                         
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
                 } else {
+                    // Resolve topic ID if it's ZERO_UUID
+                    Uuid resolvedTopicId = topic.topicId();
+                    if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
+                        resolvedTopicId = groupMetadataManager.image()
+                            .topicMetadata(topic.name())
+                            .map(CoordinatorMetadataImage.TopicMetadata::id)
+                            .orElse(Uuid.ZERO_UUID);
+                    }
                     // Validate commit per-partition
                     validator.validate(
                         topic.name(),
-                        topic.topicId(),
+                        resolvedTopicId,
                         partition.partitionIndex()
                     );

Review Comment:
   `CommitPartitionValidator.validate(...)` can now throw 
`StaleMemberEpochException` on a per-partition basis (assignment-epoch checks). 
In `commitOffset`, that exception currently bubbles out of the per-partition 
loop, causing the whole OffsetCommit request to fail (and typically returning 
the same error for all partitions) instead of returning per-partition error 
codes and still committing valid partitions. Catch `StaleMemberEpochException` 
around `validator.validate(...)`, set the partition error (e.g., 
`Errors.STALE_MEMBER_EPOCH`), and continue without appending a record for that 
partition.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -630,10 +631,18 @@ public CoordinatorResult<OffsetCommitResponseData, 
CoordinatorRecord> commitOffs
                         .setPartitionIndex(partition.partitionIndex())
                         
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
                 } else {
+                    // Resolve topic ID if it's ZERO_UUID
+                    Uuid resolvedTopicId = topic.topicId();
+                    if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
+                        resolvedTopicId = groupMetadataManager.image()
+                            .topicMetadata(topic.name())
+                            .map(CoordinatorMetadataImage.TopicMetadata::id)
+                            .orElse(Uuid.ZERO_UUID);
+                    }

Review Comment:
   Topic ID resolution for `Uuid.ZERO_UUID` is done inside the partition loop, 
but the resolved value is identical for every partition in the topic. Resolve 
once per topic (before iterating partitions) to avoid redundant metadata 
lookups and make the control flow clearer.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +853,41 @@ private void validateMemberEpoch(
         }
     }
 
+    /**
+     * Creates a validator that checks if the received member epoch is valid 
for each partition's assignment epoch.
+     * A commit is rejected if the partition is not assigned to the member
+     * or if the received client-side epoch is older than the partition's 
assignment epoch(KIP-1251).
+     *
+     * @param member              The consumer group member.
+     * @param receivedMemberEpoch The member epoch from the offset commit 
request.
+     * @return A validator that checks each partition's assignment epoch.
+     */
+    private CommitPartitionValidator createAssignmentEpochValidator(
+        ConsumerGroupMember member,
+        int receivedMemberEpoch
+    ) {
+        return (topicName, topicId, partitionId) -> {
+            // Search for the partition in the assigned partitions, then in 
partitions pending revocation.
+            Integer assignmentEpoch = member.assignmentEpoch(topicId, 
partitionId);
+            if (assignmentEpoch == null) {
+                assignmentEpoch = member.pendingRevocationEpoch(topicId, 
partitionId);
+            }
+
+            if (assignmentEpoch == null) {
+                throw new StaleMemberEpochException(String.format(
+                    "Partition %s-%d is not assigned or pending revocation for 
member.",
+                    topicName, partitionId));
+            }
+
+            if (receivedMemberEpoch < assignmentEpoch) {
+                throw new StaleMemberEpochException(
+                    String.format("The received member epoch %d is older than 
the assignment epoch %d for partition %s-%d.",

Review Comment:
   `createAssignmentEpochValidator` relies on a non-`ZERO_UUID` `topicId` to 
find assignment epochs (it indexes member assignments by topic ID). However, 
`OffsetMetadataManager.commitTransactionalOffset(...)` currently invokes 
`validator.validate(...)` with `Uuid.ZERO_UUID` for TxnOffsetCommit requests, 
which will cause assignment-epoch validation to fail as "not assigned" whenever 
the member epoch is older than the broker epoch. To keep behavior consistent 
with the non-transactional OffsetCommit path, resolve the topic ID from 
metadata (topic name -> id) for TxnOffsetCommit as well, and pass the resolved 
ID into the validator.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -630,10 +631,18 @@ public CoordinatorResult<OffsetCommitResponseData, 
CoordinatorRecord> commitOffs
                         .setPartitionIndex(partition.partitionIndex())
                         
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
                 } else {
+                    // Resolve topic ID if it's ZERO_UUID
+                    Uuid resolvedTopicId = topic.topicId();
+                    if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
+                        resolvedTopicId = groupMetadataManager.image()
+                            .topicMetadata(topic.name())
+                            .map(CoordinatorMetadataImage.TopicMetadata::id)
+                            .orElse(Uuid.ZERO_UUID);
+                    }
                     // Validate commit per-partition
                     validator.validate(
                         topic.name(),
-                        topic.topicId(),
+                        resolvedTopicId,
                         partition.partitionIndex()
                     );
 

Review Comment:
   `resolvedTopicId` is used for validation, but below this block the code 
still logs, responds, and persists offsets using `topic.topicId()` from the 
request. When the request uses `ZERO_UUID`, this will keep writing legacy 
records with `ZERO_UUID` even though the coordinator can resolve the real topic 
ID, weakening topic-id mismatch detection in offset fetch/delete logic. 
Consider using `resolvedTopicId` consistently for the stored 
`OffsetAndMetadata` (and potentially the response/log fields) once it has been 
resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to