m1a2st commented on code in PR #20081: URL: https://github.com/apache/kafka/pull/20081#discussion_r2180142403
########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java: ########## @@ -0,0 +1,1607 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import kafka.server.KafkaBroker; + +import org.apache.kafka.clients.ClientsTestUtils.TestConsumerReassignmentListener; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Flaky; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.test.MockConsumerInterceptor; +import org.apache.kafka.test.MockProducerInterceptor; +import org.apache.kafka.test.TestUtils; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; + +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.BROKER_COUNT; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.TOPIC; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.TP; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testClusterResourceListener; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testCoordinatorFailover; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testSimpleConsumption; +import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment; +import static org.apache.kafka.clients.ClientsTestUtils.awaitRebalance; +import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords; +import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecordsWithTimeTypeLogAppend; +import static org.apache.kafka.clients.ClientsTestUtils.consumeRecords; +import static org.apache.kafka.clients.ClientsTestUtils.sendAndAwaitAsyncCommit; +import static org.apache.kafka.clients.ClientsTestUtils.sendRecords; +import static org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.METADATA_MAX_AGE_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = BROKER_COUNT, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + @ClusterConfigProperty(key = GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, value = "60000"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"), + } +) +public class PlaintextConsumerTest { + + private final ClusterInstance cluster; + public static final double EPSILON = 0.1; + + public PlaintextConsumerTest(ClusterInstance cluster) { + this.cluster = cluster; + } + + @ClusterTest + public void testClassicConsumerSimpleConsumption() throws InterruptedException { + testSimpleConsumption(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerSimpleConsumption() throws InterruptedException { + testSimpleConsumption(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testClassicConsumerClusterResourceListener() throws InterruptedException { + testClusterResourceListener(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerClusterResourceListener() throws InterruptedException { + testClusterResourceListener(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testClassicConsumerCoordinatorFailover() throws InterruptedException { + Map<String, Object> config = Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + SESSION_TIMEOUT_MS_CONFIG, 5001, + HEARTBEAT_INTERVAL_MS_CONFIG, 1000, + // Use higher poll timeout to avoid consumer leaving the group due to timeout + MAX_POLL_INTERVAL_MS_CONFIG, 15000 + ); + testCoordinatorFailover(cluster, config); + } + + @ClusterTest + public void testAsyncConsumeCoordinatorFailover() throws InterruptedException { + Map<String, Object> config = Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + // Use higher poll timeout to avoid consumer leaving the group due to timeout + MAX_POLL_INTERVAL_MS_CONFIG, 15000 + ); + testCoordinatorFailover(cluster, config); + } + + @ClusterTest + public void testClassicConsumerHeaders() throws Exception { + testHeaders(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerHeaders() throws Exception { + testHeaders(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testHeaders(Map<String, Object> consumerConfig) throws Exception { + var numRecords = 1; + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var record = new ProducerRecord<>(TP.topic(), TP.partition(), null, "key".getBytes(), "value".getBytes()); + record.headers().add("headerKey", "headerValue".getBytes()); + producer.send(record); + + assertEquals(0, consumer.assignment().size()); + consumer.assign(List.of(TP)); + assertEquals(1, consumer.assignment().size()); + + consumer.seek(TP, 0); + var records = consumeRecords(consumer, numRecords); + assertEquals(numRecords, records.size()); + var header = records.get(0).headers().lastHeader("headerKey"); + assertEquals("headerValue", header == null ? null : new String(header.value())); + } + } + + @ClusterTest + public void testClassicConsumerHeadersSerializerDeserializer() throws Exception { + testHeadersSerializeDeserialize(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerHeadersSerializerDeserializer() throws Exception { + testHeadersSerializeDeserialize(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testHeadersSerializeDeserialize(Map<String, Object> config) throws InterruptedException { + var numRecords = 1; + Map<String, Object> consumerConfig = new HashMap<>(config); + consumerConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, DeserializerImpl.class); + Map<String, Object> producerConfig = Map.of( + VALUE_SERIALIZER_CLASS_CONFIG, SerializerImpl.class.getName() + ); + + try (Producer<byte[], byte[]> producer = cluster.producer(producerConfig); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + producer.send(new ProducerRecord<>( + TP.topic(), + TP.partition(), + null, + "key".getBytes(), + "value".getBytes()) + ); + + assertEquals(0, consumer.assignment().size()); + consumer.assign(List.of(TP)); + assertEquals(1, consumer.assignment().size()); + + consumer.seek(TP, 0); + assertEquals(numRecords, consumeRecords(consumer, numRecords).size()); + } + } + + @ClusterTest + public void testClassicConsumerAutoOffsetReset() throws Exception { + testAutoOffsetReset(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerAutoOffsetReset() throws Exception { + testAutoOffsetReset(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testAutoOffsetReset(Map<String, Object> consumerConfig) throws Exception { + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, 1, startingTimestamp); + consumer.assign(List.of(TP)); + consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerGroupConsumption() throws Exception { + testGroupConsumption(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerGroupConsumption() throws Exception { + testGroupConsumption(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testGroupConsumption(Map<String, Object> consumerConfig) throws Exception { + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, 10, startingTimestamp); + consumer.subscribe(List.of(TOPIC)); + consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerPartitionsFor() throws Exception { + testPartitionsFor(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerPartitionsFor() throws Exception { + testPartitionsFor(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testPartitionsFor(Map<String, Object> consumerConfig) throws Exception { + var numParts = 2; + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + cluster.createTopic("part-test", numParts, (short) 1); + + try (var consumer = cluster.consumer(consumerConfig)) { + var partitions = consumer.partitionsFor(TOPIC); + assertNotNull(partitions); + assertEquals(2, partitions.size()); + } + } + + @ClusterTest + public void testClassicConsumerPartitionsForAutoCreate() throws Exception { + testPartitionsForAutoCreate(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerPartitionsForAutoCreate() throws Exception { + testPartitionsForAutoCreate(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testPartitionsForAutoCreate(Map<String, Object> consumerConfig) throws Exception { + try (var consumer = cluster.consumer(consumerConfig)) { + // First call would create the topic + consumer.partitionsFor("non-exist-topic"); + TestUtils.waitForCondition( + () -> !consumer.partitionsFor("non-exist-topic").isEmpty(), + "Timed out while awaiting non empty partitions." + ); + } + } + + @ClusterTest + public void testClassicConsumerPartitionsForInvalidTopic() { + testPartitionsForInvalidTopic(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerPartitionsForInvalidTopic() { + testPartitionsForInvalidTopic(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testPartitionsForInvalidTopic(Map<String, Object> consumerConfig) { + try (var consumer = cluster.consumer(consumerConfig)) { + assertThrows(InvalidTopicException.class, () -> consumer.partitionsFor(";3# ads,{234")); + } + } + + @ClusterTest + public void testClassicConsumerSeek() throws Exception { + testSeek( + Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerSeek() throws Exception { + testSeek( + Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testSeek(Map<String, Object> consumerConfig) throws Exception { + var totalRecords = 50; + var mid = totalRecords / 2; + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = 0; + sendRecords(producer, TP, totalRecords, startingTimestamp); + + consumer.assign(List.of(TP)); + consumer.seekToEnd(List.of(TP)); + assertEquals(totalRecords, consumer.position(TP)); + assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty()); + + consumer.seekToBeginning(List.of(TP)); + assertEquals(0, consumer.position(TP)); + consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp); + + consumer.seek(TP, mid); + assertEquals(mid, consumer.position(TP)); + + consumeAndVerifyRecords(consumer, TP, 1, mid, mid, mid); + + // Test seek compressed message + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + sendCompressedMessages(totalRecords, tp2); + consumer.assign(List.of(tp2)); + + consumer.seekToEnd(List.of(tp2)); + assertEquals(totalRecords, consumer.position(tp2)); + assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty()); + + consumer.seekToBeginning(List.of(tp2)); + assertEquals(0L, consumer.position(tp2)); + consumeAndVerifyRecords(consumer, tp2, 1, 0); + + consumer.seek(tp2, mid); + assertEquals(mid, consumer.position(tp2)); + consumeAndVerifyRecords(consumer, tp2, 1, mid, mid, mid); + } + } + + @ClusterTest + public void testClassicConsumerPartitionPauseAndResume() throws Exception { + testPartitionPauseAndResume(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerPartitionPauseAndResume() throws Exception { + testPartitionPauseAndResume(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testPartitionPauseAndResume(Map<String, Object> consumerConfig) throws Exception { + var partitions = List.of(TP); + var numRecords = 5; + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + + consumer.assign(partitions); + consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); + consumer.pause(partitions); + startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + assertTrue(consumer.poll(Duration.ofMillis(100)).isEmpty()); + consumer.resume(partitions); + consumeAndVerifyRecords(consumer, TP, numRecords, 5, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerInterceptors() throws Exception { + testInterceptors(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerInterceptors() throws Exception { + testInterceptors(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testInterceptors(Map<String, Object> consumerConfig) throws Exception { + var appendStr = "mock"; + MockConsumerInterceptor.resetCounters(); + MockProducerInterceptor.resetCounters(); + + // create producer with interceptor + Map<String, Object> producerConfig = Map.of( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName(), + KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + "mock.interceptor.append", appendStr + ); + // create consumer with interceptor + Map<String, Object> consumerConfigOverride = new HashMap<>(consumerConfig); + consumerConfigOverride.put(INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + consumerConfigOverride.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerConfigOverride.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + try (Producer<String, String> producer = cluster.producer(producerConfig); + Consumer<String, String> consumer = cluster.consumer(consumerConfigOverride) + ) { + // produce records + var numRecords = 10; + List<Future<RecordMetadata>> futures = new ArrayList<>(); + for (var i = 0; i < numRecords; i++) { + Future<RecordMetadata> future = producer.send( + new ProducerRecord<>(TP.topic(), TP.partition(), "key " + i, "value " + i) + ); + futures.add(future); + } + + // Wait for all sends to complete + futures.forEach(future -> assertDoesNotThrow(() -> future.get())); + + assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue()); + assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue()); + + // send invalid record + assertThrows( + Throwable.class, + () -> producer.send(null), + "Should not allow sending a null record" + ); + assertEquals( + 1, + MockProducerInterceptor.ON_ERROR_COUNT.intValue(), + "Interceptor should be notified about exception" + ); + assertEquals( + 0, + MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue(), + "Interceptor should not receive metadata with an exception when record is null" + ); + + consumer.assign(List.of(TP)); + consumer.seek(TP, 0); + + // consume and verify that values are modified by interceptors + var records = consumeRecords(consumer, numRecords); + for (var i = 0; i < numRecords; i++) { + ConsumerRecord<String, String> record = records.get(i); + assertEquals("key " + i, record.key()); + assertEquals(("value " + i + appendStr).toUpperCase(Locale.ROOT), record.value()); + } + + // commit sync and verify onCommit is called + var commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue(); + consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L))); + assertEquals(2, consumer.committed(Set.of(TP)).get(TP).offset()); + assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()); + + // commit async and verify onCommit is called + var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L)); + sendAndAwaitAsyncCommit(consumer, Optional.of(offsetsToCommit)); + assertEquals(5, consumer.committed(Set.of(TP)).get(TP).offset()); + assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()); + } + // cleanup + MockConsumerInterceptor.resetCounters(); + MockProducerInterceptor.resetCounters(); + } + + @ClusterTest + public void testClassicConsumerInterceptorsWithWrongKeyValue() throws Exception { + testInterceptorsWithWrongKeyValue(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerInterceptorsWithWrongKeyValue() throws Exception { + testInterceptorsWithWrongKeyValue(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testInterceptorsWithWrongKeyValue(Map<String, Object> consumerConfig) throws Exception { + var appendStr = "mock"; + // create producer with interceptor that has different key and value types from the producer + Map<String, Object> producerConfig = Map.of( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName(), + "mock.interceptor.append", appendStr + ); + // create consumer with interceptor that has different key and value types from the consumer + Map<String, Object> consumerConfigOverride = new HashMap<>(consumerConfig); + consumerConfigOverride.put(INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + + try (Producer<byte[], byte[]> producer = cluster.producer(producerConfig); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfigOverride) + ) { + // producing records should succeed + producer.send(new ProducerRecord<>( + TP.topic(), + TP.partition(), + "key".getBytes(), + "value will not be modified".getBytes() + )); + + consumer.assign(List.of(TP)); + consumer.seek(TP, 0); + // consume and verify that values are not modified by interceptors -- their exceptions are caught and logged, but not propagated + var records = consumeRecords(consumer, 1); + var record = records.get(0); + assertEquals("value will not be modified", new String(record.value())); + } + } + + @ClusterTest + public void testClassicConsumerConsumeMessagesWithCreateTime() throws Exception { + testConsumeMessagesWithCreateTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerConsumeMessagesWithCreateTime() throws Exception { + testConsumeMessagesWithCreateTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testConsumeMessagesWithCreateTime(Map<String, Object> consumerConfig) throws Exception { + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + var numRecords = 50; + var tp2 = new TopicPartition(TOPIC, 1); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + // Test non-compressed messages + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + consumer.assign(List.of(TP)); + consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); + + // Test compressed messages + sendCompressedMessages(numRecords, tp2); + consumer.assign(List.of(tp2)); + consumeAndVerifyRecords(consumer, tp2, numRecords, 0, 0, 0); + } + } + + @ClusterTest + public void testClassicConsumerConsumeMessagesWithLogAppendTime() throws Exception { + testConsumeMessagesWithLogAppendTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerConsumeMessagesWithLogAppendTime() throws Exception { + testConsumeMessagesWithLogAppendTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testConsumeMessagesWithLogAppendTime(Map<String, Object> consumerConfig) throws Exception { + var topicName = "testConsumeMessagesWithLogAppendTime"; + var startTime = System.currentTimeMillis(); + var numRecords = 50; + cluster.createTopic(topicName, 2, (short) 2, Map.of(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")); + + try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) { + // Test non-compressed messages + var tp1 = new TopicPartition(topicName, 0); + sendRecords(cluster, tp1, numRecords); + consumer.assign(List.of(tp1)); + consumeAndVerifyRecordsWithTimeTypeLogAppend(consumer, tp1, numRecords, startTime); + + // Test compressed messages + var tp2 = new TopicPartition(topicName, 1); + sendCompressedMessages(numRecords, tp2); + consumer.assign(List.of(tp2)); + consumeAndVerifyRecordsWithTimeTypeLogAppend(consumer, tp2, numRecords, startTime); + } + } + + @ClusterTest + public void testClassicConsumerListTopics() throws Exception { + testListTopics(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)) + ); + } + + @ClusterTest + public void testAsyncConsumerListTopics() throws Exception { + testListTopics(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testListTopics(Map<String, Object> consumerConfig) throws Exception { + var numParts = 2; + var topic1 = "part-test-topic-1"; + var topic2 = "part-test-topic-2"; + var topic3 = "part-test-topic-3"; + cluster.createTopic(topic1, numParts, (short) 1); + cluster.createTopic(topic2, numParts, (short) 1); + cluster.createTopic(topic3, numParts, (short) 1); + + sendRecords(cluster, new TopicPartition(topic1, 0), 1); + + try (var consumer = cluster.consumer(consumerConfig)) { + // consumer some messages, and we can list the internal topic __consumer_offsets + consumer.subscribe(List.of(topic1)); + consumer.poll(Duration.ofMillis(100)); + var topics = consumer.listTopics(); + assertNotNull(topics); + assertEquals(4, topics.size()); + assertEquals(2, topics.get(topic1).size()); + assertEquals(2, topics.get(topic2).size()); + assertEquals(2, topics.get(topic3).size()); + } + } + + @ClusterTest + public void testClassicConsumerPauseStateNotPreservedByRebalance() throws Exception { + testPauseStateNotPreservedByRebalance(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + SESSION_TIMEOUT_MS_CONFIG, 100, + HEARTBEAT_INTERVAL_MS_CONFIG, 30 + )); + } + + @ClusterTest + public void testAsyncConsumerPauseStateNotPreservedByRebalance() throws Exception { + testPauseStateNotPreservedByRebalance(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)) + ); + } + + private void testPauseStateNotPreservedByRebalance(Map<String, Object> consumerConfig) throws Exception { + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, 5, startingTimestamp); + consumer.subscribe(List.of(TOPIC)); + consumeAndVerifyRecords(consumer, TP, 5, 0, 0, startingTimestamp); + consumer.pause(List.of(TP)); + + // subscribe to a new topic to trigger a rebalance + consumer.subscribe(List.of("topic2")); + + // after rebalance, our position should be reset and our pause state lost, + // so we should be able to consume from the beginning + consumeAndVerifyRecords(consumer, TP, 0, 5, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLeadMetricsCleanUpWithSubscribe() throws Exception { + testPerPartitionLeadMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe", + CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe" + )); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLeadMetricsCleanUpWithSubscribe() throws Exception { + testPerPartitionLeadMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe", + CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe" + )); + } + + private void testPerPartitionLeadMetricsCleanUpWithSubscribe(Map<String, Object> consumerConfig) throws Exception { + var numMessages = 1000; + var topic2 = "topic2"; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(topic2, 2, (short) BROKER_COUNT); + + try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) { + // send some messages. + sendRecords(cluster, TP, numMessages); + + // Test subscribe + // Create a consumer and consumer some messages. + var listener = new TestConsumerReassignmentListener(); + consumer.subscribe(List.of(TOPIC, topic2), listener); + var records = awaitNonEmptyRecords(consumer, TP); + assertEquals(1, listener.callsToAssigned, "should be assigned once"); + + // Verify the metric exist. + Map<String, String> tags1 = new HashMap<>(); + tags1.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe"); + tags1.put("topic", TP.topic()); + tags1.put("partition", String.valueOf(TP.partition())); + + Map<String, String> tags2 = new HashMap<>(); + tags2.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe"); + tags2.put("topic", tp2.topic()); + tags2.put("partition", String.valueOf(tp2.partition())); + + var fetchLead0 = consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1)); + assertNotNull(fetchLead0); + assertEquals((double) records.count(), fetchLead0.metricValue(), "The lead should be " + records.count()); + + // Remove topic from subscription + consumer.subscribe(List.of(topic2), listener); + awaitRebalance(consumer, listener); + + // Verify the metric has gone + assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1))); + assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLagMetricsCleanUpWithSubscribe() throws Exception { + testPerPartitionLagMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithSubscribe", + CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithSubscribe" + )); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLagMetricsCleanUpWithSubscribe() throws Exception { + testPerPartitionLagMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithSubscribe", + CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithSubscribe" + )); + } + + private void testPerPartitionLagMetricsCleanUpWithSubscribe(Map<String, Object> consumerConfig) throws Exception { + int numMessages = 1000; + var topic2 = "topic2"; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(topic2, 2, (short) BROKER_COUNT); + + try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) { + // send some messages. + sendRecords(cluster, TP, numMessages); + + // Test subscribe + // Create a consumer and consumer some messages. + var listener = new TestConsumerReassignmentListener(); + consumer.subscribe(List.of(TOPIC, topic2), listener); + var records = awaitNonEmptyRecords(consumer, TP); + assertEquals(1, listener.callsToAssigned, "should be assigned once"); + + // Verify the metric exist. + Map<String, String> tags1 = new HashMap<>(); + tags1.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe"); + tags1.put("topic", TP.topic()); + tags1.put("partition", String.valueOf(TP.partition())); + + Map<String, String> tags2 = new HashMap<>(); + tags2.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe"); + tags2.put("topic", tp2.topic()); + tags2.put("partition", String.valueOf(tp2.partition())); + + var fetchLag0 = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1)); + assertNotNull(fetchLag0); + var expectedLag = numMessages - records.count(); + assertEquals(expectedLag, (double) fetchLag0.metricValue(), EPSILON, "The lag should be " + expectedLag); + + // Remove topic from subscription + consumer.subscribe(List.of(topic2), listener); + awaitRebalance(consumer, listener); + + // Verify the metric has gone + assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1))); + assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags2))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLeadMetricsCleanUpWithAssign() throws Exception { + testPerPartitionLeadMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign", + CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign" + )); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLeadMetricsCleanUpWithAssign() throws Exception { + testPerPartitionLeadMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign", + CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign" + )); + } + + private void testPerPartitionLeadMetricsCleanUpWithAssign(Map<String, Object> consumerConfig) throws Exception { + var numMessages = 1000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + // Test assign send some messages. + sendRecords(producer, TP, numMessages, System.currentTimeMillis()); + sendRecords(producer, tp2, numMessages, System.currentTimeMillis()); + + consumer.assign(List.of(TP)); + var records = awaitNonEmptyRecords(consumer, TP); + + // Verify the metric exist. + Map<String, String> tags = new HashMap<>(); + tags.put("client-id", "testPerPartitionLeadMetricsCleanUpWithAssign"); + tags.put("topic", TP.topic()); + tags.put("partition", String.valueOf(TP.partition())); + + var fetchLead = consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)); + assertNotNull(fetchLead); + assertEquals((double) records.count(), fetchLead.metricValue(), "The lead should be " + records.count()); + + consumer.assign(List.of(tp2)); + awaitNonEmptyRecords(consumer, tp2); + assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLagMetricsCleanUpWithAssign() throws Exception { + testPerPartitionLagMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign", + CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign" + )); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLagMetricsCleanUpWithAssign() throws Exception { + testPerPartitionLagMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign", + CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign" + )); + } + + private void testPerPartitionLagMetricsCleanUpWithAssign(Map<String, Object> consumerConfig) throws Exception { + var numMessages = 1000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + // Test assign send some messages. + sendRecords(producer, TP, numMessages, System.currentTimeMillis()); + sendRecords(producer, tp2, numMessages, System.currentTimeMillis()); + + consumer.assign(List.of(TP)); + var records = awaitNonEmptyRecords(consumer, TP); + + // Verify the metric exist. + Map<String, String> tags = new HashMap<>(); + tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign"); + tags.put("topic", TP.topic()); + tags.put("partition", String.valueOf(TP.partition())); + + var fetchLag = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)); + assertNotNull(fetchLag); + + var expectedLag = numMessages - records.count(); + assertEquals(expectedLag, (double) fetchLag.metricValue(), EPSILON, "The lag should be " + expectedLag); + consumer.assign(List.of(tp2)); + awaitNonEmptyRecords(consumer, tp2); + assertNull(consumer.metrics().get(new MetricName(TP + ".records-lag", "consumer-fetch-manager-metrics", "", tags))); + assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLagMetricsWhenReadCommitted() throws Exception { + testPerPartitionLagMetricsWhenReadCommitted(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "testPerPartitionLagMetricsWhenReadCommitted", + CLIENT_ID_CONFIG, "testPerPartitionLagMetricsWhenReadCommitted", + ISOLATION_LEVEL_CONFIG, "read_committed" + )); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLagMetricsWhenReadCommitted() throws Exception { + testPerPartitionLagMetricsWhenReadCommitted(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "testPerPartitionLagMetricsWhenReadCommitted", + CLIENT_ID_CONFIG, "testPerPartitionLagMetricsWhenReadCommitted", + ISOLATION_LEVEL_CONFIG, "read_committed" + )); + } + + private void testPerPartitionLagMetricsWhenReadCommitted(Map<String, Object> consumerConfig) throws Exception { + var numMessages = 1000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + // Test assign send some messages. + sendRecords(producer, TP, numMessages, System.currentTimeMillis()); + sendRecords(producer, tp2, numMessages, System.currentTimeMillis()); + + consumer.assign(List.of(TP)); + awaitNonEmptyRecords(consumer, TP); + + // Verify the metric exist. + Map<String, String> tags = new HashMap<>(); + tags.put("client-id", "testPerPartitionLagMetricsWhenReadCommitted"); + tags.put("topic", TP.topic()); + tags.put("partition", String.valueOf(TP.partition())); + + var fetchLag = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)); + assertNotNull(fetchLag); + } + } + + @ClusterTest + public void testClassicConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured() throws Exception { + var consumerClientId = "testQuotaMetricsNotCreatedIfNoQuotasConfigured"; + testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId, + ISOLATION_LEVEL_CONFIG, "read_committed" + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured() throws Exception { + var consumerClientId = "testQuotaMetricsNotCreatedIfNoQuotasConfigured"; + testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId, + ISOLATION_LEVEL_CONFIG, "read_committed" + ), consumerClientId); + } + + private void testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map<String, Object> consumerConfig, String consumerClientId) throws Exception { + var producerClientId = UUID.randomUUID().toString(); + var numRecords = 1000; + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(Map.of(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + + consumer.assign(List.of(TP)); + consumer.seek(TP, 0); + consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); + + var brokers = cluster.brokers().values(); + brokers.forEach(broker -> assertNoMetric(broker, "byte-rate", QuotaType.PRODUCE, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.PRODUCE, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "byte-rate", QuotaType.FETCH, consumerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.FETCH, consumerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "request-time", QuotaType.REQUEST, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.REQUEST, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "request-time", QuotaType.REQUEST, consumerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.REQUEST, consumerClientId)); Review Comment: I think this approach is clear enough. However, I’m not sure if using a test case record and adding all test cases into a list is the best approach. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org