This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a399852cedf KAFKA-19042 Move PlaintextConsumerTest to 
client-integration-tests module  (#20081)
a399852cedf is described below

commit a399852cedf60382ef1f311c37abaecc2e5e59cd
Author: Ken Huang <[email protected]>
AuthorDate: Tue Jul 8 01:41:59 2025 +0800

    KAFKA-19042 Move PlaintextConsumerTest to client-integration-tests module  
(#20081)
    
    Use Java to rewrite PlaintextConsumerTest by new test infra and  move it
    to client-integration-tests module.
    
    Reviewers: Jhen-Yung Hsu <[email protected]>, TengYao Chi
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 build.gradle                                       |    1 +
 .../org/apache/kafka/clients/ClientsTestUtils.java |   49 +-
 .../clients/consumer/PlaintextConsumerTest.java    | 1640 ++++++++++++++++++++
 .../integration/kafka/api/BaseConsumerTest.scala   |   40 +-
 .../kafka/api/PlaintextConsumerTest.scala          |  821 +---------
 5 files changed, 1681 insertions(+), 870 deletions(-)

diff --git a/build.gradle b/build.gradle
index 1ba506e6314..6f0288037a7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2001,6 +2001,7 @@ project(':clients:clients-integration-tests') {
     implementation project(':group-coordinator')
     implementation project(':group-coordinator:group-coordinator-api')
     implementation project(':transaction-coordinator')
+    testImplementation project(':test-common:test-common-util')
 
     testImplementation libs.junitJupiter
     testImplementation libs.junitPlatformSuiteEngine
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
index 0c99d51da88..dfdadeb5090 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java
@@ -64,19 +64,19 @@ public class ClientsTestUtils {
 
     private ClientsTestUtils() {}
 
-    public static List<ConsumerRecord<byte[], byte[]>> consumeRecords(
-        Consumer<byte[], byte[]> consumer,
+    public static <K, V> List<ConsumerRecord<K, V>> consumeRecords(
+        Consumer<K, V> consumer,
         int numRecords
     ) throws InterruptedException {
         return consumeRecords(consumer, numRecords, Integer.MAX_VALUE);
     }
 
-    public static List<ConsumerRecord<byte[], byte[]>> consumeRecords(
-        Consumer<byte[], byte[]> consumer,
+    public static <K, V> List<ConsumerRecord<K, V>> consumeRecords(
+        Consumer<K, V> consumer,
         int numRecords,
         int maxPollRecords
     ) throws InterruptedException {
-        List<ConsumerRecord<byte[], byte[]>> consumedRecords = new 
ArrayList<>();
+        List<ConsumerRecord<K, V>> consumedRecords = new ArrayList<>();
         TestUtils.waitForCondition(() -> {
             var records = consumer.poll(Duration.ofMillis(100));
             records.forEach(consumedRecords::add);
@@ -138,6 +138,31 @@ public class ClientsTestUtils {
         }, waitTimeMs, msg);
     }
 
+    public static void consumeAndVerifyRecordsWithTimeTypeLogAppend(
+        Consumer<byte[], byte[]> consumer,
+        TopicPartition tp,
+        int numRecords,
+        long startingTimestamp
+    ) throws InterruptedException {
+        var records = consumeRecords(consumer, numRecords, Integer.MAX_VALUE);
+        var now = System.currentTimeMillis();
+        for (var i = 0; i < numRecords; i++) {
+            var record = records.get(i);
+            assertEquals(tp.topic(), record.topic());
+            assertEquals(tp.partition(), record.partition());
+
+            assertTrue(record.timestamp() >= startingTimestamp && 
record.timestamp() <= now,
+                "Got unexpected timestamp " + record.timestamp() + ". 
Timestamp should be between [" + startingTimestamp + ", " + now + "]");
+
+            assertEquals(i, record.offset());
+            assertEquals(KEY_PREFIX + i, new String(record.key()));
+            assertEquals(VALUE_PREFIX + i, new String(record.value()));
+            // this is true only because K and V are byte arrays
+            assertEquals((KEY_PREFIX + i).length(), 
record.serializedKeySize());
+            assertEquals((VALUE_PREFIX + i).length(), 
record.serializedValueSize());
+        }
+    }
+
     public static void consumeAndVerifyRecords(
         Consumer<byte[], byte[]> consumer,
         TopicPartition tp,
@@ -281,8 +306,8 @@ public class ClientsTestUtils {
         return record;
     }
 
-    public static void sendAndAwaitAsyncCommit(
-        Consumer<byte[], byte[]> consumer,
+    public static <K, V> void sendAndAwaitAsyncCommit(
+        Consumer<K, V> consumer,
         Optional<Map<TopicPartition, OffsetAndMetadata>> offsetsOpt
     ) throws InterruptedException {
 
@@ -423,8 +448,8 @@ public class ClientsTestUtils {
         }
     }
 
-    public static void sendAsyncCommit(
-        Consumer<byte[], byte[]> consumer,
+    public static <K, V> void sendAsyncCommit(
+        Consumer<K, V> consumer,
         OffsetCommitCallback callback,
         Optional<Map<TopicPartition, OffsetAndMetadata>> offsetsOpt
     ) {
@@ -472,14 +497,14 @@ public class ClientsTestUtils {
         }
     }
 
-    private static class RetryCommitCallback implements OffsetCommitCallback {
+    private static class RetryCommitCallback<K, V> implements 
OffsetCommitCallback {
         boolean isComplete = false;
         Optional<Exception> error = Optional.empty();
-        Consumer<byte[], byte[]> consumer;
+        Consumer<K, V> consumer;
         Optional<Map<TopicPartition, OffsetAndMetadata>> offsetsOpt;
 
         public RetryCommitCallback(
-            Consumer<byte[], byte[]> consumer,
+            Consumer<K, V> consumer,
             Optional<Map<TopicPartition, OffsetAndMetadata>> offsetsOpt
         ) {
             this.consumer = consumer;
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
new file mode 100644
index 00000000000..5fd2ad20089
--- /dev/null
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -0,0 +1,1640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import kafka.server.KafkaBroker;
+
+import 
org.apache.kafka.clients.ClientsTestUtils.TestConsumerReassignmentListener;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Flaky;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.test.MockConsumerInterceptor;
+import org.apache.kafka.test.MockProducerInterceptor;
+import org.apache.kafka.test.TestUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.BROKER_COUNT;
+import static 
org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.TOPIC;
+import static 
org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.TP;
+import static 
org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testClusterResourceListener;
+import static 
org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testCoordinatorFailover;
+import static 
org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testSimpleConsumption;
+import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment;
+import static org.apache.kafka.clients.ClientsTestUtils.awaitRebalance;
+import static 
org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static 
org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecordsWithTimeTypeLogAppend;
+import static org.apache.kafka.clients.ClientsTestUtils.consumeRecords;
+import static 
org.apache.kafka.clients.ClientsTestUtils.sendAndAwaitAsyncCommit;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static 
org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.clients.CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+        @ClusterConfigProperty(key = GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 
value = "60000"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+    }
+)
+public class PlaintextConsumerTest {
+
+    private final ClusterInstance cluster;
+    public static final double EPSILON = 0.1;
+
+    public PlaintextConsumerTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @ClusterTest
+    public void testClassicConsumerSimpleConsumption() throws 
InterruptedException {
+        testSimpleConsumption(cluster, Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerSimpleConsumption() throws 
InterruptedException {
+        testSimpleConsumption(cluster, Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testClassicConsumerClusterResourceListener() throws 
InterruptedException {
+        testClusterResourceListener(cluster, Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerClusterResourceListener() throws 
InterruptedException {
+        testClusterResourceListener(cluster, Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testClassicConsumerCoordinatorFailover() throws 
InterruptedException {
+        Map<String, Object> config = Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            SESSION_TIMEOUT_MS_CONFIG, 5001,
+            HEARTBEAT_INTERVAL_MS_CONFIG, 1000,
+            // Use higher poll timeout to avoid consumer leaving the group due 
to timeout
+            MAX_POLL_INTERVAL_MS_CONFIG, 15000
+        );
+        testCoordinatorFailover(cluster, config);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumeCoordinatorFailover() throws 
InterruptedException {
+        Map<String, Object> config = Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            // Use higher poll timeout to avoid consumer leaving the group due 
to timeout
+            MAX_POLL_INTERVAL_MS_CONFIG, 15000
+        );
+        testCoordinatorFailover(cluster, config);
+    }
+
+    @ClusterTest
+    public void testClassicConsumerHeaders() throws Exception {
+        testHeaders(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerHeaders() throws Exception {
+        testHeaders(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testHeaders(Map<String, Object> consumerConfig) throws 
Exception {
+        var numRecords = 1;
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)
+        ) {
+            var record = new ProducerRecord<>(TP.topic(), TP.partition(), 
null, "key".getBytes(), "value".getBytes());
+            record.headers().add("headerKey", "headerValue".getBytes());
+            producer.send(record);
+
+            assertEquals(0, consumer.assignment().size());
+            consumer.assign(List.of(TP));
+            assertEquals(1, consumer.assignment().size());
+
+            consumer.seek(TP, 0);
+            var records = consumeRecords(consumer, numRecords);
+            assertEquals(numRecords, records.size());
+            var header = records.get(0).headers().lastHeader("headerKey");
+            assertEquals("headerValue", header == null ? null : new 
String(header.value()));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerHeadersSerializerDeserializer() throws 
Exception {
+        testHeadersSerializeDeserialize(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerHeadersSerializerDeserializer() throws 
Exception {
+        testHeadersSerializeDeserialize(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testHeadersSerializeDeserialize(Map<String, Object> config) 
throws InterruptedException {
+        var numRecords = 1;
+        Map<String, Object> consumerConfig = new HashMap<>(config);
+        consumerConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
DeserializerImpl.class);
+        Map<String, Object> producerConfig = Map.of(
+            VALUE_SERIALIZER_CLASS_CONFIG, SerializerImpl.class.getName()
+        );
+
+        try (Producer<byte[], byte[]> producer = 
cluster.producer(producerConfig);
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)
+        ) {
+            producer.send(new ProducerRecord<>(
+                TP.topic(), 
+                TP.partition(), 
+                null, 
+                "key".getBytes(), 
+                "value".getBytes())
+            );
+            
+            assertEquals(0, consumer.assignment().size());
+            consumer.assign(List.of(TP));
+            assertEquals(1, consumer.assignment().size());
+
+            consumer.seek(TP, 0);
+            assertEquals(numRecords, consumeRecords(consumer, 
numRecords).size());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerAutoOffsetReset() throws Exception {
+        testAutoOffsetReset(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAutoOffsetReset() throws Exception {
+        testAutoOffsetReset(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testAutoOffsetReset(Map<String, Object> consumerConfig) 
throws Exception {
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)
+        ) {
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, TP, 1, startingTimestamp);
+            consumer.assign(List.of(TP));
+            consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerGroupConsumption() throws Exception {
+        testGroupConsumption(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerGroupConsumption() throws Exception {
+        testGroupConsumption(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testGroupConsumption(Map<String, Object> consumerConfig) 
throws Exception {
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)
+        ) {
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, TP, 10, startingTimestamp);
+            consumer.subscribe(List.of(TOPIC));
+            consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPartitionsFor() throws Exception {
+        testPartitionsFor(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPartitionsFor() throws Exception {
+        testPartitionsFor(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testPartitionsFor(Map<String, Object> consumerConfig) throws 
Exception {
+        var numParts = 2;
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+        cluster.createTopic("part-test", numParts, (short) 1);
+
+        try (var consumer = cluster.consumer(consumerConfig)) {
+            var partitions = consumer.partitionsFor(TOPIC);
+            assertNotNull(partitions);
+            assertEquals(2, partitions.size());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPartitionsForAutoCreate() throws Exception {
+        testPartitionsForAutoCreate(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPartitionsForAutoCreate() throws Exception {
+        testPartitionsForAutoCreate(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testPartitionsForAutoCreate(Map<String, Object> 
consumerConfig) throws Exception {
+        try (var consumer = cluster.consumer(consumerConfig)) {
+            // First call would create the topic
+            consumer.partitionsFor("non-exist-topic");
+            TestUtils.waitForCondition(
+                () -> !consumer.partitionsFor("non-exist-topic").isEmpty(), 
+                "Timed out while awaiting non empty partitions."
+            );
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPartitionsForInvalidTopic() {
+        testPartitionsForInvalidTopic(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPartitionsForInvalidTopic() {
+        testPartitionsForInvalidTopic(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testPartitionsForInvalidTopic(Map<String, Object> 
consumerConfig) {
+        try (var consumer = cluster.consumer(consumerConfig)) {
+            assertThrows(InvalidTopicException.class, () -> 
consumer.partitionsFor(";3# ads,{234"));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerSeek() throws Exception {
+        testSeek(
+            Map.of(GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerSeek() throws Exception {
+        testSeek(
+            Map.of(GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testSeek(Map<String, Object> consumerConfig) throws Exception 
{
+        var totalRecords = 50;
+        var mid = totalRecords / 2;
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)
+        ) {
+            var startingTimestamp = 0;
+            sendRecords(producer, TP, totalRecords, startingTimestamp);
+
+            consumer.assign(List.of(TP));
+            consumer.seekToEnd(List.of(TP));
+            assertEquals(totalRecords, consumer.position(TP));
+            assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty());
+
+            consumer.seekToBeginning(List.of(TP));
+            assertEquals(0, consumer.position(TP));
+            consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp);
+
+            consumer.seek(TP, mid);
+            assertEquals(mid, consumer.position(TP));
+
+            consumeAndVerifyRecords(consumer, TP, 1, mid, mid, mid);
+
+            // Test seek compressed message
+            var tp2 = new TopicPartition(TOPIC, 1);
+            cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+            sendCompressedMessages(totalRecords, tp2);
+            consumer.assign(List.of(tp2));
+
+            consumer.seekToEnd(List.of(tp2));
+            assertEquals(totalRecords, consumer.position(tp2));
+            assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty());
+
+            consumer.seekToBeginning(List.of(tp2));
+            assertEquals(0L, consumer.position(tp2));
+            consumeAndVerifyRecords(consumer, tp2, 1, 0);
+
+            consumer.seek(tp2, mid);
+            assertEquals(mid, consumer.position(tp2));
+            consumeAndVerifyRecords(consumer, tp2, 1, mid, mid, mid);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPartitionPauseAndResume() throws Exception {
+        testPartitionPauseAndResume(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPartitionPauseAndResume() throws Exception {
+        testPartitionPauseAndResume(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testPartitionPauseAndResume(Map<String, Object> 
consumerConfig) throws Exception {
+        var partitions = List.of(TP);
+        var numRecords = 5;
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)
+        ) {
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, TP, numRecords, startingTimestamp);
+
+            consumer.assign(partitions);
+            consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, 
startingTimestamp);
+            consumer.pause(partitions);
+            startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, TP, numRecords, startingTimestamp);
+            assertTrue(consumer.poll(Duration.ofMillis(100)).isEmpty());
+            consumer.resume(partitions);
+            consumeAndVerifyRecords(consumer, TP, numRecords, 5, 0, 
startingTimestamp);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerInterceptors() throws Exception {
+        testInterceptors(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerInterceptors() throws Exception {
+        testInterceptors(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testInterceptors(Map<String, Object> consumerConfig) throws 
Exception {
+        var appendStr = "mock";
+        MockConsumerInterceptor.resetCounters();
+        MockProducerInterceptor.resetCounters();
+
+        // create producer with interceptor
+        Map<String, Object> producerConfig = Map.of(
+            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
MockProducerInterceptor.class.getName(),
+            KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+            VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+            "mock.interceptor.append", appendStr
+        );
+        // create consumer with interceptor
+        Map<String, Object> consumerConfigOverride = new 
HashMap<>(consumerConfig);
+        consumerConfigOverride.put(INTERCEPTOR_CLASSES_CONFIG, 
MockConsumerInterceptor.class.getName());
+        consumerConfigOverride.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+        consumerConfigOverride.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+        
+        try (Producer<String, String> producer = 
cluster.producer(producerConfig);
+             Consumer<String, String> consumer = 
cluster.consumer(consumerConfigOverride)
+        ) {
+            // produce records
+            var numRecords = 10;
+            List<Future<RecordMetadata>> futures = new ArrayList<>();
+            for (var i = 0; i < numRecords; i++) {
+                Future<RecordMetadata> future = producer.send(
+                    new ProducerRecord<>(TP.topic(), TP.partition(), "key " + 
i, "value " + i)
+                );
+                futures.add(future);
+            }
+
+            // Wait for all sends to complete
+            futures.forEach(future -> assertDoesNotThrow(() -> future.get()));
+
+            assertEquals(numRecords, 
MockProducerInterceptor.ONSEND_COUNT.intValue());
+            assertEquals(numRecords, 
MockProducerInterceptor.ON_SUCCESS_COUNT.intValue());
+
+            // send invalid record
+            assertThrows(
+                Throwable.class,
+                () -> producer.send(null), 
+                "Should not allow sending a null record"
+            );
+            assertEquals(
+                1, 
+                MockProducerInterceptor.ON_ERROR_COUNT.intValue(), 
+                "Interceptor should be notified about exception"
+            );
+            assertEquals(
+                0, 
+                
MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue(),
+                "Interceptor should not receive metadata with an exception 
when record is null"
+            );
+            
+            consumer.assign(List.of(TP));
+            consumer.seek(TP, 0);
+
+            // consume and verify that values are modified by interceptors
+            var records = consumeRecords(consumer, numRecords);
+            for (var i = 0; i < numRecords; i++) {
+                ConsumerRecord<String, String> record = records.get(i);
+                assertEquals("key " + i, record.key());
+                assertEquals(("value " + i + 
appendStr).toUpperCase(Locale.ROOT), record.value());
+            }
+
+            // commit sync and verify onCommit is called
+            var commitCountBefore = 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
+            consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L)));
+            assertEquals(2, consumer.committed(Set.of(TP)).get(TP).offset());
+            assertEquals(commitCountBefore + 1, 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
+
+            // commit async and verify onCommit is called
+            var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L));
+            sendAndAwaitAsyncCommit(consumer, Optional.of(offsetsToCommit));
+            assertEquals(5, consumer.committed(Set.of(TP)).get(TP).offset());
+            assertEquals(commitCountBefore + 2, 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
+        }
+        // cleanup
+        MockConsumerInterceptor.resetCounters();
+        MockProducerInterceptor.resetCounters();
+    }
+
+    @ClusterTest
+    public void testClassicConsumerInterceptorsWithWrongKeyValue() throws 
Exception {
+        testInterceptorsWithWrongKeyValue(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerInterceptorsWithWrongKeyValue() throws 
Exception {
+        testInterceptorsWithWrongKeyValue(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testInterceptorsWithWrongKeyValue(Map<String, Object> 
consumerConfig) throws Exception {
+        var appendStr = "mock";
+        // create producer with interceptor that has different key and value 
types from the producer
+        Map<String, Object> producerConfig = Map.of(
+            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
MockProducerInterceptor.class.getName(),
+            "mock.interceptor.append", appendStr
+        );
+        // create consumer with interceptor that has different key and value 
types from the consumer
+        Map<String, Object> consumerConfigOverride = new 
HashMap<>(consumerConfig);
+        consumerConfigOverride.put(INTERCEPTOR_CLASSES_CONFIG, 
MockConsumerInterceptor.class.getName());
+
+        try (Producer<byte[], byte[]> producer = 
cluster.producer(producerConfig);
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfigOverride)
+        ) {
+            // producing records should succeed
+            producer.send(new ProducerRecord<>(
+                TP.topic(),
+                TP.partition(),
+                "key".getBytes(),
+                "value will not be modified".getBytes()
+            ));
+
+            consumer.assign(List.of(TP));
+            consumer.seek(TP, 0);
+            // consume and verify that values are not modified by interceptors 
-- their exceptions are caught and logged, but not propagated
+            var records = consumeRecords(consumer, 1);
+            var record = records.get(0);
+            assertEquals("value will not be modified", new 
String(record.value()));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerConsumeMessagesWithCreateTime() throws 
Exception {
+        testConsumeMessagesWithCreateTime(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerConsumeMessagesWithCreateTime() throws 
Exception {
+        testConsumeMessagesWithCreateTime(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testConsumeMessagesWithCreateTime(Map<String, Object> 
consumerConfig) throws Exception {
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+        var numRecords = 50;
+        var tp2 = new TopicPartition(TOPIC, 1);
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)
+        ) {
+            // Test non-compressed messages
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, TP, numRecords, startingTimestamp);
+            consumer.assign(List.of(TP));
+            consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, 
startingTimestamp);
+
+            // Test compressed messages
+            sendCompressedMessages(numRecords, tp2);
+            consumer.assign(List.of(tp2));
+            consumeAndVerifyRecords(consumer, tp2, numRecords, 0, 0, 0);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerConsumeMessagesWithLogAppendTime() throws 
Exception {
+        testConsumeMessagesWithLogAppendTime(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerConsumeMessagesWithLogAppendTime() throws 
Exception {
+        testConsumeMessagesWithLogAppendTime(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testConsumeMessagesWithLogAppendTime(Map<String, Object> 
consumerConfig) throws Exception {
+        var topicName = "testConsumeMessagesWithLogAppendTime";
+        var startTime = System.currentTimeMillis();
+        var numRecords = 50;
+        cluster.createTopic(topicName, 2, (short) 2, 
Map.of(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime"));
+
+        try (Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)) {
+            // Test non-compressed messages
+            var tp1 = new TopicPartition(topicName, 0);
+            sendRecords(cluster, tp1, numRecords);
+            consumer.assign(List.of(tp1));
+            consumeAndVerifyRecordsWithTimeTypeLogAppend(consumer, tp1, 
numRecords, startTime);
+
+            // Test compressed messages
+            var tp2 = new TopicPartition(topicName, 1);
+            sendCompressedMessages(numRecords, tp2);
+            consumer.assign(List.of(tp2));
+            consumeAndVerifyRecordsWithTimeTypeLogAppend(consumer, tp2, 
numRecords, startTime);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerListTopics() throws Exception {
+        testListTopics(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerListTopics() throws Exception {
+        testListTopics(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testListTopics(Map<String, Object> consumerConfig) throws 
Exception {
+        var numParts = 2;
+        var topic1 = "part-test-topic-1";
+        var topic2 = "part-test-topic-2";
+        var topic3 = "part-test-topic-3";
+        cluster.createTopic(topic1, numParts, (short) 1);
+        cluster.createTopic(topic2, numParts, (short) 1);
+        cluster.createTopic(topic3, numParts, (short) 1);
+
+        sendRecords(cluster, new TopicPartition(topic1, 0), 1);
+
+        try (var consumer = cluster.consumer(consumerConfig)) {
+            // consumer some messages, and we can list the internal topic 
__consumer_offsets
+            consumer.subscribe(List.of(topic1));
+            consumer.poll(Duration.ofMillis(100));
+            var topics = consumer.listTopics();
+            assertNotNull(topics);
+            assertEquals(4, topics.size());
+            assertEquals(2, topics.get(topic1).size());
+            assertEquals(2, topics.get(topic2).size());
+            assertEquals(2, topics.get(topic3).size());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPauseStateNotPreservedByRebalance() throws 
Exception {
+        testPauseStateNotPreservedByRebalance(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            SESSION_TIMEOUT_MS_CONFIG, 100,
+            HEARTBEAT_INTERVAL_MS_CONFIG, 30
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPauseStateNotPreservedByRebalance() throws 
Exception {
+        testPauseStateNotPreservedByRebalance(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testPauseStateNotPreservedByRebalance(Map<String, Object> 
consumerConfig) throws Exception {
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)
+        ) {
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, TP, 5, startingTimestamp);
+            consumer.subscribe(List.of(TOPIC));
+            consumeAndVerifyRecords(consumer, TP, 5, 0, 0, startingTimestamp);
+            consumer.pause(List.of(TP));
+
+            // subscribe to a new topic to trigger a rebalance
+            consumer.subscribe(List.of("topic2"));
+
+            // after rebalance, our position should be reset and our pause 
state lost,
+            // so we should be able to consume from the beginning
+            consumeAndVerifyRecords(consumer, TP, 0, 5, 0, startingTimestamp);
+        }
+    }
+
+    @ClusterTest
+    public void 
testClassicConsumerPerPartitionLeadMetricsCleanUpWithSubscribe() throws 
Exception {
+        String consumerClientId = 
"testClassicConsumerPerPartitionLeadMetricsCleanUpWithSubscribe";
+        testPerPartitionLeadMetricsCleanUpWithSubscribe(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, consumerClientId,
+            CLIENT_ID_CONFIG, consumerClientId
+        ), consumerClientId);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPerPartitionLeadMetricsCleanUpWithSubscribe() 
throws Exception {
+        String consumerClientId = 
"testAsyncConsumerPerPartitionLeadMetricsCleanUpWithSubscribe";
+        testPerPartitionLeadMetricsCleanUpWithSubscribe(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, consumerClientId,
+            CLIENT_ID_CONFIG, consumerClientId
+        ), consumerClientId);
+    }
+
+    private void testPerPartitionLeadMetricsCleanUpWithSubscribe(
+        Map<String, Object> consumerConfig,
+        String consumerClientId
+    ) throws Exception {
+        var numMessages = 1000;
+        var topic2 = "topic2";
+        var tp2 = new TopicPartition(TOPIC, 1);
+        cluster.createTopic(topic2, 2, (short) BROKER_COUNT);
+        
+        try (Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)) {
+            // send some messages.
+            sendRecords(cluster, TP, numMessages);
+
+            // Test subscribe
+            // Create a consumer and consumer some messages.
+            var listener = new TestConsumerReassignmentListener();
+            consumer.subscribe(List.of(TOPIC, topic2), listener);
+            var records = awaitNonEmptyRecords(consumer, TP);
+            assertEquals(1, listener.callsToAssigned, "should be assigned 
once");
+
+            // Verify the metric exist.
+            Map<String, String> tags1 = Map.of(
+                "client-id", consumerClientId,
+                "topic", TP.topic(),
+                "partition", String.valueOf(TP.partition())
+            );
+
+            Map<String, String> tags2 = Map.of(
+                "client-id", consumerClientId,
+                "topic", tp2.topic(),
+                "partition", String.valueOf(tp2.partition())
+            );
+
+            var fetchLead0 = consumer.metrics().get(new 
MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1));
+            assertNotNull(fetchLead0);
+            assertEquals((double) records.count(), fetchLead0.metricValue(), 
"The lead should be " + records.count());
+
+            // Remove topic from subscription
+            consumer.subscribe(List.of(topic2), listener);
+            awaitRebalance(consumer, listener);
+
+            // Verify the metric has gone
+            assertNull(consumer.metrics().get(new MetricName("records-lead", 
"consumer-fetch-manager-metrics", "", tags1)));
+            assertNull(consumer.metrics().get(new MetricName("records-lead", 
"consumer-fetch-manager-metrics", "", tags2)));
+        }
+    }
+
+    @ClusterTest
+    public void 
testClassicConsumerPerPartitionLagMetricsCleanUpWithSubscribe() throws 
Exception {
+        String consumerClientId = 
"testClassicConsumerPerPartitionLagMetricsCleanUpWithSubscribe";
+        testPerPartitionLagMetricsCleanUpWithSubscribe(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, consumerClientId,
+            CLIENT_ID_CONFIG, consumerClientId
+        ), consumerClientId);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPerPartitionLagMetricsCleanUpWithSubscribe() 
throws Exception {
+        String consumerClientId = 
"testAsyncConsumerPerPartitionLagMetricsCleanUpWithSubscribe";
+        testPerPartitionLagMetricsCleanUpWithSubscribe(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, consumerClientId,
+            CLIENT_ID_CONFIG, consumerClientId
+        ), consumerClientId);
+    }
+
+    private void testPerPartitionLagMetricsCleanUpWithSubscribe(
+        Map<String, Object> consumerConfig,
+        String consumerClientId
+    ) throws Exception {
+        int numMessages = 1000;
+        var topic2 = "topic2";
+        var tp2 = new TopicPartition(TOPIC, 1);
+        cluster.createTopic(topic2, 2, (short) BROKER_COUNT);
+
+        try (Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)) {
+            // send some messages.
+            sendRecords(cluster, TP, numMessages);
+            
+            // Test subscribe
+            // Create a consumer and consumer some messages.
+            var listener = new TestConsumerReassignmentListener();
+            consumer.subscribe(List.of(TOPIC, topic2), listener);
+            var records = awaitNonEmptyRecords(consumer, TP);
+            assertEquals(1, listener.callsToAssigned, "should be assigned 
once");
+
+            // Verify the metric exist.
+            Map<String, String> tags1 = Map.of(
+                "client-id", consumerClientId,
+                "topic", TP.topic(),
+                "partition", String.valueOf(TP.partition())
+            );
+
+            Map<String, String> tags2 = Map.of(
+                "client-id", consumerClientId,
+                "topic", tp2.topic(),
+                "partition", String.valueOf(tp2.partition())
+            );
+
+            var fetchLag0 = consumer.metrics().get(new 
MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1));
+            assertNotNull(fetchLag0);
+            var expectedLag = numMessages - records.count();
+            assertEquals(expectedLag, (double) fetchLag0.metricValue(), 
EPSILON, "The lag should be " + expectedLag);
+
+            // Remove topic from subscription
+            consumer.subscribe(List.of(topic2), listener);
+            awaitRebalance(consumer, listener);
+
+            // Verify the metric has gone
+            assertNull(consumer.metrics().get(new MetricName("records-lag", 
"consumer-fetch-manager-metrics", "", tags1)));
+            assertNull(consumer.metrics().get(new MetricName("records-lag", 
"consumer-fetch-manager-metrics", "", tags2)));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPerPartitionLeadMetricsCleanUpWithAssign() 
throws Exception {
+        String consumerClientId = 
"testClassicConsumerPerPartitionLeadMetricsCleanUpWithAssign";
+        testPerPartitionLeadMetricsCleanUpWithAssign(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, consumerClientId,
+            CLIENT_ID_CONFIG, consumerClientId
+        ), consumerClientId);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPerPartitionLeadMetricsCleanUpWithAssign() 
throws Exception {
+        String consumerClientId = 
"testAsyncConsumerPerPartitionLeadMetricsCleanUpWithAssign";
+        testPerPartitionLeadMetricsCleanUpWithAssign(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, consumerClientId,
+            CLIENT_ID_CONFIG, consumerClientId
+        ), consumerClientId);
+    }
+
+    private void testPerPartitionLeadMetricsCleanUpWithAssign(
+        Map<String, Object> consumerConfig,
+        String consumerClientId
+    ) throws Exception {
+        var numMessages = 1000;
+        var tp2 = new TopicPartition(TOPIC, 1);
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+        
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)
+        ) {
+            // Test assign send some messages.
+            sendRecords(producer, TP, numMessages, System.currentTimeMillis());
+            sendRecords(producer, tp2, numMessages, 
System.currentTimeMillis());
+            
+            consumer.assign(List.of(TP));
+            var records = awaitNonEmptyRecords(consumer, TP);
+
+            // Verify the metric exist.
+            Map<String, String> tags = Map.of(
+                "client-id", consumerClientId,
+                "topic", TP.topic(),
+                "partition", String.valueOf(TP.partition()) 
+            );
+
+            var fetchLead = consumer.metrics().get(new 
MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags));
+            assertNotNull(fetchLead);
+            assertEquals((double) records.count(), fetchLead.metricValue(), 
"The lead should be " + records.count());
+
+            consumer.assign(List.of(tp2));
+            awaitNonEmptyRecords(consumer, tp2);
+            assertNull(consumer.metrics().get(new MetricName("records-lead", 
"consumer-fetch-manager-metrics", "", tags)));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPerPartitionLagMetricsCleanUpWithAssign() 
throws Exception {
+        String consumerClientId = 
"testClassicConsumerPerPartitionLagMetricsCleanUpWithAssign";
+        testPerPartitionLagMetricsCleanUpWithAssign(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, consumerClientId,
+            CLIENT_ID_CONFIG, consumerClientId
+        ), consumerClientId);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPerPartitionLagMetricsCleanUpWithAssign() 
throws Exception {
+        String consumerClientId = 
"testAsyncConsumerPerPartitionLagMetricsCleanUpWithAssign";
+        testPerPartitionLagMetricsCleanUpWithAssign(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, consumerClientId,
+            CLIENT_ID_CONFIG, consumerClientId
+        ), consumerClientId);
+    }
+
+    private void testPerPartitionLagMetricsCleanUpWithAssign(
+        Map<String, Object> consumerConfig,
+        String consumerClientId
+    ) throws Exception {
+        var numMessages = 1000;
+        var tp2 = new TopicPartition(TOPIC, 1);
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)
+        ) {
+            // Test assign send some messages.
+            sendRecords(producer, TP, numMessages, System.currentTimeMillis());
+            sendRecords(producer, tp2, numMessages, 
System.currentTimeMillis());
+
+            consumer.assign(List.of(TP));
+            var records = awaitNonEmptyRecords(consumer, TP);
+
+            // Verify the metric exist.
+            Map<String, String> tags = Map.of(
+                "client-id", consumerClientId,
+                "topic", TP.topic(),
+                "partition", String.valueOf(TP.partition())
+            );
+
+            var fetchLag = consumer.metrics().get(new 
MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags));
+            assertNotNull(fetchLag);
+
+            var expectedLag = numMessages - records.count();
+            assertEquals(expectedLag, (double) fetchLag.metricValue(), 
EPSILON, "The lag should be " + expectedLag);
+            consumer.assign(List.of(tp2));
+            awaitNonEmptyRecords(consumer, tp2);
+            assertNull(consumer.metrics().get(new MetricName(TP + 
".records-lag", "consumer-fetch-manager-metrics", "", tags)));
+            assertNull(consumer.metrics().get(new MetricName("records-lag", 
"consumer-fetch-manager-metrics", "", tags)));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPerPartitionLagMetricsWhenReadCommitted() 
throws Exception {
+        String consumerClientId = 
"testClassicConsumerPerPartitionLagMetricsWhenReadCommitted";
+        testPerPartitionLagMetricsWhenReadCommitted(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, consumerClientId,
+            CLIENT_ID_CONFIG, consumerClientId,
+            ISOLATION_LEVEL_CONFIG, "read_committed"
+        ), consumerClientId);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPerPartitionLagMetricsWhenReadCommitted() 
throws Exception {
+        String consumerClientId = 
"testAsyncConsumerPerPartitionLagMetricsWhenReadCommitted";
+        testPerPartitionLagMetricsWhenReadCommitted(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, consumerClientId,
+            CLIENT_ID_CONFIG, consumerClientId,
+            ISOLATION_LEVEL_CONFIG, "read_committed"
+        ), consumerClientId);
+    }
+
+    private void testPerPartitionLagMetricsWhenReadCommitted(
+        Map<String, Object> consumerConfig,
+        String consumerClientId
+    ) throws Exception {
+        var numMessages = 1000;
+        var tp2 = new TopicPartition(TOPIC, 1);
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)
+        ) {
+            // Test assign send some messages.
+            sendRecords(producer, TP, numMessages, System.currentTimeMillis());
+            sendRecords(producer, tp2, numMessages, 
System.currentTimeMillis());
+
+            consumer.assign(List.of(TP));
+            awaitNonEmptyRecords(consumer, TP);
+
+            // Verify the metric exist.
+            Map<String, String> tags = Map.of(
+                "client-id", consumerClientId,
+                "topic", TP.topic(),
+                "partition", String.valueOf(TP.partition())
+            );
+
+            var fetchLag = consumer.metrics().get(new 
MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags));
+            assertNotNull(fetchLag);
+        }
+    }
+
+    @ClusterTest
+    public void 
testClassicConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured() throws 
Exception {
+        var consumerClientId = 
"testClassicConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured";
+        testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, consumerClientId,
+            CLIENT_ID_CONFIG, consumerClientId,
+            ISOLATION_LEVEL_CONFIG, "read_committed"
+        ), consumerClientId);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured() 
throws Exception {
+        var consumerClientId = 
"testAsyncConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured";
+        testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, consumerClientId,
+            CLIENT_ID_CONFIG, consumerClientId,
+            ISOLATION_LEVEL_CONFIG, "read_committed"
+        ), consumerClientId);
+    }
+
+    private void testQuotaMetricsNotCreatedIfNoQuotasConfigured(
+        Map<String, Object> consumerConfig, 
+        String consumerClientId
+    ) throws Exception {
+        var producerClientId = UUID.randomUUID().toString();
+        var numRecords = 1000;
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+        try (Producer<byte[], byte[]> producer = 
cluster.producer(Map.of(ProducerConfig.CLIENT_ID_CONFIG, producerClientId));
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)
+        ) {
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, TP, numRecords, startingTimestamp);
+
+            consumer.assign(List.of(TP));
+            consumer.seek(TP, 0);
+            consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, 
startingTimestamp);
+            
+            var brokers = cluster.brokers().values();
+            brokers.forEach(broker -> assertNoMetric(broker, "byte-rate", 
QuotaType.PRODUCE, producerClientId));
+            brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", 
QuotaType.PRODUCE, producerClientId));
+            brokers.forEach(broker -> assertNoMetric(broker, "byte-rate", 
QuotaType.FETCH, consumerClientId));
+            brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", 
QuotaType.FETCH, consumerClientId));
+            brokers.forEach(broker -> assertNoMetric(broker, "request-time", 
QuotaType.REQUEST, producerClientId));
+            brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", 
QuotaType.REQUEST, producerClientId));
+            brokers.forEach(broker -> assertNoMetric(broker, "request-time", 
QuotaType.REQUEST, consumerClientId));
+            brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", 
QuotaType.REQUEST, consumerClientId));
+        }
+    }
+
+    private void assertNoMetric(KafkaBroker broker, String name, QuotaType 
quotaType, String clientId) {
+        var metricName = broker.metrics().metricName(name, 
quotaType.toString(), "", "user", "", "client-id", clientId);
+        assertNull(broker.metrics().metric(metricName), "Metric should not 
have been created " + metricName);
+    }
+    
+    @ClusterTest
+    public void 
testClassicConsumerSeekThrowsIllegalStateIfPartitionsNotAssigned() throws 
Exception {
+        testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerSeekThrowsIllegalStateIfPartitionsNotAssigned() throws 
Exception {
+        testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map<String, 
Object> consumerConfig) throws Exception {
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+        try (var consumer = cluster.consumer(consumerConfig)) {
+            var e = assertThrows(IllegalStateException.class, () -> 
consumer.seekToEnd(List.of(TP)));
+            assertEquals("No current assignment for partition " + TP, 
e.getMessage());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumingWithNullGroupId() throws Exception {
+        testConsumingWithNullGroupId(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
+            VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
+            BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerConsumingWithNullGroupId() throws Exception {
+        testConsumingWithNullGroupId(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
+            VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
+            BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()
+        ));
+    }
+
+    private void testConsumingWithNullGroupId(Map<String, Object> 
consumerConfig) throws Exception {
+        var partition = 0;
+        cluster.createTopic(TOPIC, 1, (short) 1);
+
+        // consumer 1 uses the default group id and consumes from earliest 
offset
+        Map<String, Object> consumer1Config = new HashMap<>(consumerConfig);
+        consumer1Config.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumer1Config.put(CLIENT_ID_CONFIG, "consumer1");
+
+        // consumer 2 uses the default group id and consumes from latest offset
+        Map<String, Object> consumer2Config = new HashMap<>(consumerConfig);
+        consumer2Config.put(AUTO_OFFSET_RESET_CONFIG, "latest");
+        consumer2Config.put(CLIENT_ID_CONFIG, "consumer2");
+
+        // consumer 3 uses the default group id and starts from an explicit 
offset
+        Map<String, Object> consumer3Config = new HashMap<>(consumerConfig);
+        consumer3Config.put(CLIENT_ID_CONFIG, "consumer3");
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer1 = new 
KafkaConsumer<>(consumer1Config);
+             Consumer<byte[], byte[]> consumer2 = new 
KafkaConsumer<>(consumer2Config);
+             Consumer<byte[], byte[]> consumer3 = new 
KafkaConsumer<>(consumer3Config)
+        ) {
+            producer.send(new ProducerRecord<>(TOPIC, partition, 
"k1".getBytes(), "v1".getBytes())).get();
+            producer.send(new ProducerRecord<>(TOPIC, partition, 
"k2".getBytes(), "v2".getBytes())).get();
+            producer.send(new ProducerRecord<>(TOPIC, partition, 
"k3".getBytes(), "v3".getBytes())).get();
+
+            consumer1.assign(List.of(TP));
+            consumer2.assign(List.of(TP));
+            consumer3.assign(List.of(TP));
+            consumer3.seek(TP, 1);
+
+            var numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count();
+            assertThrows(InvalidGroupIdException.class, consumer1::commitSync);
+            assertThrows(InvalidGroupIdException.class, () -> 
consumer2.committed(Set.of(TP)));
+
+            var numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count();
+            var numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count();
+
+            consumer1.unsubscribe();
+            consumer2.unsubscribe();
+            consumer3.unsubscribe();
+
+            assertTrue(consumer1.assignment().isEmpty());
+            assertTrue(consumer2.assignment().isEmpty());
+            assertTrue(consumer3.assignment().isEmpty());
+
+            consumer1.close();
+            consumer2.close();
+            consumer3.close();
+
+            assertEquals(3, numRecords1, "Expected consumer1 to consume from 
earliest offset");
+            assertEquals(0, numRecords2, "Expected consumer2 to consume from 
latest offset");
+            assertEquals(2, numRecords3, "Expected consumer3 to consume from 
offset 1");
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerNullGroupIdNotSupportedIfCommitting() 
throws Exception {
+        testNullGroupIdNotSupportedIfCommitting(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
+            VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
+            BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(),
+            AUTO_OFFSET_RESET_CONFIG, "earliest",
+            CLIENT_ID_CONFIG, "consumer1"
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerNullGroupIdNotSupportedIfCommitting() throws 
Exception {
+        testNullGroupIdNotSupportedIfCommitting(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
+            VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
+            BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(),
+            AUTO_OFFSET_RESET_CONFIG, "earliest",
+            CLIENT_ID_CONFIG, "consumer1"
+        ));
+    }
+
+    private void testNullGroupIdNotSupportedIfCommitting(Map<String, Object> 
consumerConfig) throws Exception {
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+        try (var consumer = new KafkaConsumer<>(consumerConfig)) {
+            consumer.assign(List.of(TP));
+            assertThrows(InvalidGroupIdException.class, consumer::commitSync);
+        }
+    }
+
+    @ClusterTest
+    public void 
testClassicConsumerStaticConsumerDetectsNewPartitionCreatedAfterRestart() 
throws Exception {
+        testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "my-group-id",
+            GROUP_INSTANCE_ID_CONFIG, "my-instance-id",
+            METADATA_MAX_AGE_CONFIG, 100,
+            MAX_POLL_INTERVAL_MS_CONFIG, 6000
+        ));
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerStaticConsumerDetectsNewPartitionCreatedAfterRestart() throws 
Exception {
+        testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "my-group-id",
+            GROUP_INSTANCE_ID_CONFIG, "my-instance-id",
+            METADATA_MAX_AGE_CONFIG, 100,
+            MAX_POLL_INTERVAL_MS_CONFIG, 6000
+        ));
+    }
+
+    private void 
testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map<String, Object> 
consumerConfig) throws Exception {
+        var foo = "foo";
+        var foo0 = new TopicPartition(foo, 0);
+        var foo1 = new TopicPartition(foo, 1);
+        cluster.createTopic(foo, 1, (short) 1);
+
+        try (Consumer<byte[], byte[]> consumer1 = 
cluster.consumer(consumerConfig);
+             Consumer<byte[], byte[]> consumer2 = 
cluster.consumer(consumerConfig);
+             var admin = cluster.admin()
+        ) {
+            consumer1.subscribe(List.of(foo));
+            awaitAssignment(consumer1, Set.of(foo0));
+            consumer1.close();
+
+            consumer2.subscribe(List.of(foo));
+            awaitAssignment(consumer2, Set.of(foo0));
+
+            admin.createPartitions(Map.of(foo, 
NewPartitions.increaseTo(2))).all().get();
+            awaitAssignment(consumer2, Set.of(foo0, foo1));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerEndOffsets() throws Exception {
+        testEndOffsets(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            METADATA_MAX_AGE_CONFIG, 100,
+            MAX_POLL_INTERVAL_MS_CONFIG, 6000
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerEndOffsets() throws Exception {
+        testEndOffsets(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            METADATA_MAX_AGE_CONFIG, 100,
+            MAX_POLL_INTERVAL_MS_CONFIG, 6000
+        ));
+    }
+
+    private void testEndOffsets(Map<String, Object> consumerConfig) throws 
Exception {
+        var numRecords = 10000;
+        var tp2 = new TopicPartition(TOPIC, 1);
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)
+        ) {
+            var startingTimestamp = System.currentTimeMillis();
+            for (var i = 0; i < numRecords; i++) {
+                var timestamp = startingTimestamp + (long) i;
+                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                    TP.topic(),
+                    TP.partition(),
+                    timestamp,
+                    ("key " + i).getBytes(),
+                    ("value " + i).getBytes()
+                );
+                producer.send(record);
+            }
+            producer.flush();
+
+            consumer.subscribe(List.of(TOPIC));
+            awaitAssignment(consumer, Set.of(TP, tp2));
+
+            var endOffsets = consumer.endOffsets(Set.of(TP));
+            assertEquals(numRecords, endOffsets.get(TP));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerFetchOffsetsForTime() throws Exception {
+        testFetchOffsetsForTime(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerFetchOffsetsForTime() throws Exception {
+        testFetchOffsetsForTime(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testFetchOffsetsForTime(Map<String, Object> consumerConfig) 
throws Exception {
+        var numPartitions = 2;
+        var tp2 = new TopicPartition(TOPIC, 1);
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)
+        ) {
+            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
+            for (int part = 0, i = 0; part < numPartitions; part++, i++) {
+                var tp = new TopicPartition(TOPIC, part);
+                // key, val, and timestamp equal to the sequence number.
+                sendRecords(producer, tp, 100, 0);
+                timestampsToSearch.put(tp, i * 20L);
+            }
+            // Test negative target time
+            assertThrows(IllegalArgumentException.class, () -> 
consumer.offsetsForTimes(Map.of(TP, -1L)));
+            var timestampOffsets = 
consumer.offsetsForTimes(timestampsToSearch);
+
+            var timestampTp0 = timestampOffsets.get(TP);
+            assertEquals(0, timestampTp0.offset());
+            assertEquals(0, timestampTp0.timestamp());
+            assertEquals(Optional.of(0), timestampTp0.leaderEpoch());
+
+            var timestampTp1 = timestampOffsets.get(tp2);
+            assertEquals(20, timestampTp1.offset());
+            assertEquals(20, timestampTp1.timestamp());
+            assertEquals(Optional.of(0), timestampTp1.leaderEpoch());
+        }
+    }
+    
+    @ClusterTest
+    public void testClassicConsumerPositionRespectsTimeout() {
+        testPositionRespectsTimeout(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPositionRespectsTimeout() {
+        testPositionRespectsTimeout(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testPositionRespectsTimeout(Map<String, Object> 
consumerConfig) {
+        var topicPartition = new TopicPartition(TOPIC, 15);
+        try (var consumer = cluster.consumer(consumerConfig)) {
+            consumer.assign(List.of(topicPartition));
+            // When position() is called for a topic/partition that doesn't 
exist, the consumer will repeatedly update the
+            // local metadata. However, it should give up after the 
user-supplied timeout has past.
+            assertThrows(TimeoutException.class, () -> 
consumer.position(topicPartition, Duration.ofSeconds(3)));
+        }
+    }
+    
+    @ClusterTest
+    public void testClassicConsumerPositionRespectsWakeup() {
+        testPositionRespectsWakeup(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPositionRespectsWakeup() {
+        testPositionRespectsWakeup(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testPositionRespectsWakeup(Map<String, Object> 
consumerConfig) {
+        var topicPartition = new TopicPartition(TOPIC, 15);
+        try (var consumer = cluster.consumer(consumerConfig)) {
+            consumer.assign(List.of(topicPartition));
+            CompletableFuture.runAsync(() -> {
+                try {
+                    TimeUnit.SECONDS.sleep(1);
+                    consumer.wakeup();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            });
+            assertThrows(WakeupException.class, () -> 
consumer.position(topicPartition, Duration.ofSeconds(3)));
+        }
+    }
+    
+    @ClusterTest
+    public void testClassicConsumerPositionWithErrorConnectionRespectsWakeup() 
{
+        testPositionWithErrorConnectionRespectsWakeup(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            // make sure the connection fails
+            BOOTSTRAP_SERVERS_CONFIG, "localhost:12345"
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPositionWithErrorConnectionRespectsWakeup() {
+        testPositionWithErrorConnectionRespectsWakeup(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            // make sure the connection fails
+            BOOTSTRAP_SERVERS_CONFIG, "localhost:12345"
+        ));
+    }
+
+    private void testPositionWithErrorConnectionRespectsWakeup(Map<String, 
Object> consumerConfig) {
+        var topicPartition = new TopicPartition(TOPIC, 15);
+        try (var consumer = cluster.consumer(consumerConfig)) {
+            consumer.assign(List.of(topicPartition));
+            CompletableFuture.runAsync(() -> {
+                try {
+                    TimeUnit.SECONDS.sleep(1);
+                    consumer.wakeup();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            });
+            assertThrows(WakeupException.class, () -> 
consumer.position(topicPartition, Duration.ofSeconds(100)));
+        }
+    }
+
+    @Flaky("KAFKA-18031")
+    @ClusterTest
+    public void testClassicConsumerCloseLeavesGroupOnInterrupt() throws 
Exception {
+        testCloseLeavesGroupOnInterrupt(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
+            VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
+            AUTO_OFFSET_RESET_CONFIG, "earliest",
+            GROUP_ID_CONFIG, "group_test,",
+            BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()
+        ));
+    }
+
+    @Flaky("KAFKA-18031")
+    @ClusterTest
+    public void testAsyncConsumerCloseLeavesGroupOnInterrupt() throws 
Exception {
+        testCloseLeavesGroupOnInterrupt(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
+            VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
+            AUTO_OFFSET_RESET_CONFIG, "earliest",
+            GROUP_ID_CONFIG, "group_test,",
+            BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()
+        ));
+    }
+
+    private void testCloseLeavesGroupOnInterrupt(Map<String, Object> 
consumerConfig) throws Exception {
+        try (Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)) {
+            var listener = new TestConsumerReassignmentListener();
+            consumer.subscribe(List.of(TOPIC), listener);
+            awaitRebalance(consumer, listener);
+
+            assertEquals(1, listener.callsToAssigned);
+            assertEquals(0, listener.callsToRevoked);
+
+            try {
+                Thread.currentThread().interrupt();
+                assertThrows(InterruptException.class, consumer::close);
+            } finally {
+                // Clear the interrupted flag so we don't create problems for 
subsequent tests.
+                Thread.interrupted();
+            }
+
+            assertEquals(1, listener.callsToAssigned);
+            assertEquals(1, listener.callsToRevoked);
+
+            Map<String, Object> consumerConfigMap = new 
HashMap<>(consumerConfig);
+            var config = new ConsumerConfig(consumerConfigMap);
+
+            // Set the wait timeout to be only *half* the configured session 
timeout. This way we can make sure that the
+            // consumer explicitly left the group as opposed to being kicked 
out by the broker.
+            var leaveGroupTimeoutMs = config.getInt(SESSION_TIMEOUT_MS_CONFIG) 
/ 2;
+
+            TestUtils.waitForCondition(
+                () -> checkGroupMemberEmpty(config), 
+                leaveGroupTimeoutMs, 
+                "Consumer did not leave the consumer group within " + 
leaveGroupTimeoutMs + " ms of close"
+            );
+        }
+    }
+
+    private boolean checkGroupMemberEmpty(ConsumerConfig config) {
+        try (var admin = cluster.admin()) {
+            var groupId = config.getString(GROUP_ID_CONFIG);
+            var result = admin.describeConsumerGroups(List.of(groupId));
+            var groupDescription = result.describedGroups().get(groupId).get();
+            return groupDescription.members().isEmpty();
+        } catch (ExecutionException | InterruptedException e) {
+            return false;
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerOffsetRelatedWhenTimeoutZero() throws 
Exception {
+        testOffsetRelatedWhenTimeoutZero(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerOffsetRelatedWhenTimeoutZero() throws 
Exception {
+        testOffsetRelatedWhenTimeoutZero(Map.of(
+            GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        ));
+    }
+
+    private void testOffsetRelatedWhenTimeoutZero(Map<String, Object> 
consumerConfig) throws Exception {
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+        try (var consumer = cluster.consumer(consumerConfig)) {
+            var result1 = consumer.beginningOffsets(List.of(TP), 
Duration.ZERO);
+            assertNotNull(result1);
+            assertEquals(0, result1.size());
+
+            var result2 = consumer.endOffsets(List.of(TP), Duration.ZERO);
+            assertNotNull(result2);
+            assertEquals(0, result2.size());
+
+            var result3 = consumer.offsetsForTimes(Map.of(TP, 0L), 
Duration.ZERO);
+            assertNotNull(result3);
+            assertEquals(1, result3.size());
+            assertNull(result3.get(TP));
+        }
+    }
+
+    private void sendCompressedMessages(int numRecords, TopicPartition tp) {
+        Map<String, Object> config = Map.of(
+            COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name,
+            LINGER_MS_CONFIG, Integer.MAX_VALUE
+        );
+        try (Producer<byte[], byte[]> producer = cluster.producer(config)) {
+            IntStream.range(0, numRecords).forEach(i -> producer.send(new 
ProducerRecord<>(
+                tp.topic(),
+                tp.partition(),
+                (long) i,
+                ("key " + i).getBytes(),
+                ("value " + i).getBytes()
+            )));
+        }
+    }
+
+    private ConsumerRecords<byte[], byte[]> awaitNonEmptyRecords(
+        Consumer<byte[], byte[]> consumer,
+        TopicPartition tp
+    ) throws Exception {
+        AtomicReference<ConsumerRecords<byte[], byte[]>> result = new 
AtomicReference<>();
+
+        TestUtils.waitForCondition(() -> {
+            var polledRecords = consumer.poll(Duration.ofSeconds(10));
+            boolean hasRecords = !polledRecords.isEmpty();
+            if (hasRecords) {
+                result.set(polledRecords);
+            }
+            return hasRecords;
+        }, "Timed out waiting for non-empty records from topic " + tp.topic() 
+ " partition " + tp.partition());
+
+        return result.get();
+    }
+
+    public static class SerializerImpl implements Serializer<byte[]> {
+        private final ByteArraySerializer serializer = new 
ByteArraySerializer();
+
+        @Override
+        public byte[] serialize(String topic, byte[] data) {
+            throw new RuntimeException("This method should not be called");
+        }
+
+        @Override
+        public byte[] serialize(String topic, Headers headers, byte[] data) {
+            headers.add("content-type", "application/octet-stream".getBytes());
+            return serializer.serialize(topic, headers, data);
+        }
+    }
+
+    public static class DeserializerImpl implements Deserializer<byte[]> {
+        private final ByteArrayDeserializer deserializer = new 
ByteArrayDeserializer();
+
+        @Override
+        public byte[] deserialize(String topic, byte[] data) {
+            throw new RuntimeException("This method should not be called");
+        }
+
+        @Override
+        public byte[] deserialize(String topic, Headers headers, byte[] data) {
+            Header contentType = headers.lastHeader("content-type");
+            assertNotNull(contentType);
+            assertEquals("application/octet-stream", new 
String(contentType.value()));
+            return deserializer.deserialize(topic, headers, data);
+        }
+    }
+}
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 2d5e1a2c631..adfb657b776 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -19,10 +19,9 @@ package kafka.api
 import kafka.utils.TestInfoUtils
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
GroupProtocol}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
-import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, 
PartitionInfo}
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.serialization.{ByteArrayDeserializer, 
ByteArraySerializer, Deserializer, Serializer}
+import org.apache.kafka.common.serialization.{Deserializer, Serializer}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
@@ -130,41 +129,4 @@ object BaseConsumerTest {
     override def onUpdate(clusterResource: ClusterResource): Unit = 
updateConsumerCount.incrementAndGet()
     override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = 
data
   }
-
-  class SerializerImpl extends Serializer[Array[Byte]] {
-    var serializer = new ByteArraySerializer()
-
-    override def serialize(topic: String, headers: Headers, data: 
Array[Byte]): Array[Byte] = {
-      headers.add("content-type", "application/octet-stream".getBytes)
-      serializer.serialize(topic, data)
-    }
-
-    override def configure(configs: java.util.Map[String, _], isKey: Boolean): 
Unit = serializer.configure(configs, isKey)
-
-    override def close(): Unit = serializer.close()
-
-    override def serialize(topic: String, data: Array[Byte]): Array[Byte] = {
-      fail("method should not be invoked")
-      null
-    }
-  }
-
-  class DeserializerImpl extends Deserializer[Array[Byte]] {
-    var deserializer = new ByteArrayDeserializer()
-
-    override def deserialize(topic: String, headers: Headers, data: 
Array[Byte]): Array[Byte] = {
-      val header = headers.lastHeader("content-type")
-      assertEquals("application/octet-stream", if (header == null) null else 
new String(header.value()))
-      deserializer.deserialize(topic, data)
-    }
-
-    override def configure(configs: java.util.Map[String, _], isKey: Boolean): 
Unit = deserializer.configure(configs, isKey)
-
-    override def close(): Unit = deserializer.close()
-
-    override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = {
-      fail("method should not be invoked")
-      null
-    }
-  }
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index c9d33354c5d..fc60cab1d0d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -12,820 +12,21 @@
   */
 package kafka.api
 
-import kafka.api.BaseConsumerTest.{DeserializerImpl, SerializerImpl}
-
-import java.time.Duration
 import java.util
-import java.util.{Locale, Optional, Properties}
-import kafka.server.KafkaBroker
 import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.errors.{InterruptException, 
InvalidGroupIdException, InvalidTopicException, TimeoutException, 
WakeupException}
-import org.apache.kafka.common.record.{CompressionType, TimestampType}
-import org.apache.kafka.common.serialization._
+import org.apache.kafka.common.errors.InterruptException
 import org.apache.kafka.common.test.api.Flaky
-import org.apache.kafka.common.{MetricName, TopicPartition}
-import org.apache.kafka.server.quota.QuotaType
-import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Timeout
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
-import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
+import java.util.concurrent.ExecutionException
 
 @Timeout(600)
 class PlaintextConsumerTest extends BaseConsumerTest {
 
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testHeaders(groupProtocol: String): Unit = {
-    val numRecords = 1
-    val record = new ProducerRecord(tp.topic, tp.partition, null, 
"key".getBytes, "value".getBytes)
-
-    record.headers().add("headerKey", "headerValue".getBytes)
-
-    val producer = createProducer()
-    producer.send(record)
-
-    val consumer = createConsumer()
-    assertEquals(0, consumer.assignment.size)
-    consumer.assign(java.util.List.of(tp))
-    assertEquals(1, consumer.assignment.size)
-
-    consumer.seek(tp, 0)
-    val records = consumeRecords(consumer = consumer, numRecords = numRecords)
-
-    assertEquals(numRecords, records.size)
-
-    for (i <- 0 until numRecords) {
-      val record = records(i)
-      val header = record.headers().lastHeader("headerKey")
-      assertEquals("headerValue", if (header == null) null else new 
String(header.value()))
-    }
-  }
-
-  private def testHeadersSerializeDeserialize(serializer: 
Serializer[Array[Byte]], deserializer: Deserializer[Array[Byte]]): Unit = {
-    val numRecords = 1
-    val record = new ProducerRecord(tp.topic, tp.partition, null, 
"key".getBytes, "value".getBytes)
-
-    val producer = createProducer(
-      keySerializer = new ByteArraySerializer,
-      valueSerializer = serializer)
-    producer.send(record)
-
-    val consumer = createConsumer(
-      keyDeserializer = new ByteArrayDeserializer,
-      valueDeserializer = deserializer)
-    assertEquals(0, consumer.assignment.size)
-    consumer.assign(java.util.List.of(tp))
-    assertEquals(1, consumer.assignment.size)
-
-    consumer.seek(tp, 0)
-    val records = consumeRecords(consumer = consumer, numRecords = numRecords)
-
-    assertEquals(numRecords, records.size)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testHeadersSerializerDeserializer(groupProtocol: String): Unit = {
-    val extendedSerializer = new SerializerImpl
-
-    val extendedDeserializer = new DeserializerImpl
-
-    testHeadersSerializeDeserialize(extendedSerializer, extendedDeserializer)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testAutoOffsetReset(groupProtocol: String): Unit = {
-    val producer = createProducer()
-    val startingTimestamp = System.currentTimeMillis()
-    sendRecords(producer, numRecords = 1, tp, startingTimestamp = 
startingTimestamp)
-
-    val consumer = createConsumer()
-    consumer.assign(java.util.List.of(tp))
-    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, 
startingOffset = 0, startingTimestamp = startingTimestamp)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testGroupConsumption(groupProtocol: String): Unit = {
-    val producer = createProducer()
-    val startingTimestamp = System.currentTimeMillis()
-    sendRecords(producer, numRecords = 10, tp, startingTimestamp = 
startingTimestamp)
-
-    val consumer = createConsumer()
-    consumer.subscribe(java.util.List.of(topic))
-    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, 
startingOffset = 0, startingTimestamp = startingTimestamp)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testPartitionsFor(groupProtocol: String): Unit = {
-    val numParts = 2
-    createTopic("part-test", numParts)
-    val consumer = createConsumer()
-    val parts = consumer.partitionsFor("part-test")
-    assertNotNull(parts)
-    assertEquals(2, parts.size)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testPartitionsForAutoCreate(groupProtocol: String): Unit = {
-    val consumer = createConsumer()
-    // First call would create the topic
-    consumer.partitionsFor("non-exist-topic")
-    TestUtils.waitUntilTrue(() => {
-      !consumer.partitionsFor("non-exist-topic").isEmpty
-    }, s"Timed out while awaiting non empty partitions.")
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testPartitionsForInvalidTopic(groupProtocol: String): Unit = {
-    val consumer = createConsumer()
-    assertThrows(classOf[InvalidTopicException], () => 
consumer.partitionsFor(";3# ads,{234"))
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testSeek(groupProtocol: String): Unit = {
-    val consumer = createConsumer()
-    val totalRecords = 50L
-    val mid = totalRecords / 2
-
-    // Test seek non-compressed message
-    val producer = createProducer()
-    val startingTimestamp = 0
-    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = 
startingTimestamp)
-    consumer.assign(java.util.List.of(tp))
-
-    consumer.seekToEnd(java.util.List.of(tp))
-    assertEquals(totalRecords, consumer.position(tp))
-    assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
-
-    consumer.seekToBeginning(java.util.List.of(tp))
-    assertEquals(0L, consumer.position(tp))
-    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, 
startingTimestamp = startingTimestamp)
-
-    consumer.seek(tp, mid)
-    assertEquals(mid, consumer.position(tp))
-
-    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 
mid.toInt, startingKeyAndValueIndex = mid.toInt,
-      startingTimestamp = mid)
-
-    // Test seek compressed message
-    sendCompressedMessages(totalRecords.toInt, tp2)
-    consumer.assign(java.util.List.of(tp2))
-
-    consumer.seekToEnd(java.util.List.of(tp2))
-    assertEquals(totalRecords, consumer.position(tp2))
-    assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
-
-    consumer.seekToBeginning(java.util.List.of(tp2))
-    assertEquals(0L, consumer.position(tp2))
-    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, tp = 
tp2)
-
-    consumer.seek(tp2, mid)
-    assertEquals(mid, consumer.position(tp2))
-    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 
mid.toInt, startingKeyAndValueIndex = mid.toInt,
-      startingTimestamp = mid, tp = tp2)
-  }
-
-  private def sendCompressedMessages(numRecords: Int, tp: TopicPartition): 
Unit = {
-    val producerProps = new Properties()
-    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, 
CompressionType.GZIP.name)
-    producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, 
Int.MaxValue.toString)
-    val producer = createProducer(configOverrides = producerProps)
-    (0 until numRecords).foreach { i =>
-      producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key 
$i".getBytes, s"value $i".getBytes))
-    }
-    producer.close()
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testPartitionPauseAndResume(groupProtocol: String): Unit = {
-    val partitions = java.util.List.of(tp)
-    val producer = createProducer()
-    var startingTimestamp = System.currentTimeMillis()
-    sendRecords(producer, numRecords = 5, tp, startingTimestamp = 
startingTimestamp)
-
-    val consumer = createConsumer()
-    consumer.assign(partitions)
-    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, 
startingOffset = 0, startingTimestamp = startingTimestamp)
-    consumer.pause(partitions)
-    startingTimestamp = System.currentTimeMillis()
-    sendRecords(producer, numRecords = 5, tp, startingTimestamp = 
startingTimestamp)
-    assertTrue(consumer.poll(Duration.ofMillis(100)).isEmpty)
-    consumer.resume(partitions)
-    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, 
startingOffset = 5, startingTimestamp = startingTimestamp)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testInterceptors(groupProtocol: String): Unit = {
-    val appendStr = "mock"
-    MockConsumerInterceptor.resetCounters()
-    MockProducerInterceptor.resetCounters()
-
-    // create producer with interceptor
-    val producerProps = new Properties()
-    producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
classOf[MockProducerInterceptor].getName)
-    producerProps.put("mock.interceptor.append", appendStr)
-    val testProducer = createProducer(keySerializer = new StringSerializer,
-      valueSerializer = new StringSerializer,
-      configOverrides = producerProps)
-
-    // produce records
-    val numRecords = 10
-    (0 until numRecords).map { i =>
-      testProducer.send(new ProducerRecord(tp.topic, tp.partition, s"key $i", 
s"value $i"))
-    }.foreach(_.get)
-    assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue)
-    assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue)
-    // send invalid record
-    assertThrows(classOf[Throwable], () => testProducer.send(null), () => 
"Should not allow sending a null record")
-    assertEquals(1, MockProducerInterceptor.ON_ERROR_COUNT.intValue, 
"Interceptor should be notified about exception")
-    assertEquals(0, 
MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue(), "Interceptor 
should not receive metadata with an exception when record is null")
-
-    // create consumer with interceptor
-    this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
"org.apache.kafka.test.MockConsumerInterceptor")
-    val testConsumer = createConsumer(keyDeserializer = new 
StringDeserializer, valueDeserializer = new StringDeserializer)
-    testConsumer.assign(java.util.List.of(tp))
-    testConsumer.seek(tp, 0)
-
-    // consume and verify that values are modified by interceptors
-    val records = consumeRecords(testConsumer, numRecords)
-    for (i <- 0 until numRecords) {
-      val record = records(i)
-      assertEquals(s"key $i", new String(record.key))
-      assertEquals(s"value $i$appendStr".toUpperCase(Locale.ROOT), new 
String(record.value))
-    }
-
-    // commit sync and verify onCommit is called
-    val commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue
-    testConsumer.commitSync(java.util.Map.of(tp, new OffsetAndMetadata(2L)))
-    assertEquals(2, 
testConsumer.committed(java.util.Set.of(tp)).get(tp).offset)
-    assertEquals(commitCountBefore + 1, 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue)
-
-    // commit async and verify onCommit is called
-    sendAndAwaitAsyncCommit(testConsumer, Some(Map(tp -> new 
OffsetAndMetadata(5L))))
-    assertEquals(5, 
testConsumer.committed(java.util.Set.of(tp)).get(tp).offset)
-    assertEquals(commitCountBefore + 2, 
MockConsumerInterceptor.ON_COMMIT_COUNT.intValue)
-
-    testConsumer.close()
-    testProducer.close()
-
-    // cleanup
-    MockConsumerInterceptor.resetCounters()
-    MockProducerInterceptor.resetCounters()
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testInterceptorsWithWrongKeyValue(groupProtocol: String): Unit = {
-    val appendStr = "mock"
-    // create producer with interceptor that has different key and value types 
from the producer
-    val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers())
-    producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
"org.apache.kafka.test.MockProducerInterceptor")
-    producerProps.put("mock.interceptor.append", appendStr)
-    val testProducer = createProducer()
-
-    // producing records should succeed
-    testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), 
s"key".getBytes, s"value will not be modified".getBytes))
-
-    // create consumer with interceptor that has different key and value types 
from the consumer
-    this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
"org.apache.kafka.test.MockConsumerInterceptor")
-    val testConsumer = createConsumer()
-
-    testConsumer.assign(java.util.List.of(tp))
-    testConsumer.seek(tp, 0)
-
-    // consume and verify that values are not modified by interceptors -- 
their exceptions are caught and logged, but not propagated
-    val records = consumeRecords(testConsumer, 1)
-    val record = records.head
-    assertEquals(s"value will not be modified", new String(record.value()))
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testConsumeMessagesWithCreateTime(groupProtocol: String): Unit = {
-    val numRecords = 50
-    // Test non-compressed messages
-    val producer = createProducer()
-    val startingTimestamp = System.currentTimeMillis()
-    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
-    val consumer = createConsumer()
-    consumer.assign(java.util.List.of(tp))
-    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, 
startingOffset = 0, startingTimestamp = startingTimestamp)
-
-    // Test compressed messages
-    sendCompressedMessages(numRecords, tp2)
-    consumer.assign(java.util.List.of(tp2))
-    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp = 
tp2, startingOffset = 0)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testConsumeMessagesWithLogAppendTime(groupProtocol: String): Unit = {
-    val topicName = "testConsumeMessagesWithLogAppendTime"
-    val topicProps = new Properties()
-    topicProps.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
-    createTopic(topicName, 2, 2, topicProps)
-
-    val startTime = System.currentTimeMillis()
-    val numRecords = 50
-
-    // Test non-compressed messages
-    val tp1 = new TopicPartition(topicName, 0)
-    val producer = createProducer()
-    sendRecords(producer, numRecords, tp1)
-
-    val consumer = createConsumer()
-    consumer.assign(java.util.List.of(tp1))
-    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp = 
tp1, startingOffset = 0,
-      startingTimestamp = startTime, timestampType = 
TimestampType.LOG_APPEND_TIME)
-
-    // Test compressed messages
-    val tp2 = new TopicPartition(topicName, 1)
-    sendCompressedMessages(numRecords, tp2)
-    consumer.assign(java.util.List.of(tp2))
-    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp = 
tp2, startingOffset = 0,
-      startingTimestamp = startTime, timestampType = 
TimestampType.LOG_APPEND_TIME)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testListTopics(groupProtocol: String): Unit = {
-    val numParts = 2
-    val topic1 = "part-test-topic-1"
-    val topic2 = "part-test-topic-2"
-    val topic3 = "part-test-topic-3"
-    createTopic(topic1, numParts)
-    createTopic(topic2, numParts)
-    createTopic(topic3, numParts)
-
-    val consumer = createConsumer()
-    val topics = consumer.listTopics()
-    assertNotNull(topics)
-    assertEquals(5, topics.size())
-    assertEquals(5, topics.keySet().size())
-    assertEquals(2, topics.get(topic1).size)
-    assertEquals(2, topics.get(topic2).size)
-    assertEquals(2, topics.get(topic3).size)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testPauseStateNotPreservedByRebalance(groupProtocol: String): Unit = {
-    if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) {
-      
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
"100") // timeout quickly to avoid slow test
-      
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
"30")
-    }
-    val consumer = createConsumer()
-
-    val producer = createProducer()
-    val startingTimestamp = System.currentTimeMillis()
-    sendRecords(producer, numRecords = 5, tp, startingTimestamp = 
startingTimestamp)
-    consumer.subscribe(java.util.List.of(topic))
-    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, 
startingOffset = 0, startingTimestamp = startingTimestamp)
-    consumer.pause(java.util.List.of(tp))
-
-    // subscribe to a new topic to trigger a rebalance
-    consumer.subscribe(java.util.List.of("topic2"))
-
-    // after rebalance, our position should be reset and our pause state lost,
-    // so we should be able to consume from the beginning
-    consumeAndVerifyRecords(consumer = consumer, numRecords = 0, 
startingOffset = 5, startingTimestamp = startingTimestamp)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testPerPartitionLeadMetricsCleanUpWithSubscribe(groupProtocol: String): 
Unit = {
-    val numMessages = 1000
-    val topic2 = "topic2"
-    createTopic(topic2, 2, brokerCount)
-    // send some messages.
-    val producer = createProducer()
-    sendRecords(producer, numMessages, tp)
-    // Test subscribe
-    // Create a consumer and consumer some messages.
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"testPerPartitionLeadMetricsCleanUpWithSubscribe")
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
"testPerPartitionLeadMetricsCleanUpWithSubscribe")
-    val consumer = createConsumer()
-    val listener = new TestConsumerReassignmentListener
-    consumer.subscribe(java.util.List.of(topic, topic2), listener)
-    val records = awaitNonEmptyRecords(consumer, tp)
-    assertEquals(1, listener.callsToAssigned, "should be assigned once")
-    // Verify the metric exist.
-    val tags1 = new util.HashMap[String, String]()
-    tags1.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
-    tags1.put("topic", tp.topic())
-    tags1.put("partition", String.valueOf(tp.partition()))
-
-    val tags2 = new util.HashMap[String, String]()
-    tags2.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
-    tags2.put("topic", tp2.topic())
-    tags2.put("partition", String.valueOf(tp2.partition()))
-    val fetchLead0 = consumer.metrics.get(new MetricName("records-lead", 
"consumer-fetch-manager-metrics", "", tags1))
-    assertNotNull(fetchLead0)
-    assertEquals(records.count.toDouble, fetchLead0.metricValue(), s"The lead 
should be ${records.count}")
-
-    // Remove topic from subscription
-    consumer.subscribe(java.util.List.of(topic2), listener)
-    awaitRebalance(consumer, listener)
-    // Verify the metric has gone
-    assertNull(consumer.metrics.get(new MetricName("records-lead", 
"consumer-fetch-manager-metrics", "", tags1)))
-    assertNull(consumer.metrics.get(new MetricName("records-lead", 
"consumer-fetch-manager-metrics", "", tags2)))
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testPerPartitionLagMetricsCleanUpWithSubscribe(groupProtocol: String): 
Unit = {
-    val numMessages = 1000
-    val topic2 = "topic2"
-    createTopic(topic2, 2, brokerCount)
-    // send some messages.
-    val producer = createProducer()
-    sendRecords(producer, numMessages, tp)
-    // Test subscribe
-    // Create a consumer and consumer some messages.
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"testPerPartitionLagMetricsCleanUpWithSubscribe")
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
"testPerPartitionLagMetricsCleanUpWithSubscribe")
-    val consumer = createConsumer()
-    val listener = new TestConsumerReassignmentListener
-    consumer.subscribe(java.util.List.of(topic, topic2), listener)
-    val records = awaitNonEmptyRecords(consumer, tp)
-    assertEquals(1, listener.callsToAssigned, "should be assigned once")
-    // Verify the metric exist.
-    val tags1 = new util.HashMap[String, String]()
-    tags1.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
-    tags1.put("topic", tp.topic())
-    tags1.put("partition", String.valueOf(tp.partition()))
-
-    val tags2 = new util.HashMap[String, String]()
-    tags2.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
-    tags2.put("topic", tp2.topic())
-    tags2.put("partition", String.valueOf(tp2.partition()))
-    val fetchLag0 = consumer.metrics.get(new MetricName("records-lag", 
"consumer-fetch-manager-metrics", "", tags1))
-    assertNotNull(fetchLag0)
-    val expectedLag = numMessages - records.count
-    assertEquals(expectedLag, fetchLag0.metricValue.asInstanceOf[Double], 
epsilon, s"The lag should be $expectedLag")
-
-    // Remove topic from subscription
-    consumer.subscribe(java.util.List.of(topic2), listener)
-    awaitRebalance(consumer, listener)
-    // Verify the metric has gone
-    assertNull(consumer.metrics.get(new MetricName("records-lag", 
"consumer-fetch-manager-metrics", "", tags1)))
-    assertNull(consumer.metrics.get(new MetricName("records-lag", 
"consumer-fetch-manager-metrics", "", tags2)))
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testPerPartitionLeadMetricsCleanUpWithAssign(groupProtocol: String): 
Unit = {
-    val numMessages = 1000
-    // Test assign
-    // send some messages.
-    val producer = createProducer()
-    sendRecords(producer, numMessages, tp)
-    sendRecords(producer, numMessages, tp2)
-
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"testPerPartitionLeadMetricsCleanUpWithAssign")
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
"testPerPartitionLeadMetricsCleanUpWithAssign")
-    val consumer = createConsumer()
-    consumer.assign(java.util.List.of(tp))
-    val records = awaitNonEmptyRecords(consumer, tp)
-    // Verify the metric exist.
-    val tags = new util.HashMap[String, String]()
-    tags.put("client-id", "testPerPartitionLeadMetricsCleanUpWithAssign")
-    tags.put("topic", tp.topic())
-    tags.put("partition", String.valueOf(tp.partition()))
-    val fetchLead = consumer.metrics.get(new MetricName("records-lead", 
"consumer-fetch-manager-metrics", "", tags))
-    assertNotNull(fetchLead)
-
-    assertEquals(records.count.toDouble, fetchLead.metricValue(), s"The lead 
should be ${records.count}")
-
-    consumer.assign(java.util.List.of(tp2))
-    awaitNonEmptyRecords(consumer ,tp2)
-    assertNull(consumer.metrics.get(new MetricName("records-lead", 
"consumer-fetch-manager-metrics", "", tags)))
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testPerPartitionLagMetricsCleanUpWithAssign(groupProtocol: String): Unit 
= {
-    val numMessages = 1000
-    // Test assign
-    // send some messages.
-    val producer = createProducer()
-    sendRecords(producer, numMessages, tp)
-    sendRecords(producer, numMessages, tp2)
-
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"testPerPartitionLagMetricsCleanUpWithAssign")
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
"testPerPartitionLagMetricsCleanUpWithAssign")
-    val consumer = createConsumer()
-    consumer.assign(java.util.List.of(tp))
-    val records = awaitNonEmptyRecords(consumer, tp)
-    // Verify the metric exist.
-    val tags = new util.HashMap[String, String]()
-    tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
-    tags.put("topic", tp.topic())
-    tags.put("partition", String.valueOf(tp.partition()))
-    val fetchLag = consumer.metrics.get(new MetricName("records-lag", 
"consumer-fetch-manager-metrics", "", tags))
-    assertNotNull(fetchLag)
-
-    val expectedLag = numMessages - records.count
-    assertEquals(expectedLag, fetchLag.metricValue.asInstanceOf[Double], 
epsilon, s"The lag should be $expectedLag")
-
-    consumer.assign(java.util.List.of(tp2))
-    awaitNonEmptyRecords(consumer, tp2)
-    assertNull(consumer.metrics.get(new MetricName(tp.toString + 
".records-lag", "consumer-fetch-manager-metrics", "", tags)))
-    assertNull(consumer.metrics.get(new MetricName("records-lag", 
"consumer-fetch-manager-metrics", "", tags)))
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testPerPartitionLagMetricsWhenReadCommitted(groupProtocol: String): Unit 
= {
-    val numMessages = 1000
-    // send some messages.
-    val producer = createProducer()
-    sendRecords(producer, numMessages, tp)
-    sendRecords(producer, numMessages, tp2)
-
-    consumerConfig.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
"read_committed")
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"testPerPartitionLagMetricsCleanUpWithAssign")
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
"testPerPartitionLagMetricsCleanUpWithAssign")
-    val consumer = createConsumer()
-    consumer.assign(java.util.List.of(tp))
-    awaitNonEmptyRecords(consumer, tp)
-    // Verify the metric exist.
-    val tags = new util.HashMap[String, String]()
-    tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
-    tags.put("topic", tp.topic())
-    tags.put("partition", String.valueOf(tp.partition()))
-    val fetchLag = consumer.metrics.get(new MetricName("records-lag", 
"consumer-fetch-manager-metrics", "", tags))
-    assertNotNull(fetchLag)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testQuotaMetricsNotCreatedIfNoQuotasConfigured(groupProtocol: String): 
Unit = {
-    val numRecords = 1000
-    val producer = createProducer()
-    val startingTimestamp = System.currentTimeMillis()
-    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
-
-    val consumer = createConsumer()
-    consumer.assign(java.util.List.of(tp))
-    consumer.seek(tp, 0)
-    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, 
startingOffset = 0, startingTimestamp = startingTimestamp)
-
-    def assertNoMetric(broker: KafkaBroker, name: String, quotaType: 
QuotaType, clientId: String): Unit = {
-        val metricName = broker.metrics.metricName("throttle-time",
-                                  quotaType.toString,
-                                  "",
-                                  "user", "",
-                                  "client-id", clientId)
-        assertNull(broker.metrics.metric(metricName), "Metric should not have 
been created " + metricName)
-    }
-    brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.PRODUCE, 
producerClientId))
-    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.PRODUCE, 
producerClientId))
-    brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.FETCH, 
consumerClientId))
-    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.FETCH, 
consumerClientId))
-
-    brokers.foreach(assertNoMetric(_, "request-time", QuotaType.REQUEST, 
producerClientId))
-    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.REQUEST, 
producerClientId))
-    brokers.foreach(assertNoMetric(_, "request-time", QuotaType.REQUEST, 
consumerClientId))
-    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.REQUEST, 
consumerClientId))
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
-    val topic = "test_topic"
-    val partition = 0
-    val tp = new TopicPartition(topic, partition)
-    createTopic(topic)
-
-    val producer = createProducer()
-    producer.send(new ProducerRecord(topic, partition, "k1".getBytes, 
"v1".getBytes)).get()
-    producer.send(new ProducerRecord(topic, partition, "k2".getBytes, 
"v2".getBytes)).get()
-    producer.send(new ProducerRecord(topic, partition, "k3".getBytes, 
"v3".getBytes)).get()
-    producer.close()
-
-    // consumer 1 uses the default group id and consumes from earliest offset
-    val consumer1Config = new Properties(consumerConfig)
-    consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
-    val consumer1 = createConsumer(
-      configOverrides = consumer1Config,
-      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
-
-    // consumer 2 uses the default group id and consumes from latest offset
-    val consumer2Config = new Properties(consumerConfig)
-    consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
-    consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2")
-    val consumer2 = createConsumer(
-      configOverrides = consumer2Config,
-      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
-
-    // consumer 3 uses the default group id and starts from an explicit offset
-    val consumer3Config = new Properties(consumerConfig)
-    consumer3Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer3")
-    val consumer3 = createConsumer(
-      configOverrides = consumer3Config,
-      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
-
-    consumer1.assign(util.List.of(tp))
-    consumer2.assign(util.List.of(tp))
-    consumer3.assign(util.List.of(tp))
-    consumer3.seek(tp, 1)
-
-    val numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count()
-    assertThrows(classOf[InvalidGroupIdException], () => 
consumer1.commitSync())
-    assertThrows(classOf[InvalidGroupIdException], () => 
consumer2.committed(java.util.Set.of(tp)))
-
-    val numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count()
-    val numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count()
-
-    consumer1.unsubscribe()
-    consumer2.unsubscribe()
-    consumer3.unsubscribe()
-
-    assertTrue(consumer1.assignment().isEmpty)
-    assertTrue(consumer2.assignment().isEmpty)
-    assertTrue(consumer3.assignment().isEmpty)
-
-    consumer1.close()
-    consumer2.close()
-    consumer3.close()
-
-    assertEquals(3, numRecords1, "Expected consumer1 to consume from earliest 
offset")
-    assertEquals(0, numRecords2, "Expected consumer2 to consume from latest 
offset")
-    assertEquals(2, numRecords3, "Expected consumer3 to consume from offset 1")
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testNullGroupIdNotSupportedIfCommitting(groupProtocol: String): Unit = {
-    val consumer1Config = new Properties(consumerConfig)
-    consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
-    val consumer1 = createConsumer(
-      configOverrides = consumer1Config,
-      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
-
-    consumer1.assign(java.util.List.of(tp))
-    assertThrows(classOf[InvalidGroupIdException], () => 
consumer1.commitSync())
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(groupProtocol: 
String): Unit = {
-    val foo = "foo"
-    val foo0 = new TopicPartition(foo, 0)
-    val foo1 = new TopicPartition(foo, 1)
-
-    val admin = createAdminClient()
-    admin.createTopics(java.util.List.of(new NewTopic(foo, 1, 
1.toShort))).all.get
-
-    val consumerConfig = new Properties
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id")
-    consumerConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
"my-instance-id")
-
-    val consumer1 = createConsumer(configOverrides = consumerConfig)
-    consumer1.subscribe(java.util.List.of(foo))
-    awaitAssignment(consumer1, Set(foo0))
-    consumer1.close()
-
-    val consumer2 = createConsumer(configOverrides = consumerConfig)
-    consumer2.subscribe(java.util.List.of(foo))
-    awaitAssignment(consumer2, Set(foo0))
-
-    admin.createPartitions(java.util.Map.of(foo, 
NewPartitions.increaseTo(2))).all.get
-
-    awaitAssignment(consumer2, Set(foo0, foo1))
-
-    consumer2.close()
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testEndOffsets(groupProtocol: String): Unit = {
-    val producer = createProducer()
-    val startingTimestamp = System.currentTimeMillis()
-    val numRecords = 10000
-    (0 until numRecords).map { i =>
-      val timestamp = startingTimestamp + i.toLong
-      val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp, 
s"key $i".getBytes, s"value $i".getBytes)
-      producer.send(record)
-      record
-    }
-    producer.flush()
-
-    val consumer = createConsumer()
-    consumer.subscribe(java.util.List.of(topic))
-    awaitAssignment(consumer, Set(tp, tp2))
-
-    val endOffsets = consumer.endOffsets(java.util.Set.of(tp))
-    assertEquals(numRecords, endOffsets.get(tp))
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testSeekThrowsIllegalStateIfPartitionsNotAssigned(groupProtocol: 
String): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    val consumer = createConsumer(configOverrides = consumerConfig)
-    val e: Exception = assertThrows(classOf[IllegalStateException], () => 
consumer.seekToEnd(util.List.of(tp)))
-    assertEquals("No current assignment for partition " + tp, e.getMessage)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testFetchOffsetsForTime(groupProtocol: String): Unit = {
-    val numPartitions = 2
-    val producer = createProducer()
-    val timestampsToSearch = new util.HashMap[TopicPartition, java.lang.Long]()
-    var i = 0
-    for (part <- 0 until numPartitions) {
-      val tp = new TopicPartition(topic, part)
-      // key, val, and timestamp equal to the sequence number.
-      sendRecords(producer, numRecords = 100, tp, startingTimestamp = 0)
-      timestampsToSearch.put(tp, (i * 20).toLong)
-      i += 1
-    }
-
-    val consumer = createConsumer()
-    // Test negative target time
-    assertThrows(classOf[IllegalArgumentException],
-      () => consumer.offsetsForTimes(util.Map.of(new TopicPartition(topic, 0), 
-1)))
-    val timestampOffsets = consumer.offsetsForTimes(timestampsToSearch)
-
-    val timestampTp0 = timestampOffsets.get(new TopicPartition(topic, 0))
-    assertEquals(0, timestampTp0.offset)
-    assertEquals(0, timestampTp0.timestamp)
-    assertEquals(Optional.of(0), timestampTp0.leaderEpoch)
-
-    val timestampTp1 = timestampOffsets.get(new TopicPartition(topic, 1))
-    assertEquals(20, timestampTp1.offset)
-    assertEquals(20, timestampTp1.timestamp)
-    assertEquals(Optional.of(0), timestampTp1.leaderEpoch)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  @Timeout(15)
-  def testPositionRespectsTimeout(groupProtocol: String): Unit = {
-    val topicPartition = new TopicPartition(topic, 15)
-    val consumer = createConsumer()
-    consumer.assign(java.util.List.of(topicPartition))
-
-    // When position() is called for a topic/partition that doesn't exist, the 
consumer will repeatedly update the
-    // local metadata. However, it should give up after the user-supplied 
timeout has past.
-    assertThrows(classOf[TimeoutException], () => 
consumer.position(topicPartition, Duration.ofSeconds(3)))
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  @Timeout(15)
-  def testPositionRespectsWakeup(groupProtocol: String): Unit = {
-    val topicPartition = new TopicPartition(topic, 15)
-    val consumer = createConsumer()
-    consumer.assign(java.util.List.of(topicPartition))
-
-    CompletableFuture.runAsync { () =>
-      TimeUnit.SECONDS.sleep(1)
-      consumer.wakeup()
-    }
-
-    assertThrows(classOf[WakeupException], () => 
consumer.position(topicPartition, Duration.ofSeconds(3)))
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  @Timeout(15)
-  def testPositionWithErrorConnectionRespectsWakeup(groupProtocol: String): 
Unit = {
-    val topicPartition = new TopicPartition(topic, 15)
-    val properties = new Properties()
-    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345") 
// make sure the connection fails
-    val consumer = createConsumer(configOverrides = properties)
-    consumer.assign(java.util.List.of(topicPartition))
-
-    CompletableFuture.runAsync { () =>
-      TimeUnit.SECONDS.sleep(1)
-      consumer.wakeup()
-    }
-
-    assertThrows(classOf[WakeupException], () => 
consumer.position(topicPartition, Duration.ofSeconds(100)))
-  }
-
   @Flaky("KAFKA-18031")
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
   @MethodSource(Array("getTestGroupProtocolParametersAll"))
@@ -871,22 +72,4 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       waitTimeMs=leaveGroupTimeoutMs
     )
   }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
-  @MethodSource(Array("getTestGroupProtocolParametersAll"))
-  def testOffsetRelatedWhenTimeoutZero(groupProtocol: String): Unit = {
-    val consumer = createConsumer()
-    val result1 = consumer.beginningOffsets(util.List.of(tp), Duration.ZERO)
-    assertNotNull(result1)
-    assertEquals(0, result1.size())
-
-    val result2 = consumer.endOffsets(util.List.of(tp), Duration.ZERO)
-    assertNotNull(result2)
-    assertEquals(0, result2.size())
-
-    val result3 = consumer.offsetsForTimes(java.util.Map.of(tp, 0), 
Duration.ZERO)
-    assertNotNull(result3)
-    assertEquals(1, result3.size())
-    assertNull(result3.get(tp))
-  }
 }

Reply via email to