This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6573b4ace18 KAFKA-19042 Move PlaintextConsumerCommitTest to 
client-integration-tests module (#19389)
6573b4ace18 is described below

commit 6573b4ace1884c358b8751a19a2eff7697964625
Author: Ken Huang <[email protected]>
AuthorDate: Mon May 19 21:51:42 2025 +0800

    KAFKA-19042 Move PlaintextConsumerCommitTest to client-integration-tests 
module (#19389)
    
    Rewrite `PlaintextConsumerCommitTest` in Java using the new test infra and
    move it to the clients-integration-tests module.
    
    Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai
    <[email protected]>
---
 .../consumer/PlaintextConsumerCommitTest.java      | 594 +++++++++++++++++++++
 .../kafka/api/PlaintextConsumerCommitTest.scala    | 371 -------------
 2 files changed, 594 insertions(+), 371 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
new file mode 100644
index 00000000000..c3f0aedccc6
--- /dev/null
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.test.MockConsumerInterceptor;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment;
+import static 
org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
+import static 
org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.BROKER_COUNT;
+import static 
org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.OFFSETS_TOPIC_PARTITIONS;
+import static 
org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.OFFSETS_TOPIC_REPLICATION;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
OFFSETS_TOPIC_PARTITIONS),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = OFFSETS_TOPIC_REPLICATION),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+    }
+)
+public class PlaintextConsumerCommitTest {
+
+    public static final int BROKER_COUNT = 3;
+    public static final String OFFSETS_TOPIC_PARTITIONS = "1";
+    public static final String OFFSETS_TOPIC_REPLICATION = "3";
+    private final ClusterInstance cluster;
+    private final String topic = "topic";
+    private final TopicPartition tp = new TopicPartition(topic, 0);
+    private final TopicPartition tp1 = new TopicPartition(topic, 1);
+
+    public PlaintextConsumerCommitTest(ClusterInstance clusterInstance) {
+        this.cluster = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        cluster.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+
+    @ClusterTest
+    public void testClassicConsumerAutoCommitOnClose() throws 
InterruptedException {
+        testAutoCommitOnClose(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAutoCommitOnClose() throws 
InterruptedException {
+        testAutoCommitOnClose(GroupProtocol.CONSUMER);
+    }
+
+    private void testAutoCommitOnClose(GroupProtocol groupProtocol) throws 
InterruptedException {
+        try (var consumer = createConsumer(groupProtocol, true)) {
+            sendRecords(cluster, tp, 1000);
+
+            consumer.subscribe(List.of(topic));
+            awaitAssignment(consumer, Set.of(tp, tp1));
+            // should auto-commit sought positions before closing
+            consumer.seek(tp, 300);
+            consumer.seek(tp1, 500);
+        }
+
+        // now we should see the committed positions from another consumer
+        try (var anotherConsumer = createConsumer(groupProtocol, true)) {
+            assertEquals(300, 
anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(500, 
anotherConsumer.committed(Set.of(tp1)).get(tp1).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerAutoCommitOnCloseAfterWakeup() throws 
InterruptedException {
+        testAutoCommitOnCloseAfterWakeup(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAutoCommitOnCloseAfterWakeup() throws 
InterruptedException {
+        testAutoCommitOnCloseAfterWakeup(GroupProtocol.CONSUMER);
+    }
+
+    private void testAutoCommitOnCloseAfterWakeup(GroupProtocol groupProtocol) 
throws InterruptedException {
+        try (var consumer = createConsumer(groupProtocol, true)) {
+            sendRecords(cluster, tp, 1000);
+
+            consumer.subscribe(List.of(topic));
+            awaitAssignment(consumer, Set.of(tp, tp1));
+
+            // should auto-commit sought positions before closing
+            consumer.seek(tp, 300);
+            consumer.seek(tp1, 500);
+
+            // wakeup the consumer before closing to simulate trying to break 
a poll
+            // loop from another thread
+            consumer.wakeup();
+        }
+
+        // now we should see the committed positions from another consumer
+        try (var anotherConsumer = createConsumer(groupProtocol, true)) {
+            assertEquals(300, 
anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(500, 
anotherConsumer.committed(Set.of(tp1)).get(tp1).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerCommitMetadata() throws 
InterruptedException {
+        testCommitMetadata(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerCommitMetadata() throws InterruptedException {
+        testCommitMetadata(GroupProtocol.CONSUMER);
+    }
+
+    private void testCommitMetadata(GroupProtocol groupProtocol) throws 
InterruptedException {
+        try (var consumer = createConsumer(groupProtocol, true)) {
+            consumer.assign(List.of(tp));
+            // sync commit
+            var syncMetadata = new OffsetAndMetadata(5, Optional.of(15), 
"foo");
+            consumer.commitSync(Map.of(tp, syncMetadata));
+            assertEquals(syncMetadata, consumer.committed(Set.of(tp)).get(tp));
+
+            // async commit
+            var asyncMetadata = new OffsetAndMetadata(10, "bar");
+            sendAndAwaitAsyncCommit(consumer, Map.of(tp, asyncMetadata));
+            assertEquals(asyncMetadata, 
consumer.committed(Set.of(tp)).get(tp));
+
+            // handle null metadata
+            var nullMetadata = new OffsetAndMetadata(5, null);
+            consumer.commitSync(Map.of(tp, nullMetadata));
+            assertEquals(nullMetadata, consumer.committed(Set.of(tp)).get(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerAsyncCommit() throws InterruptedException {
+        testAsyncCommit(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAsyncCommit() throws InterruptedException {
+        testAsyncCommit(GroupProtocol.CONSUMER);
+    }
+
+    private void testAsyncCommit(GroupProtocol groupProtocol) throws 
InterruptedException {
+        // Ensure the __consumer_offsets topic is created to prevent transient 
issues,
+        // such as RetriableCommitFailedException during async offset commits.
+        cluster.createTopic(
+            Topic.GROUP_METADATA_TOPIC_NAME,
+            Integer.parseInt(OFFSETS_TOPIC_PARTITIONS),
+            Short.parseShort(OFFSETS_TOPIC_REPLICATION)
+        );
+        try (var consumer = createConsumer(groupProtocol, false)) {
+            consumer.assign(List.of(tp));
+
+            var callback = new CountConsumerCommitCallback();
+            var count = 5;
+            for (var i = 1; i <= count; i++)
+                consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(i)), 
callback);
+
+            TestUtils.waitForCondition(() -> {
+                consumer.poll(Duration.ofMillis(100));
+                return callback.successCount >= count || 
callback.lastError.isPresent();
+            }, "Failed to observe commit callback before timeout");
+
+            assertEquals(Optional.empty(), callback.lastError);
+            assertEquals(count, callback.successCount);
+            assertEquals(new OffsetAndMetadata(count), 
consumer.committed(Set.of(tp)).get(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerAutoCommitIntercept() throws 
InterruptedException {
+        testAutoCommitIntercept(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAutoCommitIntercept() throws 
InterruptedException {
+        testAutoCommitIntercept(GroupProtocol.CONSUMER);
+    }
+
+    private void testAutoCommitIntercept(GroupProtocol groupProtocol) throws 
InterruptedException {
+        var topic2 = "topic2";
+        cluster.createTopic(topic2, 2, (short) 3);
+        var numRecords = 100;
+        try (var producer = cluster.producer();
+             // create consumer with interceptor
+             Consumer<byte[], byte[]> consumer = cluster.consumer(Map.of(
+                 GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT),
+                 ENABLE_AUTO_COMMIT_CONFIG, "true",
+                 INTERCEPTOR_CLASSES_CONFIG, 
"org.apache.kafka.test.MockConsumerInterceptor"
+             ))
+        ) {
+            // produce records
+            for (var i = 0; i < numRecords; i++) {
+                producer.send(new ProducerRecord<>(tp.topic(), tp.partition(), 
("key " + i).getBytes(), ("value " + i).getBytes()));
+            }
+
+            var rebalanceListener = new ConsumerRebalanceListener() {
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                    // keep partitions paused in this test so that we can 
verify the commits based on specific seeks
+                    consumer.pause(partitions);
+                }
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                    // No-op
+                }
+            };
+
+            changeConsumerSubscriptionAndValidateAssignment(
+                consumer,
+                List.of(topic),
+                Set.of(tp, tp1),
+                rebalanceListener
+            );
+            consumer.seek(tp, 10);
+            consumer.seek(tp1, 20);
+
+            // change subscription to trigger rebalance
+            var commitCountBeforeRebalance = 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
+            var expectedAssignment = Set.of(tp, tp1, new 
TopicPartition(topic2, 0), new TopicPartition(topic2, 1));
+            changeConsumerSubscriptionAndValidateAssignment(
+                consumer,
+                List.of(topic, topic2),
+                expectedAssignment,
+                rebalanceListener
+            );
+
+            // after rebalancing, we should have reset to the committed 
positions
+            var committed1 = consumer.committed(Set.of(tp));
+            assertEquals(10, committed1.get(tp).offset());
+            var committed2 = consumer.committed(Set.of(tp1));
+            assertEquals(20, committed2.get(tp1).offset());
+
+            // In both CLASSIC and CONSUMER protocols, interceptors are 
executed in poll and close.
+            // However, in the CONSUMER protocol, the assignment may be 
changed outside a poll, so
+            // we need to poll once to ensure the interceptor is called.
+            if (groupProtocol == GroupProtocol.CONSUMER) {
+                consumer.poll(Duration.ZERO);
+            }
+
+            assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > 
commitCountBeforeRebalance);
+
+            // verify commits are intercepted on close
+            var commitCountBeforeClose = 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
+            consumer.close();
+            assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > 
commitCountBeforeClose);
+            producer.close();
+            // cleanup
+            MockConsumerInterceptor.resetCounters();
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerCommitSpecifiedOffsets() throws 
InterruptedException {
+        testCommitSpecifiedOffsets(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerCommitSpecifiedOffsets() throws 
InterruptedException {
+        testCommitSpecifiedOffsets(GroupProtocol.CONSUMER);
+    }
+
+    private void testCommitSpecifiedOffsets(GroupProtocol groupProtocol) 
throws InterruptedException {
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             var consumer = createConsumer(groupProtocol, false)
+        ) {
+            sendRecords(producer, tp, 5, System.currentTimeMillis());
+            sendRecords(producer, tp1, 7, System.currentTimeMillis());
+
+            consumer.assign(List.of(tp, tp1));
+
+            var pos1 = consumer.position(tp);
+            var pos2 = consumer.position(tp1);
+
+            consumer.commitSync(Map.of(tp, new OffsetAndMetadata(3L)));
+
+            assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset());
+            
assertNull(consumer.committed(Collections.singleton(tp1)).get(tp1));
+
+            // Positions should not change
+            assertEquals(pos1, consumer.position(tp));
+            assertEquals(pos2, consumer.position(tp1));
+
+            consumer.commitSync(Map.of(tp1, new OffsetAndMetadata(5L)));
+
+            assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(5, consumer.committed(Set.of(tp1)).get(tp1).offset());
+
+            // Using async should pick up the committed changes after commit 
completes
+            sendAndAwaitAsyncCommit(consumer, Map.of(tp1, new 
OffsetAndMetadata(7L)));
+            assertEquals(7, 
consumer.committed(Collections.singleton(tp1)).get(tp1).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerAutoCommitOnRebalance() throws 
InterruptedException {
+        testAutoCommitOnRebalance(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAutoCommitOnRebalance() throws 
InterruptedException {
+        testAutoCommitOnRebalance(GroupProtocol.CONSUMER);
+    }
+
+    private void testAutoCommitOnRebalance(GroupProtocol groupProtocol) throws 
InterruptedException {
+        var topic2 = "topic2";
+        cluster.createTopic(topic2, 2, (short) BROKER_COUNT);
+        try (var consumer = createConsumer(groupProtocol, true)) {
+            sendRecords(cluster, tp, 1000);
+
+            var rebalanceListener = new ConsumerRebalanceListener() {
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                    // keep partitions paused in this test so that we can 
verify the commits based on specific seeks
+                    consumer.pause(partitions);
+                }
+
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+
+                }
+            };
+
+            consumer.subscribe(List.of(topic), rebalanceListener);
+            awaitAssignment(consumer, Set.of(tp, tp1));
+
+            consumer.seek(tp, 300);
+            consumer.seek(tp1, 500);
+            // change subscription to trigger rebalance
+            consumer.subscribe(List.of(topic, topic2), rebalanceListener);
+
+            var newAssignment = Set.of(tp, tp1, new TopicPartition(topic2, 0), 
new TopicPartition(topic2, 1));
+            awaitAssignment(consumer, newAssignment);
+
+            // after rebalancing, we should have reset to the committed 
positions
+            assertEquals(300, consumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(500, 
consumer.committed(Set.of(tp1)).get(tp1).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerSubscribeAndCommitSync() throws 
InterruptedException {
+        testSubscribeAndCommitSync(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerSubscribeAndCommitSync() throws 
InterruptedException {
+        testSubscribeAndCommitSync(GroupProtocol.CONSUMER);
+    }
+
+    private void testSubscribeAndCommitSync(GroupProtocol groupProtocol) 
throws InterruptedException {
+        // This test ensures that the member ID is propagated from the group 
coordinator when the
+        // assignment is received into a subsequent offset commit
+        try (var consumer = createConsumer(groupProtocol, false)) {
+            assertEquals(0, consumer.assignment().size());
+            consumer.subscribe(List.of(topic));
+            awaitAssignment(consumer, Set.of(tp, tp1));
+
+            consumer.seek(tp, 0);
+            consumer.commitSync();
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPositionAndCommit() throws 
InterruptedException {
+        testPositionAndCommit(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPositionAndCommit() throws 
InterruptedException {
+        testPositionAndCommit(GroupProtocol.CONSUMER);
+    }
+
+    private void testPositionAndCommit(GroupProtocol groupProtocol) throws 
InterruptedException {
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             var consumer = createConsumer(groupProtocol, false);
+             var otherConsumer = createConsumer(groupProtocol, false)
+        ) {
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, tp, 5, startingTimestamp);
+
+            var topicPartition = new TopicPartition(topic, 15);
+            
assertNull(consumer.committed(Collections.singleton(topicPartition)).get(topicPartition));
+
+            // position() on a partition that we aren't subscribed to throws 
an exception
+            assertThrows(IllegalStateException.class, () -> 
consumer.position(topicPartition));
+
+            consumer.assign(List.of(tp));
+
+            assertEquals(0L, consumer.position(tp), "position() on a partition 
that we are subscribed to should reset the offset");
+            consumer.commitSync();
+            assertEquals(0L, consumer.committed(Set.of(tp)).get(tp).offset());
+            consumeAndVerifyRecords(consumer, tp, 5, 0, 0, startingTimestamp);
+            assertEquals(5L, consumer.position(tp), "After consuming 5 
records, position should be 5");
+            consumer.commitSync();
+            assertEquals(5L, consumer.committed(Set.of(tp)).get(tp).offset(), 
"Committed offset should be returned");
+
+            startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, tp, 1, startingTimestamp);
+
+            // another consumer in the same group should get the same position
+            otherConsumer.assign(List.of(tp));
+            consumeAndVerifyRecords(otherConsumer, tp, 1, 5, 0, 
startingTimestamp);
+        }
+    }
+
+    // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+    @ClusterTest
+    public void testCommitAsyncCompletedBeforeConsumerCloses() {
+        // This is testing the contract that asynchronous offset commits are 
completed before the consumer
+        // is closed, even when no commit sync is performed as part of the 
close (due to auto-commit
+        // being disabled, or simply because there are no consumed offsets).
+        try (Producer<byte[], byte[]> producer = 
cluster.producer(Map.of(ProducerConfig.ACKS_CONFIG, "all"));
+             var consumer = createConsumer(GroupProtocol.CONSUMER, false)
+        ) {
+            sendRecords(producer, tp, 3, System.currentTimeMillis());
+            sendRecords(producer, tp1, 3, System.currentTimeMillis());
+            consumer.assign(List.of(tp, tp1));
+
+            // Try without looking up the coordinator first
+            var cb = new CountConsumerCommitCallback();
+            consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), cb);
+            consumer.commitAsync(Map.of(tp1, new OffsetAndMetadata(1L)), cb);
+
+            consumer.close();
+            assertEquals(2, cb.successCount);
+        }
+    }
+
+    // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+    @ClusterTest
+    public void testCommitAsyncCompletedBeforeCommitSyncReturns() {
+        // This is testing the contract that asynchronous offset commits sent 
previously with the
+        // `commitAsync` are guaranteed to have their callbacks invoked prior 
to completion of
+        // `commitSync` (given that it does not time out).
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             var consumer = createConsumer(GroupProtocol.CONSUMER, false)
+        ) {
+            sendRecords(producer, tp, 3, System.currentTimeMillis());
+            sendRecords(producer, tp1, 3, System.currentTimeMillis());
+
+            consumer.assign(List.of(tp, tp1));
+
+            // Try without looking up the coordinator first
+            var cb = new CountConsumerCommitCallback();
+            consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), cb);
+            consumer.commitSync(Map.of());
+
+            assertEquals(1, consumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(1, cb.successCount);
+
+            // Try with coordinator known
+            consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(2L)), cb);
+            consumer.commitSync(Map.of(tp1, new OffsetAndMetadata(2L)));
+
+            assertEquals(2, consumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(2, consumer.committed(Set.of(tp1)).get(tp1).offset());
+            assertEquals(2, cb.successCount);
+
+            // Try with empty sync commit
+            consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(3L)), cb);
+            consumer.commitSync(Map.of());
+
+            assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(2, consumer.committed(Set.of(tp1)).get(tp1).offset());
+            assertEquals(3, cb.successCount);
+        }
+    }
+
+    private Consumer<byte[], byte[]> createConsumer(GroupProtocol protocol, 
boolean enableAutoCommit) {
+        return cluster.consumer(Map.of(
+            GROUP_ID_CONFIG, "test-group",
+            GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT),
+            ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit
+        ));
+    }
+
+    private void sendAndAwaitAsyncCommit(
+        Consumer<byte[], byte[]> consumer,
+        Map<TopicPartition, OffsetAndMetadata> offsetsOpt
+    ) throws InterruptedException {
+        var commitCallback = new RetryCommitCallback(consumer, offsetsOpt);
+
+        commitCallback.sendAsyncCommit();
+        TestUtils.waitForCondition(() -> {
+                consumer.poll(Duration.ofMillis(100));
+                return commitCallback.isComplete;
+            }, "Failed to observe commit callback before timeout"
+        );
+
+        assertEquals(Optional.empty(), commitCallback.error);
+    }
+
+    private static class RetryCommitCallback implements OffsetCommitCallback {
+        private boolean isComplete = false;
+        private Optional<Exception> error = Optional.empty();
+
+        private final Consumer<byte[], byte[]> consumer;
+        private final Map<TopicPartition, OffsetAndMetadata> offsetsOpt;
+
+        public RetryCommitCallback(
+            Consumer<byte[], byte[]> consumer,
+            Map<TopicPartition, OffsetAndMetadata> offsetsOpt
+        ) {
+            this.consumer = consumer;
+            this.offsetsOpt = offsetsOpt;
+        }
+
+        @Override
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, 
Exception exception) {
+            if (exception instanceof RetriableCommitFailedException) {
+                sendAsyncCommit();
+            } else {
+                isComplete = true;
+                error = Optional.ofNullable(exception);
+            }
+        }
+
+        void sendAsyncCommit() {
+            consumer.commitAsync(offsetsOpt, this);
+        }
+    }
+
+    private static class CountConsumerCommitCallback implements 
OffsetCommitCallback {
+        private int successCount = 0;
+        private Optional<Exception> lastError = Optional.empty();
+
+        @Override
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, 
Exception exception) {
+            if (exception == null) {
+                successCount += 1;
+            } else {
+                lastError = Optional.of(exception);
+            }
+        }
+    }
+
+    private void changeConsumerSubscriptionAndValidateAssignment(
+        Consumer<byte[], byte[]> consumer,
+        List<String> topicsToSubscribe,
+        Set<TopicPartition> expectedAssignment,
+        ConsumerRebalanceListener rebalanceListener
+    ) throws InterruptedException {
+        consumer.subscribe(topicsToSubscribe, rebalanceListener);
+        awaitAssignment(consumer, expectedAssignment);
+    }
+}
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala
deleted file mode 100644
index 0445e81cac1..00000000000
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala
+++ /dev/null
@@ -1,371 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
-  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
-  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
-  * License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
-  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
-  * specific language governing permissions and limitations under the License.
-  */
-package kafka.api
-
-import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.serialization.{StringDeserializer, 
StringSerializer}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.test.MockConsumerInterceptor
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Timeout
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-import java.time.Duration
-import java.util
-import java.util.Optional
-import scala.jdk.CollectionConverters._
-
-/**
- * Integration tests for the consumer that covers the logic related to 
committing offsets.
- */
-@Timeout(600)
-class PlaintextConsumerCommitTest extends AbstractConsumerTest {
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testAutoCommitOnClose(groupProtocol: String): Unit = {
-    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
-    val consumer = createConsumer()
-
-    val numRecords = 10000
-    val producer = createProducer()
-    sendRecords(producer, numRecords, tp)
-
-    consumer.subscribe(List(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-
-    // should auto-commit sought positions before closing
-    consumer.seek(tp, 300)
-    consumer.seek(tp2, 500)
-    consumer.close()
-
-    // now we should see the committed positions from another consumer
-    val anotherConsumer = createConsumer()
-    assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
-    assertEquals(500, 
anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testAutoCommitOnCloseAfterWakeup(groupProtocol: String): Unit = {
-    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
-    val consumer = createConsumer()
-
-    val numRecords = 10000
-    val producer = createProducer()
-    sendRecords(producer, numRecords, tp)
-
-    consumer.subscribe(List(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-
-    // should auto-commit sought positions before closing
-    consumer.seek(tp, 300)
-    consumer.seek(tp2, 500)
-
-    // wakeup the consumer before closing to simulate trying to break a poll
-    // loop from another thread
-    consumer.wakeup()
-    consumer.close()
-
-    // now we should see the committed positions from another consumer
-    val anotherConsumer = createConsumer()
-    assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
-    assertEquals(500, 
anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testCommitMetadata(groupProtocol: String): Unit = {
-    val consumer = createConsumer()
-    consumer.assign(List(tp).asJava)
-
-    // sync commit
-    val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo")
-    consumer.commitSync(Map((tp, syncMetadata)).asJava)
-    assertEquals(syncMetadata, consumer.committed(Set(tp).asJava).get(tp))
-
-    // async commit
-    val asyncMetadata = new OffsetAndMetadata(10, "bar")
-    sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata)))
-    assertEquals(asyncMetadata, consumer.committed(Set(tp).asJava).get(tp))
-
-    // handle null metadata
-    val nullMetadata = new OffsetAndMetadata(5, null)
-    consumer.commitSync(Map(tp -> nullMetadata).asJava)
-    assertEquals(nullMetadata, consumer.committed(Set(tp).asJava).get(tp))
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testAsyncCommit(groupProtocol: String): Unit = {
-    val consumer = createConsumer()
-    consumer.assign(List(tp).asJava)
-
-    val callback = new CountConsumerCommitCallback
-    val count = 5
-
-    for (i <- 1 to count)
-      consumer.commitAsync(Map(tp -> new OffsetAndMetadata(i)).asJava, 
callback)
-
-    TestUtils.pollUntilTrue(consumer, () => callback.successCount >= count || 
callback.lastError.isDefined,
-      "Failed to observe commit callback before timeout", waitTimeMs = 10000)
-
-    assertEquals(None, callback.lastError)
-    assertEquals(count, callback.successCount)
-    assertEquals(new OffsetAndMetadata(count), 
consumer.committed(Set(tp).asJava).get(tp))
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testAutoCommitIntercept(groupProtocol: String): Unit = {
-    val topic2 = "topic2"
-    createTopic(topic2, 2, brokerCount)
-
-    // produce records
-    val numRecords = 100
-    val testProducer = createProducer(keySerializer = new StringSerializer, 
valueSerializer = new StringSerializer)
-    (0 until numRecords).map { i =>
-      testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key 
$i", s"value $i"))
-    }.foreach(_.get)
-
-    // create consumer with interceptor
-    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
-    this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
"org.apache.kafka.test.MockConsumerInterceptor")
-    val testConsumer = createConsumer(keyDeserializer = new 
StringDeserializer, valueDeserializer = new StringDeserializer)
-    val rebalanceListener = new ConsumerRebalanceListener {
-      override def onPartitionsAssigned(partitions: 
util.Collection[TopicPartition]): Unit = {
-        // keep partitions paused in this test so that we can verify the 
commits based on specific seeks
-        testConsumer.pause(partitions)
-      }
-
-      override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]): Unit = {}
-    }
-    changeConsumerSubscriptionAndValidateAssignment(testConsumer, List(topic), 
Set(tp, tp2), rebalanceListener)
-    testConsumer.seek(tp, 10)
-    testConsumer.seek(tp2, 20)
-
-    // change subscription to trigger rebalance
-    val commitCountBeforeRebalance = 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
-    changeConsumerSubscriptionAndValidateAssignment(testConsumer,
-      List(topic, topic2),
-      Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 
1)),
-      rebalanceListener)
-
-    // after rebalancing, we should have reset to the committed positions
-    assertEquals(10, testConsumer.committed(Set(tp).asJava).get(tp).offset)
-    assertEquals(20, testConsumer.committed(Set(tp2).asJava).get(tp2).offset)
-
-    // In both CLASSIC and CONSUMER protocols, interceptors are executed in 
poll and close.
-    // However, in the CONSUMER protocol, the assignment may be changed 
outside of a poll, so
-    // we need to poll once to ensure the interceptor is called.
-    if (groupProtocol.toUpperCase == GroupProtocol.CONSUMER.name) {
-      testConsumer.poll(Duration.ZERO)
-    }
-
-    assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > 
commitCountBeforeRebalance)
-
-    // verify commits are intercepted on close
-    val commitCountBeforeClose = 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
-    testConsumer.close()
-    assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > 
commitCountBeforeClose)
-    testProducer.close()
-
-    // cleanup
-    MockConsumerInterceptor.resetCounters()
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testCommitSpecifiedOffsets(groupProtocol: String): Unit = {
-    val producer = createProducer()
-    sendRecords(producer, numRecords = 5, tp)
-    sendRecords(producer, numRecords = 7, tp2)
-
-    val consumer = createConsumer()
-    consumer.assign(List(tp, tp2).asJava)
-
-    val pos1 = consumer.position(tp)
-    val pos2 = consumer.position(tp2)
-    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(3L))).asJava)
-    assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
-    assertNull(consumer.committed(Set(tp2).asJava).get(tp2))
-
-    // Positions should not change
-    assertEquals(pos1, consumer.position(tp))
-    assertEquals(pos2, consumer.position(tp2))
-    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(5L))).asJava)
-    assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
-    assertEquals(5, consumer.committed(Set(tp2).asJava).get(tp2).offset)
-
-    // Using async should pick up the committed changes after commit completes
-    sendAndAwaitAsyncCommit(consumer, Some(Map(tp2 -> new 
OffsetAndMetadata(7L))))
-    assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testAutoCommitOnRebalance(groupProtocol: String): Unit = {
-    val topic2 = "topic2"
-    createTopic(topic2, 2, brokerCount)
-
-    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
-    val consumer = createConsumer()
-
-    val numRecords = 10000
-    val producer = createProducer()
-    sendRecords(producer, numRecords, tp)
-
-    val rebalanceListener = new ConsumerRebalanceListener {
-      override def onPartitionsAssigned(partitions: 
util.Collection[TopicPartition]): Unit = {
-        // keep partitions paused in this test so that we can verify the 
commits based on specific seeks
-        consumer.pause(partitions)
-      }
-
-      override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]): Unit = {}
-    }
-
-    consumer.subscribe(List(topic).asJava, rebalanceListener)
-
-    awaitAssignment(consumer, Set(tp, tp2))
-
-    consumer.seek(tp, 300)
-    consumer.seek(tp2, 500)
-
-    // change subscription to trigger rebalance
-    consumer.subscribe(List(topic, topic2).asJava, rebalanceListener)
-
-    val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new 
TopicPartition(topic2, 1))
-    awaitAssignment(consumer, newAssignment)
-
-    // after rebalancing, we should have reset to the committed positions
-    assertEquals(300, consumer.committed(Set(tp).asJava).get(tp).offset)
-    assertEquals(500, consumer.committed(Set(tp2).asJava).get(tp2).offset)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testSubscribeAndCommitSync(groupProtocol: String): Unit = {
-    // This test ensure that the member ID is propagated from the group 
coordinator when the
-    // assignment is received into a subsequent offset commit
-    val consumer = createConsumer()
-    assertEquals(0, consumer.assignment.size)
-    consumer.subscribe(List(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-
-    consumer.seek(tp, 0)
-
-    consumer.commitSync()
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testPositionAndCommit(groupProtocol: String): Unit = {
-    val producer = createProducer()
-    var startingTimestamp = System.currentTimeMillis()
-    sendRecords(producer, numRecords = 5, tp, startingTimestamp = 
startingTimestamp)
-
-    val topicPartition = new TopicPartition(topic, 15)
-    val consumer = createConsumer()
-    
assertNull(consumer.committed(Set(topicPartition).asJava).get(topicPartition))
-
-    // position() on a partition that we aren't subscribed to throws an 
exception
-    assertThrows(classOf[IllegalStateException], () => 
consumer.position(topicPartition))
-
-    consumer.assign(List(tp).asJava)
-
-    assertEquals(0L, consumer.position(tp), "position() on a partition that we 
are subscribed to should reset the offset")
-    consumer.commitSync()
-    assertEquals(0L, consumer.committed(Set(tp).asJava).get(tp).offset)
-    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, 
startingOffset = 0, startingTimestamp = startingTimestamp)
-    assertEquals(5L, consumer.position(tp), "After consuming 5 records, 
position should be 5")
-    consumer.commitSync()
-    assertEquals(5L, consumer.committed(Set(tp).asJava).get(tp).offset, 
"Committed offset should be returned")
-
-    startingTimestamp = System.currentTimeMillis()
-    sendRecords(producer, numRecords = 1, tp, startingTimestamp = 
startingTimestamp)
-
-    // another consumer in the same group should get the same position
-    val otherConsumer = createConsumer()
-    otherConsumer.assign(List(tp).asJava)
-    consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
-  }
-
-  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
-  def testCommitAsyncCompletedBeforeConsumerCloses(groupProtocol: String): 
Unit = {
-    // This is testing the contract that asynchronous offset commit are 
completed before the consumer
-    // is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
-    // disabled, or simply because there are no consumed offsets).
-    val producer = createProducer()
-    sendRecords(producer, numRecords = 3, tp)
-    sendRecords(producer, numRecords = 3, tp2)
-
-    val consumer = createConsumer()
-    consumer.assign(List(tp, tp2).asJava)
-
-    // Try without looking up the coordinator first
-    val cb = new CountConsumerCommitCallback
-    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
-    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
-    consumer.close()
-    assertEquals(2, cb.successCount)
-  }
-
-  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
-  def testCommitAsyncCompletedBeforeCommitSyncReturns(groupProtocol: String): 
Unit = {
-    // This is testing the contract that asynchronous offset commits sent 
previously with the
-    // `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
-    // `commitSync` (given that it does not time out).
-    val producer = createProducer()
-    sendRecords(producer, numRecords = 3, tp)
-    sendRecords(producer, numRecords = 3, tp2)
-
-    val consumer = createConsumer()
-    consumer.assign(List(tp, tp2).asJava)
-
-    // Try without looking up the coordinator first
-    val cb = new CountConsumerCommitCallback
-    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
-    consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
-    assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
-    assertEquals(1, cb.successCount)
-
-    // Try with coordinator known
-    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(2L))).asJava, cb)
-    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(2L))).asJava)
-    assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
-    assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
-    assertEquals(2, cb.successCount)
-
-    // Try with empty sync commit
-    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(3L))).asJava, cb)
-    consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
-    assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
-    assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
-    assertEquals(3, cb.successCount)
-  }
-
-  def changeConsumerSubscriptionAndValidateAssignment[K, V](consumer: 
Consumer[K, V],
-                                                            topicsToSubscribe: 
List[String],
-                                                            
expectedAssignment: Set[TopicPartition],
-                                                            rebalanceListener: 
ConsumerRebalanceListener): Unit = {
-    consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
-    awaitAssignment(consumer, expectedAssignment)
-  }
-}


Reply via email to