lucasbru commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2942579036
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -692,6 +699,18 @@ public CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> commitT
final TxnOffsetCommitResponseTopic topicResponse = new TxnOffsetCommitResponseTopic().setName(topic.name());
response.topics().add(topicResponse);
+ // Resolve topic ID if it's ZERO_UUID
Review Comment:
If what is ZERO_UUID? This comment seems incorrect.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -692,6 +699,18 @@ public CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> commitT
final TxnOffsetCommitResponseTopic topicResponse = new TxnOffsetCommitResponseTopic().setName(topic.name());
response.topics().add(topicResponse);
+ // Resolve topic ID if it's ZERO_UUID
+ final Uuid resolvedTopicId = metadataImage
+ .topicMetadata(topic.name())
+ .map(CoordinatorMetadataImage.TopicMetadata::id)
+ .orElse(Uuid.ZERO_UUID);
+
+ if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
Review Comment:
could be a single if with &&
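A minimal sketch of the suggested shape, not the PR's actual code: the quoted hunk does not show the inner condition, so `innerCondition` below is a hypothetical placeholder, and `java.util.UUID` stands in for Kafka's `Uuid` to keep the example self-contained.

```java
import java.util.UUID;

// Illustrative only: class, method, and parameter names are assumptions.
final class SingleIfSketch {
    // Stand-in for Uuid.ZERO_UUID.
    static final UUID ZERO = new UUID(0L, 0L);

    // Nested form, as the hunk appears to be structured.
    static boolean nested(UUID resolvedTopicId, boolean innerCondition) {
        if (resolvedTopicId.equals(ZERO)) {
            if (innerCondition) {
                return true;
            }
        }
        return false;
    }

    // Equivalent single-condition form using &&.
    static boolean combined(UUID resolvedTopicId, boolean innerCondition) {
        if (resolvedTopicId.equals(ZERO) && innerCondition) {
            return true;
        }
        return false;
    }
}
```

Because `&&` short-circuits, the second operand is evaluated only when the first is true, so the two forms behave the same as long as the outer `if` has no `else` branch and no statements outside the inner `if`.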
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,14 +940,208 @@ public void testValidateOffsetCommit(short version) {
}
}
+ @ParameterizedTest
+ @MethodSource("offsetCommitVersionsAndTransactionalParams")
+ public void testValidateOffsetCommitWithAssignmentEpochValidation(boolean isTransactional, short version) {
+ Uuid topicId = Uuid.randomUuid();
+
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ group.updateMember(new ConsumerGroupMember.Builder("member-id")
+ .setMemberEpoch(10)
+ .setAssignedPartitions(mkAssignmentWithEpochs(
+ mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+ .build());
+
+ // When client epoch (10) == broker epoch (10), no exception thrown.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 10, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 10, isTransactional, version));
+ }
+
+ // When assignment epoch (7) <= client epoch (7) <= broker epoch (10), no exception thrown.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 7, isTransactional, version
+ );
+ assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 7, isTransactional, version));
+ }
+
+ // When client epoch (6) != broker epoch (10) and client epoch (6) < assignment epoch (7),
+ // stale member epoch exception thrown from assignment epoch validator.
+ if (isTransactional || version >= 9) {
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-id", "", 6, isTransactional, version
+ );
+ StaleMemberEpochException ex = assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("foo", topicId, 0));
+ assertEquals(
+ "Received member epoch 6 is older than assignment epoch 7 for partition foo-0.",
+ ex.getMessage()
+ );
+ } else {
+ assertThrows(UnsupportedVersionException.class, () ->
+ group.validateOffsetCommit("member-id", "", 6, isTransactional, version));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("offsetCommitVersionsAndTransactionalParams")
+ public void testValidateOffsetCommitWithClassicProtocolMember(boolean isTransactional, short version) {
Review Comment:
nit: is this where we'd test the memberEpoch > brokerEpoch case with classic
protocol members (should throw IllegalGenerationException)?
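For reference, the three cases asserted in testValidateOffsetCommitWithAssignmentEpochValidation can be condensed into one rule. The sketch below is a self-contained model of that rule as the test comments describe it, not the coordinator's implementation: `AssignmentEpochCheck`, `StaleEpoch`, and the parameter names are hypothetical, and the `clientEpoch > brokerEpoch` branch is a placeholder because the quoted test does not cover it (that is the case the question above is about).

```java
// Hypothetical model of the per-partition epoch check exercised by the test.
final class AssignmentEpochCheck {
    // Stand-in for Kafka's StaleMemberEpochException, so the sketch needs no Kafka classes.
    static final class StaleEpoch extends RuntimeException {
        StaleEpoch(String message) {
            super(message);
        }
    }

    static void validate(int clientEpoch, int brokerEpoch, int assignmentEpoch,
                         String topic, int partition) {
        if (clientEpoch > brokerEpoch) {
            // Placeholder: the quoted test does not say what happens here.
            throw new IllegalArgumentException("client epoch is ahead of broker epoch");
        }
        if (clientEpoch == brokerEpoch) {
            // Case 1: epochs match, commit accepted.
            return;
        }
        if (clientEpoch < assignmentEpoch) {
            // Case 3: the commit predates the partition's assignment.
            throw new StaleEpoch(String.format(
                "Received member epoch %d is older than assignment epoch %d for partition %s-%d.",
                clientEpoch, assignmentEpoch, topic, partition));
        }
        // Case 2: assignmentEpoch <= clientEpoch < brokerEpoch, commit accepted.
    }
}
```

With the values from the test (broker epoch 10, assignment epoch 7), client epochs 10 and 7 pass and client epoch 6 is rejected with the message the test asserts.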