This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a399852cedf KAFKA-19042 Move PlaintextConsumerTest to client-integration-tests module (#20081)
a399852cedf is described below
commit a399852cedf60382ef1f311c37abaecc2e5e59cd
Author: Ken Huang <[email protected]>
AuthorDate: Tue Jul 8 01:41:59 2025 +0800
KAFKA-19042 Move PlaintextConsumerTest to client-integration-tests module (#20081)
Rewrite PlaintextConsumerTest in Java on top of the new test infra and move it to the clients-integration-tests module.
Reviewers: Jhen-Yung Hsu <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
---
build.gradle | 1 +
.../org/apache/kafka/clients/ClientsTestUtils.java | 49 +-
.../clients/consumer/PlaintextConsumerTest.java | 1640 ++++++++++++++++++++
.../integration/kafka/api/BaseConsumerTest.scala | 40 +-
.../kafka/api/PlaintextConsumerTest.scala | 821 +---------
5 files changed, 1681 insertions(+), 870 deletions(-)
diff --git a/build.gradle b/build.gradle
index 1ba506e6314..6f0288037a7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2001,6 +2001,7 @@ project(':clients:clients-integration-tests') {
implementation project(':group-coordinator')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':transaction-coordinator')
+ testImplementation project(':test-common:test-common-util')
testImplementation libs.junitJupiter
testImplementation libs.junitPlatformSuiteEngine
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
index 0c99d51da88..dfdadeb5090 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
@@ -64,19 +64,19 @@ public class ClientsTestUtils {
private ClientsTestUtils() {}
- public static List<ConsumerRecord<byte[], byte[]>> consumeRecords(
- Consumer<byte[], byte[]> consumer,
+ public static <K, V> List<ConsumerRecord<K, V>> consumeRecords(
+ Consumer<K, V> consumer,
int numRecords
) throws InterruptedException {
return consumeRecords(consumer, numRecords, Integer.MAX_VALUE);
}
- public static List<ConsumerRecord<byte[], byte[]>> consumeRecords(
- Consumer<byte[], byte[]> consumer,
+ public static <K, V> List<ConsumerRecord<K, V>> consumeRecords(
+ Consumer<K, V> consumer,
int numRecords,
int maxPollRecords
) throws InterruptedException {
- List<ConsumerRecord<byte[], byte[]>> consumedRecords = new ArrayList<>();
+ List<ConsumerRecord<K, V>> consumedRecords = new ArrayList<>();
TestUtils.waitForCondition(() -> {
var records = consumer.poll(Duration.ofMillis(100));
records.forEach(consumedRecords::add);
@@ -138,6 +138,31 @@ public class ClientsTestUtils {
}, waitTimeMs, msg);
}
+ public static void consumeAndVerifyRecordsWithTimeTypeLogAppend(
+ Consumer<byte[], byte[]> consumer,
+ TopicPartition tp,
+ int numRecords,
+ long startingTimestamp
+ ) throws InterruptedException {
+ var records = consumeRecords(consumer, numRecords, Integer.MAX_VALUE);
+ var now = System.currentTimeMillis();
+ for (var i = 0; i < numRecords; i++) {
+ var record = records.get(i);
+ assertEquals(tp.topic(), record.topic());
+ assertEquals(tp.partition(), record.partition());
+
+ assertTrue(record.timestamp() >= startingTimestamp && record.timestamp() <= now,
+ "Got unexpected timestamp " + record.timestamp() + ". Timestamp should be between [" + startingTimestamp + ", " + now + "]");
+
+ assertEquals(i, record.offset());
+ assertEquals(KEY_PREFIX + i, new String(record.key()));
+ assertEquals(VALUE_PREFIX + i, new String(record.value()));
+ // this is true only because K and V are byte arrays
+ assertEquals((KEY_PREFIX + i).length(), record.serializedKeySize());
+ assertEquals((VALUE_PREFIX + i).length(), record.serializedValueSize());
+ }
+ }
+
public static void consumeAndVerifyRecords(
Consumer<byte[], byte[]> consumer,
TopicPartition tp,
@@ -281,8 +306,8 @@ public class ClientsTestUtils {
return record;
}
- public static void sendAndAwaitAsyncCommit(
- Consumer<byte[], byte[]> consumer,
+ public static <K, V> void sendAndAwaitAsyncCommit(
+ Consumer<K, V> consumer,
Optional<Map<TopicPartition, OffsetAndMetadata>> offsetsOpt
) throws InterruptedException {
@@ -423,8 +448,8 @@ public class ClientsTestUtils {
}
}
- public static void sendAsyncCommit(
- Consumer<byte[], byte[]> consumer,
+ public static <K, V> void sendAsyncCommit(
+ Consumer<K, V> consumer,
OffsetCommitCallback callback,
Optional<Map<TopicPartition, OffsetAndMetadata>> offsetsOpt
) {
@@ -472,14 +497,14 @@ public class ClientsTestUtils {
}
}
- private static class RetryCommitCallback implements OffsetCommitCallback {
+ private static class RetryCommitCallback<K, V> implements OffsetCommitCallback {
boolean isComplete = false;
Optional<Exception> error = Optional.empty();
- Consumer<byte[], byte[]> consumer;
+ Consumer<K, V> consumer;
Optional<Map<TopicPartition, OffsetAndMetadata>> offsetsOpt;
public RetryCommitCallback(
- Consumer<byte[], byte[]> consumer,
+ Consumer<K, V> consumer,
Optional<Map<TopicPartition, OffsetAndMetadata>> offsetsOpt
) {
this.consumer = consumer;
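
With the helpers above made generic over K and V, callers no longer have to funnel everything through Consumer<byte[], byte[]>. A small sketch of a typed caller (hypothetical method name; assumes a consumer created with String deserializers and the imports from ClientsTestUtils' package):

    // The helper now infers the key/value types from the consumer argument.
    static void consumeTyped(Consumer<String, String> consumer) throws InterruptedException {
        List<ConsumerRecord<String, String>> records = ClientsTestUtils.consumeRecords(consumer, 10);
        records.forEach(r -> System.out.println(r.key() + " => " + r.value()));
    }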
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
new file mode 100644
index 00000000000..5fd2ad20089
--- /dev/null
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -0,0 +1,1640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import kafka.server.KafkaBroker;
+
+import org.apache.kafka.clients.ClientsTestUtils.TestConsumerReassignmentListener;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Flaky;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.test.MockConsumerInterceptor;
+import org.apache.kafka.test.MockProducerInterceptor;
+import org.apache.kafka.test.TestUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.BROKER_COUNT;
+import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.TOPIC;
+import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.TP;
+import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testClusterResourceListener;
+import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testCoordinatorFailover;
+import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testSimpleConsumption;
+import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment;
+import static org.apache.kafka.clients.ClientsTestUtils.awaitRebalance;
+import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecordsWithTimeTypeLogAppend;
+import static org.apache.kafka.clients.ClientsTestUtils.consumeRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendAndAwaitAsyncCommit;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+ types = {Type.KRAFT},
+ brokers = BROKER_COUNT,
+ serverProperties = {
+ @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"),
+ @ClusterConfigProperty(key = GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, value = "60000"),
+ @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"),
+ }
+)
+public class PlaintextConsumerTest {
+
+ private final ClusterInstance cluster;
+ public static final double EPSILON = 0.1;
+
+ public PlaintextConsumerTest(ClusterInstance cluster) {
+ this.cluster = cluster;
+ }
+
+ @ClusterTest
+ public void testClassicConsumerSimpleConsumption() throws InterruptedException {
+ testSimpleConsumption(cluster, Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerSimpleConsumption() throws InterruptedException {
+ testSimpleConsumption(cluster, Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testClassicConsumerClusterResourceListener() throws InterruptedException {
+ testClusterResourceListener(cluster, Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerClusterResourceListener() throws InterruptedException {
+ testClusterResourceListener(cluster, Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testClassicConsumerCoordinatorFailover() throws InterruptedException {
+ Map<String, Object> config = Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+ SESSION_TIMEOUT_MS_CONFIG, 5001,
+ HEARTBEAT_INTERVAL_MS_CONFIG, 1000,
+ // Use higher poll timeout to avoid consumer leaving the group due to timeout
+ MAX_POLL_INTERVAL_MS_CONFIG, 15000
+ );
+ testCoordinatorFailover(cluster, config);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumeCoordinatorFailover() throws InterruptedException {
+ Map<String, Object> config = Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+ // Use higher poll timeout to avoid consumer leaving the group due to timeout
+ MAX_POLL_INTERVAL_MS_CONFIG, 15000
+ );
+ testCoordinatorFailover(cluster, config);
+ }
+
+ @ClusterTest
+ public void testClassicConsumerHeaders() throws Exception {
+ testHeaders(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerHeaders() throws Exception {
+ testHeaders(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testHeaders(Map<String, Object> consumerConfig) throws Exception {
+ var numRecords = 1;
+
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+ ) {
+ var record = new ProducerRecord<>(TP.topic(), TP.partition(), null, "key".getBytes(), "value".getBytes());
+ record.headers().add("headerKey", "headerValue".getBytes());
+ producer.send(record);
+
+ assertEquals(0, consumer.assignment().size());
+ consumer.assign(List.of(TP));
+ assertEquals(1, consumer.assignment().size());
+
+ consumer.seek(TP, 0);
+ var records = consumeRecords(consumer, numRecords);
+ assertEquals(numRecords, records.size());
+ var header = records.get(0).headers().lastHeader("headerKey");
+ assertEquals("headerValue", header == null ? null : new String(header.value()));
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerHeadersSerializerDeserializer() throws Exception {
+ testHeadersSerializeDeserialize(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerHeadersSerializerDeserializer() throws Exception {
+ testHeadersSerializeDeserialize(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testHeadersSerializeDeserialize(Map<String, Object> config) throws InterruptedException {
+ var numRecords = 1;
+ Map<String, Object> consumerConfig = new HashMap<>(config);
+ consumerConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, DeserializerImpl.class);
+ Map<String, Object> producerConfig = Map.of(
+ VALUE_SERIALIZER_CLASS_CONFIG, SerializerImpl.class.getName()
+ );
+
+ try (Producer<byte[], byte[]> producer = cluster.producer(producerConfig);
+ Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+ ) {
+ producer.send(new ProducerRecord<>(
+ TP.topic(),
+ TP.partition(),
+ null,
+ "key".getBytes(),
+ "value".getBytes())
+ );
+
+ assertEquals(0, consumer.assignment().size());
+ consumer.assign(List.of(TP));
+ assertEquals(1, consumer.assignment().size());
+
+ consumer.seek(TP, 0);
+ assertEquals(numRecords, consumeRecords(consumer, numRecords).size());
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerAutoOffsetReset() throws Exception {
+ testAutoOffsetReset(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerAutoOffsetReset() throws Exception {
+ testAutoOffsetReset(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testAutoOffsetReset(Map<String, Object> consumerConfig) throws Exception {
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+ ) {
+ var startingTimestamp = System.currentTimeMillis();
+ sendRecords(producer, TP, 1, startingTimestamp);
+ consumer.assign(List.of(TP));
+ consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp);
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerGroupConsumption() throws Exception {
+ testGroupConsumption(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerGroupConsumption() throws Exception {
+ testGroupConsumption(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testGroupConsumption(Map<String, Object> consumerConfig) throws Exception {
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+ ) {
+ var startingTimestamp = System.currentTimeMillis();
+ sendRecords(producer, TP, 10, startingTimestamp);
+ consumer.subscribe(List.of(TOPIC));
+ consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp);
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerPartitionsFor() throws Exception {
+ testPartitionsFor(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerPartitionsFor() throws Exception {
+ testPartitionsFor(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testPartitionsFor(Map<String, Object> consumerConfig) throws Exception {
+ var numParts = 2;
+ cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+ cluster.createTopic("part-test", numParts, (short) 1);
+
+ try (var consumer = cluster.consumer(consumerConfig)) {
+ var partitions = consumer.partitionsFor(TOPIC);
+ assertNotNull(partitions);
+ assertEquals(2, partitions.size());
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerPartitionsForAutoCreate() throws Exception {
+ testPartitionsForAutoCreate(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerPartitionsForAutoCreate() throws Exception {
+ testPartitionsForAutoCreate(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testPartitionsForAutoCreate(Map<String, Object> consumerConfig) throws Exception {
+ try (var consumer = cluster.consumer(consumerConfig)) {
+ // The first call auto-creates the topic
+ consumer.partitionsFor("non-exist-topic");
+ TestUtils.waitForCondition(
+ () -> !consumer.partitionsFor("non-exist-topic").isEmpty(),
+ "Timed out while awaiting non empty partitions."
+ );
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerPartitionsForInvalidTopic() {
+ testPartitionsForInvalidTopic(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerPartitionsForInvalidTopic() {
+ testPartitionsForInvalidTopic(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testPartitionsForInvalidTopic(Map<String, Object> consumerConfig) {
+ try (var consumer = cluster.consumer(consumerConfig)) {
+ assertThrows(InvalidTopicException.class, () -> consumer.partitionsFor(";3# ads,{234"));
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerSeek() throws Exception {
+ testSeek(
+ Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerSeek() throws Exception {
+ testSeek(
+ Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testSeek(Map<String, Object> consumerConfig) throws Exception {
+ var totalRecords = 50;
+ var mid = totalRecords / 2;
+ cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+ ) {
+ var startingTimestamp = 0;
+ sendRecords(producer, TP, totalRecords, startingTimestamp);
+
+ consumer.assign(List.of(TP));
+ consumer.seekToEnd(List.of(TP));
+ assertEquals(totalRecords, consumer.position(TP));
+ assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty());
+
+ consumer.seekToBeginning(List.of(TP));
+ assertEquals(0, consumer.position(TP));
+ consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp);
+
+ consumer.seek(TP, mid);
+ assertEquals(mid, consumer.position(TP));
+
+ consumeAndVerifyRecords(consumer, TP, 1, mid, mid, mid);
+
+ // Test seek compressed message
+ var tp2 = new TopicPartition(TOPIC, 1);
+ cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+ sendCompressedMessages(totalRecords, tp2);
+ consumer.assign(List.of(tp2));
+
+ consumer.seekToEnd(List.of(tp2));
+ assertEquals(totalRecords, consumer.position(tp2));
+ assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty());
+
+ consumer.seekToBeginning(List.of(tp2));
+ assertEquals(0L, consumer.position(tp2));
+ consumeAndVerifyRecords(consumer, tp2, 1, 0);
+
+ consumer.seek(tp2, mid);
+ assertEquals(mid, consumer.position(tp2));
+ consumeAndVerifyRecords(consumer, tp2, 1, mid, mid, mid);
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerPartitionPauseAndResume() throws Exception {
+ testPartitionPauseAndResume(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerPartitionPauseAndResume() throws Exception {
+ testPartitionPauseAndResume(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testPartitionPauseAndResume(Map<String, Object> consumerConfig) throws Exception {
+ var partitions = List.of(TP);
+ var numRecords = 5;
+
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+ ) {
+ var startingTimestamp = System.currentTimeMillis();
+ sendRecords(producer, TP, numRecords, startingTimestamp);
+
+ consumer.assign(partitions);
+ consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+ consumer.pause(partitions);
+ startingTimestamp = System.currentTimeMillis();
+ sendRecords(producer, TP, numRecords, startingTimestamp);
+ assertTrue(consumer.poll(Duration.ofMillis(100)).isEmpty());
+ consumer.resume(partitions);
+ consumeAndVerifyRecords(consumer, TP, numRecords, 5, 0, startingTimestamp);
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerInterceptors() throws Exception {
+ testInterceptors(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerInterceptors() throws Exception {
+ testInterceptors(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testInterceptors(Map<String, Object> consumerConfig) throws Exception {
+ var appendStr = "mock";
+ MockConsumerInterceptor.resetCounters();
+ MockProducerInterceptor.resetCounters();
+
+ // create producer with interceptor
+ Map<String, Object> producerConfig = Map.of(
+ ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName(),
+ KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+ VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+ "mock.interceptor.append", appendStr
+ );
+ // create consumer with interceptor
+ Map<String, Object> consumerConfigOverride = new HashMap<>(consumerConfig);
+ consumerConfigOverride.put(INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName());
+ consumerConfigOverride.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerConfigOverride.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+ try (Producer<String, String> producer = cluster.producer(producerConfig);
+ Consumer<String, String> consumer = cluster.consumer(consumerConfigOverride)
+ ) {
+ // produce records
+ var numRecords = 10;
+ List<Future<RecordMetadata>> futures = new ArrayList<>();
+ for (var i = 0; i < numRecords; i++) {
+ Future<RecordMetadata> future = producer.send(
+ new ProducerRecord<>(TP.topic(), TP.partition(), "key " + i, "value " + i)
+ );
+ futures.add(future);
+ }
+
+ // Wait for all sends to complete
+ futures.forEach(future -> assertDoesNotThrow(() -> future.get()));
+
+ assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue());
+ assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue());
+
+ // send invalid record
+ assertThrows(
+ Throwable.class,
+ () -> producer.send(null),
+ "Should not allow sending a null record"
+ );
+ assertEquals(
+ 1,
+ MockProducerInterceptor.ON_ERROR_COUNT.intValue(),
+ "Interceptor should be notified about exception"
+ );
+ assertEquals(
+ 0,
+ MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue(),
+ "Interceptor should not receive metadata with an exception when record is null"
+ );
+
+ consumer.assign(List.of(TP));
+ consumer.seek(TP, 0);
+
+ // consume and verify that values are modified by interceptors
+ var records = consumeRecords(consumer, numRecords);
+ for (var i = 0; i < numRecords; i++) {
+ ConsumerRecord<String, String> record = records.get(i);
+ assertEquals("key " + i, record.key());
+ assertEquals(("value " + i + appendStr).toUpperCase(Locale.ROOT), record.value());
+ }
+
+ // commit sync and verify onCommit is called
+ var commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
+ consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L)));
+ assertEquals(2, consumer.committed(Set.of(TP)).get(TP).offset());
+ assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
+
+ // commit async and verify onCommit is called
+ var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L));
+ sendAndAwaitAsyncCommit(consumer, Optional.of(offsetsToCommit));
+ assertEquals(5, consumer.committed(Set.of(TP)).get(TP).offset());
+ assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
+ }
+ // cleanup
+ MockConsumerInterceptor.resetCounters();
+ MockProducerInterceptor.resetCounters();
+ }
+
+ @ClusterTest
+ public void testClassicConsumerInterceptorsWithWrongKeyValue() throws Exception {
+ testInterceptorsWithWrongKeyValue(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerInterceptorsWithWrongKeyValue() throws Exception {
+ testInterceptorsWithWrongKeyValue(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testInterceptorsWithWrongKeyValue(Map<String, Object> consumerConfig) throws Exception {
+ var appendStr = "mock";
+ // create producer with an interceptor whose key and value types differ from the producer's
+ Map<String, Object> producerConfig = Map.of(
+ ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName(),
+ "mock.interceptor.append", appendStr
+ );
+ // create consumer with an interceptor whose key and value types differ from the consumer's
+ Map<String, Object> consumerConfigOverride = new HashMap<>(consumerConfig);
+ consumerConfigOverride.put(INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName());
+
+ try (Producer<byte[], byte[]> producer = cluster.producer(producerConfig);
+ Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfigOverride)
+ ) {
+ // producing records should succeed
+ producer.send(new ProducerRecord<>(
+ TP.topic(),
+ TP.partition(),
+ "key".getBytes(),
+ "value will not be modified".getBytes()
+ ));
+
+ consumer.assign(List.of(TP));
+ consumer.seek(TP, 0);
+ // consume and verify that values are not modified by interceptors -- their exceptions are caught and logged, but not propagated
+ var records = consumeRecords(consumer, 1);
+ var record = records.get(0);
+ assertEquals("value will not be modified", new String(record.value()));
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerConsumeMessagesWithCreateTime() throws Exception {
+ testConsumeMessagesWithCreateTime(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerConsumeMessagesWithCreateTime() throws Exception {
+ testConsumeMessagesWithCreateTime(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testConsumeMessagesWithCreateTime(Map<String, Object> consumerConfig) throws Exception {
+ cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+ var numRecords = 50;
+ var tp2 = new TopicPartition(TOPIC, 1);
+
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+ ) {
+ // Test non-compressed messages
+ var startingTimestamp = System.currentTimeMillis();
+ sendRecords(producer, TP, numRecords, startingTimestamp);
+ consumer.assign(List.of(TP));
+ consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+
+ // Test compressed messages
+ sendCompressedMessages(numRecords, tp2);
+ consumer.assign(List.of(tp2));
+ consumeAndVerifyRecords(consumer, tp2, numRecords, 0, 0, 0);
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerConsumeMessagesWithLogAppendTime() throws Exception {
+ testConsumeMessagesWithLogAppendTime(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerConsumeMessagesWithLogAppendTime() throws Exception {
+ testConsumeMessagesWithLogAppendTime(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testConsumeMessagesWithLogAppendTime(Map<String, Object> consumerConfig) throws Exception {
+ var topicName = "testConsumeMessagesWithLogAppendTime";
+ var startTime = System.currentTimeMillis();
+ var numRecords = 50;
+ cluster.createTopic(topicName, 2, (short) 2, Map.of(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime"));
+
+ try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
+ // Test non-compressed messages
+ var tp1 = new TopicPartition(topicName, 0);
+ sendRecords(cluster, tp1, numRecords);
+ consumer.assign(List.of(tp1));
+ consumeAndVerifyRecordsWithTimeTypeLogAppend(consumer, tp1, numRecords, startTime);
+
+ // Test compressed messages
+ var tp2 = new TopicPartition(topicName, 1);
+ sendCompressedMessages(numRecords, tp2);
+ consumer.assign(List.of(tp2));
+ consumeAndVerifyRecordsWithTimeTypeLogAppend(consumer, tp2, numRecords, startTime);
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerListTopics() throws Exception {
+ testListTopics(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerListTopics() throws Exception {
+ testListTopics(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testListTopics(Map<String, Object> consumerConfig) throws Exception {
+ var numParts = 2;
+ var topic1 = "part-test-topic-1";
+ var topic2 = "part-test-topic-2";
+ var topic3 = "part-test-topic-3";
+ cluster.createTopic(topic1, numParts, (short) 1);
+ cluster.createTopic(topic2, numParts, (short) 1);
+ cluster.createTopic(topic3, numParts, (short) 1);
+
+ sendRecords(cluster, new TopicPartition(topic1, 0), 1);
+
+ try (var consumer = cluster.consumer(consumerConfig)) {
+ // consume some messages so the internal topic __consumer_offsets is created and can be listed
+ consumer.subscribe(List.of(topic1));
+ consumer.poll(Duration.ofMillis(100));
+ var topics = consumer.listTopics();
+ assertNotNull(topics);
+ assertEquals(4, topics.size());
+ assertEquals(2, topics.get(topic1).size());
+ assertEquals(2, topics.get(topic2).size());
+ assertEquals(2, topics.get(topic3).size());
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerPauseStateNotPreservedByRebalance() throws Exception {
+ testPauseStateNotPreservedByRebalance(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+ SESSION_TIMEOUT_MS_CONFIG, 100,
+ HEARTBEAT_INTERVAL_MS_CONFIG, 30
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerPauseStateNotPreservedByRebalance() throws Exception {
+ testPauseStateNotPreservedByRebalance(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testPauseStateNotPreservedByRebalance(Map<String, Object> consumerConfig) throws Exception {
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+ ) {
+ var startingTimestamp = System.currentTimeMillis();
+ sendRecords(producer, TP, 5, startingTimestamp);
+ consumer.subscribe(List.of(TOPIC));
+ consumeAndVerifyRecords(consumer, TP, 5, 0, 0, startingTimestamp);
+ consumer.pause(List.of(TP));
+
+ // subscribe to a new topic to trigger a rebalance
+ consumer.subscribe(List.of("topic2"));
+
+ // after rebalance, our position should be reset and our pause state lost,
+ // so we should be able to consume from the beginning
+ consumeAndVerifyRecords(consumer, TP, 0, 5, 0, startingTimestamp);
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerPerPartitionLeadMetricsCleanUpWithSubscribe() throws Exception {
+ String consumerClientId = "testClassicConsumerPerPartitionLeadMetricsCleanUpWithSubscribe";
+ testPerPartitionLeadMetricsCleanUpWithSubscribe(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+ GROUP_ID_CONFIG, consumerClientId,
+ CLIENT_ID_CONFIG, consumerClientId
+ ), consumerClientId);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerPerPartitionLeadMetricsCleanUpWithSubscribe() throws Exception {
+ String consumerClientId = "testAsyncConsumerPerPartitionLeadMetricsCleanUpWithSubscribe";
+ testPerPartitionLeadMetricsCleanUpWithSubscribe(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+ GROUP_ID_CONFIG, consumerClientId,
+ CLIENT_ID_CONFIG, consumerClientId
+ ), consumerClientId);
+ }
+
+ private void testPerPartitionLeadMetricsCleanUpWithSubscribe(
+ Map<String, Object> consumerConfig,
+ String consumerClientId
+ ) throws Exception {
+ var numMessages = 1000;
+ var topic2 = "topic2";
+ var tp2 = new TopicPartition(TOPIC, 1);
+ cluster.createTopic(topic2, 2, (short) BROKER_COUNT);
+
+ try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
+ // send some messages.
+ sendRecords(cluster, TP, numMessages);
+
+ // Test subscribe
+ // Create a consumer and consume some messages.
+ var listener = new TestConsumerReassignmentListener();
+ consumer.subscribe(List.of(TOPIC, topic2), listener);
+ var records = awaitNonEmptyRecords(consumer, TP);
+ assertEquals(1, listener.callsToAssigned, "should be assigned once");
+
+ // Verify the metric exists.
+ Map<String, String> tags1 = Map.of(
+ "client-id", consumerClientId,
+ "topic", TP.topic(),
+ "partition", String.valueOf(TP.partition())
+ );
+
+ Map<String, String> tags2 = Map.of(
+ "client-id", consumerClientId,
+ "topic", tp2.topic(),
+ "partition", String.valueOf(tp2.partition())
+ );
+
+ var fetchLead0 = consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1));
+ assertNotNull(fetchLead0);
+ assertEquals((double) records.count(), fetchLead0.metricValue(), "The lead should be " + records.count());
+
+ // Remove topic from subscription
+ consumer.subscribe(List.of(topic2), listener);
+ awaitRebalance(consumer, listener);
+
+ // Verify the metric has gone
+ assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1)));
+ assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2)));
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerPerPartitionLagMetricsCleanUpWithSubscribe() throws Exception {
+ String consumerClientId = "testClassicConsumerPerPartitionLagMetricsCleanUpWithSubscribe";
+ testPerPartitionLagMetricsCleanUpWithSubscribe(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+ GROUP_ID_CONFIG, consumerClientId,
+ CLIENT_ID_CONFIG, consumerClientId
+ ), consumerClientId);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerPerPartitionLagMetricsCleanUpWithSubscribe() throws Exception {
+ String consumerClientId = "testAsyncConsumerPerPartitionLagMetricsCleanUpWithSubscribe";
+ testPerPartitionLagMetricsCleanUpWithSubscribe(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+ GROUP_ID_CONFIG, consumerClientId,
+ CLIENT_ID_CONFIG, consumerClientId
+ ), consumerClientId);
+ }
+
+ private void testPerPartitionLagMetricsCleanUpWithSubscribe(
+ Map<String, Object> consumerConfig,
+ String consumerClientId
+ ) throws Exception {
+ int numMessages = 1000;
+ var topic2 = "topic2";
+ var tp2 = new TopicPartition(TOPIC, 1);
+ cluster.createTopic(topic2, 2, (short) BROKER_COUNT);
+
+ try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
+ // send some messages.
+ sendRecords(cluster, TP, numMessages);
+
+ // Test subscribe
+ // Create a consumer and consume some messages.
+ var listener = new TestConsumerReassignmentListener();
+ consumer.subscribe(List.of(TOPIC, topic2), listener);
+ var records = awaitNonEmptyRecords(consumer, TP);
+ assertEquals(1, listener.callsToAssigned, "should be assigned once");
+
+ // Verify the metric exists.
+ Map<String, String> tags1 = Map.of(
+ "client-id", consumerClientId,
+ "topic", TP.topic(),
+ "partition", String.valueOf(TP.partition())
+ );
+
+ Map<String, String> tags2 = Map.of(
+ "client-id", consumerClientId,
+ "topic", tp2.topic(),
+ "partition", String.valueOf(tp2.partition())
+ );
+
+ var fetchLag0 = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1));
+ assertNotNull(fetchLag0);
+ var expectedLag = numMessages - records.count();
+ assertEquals(expectedLag, (double) fetchLag0.metricValue(), EPSILON, "The lag should be " + expectedLag);
+
+ // Remove topic from subscription
+ consumer.subscribe(List.of(topic2), listener);
+ awaitRebalance(consumer, listener);
+
+ // Verify the metric has gone
+ assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1)));
+ assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags2)));
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerPerPartitionLeadMetricsCleanUpWithAssign() throws Exception {
+ String consumerClientId = "testClassicConsumerPerPartitionLeadMetricsCleanUpWithAssign";
+ testPerPartitionLeadMetricsCleanUpWithAssign(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+ GROUP_ID_CONFIG, consumerClientId,
+ CLIENT_ID_CONFIG, consumerClientId
+ ), consumerClientId);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerPerPartitionLeadMetricsCleanUpWithAssign() throws Exception {
+ String consumerClientId = "testAsyncConsumerPerPartitionLeadMetricsCleanUpWithAssign";
+ testPerPartitionLeadMetricsCleanUpWithAssign(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+ GROUP_ID_CONFIG, consumerClientId,
+ CLIENT_ID_CONFIG, consumerClientId
+ ), consumerClientId);
+ }
+
+ private void testPerPartitionLeadMetricsCleanUpWithAssign(
+ Map<String, Object> consumerConfig,
+ String consumerClientId
+ ) throws Exception {
+ var numMessages = 1000;
+ var tp2 = new TopicPartition(TOPIC, 1);
+ cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+ ) {
+ // Test assign; send some messages.
+ sendRecords(producer, TP, numMessages, System.currentTimeMillis());
+ sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
+
+ consumer.assign(List.of(TP));
+ var records = awaitNonEmptyRecords(consumer, TP);
+
+ // Verify the metric exists.
+ Map<String, String> tags = Map.of(
+ "client-id", consumerClientId,
+ "topic", TP.topic(),
+ "partition", String.valueOf(TP.partition())
+ );
+
+ var fetchLead = consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags));
+ assertNotNull(fetchLead);
+ assertEquals((double) records.count(), fetchLead.metricValue(), "The lead should be " + records.count());
+
+ consumer.assign(List.of(tp2));
+ awaitNonEmptyRecords(consumer, tp2);
+ assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)));
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerPerPartitionLagMetricsCleanUpWithAssign() throws Exception {
+ String consumerClientId = "testClassicConsumerPerPartitionLagMetricsCleanUpWithAssign";
+ testPerPartitionLagMetricsCleanUpWithAssign(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+ GROUP_ID_CONFIG, consumerClientId,
+ CLIENT_ID_CONFIG, consumerClientId
+ ), consumerClientId);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerPerPartitionLagMetricsCleanUpWithAssign() throws Exception {
+ String consumerClientId = "testAsyncConsumerPerPartitionLagMetricsCleanUpWithAssign";
+ testPerPartitionLagMetricsCleanUpWithAssign(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+ GROUP_ID_CONFIG, consumerClientId,
+ CLIENT_ID_CONFIG, consumerClientId
+ ), consumerClientId);
+ }
+
+ private void testPerPartitionLagMetricsCleanUpWithAssign(
+ Map<String, Object> consumerConfig,
+ String consumerClientId
+ ) throws Exception {
+ var numMessages = 1000;
+ var tp2 = new TopicPartition(TOPIC, 1);
+ cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+ ) {
+ // Test assign; send some messages.
+ sendRecords(producer, TP, numMessages, System.currentTimeMillis());
+ sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
+
+ consumer.assign(List.of(TP));
+ var records = awaitNonEmptyRecords(consumer, TP);
+
+ // Verify the metric exists.
+ Map<String, String> tags = Map.of(
+ "client-id", consumerClientId,
+ "topic", TP.topic(),
+ "partition", String.valueOf(TP.partition())
+ );
+
+ var fetchLag = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags));
+ assertNotNull(fetchLag);
+
+ var expectedLag = numMessages - records.count();
+ assertEquals(expectedLag, (double) fetchLag.metricValue(), EPSILON, "The lag should be " + expectedLag);
+ consumer.assign(List.of(tp2));
+ awaitNonEmptyRecords(consumer, tp2);
+ assertNull(consumer.metrics().get(new MetricName(TP + ".records-lag", "consumer-fetch-manager-metrics", "", tags)));
+ assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)));
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerPerPartitionLagMetricsWhenReadCommitted() throws Exception {
+ String consumerClientId = "testClassicConsumerPerPartitionLagMetricsWhenReadCommitted";
+ testPerPartitionLagMetricsWhenReadCommitted(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+ GROUP_ID_CONFIG, consumerClientId,
+ CLIENT_ID_CONFIG, consumerClientId,
+ ISOLATION_LEVEL_CONFIG, "read_committed"
+ ), consumerClientId);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerPerPartitionLagMetricsWhenReadCommitted() throws Exception {
+ String consumerClientId = "testAsyncConsumerPerPartitionLagMetricsWhenReadCommitted";
+ testPerPartitionLagMetricsWhenReadCommitted(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+ GROUP_ID_CONFIG, consumerClientId,
+ CLIENT_ID_CONFIG, consumerClientId,
+ ISOLATION_LEVEL_CONFIG, "read_committed"
+ ), consumerClientId);
+ }
+
+ private void testPerPartitionLagMetricsWhenReadCommitted(
+ Map<String, Object> consumerConfig,
+ String consumerClientId
+ ) throws Exception {
+ var numMessages = 1000;
+ var tp2 = new TopicPartition(TOPIC, 1);
+ cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+ ) {
+ // Test assign; send some messages.
+ sendRecords(producer, TP, numMessages, System.currentTimeMillis());
+ sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
+
+ consumer.assign(List.of(TP));
+ awaitNonEmptyRecords(consumer, TP);
+
+ // Verify the metric exists.
+ Map<String, String> tags = Map.of(
+ "client-id", consumerClientId,
+ "topic", TP.topic(),
+ "partition", String.valueOf(TP.partition())
+ );
+
+ var fetchLag = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags));
+ assertNotNull(fetchLag);
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured() throws Exception {
+ var consumerClientId = "testClassicConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured";
+ testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+ GROUP_ID_CONFIG, consumerClientId,
+ CLIENT_ID_CONFIG, consumerClientId,
+ ISOLATION_LEVEL_CONFIG, "read_committed"
+ ), consumerClientId);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured() throws Exception {
+ var consumerClientId = "testAsyncConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured";
+ testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+ GROUP_ID_CONFIG, consumerClientId,
+ CLIENT_ID_CONFIG, consumerClientId,
+ ISOLATION_LEVEL_CONFIG, "read_committed"
+ ), consumerClientId);
+ }
+
+ private void testQuotaMetricsNotCreatedIfNoQuotasConfigured(
+ Map<String, Object> consumerConfig,
+ String consumerClientId
+ ) throws Exception {
+ var producerClientId = UUID.randomUUID().toString();
+ var numRecords = 1000;
+ cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+ try (Producer<byte[], byte[]> producer = cluster.producer(Map.of(ProducerConfig.CLIENT_ID_CONFIG, producerClientId));
+ Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+ ) {
+ var startingTimestamp = System.currentTimeMillis();
+ sendRecords(producer, TP, numRecords, startingTimestamp);
+
+ consumer.assign(List.of(TP));
+ consumer.seek(TP, 0);
+ consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+
+ var brokers = cluster.brokers().values();
+ brokers.forEach(broker -> assertNoMetric(broker, "byte-rate", QuotaType.PRODUCE, producerClientId));
+ brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.PRODUCE, producerClientId));
+ brokers.forEach(broker -> assertNoMetric(broker, "byte-rate", QuotaType.FETCH, consumerClientId));
+ brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.FETCH, consumerClientId));
+ brokers.forEach(broker -> assertNoMetric(broker, "request-time", QuotaType.REQUEST, producerClientId));
+ brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.REQUEST, producerClientId));
+ brokers.forEach(broker -> assertNoMetric(broker, "request-time", QuotaType.REQUEST, consumerClientId));
+ brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.REQUEST, consumerClientId));
+ }
+ }
+
+ private void assertNoMetric(KafkaBroker broker, String name, QuotaType quotaType, String clientId) {
+ var metricName = broker.metrics().metricName(name, quotaType.toString(), "", "user", "", "client-id", clientId);
+ assertNull(broker.metrics().metric(metricName), "Metric should not have been created " + metricName);
+ }
+
+ @ClusterTest
+ public void testClassicConsumerSeekThrowsIllegalStateIfPartitionsNotAssigned() throws Exception {
+ testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerSeekThrowsIllegalStateIfPartitionsNotAssigned() throws Exception {
+ testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map<String, Object> consumerConfig) throws Exception {
+ cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+ try (var consumer = cluster.consumer(consumerConfig)) {
+ var e = assertThrows(IllegalStateException.class, () -> consumer.seekToEnd(List.of(TP)));
+ assertEquals("No current assignment for partition " + TP, e.getMessage());
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumingWithNullGroupId() throws Exception {
+ testConsumingWithNullGroupId(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+ KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+ VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+ BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerConsumingWithNullGroupId() throws Exception {
+ testConsumingWithNullGroupId(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+ KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+ VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+ BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()
+ ));
+ }
+
+ private void testConsumingWithNullGroupId(Map<String, Object> consumerConfig) throws Exception {
+ var partition = 0;
+ cluster.createTopic(TOPIC, 1, (short) 1);
+
+ // consumer 1 uses the default group id and consumes from earliest offset
+ Map<String, Object> consumer1Config = new HashMap<>(consumerConfig);
+ consumer1Config.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumer1Config.put(CLIENT_ID_CONFIG, "consumer1");
+
+ // consumer 2 uses the default group id and consumes from latest offset
+ Map<String, Object> consumer2Config = new HashMap<>(consumerConfig);
+ consumer2Config.put(AUTO_OFFSET_RESET_CONFIG, "latest");
+ consumer2Config.put(CLIENT_ID_CONFIG, "consumer2");
+
+ // consumer 3 uses the default group id and starts from an explicit offset
+ Map<String, Object> consumer3Config = new HashMap<>(consumerConfig);
+ consumer3Config.put(CLIENT_ID_CONFIG, "consumer3");
+
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer1 = new KafkaConsumer<>(consumer1Config);
+ Consumer<byte[], byte[]> consumer2 = new KafkaConsumer<>(consumer2Config);
+ Consumer<byte[], byte[]> consumer3 = new KafkaConsumer<>(consumer3Config)
+ ) {
+ producer.send(new ProducerRecord<>(TOPIC, partition, "k1".getBytes(), "v1".getBytes())).get();
+ producer.send(new ProducerRecord<>(TOPIC, partition, "k2".getBytes(), "v2".getBytes())).get();
+ producer.send(new ProducerRecord<>(TOPIC, partition, "k3".getBytes(), "v3".getBytes())).get();
+
+ consumer1.assign(List.of(TP));
+ consumer2.assign(List.of(TP));
+ consumer3.assign(List.of(TP));
+ consumer3.seek(TP, 1);
+
+ var numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count();
+ assertThrows(InvalidGroupIdException.class, consumer1::commitSync);
+ assertThrows(InvalidGroupIdException.class, () -> consumer2.committed(Set.of(TP)));
+
+ var numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count();
+ var numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count();
+
+ consumer1.unsubscribe();
+ consumer2.unsubscribe();
+ consumer3.unsubscribe();
+
+ assertTrue(consumer1.assignment().isEmpty());
+ assertTrue(consumer2.assignment().isEmpty());
+ assertTrue(consumer3.assignment().isEmpty());
+
+ consumer1.close();
+ consumer2.close();
+ consumer3.close();
+
+ assertEquals(3, numRecords1, "Expected consumer1 to consume from earliest offset");
+ assertEquals(0, numRecords2, "Expected consumer2 to consume from latest offset");
+ assertEquals(2, numRecords3, "Expected consumer3 to consume from offset 1");
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerNullGroupIdNotSupportedIfCommitting() throws Exception {
+ testNullGroupIdNotSupportedIfCommitting(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+ KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+ VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+ BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(),
+ AUTO_OFFSET_RESET_CONFIG, "earliest",
+ CLIENT_ID_CONFIG, "consumer1"
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerNullGroupIdNotSupportedIfCommitting() throws Exception {
+ testNullGroupIdNotSupportedIfCommitting(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+ KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+ VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+ BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(),
+ AUTO_OFFSET_RESET_CONFIG, "earliest",
+ CLIENT_ID_CONFIG, "consumer1"
+ ));
+ }
+
+ private void testNullGroupIdNotSupportedIfCommitting(Map<String, Object> consumerConfig) throws Exception {
+ cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+ try (var consumer = new KafkaConsumer<>(consumerConfig)) {
+ consumer.assign(List.of(TP));
+ assertThrows(InvalidGroupIdException.class, consumer::commitSync);
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerStaticConsumerDetectsNewPartitionCreatedAfterRestart() throws Exception {
+ testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+ GROUP_ID_CONFIG, "my-group-id",
+ GROUP_INSTANCE_ID_CONFIG, "my-instance-id",
+ METADATA_MAX_AGE_CONFIG, 100,
+ MAX_POLL_INTERVAL_MS_CONFIG, 6000
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerStaticConsumerDetectsNewPartitionCreatedAfterRestart() throws Exception {
+ testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map.of(
+ GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+ GROUP_ID_CONFIG, "my-group-id",
+ GROUP_INSTANCE_ID_CONFIG, "my-instance-id",
+ METADATA_MAX_AGE_CONFIG, 100,
+ MAX_POLL_INTERVAL_MS_CONFIG, 6000
+ ));
+ }
+
+ private void
testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map<String, Object>
consumerConfig) throws Exception {
+ var foo = "foo";
+ var foo0 = new TopicPartition(foo, 0);
+ var foo1 = new TopicPartition(foo, 1);
+ cluster.createTopic(foo, 1, (short) 1);
+
+ try (Consumer<byte[], byte[]> consumer1 =
cluster.consumer(consumerConfig);
+ Consumer<byte[], byte[]> consumer2 =
cluster.consumer(consumerConfig);
+ var admin = cluster.admin()
+ ) {
+ consumer1.subscribe(List.of(foo));
+ awaitAssignment(consumer1, Set.of(foo0));
+ consumer1.close();
+
+ consumer2.subscribe(List.of(foo));
+ awaitAssignment(consumer2, Set.of(foo0));
+
+ admin.createPartitions(Map.of(foo,
NewPartitions.increaseTo(2))).all().get();
+ awaitAssignment(consumer2, Set.of(foo0, foo1));
+ }
+ }
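+
+ // Note: with a fixed GROUP_INSTANCE_ID_CONFIG (static membership), consumer2
+ // rejoins under the same member identity as the closed consumer1 and can
+ // reclaim its assignment without a full rebalance; the new partition is then
+ // picked up once metadata refreshes (bounded by METADATA_MAX_AGE_CONFIG).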
+
+ @ClusterTest
+ public void testClassicConsumerEndOffsets() throws Exception {
+ testEndOffsets(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+ METADATA_MAX_AGE_CONFIG, 100,
+ MAX_POLL_INTERVAL_MS_CONFIG, 6000
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerEndOffsets() throws Exception {
+ testEndOffsets(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+ METADATA_MAX_AGE_CONFIG, 100,
+ MAX_POLL_INTERVAL_MS_CONFIG, 6000
+ ));
+ }
+
+ private void testEndOffsets(Map<String, Object> consumerConfig) throws
Exception {
+ var numRecords = 10000;
+ var tp2 = new TopicPartition(TOPIC, 1);
+ cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer =
cluster.consumer(consumerConfig)
+ ) {
+ var startingTimestamp = System.currentTimeMillis();
+ for (var i = 0; i < numRecords; i++) {
+ var timestamp = startingTimestamp + (long) i;
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+ TP.topic(),
+ TP.partition(),
+ timestamp,
+ ("key " + i).getBytes(),
+ ("value " + i).getBytes()
+ );
+ producer.send(record);
+ }
+ producer.flush();
+
+ consumer.subscribe(List.of(TOPIC));
+ awaitAssignment(consumer, Set.of(TP, tp2));
+
+ var endOffsets = consumer.endOffsets(Set.of(TP));
+ assertEquals(numRecords, endOffsets.get(TP));
+ }
+ }
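+
+ // Note: endOffsets() reports the offset of the *next* record to be written,
+ // so after producing numRecords records the end offset equals numRecords.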
+
+ @ClusterTest
+ public void testClassicConsumerFetchOffsetsForTime() throws Exception {
+ testFetchOffsetsForTime(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerFetchOffsetsForTime() throws Exception {
+ testFetchOffsetsForTime(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testFetchOffsetsForTime(Map<String, Object> consumerConfig)
throws Exception {
+ var numPartitions = 2;
+ var tp2 = new TopicPartition(TOPIC, 1);
+ cluster.createTopic(TOPIC, numPartitions, (short) BROKER_COUNT);
+
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer =
cluster.consumer(consumerConfig)
+ ) {
+ Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
+ for (int part = 0; part < numPartitions; part++) {
+ var tp = new TopicPartition(TOPIC, part);
+ // key, val, and timestamp equal to the sequence number.
+ sendRecords(producer, tp, 100, 0);
+ timestampsToSearch.put(tp, part * 20L);
+ }
+ // Test negative target time
+ assertThrows(IllegalArgumentException.class, () ->
consumer.offsetsForTimes(Map.of(TP, -1L)));
+ var timestampOffsets =
consumer.offsetsForTimes(timestampsToSearch);
+
+ var timestampTp0 = timestampOffsets.get(TP);
+ assertEquals(0, timestampTp0.offset());
+ assertEquals(0, timestampTp0.timestamp());
+ assertEquals(Optional.of(0), timestampTp0.leaderEpoch());
+
+ var timestampTp1 = timestampOffsets.get(tp2);
+ assertEquals(20, timestampTp1.offset());
+ assertEquals(20, timestampTp1.timestamp());
+ assertEquals(Optional.of(0), timestampTp1.leaderEpoch());
+ }
+ }
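+
+ // Note: offsetsForTimes() returns, per partition, the earliest offset whose
+ // record timestamp is >= the target time; since keys, values, and timestamps
+ // all equal the sequence number here, searching for time 20 lands on offset 20.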
+
+ @ClusterTest
+ public void testClassicConsumerPositionRespectsTimeout() {
+ testPositionRespectsTimeout(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerPositionRespectsTimeout() {
+ testPositionRespectsTimeout(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testPositionRespectsTimeout(Map<String, Object>
consumerConfig) {
+ var topicPartition = new TopicPartition(TOPIC, 15);
+ try (var consumer = cluster.consumer(consumerConfig)) {
+ consumer.assign(List.of(topicPartition));
+ // When position() is called for a topic/partition that doesn't
exist, the consumer will repeatedly update the
+ // local metadata. However, it should give up after the
user-supplied timeout has passed.
+ assertThrows(TimeoutException.class, () ->
consumer.position(topicPartition, Duration.ofSeconds(3)));
+ }
+ }
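+
+ // Illustrative sketch (not part of this class): the caller-side pattern the
+ // test above exercises. The consumer and partition names are hypothetical;
+ // only position(tp, timeout) and TimeoutException come from the client API.
+ //
+ //     try {
+ //         long pos = consumer.position(unknownPartition, Duration.ofSeconds(3));
+ //     } catch (TimeoutException e) {
+ //         // metadata never resolved within the bound; retry or surface the error
+ //     }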
+
+ @ClusterTest
+ public void testClassicConsumerPositionRespectsWakeup() {
+ testPositionRespectsWakeup(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerPositionRespectsWakeup() {
+ testPositionRespectsWakeup(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testPositionRespectsWakeup(Map<String, Object>
consumerConfig) {
+ var topicPartition = new TopicPartition(TOPIC, 15);
+ try (var consumer = cluster.consumer(consumerConfig)) {
+ consumer.assign(List.of(topicPartition));
+ CompletableFuture.runAsync(() -> {
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ consumer.wakeup();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ assertThrows(WakeupException.class, () ->
consumer.position(topicPartition, Duration.ofSeconds(3)));
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerPositionWithErrorConnectionRespectsWakeup()
{
+ testPositionWithErrorConnectionRespectsWakeup(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+ // make sure the connection fails
+ BOOTSTRAP_SERVERS_CONFIG, "localhost:12345"
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerPositionWithErrorConnectionRespectsWakeup() {
+ testPositionWithErrorConnectionRespectsWakeup(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+ // make sure the connection fails
+ BOOTSTRAP_SERVERS_CONFIG, "localhost:12345"
+ ));
+ }
+
+ private void testPositionWithErrorConnectionRespectsWakeup(Map<String,
Object> consumerConfig) {
+ var topicPartition = new TopicPartition(TOPIC, 15);
+ try (var consumer = cluster.consumer(consumerConfig)) {
+ consumer.assign(List.of(topicPartition));
+ CompletableFuture.runAsync(() -> {
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ consumer.wakeup();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ assertThrows(WakeupException.class, () ->
consumer.position(topicPartition, Duration.ofSeconds(100)));
+ }
+ }
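+
+ // Illustrative sketch (not part of this class): wakeup() is also the standard
+ // way to abort a blocking poll loop from another thread. The running flag is
+ // hypothetical; only wakeup(), poll(), and WakeupException are client API.
+ //
+ //     // from a shutdown hook or another thread:
+ //     consumer.wakeup();
+ //
+ //     // in the polling thread:
+ //     try {
+ //         while (running) {
+ //             var records = consumer.poll(Duration.ofMillis(100));
+ //             // process records ...
+ //         }
+ //     } catch (WakeupException e) {
+ //         // expected during shutdown
+ //     } finally {
+ //         consumer.close();
+ //     }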
+
+ @Flaky("KAFKA-18031")
+ @ClusterTest
+ public void testClassicConsumerCloseLeavesGroupOnInterrupt() throws
Exception {
+ testCloseLeavesGroupOnInterrupt(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+ KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName(),
+ VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName(),
+ AUTO_OFFSET_RESET_CONFIG, "earliest",
+ GROUP_ID_CONFIG, "group_test,",
+ BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()
+ ));
+ }
+
+ @Flaky("KAFKA-18031")
+ @ClusterTest
+ public void testAsyncConsumerCloseLeavesGroupOnInterrupt() throws
Exception {
+ testCloseLeavesGroupOnInterrupt(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+ KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName(),
+ VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName(),
+ AUTO_OFFSET_RESET_CONFIG, "earliest",
+ GROUP_ID_CONFIG, "group_test,",
+ BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()
+ ));
+ }
+
+ private void testCloseLeavesGroupOnInterrupt(Map<String, Object>
consumerConfig) throws Exception {
+ try (Consumer<byte[], byte[]> consumer =
cluster.consumer(consumerConfig)) {
+ var listener = new TestConsumerReassignmentListener();
+ consumer.subscribe(List.of(TOPIC), listener);
+ awaitRebalance(consumer, listener);
+
+ assertEquals(1, listener.callsToAssigned);
+ assertEquals(0, listener.callsToRevoked);
+
+ try {
+ Thread.currentThread().interrupt();
+ assertThrows(InterruptException.class, consumer::close);
+ } finally {
+ // Clear the interrupted flag so we don't create problems for
subsequent tests.
+ Thread.interrupted();
+ }
+
+ assertEquals(1, listener.callsToAssigned);
+ assertEquals(1, listener.callsToRevoked);
+
+ Map<String, Object> consumerConfigMap = new
HashMap<>(consumerConfig);
+ var config = new ConsumerConfig(consumerConfigMap);
+
+ // Set the wait timeout to be only *half* the configured session
timeout. This way we can make sure that the
+ // consumer explicitly left the group as opposed to being kicked
out by the broker.
+ var leaveGroupTimeoutMs = config.getInt(SESSION_TIMEOUT_MS_CONFIG)
/ 2;
+
+ TestUtils.waitForCondition(
+ () -> checkGroupMemberEmpty(config),
+ leaveGroupTimeoutMs,
+ "Consumer did not leave the consumer group within " +
leaveGroupTimeoutMs + " ms of close"
+ );
+ }
+ }
+
+ private boolean checkGroupMemberEmpty(ConsumerConfig config) {
+ try (var admin = cluster.admin()) {
+ var groupId = config.getString(GROUP_ID_CONFIG);
+ var result = admin.describeConsumerGroups(List.of(groupId));
+ var groupDescription = result.describedGroups().get(groupId).get();
+ return groupDescription.members().isEmpty();
+ } catch (ExecutionException | InterruptedException e) {
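+ // Treat describe failures as "not empty yet" so waitForCondition retries.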
+ return false;
+ }
+ }
+
+ @ClusterTest
+ public void testClassicConsumerOffsetRelatedWhenTimeoutZero() throws
Exception {
+ testOffsetRelatedWhenTimeoutZero(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerOffsetRelatedWhenTimeoutZero() throws
Exception {
+ testOffsetRelatedWhenTimeoutZero(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testOffsetRelatedWhenTimeoutZero(Map<String, Object>
consumerConfig) throws Exception {
+ cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+ try (var consumer = cluster.consumer(consumerConfig)) {
+ var result1 = consumer.beginningOffsets(List.of(TP),
Duration.ZERO);
+ assertNotNull(result1);
+ assertEquals(0, result1.size());
+
+ var result2 = consumer.endOffsets(List.of(TP), Duration.ZERO);
+ assertNotNull(result2);
+ assertEquals(0, result2.size());
+
+ var result3 = consumer.offsetsForTimes(Map.of(TP, 0L),
Duration.ZERO);
+ assertNotNull(result3);
+ assertEquals(1, result3.size());
+ assertNull(result3.get(TP));
+ }
+ }
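+
+ // Note: with Duration.ZERO the consumer cannot block on a metadata lookup, so
+ // beginningOffsets/endOffsets return empty maps, while offsetsForTimes keeps
+ // the requested key but maps it to null.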
+
+ private void sendCompressedMessages(int numRecords, TopicPartition tp) {
+ Map<String, Object> config = Map.of(
+ COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name,
+ LINGER_MS_CONFIG, Integer.MAX_VALUE
+ );
+ try (Producer<byte[], byte[]> producer = cluster.producer(config)) {
+ IntStream.range(0, numRecords).forEach(i -> producer.send(new
ProducerRecord<>(
+ tp.topic(),
+ tp.partition(),
+ (long) i,
+ ("key " + i).getBytes(),
+ ("value " + i).getBytes()
+ )));
+ }
+ }
+
+ private ConsumerRecords<byte[], byte[]> awaitNonEmptyRecords(
+ Consumer<byte[], byte[]> consumer,
+ TopicPartition tp
+ ) throws Exception {
+ AtomicReference<ConsumerRecords<byte[], byte[]>> result = new
AtomicReference<>();
+
+ TestUtils.waitForCondition(() -> {
+ var polledRecords = consumer.poll(Duration.ofSeconds(10));
+ boolean hasRecords = !polledRecords.isEmpty();
+ if (hasRecords) {
+ result.set(polledRecords);
+ }
+ return hasRecords;
+ }, "Timed out waiting for non-empty records from topic " + tp.topic()
+ " partition " + tp.partition());
+
+ return result.get();
+ }
+
+ public static class SerializerImpl implements Serializer<byte[]> {
+ private final ByteArraySerializer serializer = new
ByteArraySerializer();
+
+ @Override
+ public byte[] serialize(String topic, byte[] data) {
+ throw new RuntimeException("This method should not be called");
+ }
+
+ @Override
+ public byte[] serialize(String topic, Headers headers, byte[] data) {
+ headers.add("content-type", "application/octet-stream".getBytes());
+ return serializer.serialize(topic, headers, data);
+ }
+ }
+
+ public static class DeserializerImpl implements Deserializer<byte[]> {
+ private final ByteArrayDeserializer deserializer = new
ByteArrayDeserializer();
+
+ @Override
+ public byte[] deserialize(String topic, byte[] data) {
+ throw new RuntimeException("This method should not be called");
+ }
+
+ @Override
+ public byte[] deserialize(String topic, Headers headers, byte[] data) {
+ Header contentType = headers.lastHeader("content-type");
+ assertNotNull(contentType);
+ assertEquals("application/octet-stream", new
String(contentType.value()));
+ return deserializer.deserialize(topic, headers, data);
+ }
+ }
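+
+ // Illustrative sketch (not part of this class): the header-aware
+ // (de)serializers above would typically be wired in through the standard
+ // ProducerConfig/ConsumerConfig keys, e.g.:
+ //
+ //     Map<String, Object> producerCfg = Map.of(
+ //         VALUE_SERIALIZER_CLASS_CONFIG, SerializerImpl.class.getName());
+ //     Map<String, Object> consumerCfg = Map.of(
+ //         VALUE_DESERIALIZER_CLASS_CONFIG, DeserializerImpl.class.getName());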
+}
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 2d5e1a2c631..adfb657b776 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -19,10 +19,9 @@ package kafka.api
import kafka.utils.TestInfoUtils
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig,
GroupProtocol}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
-import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener,
PartitionInfo}
import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.serialization.{ByteArrayDeserializer,
ByteArraySerializer, Deserializer, Serializer}
+import org.apache.kafka.common.serialization.{Deserializer, Serializer}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@@ -130,41 +129,4 @@ object BaseConsumerTest {
override def onUpdate(clusterResource: ClusterResource): Unit =
updateConsumerCount.incrementAndGet()
override def deserialize(topic: String, data: Array[Byte]): Array[Byte] =
data
}
-
- class SerializerImpl extends Serializer[Array[Byte]] {
- var serializer = new ByteArraySerializer()
-
- override def serialize(topic: String, headers: Headers, data:
Array[Byte]): Array[Byte] = {
- headers.add("content-type", "application/octet-stream".getBytes)
- serializer.serialize(topic, data)
- }
-
- override def configure(configs: java.util.Map[String, _], isKey: Boolean):
Unit = serializer.configure(configs, isKey)
-
- override def close(): Unit = serializer.close()
-
- override def serialize(topic: String, data: Array[Byte]): Array[Byte] = {
- fail("method should not be invoked")
- null
- }
- }
-
- class DeserializerImpl extends Deserializer[Array[Byte]] {
- var deserializer = new ByteArrayDeserializer()
-
- override def deserialize(topic: String, headers: Headers, data:
Array[Byte]): Array[Byte] = {
- val header = headers.lastHeader("content-type")
- assertEquals("application/octet-stream", if (header == null) null else
new String(header.value()))
- deserializer.deserialize(topic, data)
- }
-
- override def configure(configs: java.util.Map[String, _], isKey: Boolean):
Unit = deserializer.configure(configs, isKey)
-
- override def close(): Unit = deserializer.close()
-
- override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = {
- fail("method should not be invoked")
- null
- }
- }
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index c9d33354c5d..fc60cab1d0d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -12,820 +12,21 @@
*/
package kafka.api
-import kafka.api.BaseConsumerTest.{DeserializerImpl, SerializerImpl}
-
-import java.time.Duration
import java.util
-import java.util.{Locale, Optional, Properties}
-import kafka.server.KafkaBroker
import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.errors.{InterruptException,
InvalidGroupIdException, InvalidTopicException, TimeoutException,
WakeupException}
-import org.apache.kafka.common.record.{CompressionType, TimestampType}
-import org.apache.kafka.common.serialization._
+import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.test.api.Flaky
-import org.apache.kafka.common.{MetricName, TopicPartition}
-import org.apache.kafka.server.quota.QuotaType
-import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
-import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
+import java.util.concurrent.ExecutionException
@Timeout(600)
class PlaintextConsumerTest extends BaseConsumerTest {
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testHeaders(groupProtocol: String): Unit = {
- val numRecords = 1
- val record = new ProducerRecord(tp.topic, tp.partition, null,
"key".getBytes, "value".getBytes)
-
- record.headers().add("headerKey", "headerValue".getBytes)
-
- val producer = createProducer()
- producer.send(record)
-
- val consumer = createConsumer()
- assertEquals(0, consumer.assignment.size)
- consumer.assign(java.util.List.of(tp))
- assertEquals(1, consumer.assignment.size)
-
- consumer.seek(tp, 0)
- val records = consumeRecords(consumer = consumer, numRecords = numRecords)
-
- assertEquals(numRecords, records.size)
-
- for (i <- 0 until numRecords) {
- val record = records(i)
- val header = record.headers().lastHeader("headerKey")
- assertEquals("headerValue", if (header == null) null else new
String(header.value()))
- }
- }
-
- private def testHeadersSerializeDeserialize(serializer:
Serializer[Array[Byte]], deserializer: Deserializer[Array[Byte]]): Unit = {
- val numRecords = 1
- val record = new ProducerRecord(tp.topic, tp.partition, null,
"key".getBytes, "value".getBytes)
-
- val producer = createProducer(
- keySerializer = new ByteArraySerializer,
- valueSerializer = serializer)
- producer.send(record)
-
- val consumer = createConsumer(
- keyDeserializer = new ByteArrayDeserializer,
- valueDeserializer = deserializer)
- assertEquals(0, consumer.assignment.size)
- consumer.assign(java.util.List.of(tp))
- assertEquals(1, consumer.assignment.size)
-
- consumer.seek(tp, 0)
- val records = consumeRecords(consumer = consumer, numRecords = numRecords)
-
- assertEquals(numRecords, records.size)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testHeadersSerializerDeserializer(groupProtocol: String): Unit = {
- val extendedSerializer = new SerializerImpl
-
- val extendedDeserializer = new DeserializerImpl
-
- testHeadersSerializeDeserialize(extendedSerializer, extendedDeserializer)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testAutoOffsetReset(groupProtocol: String): Unit = {
- val producer = createProducer()
- val startingTimestamp = System.currentTimeMillis()
- sendRecords(producer, numRecords = 1, tp, startingTimestamp =
startingTimestamp)
-
- val consumer = createConsumer()
- consumer.assign(java.util.List.of(tp))
- consumeAndVerifyRecords(consumer = consumer, numRecords = 1,
startingOffset = 0, startingTimestamp = startingTimestamp)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testGroupConsumption(groupProtocol: String): Unit = {
- val producer = createProducer()
- val startingTimestamp = System.currentTimeMillis()
- sendRecords(producer, numRecords = 10, tp, startingTimestamp =
startingTimestamp)
-
- val consumer = createConsumer()
- consumer.subscribe(java.util.List.of(topic))
- consumeAndVerifyRecords(consumer = consumer, numRecords = 1,
startingOffset = 0, startingTimestamp = startingTimestamp)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testPartitionsFor(groupProtocol: String): Unit = {
- val numParts = 2
- createTopic("part-test", numParts)
- val consumer = createConsumer()
- val parts = consumer.partitionsFor("part-test")
- assertNotNull(parts)
- assertEquals(2, parts.size)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testPartitionsForAutoCreate(groupProtocol: String): Unit = {
- val consumer = createConsumer()
- // First call would create the topic
- consumer.partitionsFor("non-exist-topic")
- TestUtils.waitUntilTrue(() => {
- !consumer.partitionsFor("non-exist-topic").isEmpty
- }, s"Timed out while awaiting non empty partitions.")
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testPartitionsForInvalidTopic(groupProtocol: String): Unit = {
- val consumer = createConsumer()
- assertThrows(classOf[InvalidTopicException], () =>
consumer.partitionsFor(";3# ads,{234"))
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testSeek(groupProtocol: String): Unit = {
- val consumer = createConsumer()
- val totalRecords = 50L
- val mid = totalRecords / 2
-
- // Test seek non-compressed message
- val producer = createProducer()
- val startingTimestamp = 0
- sendRecords(producer, totalRecords.toInt, tp, startingTimestamp =
startingTimestamp)
- consumer.assign(java.util.List.of(tp))
-
- consumer.seekToEnd(java.util.List.of(tp))
- assertEquals(totalRecords, consumer.position(tp))
- assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
-
- consumer.seekToBeginning(java.util.List.of(tp))
- assertEquals(0L, consumer.position(tp))
- consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0,
startingTimestamp = startingTimestamp)
-
- consumer.seek(tp, mid)
- assertEquals(mid, consumer.position(tp))
-
- consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset =
mid.toInt, startingKeyAndValueIndex = mid.toInt,
- startingTimestamp = mid)
-
- // Test seek compressed message
- sendCompressedMessages(totalRecords.toInt, tp2)
- consumer.assign(java.util.List.of(tp2))
-
- consumer.seekToEnd(java.util.List.of(tp2))
- assertEquals(totalRecords, consumer.position(tp2))
- assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
-
- consumer.seekToBeginning(java.util.List.of(tp2))
- assertEquals(0L, consumer.position(tp2))
- consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, tp =
tp2)
-
- consumer.seek(tp2, mid)
- assertEquals(mid, consumer.position(tp2))
- consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset =
mid.toInt, startingKeyAndValueIndex = mid.toInt,
- startingTimestamp = mid, tp = tp2)
- }
-
- private def sendCompressedMessages(numRecords: Int, tp: TopicPartition):
Unit = {
- val producerProps = new Properties()
- producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG,
CompressionType.GZIP.name)
- producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG,
Int.MaxValue.toString)
- val producer = createProducer(configOverrides = producerProps)
- (0 until numRecords).foreach { i =>
- producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key
$i".getBytes, s"value $i".getBytes))
- }
- producer.close()
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testPartitionPauseAndResume(groupProtocol: String): Unit = {
- val partitions = java.util.List.of(tp)
- val producer = createProducer()
- var startingTimestamp = System.currentTimeMillis()
- sendRecords(producer, numRecords = 5, tp, startingTimestamp =
startingTimestamp)
-
- val consumer = createConsumer()
- consumer.assign(partitions)
- consumeAndVerifyRecords(consumer = consumer, numRecords = 5,
startingOffset = 0, startingTimestamp = startingTimestamp)
- consumer.pause(partitions)
- startingTimestamp = System.currentTimeMillis()
- sendRecords(producer, numRecords = 5, tp, startingTimestamp =
startingTimestamp)
- assertTrue(consumer.poll(Duration.ofMillis(100)).isEmpty)
- consumer.resume(partitions)
- consumeAndVerifyRecords(consumer = consumer, numRecords = 5,
startingOffset = 5, startingTimestamp = startingTimestamp)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testInterceptors(groupProtocol: String): Unit = {
- val appendStr = "mock"
- MockConsumerInterceptor.resetCounters()
- MockProducerInterceptor.resetCounters()
-
- // create producer with interceptor
- val producerProps = new Properties()
- producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
classOf[MockProducerInterceptor].getName)
- producerProps.put("mock.interceptor.append", appendStr)
- val testProducer = createProducer(keySerializer = new StringSerializer,
- valueSerializer = new StringSerializer,
- configOverrides = producerProps)
-
- // produce records
- val numRecords = 10
- (0 until numRecords).map { i =>
- testProducer.send(new ProducerRecord(tp.topic, tp.partition, s"key $i",
s"value $i"))
- }.foreach(_.get)
- assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue)
- assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue)
- // send invalid record
- assertThrows(classOf[Throwable], () => testProducer.send(null), () =>
"Should not allow sending a null record")
- assertEquals(1, MockProducerInterceptor.ON_ERROR_COUNT.intValue,
"Interceptor should be notified about exception")
- assertEquals(0,
MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue(), "Interceptor
should not receive metadata with an exception when record is null")
-
- // create consumer with interceptor
- this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
"org.apache.kafka.test.MockConsumerInterceptor")
- val testConsumer = createConsumer(keyDeserializer = new
StringDeserializer, valueDeserializer = new StringDeserializer)
- testConsumer.assign(java.util.List.of(tp))
- testConsumer.seek(tp, 0)
-
- // consume and verify that values are modified by interceptors
- val records = consumeRecords(testConsumer, numRecords)
- for (i <- 0 until numRecords) {
- val record = records(i)
- assertEquals(s"key $i", new String(record.key))
- assertEquals(s"value $i$appendStr".toUpperCase(Locale.ROOT), new
String(record.value))
- }
-
- // commit sync and verify onCommit is called
- val commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue
- testConsumer.commitSync(java.util.Map.of(tp, new OffsetAndMetadata(2L)))
- assertEquals(2,
testConsumer.committed(java.util.Set.of(tp)).get(tp).offset)
- assertEquals(commitCountBefore + 1,
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue)
-
- // commit async and verify onCommit is called
- sendAndAwaitAsyncCommit(testConsumer, Some(Map(tp -> new
OffsetAndMetadata(5L))))
- assertEquals(5,
testConsumer.committed(java.util.Set.of(tp)).get(tp).offset)
- assertEquals(commitCountBefore + 2,
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue)
-
- testConsumer.close()
- testProducer.close()
-
- // cleanup
- MockConsumerInterceptor.resetCounters()
- MockProducerInterceptor.resetCounters()
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testInterceptorsWithWrongKeyValue(groupProtocol: String): Unit = {
- val appendStr = "mock"
- // create producer with interceptor that has different key and value types
from the producer
- val producerProps = new Properties()
- producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers())
- producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"org.apache.kafka.test.MockProducerInterceptor")
- producerProps.put("mock.interceptor.append", appendStr)
- val testProducer = createProducer()
-
- // producing records should succeed
- testProducer.send(new ProducerRecord(tp.topic(), tp.partition(),
s"key".getBytes, s"value will not be modified".getBytes))
-
- // create consumer with interceptor that has different key and value types
from the consumer
- this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
"org.apache.kafka.test.MockConsumerInterceptor")
- val testConsumer = createConsumer()
-
- testConsumer.assign(java.util.List.of(tp))
- testConsumer.seek(tp, 0)
-
- // consume and verify that values are not modified by interceptors --
their exceptions are caught and logged, but not propagated
- val records = consumeRecords(testConsumer, 1)
- val record = records.head
- assertEquals(s"value will not be modified", new String(record.value()))
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testConsumeMessagesWithCreateTime(groupProtocol: String): Unit = {
- val numRecords = 50
- // Test non-compressed messages
- val producer = createProducer()
- val startingTimestamp = System.currentTimeMillis()
- sendRecords(producer, numRecords, tp, startingTimestamp =
startingTimestamp)
- val consumer = createConsumer()
- consumer.assign(java.util.List.of(tp))
- consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords,
startingOffset = 0, startingTimestamp = startingTimestamp)
-
- // Test compressed messages
- sendCompressedMessages(numRecords, tp2)
- consumer.assign(java.util.List.of(tp2))
- consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp =
tp2, startingOffset = 0)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testConsumeMessagesWithLogAppendTime(groupProtocol: String): Unit = {
- val topicName = "testConsumeMessagesWithLogAppendTime"
- val topicProps = new Properties()
- topicProps.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG,
"LogAppendTime")
- createTopic(topicName, 2, 2, topicProps)
-
- val startTime = System.currentTimeMillis()
- val numRecords = 50
-
- // Test non-compressed messages
- val tp1 = new TopicPartition(topicName, 0)
- val producer = createProducer()
- sendRecords(producer, numRecords, tp1)
-
- val consumer = createConsumer()
- consumer.assign(java.util.List.of(tp1))
- consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp =
tp1, startingOffset = 0,
- startingTimestamp = startTime, timestampType =
TimestampType.LOG_APPEND_TIME)
-
- // Test compressed messages
- val tp2 = new TopicPartition(topicName, 1)
- sendCompressedMessages(numRecords, tp2)
- consumer.assign(java.util.List.of(tp2))
- consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp =
tp2, startingOffset = 0,
- startingTimestamp = startTime, timestampType =
TimestampType.LOG_APPEND_TIME)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testListTopics(groupProtocol: String): Unit = {
- val numParts = 2
- val topic1 = "part-test-topic-1"
- val topic2 = "part-test-topic-2"
- val topic3 = "part-test-topic-3"
- createTopic(topic1, numParts)
- createTopic(topic2, numParts)
- createTopic(topic3, numParts)
-
- val consumer = createConsumer()
- val topics = consumer.listTopics()
- assertNotNull(topics)
- assertEquals(5, topics.size())
- assertEquals(5, topics.keySet().size())
- assertEquals(2, topics.get(topic1).size)
- assertEquals(2, topics.get(topic2).size)
- assertEquals(2, topics.get(topic3).size)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testPauseStateNotPreservedByRebalance(groupProtocol: String): Unit = {
- if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
-
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
"100") // timeout quickly to avoid slow test
-
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
"30")
- }
- val consumer = createConsumer()
-
- val producer = createProducer()
- val startingTimestamp = System.currentTimeMillis()
- sendRecords(producer, numRecords = 5, tp, startingTimestamp =
startingTimestamp)
- consumer.subscribe(java.util.List.of(topic))
- consumeAndVerifyRecords(consumer = consumer, numRecords = 5,
startingOffset = 0, startingTimestamp = startingTimestamp)
- consumer.pause(java.util.List.of(tp))
-
- // subscribe to a new topic to trigger a rebalance
- consumer.subscribe(java.util.List.of("topic2"))
-
- // after rebalance, our position should be reset and our pause state lost,
- // so we should be able to consume from the beginning
- consumeAndVerifyRecords(consumer = consumer, numRecords = 0,
startingOffset = 5, startingTimestamp = startingTimestamp)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testPerPartitionLeadMetricsCleanUpWithSubscribe(groupProtocol: String):
Unit = {
- val numMessages = 1000
- val topic2 = "topic2"
- createTopic(topic2, 2, brokerCount)
- // send some messages.
- val producer = createProducer()
- sendRecords(producer, numMessages, tp)
- // Test subscribe
- // Create a consumer and consumer some messages.
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
"testPerPartitionLeadMetricsCleanUpWithSubscribe")
- consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
"testPerPartitionLeadMetricsCleanUpWithSubscribe")
- val consumer = createConsumer()
- val listener = new TestConsumerReassignmentListener
- consumer.subscribe(java.util.List.of(topic, topic2), listener)
- val records = awaitNonEmptyRecords(consumer, tp)
- assertEquals(1, listener.callsToAssigned, "should be assigned once")
- // Verify the metric exist.
- val tags1 = new util.HashMap[String, String]()
- tags1.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
- tags1.put("topic", tp.topic())
- tags1.put("partition", String.valueOf(tp.partition()))
-
- val tags2 = new util.HashMap[String, String]()
- tags2.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
- tags2.put("topic", tp2.topic())
- tags2.put("partition", String.valueOf(tp2.partition()))
- val fetchLead0 = consumer.metrics.get(new MetricName("records-lead",
"consumer-fetch-manager-metrics", "", tags1))
- assertNotNull(fetchLead0)
- assertEquals(records.count.toDouble, fetchLead0.metricValue(), s"The lead
should be ${records.count}")
-
- // Remove topic from subscription
- consumer.subscribe(java.util.List.of(topic2), listener)
- awaitRebalance(consumer, listener)
- // Verify the metric has gone
- assertNull(consumer.metrics.get(new MetricName("records-lead",
"consumer-fetch-manager-metrics", "", tags1)))
- assertNull(consumer.metrics.get(new MetricName("records-lead",
"consumer-fetch-manager-metrics", "", tags2)))
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testPerPartitionLagMetricsCleanUpWithSubscribe(groupProtocol: String):
Unit = {
- val numMessages = 1000
- val topic2 = "topic2"
- createTopic(topic2, 2, brokerCount)
- // send some messages.
- val producer = createProducer()
- sendRecords(producer, numMessages, tp)
- // Test subscribe
- // Create a consumer and consumer some messages.
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
"testPerPartitionLagMetricsCleanUpWithSubscribe")
- consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
"testPerPartitionLagMetricsCleanUpWithSubscribe")
- val consumer = createConsumer()
- val listener = new TestConsumerReassignmentListener
- consumer.subscribe(java.util.List.of(topic, topic2), listener)
- val records = awaitNonEmptyRecords(consumer, tp)
- assertEquals(1, listener.callsToAssigned, "should be assigned once")
- // Verify the metric exist.
- val tags1 = new util.HashMap[String, String]()
- tags1.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
- tags1.put("topic", tp.topic())
- tags1.put("partition", String.valueOf(tp.partition()))
-
- val tags2 = new util.HashMap[String, String]()
- tags2.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
- tags2.put("topic", tp2.topic())
- tags2.put("partition", String.valueOf(tp2.partition()))
- val fetchLag0 = consumer.metrics.get(new MetricName("records-lag",
"consumer-fetch-manager-metrics", "", tags1))
- assertNotNull(fetchLag0)
- val expectedLag = numMessages - records.count
- assertEquals(expectedLag, fetchLag0.metricValue.asInstanceOf[Double],
epsilon, s"The lag should be $expectedLag")
-
- // Remove topic from subscription
- consumer.subscribe(java.util.List.of(topic2), listener)
- awaitRebalance(consumer, listener)
- // Verify the metric has gone
- assertNull(consumer.metrics.get(new MetricName("records-lag",
"consumer-fetch-manager-metrics", "", tags1)))
- assertNull(consumer.metrics.get(new MetricName("records-lag",
"consumer-fetch-manager-metrics", "", tags2)))
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testPerPartitionLeadMetricsCleanUpWithAssign(groupProtocol: String):
Unit = {
- val numMessages = 1000
- // Test assign
- // send some messages.
- val producer = createProducer()
- sendRecords(producer, numMessages, tp)
- sendRecords(producer, numMessages, tp2)
-
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
"testPerPartitionLeadMetricsCleanUpWithAssign")
- consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
"testPerPartitionLeadMetricsCleanUpWithAssign")
- val consumer = createConsumer()
- consumer.assign(java.util.List.of(tp))
- val records = awaitNonEmptyRecords(consumer, tp)
- // Verify the metric exist.
- val tags = new util.HashMap[String, String]()
- tags.put("client-id", "testPerPartitionLeadMetricsCleanUpWithAssign")
- tags.put("topic", tp.topic())
- tags.put("partition", String.valueOf(tp.partition()))
- val fetchLead = consumer.metrics.get(new MetricName("records-lead",
"consumer-fetch-manager-metrics", "", tags))
- assertNotNull(fetchLead)
-
- assertEquals(records.count.toDouble, fetchLead.metricValue(), s"The lead
should be ${records.count}")
-
- consumer.assign(java.util.List.of(tp2))
- awaitNonEmptyRecords(consumer ,tp2)
- assertNull(consumer.metrics.get(new MetricName("records-lead",
"consumer-fetch-manager-metrics", "", tags)))
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testPerPartitionLagMetricsCleanUpWithAssign(groupProtocol: String): Unit
= {
- val numMessages = 1000
- // Test assign
- // send some messages.
- val producer = createProducer()
- sendRecords(producer, numMessages, tp)
- sendRecords(producer, numMessages, tp2)
-
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
"testPerPartitionLagMetricsCleanUpWithAssign")
- consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
"testPerPartitionLagMetricsCleanUpWithAssign")
- val consumer = createConsumer()
- consumer.assign(java.util.List.of(tp))
- val records = awaitNonEmptyRecords(consumer, tp)
- // Verify the metric exist.
- val tags = new util.HashMap[String, String]()
- tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
- tags.put("topic", tp.topic())
- tags.put("partition", String.valueOf(tp.partition()))
- val fetchLag = consumer.metrics.get(new MetricName("records-lag",
"consumer-fetch-manager-metrics", "", tags))
- assertNotNull(fetchLag)
-
- val expectedLag = numMessages - records.count
- assertEquals(expectedLag, fetchLag.metricValue.asInstanceOf[Double],
epsilon, s"The lag should be $expectedLag")
-
- consumer.assign(java.util.List.of(tp2))
- awaitNonEmptyRecords(consumer, tp2)
- assertNull(consumer.metrics.get(new MetricName(tp.toString +
".records-lag", "consumer-fetch-manager-metrics", "", tags)))
- assertNull(consumer.metrics.get(new MetricName("records-lag",
"consumer-fetch-manager-metrics", "", tags)))
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testPerPartitionLagMetricsWhenReadCommitted(groupProtocol: String): Unit
= {
- val numMessages = 1000
- // send some messages.
- val producer = createProducer()
- sendRecords(producer, numMessages, tp)
- sendRecords(producer, numMessages, tp2)
-
- consumerConfig.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
"read_committed")
- consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
"testPerPartitionLagMetricsCleanUpWithAssign")
- consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
"testPerPartitionLagMetricsCleanUpWithAssign")
- val consumer = createConsumer()
- consumer.assign(java.util.List.of(tp))
- awaitNonEmptyRecords(consumer, tp)
- // Verify the metric exist.
- val tags = new util.HashMap[String, String]()
- tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
- tags.put("topic", tp.topic())
- tags.put("partition", String.valueOf(tp.partition()))
- val fetchLag = consumer.metrics.get(new MetricName("records-lag",
"consumer-fetch-manager-metrics", "", tags))
- assertNotNull(fetchLag)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testQuotaMetricsNotCreatedIfNoQuotasConfigured(groupProtocol: String):
Unit = {
- val numRecords = 1000
- val producer = createProducer()
- val startingTimestamp = System.currentTimeMillis()
- sendRecords(producer, numRecords, tp, startingTimestamp =
startingTimestamp)
-
- val consumer = createConsumer()
- consumer.assign(java.util.List.of(tp))
- consumer.seek(tp, 0)
- consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords,
startingOffset = 0, startingTimestamp = startingTimestamp)
-
- def assertNoMetric(broker: KafkaBroker, name: String, quotaType:
QuotaType, clientId: String): Unit = {
- val metricName = broker.metrics.metricName("throttle-time",
- quotaType.toString,
- "",
- "user", "",
- "client-id", clientId)
- assertNull(broker.metrics.metric(metricName), "Metric should not have
been created " + metricName)
- }
- brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.PRODUCE,
producerClientId))
- brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.PRODUCE,
producerClientId))
- brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.FETCH,
consumerClientId))
- brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.FETCH,
consumerClientId))
-
- brokers.foreach(assertNoMetric(_, "request-time", QuotaType.REQUEST,
producerClientId))
- brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.REQUEST,
producerClientId))
- brokers.foreach(assertNoMetric(_, "request-time", QuotaType.REQUEST,
consumerClientId))
- brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.REQUEST,
consumerClientId))
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
- val topic = "test_topic"
- val partition = 0
- val tp = new TopicPartition(topic, partition)
- createTopic(topic)
-
- val producer = createProducer()
- producer.send(new ProducerRecord(topic, partition, "k1".getBytes,
"v1".getBytes)).get()
- producer.send(new ProducerRecord(topic, partition, "k2".getBytes,
"v2".getBytes)).get()
- producer.send(new ProducerRecord(topic, partition, "k3".getBytes,
"v3".getBytes)).get()
- producer.close()
-
- // consumer 1 uses the default group id and consumes from earliest offset
- val consumer1Config = new Properties(consumerConfig)
- consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
- consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
- val consumer1 = createConsumer(
- configOverrides = consumer1Config,
- configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
-
- // consumer 2 uses the default group id and consumes from latest offset
- val consumer2Config = new Properties(consumerConfig)
- consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
- consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2")
- val consumer2 = createConsumer(
- configOverrides = consumer2Config,
- configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
-
- // consumer 3 uses the default group id and starts from an explicit offset
- val consumer3Config = new Properties(consumerConfig)
- consumer3Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer3")
- val consumer3 = createConsumer(
- configOverrides = consumer3Config,
- configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
-
- consumer1.assign(util.List.of(tp))
- consumer2.assign(util.List.of(tp))
- consumer3.assign(util.List.of(tp))
- consumer3.seek(tp, 1)
-
- val numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count()
- assertThrows(classOf[InvalidGroupIdException], () =>
consumer1.commitSync())
- assertThrows(classOf[InvalidGroupIdException], () =>
consumer2.committed(java.util.Set.of(tp)))
-
- val numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count()
- val numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count()
-
- consumer1.unsubscribe()
- consumer2.unsubscribe()
- consumer3.unsubscribe()
-
- assertTrue(consumer1.assignment().isEmpty)
- assertTrue(consumer2.assignment().isEmpty)
- assertTrue(consumer3.assignment().isEmpty)
-
- consumer1.close()
- consumer2.close()
- consumer3.close()
-
- assertEquals(3, numRecords1, "Expected consumer1 to consume from earliest
offset")
- assertEquals(0, numRecords2, "Expected consumer2 to consume from latest
offset")
- assertEquals(2, numRecords3, "Expected consumer3 to consume from offset 1")
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testNullGroupIdNotSupportedIfCommitting(groupProtocol: String): Unit = {
- val consumer1Config = new Properties(consumerConfig)
- consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
- consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
- val consumer1 = createConsumer(
- configOverrides = consumer1Config,
- configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
-
- consumer1.assign(java.util.List.of(tp))
- assertThrows(classOf[InvalidGroupIdException], () =>
consumer1.commitSync())
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(groupProtocol:
String): Unit = {
- val foo = "foo"
- val foo0 = new TopicPartition(foo, 0)
- val foo1 = new TopicPartition(foo, 1)
-
- val admin = createAdminClient()
- admin.createTopics(java.util.List.of(new NewTopic(foo, 1,
1.toShort))).all.get
-
- val consumerConfig = new Properties
- consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id")
- consumerConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
"my-instance-id")
-
- val consumer1 = createConsumer(configOverrides = consumerConfig)
- consumer1.subscribe(java.util.List.of(foo))
- awaitAssignment(consumer1, Set(foo0))
- consumer1.close()
-
- val consumer2 = createConsumer(configOverrides = consumerConfig)
- consumer2.subscribe(java.util.List.of(foo))
- awaitAssignment(consumer2, Set(foo0))
-
- admin.createPartitions(java.util.Map.of(foo,
NewPartitions.increaseTo(2))).all.get
-
- awaitAssignment(consumer2, Set(foo0, foo1))
-
- consumer2.close()
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testEndOffsets(groupProtocol: String): Unit = {
- val producer = createProducer()
- val startingTimestamp = System.currentTimeMillis()
- val numRecords = 10000
- (0 until numRecords).map { i =>
- val timestamp = startingTimestamp + i.toLong
- val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp,
s"key $i".getBytes, s"value $i".getBytes)
- producer.send(record)
- record
- }
- producer.flush()
-
- val consumer = createConsumer()
- consumer.subscribe(java.util.List.of(topic))
- awaitAssignment(consumer, Set(tp, tp2))
-
- val endOffsets = consumer.endOffsets(java.util.Set.of(tp))
- assertEquals(numRecords, endOffsets.get(tp))
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testSeekThrowsIllegalStateIfPartitionsNotAssigned(groupProtocol:
String): Unit = {
- val tp = new TopicPartition(topic, 0)
- val consumer = createConsumer(configOverrides = consumerConfig)
- val e: Exception = assertThrows(classOf[IllegalStateException], () =>
consumer.seekToEnd(util.List.of(tp)))
- assertEquals("No current assignment for partition " + tp, e.getMessage)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testFetchOffsetsForTime(groupProtocol: String): Unit = {
- val numPartitions = 2
- val producer = createProducer()
- val timestampsToSearch = new util.HashMap[TopicPartition, java.lang.Long]()
- var i = 0
- for (part <- 0 until numPartitions) {
- val tp = new TopicPartition(topic, part)
- // key, val, and timestamp equal to the sequence number.
- sendRecords(producer, numRecords = 100, tp, startingTimestamp = 0)
- timestampsToSearch.put(tp, (i * 20).toLong)
- i += 1
- }
-
- val consumer = createConsumer()
- // Test negative target time
- assertThrows(classOf[IllegalArgumentException],
- () => consumer.offsetsForTimes(util.Map.of(new TopicPartition(topic, 0),
-1)))
- val timestampOffsets = consumer.offsetsForTimes(timestampsToSearch)
-
- val timestampTp0 = timestampOffsets.get(new TopicPartition(topic, 0))
- assertEquals(0, timestampTp0.offset)
- assertEquals(0, timestampTp0.timestamp)
- assertEquals(Optional.of(0), timestampTp0.leaderEpoch)
-
- val timestampTp1 = timestampOffsets.get(new TopicPartition(topic, 1))
- assertEquals(20, timestampTp1.offset)
- assertEquals(20, timestampTp1.timestamp)
- assertEquals(Optional.of(0), timestampTp1.leaderEpoch)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- @Timeout(15)
- def testPositionRespectsTimeout(groupProtocol: String): Unit = {
- val topicPartition = new TopicPartition(topic, 15)
- val consumer = createConsumer()
- consumer.assign(java.util.List.of(topicPartition))
-
- // When position() is called for a topic/partition that doesn't exist, the
consumer will repeatedly update the
- // local metadata. However, it should give up after the user-supplied
timeout has past.
- assertThrows(classOf[TimeoutException], () =>
consumer.position(topicPartition, Duration.ofSeconds(3)))
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- @Timeout(15)
- def testPositionRespectsWakeup(groupProtocol: String): Unit = {
- val topicPartition = new TopicPartition(topic, 15)
- val consumer = createConsumer()
- consumer.assign(java.util.List.of(topicPartition))
-
- CompletableFuture.runAsync { () =>
- TimeUnit.SECONDS.sleep(1)
- consumer.wakeup()
- }
-
- assertThrows(classOf[WakeupException], () =>
consumer.position(topicPartition, Duration.ofSeconds(3)))
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- @Timeout(15)
- def testPositionWithErrorConnectionRespectsWakeup(groupProtocol: String):
Unit = {
- val topicPartition = new TopicPartition(topic, 15)
- val properties = new Properties()
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345")
// make sure the connection fails
- val consumer = createConsumer(configOverrides = properties)
- consumer.assign(java.util.List.of(topicPartition))
-
- CompletableFuture.runAsync { () =>
- TimeUnit.SECONDS.sleep(1)
- consumer.wakeup()
- }
-
- assertThrows(classOf[WakeupException], () =>
consumer.position(topicPartition, Duration.ofSeconds(100)))
- }
-
@Flaky("KAFKA-18031")
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
@@ -871,22 +72,4 @@ class PlaintextConsumerTest extends BaseConsumerTest {
waitTimeMs=leaveGroupTimeoutMs
)
}
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
- @MethodSource(Array("getTestGroupProtocolParametersAll"))
- def testOffsetRelatedWhenTimeoutZero(groupProtocol: String): Unit = {
- val consumer = createConsumer()
- val result1 = consumer.beginningOffsets(util.List.of(tp), Duration.ZERO)
- assertNotNull(result1)
- assertEquals(0, result1.size())
-
- val result2 = consumer.endOffsets(util.List.of(tp), Duration.ZERO)
- assertNotNull(result2)
- assertEquals(0, result2.size())
-
- val result3 = consumer.offsetsForTimes(java.util.Map.of(tp, 0),
Duration.ZERO)
- assertNotNull(result3)
- assertEquals(1, result3.size())
- assertNull(result3.get(tp))
- }
}