This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6573b4ace18 KAFKA-19042 Move PlaintextConsumerCommitTest to
client-integration-tests module (#19389)
6573b4ace18 is described below
commit 6573b4ace1884c358b8751a19a2eff7697964625
Author: Ken Huang <[email protected]>
AuthorDate: Mon May 19 21:51:42 2025 +0800
KAFKA-19042 Move PlaintextConsumerCommitTest to client-integration-tests
module (#19389)
Use Java to rewrite `PlaintextConsumerCommitTest` by new test infra and
move it to client-integration-tests module.
Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../consumer/PlaintextConsumerCommitTest.java | 594 +++++++++++++++++++++
.../kafka/api/PlaintextConsumerCommitTest.scala | 371 -------------
2 files changed, 594 insertions(+), 371 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
new file mode 100644
index 00000000000..c3f0aedccc6
--- /dev/null
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.test.MockConsumerInterceptor;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment;
+import static
org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
+import static
org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.BROKER_COUNT;
+import static
org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.OFFSETS_TOPIC_PARTITIONS;
+import static
org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.OFFSETS_TOPIC_REPLICATION;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+ types = {Type.KRAFT},
+ brokers = BROKER_COUNT,
+ serverProperties = {
+ @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value =
OFFSETS_TOPIC_PARTITIONS),
+ @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
value = OFFSETS_TOPIC_REPLICATION),
+ @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
value = "100"),
+ }
+)
+public class PlaintextConsumerCommitTest {
+
+ public static final int BROKER_COUNT = 3;
+ public static final String OFFSETS_TOPIC_PARTITIONS = "1";
+ public static final String OFFSETS_TOPIC_REPLICATION = "3";
+ private final ClusterInstance cluster;
+ private final String topic = "topic";
+ private final TopicPartition tp = new TopicPartition(topic, 0);
+ private final TopicPartition tp1 = new TopicPartition(topic, 1);
+
+ public PlaintextConsumerCommitTest(ClusterInstance clusterInstance) {
+ this.cluster = clusterInstance;
+ }
+
+ @BeforeEach
+ public void setup() throws InterruptedException {
+ cluster.createTopic(topic, 2, (short) BROKER_COUNT);
+ }
+
+ @ClusterTest
+ public void testClassicConsumerAutoCommitOnClose() throws
InterruptedException {
+ testAutoCommitOnClose(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerAutoCommitOnClose() throws
InterruptedException {
+ testAutoCommitOnClose(GroupProtocol.CONSUMER);
+ }
+
+ private void testAutoCommitOnClose(GroupProtocol groupProtocol) throws
InterruptedException {
+ try (var consumer = createConsumer(groupProtocol, true)) {
+ sendRecords(cluster, tp, 1000);
+
+ consumer.subscribe(List.of(topic));
+ awaitAssignment(consumer, Set.of(tp, tp1));
+ // should auto-commit sought positions before closing
+ consumer.seek(tp, 300);
+ consumer.seek(tp1, 500);
+ }
+
+ // now we should see the committed positions from another consumer
+ try (var anotherConsumer = createConsumer(groupProtocol, true)) {
+ assertEquals(300,
anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+ assertEquals(500,
anotherConsumer.committed(Set.of(tp1)).get(tp1).offset());
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerAutoCommitOnCloseAfterWakeup() throws
InterruptedException {
+ testAutoCommitOnCloseAfterWakeup(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerAutoCommitOnCloseAfterWakeup() throws
InterruptedException {
+ testAutoCommitOnCloseAfterWakeup(GroupProtocol.CONSUMER);
+ }
+
+ private void testAutoCommitOnCloseAfterWakeup(GroupProtocol groupProtocol)
throws InterruptedException {
+ try (var consumer = createConsumer(groupProtocol, true)) {
+ sendRecords(cluster, tp, 1000);
+
+ consumer.subscribe(List.of(topic));
+ awaitAssignment(consumer, Set.of(tp, tp1));
+
+ // should auto-commit sought positions before closing
+ consumer.seek(tp, 300);
+ consumer.seek(tp1, 500);
+
+ // wakeup the consumer before closing to simulate trying to break
a poll
+ // loop from another thread
+ consumer.wakeup();
+ }
+
+ // now we should see the committed positions from another consumer
+ try (var anotherConsumer = createConsumer(groupProtocol, true)) {
+ assertEquals(300,
anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+ assertEquals(500,
anotherConsumer.committed(Set.of(tp1)).get(tp1).offset());
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerCommitMetadata() throws
InterruptedException {
+ testCommitMetadata(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerCommitMetadata() throws InterruptedException {
+ testCommitMetadata(GroupProtocol.CONSUMER);
+ }
+
+ private void testCommitMetadata(GroupProtocol groupProtocol) throws
InterruptedException {
+ try (var consumer = createConsumer(groupProtocol, true)) {
+ consumer.assign(List.of(tp));
+ // sync commit
+ var syncMetadata = new OffsetAndMetadata(5, Optional.of(15),
"foo");
+ consumer.commitSync(Map.of(tp, syncMetadata));
+ assertEquals(syncMetadata, consumer.committed(Set.of(tp)).get(tp));
+
+ // async commit
+ var asyncMetadata = new OffsetAndMetadata(10, "bar");
+ sendAndAwaitAsyncCommit(consumer, Map.of(tp, asyncMetadata));
+ assertEquals(asyncMetadata,
consumer.committed(Set.of(tp)).get(tp));
+
+ // handle null metadata
+ var nullMetadata = new OffsetAndMetadata(5, null);
+ consumer.commitSync(Map.of(tp, nullMetadata));
+ assertEquals(nullMetadata, consumer.committed(Set.of(tp)).get(tp));
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerAsyncCommit() throws InterruptedException {
+ testAsyncCommit(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerAsyncCommit() throws InterruptedException {
+ testAsyncCommit(GroupProtocol.CONSUMER);
+ }
+
+ private void testAsyncCommit(GroupProtocol groupProtocol) throws
InterruptedException {
+ // Ensure the __consumer_offsets topic is created to prevent transient
issues,
+ // such as RetriableCommitFailedException during async offset commits.
+ cluster.createTopic(
+ Topic.GROUP_METADATA_TOPIC_NAME,
+ Integer.parseInt(OFFSETS_TOPIC_PARTITIONS),
+ Short.parseShort(OFFSETS_TOPIC_REPLICATION)
+ );
+ try (var consumer = createConsumer(groupProtocol, false)) {
+ consumer.assign(List.of(tp));
+
+ var callback = new CountConsumerCommitCallback();
+ var count = 5;
+ for (var i = 1; i <= count; i++)
+ consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(i)),
callback);
+
+ TestUtils.waitForCondition(() -> {
+ consumer.poll(Duration.ofMillis(100));
+ return callback.successCount >= count ||
callback.lastError.isPresent();
+ }, "Failed to observe commit callback before timeout");
+
+ assertEquals(Optional.empty(), callback.lastError);
+ assertEquals(count, callback.successCount);
+ assertEquals(new OffsetAndMetadata(count),
consumer.committed(Set.of(tp)).get(tp));
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerAutoCommitIntercept() throws
InterruptedException {
+ testAutoCommitIntercept(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerAutoCommitIntercept() throws
InterruptedException {
+ testAutoCommitIntercept(GroupProtocol.CONSUMER);
+ }
+
+ private void testAutoCommitIntercept(GroupProtocol groupProtocol) throws
InterruptedException {
+ var topic2 = "topic2";
+ cluster.createTopic(topic2, 2, (short) 3);
+ var numRecords = 100;
+ try (var producer = cluster.producer();
+ // create consumer with interceptor
+ Consumer<byte[], byte[]> consumer = cluster.consumer(Map.of(
+ GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT),
+ ENABLE_AUTO_COMMIT_CONFIG, "true",
+ INTERCEPTOR_CLASSES_CONFIG,
"org.apache.kafka.test.MockConsumerInterceptor"
+ ))
+ ) {
+ // produce records
+ for (var i = 0; i < numRecords; i++) {
+ producer.send(new ProducerRecord<>(tp.topic(), tp.partition(),
("key " + i).getBytes(), ("value " + i).getBytes()));
+ }
+
+ var rebalanceListener = new ConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
+ // keep partitions paused in this test so that we can
verify the commits based on specific seeks
+ consumer.pause(partitions);
+ }
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
+ // No-op
+ }
+ };
+
+ changeConsumerSubscriptionAndValidateAssignment(
+ consumer,
+ List.of(topic),
+ Set.of(tp, tp1),
+ rebalanceListener
+ );
+ consumer.seek(tp, 10);
+ consumer.seek(tp1, 20);
+
+ // change subscription to trigger rebalance
+ var commitCountBeforeRebalance =
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
+ var expectedAssignment = Set.of(tp, tp1, new
TopicPartition(topic2, 0), new TopicPartition(topic2, 1));
+ changeConsumerSubscriptionAndValidateAssignment(
+ consumer,
+ List.of(topic, topic2),
+ expectedAssignment,
+ rebalanceListener
+ );
+
+ // after rebalancing, we should have reset to the committed
positions
+ var committed1 = consumer.committed(Set.of(tp));
+ assertEquals(10, committed1.get(tp).offset());
+ var committed2 = consumer.committed(Set.of(tp1));
+ assertEquals(20, committed2.get(tp1).offset());
+
+ // In both CLASSIC and CONSUMER protocols, interceptors are
executed in poll and close.
+ // However, in the CONSUMER protocol, the assignment may be
changed outside a poll, so
+ // we need to poll once to ensure the interceptor is called.
+ if (groupProtocol == GroupProtocol.CONSUMER) {
+ consumer.poll(Duration.ZERO);
+ }
+
+ assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() >
commitCountBeforeRebalance);
+
+ // verify commits are intercepted on close
+ var commitCountBeforeClose =
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
+ consumer.close();
+ assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() >
commitCountBeforeClose);
+ producer.close();
+ // cleanup
+ MockConsumerInterceptor.resetCounters();
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerCommitSpecifiedOffsets() throws
InterruptedException {
+ testCommitSpecifiedOffsets(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerCommitSpecifiedOffsets() throws
InterruptedException {
+ testCommitSpecifiedOffsets(GroupProtocol.CONSUMER);
+ }
+
+ private void testCommitSpecifiedOffsets(GroupProtocol groupProtocol)
throws InterruptedException {
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ var consumer = createConsumer(groupProtocol, false)
+ ) {
+ sendRecords(producer, tp, 5, System.currentTimeMillis());
+ sendRecords(producer, tp1, 7, System.currentTimeMillis());
+
+ consumer.assign(List.of(tp, tp1));
+
+ var pos1 = consumer.position(tp);
+ var pos2 = consumer.position(tp1);
+
+ consumer.commitSync(Map.of(tp, new OffsetAndMetadata(3L)));
+
+ assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset());
+
assertNull(consumer.committed(Collections.singleton(tp1)).get(tp1));
+
+ // Positions should not change
+ assertEquals(pos1, consumer.position(tp));
+ assertEquals(pos2, consumer.position(tp1));
+
+ consumer.commitSync(Map.of(tp1, new OffsetAndMetadata(5L)));
+
+ assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset());
+ assertEquals(5, consumer.committed(Set.of(tp1)).get(tp1).offset());
+
+ // Using async should pick up the committed changes after commit
completes
+ sendAndAwaitAsyncCommit(consumer, Map.of(tp1, new
OffsetAndMetadata(7L)));
+ assertEquals(7,
consumer.committed(Collections.singleton(tp1)).get(tp1).offset());
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerAutoCommitOnRebalance() throws
InterruptedException {
+ testAutoCommitOnRebalance(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerAutoCommitOnRebalance() throws
InterruptedException {
+ testAutoCommitOnRebalance(GroupProtocol.CONSUMER);
+ }
+
+ private void testAutoCommitOnRebalance(GroupProtocol groupProtocol) throws
InterruptedException {
+ var topic2 = "topic2";
+ cluster.createTopic(topic2, 2, (short) BROKER_COUNT);
+ try (var consumer = createConsumer(groupProtocol, true)) {
+ sendRecords(cluster, tp, 1000);
+
+ var rebalanceListener = new ConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
+ // keep partitions paused in this test so that we can
verify the commits based on specific seeks
+ consumer.pause(partitions);
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
+
+ }
+ };
+
+ consumer.subscribe(List.of(topic), rebalanceListener);
+ awaitAssignment(consumer, Set.of(tp, tp1));
+
+ consumer.seek(tp, 300);
+ consumer.seek(tp1, 500);
+ // change subscription to trigger rebalance
+ consumer.subscribe(List.of(topic, topic2), rebalanceListener);
+
+ var newAssignment = Set.of(tp, tp1, new TopicPartition(topic2, 0),
new TopicPartition(topic2, 1));
+ awaitAssignment(consumer, newAssignment);
+
+ // after rebalancing, we should have reset to the committed
positions
+ assertEquals(300, consumer.committed(Set.of(tp)).get(tp).offset());
+ assertEquals(500,
consumer.committed(Set.of(tp1)).get(tp1).offset());
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerSubscribeAndCommitSync() throws
InterruptedException {
+ testSubscribeAndCommitSync(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerSubscribeAndCommitSync() throws
InterruptedException {
+ testSubscribeAndCommitSync(GroupProtocol.CONSUMER);
+ }
+
+ private void testSubscribeAndCommitSync(GroupProtocol groupProtocol)
throws InterruptedException {
+ // This test ensure that the member ID is propagated from the group
coordinator when the
+ // assignment is received into a subsequent offset commit
+ try (var consumer = createConsumer(groupProtocol, false)) {
+ assertEquals(0, consumer.assignment().size());
+ consumer.subscribe(List.of(topic));
+ awaitAssignment(consumer, Set.of(tp, tp1));
+
+ consumer.seek(tp, 0);
+ consumer.commitSync();
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerPositionAndCommit() throws
InterruptedException {
+ testPositionAndCommit(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerPositionAndCommit() throws
InterruptedException {
+ testPositionAndCommit(GroupProtocol.CONSUMER);
+ }
+
+ private void testPositionAndCommit(GroupProtocol groupProtocol) throws
InterruptedException {
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ var consumer = createConsumer(groupProtocol, false);
+ var otherConsumer = createConsumer(groupProtocol, false)
+ ) {
+ var startingTimestamp = System.currentTimeMillis();
+ sendRecords(producer, tp, 5, startingTimestamp);
+
+ var topicPartition = new TopicPartition(topic, 15);
+
assertNull(consumer.committed(Collections.singleton(topicPartition)).get(topicPartition));
+
+ // position() on a partition that we aren't subscribed to throws
an exception
+ assertThrows(IllegalStateException.class, () ->
consumer.position(topicPartition));
+
+ consumer.assign(List.of(tp));
+
+ assertEquals(0L, consumer.position(tp), "position() on a partition
that we are subscribed to should reset the offset");
+ consumer.commitSync();
+ assertEquals(0L, consumer.committed(Set.of(tp)).get(tp).offset());
+ consumeAndVerifyRecords(consumer, tp, 5, 0, 0, startingTimestamp);
+ assertEquals(5L, consumer.position(tp), "After consuming 5
records, position should be 5");
+ consumer.commitSync();
+ assertEquals(5L, consumer.committed(Set.of(tp)).get(tp).offset(),
"Committed offset should be returned");
+
+ startingTimestamp = System.currentTimeMillis();
+ sendRecords(producer, tp, 1, startingTimestamp);
+
+ // another consumer in the same group should get the same position
+ otherConsumer.assign(List.of(tp));
+ consumeAndVerifyRecords(otherConsumer, tp, 1, 5, 0,
startingTimestamp);
+ }
+ }
+
+ // TODO: This only works in the new consumer, but should be fixed for the
old consumer as well
+ @ClusterTest
+ public void testCommitAsyncCompletedBeforeConsumerCloses() {
+ // This is testing the contract that asynchronous offset commit are
completed before the consumer
+ // is closed, even when no commit sync is performed as part of the
close (due to auto-commit
+ // disabled, or simply because there are no consumed offsets).
+ try (Producer<byte[], byte[]> producer =
cluster.producer(Map.of(ProducerConfig.ACKS_CONFIG, "all"));
+ var consumer = createConsumer(GroupProtocol.CONSUMER, false)
+ ) {
+ sendRecords(producer, tp, 3, System.currentTimeMillis());
+ sendRecords(producer, tp1, 3, System.currentTimeMillis());
+ consumer.assign(List.of(tp, tp1));
+
+ // Try without looking up the coordinator first
+ var cb = new CountConsumerCommitCallback();
+ consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), cb);
+ consumer.commitAsync(Map.of(tp1, new OffsetAndMetadata(1L)), cb);
+
+ consumer.close();
+ assertEquals(2, cb.successCount);
+ }
+ }
+
+ // TODO: This only works in the new consumer, but should be fixed for the
old consumer as well
+ @ClusterTest
+ public void testCommitAsyncCompletedBeforeCommitSyncReturns() {
+ // This is testing the contract that asynchronous offset commits sent
previously with the
+ // `commitAsync` are guaranteed to have their callbacks invoked prior
to completion of
+ // `commitSync` (given that it does not time out).
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ var consumer = createConsumer(GroupProtocol.CONSUMER, false)
+ ) {
+ sendRecords(producer, tp, 3, System.currentTimeMillis());
+ sendRecords(producer, tp1, 3, System.currentTimeMillis());
+
+ consumer.assign(List.of(tp, tp1));
+
+ // Try without looking up the coordinator first
+ var cb = new CountConsumerCommitCallback();
+ consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), cb);
+ consumer.commitSync(Map.of());
+
+ assertEquals(1, consumer.committed(Set.of(tp)).get(tp).offset());
+ assertEquals(1, cb.successCount);
+
+ // Try with coordinator known
+ consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(2L)), cb);
+ consumer.commitSync(Map.of(tp1, new OffsetAndMetadata(2L)));
+
+ assertEquals(2, consumer.committed(Set.of(tp)).get(tp).offset());
+ assertEquals(2, consumer.committed(Set.of(tp1)).get(tp1).offset());
+ assertEquals(2, cb.successCount);
+
+ // Try with empty sync commit
+ consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(3L)), cb);
+ consumer.commitSync(Map.of());
+
+ assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset());
+ assertEquals(2, consumer.committed(Set.of(tp1)).get(tp1).offset());
+ assertEquals(3, cb.successCount);
+ }
+ }
+
+ private Consumer<byte[], byte[]> createConsumer(GroupProtocol protocol,
boolean enableAutoCommit) {
+ return cluster.consumer(Map.of(
+ GROUP_ID_CONFIG, "test-group",
+ GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT),
+ ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit
+ ));
+ }
+
+ private void sendAndAwaitAsyncCommit(
+ Consumer<byte[], byte[]> consumer,
+ Map<TopicPartition, OffsetAndMetadata> offsetsOpt
+ ) throws InterruptedException {
+ var commitCallback = new RetryCommitCallback(consumer, offsetsOpt);
+
+ commitCallback.sendAsyncCommit();
+ TestUtils.waitForCondition(() -> {
+ consumer.poll(Duration.ofMillis(100));
+ return commitCallback.isComplete;
+ }, "Failed to observe commit callback before timeout"
+ );
+
+ assertEquals(Optional.empty(), commitCallback.error);
+ }
+
+ private static class RetryCommitCallback implements OffsetCommitCallback {
+ private boolean isComplete = false;
+ private Optional<Exception> error = Optional.empty();
+
+ private final Consumer<byte[], byte[]> consumer;
+ private final Map<TopicPartition, OffsetAndMetadata> offsetsOpt;
+
+ public RetryCommitCallback(
+ Consumer<byte[], byte[]> consumer,
+ Map<TopicPartition, OffsetAndMetadata> offsetsOpt
+ ) {
+ this.consumer = consumer;
+ this.offsetsOpt = offsetsOpt;
+ }
+
+ @Override
+ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
+ if (exception instanceof RetriableCommitFailedException) {
+ sendAsyncCommit();
+ } else {
+ isComplete = true;
+ error = Optional.ofNullable(exception);
+ }
+ }
+
+ void sendAsyncCommit() {
+ consumer.commitAsync(offsetsOpt, this);
+ }
+ }
+
+ private static class CountConsumerCommitCallback implements
OffsetCommitCallback {
+ private int successCount = 0;
+ private Optional<Exception> lastError = Optional.empty();
+
+ @Override
+ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
+ if (exception == null) {
+ successCount += 1;
+ } else {
+ lastError = Optional.of(exception);
+ }
+ }
+ }
+
+ private void changeConsumerSubscriptionAndValidateAssignment(
+ Consumer<byte[], byte[]> consumer,
+ List<String> topicsToSubscribe,
+ Set<TopicPartition> expectedAssignment,
+ ConsumerRebalanceListener rebalanceListener
+ ) throws InterruptedException {
+ consumer.subscribe(topicsToSubscribe, rebalanceListener);
+ awaitAssignment(consumer, expectedAssignment);
+ }
+}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala
deleted file mode 100644
index 0445e81cac1..00000000000
---
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala
+++ /dev/null
@@ -1,371 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package kafka.api
-
-import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.serialization.{StringDeserializer,
StringSerializer}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.test.MockConsumerInterceptor
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Timeout
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-import java.time.Duration
-import java.util
-import java.util.Optional
-import scala.jdk.CollectionConverters._
-
-/**
- * Integration tests for the consumer that covers the logic related to
committing offsets.
- */
-@Timeout(600)
-class PlaintextConsumerCommitTest extends AbstractConsumerTest {
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testAutoCommitOnClose(groupProtocol: String): Unit = {
- this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"true")
- val consumer = createConsumer()
-
- val numRecords = 10000
- val producer = createProducer()
- sendRecords(producer, numRecords, tp)
-
- consumer.subscribe(List(topic).asJava)
- awaitAssignment(consumer, Set(tp, tp2))
-
- // should auto-commit sought positions before closing
- consumer.seek(tp, 300)
- consumer.seek(tp2, 500)
- consumer.close()
-
- // now we should see the committed positions from another consumer
- val anotherConsumer = createConsumer()
- assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
- assertEquals(500,
anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testAutoCommitOnCloseAfterWakeup(groupProtocol: String): Unit = {
- this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"true")
- val consumer = createConsumer()
-
- val numRecords = 10000
- val producer = createProducer()
- sendRecords(producer, numRecords, tp)
-
- consumer.subscribe(List(topic).asJava)
- awaitAssignment(consumer, Set(tp, tp2))
-
- // should auto-commit sought positions before closing
- consumer.seek(tp, 300)
- consumer.seek(tp2, 500)
-
- // wakeup the consumer before closing to simulate trying to break a poll
- // loop from another thread
- consumer.wakeup()
- consumer.close()
-
- // now we should see the committed positions from another consumer
- val anotherConsumer = createConsumer()
- assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
- assertEquals(500,
anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testCommitMetadata(groupProtocol: String): Unit = {
- val consumer = createConsumer()
- consumer.assign(List(tp).asJava)
-
- // sync commit
- val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo")
- consumer.commitSync(Map((tp, syncMetadata)).asJava)
- assertEquals(syncMetadata, consumer.committed(Set(tp).asJava).get(tp))
-
- // async commit
- val asyncMetadata = new OffsetAndMetadata(10, "bar")
- sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata)))
- assertEquals(asyncMetadata, consumer.committed(Set(tp).asJava).get(tp))
-
- // handle null metadata
- val nullMetadata = new OffsetAndMetadata(5, null)
- consumer.commitSync(Map(tp -> nullMetadata).asJava)
- assertEquals(nullMetadata, consumer.committed(Set(tp).asJava).get(tp))
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testAsyncCommit(groupProtocol: String): Unit = {
- val consumer = createConsumer()
- consumer.assign(List(tp).asJava)
-
- val callback = new CountConsumerCommitCallback
- val count = 5
-
- for (i <- 1 to count)
- consumer.commitAsync(Map(tp -> new OffsetAndMetadata(i)).asJava,
callback)
-
- TestUtils.pollUntilTrue(consumer, () => callback.successCount >= count ||
callback.lastError.isDefined,
- "Failed to observe commit callback before timeout", waitTimeMs = 10000)
-
- assertEquals(None, callback.lastError)
- assertEquals(count, callback.successCount)
- assertEquals(new OffsetAndMetadata(count),
consumer.committed(Set(tp).asJava).get(tp))
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testAutoCommitIntercept(groupProtocol: String): Unit = {
- val topic2 = "topic2"
- createTopic(topic2, 2, brokerCount)
-
- // produce records
- val numRecords = 100
- val testProducer = createProducer(keySerializer = new StringSerializer,
valueSerializer = new StringSerializer)
- (0 until numRecords).map { i =>
- testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key
$i", s"value $i"))
- }.foreach(_.get)
-
- // create consumer with interceptor
- this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"true")
- this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
"org.apache.kafka.test.MockConsumerInterceptor")
- val testConsumer = createConsumer(keyDeserializer = new
StringDeserializer, valueDeserializer = new StringDeserializer)
- val rebalanceListener = new ConsumerRebalanceListener {
- override def onPartitionsAssigned(partitions:
util.Collection[TopicPartition]): Unit = {
- // keep partitions paused in this test so that we can verify the
commits based on specific seeks
- testConsumer.pause(partitions)
- }
-
- override def onPartitionsRevoked(partitions:
util.Collection[TopicPartition]): Unit = {}
- }
- changeConsumerSubscriptionAndValidateAssignment(testConsumer, List(topic),
Set(tp, tp2), rebalanceListener)
- testConsumer.seek(tp, 10)
- testConsumer.seek(tp2, 20)
-
- // change subscription to trigger rebalance
- val commitCountBeforeRebalance =
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
- changeConsumerSubscriptionAndValidateAssignment(testConsumer,
- List(topic, topic2),
- Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2,
1)),
- rebalanceListener)
-
- // after rebalancing, we should have reset to the committed positions
- assertEquals(10, testConsumer.committed(Set(tp).asJava).get(tp).offset)
- assertEquals(20, testConsumer.committed(Set(tp2).asJava).get(tp2).offset)
-
- // In both CLASSIC and CONSUMER protocols, interceptors are executed in
poll and close.
- // However, in the CONSUMER protocol, the assignment may be changed
outside of a poll, so
- // we need to poll once to ensure the interceptor is called.
- if (groupProtocol.toUpperCase == GroupProtocol.CONSUMER.name) {
- testConsumer.poll(Duration.ZERO)
- }
-
- assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() >
commitCountBeforeRebalance)
-
- // verify commits are intercepted on close
- val commitCountBeforeClose =
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
- testConsumer.close()
- assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() >
commitCountBeforeClose)
- testProducer.close()
-
- // cleanup
- MockConsumerInterceptor.resetCounters()
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testCommitSpecifiedOffsets(groupProtocol: String): Unit = {
- val producer = createProducer()
- sendRecords(producer, numRecords = 5, tp)
- sendRecords(producer, numRecords = 7, tp2)
-
- val consumer = createConsumer()
- consumer.assign(List(tp, tp2).asJava)
-
- val pos1 = consumer.position(tp)
- val pos2 = consumer.position(tp2)
- consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(3L))).asJava)
- assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
- assertNull(consumer.committed(Set(tp2).asJava).get(tp2))
-
- // Positions should not change
- assertEquals(pos1, consumer.position(tp))
- assertEquals(pos2, consumer.position(tp2))
- consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new
OffsetAndMetadata(5L))).asJava)
- assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
- assertEquals(5, consumer.committed(Set(tp2).asJava).get(tp2).offset)
-
- // Using async should pick up the committed changes after commit completes
- sendAndAwaitAsyncCommit(consumer, Some(Map(tp2 -> new
OffsetAndMetadata(7L))))
- assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testAutoCommitOnRebalance(groupProtocol: String): Unit = {
- val topic2 = "topic2"
- createTopic(topic2, 2, brokerCount)
-
- this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"true")
- val consumer = createConsumer()
-
- val numRecords = 10000
- val producer = createProducer()
- sendRecords(producer, numRecords, tp)
-
- val rebalanceListener = new ConsumerRebalanceListener {
- override def onPartitionsAssigned(partitions:
util.Collection[TopicPartition]): Unit = {
- // keep partitions paused in this test so that we can verify the
commits based on specific seeks
- consumer.pause(partitions)
- }
-
- override def onPartitionsRevoked(partitions:
util.Collection[TopicPartition]): Unit = {}
- }
-
- consumer.subscribe(List(topic).asJava, rebalanceListener)
-
- awaitAssignment(consumer, Set(tp, tp2))
-
- consumer.seek(tp, 300)
- consumer.seek(tp2, 500)
-
- // change subscription to trigger rebalance
- consumer.subscribe(List(topic, topic2).asJava, rebalanceListener)
-
- val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new
TopicPartition(topic2, 1))
- awaitAssignment(consumer, newAssignment)
-
- // after rebalancing, we should have reset to the committed positions
- assertEquals(300, consumer.committed(Set(tp).asJava).get(tp).offset)
- assertEquals(500, consumer.committed(Set(tp2).asJava).get(tp2).offset)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testSubscribeAndCommitSync(groupProtocol: String): Unit = {
- // This test ensure that the member ID is propagated from the group
coordinator when the
- // assignment is received into a subsequent offset commit
- val consumer = createConsumer()
- assertEquals(0, consumer.assignment.size)
- consumer.subscribe(List(topic).asJava)
- awaitAssignment(consumer, Set(tp, tp2))
-
- consumer.seek(tp, 0)
-
- consumer.commitSync()
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testPositionAndCommit(groupProtocol: String): Unit = {
- val producer = createProducer()
- var startingTimestamp = System.currentTimeMillis()
- sendRecords(producer, numRecords = 5, tp, startingTimestamp =
startingTimestamp)
-
- val topicPartition = new TopicPartition(topic, 15)
- val consumer = createConsumer()
-
assertNull(consumer.committed(Set(topicPartition).asJava).get(topicPartition))
-
- // position() on a partition that we aren't subscribed to throws an
exception
- assertThrows(classOf[IllegalStateException], () =>
consumer.position(topicPartition))
-
- consumer.assign(List(tp).asJava)
-
- assertEquals(0L, consumer.position(tp), "position() on a partition that we
are subscribed to should reset the offset")
- consumer.commitSync()
- assertEquals(0L, consumer.committed(Set(tp).asJava).get(tp).offset)
- consumeAndVerifyRecords(consumer = consumer, numRecords = 5,
startingOffset = 0, startingTimestamp = startingTimestamp)
- assertEquals(5L, consumer.position(tp), "After consuming 5 records,
position should be 5")
- consumer.commitSync()
- assertEquals(5L, consumer.committed(Set(tp).asJava).get(tp).offset,
"Committed offset should be returned")
-
- startingTimestamp = System.currentTimeMillis()
- sendRecords(producer, numRecords = 1, tp, startingTimestamp =
startingTimestamp)
-
- // another consumer in the same group should get the same position
- val otherConsumer = createConsumer()
- otherConsumer.assign(List(tp).asJava)
- consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1,
startingOffset = 5, startingTimestamp = startingTimestamp)
- }
-
- // TODO: This only works in the new consumer, but should be fixed for the
old consumer as well
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
- def testCommitAsyncCompletedBeforeConsumerCloses(groupProtocol: String):
Unit = {
- // This is testing the contract that asynchronous offset commit are
completed before the consumer
- // is closed, even when no commit sync is performed as part of the close
(due to auto-commit
- // disabled, or simply because there are no consumed offsets).
- val producer = createProducer()
- sendRecords(producer, numRecords = 3, tp)
- sendRecords(producer, numRecords = 3, tp2)
-
- val consumer = createConsumer()
- consumer.assign(List(tp, tp2).asJava)
-
- // Try without looking up the coordinator first
- val cb = new CountConsumerCommitCallback
- consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(1L))).asJava, cb)
- consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new
OffsetAndMetadata(1L))).asJava, cb)
- consumer.close()
- assertEquals(2, cb.successCount)
- }
-
- // TODO: This only works in the new consumer, but should be fixed for the
old consumer as well
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
- def testCommitAsyncCompletedBeforeCommitSyncReturns(groupProtocol: String):
Unit = {
- // This is testing the contract that asynchronous offset commits sent
previously with the
- // `commitAsync` are guaranteed to have their callbacks invoked prior to
completion of
- // `commitSync` (given that it does not time out).
- val producer = createProducer()
- sendRecords(producer, numRecords = 3, tp)
- sendRecords(producer, numRecords = 3, tp2)
-
- val consumer = createConsumer()
- consumer.assign(List(tp, tp2).asJava)
-
- // Try without looking up the coordinator first
- val cb = new CountConsumerCommitCallback
- consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(1L))).asJava, cb)
- consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
- assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
- assertEquals(1, cb.successCount)
-
- // Try with coordinator known
- consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(2L))).asJava, cb)
- consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new
OffsetAndMetadata(2L))).asJava)
- assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
- assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
- assertEquals(2, cb.successCount)
-
- // Try with empty sync commit
- consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(3L))).asJava, cb)
- consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
- assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
- assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
- assertEquals(3, cb.successCount)
- }
-
- def changeConsumerSubscriptionAndValidateAssignment[K, V](consumer:
Consumer[K, V],
- topicsToSubscribe:
List[String],
-
expectedAssignment: Set[TopicPartition],
- rebalanceListener:
ConsumerRebalanceListener): Unit = {
- consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
- awaitAssignment(consumer, expectedAssignment)
- }
-}