Copilot commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2907771733
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -630,10 +631,18 @@ public CoordinatorResult<OffsetCommitResponseData,
CoordinatorRecord> commitOffs
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
} else {
+ // Resolve topic ID if it's ZERO_UUID
+ Uuid resolvedTopicId = topic.topicId();
+ if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
+ resolvedTopicId = groupMetadataManager.image()
+ .topicMetadata(topic.name())
+ .map(CoordinatorMetadataImage.TopicMetadata::id)
+ .orElse(Uuid.ZERO_UUID);
+ }
// Validate commit per-partition
validator.validate(
topic.name(),
- topic.topicId(),
+ resolvedTopicId,
partition.partitionIndex()
);
Review Comment:
`CommitPartitionValidator.validate(...)` can now throw
`StaleMemberEpochException` on a per-partition basis (assignment-epoch checks).
In `commitOffset`, that exception currently bubbles out of the per-partition
loop, causing the whole OffsetCommit request to fail (and typically returning
the same error for all partitions) instead of returning per-partition error
codes and still committing valid partitions. Catch `StaleMemberEpochException`
around `validator.validate(...)`, set the partition error (e.g.,
`Errors.STALE_MEMBER_EPOCH`), and continue without appending a record for that
partition.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -630,10 +631,18 @@ public CoordinatorResult<OffsetCommitResponseData,
CoordinatorRecord> commitOffs
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
} else {
+ // Resolve topic ID if it's ZERO_UUID
+ Uuid resolvedTopicId = topic.topicId();
+ if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
+ resolvedTopicId = groupMetadataManager.image()
+ .topicMetadata(topic.name())
+ .map(CoordinatorMetadataImage.TopicMetadata::id)
+ .orElse(Uuid.ZERO_UUID);
+ }
Review Comment:
Topic ID resolution for `Uuid.ZERO_UUID` is done inside the partition loop,
but the resolved value is identical for every partition in the topic. Resolve
once per topic (before iterating partitions) to avoid redundant metadata
lookups and make the control flow clearer.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +853,41 @@ private void validateMemberEpoch(
}
}
+ /**
+ * Creates a validator that checks if the received member epoch is valid
for each partition's assignment epoch.
+ * A commit is rejected if the partition is not assigned to the member
+ * or if the received client-side epoch is older than the partition's
assignment epoch(KIP-1251).
+ *
+ * @param member The consumer group member.
+ * @param receivedMemberEpoch The member epoch from the offset commit
request.
+ * @return A validator that checks each partition's assignment epoch.
+ */
+ private CommitPartitionValidator createAssignmentEpochValidator(
+ ConsumerGroupMember member,
+ int receivedMemberEpoch
+ ) {
+ return (topicName, topicId, partitionId) -> {
+ // Search for the partition in the assigned partitions, then in
partitions pending revocation.
+ Integer assignmentEpoch = member.assignmentEpoch(topicId,
partitionId);
+ if (assignmentEpoch == null) {
+ assignmentEpoch = member.pendingRevocationEpoch(topicId,
partitionId);
+ }
+
+ if (assignmentEpoch == null) {
+ throw new StaleMemberEpochException(String.format(
+ "Partition %s-%d is not assigned or pending revocation for
member.",
+ topicName, partitionId));
+ }
+
+ if (receivedMemberEpoch < assignmentEpoch) {
+ throw new StaleMemberEpochException(
+ String.format("The received member epoch %d is older than
the assignment epoch %d for partition %s-%d.",
Review Comment:
`createAssignmentEpochValidator` relies on a non-`ZERO_UUID` `topicId` to
find assignment epochs (it indexes member assignments by topic ID). However,
`OffsetMetadataManager.commitTransactionalOffset(...)` currently invokes
`validator.validate(...)` with `Uuid.ZERO_UUID` for TxnOffsetCommit requests,
which will cause assignment-epoch validation to fail as "not assigned" whenever
the member epoch is older than the broker epoch. To keep behavior consistent
with the non-transactional OffsetCommit path, resolve the topic ID from
metadata (topic name -> id) for TxnOffsetCommit as well, and pass the resolved
ID into the validator.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -630,10 +631,18 @@ public CoordinatorResult<OffsetCommitResponseData,
CoordinatorRecord> commitOffs
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
} else {
+ // Resolve topic ID if it's ZERO_UUID
+ Uuid resolvedTopicId = topic.topicId();
+ if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
+ resolvedTopicId = groupMetadataManager.image()
+ .topicMetadata(topic.name())
+ .map(CoordinatorMetadataImage.TopicMetadata::id)
+ .orElse(Uuid.ZERO_UUID);
+ }
// Validate commit per-partition
validator.validate(
topic.name(),
- topic.topicId(),
+ resolvedTopicId,
partition.partitionIndex()
);
Review Comment:
`resolvedTopicId` is used for validation, but below this block the code
still logs, responds, and persists offsets using `topic.topicId()` from the
request. When the request uses `ZERO_UUID`, this will keep writing legacy
records with `ZERO_UUID` even though the coordinator can resolve the real topic
ID, weakening topic-id mismatch detection in offset fetch/delete logic.
Consider using `resolvedTopicId` consistently for the stored
`OffsetAndMetadata` (and potentially the response/log fields) once it has been
resolved.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]