squah-confluent commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2921054377


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -624,6 +630,14 @@ public CoordinatorResult<OffsetCommitResponseData, 
CoordinatorRecord> commitOffs
                 .setName(topic.name());
             response.topics().add(topicResponse);
 
+            // Resolve topic ID if it's ZERO_UUID
+            final Uuid resolvedTopicId = topic.topicId().equals(Uuid.ZERO_UUID)
+                ? metadataImage
+                .topicMetadata(topic.name())
+                .map(CoordinatorMetadataImage.TopicMetadata::id)
+                .orElse(Uuid.ZERO_UUID)

Review Comment:
   The indentation of the chained calls is off here; they should be indented one level deeper than `metadataImage`.
   ```suggestion
                       .topicMetadata(topic.name())
                       .map(CoordinatorMetadataImage.TopicMetadata::id)
                       .orElse(Uuid.ZERO_UUID)
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -1273,6 +1287,15 @@ public void replayEndTransactionMarker(
         }
     }
 
+    /**
+     * Updates the metadata image.
+     *
+     * @param newImage The new metadata image.
+     */
+    public void onMetadataUpdate(CoordinatorMetadataImage newImage) {

Review Comment:
   Could we copy the Javadoc and signature from `GroupMetadataManager`?
   ```suggestion
       /**
        * A new metadata image is available.
        *
        * @param delta    The delta image.
        * @param newImage The new metadata image.
        */
       public void onMetadataUpdate(CoordinatorMetadataDelta delta, 
CoordinatorMetadataImage newImage) {
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +934,194 @@ public void testValidateOffsetCommit(short version) {
         }
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+
+        for (boolean isTransactional : new boolean[] {true, false}) {
+            ConsumerGroup group = createConsumerGroup("group-foo");
+
+            group.updateMember(new ConsumerGroupMember.Builder("member-id")
+                .setMemberEpoch(10)
+                .setAssignedPartitions(mkAssignmentWithEpochs(
+                    mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+                .build());
+
+            // When assignment epoch(7) <= client epoch(9) <= broker 
epoch(10), no exception thrown.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 9, isTransactional, version
+                );
+                assertDoesNotThrow(() -> validator.validate("foo", topicId, 
0));
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 9, 
isTransactional, version));
+            }
+
+            // When client epoch (11) > broker epoch (10) , exception thrown 
directly from validateOffsetCommit.
+            if (version >= 9 || isTransactional) {
+                StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
+                assertEquals("Received member epoch 11 is newer than current 
member epoch 10.",
+                    ex.getMessage()
+                );
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
+            }
+
+            // When client epoch (10) == broker epoch (10), no exception 
thrown.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 7, isTransactional, version
+                );
+                assertDoesNotThrow(() -> validator.validate("foo", topicId, 
0));
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 7, 
isTransactional, version));
+            }
+
+            // When client epoch (8) != broker epoch (10) and client epoch (8) 
> assignment epoch (7),
+            // stale member epoch exception thrown from validateOffsetCommit.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 6, isTransactional, version
+                );
+                StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                    validator.validate("foo", topicId, 0));
+                assertEquals(
+                    "Received member epoch 6 is older than assignment epoch 7 
for partition foo-0.",
+                    ex.getMessage()
+                );
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 6, 
isTransactional, version));
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithPartitionPendingRevocation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+
+        for (boolean isTransactional : new boolean[] {true, false}) {
+            ConsumerGroup group = createConsumerGroup("group-foo");
+
+            group.updateMember(new ConsumerGroupMember.Builder("member-id")
+                .setMemberEpoch(10)
+                .setSubscribedTopicNames(List.of("foo"))
+                .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                    mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+                .build());
+
+            // When client epoch (10) == broker epoch (10), no exception 
thrown.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 10, isTransactional, version
+                );
+                assertDoesNotThrow(() -> validator.validate("foo", topicId, 
0));
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 10, 
isTransactional, version));
+            }
+
+            // When client epoch (11) > broker epoch (10), exception thrown 
directly from validateOffsetCommit.
+            if (version >= 9 || isTransactional) {
+                StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
+                assertEquals(
+                    "Received member epoch 11 is newer than current member 
epoch 10.",
+                    ex.getMessage()
+                );
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
+            }
+
+            // When partition epoch (7) <= client epoch (7)  <= broker epoch 
(10), no exception thrown.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 7, isTransactional, version
+                );
+                assertDoesNotThrow(() -> validator.validate("foo", topicId, 
0));
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 7, 
isTransactional, version));
+            }
+
+            // When client epoch (6) != broker epoch (10) and client epoch (6) 
< partition epoch (7),
+            // exception thrown directly from validateOffsetCommit.

Review Comment:
   This comment doesn't match the check. The exception comes from the validator.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +934,194 @@ public void testValidateOffsetCommit(short version) {
         }
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+
+        for (boolean isTransactional : new boolean[] {true, false}) {
+            ConsumerGroup group = createConsumerGroup("group-foo");
+
+            group.updateMember(new ConsumerGroupMember.Builder("member-id")
+                .setMemberEpoch(10)
+                .setAssignedPartitions(mkAssignmentWithEpochs(
+                    mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+                .build());
+
+            // When assignment epoch(7) <= client epoch(9) <= broker 
epoch(10), no exception thrown.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 9, isTransactional, version
+                );
+                assertDoesNotThrow(() -> validator.validate("foo", topicId, 
0));
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 9, 
isTransactional, version));
+            }
+
+            // When client epoch (11) > broker epoch (10) , exception thrown 
directly from validateOffsetCommit.
+            if (version >= 9 || isTransactional) {
+                StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
+                assertEquals("Received member epoch 11 is newer than current 
member epoch 10.",
+                    ex.getMessage()
+                );
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
+            }
+
+            // When client epoch (10) == broker epoch (10), no exception 
thrown.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 7, isTransactional, version
+                );
+                assertDoesNotThrow(() -> validator.validate("foo", topicId, 
0));
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 7, 
isTransactional, version));
+            }
+
+            // When client epoch (8) != broker epoch (10) and client epoch (8) 
> assignment epoch (7),
+            // stale member epoch exception thrown from validateOffsetCommit.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 6, isTransactional, version
+                );
+                StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                    validator.validate("foo", topicId, 0));
+                assertEquals(
+                    "Received member epoch 6 is older than assignment epoch 7 
for partition foo-0.",
+                    ex.getMessage()
+                );
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 6, 
isTransactional, version));
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithPartitionPendingRevocation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+
+        for (boolean isTransactional : new boolean[] {true, false}) {
+            ConsumerGroup group = createConsumerGroup("group-foo");
+
+            group.updateMember(new ConsumerGroupMember.Builder("member-id")
+                .setMemberEpoch(10)
+                .setSubscribedTopicNames(List.of("foo"))
+                .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                    mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+                .build());
+
+            // When client epoch (10) == broker epoch (10), no exception 
thrown.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 10, isTransactional, version
+                );
+                assertDoesNotThrow(() -> validator.validate("foo", topicId, 
0));
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 10, 
isTransactional, version));
+            }
+
+            // When client epoch (11) > broker epoch (10), exception thrown 
directly from validateOffsetCommit.
+            if (version >= 9 || isTransactional) {
+                StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
+                assertEquals(
+                    "Received member epoch 11 is newer than current member 
epoch 10.",
+                    ex.getMessage()
+                );
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
+            }
+
+            // When partition epoch (7) <= client epoch (7)  <= broker epoch 
(10), no exception thrown.

Review Comment:
   ```suggestion
               // When partition epoch (7) <= client epoch (7) <= broker epoch 
(10), no exception thrown.
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1419,6 +1429,59 @@ private static void verifyOffsetCommit(Uuid topicId, 
OffsetMetadataManagerTestCo
         );
     }
 
+    @Test
+    public void testConsumerGroupOffsetCommitWithZeroUuidResolvesTopicId() {
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
+
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+            .withMetadataImage(new 
KRaftCoordinatorMetadataImage(metadataImage))
+            .build();
+
+        // Create an empty group.
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+            "foo",
+            true
+        );
+
+        group.updateMember(new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(barTopicId, 5, 0)))
+            .build()
+        );
+
+        OffsetCommitRequestData request = new OffsetCommitRequestData()
+            .setGroupId("foo")
+            .setMemberId("member")
+            .setTopics(List.of(
+                new OffsetCommitRequestData.OffsetCommitRequestTopic()
+                    .setName(barTopicName)
+                    .setTopicId(Uuid.ZERO_UUID)
+                    .setPartitions(List.of(
+                        new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(100L)
+                    ))
+            ));
+
+        // When client epoch (3) < assignment epoch (5), exception should be 
thrown.
+        request.setGenerationIdOrMemberEpoch(3);
+        assertThrows(StaleMemberEpochException.class, () -> 
context.commitOffset(request));
+
+        // When client epoch (5) >= assignment epoch (5), commit should 
succeed.
+        request.setGenerationIdOrMemberEpoch(5);
+        assertDoesNotThrow(() -> context.commitOffset(request));
+        CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> resp = 
context.commitOffset(request);
+        assertEquals(1, resp.response().topics().size());
+        assertEquals(barTopicName, resp.response().topics().get(0).name());

Review Comment:
   I think we should also check the error code for the partition. The rest of 
the tests write the checks as:
   
   ```suggestion
           assertEquals(
               new OffsetCommitResponseData()
                   .setTopics(List.of(
                       new OffsetCommitResponseData.OffsetCommitResponseTopic()
                           .setName(barTopicName)
                           .setTopicId(barTopicId)
                           .setPartitions(List.of(
                               new 
OffsetCommitResponseData.OffsetCommitResponsePartition()
                                   .setPartitionIndex(0)
                                   .setErrorCode(Errors.NONE.code())
                           ))
                   )),
               result.response()
           );
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1419,6 +1429,59 @@ private static void verifyOffsetCommit(Uuid topicId, 
OffsetMetadataManagerTestCo
         );
     }
 
+    @Test
+    public void testConsumerGroupOffsetCommitWithZeroUuidResolvesTopicId() {
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
+
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+            .withMetadataImage(new 
KRaftCoordinatorMetadataImage(metadataImage))
+            .build();
+
+        // Create an empty group.
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+            "foo",
+            true
+        );
+
+        group.updateMember(new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(barTopicId, 5, 0)))
+            .build()
+        );
+
+        OffsetCommitRequestData request = new OffsetCommitRequestData()
+            .setGroupId("foo")
+            .setMemberId("member")
+            .setTopics(List.of(
+                new OffsetCommitRequestData.OffsetCommitRequestTopic()
+                    .setName(barTopicName)
+                    .setTopicId(Uuid.ZERO_UUID)
+                    .setPartitions(List.of(
+                        new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(100L)
+                    ))
+            ));
+
+        // When client epoch (3) < assignment epoch (5), exception should be 
thrown.
+        request.setGenerationIdOrMemberEpoch(3);
+        assertThrows(StaleMemberEpochException.class, () -> 
context.commitOffset(request));
+
+        // When client epoch (5) >= assignment epoch (5), commit should 
succeed.
+        request.setGenerationIdOrMemberEpoch(5);
+        assertDoesNotThrow(() -> context.commitOffset(request));
+        CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> resp = 
context.commitOffset(request);

Review Comment:
   nit: Please add a blank line before this statement.
   
   Also, we usually name the return value of `commitOffset` `result`.
   ```suggestion
   
           CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> 
result = context.commitOffset(request);
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -300,9 +303,7 @@ public void 
testUpdatingMemberUpdatesPartitionEpochWhenPartitionIsNotReleased()
 
         // m3 should not be able to acquire foo-1 because the epoch is smaller
         // than the current partition epoch (11).
-        assertThrows(IllegalStateException.class, () -> {
-            consumerGroup.updateMember(m3);
-        });
+        assertThrows(IllegalStateException.class, () -> 
consumerGroup.updateMember(m3));

Review Comment:
   nit: Could we avoid these formatting changes? I'm in favor of some of them 
but let's not mix them in with this PR.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +934,194 @@ public void testValidateOffsetCommit(short version) {
         }
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+
+        for (boolean isTransactional : new boolean[] {true, false}) {
+            ConsumerGroup group = createConsumerGroup("group-foo");
+
+            group.updateMember(new ConsumerGroupMember.Builder("member-id")
+                .setMemberEpoch(10)
+                .setAssignedPartitions(mkAssignmentWithEpochs(
+                    mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+                .build());
+
+            // When assignment epoch(7) <= client epoch(9) <= broker 
epoch(10), no exception thrown.
+            if (version >= 9 || isTransactional) {

Review Comment:
   Could we swap the order of these checks?
   `9` is an OffsetCommit version and we shouldn't run the comparison against a 
TxnOffsetCommit version.
   ```suggestion
               if (isTransactional || version >= 9) {
   ```
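
The reordering relies on Java's short-circuit `||`: when `isTransactional` is true, the `version >= 9` comparison on the right-hand side is never evaluated. A minimal sketch of that behaviour follows. `VersionGate`, `supportsMemberEpoch` and `OFFSET_COMMIT_MIN_EPOCH_VERSION` are hypothetical names introduced only for illustration; the value 9 is taken from the test above.

```java
final class VersionGate {
    // Hypothetical constant; 9 is the OffsetCommit version used in the test above.
    static final short OFFSET_COMMIT_MIN_EPOCH_VERSION = 9;

    // Hypothetical helper mirroring the suggested condition.
    static boolean supportsMemberEpoch(boolean isTransactional, short version) {
        // `||` short-circuits: for transactional commits the OffsetCommit
        // version comparison on the right-hand side is skipped entirely.
        return isTransactional || version >= OFFSET_COMMIT_MIN_EPOCH_VERSION;
    }
}
```

With this shape, an OffsetCommit version number is only ever compared against a non-transactional request.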



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +934,194 @@ public void testValidateOffsetCommit(short version) {
         }
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+
+        for (boolean isTransactional : new boolean[] {true, false}) {
+            ConsumerGroup group = createConsumerGroup("group-foo");
+
+            group.updateMember(new ConsumerGroupMember.Builder("member-id")
+                .setMemberEpoch(10)
+                .setAssignedPartitions(mkAssignmentWithEpochs(
+                    mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+                .build());
+
+            // When assignment epoch(7) <= client epoch(9) <= broker 
epoch(10), no exception thrown.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 9, isTransactional, version
+                );
+                assertDoesNotThrow(() -> validator.validate("foo", topicId, 
0));
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 9, 
isTransactional, version));
+            }
+
+            // When client epoch (11) > broker epoch (10) , exception thrown 
directly from validateOffsetCommit.
+            if (version >= 9 || isTransactional) {
+                StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
+                assertEquals("Received member epoch 11 is newer than current 
member epoch 10.",
+                    ex.getMessage()
+                );
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
+            }
+
+            // When client epoch (10) == broker epoch (10), no exception 
thrown.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 7, isTransactional, version
+                );
+                assertDoesNotThrow(() -> validator.validate("foo", topicId, 
0));
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 7, 
isTransactional, version));
+            }
+
+            // When client epoch (8) != broker epoch (10) and client epoch (8) 
> assignment epoch (7),
+            // stale member epoch exception thrown from validateOffsetCommit.

Review Comment:
   This comment doesn't match the check. The client epoch is 6 and the 
exception comes from the validator.
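
To make the distinction concrete, here is a rough, self-contained sketch of the two-stage check that the quoted test exercises. It is not the Kafka implementation: `EpochChecks`, `StaleEpochException` and `PartitionValidator` are hypothetical stand-ins for `ConsumerGroup`, `StaleMemberEpochException` and `CommitPartitionValidator`, and treating unassigned partitions as valid is a simplification made here. Only the two error messages are taken from the test.

```java
import java.util.Map;

final class EpochChecks {
    /** Hypothetical stand-in for StaleMemberEpochException. */
    static final class StaleEpochException extends RuntimeException {
        StaleEpochException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for CommitPartitionValidator. */
    interface PartitionValidator {
        void validate(String topicName, int partition);
    }

    // Stage 1 (group level): fails immediately when the client epoch is
    // ahead of the member epoch known to the broker.
    static PartitionValidator validateOffsetCommit(
        int clientEpoch,
        int memberEpoch,
        Map<String, Integer> assignmentEpochs
    ) {
        if (clientEpoch > memberEpoch) {
            throw new StaleEpochException("Received member epoch " + clientEpoch
                + " is newer than current member epoch " + memberEpoch + ".");
        }

        // Stage 2 (partition level): deferred until validate() is called.
        return (topicName, partition) -> {
            String key = topicName + "-" + partition;
            Integer assignmentEpoch = assignmentEpochs.get(key);
            // Simplification: partitions without a recorded epoch are accepted.
            if (assignmentEpoch != null && clientEpoch < assignmentEpoch) {
                throw new StaleEpochException("Received member epoch " + clientEpoch
                    + " is older than assignment epoch " + assignmentEpoch
                    + " for partition " + key + ".");
            }
        };
    }
}
```

Under these assumptions, a client epoch of 6 passes the first stage (6 <= 10) and only fails once the returned validator is invoked for `foo-0`, which is why the code comment should attribute the exception to the validator.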



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +934,194 @@ public void testValidateOffsetCommit(short version) {
         }
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+
+        for (boolean isTransactional : new boolean[] {true, false}) {
+            ConsumerGroup group = createConsumerGroup("group-foo");
+
+            group.updateMember(new ConsumerGroupMember.Builder("member-id")
+                .setMemberEpoch(10)
+                .setAssignedPartitions(mkAssignmentWithEpochs(
+                    mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+                .build());
+
+            // When assignment epoch(7) <= client epoch(9) <= broker 
epoch(10), no exception thrown.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 9, isTransactional, version
+                );
+                assertDoesNotThrow(() -> validator.validate("foo", topicId, 
0));
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 9, 
isTransactional, version));
+            }
+
+            // When client epoch (11) > broker epoch (10) , exception thrown 
directly from validateOffsetCommit.
+            if (version >= 9 || isTransactional) {
+                StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
+                assertEquals("Received member epoch 11 is newer than current 
member epoch 10.",
+                    ex.getMessage()
+                );
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
+            }

Review Comment:
   We could maybe drop this check since it's already covered by 
`testValidateTransactionalOffsetCommit` and `testValidateOffsetCommit`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +934,194 @@ public void testValidateOffsetCommit(short version) {
         }
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)

Review Comment:
   It's not appropriate to test `TxnOffsetCommit` requests using versions from 
`OffsetCommit`.
   `TxnOffsetCommit` runs from version 0 to 5 and `OffsetCommit` runs from 
version 0 to 10.
   
   Could you parameterize these with a `MethodSource` that returns the following combinations?
   ```
   (isTransactional=false, version=ApiKeys.OFFSET_COMMIT.oldestVersion())
   ...
   (isTransactional=false, version=ApiKeys.OFFSET_COMMIT.latestVersion(true))
   (isTransactional=true, version=ApiKeys.TXN_OFFSET_COMMIT.oldestVersion())
   ...
   (isTransactional=true, version=ApiKeys.TXN_OFFSET_COMMIT.latestVersion(true))
   ```
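
One possible shape for such a source is sketched below, written without Kafka or JUnit dependencies so that it stands alone. `CommitVersionSource`, `CommitCase` and `commitCases` are hypothetical names. In the real test the bounds would presumably come from `ApiKeys.OFFSET_COMMIT` and `ApiKeys.TXN_OFFSET_COMMIT` as listed above, and the method would return a `Stream<Arguments>` for JUnit's `@MethodSource`.

```java
import java.util.ArrayList;
import java.util.List;

final class CommitVersionSource {
    /** Hypothetical holder for one (isTransactional, version) test case. */
    static final class CommitCase {
        final boolean isTransactional;
        final short version;

        CommitCase(boolean isTransactional, short version) {
            this.isTransactional = isTransactional;
            this.version = version;
        }
    }

    // Builds every non-transactional case first, then every transactional
    // case, each over its own API's inclusive version range.
    static List<CommitCase> commitCases(
        short offsetCommitOldest,
        short offsetCommitLatest,
        short txnOffsetCommitOldest,
        short txnOffsetCommitLatest
    ) {
        List<CommitCase> cases = new ArrayList<>();
        for (int v = offsetCommitOldest; v <= offsetCommitLatest; v++) {
            cases.add(new CommitCase(false, (short) v));
        }
        for (int v = txnOffsetCommitOldest; v <= txnOffsetCommitLatest; v++) {
            cases.add(new CommitCase(true, (short) v));
        }
        return cases;
    }
}
```

Keeping the two ranges separate means a transactional case is never paired with a version number that only exists for `OffsetCommit`.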



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -934,6 +934,194 @@ public void testValidateOffsetCommit(short version) {
         }
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+
+        for (boolean isTransactional : new boolean[] {true, false}) {
+            ConsumerGroup group = createConsumerGroup("group-foo");
+
+            group.updateMember(new ConsumerGroupMember.Builder("member-id")
+                .setMemberEpoch(10)
+                .setAssignedPartitions(mkAssignmentWithEpochs(
+                    mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+                .build());
+
+            // When assignment epoch(7) <= client epoch(9) <= broker 
epoch(10), no exception thrown.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 9, isTransactional, version
+                );
+                assertDoesNotThrow(() -> validator.validate("foo", topicId, 
0));
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 9, 
isTransactional, version));
+            }
+
+            // When client epoch (11) > broker epoch (10) , exception thrown 
directly from validateOffsetCommit.
+            if (version >= 9 || isTransactional) {
+                StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
+                assertEquals("Received member epoch 11 is newer than current 
member epoch 10.",
+                    ex.getMessage()
+                );
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 11, 
isTransactional, version));
+            }
+
+            // When client epoch (10) == broker epoch (10), no exception 
thrown.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 7, isTransactional, version
+                );
+                assertDoesNotThrow(() -> validator.validate("foo", topicId, 
0));
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 7, 
isTransactional, version));
+            }
+
+            // When client epoch (8) != broker epoch (10) and client epoch (8) 
> assignment epoch (7),
+            // stale member epoch exception thrown from validateOffsetCommit.
+            if (version >= 9 || isTransactional) {
+                CommitPartitionValidator validator = 
group.validateOffsetCommit(
+                    "member-id", "", 6, isTransactional, version
+                );
+                StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                    validator.validate("foo", topicId, 0));
+                assertEquals(
+                    "Received member epoch 6 is older than assignment epoch 7 
for partition foo-0.",
+                    ex.getMessage()
+                );
+            } else {
+                assertThrows(UnsupportedVersionException.class, () ->
+                    group.validateOffsetCommit("member-id", "", 6, 
isTransactional, version));
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithPartitionPendingRevocation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+
+        for (boolean isTransactional : new boolean[] {true, false}) {
+            ConsumerGroup group = createConsumerGroup("group-foo");
+
+            group.updateMember(new ConsumerGroupMember.Builder("member-id")
+                .setMemberEpoch(10)
+                .setSubscribedTopicNames(List.of("foo"))
+                .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                    mkTopicAssignmentWithEpochs(topicId, 7, 0)))
+                .build());
+
+            // When client epoch (10) == broker epoch (10), no exception 
thrown.

Review Comment:
   Could we align the ordering of tests within 
`testValidateOffsetCommitWithAssignmentEpochValidation` and 
`testValidateOffsetCommitWithPartitionPendingRevocation`? I would suggest 
client epoch 10, then 7, then 6 or the other way around.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
