m1a2st commented on code in PR #20081:
URL: https://github.com/apache/kafka/pull/20081#discussion_r2180142403


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java:
##########
@@ -0,0 +1,1607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import kafka.server.KafkaBroker;
+
+import org.apache.kafka.clients.ClientsTestUtils.TestConsumerReassignmentListener;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Flaky;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.test.MockConsumerInterceptor;
+import org.apache.kafka.test.MockProducerInterceptor;
+import org.apache.kafka.test.TestUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.BROKER_COUNT;
+import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.TOPIC;
+import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.TP;
+import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testClusterResourceListener;
+import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testCoordinatorFailover;
+import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testSimpleConsumption;
+import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment;
+import static org.apache.kafka.clients.ClientsTestUtils.awaitRebalance;
+import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecordsWithTimeTypeLogAppend;
+import static org.apache.kafka.clients.ClientsTestUtils.consumeRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendAndAwaitAsyncCommit;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = BROKER_COUNT,
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"),
+        @ClusterConfigProperty(key = GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, value = "60000"),
+        @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"),
+    }
+)
+public class PlaintextConsumerTest {
+
+    private final ClusterInstance cluster;
+    public static final double EPSILON = 0.1;
+
+    public PlaintextConsumerTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @ClusterTest
+    public void testClassicConsumerSimpleConsumption() throws InterruptedException {
+        testSimpleConsumption(cluster, Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerSimpleConsumption() throws InterruptedException {
+        testSimpleConsumption(cluster, Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testClassicConsumerClusterResourceListener() throws InterruptedException {
+        testClusterResourceListener(cluster, Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerClusterResourceListener() throws InterruptedException {
+        testClusterResourceListener(cluster, Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testClassicConsumerCoordinatorFailover() throws InterruptedException {
+        Map<String, Object> config = Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            SESSION_TIMEOUT_MS_CONFIG, 5001,
+            HEARTBEAT_INTERVAL_MS_CONFIG, 1000,
+            // Use higher poll timeout to avoid consumer leaving the group due to timeout
+            MAX_POLL_INTERVAL_MS_CONFIG, 15000
+        );
+        testCoordinatorFailover(cluster, config);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumeCoordinatorFailover() throws InterruptedException {
+        Map<String, Object> config = Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            // Use higher poll timeout to avoid consumer leaving the group due to timeout
+            MAX_POLL_INTERVAL_MS_CONFIG, 15000
+        );
+        testCoordinatorFailover(cluster, config);
+    }
+
+    @ClusterTest
+    public void testClassicConsumerHeaders() throws Exception {
+        testHeaders(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerHeaders() throws Exception {
+        testHeaders(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testHeaders(Map<String, Object> consumerConfig) throws Exception {
+        var numRecords = 1;
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+        ) {
+            var record = new ProducerRecord<>(TP.topic(), TP.partition(), null, "key".getBytes(), "value".getBytes());
+            record.headers().add("headerKey", "headerValue".getBytes());
+            producer.send(record);
+
+            assertEquals(0, consumer.assignment().size());
+            consumer.assign(List.of(TP));
+            assertEquals(1, consumer.assignment().size());
+
+            consumer.seek(TP, 0);
+            var records = consumeRecords(consumer, numRecords);
+            assertEquals(numRecords, records.size());
+            var header = records.get(0).headers().lastHeader("headerKey");
+            assertEquals("headerValue", header == null ? null : new String(header.value()));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerHeadersSerializerDeserializer() throws Exception {
+        testHeadersSerializeDeserialize(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerHeadersSerializerDeserializer() throws Exception {
+        testHeadersSerializeDeserialize(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testHeadersSerializeDeserialize(Map<String, Object> config) throws InterruptedException {
+        var numRecords = 1;
+        Map<String, Object> consumerConfig = new HashMap<>(config);
+        consumerConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, DeserializerImpl.class);
+        Map<String, Object> producerConfig = Map.of(
+            VALUE_SERIALIZER_CLASS_CONFIG, SerializerImpl.class.getName()
+        );
+
+        try (Producer<byte[], byte[]> producer = cluster.producer(producerConfig);
+             Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+        ) {
+            producer.send(new ProducerRecord<>(
+                TP.topic(), 
+                TP.partition(), 
+                null, 
+                "key".getBytes(), 
+                "value".getBytes())
+            );
+            
+            assertEquals(0, consumer.assignment().size());
+            consumer.assign(List.of(TP));
+            assertEquals(1, consumer.assignment().size());
+
+            consumer.seek(TP, 0);
+            assertEquals(numRecords, consumeRecords(consumer, numRecords).size());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerAutoOffsetReset() throws Exception {
+        testAutoOffsetReset(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAutoOffsetReset() throws Exception {
+        testAutoOffsetReset(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testAutoOffsetReset(Map<String, Object> consumerConfig) throws Exception {
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+        ) {
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, TP, 1, startingTimestamp);
+            consumer.assign(List.of(TP));
+            consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerGroupConsumption() throws Exception {
+        testGroupConsumption(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerGroupConsumption() throws Exception {
+        testGroupConsumption(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testGroupConsumption(Map<String, Object> consumerConfig) throws Exception {
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+        ) {
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, TP, 10, startingTimestamp);
+            consumer.subscribe(List.of(TOPIC));
+            consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPartitionsFor() throws Exception {
+        testPartitionsFor(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPartitionsFor() throws Exception {
+        testPartitionsFor(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testPartitionsFor(Map<String, Object> consumerConfig) throws Exception {
+        var numParts = 2;
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+        cluster.createTopic("part-test", numParts, (short) 1);
+
+        try (var consumer = cluster.consumer(consumerConfig)) {
+            var partitions = consumer.partitionsFor(TOPIC);
+            assertNotNull(partitions);
+            assertEquals(2, partitions.size());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPartitionsForAutoCreate() throws Exception {
+        testPartitionsForAutoCreate(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPartitionsForAutoCreate() throws Exception {
+        testPartitionsForAutoCreate(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testPartitionsForAutoCreate(Map<String, Object> consumerConfig) throws Exception {
+        try (var consumer = cluster.consumer(consumerConfig)) {
+            // First call would create the topic
+            consumer.partitionsFor("non-exist-topic");
+            TestUtils.waitForCondition(
+                () -> !consumer.partitionsFor("non-exist-topic").isEmpty(), 
+                "Timed out while awaiting non empty partitions."
+            );
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPartitionsForInvalidTopic() {
+        testPartitionsForInvalidTopic(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPartitionsForInvalidTopic() {
+        testPartitionsForInvalidTopic(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testPartitionsForInvalidTopic(Map<String, Object> consumerConfig) {
+        try (var consumer = cluster.consumer(consumerConfig)) {
+            assertThrows(InvalidTopicException.class, () -> consumer.partitionsFor(";3# ads,{234"));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerSeek() throws Exception {
+        testSeek(
+            Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerSeek() throws Exception {
+        testSeek(
+            Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testSeek(Map<String, Object> consumerConfig) throws Exception {
+        var totalRecords = 50;
+        var mid = totalRecords / 2;
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+        ) {
+            var startingTimestamp = 0;
+            sendRecords(producer, TP, totalRecords, startingTimestamp);
+
+            consumer.assign(List.of(TP));
+            consumer.seekToEnd(List.of(TP));
+            assertEquals(totalRecords, consumer.position(TP));
+            assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty());
+
+            consumer.seekToBeginning(List.of(TP));
+            assertEquals(0, consumer.position(TP));
+            consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp);
+
+            consumer.seek(TP, mid);
+            assertEquals(mid, consumer.position(TP));
+
+            consumeAndVerifyRecords(consumer, TP, 1, mid, mid, mid);
+
+            // Test seek compressed message
+            var tp2 = new TopicPartition(TOPIC, 1);
+            cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+            sendCompressedMessages(totalRecords, tp2);
+            consumer.assign(List.of(tp2));
+
+            consumer.seekToEnd(List.of(tp2));
+            assertEquals(totalRecords, consumer.position(tp2));
+            assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty());
+
+            consumer.seekToBeginning(List.of(tp2));
+            assertEquals(0L, consumer.position(tp2));
+            consumeAndVerifyRecords(consumer, tp2, 1, 0);
+
+            consumer.seek(tp2, mid);
+            assertEquals(mid, consumer.position(tp2));
+            consumeAndVerifyRecords(consumer, tp2, 1, mid, mid, mid);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPartitionPauseAndResume() throws Exception {
+        testPartitionPauseAndResume(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPartitionPauseAndResume() throws Exception {
+        testPartitionPauseAndResume(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testPartitionPauseAndResume(Map<String, Object> consumerConfig) throws Exception {
+        var partitions = List.of(TP);
+        var numRecords = 5;
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+        ) {
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, TP, numRecords, startingTimestamp);
+
+            consumer.assign(partitions);
+            consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+            consumer.pause(partitions);
+            startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, TP, numRecords, startingTimestamp);
+            assertTrue(consumer.poll(Duration.ofMillis(100)).isEmpty());
+            consumer.resume(partitions);
+            consumeAndVerifyRecords(consumer, TP, numRecords, 5, 0, startingTimestamp);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerInterceptors() throws Exception {
+        testInterceptors(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerInterceptors() throws Exception {
+        testInterceptors(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testInterceptors(Map<String, Object> consumerConfig) throws Exception {
+        var appendStr = "mock";
+        MockConsumerInterceptor.resetCounters();
+        MockProducerInterceptor.resetCounters();
+
+        // create producer with interceptor
+        Map<String, Object> producerConfig = Map.of(
+            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName(),
+            KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+            VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+            "mock.interceptor.append", appendStr
+        );
+        // create consumer with interceptor
+        Map<String, Object> consumerConfigOverride = new HashMap<>(consumerConfig);
+        consumerConfigOverride.put(INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName());
+        consumerConfigOverride.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerConfigOverride.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        
+        try (Producer<String, String> producer = cluster.producer(producerConfig);
+             Consumer<String, String> consumer = cluster.consumer(consumerConfigOverride)
+        ) {
+            // produce records
+            var numRecords = 10;
+            List<Future<RecordMetadata>> futures = new ArrayList<>();
+            for (var i = 0; i < numRecords; i++) {
+                Future<RecordMetadata> future = producer.send(
+                    new ProducerRecord<>(TP.topic(), TP.partition(), "key " + i, "value " + i)
+                );
+                futures.add(future);
+            }
+
+            // Wait for all sends to complete
+            futures.forEach(future -> assertDoesNotThrow(() -> future.get()));
+
+            assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue());
+            assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue());
+
+            // send invalid record
+            assertThrows(
+                Throwable.class,
+                () -> producer.send(null), 
+                "Should not allow sending a null record"
+            );
+            assertEquals(
+                1, 
+                MockProducerInterceptor.ON_ERROR_COUNT.intValue(), 
+                "Interceptor should be notified about exception"
+            );
+            assertEquals(
+                0, 
+                MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue(),
+                "Interceptor should not receive metadata with an exception when record is null"
+            );
+            
+            consumer.assign(List.of(TP));
+            consumer.seek(TP, 0);
+
+            // consume and verify that values are modified by interceptors
+            var records = consumeRecords(consumer, numRecords);
+            for (var i = 0; i < numRecords; i++) {
+                ConsumerRecord<String, String> record = records.get(i);
+                assertEquals("key " + i, record.key());
+                assertEquals(("value " + i + appendStr).toUpperCase(Locale.ROOT), record.value());
+            }
+
+            // commit sync and verify onCommit is called
+            var commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
+            consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L)));
+            assertEquals(2, consumer.committed(Set.of(TP)).get(TP).offset());
+            assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
+
+            // commit async and verify onCommit is called
+            var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L));
+            sendAndAwaitAsyncCommit(consumer, Optional.of(offsetsToCommit));
+            assertEquals(5, consumer.committed(Set.of(TP)).get(TP).offset());
+            assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
+        }
+        // cleanup
+        MockConsumerInterceptor.resetCounters();
+        MockProducerInterceptor.resetCounters();
+    }
+
+    @ClusterTest
+    public void testClassicConsumerInterceptorsWithWrongKeyValue() throws Exception {
+        testInterceptorsWithWrongKeyValue(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerInterceptorsWithWrongKeyValue() throws Exception {
+        testInterceptorsWithWrongKeyValue(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testInterceptorsWithWrongKeyValue(Map<String, Object> consumerConfig) throws Exception {
+        var appendStr = "mock";
+        // create producer with interceptor that has different key and value types from the producer
+        Map<String, Object> producerConfig = Map.of(
+            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName(),
+            "mock.interceptor.append", appendStr
+        );
+        // create consumer with interceptor that has different key and value types from the consumer
+        Map<String, Object> consumerConfigOverride = new HashMap<>(consumerConfig);
+        consumerConfigOverride.put(INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName());
+
+        try (Producer<byte[], byte[]> producer = cluster.producer(producerConfig);
+             Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfigOverride)
+        ) {
+            // producing records should succeed
+            producer.send(new ProducerRecord<>(
+                TP.topic(),
+                TP.partition(),
+                "key".getBytes(),
+                "value will not be modified".getBytes()
+            ));
+
+            consumer.assign(List.of(TP));
+            consumer.seek(TP, 0);
+            // consume and verify that values are not modified by interceptors -- their exceptions are caught and logged, but not propagated
+            var records = consumeRecords(consumer, 1);
+            var record = records.get(0);
+            assertEquals("value will not be modified", new String(record.value()));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerConsumeMessagesWithCreateTime() throws Exception {
+        testConsumeMessagesWithCreateTime(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerConsumeMessagesWithCreateTime() throws Exception {
+        testConsumeMessagesWithCreateTime(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testConsumeMessagesWithCreateTime(Map<String, Object> consumerConfig) throws Exception {
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+        var numRecords = 50;
+        var tp2 = new TopicPartition(TOPIC, 1);
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+        ) {
+            // Test non-compressed messages
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, TP, numRecords, startingTimestamp);
+            consumer.assign(List.of(TP));
+            consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+
+            // Test compressed messages
+            sendCompressedMessages(numRecords, tp2);
+            consumer.assign(List.of(tp2));
+            consumeAndVerifyRecords(consumer, tp2, numRecords, 0, 0, 0);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerConsumeMessagesWithLogAppendTime() throws Exception {
+        testConsumeMessagesWithLogAppendTime(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerConsumeMessagesWithLogAppendTime() throws Exception {
+        testConsumeMessagesWithLogAppendTime(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testConsumeMessagesWithLogAppendTime(Map<String, Object> consumerConfig) throws Exception {
+        var topicName = "testConsumeMessagesWithLogAppendTime";
+        var startTime = System.currentTimeMillis();
+        var numRecords = 50;
+        cluster.createTopic(topicName, 2, (short) 2, Map.of(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime"));
+
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
+            // Test non-compressed messages
+            var tp1 = new TopicPartition(topicName, 0);
+            sendRecords(cluster, tp1, numRecords);
+            consumer.assign(List.of(tp1));
+            consumeAndVerifyRecordsWithTimeTypeLogAppend(consumer, tp1, numRecords, startTime);
+
+            // Test compressed messages
+            var tp2 = new TopicPartition(topicName, 1);
+            sendCompressedMessages(numRecords, tp2);
+            consumer.assign(List.of(tp2));
+            consumeAndVerifyRecordsWithTimeTypeLogAppend(consumer, tp2, numRecords, startTime);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerListTopics() throws Exception {
+        testListTopics(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerListTopics() throws Exception {
+        testListTopics(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testListTopics(Map<String, Object> consumerConfig) throws Exception {
+        var numParts = 2;
+        var topic1 = "part-test-topic-1";
+        var topic2 = "part-test-topic-2";
+        var topic3 = "part-test-topic-3";
+        cluster.createTopic(topic1, numParts, (short) 1);
+        cluster.createTopic(topic2, numParts, (short) 1);
+        cluster.createTopic(topic3, numParts, (short) 1);
+
+        sendRecords(cluster, new TopicPartition(topic1, 0), 1);
+
+        try (var consumer = cluster.consumer(consumerConfig)) {
+            // consume some messages, and then we can list the internal topic __consumer_offsets
+            consumer.subscribe(List.of(topic1));
+            consumer.poll(Duration.ofMillis(100));
+            var topics = consumer.listTopics();
+            assertNotNull(topics);
+            assertEquals(4, topics.size());
+            assertEquals(2, topics.get(topic1).size());
+            assertEquals(2, topics.get(topic2).size());
+            assertEquals(2, topics.get(topic3).size());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPauseStateNotPreservedByRebalance() throws Exception {
+        testPauseStateNotPreservedByRebalance(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            SESSION_TIMEOUT_MS_CONFIG, 100,
+            HEARTBEAT_INTERVAL_MS_CONFIG, 30
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPauseStateNotPreservedByRebalance() throws Exception {
+        testPauseStateNotPreservedByRebalance(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))
+        );
+    }
+
+    private void testPauseStateNotPreservedByRebalance(Map<String, Object> consumerConfig) throws Exception {
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+        ) {
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, TP, 5, startingTimestamp);
+            consumer.subscribe(List.of(TOPIC));
+            consumeAndVerifyRecords(consumer, TP, 5, 0, 0, startingTimestamp);
+            consumer.pause(List.of(TP));
+
+            // subscribe to a new topic to trigger a rebalance
+            consumer.subscribe(List.of("topic2"));
+
+            // after rebalance, our position should be reset and our pause state lost,
+            // so we should be able to consume from the beginning
+            consumeAndVerifyRecords(consumer, TP, 0, 5, 0, startingTimestamp);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPerPartitionLeadMetricsCleanUpWithSubscribe() throws Exception {
+        testPerPartitionLeadMetricsCleanUpWithSubscribe(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe",
+            CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe"
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPerPartitionLeadMetricsCleanUpWithSubscribe() throws Exception {
+        testPerPartitionLeadMetricsCleanUpWithSubscribe(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe",
+            CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe"
+        ));
+    }
+
+    private void testPerPartitionLeadMetricsCleanUpWithSubscribe(Map<String, Object> consumerConfig) throws Exception {
+        var numMessages = 1000;
+        var topic2 = "topic2";
+        var tp2 = new TopicPartition(TOPIC, 1);
+        cluster.createTopic(topic2, 2, (short) BROKER_COUNT);
+        
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
+            // send some messages.
+            sendRecords(cluster, TP, numMessages);
+
+            // Test subscribe
+            // Create a consumer and consume some messages.
+            var listener = new TestConsumerReassignmentListener();
+            consumer.subscribe(List.of(TOPIC, topic2), listener);
+            var records = awaitNonEmptyRecords(consumer, TP);
+            assertEquals(1, listener.callsToAssigned, "should be assigned once");
+
+            // Verify the metric exist.
+            Map<String, String> tags1 = new HashMap<>();
+            tags1.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe");
+            tags1.put("topic", TP.topic());
+            tags1.put("partition", String.valueOf(TP.partition()));
+
+            Map<String, String> tags2 = new HashMap<>();
+            tags2.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe");
+            tags2.put("topic", tp2.topic());
+            tags2.put("partition", String.valueOf(tp2.partition()));
+
+            var fetchLead0 = consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1));
+            assertNotNull(fetchLead0);
+            assertEquals((double) records.count(), fetchLead0.metricValue(), "The lead should be " + records.count());
+
+            // Remove topic from subscription
+            consumer.subscribe(List.of(topic2), listener);
+            awaitRebalance(consumer, listener);
+
+            // Verify the metric has gone
+            assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1)));
+            assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2)));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPerPartitionLagMetricsCleanUpWithSubscribe() throws Exception {
+        testPerPartitionLagMetricsCleanUpWithSubscribe(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithSubscribe",
+            CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithSubscribe"
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPerPartitionLagMetricsCleanUpWithSubscribe() throws Exception {
+        testPerPartitionLagMetricsCleanUpWithSubscribe(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithSubscribe",
+            CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithSubscribe"
+        ));
+    }
+
+    private void testPerPartitionLagMetricsCleanUpWithSubscribe(Map<String, Object> consumerConfig) throws Exception {
+        int numMessages = 1000;
+        var topic2 = "topic2";
+        var tp2 = new TopicPartition(TOPIC, 1);
+        cluster.createTopic(topic2, 2, (short) BROKER_COUNT);
+
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
+            // send some messages.
+            sendRecords(cluster, TP, numMessages);
+            
+            // Test subscribe
+            // Create a consumer and consume some messages.
+            var listener = new TestConsumerReassignmentListener();
+            consumer.subscribe(List.of(TOPIC, topic2), listener);
+            var records = awaitNonEmptyRecords(consumer, TP);
+            assertEquals(1, listener.callsToAssigned, "should be assigned once");
+
+            // Verify the metric exist.
+            Map<String, String> tags1 = new HashMap<>();
+            tags1.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe");
+            tags1.put("topic", TP.topic());
+            tags1.put("partition", String.valueOf(TP.partition()));
+
+            Map<String, String> tags2 = new HashMap<>();
+            tags2.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe");
+            tags2.put("topic", tp2.topic());
+            tags2.put("partition", String.valueOf(tp2.partition()));
+
+            var fetchLag0 = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1));
+            assertNotNull(fetchLag0);
+            var expectedLag = numMessages - records.count();
+            assertEquals(expectedLag, (double) fetchLag0.metricValue(), EPSILON, "The lag should be " + expectedLag);
+
+            // Remove topic from subscription
+            consumer.subscribe(List.of(topic2), listener);
+            awaitRebalance(consumer, listener);
+
+            // Verify the metric has gone
+            assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1)));
+            assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags2)));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPerPartitionLeadMetricsCleanUpWithAssign() throws Exception {
+        testPerPartitionLeadMetricsCleanUpWithAssign(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign",
+            CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign"
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPerPartitionLeadMetricsCleanUpWithAssign() throws Exception {
+        testPerPartitionLeadMetricsCleanUpWithAssign(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign",
+            CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign"
+        ));
+    }
+
+    private void testPerPartitionLeadMetricsCleanUpWithAssign(Map<String, Object> consumerConfig) throws Exception {
+        var numMessages = 1000;
+        var tp2 = new TopicPartition(TOPIC, 1);
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+        
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+        ) {
+            // Test assign send some messages.
+            sendRecords(producer, TP, numMessages, System.currentTimeMillis());
+            sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
+            
+            consumer.assign(List.of(TP));
+            var records = awaitNonEmptyRecords(consumer, TP);
+
+            // Verify the metric exist.
+            Map<String, String> tags = new HashMap<>();
+            tags.put("client-id", "testPerPartitionLeadMetricsCleanUpWithAssign");
+            tags.put("topic", TP.topic());
+            tags.put("partition", String.valueOf(TP.partition()));
+
+            var fetchLead = consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags));
+            assertNotNull(fetchLead);
+            assertEquals((double) records.count(), fetchLead.metricValue(), "The lead should be " + records.count());
+
+            consumer.assign(List.of(tp2));
+            awaitNonEmptyRecords(consumer, tp2);
+            assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPerPartitionLagMetricsCleanUpWithAssign() throws Exception {
+        testPerPartitionLagMetricsCleanUpWithAssign(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign",
+            CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign"
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPerPartitionLagMetricsCleanUpWithAssign() throws Exception {
+        testPerPartitionLagMetricsCleanUpWithAssign(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign",
+            CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign"
+        ));
+    }
+
+    private void testPerPartitionLagMetricsCleanUpWithAssign(Map<String, Object> consumerConfig) throws Exception {
+        var numMessages = 1000;
+        var tp2 = new TopicPartition(TOPIC, 1);
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+        ) {
+            // Test assign send some messages.
+            sendRecords(producer, TP, numMessages, System.currentTimeMillis());
+            sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
+
+            consumer.assign(List.of(TP));
+            var records = awaitNonEmptyRecords(consumer, TP);
+
+            // Verify the metric exist.
+            Map<String, String> tags = new HashMap<>();
+            tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign");
+            tags.put("topic", TP.topic());
+            tags.put("partition", String.valueOf(TP.partition()));
+
+            var fetchLag = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags));
+            assertNotNull(fetchLag);
+
+            var expectedLag = numMessages - records.count();
+            assertEquals(expectedLag, (double) fetchLag.metricValue(), EPSILON, "The lag should be " + expectedLag);
+            consumer.assign(List.of(tp2));
+            awaitNonEmptyRecords(consumer, tp2);
+            assertNull(consumer.metrics().get(new MetricName(TP + ".records-lag", "consumer-fetch-manager-metrics", "", tags)));
+            assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPerPartitionLagMetricsWhenReadCommitted() throws Exception {
+        testPerPartitionLagMetricsWhenReadCommitted(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "testPerPartitionLagMetricsWhenReadCommitted",
+            CLIENT_ID_CONFIG, "testPerPartitionLagMetricsWhenReadCommitted",
+            ISOLATION_LEVEL_CONFIG, "read_committed"
+        ));
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPerPartitionLagMetricsWhenReadCommitted() throws Exception {
+        testPerPartitionLagMetricsWhenReadCommitted(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, "testPerPartitionLagMetricsWhenReadCommitted",
+            CLIENT_ID_CONFIG, "testPerPartitionLagMetricsWhenReadCommitted",
+            ISOLATION_LEVEL_CONFIG, "read_committed"
+        ));
+    }
+
+    private void testPerPartitionLagMetricsWhenReadCommitted(Map<String, Object> consumerConfig) throws Exception {
+        var numMessages = 1000;
+        var tp2 = new TopicPartition(TOPIC, 1);
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+        ) {
+            // Test assign send some messages.
+            sendRecords(producer, TP, numMessages, System.currentTimeMillis());
+            sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
+
+            consumer.assign(List.of(TP));
+            awaitNonEmptyRecords(consumer, TP);
+
+            // Verify the metric exist.
+            Map<String, String> tags = new HashMap<>();
+            tags.put("client-id", "testPerPartitionLagMetricsWhenReadCommitted");
+            tags.put("topic", TP.topic());
+            tags.put("partition", String.valueOf(TP.partition()));
+
+            var fetchLag = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags));
+            assertNotNull(fetchLag);
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured() throws Exception {
+        var consumerClientId = "testQuotaMetricsNotCreatedIfNoQuotasConfigured";
+        testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, consumerClientId,
+            CLIENT_ID_CONFIG, consumerClientId,
+            ISOLATION_LEVEL_CONFIG, "read_committed"
+        ), consumerClientId);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured() throws Exception {
+        var consumerClientId = "testQuotaMetricsNotCreatedIfNoQuotasConfigured";
+        testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
+            GROUP_ID_CONFIG, consumerClientId,
+            CLIENT_ID_CONFIG, consumerClientId,
+            ISOLATION_LEVEL_CONFIG, "read_committed"
+        ), consumerClientId);
+    }
+
+    private void testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map<String, Object> consumerConfig, String consumerClientId) throws Exception {
+        var producerClientId = UUID.randomUUID().toString();
+        var numRecords = 1000;
+        cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT);
+
+        try (Producer<byte[], byte[]> producer = cluster.producer(Map.of(ProducerConfig.CLIENT_ID_CONFIG, producerClientId));
+             Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)
+        ) {
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, TP, numRecords, startingTimestamp);
+
+            consumer.assign(List.of(TP));
+            consumer.seek(TP, 0);
+            consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+            
+            var brokers = cluster.brokers().values();
+            brokers.forEach(broker -> assertNoMetric(broker, "byte-rate", QuotaType.PRODUCE, producerClientId));
+            brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.PRODUCE, producerClientId));
+            brokers.forEach(broker -> assertNoMetric(broker, "byte-rate", QuotaType.FETCH, consumerClientId));
+            brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.FETCH, consumerClientId));
+            brokers.forEach(broker -> assertNoMetric(broker, "request-time", QuotaType.REQUEST, producerClientId));
+            brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.REQUEST, producerClientId));
+            brokers.forEach(broker -> assertNoMetric(broker, "request-time", QuotaType.REQUEST, consumerClientId));
+            brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.REQUEST, consumerClientId));

Review Comment:
   I think this approach is clear enough. However, I’m not sure if using a test 
case record and adding all test cases into a list is the best approach.
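   
   For readers outside the PR, a minimal sketch of the "test case record + list" pattern the comment appears to refer to; all names below are hypothetical and not taken from the PR, they only illustrate the shape of the approach:
   
   ```java
   import java.util.List;
   import java.util.Map;
   
   // Hypothetical sketch: drive one shared test body from a list of per-protocol
   // test-case records instead of a separate testClassicConsumerXxx / testAsyncConsumerXxx
   // method pair for every scenario.
   public class ConsumerTestCaseSketch {
   
       // One record instance per consumer configuration a test should cover.
       record ConsumerTestCase(String name, Map<String, Object> consumerConfig) { }
   
       // "group.protocol" is the consumer config key behind ConsumerConfig.GROUP_PROTOCOL_CONFIG.
       static final List<ConsumerTestCase> CASES = List.of(
           new ConsumerTestCase("classic", Map.of("group.protocol", "classic")),
           new ConsumerTestCase("consumer", Map.of("group.protocol", "consumer"))
       );
   
       public static void main(String[] args) {
           for (ConsumerTestCase testCase : CASES) {
               // A real test would run its shared assertions with testCase.consumerConfig().
               System.out.println("would run shared assertions for case: " + testCase.name());
           }
       }
   }
   ```
   
   The trade-off the comment seems to be weighing: the list keeps the class compact, but each case no longer appears as an individually named (and individually runnable or filterable) test method.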



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
