squah-confluent commented on code in PR #21692:
URL: https://github.com/apache/kafka/pull/21692#discussion_r2915934260


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -673,8 +673,24 @@ public CommitPartitionValidator validateOffsetCommit(
                 "by members using the modern group protocol");
         }
 
-        validateMemberEpoch(memberEpoch, member.memberEpoch(), 
member.useClassicProtocol());
-        return CommitPartitionValidator.NO_OP;
+        // For members using the consumer protocol, the epoch must either 
match the last epoch sent
+        // in a heartbeat or be greater than or equal to the partition's 
assignment epoch.
+        if (member.useClassicProtocol()) {
+            validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+            return CommitPartitionValidator.NO_OP;
+        }
+
+        // For members using the consumer protocol
+        if (memberEpoch == member.memberEpoch()) {
+            return CommitPartitionValidator.NO_OP;
+        }
+        if (memberEpoch > member.memberEpoch()) {

Review Comment:
   nit: newline
   ```suggestion
           }
   
           if (memberEpoch > member.memberEpoch()) {
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -630,23 +631,31 @@ public CoordinatorResult<OffsetCommitResponseData, 
CoordinatorRecord> commitOffs
                         .setPartitionIndex(partition.partitionIndex())
                         
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
                 } else {
+                    // Resolve topic ID if it's ZERO_UUID
+                    Uuid resolvedTopicId = topic.topicId();
+                    if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
+                        resolvedTopicId = groupMetadataManager.image()
+                            .topicMetadata(topic.name())
+                            .map(CoordinatorMetadataImage.TopicMetadata::id)
+                            .orElse(Uuid.ZERO_UUID);
+                    }

Review Comment:
   We can lift the resolution up one loop level, so that it is done per topic 
instead of per partition.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -673,8 +673,24 @@ public CommitPartitionValidator validateOffsetCommit(
                 "by members using the modern group protocol");
         }
 
-        validateMemberEpoch(memberEpoch, member.memberEpoch(), 
member.useClassicProtocol());
-        return CommitPartitionValidator.NO_OP;
+        // For members using the consumer protocol, the epoch must either 
match the last epoch sent
+        // in a heartbeat or be greater than or equal to the partition's 
assignment epoch.

Review Comment:
   This comment looks misplaced.
   ```suggestion
           // For members using the classic protocol, the epoch must match the 
last epoch sent
           // in a heartbeat.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +853,41 @@ private void validateMemberEpoch(
         }
     }
 
+    /**
+     * Creates a validator that checks if the received member epoch is valid 
for each partition's assignment epoch.
+     * A commit is rejected if the partition is not assigned to the member
+     * or if the received client-side epoch is older than the partition's 
assignment epoch (KIP-1251).
+     *
+     * @param member              The consumer group member.
+     * @param receivedMemberEpoch The member epoch from the offset commit 
request.
+     * @return A validator that checks each partition's assignment epoch.

Review Comment:
   Could we re-use the streams javadoc wording?
   ```suggestion
        * @param member              The member whose assignments are being 
validated.
        * @param receivedMemberEpoch The received member epoch.
        * @return A validator for per-partition validation.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -673,8 +673,24 @@ public CommitPartitionValidator validateOffsetCommit(
                 "by members using the modern group protocol");
         }
 
-        validateMemberEpoch(memberEpoch, member.memberEpoch(), 
member.useClassicProtocol());
-        return CommitPartitionValidator.NO_OP;
+        // For members using the consumer protocol, the epoch must either 
match the last epoch sent
+        // in a heartbeat or be greater than or equal to the partition's 
assignment epoch.
+        if (member.useClassicProtocol()) {
+            validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+            return CommitPartitionValidator.NO_OP;
+        }
+
+        // For members using the consumer protocol

Review Comment:
   ```suggestion
           // For members using the consumer protocol, the epoch must either 
match the last epoch sent
           // in a heartbeat or be greater than or equal to the partition's 
assignment epoch.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -630,23 +631,31 @@ public CoordinatorResult<OffsetCommitResponseData, 
CoordinatorRecord> commitOffs
                         .setPartitionIndex(partition.partitionIndex())
                         
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
                 } else {
+                    // Resolve topic ID if it's ZERO_UUID
+                    Uuid resolvedTopicId = topic.topicId();
+                    if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
+                        resolvedTopicId = groupMetadataManager.image()
+                            .topicMetadata(topic.name())
+                            .map(CoordinatorMetadataImage.TopicMetadata::id)
+                            .orElse(Uuid.ZERO_UUID);
+                    }
                     // Validate commit per-partition
                     validator.validate(
                         topic.name(),
-                        topic.topicId(),
+                        resolvedTopicId,
                         partition.partitionIndex()
                     );
 
                     log.debug("[GroupId {}] Committing offsets {} for 
partition {}-{}-{} from member {} with leader epoch {}.",
-                        request.groupId(), partition.committedOffset(), 
topic.topicId(), topic.name(), partition.partitionIndex(),
+                        request.groupId(), partition.committedOffset(), 
resolvedTopicId, topic.name(), partition.partitionIndex(),

Review Comment:
   We shouldn't bake the topic id when the offset commit specified a topic name 
only. This impacts behavior when topics are recreated.
   
   ```suggestion
                           request.groupId(), partition.committedOffset(), 
topic.topicId(), topic.name(), partition.partitionIndex(),
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -359,14 +360,12 @@ public void testAddPartitionEpochs() {
         assertEquals(11, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
 
         // Updating to a smaller epoch should fail.
-        assertThrows(IllegalStateException.class, () -> {
-            consumerGroup.addPartitionEpochs(
-                toAssignmentWithEpochs(mkAssignment(
-                    mkTopicAssignment(fooTopicId, 1)
-                ), 10),
-                10
-            );
-        });
+        assertThrows(IllegalStateException.class, () -> 
consumerGroup.addPartitionEpochs(
+            toAssignmentWithEpochs(mkAssignment(
+                mkTopicAssignment(fooTopicId, 1)
+            ), 10),
+            10
+        ));

Review Comment:
   Could we avoid these formatting changes? (x4)



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -630,23 +631,31 @@ public CoordinatorResult<OffsetCommitResponseData, 
CoordinatorRecord> commitOffs
                         .setPartitionIndex(partition.partitionIndex())
                         
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
                 } else {
+                    // Resolve topic ID if it's ZERO_UUID
+                    Uuid resolvedTopicId = topic.topicId();
+                    if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
+                        resolvedTopicId = groupMetadataManager.image()
+                            .topicMetadata(topic.name())
+                            .map(CoordinatorMetadataImage.TopicMetadata::id)
+                            .orElse(Uuid.ZERO_UUID);
+                    }
                     // Validate commit per-partition
                     validator.validate(
                         topic.name(),
-                        topic.topicId(),
+                        resolvedTopicId,
                         partition.partitionIndex()
                     );
 
                     log.debug("[GroupId {}] Committing offsets {} for 
partition {}-{}-{} from member {} with leader epoch {}.",
-                        request.groupId(), partition.committedOffset(), 
topic.topicId(), topic.name(), partition.partitionIndex(),
+                        request.groupId(), partition.committedOffset(), 
resolvedTopicId, topic.name(), partition.partitionIndex(),
                         request.memberId(), partition.committedLeaderEpoch());
 
                     topicResponse.partitions().add(new 
OffsetCommitResponsePartition()
                         .setPartitionIndex(partition.partitionIndex())
                         .setErrorCode(Errors.NONE.code()));
 
                     final OffsetAndMetadata offsetAndMetadata = 
OffsetAndMetadata.fromRequest(
-                        topic.topicId(),
+                        resolvedTopicId,

Review Comment:
   We shouldn't bake the topic id when the offset commit specified a topic name 
only. This impacts behavior when topics are recreated.
   
   ```suggestion
                           topic.topicId(),
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1278,6 +1285,59 @@ public void 
testStreamsGroupOffsetCommitFromAdminClient() {
         verifyOffsetCommitFromAdminClient(context);
     }
 
+    @Test
+    public void testConsumerGroupOffsetCommitWithZeroUuidResolvesTopicId() {
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
+
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Create an empty group.
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+            "foo",
+            true
+        );
+
+        group.updateMember(new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(Map.of(barTopicId, Map.of(0, 5)))
+            .build()
+        );
+
+        OffsetCommitRequestData request = new OffsetCommitRequestData()
+            .setGroupId("foo")
+            .setMemberId("member")
+            .setGenerationIdOrMemberEpoch(3) // stale member epoch
+            .setTopics(List.of(
+                new OffsetCommitRequestData.OffsetCommitRequestTopic()
+                    .setName(barTopicName)
+                    .setTopicId(Uuid.ZERO_UUID) // ZERO_UUID topic_id
+                    .setPartitions(List.of(
+                        new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(100L)
+                    ))
+            ));
+
+        // client epoch (3) < assignment epoch (5), fail

Review Comment:
   style nit: Could we either drop these comments or write them as full 
sentences?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void 
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
             cache
         );
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {

Review Comment:
   Could we move these new `testValidateOffsetCommit...` tests just after 
`testValidateOffsetCommit`?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1278,6 +1285,59 @@ public void 
testStreamsGroupOffsetCommitFromAdminClient() {
         verifyOffsetCommitFromAdminClient(context);
     }
 
+    @Test
+    public void testConsumerGroupOffsetCommitWithZeroUuidResolvesTopicId() {
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
+
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Create an empty group.
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+            "foo",
+            true
+        );
+
+        group.updateMember(new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(Map.of(barTopicId, Map.of(0, 5)))

Review Comment:
   Can we use the `mkAssignment`-style methods?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void 
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
             cache
         );
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;

Review Comment:
   nit: I would inline and constant-fold these as much as possible, including 
the `String.format`s, except for `isTransactional`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void 
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
             cache
         );
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, assignmentEpoch, 
partitionId)))
+            .build());
+
+        // client epoch = broker epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", memberEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch, 
isTransactional, version));
+        }
+        // client epoch (11) > broker epoch (10) - exception thrown directly 
from validateOffsetCommit
+        if (version >= 9) {
+            int clientEpoch = memberEpoch + 1;
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                group.validateOffsetCommit("member-id", "", clientEpoch, 
isTransactional, version));
+            assertEquals(
+                String.format("Received member epoch %d is newer than "
+                    + "current member epoch %d.", clientEpoch, memberEpoch),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+        }
+
+        // assignment epoch (7) <= client epoch (7) <= broker epoch (10)
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch, 
isTransactional, version));
+        }
+
+        // client epoch (6) != broker epoch (10) and client epoch (6) < 
assignment epoch (7)
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch - 1, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(topicName, topicId, partitionId));
+            assertEquals(
+                String.format("Received member epoch %d is older than 
assignment epoch %d for partition %s-%d.",
+                    assignmentEpoch - 1, assignmentEpoch, topicName, 
partitionId),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch - 
1, isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithPartitionPendingRevocation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, assignmentEpoch, 
partitionId)))
+            .build());
+
+        // client epoch (10) == broker epoch (10), no exception thrown
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", memberEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch, 
isTransactional, version));
+        }
+
+        // client epoch (11) > broker epoch (10) - exception thrown directly 
from validateOffsetCommit
+        if (version >= 9) {
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+            assertEquals(
+                String.format("Received member epoch %d is newer than "
+                    + "current member epoch %d.", memberEpoch + 1, 
memberEpoch),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+        }
+
+        // partition epoch <= client epoch <= broker epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch, 
isTransactional, version));
+        }
+
+        // client epoch != broker epoch and client epoch < partition epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch - 1, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(topicName, topicId, partitionId));
+            assertEquals(
+                String.format("Received member epoch %d is older than 
assignment epoch %d for partition %s-%d.",
+                    assignmentEpoch - 1, assignmentEpoch, topicName, 
partitionId),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch - 
1, isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithUnassignedPartition(short version) 
{
+        Uuid assignedTopicId = Uuid.randomUuid();
+        Uuid unassignedTopicId = Uuid.randomUuid();
+        String unassignedTopicName = "bar";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(assignedTopicId, assignmentEpoch, 
partitionId)))
+            .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                    mkTopicAssignmentWithEpochs(assignedTopicId, 
assignmentEpoch, partitionId + 1)))
+            .build());
+
+        // Commit an unassigned partition
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(unassignedTopicName, unassignedTopicId, 
partitionId));
+            assertEquals(
+                String.format("Partition %s-%d is not assigned or pending 
revocation for member.",
+                    unassignedTopicName, partitionId),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch, 
isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithMultiplePartitionsAndEpochs(short 
version) {

Review Comment:
   I'm not sure this test is necessary?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void 
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
             cache
         );
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, assignmentEpoch, 
partitionId)))
+            .build());
+
+        // client epoch = broker epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", memberEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch, 
isTransactional, version));
+        }
+        // client epoch (11) > broker epoch (10) - exception thrown directly 
from validateOffsetCommit
+        if (version >= 9) {
+            int clientEpoch = memberEpoch + 1;
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                group.validateOffsetCommit("member-id", "", clientEpoch, 
isTransactional, version));
+            assertEquals(
+                String.format("Received member epoch %d is newer than "
+                    + "current member epoch %d.", clientEpoch, memberEpoch),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+        }
+
+        // assignment epoch (7) <= client epoch (7) <= broker epoch (10)
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch, 
isTransactional, version));
+        }
+
+        // client epoch (6) != broker epoch (10) and client epoch (6) < 
assignment epoch (7)
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch - 1, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(topicName, topicId, partitionId));
+            assertEquals(
+                String.format("Received member epoch %d is older than 
assignment epoch %d for partition %s-%d.",
+                    assignmentEpoch - 1, assignmentEpoch, topicName, 
partitionId),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch - 
1, isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithPartitionPendingRevocation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, assignmentEpoch, 
partitionId)))
+            .build());
+
+        // client epoch (10) == broker epoch (10), no exception thrown
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", memberEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch, 
isTransactional, version));
+        }
+
+        // client epoch (11) > broker epoch (10) - exception thrown directly 
from validateOffsetCommit
+        if (version >= 9) {
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+            assertEquals(
+                String.format("Received member epoch %d is newer than "
+                    + "current member epoch %d.", memberEpoch + 1, 
memberEpoch),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+        }
+
+        // partition epoch <= client epoch <= broker epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch, 
isTransactional, version));
+        }
+
+        // client epoch != broker epoch and client epoch < partition epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch - 1, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(topicName, topicId, partitionId));
+            assertEquals(
+                String.format("Received member epoch %d is older than 
assignment epoch %d for partition %s-%d.",
+                    assignmentEpoch - 1, assignmentEpoch, topicName, 
partitionId),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch - 
1, isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithUnassignedPartition(short version) 
{
+        Uuid assignedTopicId = Uuid.randomUuid();
+        Uuid unassignedTopicId = Uuid.randomUuid();
+        String unassignedTopicName = "bar";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(assignedTopicId, assignmentEpoch, 
partitionId)))
+            .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                    mkTopicAssignmentWithEpochs(assignedTopicId, 
assignmentEpoch, partitionId + 1)))
+            .build());
+
+        // Commit an unassigned partition
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(unassignedTopicName, unassignedTopicId, 
partitionId));
+            assertEquals(
+                String.format("Partition %s-%d is not assigned or pending 
revocation for member.",
+                    unassignedTopicName, partitionId),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch, 
isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithMultiplePartitionsAndEpochs(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int memberEpoch = 10;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, 3, 0),
+                mkTopicAssignmentWithEpochs(topicId, 5, 1),
+                mkTopicAssignmentWithEpochs(topicId, 8, 2)))
+            .build());
+
+        // with clientEpoch=6: partitions 0,1 should pass, partition 2 should 
fail
+        if (version >= 9) {
+            int clientEpoch = 6;
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", clientEpoch, isTransactional, version
+            );
+
+            // For partition 0  and 1, assignment epoch (3 or 5) < client 
epoch (6) < broker epoch (10), accept
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
0));
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
1));
+
+            // For partition 2, assignment epoch (8) > client epoch (6), reject
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(topicName, topicId, 2));
+            assertEquals(
+                String.format("Received member epoch %d is older than 
assignment epoch %d for partition %s-%d.",
+                    clientEpoch, 8, topicName, 2),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", 6, 
isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void 
testValidateOffsetCommitPrioritizeAssignedOverPendingRevocation(short version) {

Review Comment:
   I'm not sure this test is necessary? It's not clear to me what it's testing.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1278,6 +1285,59 @@ public void 
testStreamsGroupOffsetCommitFromAdminClient() {
         verifyOffsetCommitFromAdminClient(context);
     }
 
+    @Test
+    public void testConsumerGroupOffsetCommitWithZeroUuidResolvesTopicId() {
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(barTopicId, barTopicName, 3)
+            .build();
+
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Create an empty group.
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(
+            "foo",
+            true
+        );
+
+        group.updateMember(new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(Map.of(barTopicId, Map.of(0, 5)))
+            .build()
+        );
+
+        OffsetCommitRequestData request = new OffsetCommitRequestData()
+            .setGroupId("foo")
+            .setMemberId("member")
+            .setGenerationIdOrMemberEpoch(3) // stale member epoch
+            .setTopics(List.of(
+                new OffsetCommitRequestData.OffsetCommitRequestTopic()
+                    .setName(barTopicName)
+                    .setTopicId(Uuid.ZERO_UUID) // ZERO_UUID topic_id
+                    .setPartitions(List.of(
+                        new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(100L)
+                    ))
+            ));
+
+        // client epoch (3) < assignment epoch (5), fail
+        assertThrows(StaleMemberEpochException.class, () -> 
context.commitOffset(request));

Review Comment:
   We can move the `setGenerationIdOrMemberEpoch` setup closer.
   ```suggestion
           request.setGenerationIdOrMemberEpoch(3);
           assertThrows(StaleMemberEpochException.class, () -> 
context.commitOffset(request));
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void 
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
             cache
         );
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))

Review Comment:
   I don't think we need to set up the subscribed topic names?
   ```suggestion
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void 
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
             cache
         );
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, assignmentEpoch, 
partitionId)))
+            .build());
+
+        // client epoch = broker epoch

Review Comment:
   style nit: Could we write these as short sentences?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void 
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
             cache
         );
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, assignmentEpoch, 
partitionId)))
+            .build());
+
+        // client epoch = broker epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", memberEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch, 
isTransactional, version));
+        }
+        // client epoch (11) > broker epoch (10) - exception thrown directly 
from validateOffsetCommit
+        if (version >= 9) {
+            int clientEpoch = memberEpoch + 1;
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                group.validateOffsetCommit("member-id", "", clientEpoch, 
isTransactional, version));
+            assertEquals(
+                String.format("Received member epoch %d is newer than "
+                    + "current member epoch %d.", clientEpoch, memberEpoch),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+        }
+
+        // assignment epoch (7) <= client epoch (7) <= broker epoch (10)
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch, 
isTransactional, version));
+        }
+
+        // client epoch (6) != broker epoch (10) and client epoch (6) < 
assignment epoch (7)
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch - 1, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(topicName, topicId, partitionId));
+            assertEquals(
+                String.format("Received member epoch %d is older than 
assignment epoch %d for partition %s-%d.",
+                    assignmentEpoch - 1, assignmentEpoch, topicName, 
partitionId),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch - 
1, isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithPartitionPendingRevocation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, assignmentEpoch, 
partitionId)))
+            .build());
+
+        // client epoch (10) == broker epoch (10), no exception thrown
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", memberEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch, 
isTransactional, version));
+        }
+
+        // client epoch (11) > broker epoch (10) - exception thrown directly 
from validateOffsetCommit
+        if (version >= 9) {
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+            assertEquals(
+                String.format("Received member epoch %d is newer than "
+                    + "current member epoch %d.", memberEpoch + 1, 
memberEpoch),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+        }
+
+        // partition epoch <= client epoch <= broker epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch, 
isTransactional, version));
+        }
+
+        // client epoch != broker epoch and client epoch < partition epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch - 1, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(topicName, topicId, partitionId));
+            assertEquals(
+                String.format("Received member epoch %d is older than 
assignment epoch %d for partition %s-%d.",
+                    assignmentEpoch - 1, assignmentEpoch, topicName, 
partitionId),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch - 
1, isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithUnassignedPartition(short version) 
{
+        Uuid assignedTopicId = Uuid.randomUuid();
+        Uuid unassignedTopicId = Uuid.randomUuid();
+        String unassignedTopicName = "bar";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(assignedTopicId, assignmentEpoch, 
partitionId)))
+            .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                    mkTopicAssignmentWithEpochs(assignedTopicId, 
assignmentEpoch, partitionId + 1)))
+            .build());
+
+        // Commit an unassigned partition
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(unassignedTopicName, unassignedTopicId, 
partitionId));

Review Comment:
   Can we test both an unassigned topic and unassigned partition?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void 
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
             cache
         );
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, assignmentEpoch, 
partitionId)))
+            .build());
+
+        // client epoch = broker epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", memberEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch, 
isTransactional, version));
+        }
+        // client epoch (11) > broker epoch (10) - exception thrown directly 
from validateOffsetCommit
+        if (version >= 9) {
+            int clientEpoch = memberEpoch + 1;
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                group.validateOffsetCommit("member-id", "", clientEpoch, 
isTransactional, version));
+            assertEquals(
+                String.format("Received member epoch %d is newer than "
+                    + "current member epoch %d.", clientEpoch, memberEpoch),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+        }
+
+        // assignment epoch (7) <= client epoch (7) <= broker epoch (10)
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch, 
isTransactional, version));
+        }
+
+        // client epoch (6) != broker epoch (10) and client epoch (6) < 
assignment epoch (7)
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch - 1, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(topicName, topicId, partitionId));
+            assertEquals(
+                String.format("Received member epoch %d is older than 
assignment epoch %d for partition %s-%d.",
+                    assignmentEpoch - 1, assignmentEpoch, topicName, 
partitionId),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch - 
1, isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithPartitionPendingRevocation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, assignmentEpoch, 
partitionId)))
+            .build());
+
+        // client epoch (10) == broker epoch (10), no exception thrown
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", memberEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch, 
isTransactional, version));
+        }
+
+        // client epoch (11) > broker epoch (10) - exception thrown directly 
from validateOffsetCommit
+        if (version >= 9) {
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+            assertEquals(
+                String.format("Received member epoch %d is newer than "
+                    + "current member epoch %d.", memberEpoch + 1, 
memberEpoch),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+        }
+
+        // partition epoch <= client epoch <= broker epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch, 
isTransactional, version));
+        }
+
+        // client epoch != broker epoch and client epoch < partition epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch - 1, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(topicName, topicId, partitionId));
+            assertEquals(
+                String.format("Received member epoch %d is older than 
assignment epoch %d for partition %s-%d.",
+                    assignmentEpoch - 1, assignmentEpoch, topicName, 
partitionId),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch - 
1, isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithUnassignedPartition(short version) 
{
+        Uuid assignedTopicId = Uuid.randomUuid();
+        Uuid unassignedTopicId = Uuid.randomUuid();
+        String unassignedTopicName = "bar";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(assignedTopicId, assignmentEpoch, 
partitionId)))
+            .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                    mkTopicAssignmentWithEpochs(assignedTopicId, 
assignmentEpoch, partitionId + 1)))

Review Comment:
   ```suggestion
                   mkTopicAssignmentWithEpochs(assignedTopicId, 
assignmentEpoch, partitionId + 1)))
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void 
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
             cache
         );
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, assignmentEpoch, 
partitionId)))
+            .build());
+
+        // client epoch = broker epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", memberEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch, 
isTransactional, version));
+        }
+        // client epoch (11) > broker epoch (10) - exception thrown directly 
from validateOffsetCommit

Review Comment:
   missing newline
   ```suggestion
   
           // client epoch (11) > broker epoch (10) - exception thrown directly 
from validateOffsetCommit
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1278,6 +1285,59 @@ public void 
testStreamsGroupOffsetCommitFromAdminClient() {
         verifyOffsetCommitFromAdminClient(context);
     }
 
+    @Test
+    public void testConsumerGroupOffsetCommitWithZeroUuidResolvesTopicId() {

Review Comment:
   Could we move this method down one?
   `verifyOffsetCommitFromAdminClient` is supposed to be grouped together with 
`testConsumerGroupOffsetCommitFromAdminClient` and 
`testStreamsGroupOffsetCommitFromAdminClient`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -630,23 +631,31 @@ public CoordinatorResult<OffsetCommitResponseData, 
CoordinatorRecord> commitOffs
                         .setPartitionIndex(partition.partitionIndex())
                         
.setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code()));
                 } else {
+                    // Resolve topic ID if it's ZERO_UUID
+                    Uuid resolvedTopicId = topic.topicId();
+                    if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
+                        resolvedTopicId = groupMetadataManager.image()

Review Comment:
   We shouldn't reach inside groupMetadataManager if we want the metadata image.
   
   `OffsetMetadataManager` already takes a metadata image as a constructor 
parameter. We should store it ourselves and update the type to follow the same 
pattern as `GroupMetadataManager`. We will have to implement an 
`onMetadataUpdate` method too.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void 
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
             cache
         );
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;

Review Comment:
   Since the main driver for the KIP was transactional offset commits, it's 
best that we test both transactional and non-transactional offset commits.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##########
@@ -2160,4 +2159,327 @@ public void 
testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
             cache
         );
     }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithAssignmentEpochValidation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, assignmentEpoch, 
partitionId)))
+            .build());
+
+        // client epoch = broker epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", memberEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch, 
isTransactional, version));
+        }
+        // client epoch (11) > broker epoch (10) - exception thrown directly 
from validateOffsetCommit
+        if (version >= 9) {
+            int clientEpoch = memberEpoch + 1;
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                group.validateOffsetCommit("member-id", "", clientEpoch, 
isTransactional, version));
+            assertEquals(
+                String.format("Received member epoch %d is newer than "
+                    + "current member epoch %d.", clientEpoch, memberEpoch),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+        }
+
+        // assignment epoch (7) <= client epoch (7) <= broker epoch (10)
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch, 
isTransactional, version));
+        }
+
+        // client epoch (6) != broker epoch (10) and client epoch (6) < 
assignment epoch (7)
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch - 1, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(topicName, topicId, partitionId));
+            assertEquals(
+                String.format("Received member epoch %d is older than 
assignment epoch %d for partition %s-%d.",
+                    assignmentEpoch - 1, assignmentEpoch, topicName, 
partitionId),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch - 
1, isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithPartitionPendingRevocation(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, assignmentEpoch, 
partitionId)))
+            .build());
+
+        // client epoch (10) == broker epoch (10), no exception thrown
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", memberEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch, 
isTransactional, version));
+        }
+
+        // client epoch (11) > broker epoch (10) - exception thrown directly 
from validateOffsetCommit
+        if (version >= 9) {
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+            assertEquals(
+                String.format("Received member epoch %d is newer than "
+                    + "current member epoch %d.", memberEpoch + 1, 
memberEpoch),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", memberEpoch + 1, 
isTransactional, version));
+        }
+
+        // partition epoch <= client epoch <= broker epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch, 
isTransactional, version));
+        }
+
+        // client epoch != broker epoch and client epoch < partition epoch
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch - 1, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(topicName, topicId, partitionId));
+            assertEquals(
+                String.format("Received member epoch %d is older than 
assignment epoch %d for partition %s-%d.",
+                    assignmentEpoch - 1, assignmentEpoch, topicName, 
partitionId),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch - 
1, isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithUnassignedPartition(short version) 
{
+        Uuid assignedTopicId = Uuid.randomUuid();
+        Uuid unassignedTopicId = Uuid.randomUuid();
+        String unassignedTopicName = "bar";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 7;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(assignedTopicId, assignmentEpoch, 
partitionId)))
+            .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                    mkTopicAssignmentWithEpochs(assignedTopicId, 
assignmentEpoch, partitionId + 1)))
+            .build());
+
+        // Commit an unassigned partition
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", assignmentEpoch, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(unassignedTopicName, unassignedTopicId, 
partitionId));
+            assertEquals(
+                String.format("Partition %s-%d is not assigned or pending 
revocation for member.",
+                    unassignedTopicName, partitionId),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", assignmentEpoch, 
isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommitWithMultiplePartitionsAndEpochs(short 
version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int memberEpoch = 10;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, 3, 0),
+                mkTopicAssignmentWithEpochs(topicId, 5, 1),
+                mkTopicAssignmentWithEpochs(topicId, 8, 2)))
+            .build());
+
+        // with clientEpoch=6: partitions 0,1 should pass, partition 2 should 
fail
+        if (version >= 9) {
+            int clientEpoch = 6;
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", clientEpoch, isTransactional, version
+            );
+
+            // For partition 0  and 1, assignment epoch (3 or 5) < client 
epoch (6) < broker epoch (10), accept
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
0));
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
1));
+
+            // For partition 2, assignment epoch (8) > client epoch (6), reject
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(topicName, topicId, 2));
+            assertEquals(
+                String.format("Received member epoch %d is older than 
assignment epoch %d for partition %s-%d.",
+                    clientEpoch, 8, topicName, 2),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", 6, 
isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void 
testValidateOffsetCommitPrioritizeAssignedOverPendingRevocation(short version) {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "foo";
+        int partitionId = 0;
+        int memberEpoch = 10;
+        int assignmentEpoch = 5;
+        boolean isTransactional = false;
+
+        ConsumerGroup group = createConsumerGroup("group-foo");
+
+        group.updateMember(new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(memberEpoch)
+            .setSubscribedTopicNames(List.of(topicName))
+            .setAssignedPartitions(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, assignmentEpoch, 
partitionId)
+            ))
+            .setPartitionsPendingRevocation(mkAssignmentWithEpochs(
+                mkTopicAssignmentWithEpochs(topicId, 4, 2),
+                mkTopicAssignmentWithEpochs(topicId, 7, 1)))
+            .build());
+
+        // clientEpoch (4)  < assignedPartitionEpoch (5), reject
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", 4, isTransactional, version
+            );
+            StaleMemberEpochException ex = 
assertThrows(StaleMemberEpochException.class, () ->
+                validator.validate(topicName, topicId, partitionId));
+            assertEquals(
+                String.format("Received member epoch %d is older than 
assignment epoch %d for partition %s-%d.",
+                    4, assignmentEpoch, topicName, partitionId),
+                ex.getMessage()
+            );
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", 4, 
isTransactional, version));
+        }
+
+        // broker epoch (10) >= client epoch (6)  >= assigned partition epoch 
(5), accept
+        if (version >= 9) {
+            CommitPartitionValidator validator = group.validateOffsetCommit(
+                "member-id", "", 6, isTransactional, version
+            );
+            assertDoesNotThrow(() -> validator.validate(topicName, topicId, 
partitionId));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("member-id", "", 6, 
isTransactional, version));
+        }
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void 
testValidateOffsetCommitWithMultiplePartitionsPendingRevocation(short version) {

Review Comment:
   I'm not sure this test is necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to