lucasbru commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2910658125
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -156,10 +157,16 @@ public String toLowerCaseString() {
private final TimelineObject<Boolean> hasSubscriptionMetadataRecord;
+ /**
+ * The supplier for the metadata image, used to resolve topic names to IDs.
+ */
+ private final Supplier<CoordinatorMetadataImage> metadataImageSupplier;
Review Comment:
Not sure I like this. There are two problems with this approach:
- We need to re-resolve the topic ID for each partition, which adds
performance overhead.
- We complicate the ConsumerGroup object quite a bit.
How about we resolve topic IDs up front in
OffsetMetadataManager.commitOffset(), before calling validator.validate()?
```
Uuid resolvedTopicId = topic.topicId();
if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
    resolvedTopicId = groupMetadataManager.image()
        .topicMetadata(topic.name())
        .map(CoordinatorMetadataImage.TopicMetadata::id)
        .orElse(Uuid.ZERO_UUID);
}
```
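To make the overhead argument concrete, here is a rough, self-contained sketch of the idea: resolve the topic ID once per topic, then pass the resolved ID to the validator for every partition. It uses no Kafka classes. `java.util.UUID` stands in for `Uuid`, and `TopicIdLookup`, `PartitionValidator`, `Topic`, and `commitOffsets` are hypothetical stand-ins for the metadata image, `CommitPartitionValidator`, the request topic, and `OffsetMetadataManager.commitOffset()`; the real signatures may differ.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

class ResolveTopicIdOnce {
    // Stand-in for Uuid.ZERO_UUID (assumption: all-zero bits means "no topic ID").
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Hypothetical stand-in for the metadata image's name-to-ID lookup.
    interface TopicIdLookup {
        Optional<UUID> topicId(String topicName);
    }

    // Hypothetical stand-in for CommitPartitionValidator.
    interface PartitionValidator {
        void validate(String topicName, UUID topicId, int partitionId);
    }

    // Hypothetical stand-in for a topic entry in the offset commit request.
    static final class Topic {
        final String name;
        final UUID topicId;
        final List<Integer> partitions;

        Topic(String name, UUID topicId, List<Integer> partitions) {
            this.name = name;
            this.topicId = topicId;
            this.partitions = partitions;
        }
    }

    // Resolves the ID only when the request did not carry one.
    static UUID resolveTopicId(Topic topic, TopicIdLookup lookup) {
        if (!topic.topicId.equals(ZERO_UUID)) {
            return topic.topicId;
        }
        return lookup.topicId(topic.name).orElse(ZERO_UUID);
    }

    // One lookup per topic; the validator only ever sees the resolved ID.
    static void commitOffsets(List<Topic> topics, TopicIdLookup lookup, PartitionValidator validator) {
        for (Topic topic : topics) {
            UUID resolvedTopicId = resolveTopicId(topic, lookup);
            for (int partitionId : topic.partitions) {
                validator.validate(topic.name, resolvedTopicId, partitionId);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, UUID> known = Collections.singletonMap("foo", UUID.randomUUID());
        TopicIdLookup lookup = name -> Optional.ofNullable(known.get(name));
        List<Topic> topics = Arrays.asList(new Topic("foo", ZERO_UUID, Arrays.asList(0, 1, 2)));
        commitOffsets(topics, lookup, (name, id, partition) ->
            System.out.println(name + "-" + partition + " validated with topic ID " + id));
    }
}
```

With this shape the validator would not need access to the metadata image at all, so the supplier field on ConsumerGroup could go away.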
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +861,50 @@ private void validateMemberEpoch(
}
}
+ /**
+ * Creates a validator that checks if the received member epoch is valid
for each partition's assignment epoch.
+ * A commit is rejected if the partition is not assigned to the member
+ * or if the received client-side epoch is older than the partition's
assignment epoch(KIP-1251).
Review Comment:
```suggestion
* or if the received client-side epoch is older than the partition's
assignment epoch (KIP-1251).
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +861,50 @@ private void validateMemberEpoch(
}
}
+ /**
+ * Creates a validator that checks if the received member epoch is valid
for each partition's assignment epoch.
+ * A commit is rejected if the partition is not assigned to the member
+ * or if the received client-side epoch is older than the partition's
assignment epoch(KIP-1251).
+ *
+ * @param member The consumer group member.
+ * @param receivedMemberEpoch The member epoch from the offset commit
request.
+ * @return A validator that checks each partition's assignment epoch.
+ */
+ private CommitPartitionValidator createAssignmentEpochValidator(
+ ConsumerGroupMember member,
+ int receivedMemberEpoch
+ ) {
+ return (topicName, topicId, partitionId) -> {
+ // Resolve topic ID if it's ZERO_UUID (for older API versions
without topic ID).
+ Uuid resolvedTopicId = topicId;
+ if (topicId.equals(Uuid.ZERO_UUID) && metadataImageSupplier !=
null) {
Review Comment:
Why do we need this null check?