lucasbru commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2942579036


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -692,6 +699,18 @@ public CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> commitT
             final TxnOffsetCommitResponseTopic topicResponse = new TxnOffsetCommitResponseTopic().setName(topic.name());
             response.topics().add(topicResponse);
 
+            // Resolve topic ID if it's ZERO_UUID

Review Comment:
   If what is ZERO_UUID? This comment seems incorrect.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -692,6 +699,18 @@ public CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> commitT
             final TxnOffsetCommitResponseTopic topicResponse = new TxnOffsetCommitResponseTopic().setName(topic.name());
             response.topics().add(topicResponse);
 
+            // Resolve topic ID if it's ZERO_UUID
+            final Uuid resolvedTopicId = metadataImage
+                .topicMetadata(topic.name())
+                .map(CoordinatorMetadataImage.TopicMetadata::id)
+                .orElse(Uuid.ZERO_UUID);
+
+            if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {

Review Comment:
   This could be a single `if` with `&&`.
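
   A minimal sketch of the suggestion, assuming the block opened above contains one nested check. The inner condition is not visible in the quoted hunk, so `idRequired` is a hypothetical stand-in, and plain strings replace `Uuid` to keep the example self-contained:

```java
import java.util.Optional;

class SingleIfSketch {
    // Hypothetical stand-in for Uuid.ZERO_UUID.
    static final String ZERO_ID = "ZERO";

    // Nested form: the outer check guards a second (assumed) check.
    static boolean rejectNested(Optional<String> lookedUpId, boolean idRequired) {
        String resolved = lookedUpId.orElse(ZERO_ID);
        if (resolved.equals(ZERO_ID)) {
            if (idRequired) {
                return true;
            }
        }
        return false;
    }

    // Combined form: one condition joined with &&, same behavior.
    static boolean rejectCombined(Optional<String> lookedUpId, boolean idRequired) {
        String resolved = lookedUpId.orElse(ZERO_ID);
        if (resolved.equals(ZERO_ID) && idRequired) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Compare both forms over every input combination.
        for (Optional<String> id : java.util.List.of(Optional.<String>empty(), Optional.of("abc"))) {
            for (boolean required : new boolean[] {false, true}) {
                System.out.println(rejectNested(id, required) == rejectCombined(id, required));
            }
        }
    }
}
```

   The two forms are equivalent only if the outer block holds nothing besides the inner `if`; any other statement in the outer block would have to stay outside the merged condition.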



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,14 +940,208 @@ public void testValidateOffsetCommit(short version) {
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("offsetCommitVersionsAndTransactionalParams")
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(boolean isTransactional, short version) {
+        Uuid topicId = Uuid.randomUuid();
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(10)
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+            .build());
+
+        // When client epoch (10) == broker epoch (10), no exception thrown.
+        if (isTransactional || version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", 10, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", 10, isTransactional, version));
+        }
+
+        // When assignment epoch (7) <= client epoch (7) <= broker epoch (10), no exception thrown.
+        if (isTransactional || version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", 7, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate("foo", topicId, 0));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", 7, isTransactional, version));
+        }
+
+        // When client epoch (6) != broker epoch (10) and client epoch (6) < assignment epoch (7),
+        // stale member epoch exception thrown from assignment epoch validator.
+        if (isTransactional || version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", 6, isTransactional, version
+            );
+            StaleMemberEpochException ex = assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate("foo", topicId, 0));
+            assertEquals(
+                "Received member epoch 6 is older than assignment epoch 7 for partition foo-0.",
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", 6, isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("offsetCommitVersionsAndTransactionalParams")
+    public void testValidateOffsetCommitWithClassicProtocolMember(boolean isTransactional, short version) {

Review Comment:
   nit: is this where we'd test the memberEpoch > brokerEpoch case with classic 
protocol members (should throw IllegalGenerationException)?
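
   For reference, the three cases the test above exercises can be modeled in isolation. This is only a sketch of the rule as the test comments describe it, not the code in `ConsumerGroup`: the class, method, and exception names are hypothetical, and the `clientEpoch > brokerEpoch` branch is a placeholder for the case this comment asks about.

```java
class EpochCheckSketch {
    /** Hypothetical stand-in for StaleMemberEpochException. */
    static class StaleEpochSketchException extends RuntimeException {
        StaleEpochSketchException(String message) {
            super(message);
        }
    }

    // Accepts a commit when the client epoch matches the broker epoch, or
    // when it is older but not older than the partition's assignment epoch.
    static void validate(int clientEpoch, int brokerEpoch, int assignmentEpoch,
                         String topic, int partition) {
        if (clientEpoch == brokerEpoch) {
            return;
        }
        if (clientEpoch > brokerEpoch) {
            // Placeholder: the quoted test does not cover this case, and the
            // exception type to use here is exactly what the review asks.
            throw new IllegalArgumentException("client epoch is ahead of broker epoch");
        }
        if (clientEpoch < assignmentEpoch) {
            throw new StaleEpochSketchException(String.format(
                "Received member epoch %d is older than assignment epoch %d for partition %s-%d.",
                clientEpoch, assignmentEpoch, topic, partition));
        }
    }

    public static void main(String[] args) {
        validate(10, 10, 7, "foo", 0); // client == broker: accepted
        validate(7, 10, 7, "foo", 0);  // assignment <= client <= broker: accepted
        try {
            validate(6, 10, 7, "foo", 0); // client < assignment: rejected
        } catch (StaleEpochSketchException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   A test for the classic-protocol case would add a fourth call with a client epoch above the broker epoch and assert on whichever exception the PR settles on.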



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
