squah-confluent commented on code in PR #21508:
URL: https://github.com/apache/kafka/pull/21508#discussion_r2829447490
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +851,46 @@ private void validateMemberEpoch(
}
}
+ /**
+ * Creates a validator that checks per-partition assignment epochs.
+ * A commit is rejected if the partition is not assigned to the member
+ * or if the received client-side epoch is older than the partition's
assignment epoch(KIP-1251).
+ *
+ * @param member The consumer group member.
+ * @param receivedMemberEpoch The member epoch from the offset commit
request.
+ * @return A validator that checks each partition's assignment epoch.
+ */
+ private CommitPartitionValidator createAssignmentEpochValidator(
+ ConsumerGroupMember member,
+ int receivedMemberEpoch
+ ) {
+ return (topicName, topicId, partitionId) -> {
+ // Check if the partition is in the assigned partitions.
+ // If not found in assigned, check partitions pending revocation.
+ Integer assignmentEpoch = member.getAssignmentEpoch(topicId,
partitionId);
+ if (assignmentEpoch == null) {
+ assignmentEpoch = member.getPendingRevocationEpoch(topicId,
partitionId);
+ }
+
+ // If the partition is not assigned to this member, reject.
+ if (assignmentEpoch == null) {
+ throw new StaleMemberEpochException(
+ String.format("Partition %s-%d is not assigned to member
%s.",
Review Comment:
I'm in favor of aligning with the streams implementation as much as
possible. Streams has the same pattern and uses the message `Task %s-%d is not
assigned or pending revocation for member.`. Shall we update the streams
message too (in a separate PR maybe)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]