chia7712 commented on code in PR #20081: URL: https://github.com/apache/kafka/pull/20081#discussion_r2189912219
########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java: ########## @@ -0,0 +1,1640 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import kafka.server.KafkaBroker; + +import org.apache.kafka.clients.ClientsTestUtils.TestConsumerReassignmentListener; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Flaky; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.test.MockConsumerInterceptor; +import org.apache.kafka.test.MockProducerInterceptor; +import org.apache.kafka.test.TestUtils; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; + +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.BROKER_COUNT; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.TOPIC; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.TP; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testClusterResourceListener; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testCoordinatorFailover; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testSimpleConsumption; +import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment; +import static org.apache.kafka.clients.ClientsTestUtils.awaitRebalance; +import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords; +import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecordsWithTimeTypeLogAppend; +import static org.apache.kafka.clients.ClientsTestUtils.consumeRecords; +import static org.apache.kafka.clients.ClientsTestUtils.sendAndAwaitAsyncCommit; +import static org.apache.kafka.clients.ClientsTestUtils.sendRecords; +import static org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.METADATA_MAX_AGE_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = BROKER_COUNT, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + @ClusterConfigProperty(key = GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, value = "60000"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"), + } +) +public class PlaintextConsumerTest { + + private final ClusterInstance cluster; + public static final double EPSILON = 0.1; + + public PlaintextConsumerTest(ClusterInstance cluster) { + this.cluster = cluster; + } + + @ClusterTest + public void testClassicConsumerSimpleConsumption() throws InterruptedException { + testSimpleConsumption(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerSimpleConsumption() throws InterruptedException { + testSimpleConsumption(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testClassicConsumerClusterResourceListener() throws InterruptedException { + testClusterResourceListener(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerClusterResourceListener() throws InterruptedException { + testClusterResourceListener(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testClassicConsumerCoordinatorFailover() throws InterruptedException { + Map<String, Object> config = Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + SESSION_TIMEOUT_MS_CONFIG, 5001, + HEARTBEAT_INTERVAL_MS_CONFIG, 1000, + // Use higher poll timeout to avoid consumer leaving the group due to timeout + MAX_POLL_INTERVAL_MS_CONFIG, 15000 + ); + testCoordinatorFailover(cluster, config); + } + + @ClusterTest + public void testAsyncConsumeCoordinatorFailover() throws InterruptedException { + Map<String, Object> config = Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + // Use higher poll timeout to avoid consumer leaving the group due to timeout + MAX_POLL_INTERVAL_MS_CONFIG, 15000 + ); + testCoordinatorFailover(cluster, config); + } + + @ClusterTest + public void testClassicConsumerHeaders() throws Exception { + testHeaders(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerHeaders() throws Exception { + testHeaders(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testHeaders(Map<String, Object> consumerConfig) throws Exception { + var numRecords = 1; + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var record = new ProducerRecord<>(TP.topic(), TP.partition(), null, "key".getBytes(), "value".getBytes()); + record.headers().add("headerKey", "headerValue".getBytes()); + producer.send(record); + + assertEquals(0, consumer.assignment().size()); + consumer.assign(List.of(TP)); + assertEquals(1, consumer.assignment().size()); + + consumer.seek(TP, 0); + var records = consumeRecords(consumer, numRecords); + assertEquals(numRecords, records.size()); + var header = records.get(0).headers().lastHeader("headerKey"); + assertEquals("headerValue", header == null ? null : new String(header.value())); + } + } + + @ClusterTest + public void testClassicConsumerHeadersSerializerDeserializer() throws Exception { + testHeadersSerializeDeserialize(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerHeadersSerializerDeserializer() throws Exception { + testHeadersSerializeDeserialize(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testHeadersSerializeDeserialize(Map<String, Object> config) throws InterruptedException { + var numRecords = 1; + Map<String, Object> consumerConfig = new HashMap<>(config); + consumerConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, DeserializerImpl.class); + Map<String, Object> producerConfig = Map.of( + VALUE_SERIALIZER_CLASS_CONFIG, SerializerImpl.class.getName() + ); + + try (Producer<byte[], byte[]> producer = cluster.producer(producerConfig); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + producer.send(new ProducerRecord<>( + TP.topic(), + TP.partition(), + null, + "key".getBytes(), + "value".getBytes()) + ); + + assertEquals(0, consumer.assignment().size()); + consumer.assign(List.of(TP)); + assertEquals(1, consumer.assignment().size()); + + consumer.seek(TP, 0); + assertEquals(numRecords, consumeRecords(consumer, numRecords).size()); + } + } + + @ClusterTest + public void testClassicConsumerAutoOffsetReset() throws Exception { + testAutoOffsetReset(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerAutoOffsetReset() throws Exception { + testAutoOffsetReset(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testAutoOffsetReset(Map<String, Object> consumerConfig) throws Exception { + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, 1, startingTimestamp); + consumer.assign(List.of(TP)); + consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerGroupConsumption() throws Exception { + testGroupConsumption(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerGroupConsumption() throws Exception { + testGroupConsumption(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testGroupConsumption(Map<String, Object> consumerConfig) throws Exception { + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, 10, startingTimestamp); + consumer.subscribe(List.of(TOPIC)); + consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerPartitionsFor() throws Exception { + testPartitionsFor(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPartitionsFor() throws Exception { + testPartitionsFor(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPartitionsFor(Map<String, Object> consumerConfig) throws Exception { + var numParts = 2; + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + cluster.createTopic("part-test", numParts, (short) 1); + + try (var consumer = cluster.consumer(consumerConfig)) { + var partitions = consumer.partitionsFor(TOPIC); + assertNotNull(partitions); + assertEquals(2, partitions.size()); + } + } + + @ClusterTest + public void testClassicConsumerPartitionsForAutoCreate() throws Exception { + testPartitionsForAutoCreate(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPartitionsForAutoCreate() throws Exception { + testPartitionsForAutoCreate(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPartitionsForAutoCreate(Map<String, Object> consumerConfig) throws Exception { + try (var consumer = cluster.consumer(consumerConfig)) { + // First call would create the topic + consumer.partitionsFor("non-exist-topic"); + TestUtils.waitForCondition( + () -> !consumer.partitionsFor("non-exist-topic").isEmpty(), + "Timed out while awaiting non empty partitions." + ); + } + } + + @ClusterTest + public void testClassicConsumerPartitionsForInvalidTopic() { + testPartitionsForInvalidTopic(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPartitionsForInvalidTopic() { + testPartitionsForInvalidTopic(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPartitionsForInvalidTopic(Map<String, Object> consumerConfig) { + try (var consumer = cluster.consumer(consumerConfig)) { + assertThrows(InvalidTopicException.class, () -> consumer.partitionsFor(";3# ads,{234")); + } + } + + @ClusterTest + public void testClassicConsumerSeek() throws Exception { + testSeek( + Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerSeek() throws Exception { + testSeek( + Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testSeek(Map<String, Object> consumerConfig) throws Exception { + var totalRecords = 50; + var mid = totalRecords / 2; + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = 0; + sendRecords(producer, TP, totalRecords, startingTimestamp); + + consumer.assign(List.of(TP)); + consumer.seekToEnd(List.of(TP)); + assertEquals(totalRecords, consumer.position(TP)); + assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty()); + + consumer.seekToBeginning(List.of(TP)); + assertEquals(0, consumer.position(TP)); + consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp); + + consumer.seek(TP, mid); + assertEquals(mid, consumer.position(TP)); + + consumeAndVerifyRecords(consumer, TP, 1, mid, mid, mid); + + // Test seek compressed message + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + sendCompressedMessages(totalRecords, tp2); + consumer.assign(List.of(tp2)); + + consumer.seekToEnd(List.of(tp2)); + assertEquals(totalRecords, consumer.position(tp2)); + assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty()); + + consumer.seekToBeginning(List.of(tp2)); + assertEquals(0L, consumer.position(tp2)); + consumeAndVerifyRecords(consumer, tp2, 1, 0); + + consumer.seek(tp2, mid); + assertEquals(mid, consumer.position(tp2)); + consumeAndVerifyRecords(consumer, tp2, 1, mid, mid, mid); + } + } + + @ClusterTest + public void testClassicConsumerPartitionPauseAndResume() throws Exception { + testPartitionPauseAndResume(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPartitionPauseAndResume() throws Exception { + testPartitionPauseAndResume(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPartitionPauseAndResume(Map<String, Object> consumerConfig) throws Exception { + var partitions = List.of(TP); + var numRecords = 5; + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + + consumer.assign(partitions); + consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); + consumer.pause(partitions); + startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + assertTrue(consumer.poll(Duration.ofMillis(100)).isEmpty()); + consumer.resume(partitions); + consumeAndVerifyRecords(consumer, TP, numRecords, 5, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerInterceptors() throws Exception { + testInterceptors(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerInterceptors() throws Exception { + testInterceptors(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testInterceptors(Map<String, Object> consumerConfig) throws Exception { + var appendStr = "mock"; + MockConsumerInterceptor.resetCounters(); + MockProducerInterceptor.resetCounters(); + + // create producer with interceptor + Map<String, Object> producerConfig = Map.of( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName(), + KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + "mock.interceptor.append", appendStr + ); + // create consumer with interceptor + Map<String, Object> consumerConfigOverride = new HashMap<>(consumerConfig); + consumerConfigOverride.put(INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + consumerConfigOverride.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerConfigOverride.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + try (Producer<String, String> producer = cluster.producer(producerConfig); + Consumer<String, String> consumer = cluster.consumer(consumerConfigOverride) + ) { + // produce records + var numRecords = 10; + List<Future<RecordMetadata>> futures = new ArrayList<>(); + for (var i = 0; i < numRecords; i++) { + Future<RecordMetadata> future = producer.send( + new ProducerRecord<>(TP.topic(), TP.partition(), "key " + i, "value " + i) + ); + futures.add(future); + } + + // Wait for all sends to complete + futures.forEach(future -> assertDoesNotThrow(() -> future.get())); + + assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue()); + assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue()); + + // send invalid record + assertThrows( + Throwable.class, + () -> producer.send(null), + "Should not allow sending a null record" + ); + assertEquals( + 1, + MockProducerInterceptor.ON_ERROR_COUNT.intValue(), + "Interceptor should be notified about exception" + ); + assertEquals( + 0, + MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue(), + "Interceptor should not receive metadata with an exception when record is null" + ); + + consumer.assign(List.of(TP)); + consumer.seek(TP, 0); + + // consume and verify that values are modified by interceptors + var records = consumeRecords(consumer, numRecords); + for (var i = 0; i < numRecords; i++) { + ConsumerRecord<String, String> record = records.get(i); + assertEquals("key " + i, record.key()); + assertEquals(("value " + i + appendStr).toUpperCase(Locale.ROOT), record.value()); + } + + // commit sync and verify onCommit is called + var commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue(); + consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L))); + assertEquals(2, consumer.committed(Set.of(TP)).get(TP).offset()); + assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()); + + // commit async and verify onCommit is called + var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L)); + sendAndAwaitAsyncCommit(consumer, Optional.of(offsetsToCommit)); + assertEquals(5, consumer.committed(Set.of(TP)).get(TP).offset()); + assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()); + } + // cleanup + MockConsumerInterceptor.resetCounters(); + MockProducerInterceptor.resetCounters(); + } + + @ClusterTest + public void testClassicConsumerInterceptorsWithWrongKeyValue() throws Exception { + testInterceptorsWithWrongKeyValue(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerInterceptorsWithWrongKeyValue() throws Exception { + testInterceptorsWithWrongKeyValue(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testInterceptorsWithWrongKeyValue(Map<String, Object> consumerConfig) throws Exception { + var appendStr = "mock"; + // create producer with interceptor that has different key and value types from the producer + Map<String, Object> producerConfig = Map.of( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName(), + "mock.interceptor.append", appendStr + ); + // create consumer with interceptor that has different key and value types from the consumer + Map<String, Object> consumerConfigOverride = new HashMap<>(consumerConfig); + consumerConfigOverride.put(INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + + try (Producer<byte[], byte[]> producer = cluster.producer(producerConfig); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfigOverride) + ) { + // producing records should succeed + producer.send(new ProducerRecord<>( + TP.topic(), + TP.partition(), + "key".getBytes(), + "value will not be modified".getBytes() + )); + + consumer.assign(List.of(TP)); + consumer.seek(TP, 0); + // consume and verify that values are not modified by interceptors -- their exceptions are caught and logged, but not propagated + var records = consumeRecords(consumer, 1); + var record = records.get(0); + assertEquals("value will not be modified", new String(record.value())); + } + } + + @ClusterTest + public void testClassicConsumerConsumeMessagesWithCreateTime() throws Exception { + testConsumeMessagesWithCreateTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerConsumeMessagesWithCreateTime() throws Exception { + testConsumeMessagesWithCreateTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testConsumeMessagesWithCreateTime(Map<String, Object> consumerConfig) throws Exception { + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + var numRecords = 50; + var tp2 = new TopicPartition(TOPIC, 1); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + // Test non-compressed messages + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + consumer.assign(List.of(TP)); + consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); + + // Test compressed messages + sendCompressedMessages(numRecords, tp2); + consumer.assign(List.of(tp2)); + consumeAndVerifyRecords(consumer, tp2, numRecords, 0, 0, 0); + } + } + + @ClusterTest + public void testClassicConsumerConsumeMessagesWithLogAppendTime() throws Exception { + testConsumeMessagesWithLogAppendTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerConsumeMessagesWithLogAppendTime() throws Exception { + testConsumeMessagesWithLogAppendTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testConsumeMessagesWithLogAppendTime(Map<String, Object> consumerConfig) throws Exception { + var topicName = "testConsumeMessagesWithLogAppendTime"; + var startTime = System.currentTimeMillis(); + var numRecords = 50; + cluster.createTopic(topicName, 2, (short) 2, Map.of(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")); + + try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) { + // Test non-compressed messages + var tp1 = new TopicPartition(topicName, 0); + sendRecords(cluster, tp1, numRecords); + consumer.assign(List.of(tp1)); + consumeAndVerifyRecordsWithTimeTypeLogAppend(consumer, tp1, numRecords, startTime); + + // Test compressed messages + var tp2 = new TopicPartition(topicName, 1); + sendCompressedMessages(numRecords, tp2); + consumer.assign(List.of(tp2)); + consumeAndVerifyRecordsWithTimeTypeLogAppend(consumer, tp2, numRecords, startTime); + } + } + + @ClusterTest + public void testClassicConsumerListTopics() throws Exception { + testListTopics(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerListTopics() throws Exception { + testListTopics(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testListTopics(Map<String, Object> consumerConfig) throws Exception { + var numParts = 2; + var topic1 = "part-test-topic-1"; + var topic2 = "part-test-topic-2"; + var topic3 = "part-test-topic-3"; + cluster.createTopic(topic1, numParts, (short) 1); + cluster.createTopic(topic2, numParts, (short) 1); + cluster.createTopic(topic3, numParts, (short) 1); + + sendRecords(cluster, new TopicPartition(topic1, 0), 1); + + try (var consumer = cluster.consumer(consumerConfig)) { + // consumer some messages, and we can list the internal topic __consumer_offsets + consumer.subscribe(List.of(topic1)); + consumer.poll(Duration.ofMillis(100)); + var topics = consumer.listTopics(); + assertNotNull(topics); + assertEquals(4, topics.size()); + assertEquals(2, topics.get(topic1).size()); + assertEquals(2, topics.get(topic2).size()); + assertEquals(2, topics.get(topic3).size()); + } + } + + @ClusterTest + public void testClassicConsumerPauseStateNotPreservedByRebalance() throws Exception { + testPauseStateNotPreservedByRebalance(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + SESSION_TIMEOUT_MS_CONFIG, 100, + HEARTBEAT_INTERVAL_MS_CONFIG, 30 + )); + } + + @ClusterTest + public void testAsyncConsumerPauseStateNotPreservedByRebalance() throws Exception { + testPauseStateNotPreservedByRebalance(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPauseStateNotPreservedByRebalance(Map<String, Object> consumerConfig) throws Exception { + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, 5, startingTimestamp); + consumer.subscribe(List.of(TOPIC)); + consumeAndVerifyRecords(consumer, TP, 5, 0, 0, startingTimestamp); + consumer.pause(List.of(TP)); + + // subscribe to a new topic to trigger a rebalance + consumer.subscribe(List.of("topic2")); + + // after rebalance, our position should be reset and our pause state lost, + // so we should be able to consume from the beginning + consumeAndVerifyRecords(consumer, TP, 0, 5, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLeadMetricsCleanUpWithSubscribe() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLeadMetricsCleanUpWithSubscribe"; + testPerPartitionLeadMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLeadMetricsCleanUpWithSubscribe() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLeadMetricsCleanUpWithSubscribe"; + testPerPartitionLeadMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + private void testPerPartitionLeadMetricsCleanUpWithSubscribe( + Map<String, Object> consumerConfig, + String consumerClientId + ) throws Exception { + var numMessages = 1000; + var topic2 = "topic2"; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(topic2, 2, (short) BROKER_COUNT); + + try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) { + // send some messages. + sendRecords(cluster, TP, numMessages); + + // Test subscribe + // Create a consumer and consumer some messages. + var listener = new TestConsumerReassignmentListener(); + consumer.subscribe(List.of(TOPIC, topic2), listener); + var records = awaitNonEmptyRecords(consumer, TP); + assertEquals(1, listener.callsToAssigned, "should be assigned once"); + + // Verify the metric exist. + Map<String, String> tags1 = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + Map<String, String> tags2 = Map.of( + "client-id", consumerClientId, + "topic", tp2.topic(), + "partition", String.valueOf(tp2.partition()) + ); + + var fetchLead0 = consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1)); + assertNotNull(fetchLead0); + assertEquals((double) records.count(), fetchLead0.metricValue(), "The lead should be " + records.count()); + + // Remove topic from subscription + consumer.subscribe(List.of(topic2), listener); + awaitRebalance(consumer, listener); + + // Verify the metric has gone + assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1))); + assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLagMetricsCleanUpWithSubscribe() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLagMetricsCleanUpWithSubscribe"; + testPerPartitionLagMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLagMetricsCleanUpWithSubscribe() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLagMetricsCleanUpWithSubscribe"; + testPerPartitionLagMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + private void testPerPartitionLagMetricsCleanUpWithSubscribe( + Map<String, Object> consumerConfig, + String consumerClientId + ) throws Exception { + int numMessages = 1000; + var topic2 = "topic2"; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(topic2, 2, (short) BROKER_COUNT); + + try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) { + // send some messages. + sendRecords(cluster, TP, numMessages); + + // Test subscribe + // Create a consumer and consumer some messages. + var listener = new TestConsumerReassignmentListener(); + consumer.subscribe(List.of(TOPIC, topic2), listener); + var records = awaitNonEmptyRecords(consumer, TP); + assertEquals(1, listener.callsToAssigned, "should be assigned once"); + + // Verify the metric exist. + Map<String, String> tags1 = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + Map<String, String> tags2 = Map.of( + "client-id", consumerClientId, + "topic", tp2.topic(), + "partition", String.valueOf(tp2.partition()) + ); + + var fetchLag0 = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1)); + assertNotNull(fetchLag0); + var expectedLag = numMessages - records.count(); + assertEquals(expectedLag, (double) fetchLag0.metricValue(), EPSILON, "The lag should be " + expectedLag); + + // Remove topic from subscription + consumer.subscribe(List.of(topic2), listener); + awaitRebalance(consumer, listener); + + // Verify the metric has gone + assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1))); + assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags2))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLeadMetricsCleanUpWithAssign() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLeadMetricsCleanUpWithAssign"; + testPerPartitionLeadMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLeadMetricsCleanUpWithAssign() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLeadMetricsCleanUpWithAssign"; + testPerPartitionLeadMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + private void testPerPartitionLeadMetricsCleanUpWithAssign( + Map<String, Object> consumerConfig, + String consumerClientId + ) throws Exception { + var numMessages = 1000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + // Test assign send some messages. + sendRecords(producer, TP, numMessages, System.currentTimeMillis()); + sendRecords(producer, tp2, numMessages, System.currentTimeMillis()); + + consumer.assign(List.of(TP)); + var records = awaitNonEmptyRecords(consumer, TP); + + // Verify the metric exist. + Map<String, String> tags = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + var fetchLead = consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)); + assertNotNull(fetchLead); + assertEquals((double) records.count(), fetchLead.metricValue(), "The lead should be " + records.count()); + + consumer.assign(List.of(tp2)); + awaitNonEmptyRecords(consumer, tp2); + assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLagMetricsCleanUpWithAssign() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLagMetricsCleanUpWithAssign"; + testPerPartitionLagMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLagMetricsCleanUpWithAssign() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLagMetricsCleanUpWithAssign"; + testPerPartitionLagMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + private void testPerPartitionLagMetricsCleanUpWithAssign( + Map<String, Object> consumerConfig, + String consumerClientId + ) throws Exception { + var numMessages = 1000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + // Test assign send some messages. + sendRecords(producer, TP, numMessages, System.currentTimeMillis()); + sendRecords(producer, tp2, numMessages, System.currentTimeMillis()); + + consumer.assign(List.of(TP)); + var records = awaitNonEmptyRecords(consumer, TP); + + // Verify the metric exist. + Map<String, String> tags = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + var fetchLag = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)); + assertNotNull(fetchLag); + + var expectedLag = numMessages - records.count(); + assertEquals(expectedLag, (double) fetchLag.metricValue(), EPSILON, "The lag should be " + expectedLag); + consumer.assign(List.of(tp2)); + awaitNonEmptyRecords(consumer, tp2); + assertNull(consumer.metrics().get(new MetricName(TP + ".records-lag", "consumer-fetch-manager-metrics", "", tags))); + assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLagMetricsWhenReadCommitted() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLagMetricsWhenReadCommitted"; + testPerPartitionLagMetricsWhenReadCommitted(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId, + ISOLATION_LEVEL_CONFIG, "read_committed" + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLagMetricsWhenReadCommitted() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLagMetricsWhenReadCommitted"; + testPerPartitionLagMetricsWhenReadCommitted(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId, + ISOLATION_LEVEL_CONFIG, "read_committed" + ), consumerClientId); + } + + private void testPerPartitionLagMetricsWhenReadCommitted( + Map<String, Object> consumerConfig, + String consumerClientId + ) throws Exception { + var numMessages = 1000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + // Test assign send some messages. + sendRecords(producer, TP, numMessages, System.currentTimeMillis()); + sendRecords(producer, tp2, numMessages, System.currentTimeMillis()); + + consumer.assign(List.of(TP)); + awaitNonEmptyRecords(consumer, TP); + + // Verify the metric exist. + Map<String, String> tags = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + var fetchLag = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)); + assertNotNull(fetchLag); + } + } + + @ClusterTest + public void testClassicConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured() throws Exception { + var consumerClientId = "testClassicConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured"; + testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId, + ISOLATION_LEVEL_CONFIG, "read_committed" + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured() throws Exception { + var consumerClientId = "testAsyncConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured"; + testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId, + ISOLATION_LEVEL_CONFIG, "read_committed" + ), consumerClientId); + } + + private void testQuotaMetricsNotCreatedIfNoQuotasConfigured( + Map<String, Object> consumerConfig, + String consumerClientId + ) throws Exception { + var producerClientId = UUID.randomUUID().toString(); + var numRecords = 1000; + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(Map.of(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + + consumer.assign(List.of(TP)); + consumer.seek(TP, 0); + consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); + + var brokers = cluster.brokers().values(); + brokers.forEach(broker -> assertNoMetric(broker, "byte-rate", QuotaType.PRODUCE, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.PRODUCE, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "byte-rate", QuotaType.FETCH, consumerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.FETCH, consumerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "request-time", QuotaType.REQUEST, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.REQUEST, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "request-time", QuotaType.REQUEST, consumerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.REQUEST, consumerClientId)); + } + } + + private void assertNoMetric(KafkaBroker broker, String name, QuotaType quotaType, String clientId) { + var metricName = broker.metrics().metricName(name, quotaType.toString(), "", "user", "", "client-id", clientId); + assertNull(broker.metrics().metric(metricName), "Metric should not have been created " + metricName); + } + + @ClusterTest + public void testClassicConsumerSeekThrowsIllegalStateIfPartitionsNotAssigned() throws Exception { + testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerSeekThrowsIllegalStateIfPartitionsNotAssigned() throws Exception { + testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map<String, Object> consumerConfig) throws Exception { + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + try (var consumer = cluster.consumer(consumerConfig)) { + var e = assertThrows(IllegalStateException.class, () -> consumer.seekToEnd(List.of(TP))); + assertEquals("No current assignment for partition " + TP, e.getMessage()); + } + } + + @ClusterTest + public void testClassicConsumingWithNullGroupId() throws Exception { + testConsumingWithNullGroupId(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers() + )); + } + + @ClusterTest + public void testAsyncConsumerConsumingWithNullGroupId() throws Exception { + testConsumingWithNullGroupId(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers() + )); + } + + private void testConsumingWithNullGroupId(Map<String, Object> consumerConfig) throws Exception { + var partition = 0; + cluster.createTopic(TOPIC, 1, (short) 1); + + // consumer 1 uses the default group id and consumes from earliest offset + Map<String, Object> consumer1Config = new HashMap<>(consumerConfig); + consumer1Config.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumer1Config.put(CLIENT_ID_CONFIG, "consumer1"); + + // consumer 2 uses the default group id and consumes from latest offset + Map<String, Object> consumer2Config = new HashMap<>(consumerConfig); + consumer2Config.put(AUTO_OFFSET_RESET_CONFIG, "latest"); + consumer2Config.put(CLIENT_ID_CONFIG, "consumer2"); + + // consumer 3 uses the default group id and starts from an explicit offset + Map<String, Object> consumer3Config = new HashMap<>(consumerConfig); + consumer3Config.put(CLIENT_ID_CONFIG, "consumer3"); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer1 = new KafkaConsumer<>(consumer1Config); + Consumer<byte[], byte[]> consumer2 = new KafkaConsumer<>(consumer2Config); + Consumer<byte[], byte[]> consumer3 = new KafkaConsumer<>(consumer3Config) + ) { + producer.send(new ProducerRecord<>(TOPIC, partition, "k1".getBytes(), "v1".getBytes())).get(); + producer.send(new ProducerRecord<>(TOPIC, partition, "k2".getBytes(), "v2".getBytes())).get(); + producer.send(new ProducerRecord<>(TOPIC, partition, "k3".getBytes(), "v3".getBytes())).get(); + + consumer1.assign(List.of(TP)); + consumer2.assign(List.of(TP)); + consumer3.assign(List.of(TP)); + consumer3.seek(TP, 1); + + var numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count(); + assertThrows(InvalidGroupIdException.class, consumer1::commitSync); + assertThrows(InvalidGroupIdException.class, () -> consumer2.committed(Set.of(TP))); + + var numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count(); + var numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count(); + + consumer1.unsubscribe(); + consumer2.unsubscribe(); + consumer3.unsubscribe(); + + assertTrue(consumer1.assignment().isEmpty()); + assertTrue(consumer2.assignment().isEmpty()); + assertTrue(consumer3.assignment().isEmpty()); + + consumer1.close(); + consumer2.close(); + consumer3.close(); + + assertEquals(3, numRecords1, "Expected consumer1 to consume from earliest offset"); + assertEquals(0, numRecords2, "Expected consumer2 to consume from latest offset"); + assertEquals(2, numRecords3, "Expected consumer3 to consume from offset 1"); + } + } + + @ClusterTest + public void testClassicConsumerNullGroupIdNotSupportedIfCommitting() throws Exception { + testNullGroupIdNotSupportedIfCommitting(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(), + AUTO_OFFSET_RESET_CONFIG, "earliest", + CLIENT_ID_CONFIG, "consumer1" + )); + } + + @ClusterTest + public void testAsyncConsumerNullGroupIdNotSupportedIfCommitting() throws Exception { + testNullGroupIdNotSupportedIfCommitting(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(), + AUTO_OFFSET_RESET_CONFIG, "earliest", + CLIENT_ID_CONFIG, "consumer1" + )); + } + + private void testNullGroupIdNotSupportedIfCommitting(Map<String, Object> consumerConfig) throws Exception { + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + try (var consumer = new KafkaConsumer<>(consumerConfig)) { + consumer.assign(List.of(TP)); + assertThrows(InvalidGroupIdException.class, consumer::commitSync); + } + } + + @ClusterTest + public void testClassicConsumerStaticConsumerDetectsNewPartitionCreatedAfterRestart() throws Exception { + testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "my-group-id", + GROUP_INSTANCE_ID_CONFIG, "my-instance-id", + METADATA_MAX_AGE_CONFIG, 100, + MAX_POLL_INTERVAL_MS_CONFIG, 6000 + )); + } + + @ClusterTest + public void testAsyncConsumerStaticConsumerDetectsNewPartitionCreatedAfterRestart() throws Exception { + testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "my-group-id", + GROUP_INSTANCE_ID_CONFIG, "my-instance-id", + METADATA_MAX_AGE_CONFIG, 100, + MAX_POLL_INTERVAL_MS_CONFIG, 6000 + )); + } + + private void testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map<String, Object> consumerConfig) throws Exception { + var foo = "foo"; + var foo0 = new TopicPartition(foo, 0); + var foo1 = new TopicPartition(foo, 1); + cluster.createTopic(foo, 1, (short) 1); + + try (Consumer<byte[], byte[]> consumer1 = cluster.consumer(consumerConfig); + Consumer<byte[], byte[]> consumer2 = cluster.consumer(consumerConfig); + var admin = cluster.admin() + ) { + consumer1.subscribe(List.of(foo)); + awaitAssignment(consumer1, Set.of(foo0)); + consumer1.close(); + + consumer2.subscribe(List.of(foo)); + awaitAssignment(consumer2, Set.of(foo0)); + + admin.createPartitions(Map.of(foo, NewPartitions.increaseTo(2))).all().get(); + awaitAssignment(consumer2, Set.of(foo0, foo1)); + } + } + + @ClusterTest + public void testClassicConsumerEndOffsets() throws Exception { + testEndOffsets(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + METADATA_MAX_AGE_CONFIG, 100, + MAX_POLL_INTERVAL_MS_CONFIG, 6000 + )); + } + + @ClusterTest + public void testAsyncConsumerEndOffsets() throws Exception { + testEndOffsets(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + METADATA_MAX_AGE_CONFIG, 100, + MAX_POLL_INTERVAL_MS_CONFIG, 6000 + )); + } + + private void testEndOffsets(Map<String, Object> consumerConfig) throws Exception { + var numRecords = 10000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + for (var i = 0; i < numRecords; i++) { + var timestamp = startingTimestamp + (long) i; + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>( + TP.topic(), + TP.partition(), + timestamp, + ("key " + i).getBytes(), + ("value " + i).getBytes() + ); + producer.send(record); + } + producer.flush(); + + consumer.subscribe(List.of(TOPIC)); + awaitAssignment(consumer, Set.of(TP, tp2)); + + var endOffsets = consumer.endOffsets(Set.of(TP)); + assertEquals(numRecords, endOffsets.get(TP)); + } + } + + @ClusterTest + public void testClassicConsumerFetchOffsetsForTime() throws Exception { + testFetchOffsetsForTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerFetchOffsetsForTime() throws Exception { + testFetchOffsetsForTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testFetchOffsetsForTime(Map<String, Object> consumerConfig) throws Exception { + var numPartitions = 2; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + Map<TopicPartition, Long> timestampsToSearch = new HashMap<>(); + for (int part = 0, i = 0; part < numPartitions; part++, i++) { + var tp = new TopicPartition(TOPIC, part); + // key, val, and timestamp equal to the sequence number. + sendRecords(producer, tp, 100, 0); + timestampsToSearch.put(tp, i * 20L); + } + // Test negative target time + assertThrows(IllegalArgumentException.class, () -> consumer.offsetsForTimes(Map.of(TP, -1L))); + var timestampOffsets = consumer.offsetsForTimes(timestampsToSearch); + + var timestampTp0 = timestampOffsets.get(TP); + assertEquals(0, timestampTp0.offset()); + assertEquals(0, timestampTp0.timestamp()); + assertEquals(Optional.of(0), timestampTp0.leaderEpoch()); + + var timestampTp1 = timestampOffsets.get(tp2); + assertEquals(20, timestampTp1.offset()); + assertEquals(20, timestampTp1.timestamp()); + assertEquals(Optional.of(0), timestampTp1.leaderEpoch()); + } + } + + @ClusterTest + public void testClassicConsumerPositionRespectsTimeout() { + testPositionRespectsTimeout(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPositionRespectsTimeout() { + testPositionRespectsTimeout(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPositionRespectsTimeout(Map<String, Object> consumerConfig) { + var topicPartition = new TopicPartition(TOPIC, 15); + try (var consumer = cluster.consumer(consumerConfig)) { + consumer.assign(List.of(topicPartition)); + // When position() is called for a topic/partition that doesn't exist, the consumer will repeatedly update the + // local metadata. However, it should give up after the user-supplied timeout has past. + assertThrows(TimeoutException.class, () -> consumer.position(topicPartition, Duration.ofSeconds(3))); + } + } + + @ClusterTest + public void testClassicConsumerPositionRespectsWakeup() { + testPositionRespectsWakeup(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPositionRespectsWakeup() { + testPositionRespectsWakeup(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPositionRespectsWakeup(Map<String, Object> consumerConfig) { + var topicPartition = new TopicPartition(TOPIC, 15); + try (var consumer = cluster.consumer(consumerConfig)) { + consumer.assign(List.of(topicPartition)); + CompletableFuture.runAsync(() -> { + try { + TimeUnit.SECONDS.sleep(1); + consumer.wakeup(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + assertThrows(WakeupException.class, () -> consumer.position(topicPartition, Duration.ofSeconds(3))); + } + } + + @ClusterTest + public void testClassicConsumerPositionWithErrorConnectionRespectsWakeup() { + testPositionWithErrorConnectionRespectsWakeup(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + // make sure the connection fails + BOOTSTRAP_SERVERS_CONFIG, "localhost:12345" + )); + } + + @ClusterTest + public void testAsyncConsumerPositionWithErrorConnectionRespectsWakeup() { + testPositionWithErrorConnectionRespectsWakeup(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + // make sure the connection fails + BOOTSTRAP_SERVERS_CONFIG, "localhost:12345" + )); + } + + private void testPositionWithErrorConnectionRespectsWakeup(Map<String, Object> consumerConfig) { + var topicPartition = new TopicPartition(TOPIC, 15); + try (var consumer = cluster.consumer(consumerConfig)) { + consumer.assign(List.of(topicPartition)); + CompletableFuture.runAsync(() -> { + try { + TimeUnit.SECONDS.sleep(1); + consumer.wakeup(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + assertThrows(WakeupException.class, () -> consumer.position(topicPartition, Duration.ofSeconds(100))); + } + } + + @Flaky("KAFKA-18031") + @ClusterTest + public void testClassicConsumerCloseLeavesGroupOnInterrupt() throws Exception { Review Comment: Could we leave those flaky in the old framework? It helps us to reproduce the failed tests. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java: ########## @@ -0,0 +1,1640 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import kafka.server.KafkaBroker; + +import org.apache.kafka.clients.ClientsTestUtils.TestConsumerReassignmentListener; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Flaky; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.test.MockConsumerInterceptor; +import org.apache.kafka.test.MockProducerInterceptor; +import org.apache.kafka.test.TestUtils; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; + +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.BROKER_COUNT; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.TOPIC; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.TP; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testClusterResourceListener; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testCoordinatorFailover; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testSimpleConsumption; +import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment; +import static org.apache.kafka.clients.ClientsTestUtils.awaitRebalance; +import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords; +import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecordsWithTimeTypeLogAppend; +import static org.apache.kafka.clients.ClientsTestUtils.consumeRecords; +import static org.apache.kafka.clients.ClientsTestUtils.sendAndAwaitAsyncCommit; +import static org.apache.kafka.clients.ClientsTestUtils.sendRecords; +import static org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.METADATA_MAX_AGE_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = BROKER_COUNT, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + @ClusterConfigProperty(key = GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, value = "60000"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"), + } +) +public class PlaintextConsumerTest { + + private final ClusterInstance cluster; + public static final double EPSILON = 0.1; + + public PlaintextConsumerTest(ClusterInstance cluster) { + this.cluster = cluster; + } + + @ClusterTest + public void testClassicConsumerSimpleConsumption() throws InterruptedException { + testSimpleConsumption(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerSimpleConsumption() throws InterruptedException { + testSimpleConsumption(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testClassicConsumerClusterResourceListener() throws InterruptedException { + testClusterResourceListener(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerClusterResourceListener() throws InterruptedException { + testClusterResourceListener(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testClassicConsumerCoordinatorFailover() throws InterruptedException { + Map<String, Object> config = Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + SESSION_TIMEOUT_MS_CONFIG, 5001, + HEARTBEAT_INTERVAL_MS_CONFIG, 1000, + // Use higher poll timeout to avoid consumer leaving the group due to timeout + MAX_POLL_INTERVAL_MS_CONFIG, 15000 + ); + testCoordinatorFailover(cluster, config); + } + + @ClusterTest + public void testAsyncConsumeCoordinatorFailover() throws InterruptedException { + Map<String, Object> config = Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + // Use higher poll timeout to avoid consumer leaving the group due to timeout + MAX_POLL_INTERVAL_MS_CONFIG, 15000 + ); + testCoordinatorFailover(cluster, config); + } + + @ClusterTest + public void testClassicConsumerHeaders() throws Exception { + testHeaders(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerHeaders() throws Exception { + testHeaders(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testHeaders(Map<String, Object> consumerConfig) throws Exception { + var numRecords = 1; + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var record = new ProducerRecord<>(TP.topic(), TP.partition(), null, "key".getBytes(), "value".getBytes()); + record.headers().add("headerKey", "headerValue".getBytes()); + producer.send(record); + + assertEquals(0, consumer.assignment().size()); + consumer.assign(List.of(TP)); + assertEquals(1, consumer.assignment().size()); + + consumer.seek(TP, 0); + var records = consumeRecords(consumer, numRecords); + assertEquals(numRecords, records.size()); + var header = records.get(0).headers().lastHeader("headerKey"); + assertEquals("headerValue", header == null ? null : new String(header.value())); + } + } + + @ClusterTest + public void testClassicConsumerHeadersSerializerDeserializer() throws Exception { + testHeadersSerializeDeserialize(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerHeadersSerializerDeserializer() throws Exception { + testHeadersSerializeDeserialize(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testHeadersSerializeDeserialize(Map<String, Object> config) throws InterruptedException { + var numRecords = 1; + Map<String, Object> consumerConfig = new HashMap<>(config); + consumerConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, DeserializerImpl.class); + Map<String, Object> producerConfig = Map.of( + VALUE_SERIALIZER_CLASS_CONFIG, SerializerImpl.class.getName() + ); + + try (Producer<byte[], byte[]> producer = cluster.producer(producerConfig); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + producer.send(new ProducerRecord<>( + TP.topic(), + TP.partition(), + null, + "key".getBytes(), + "value".getBytes()) + ); + + assertEquals(0, consumer.assignment().size()); + consumer.assign(List.of(TP)); + assertEquals(1, consumer.assignment().size()); + + consumer.seek(TP, 0); + assertEquals(numRecords, consumeRecords(consumer, numRecords).size()); + } + } + + @ClusterTest + public void testClassicConsumerAutoOffsetReset() throws Exception { + testAutoOffsetReset(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerAutoOffsetReset() throws Exception { + testAutoOffsetReset(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testAutoOffsetReset(Map<String, Object> consumerConfig) throws Exception { + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, 1, startingTimestamp); + consumer.assign(List.of(TP)); + consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerGroupConsumption() throws Exception { + testGroupConsumption(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerGroupConsumption() throws Exception { + testGroupConsumption(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testGroupConsumption(Map<String, Object> consumerConfig) throws Exception { + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, 10, startingTimestamp); + consumer.subscribe(List.of(TOPIC)); + consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerPartitionsFor() throws Exception { + testPartitionsFor(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPartitionsFor() throws Exception { + testPartitionsFor(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPartitionsFor(Map<String, Object> consumerConfig) throws Exception { + var numParts = 2; + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + cluster.createTopic("part-test", numParts, (short) 1); + + try (var consumer = cluster.consumer(consumerConfig)) { + var partitions = consumer.partitionsFor(TOPIC); + assertNotNull(partitions); + assertEquals(2, partitions.size()); + } + } + + @ClusterTest + public void testClassicConsumerPartitionsForAutoCreate() throws Exception { + testPartitionsForAutoCreate(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPartitionsForAutoCreate() throws Exception { + testPartitionsForAutoCreate(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPartitionsForAutoCreate(Map<String, Object> consumerConfig) throws Exception { + try (var consumer = cluster.consumer(consumerConfig)) { + // First call would create the topic + consumer.partitionsFor("non-exist-topic"); + TestUtils.waitForCondition( + () -> !consumer.partitionsFor("non-exist-topic").isEmpty(), + "Timed out while awaiting non empty partitions." + ); + } + } + + @ClusterTest + public void testClassicConsumerPartitionsForInvalidTopic() { + testPartitionsForInvalidTopic(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPartitionsForInvalidTopic() { + testPartitionsForInvalidTopic(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPartitionsForInvalidTopic(Map<String, Object> consumerConfig) { + try (var consumer = cluster.consumer(consumerConfig)) { + assertThrows(InvalidTopicException.class, () -> consumer.partitionsFor(";3# ads,{234")); + } + } + + @ClusterTest + public void testClassicConsumerSeek() throws Exception { + testSeek( + Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerSeek() throws Exception { + testSeek( + Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testSeek(Map<String, Object> consumerConfig) throws Exception { + var totalRecords = 50; + var mid = totalRecords / 2; + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = 0; + sendRecords(producer, TP, totalRecords, startingTimestamp); + + consumer.assign(List.of(TP)); + consumer.seekToEnd(List.of(TP)); + assertEquals(totalRecords, consumer.position(TP)); + assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty()); + + consumer.seekToBeginning(List.of(TP)); + assertEquals(0, consumer.position(TP)); + consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp); + + consumer.seek(TP, mid); + assertEquals(mid, consumer.position(TP)); + + consumeAndVerifyRecords(consumer, TP, 1, mid, mid, mid); + + // Test seek compressed message + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + sendCompressedMessages(totalRecords, tp2); + consumer.assign(List.of(tp2)); + + consumer.seekToEnd(List.of(tp2)); + assertEquals(totalRecords, consumer.position(tp2)); + assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty()); + + consumer.seekToBeginning(List.of(tp2)); + assertEquals(0L, consumer.position(tp2)); + consumeAndVerifyRecords(consumer, tp2, 1, 0); + + consumer.seek(tp2, mid); + assertEquals(mid, consumer.position(tp2)); + consumeAndVerifyRecords(consumer, tp2, 1, mid, mid, mid); + } + } + + @ClusterTest + public void testClassicConsumerPartitionPauseAndResume() throws Exception { + testPartitionPauseAndResume(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPartitionPauseAndResume() throws Exception { + testPartitionPauseAndResume(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPartitionPauseAndResume(Map<String, Object> consumerConfig) throws Exception { + var partitions = List.of(TP); + var numRecords = 5; + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + + consumer.assign(partitions); + consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); + consumer.pause(partitions); + startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + assertTrue(consumer.poll(Duration.ofMillis(100)).isEmpty()); + consumer.resume(partitions); + consumeAndVerifyRecords(consumer, TP, numRecords, 5, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerInterceptors() throws Exception { + testInterceptors(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerInterceptors() throws Exception { + testInterceptors(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testInterceptors(Map<String, Object> consumerConfig) throws Exception { + var appendStr = "mock"; + MockConsumerInterceptor.resetCounters(); + MockProducerInterceptor.resetCounters(); + + // create producer with interceptor + Map<String, Object> producerConfig = Map.of( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName(), + KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + "mock.interceptor.append", appendStr + ); + // create consumer with interceptor + Map<String, Object> consumerConfigOverride = new HashMap<>(consumerConfig); + consumerConfigOverride.put(INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + consumerConfigOverride.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerConfigOverride.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + try (Producer<String, String> producer = cluster.producer(producerConfig); + Consumer<String, String> consumer = cluster.consumer(consumerConfigOverride) + ) { + // produce records + var numRecords = 10; + List<Future<RecordMetadata>> futures = new ArrayList<>(); + for (var i = 0; i < numRecords; i++) { + Future<RecordMetadata> future = producer.send( + new ProducerRecord<>(TP.topic(), TP.partition(), "key " + i, "value " + i) + ); + futures.add(future); + } + + // Wait for all sends to complete + futures.forEach(future -> assertDoesNotThrow(() -> future.get())); + + assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue()); + assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue()); + + // send invalid record + assertThrows( + Throwable.class, + () -> producer.send(null), + "Should not allow sending a null record" + ); + assertEquals( + 1, + MockProducerInterceptor.ON_ERROR_COUNT.intValue(), + "Interceptor should be notified about exception" + ); + assertEquals( + 0, + MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue(), + "Interceptor should not receive metadata with an exception when record is null" + ); + + consumer.assign(List.of(TP)); + consumer.seek(TP, 0); + + // consume and verify that values are modified by interceptors + var records = consumeRecords(consumer, numRecords); + for (var i = 0; i < numRecords; i++) { + ConsumerRecord<String, String> record = records.get(i); + assertEquals("key " + i, record.key()); + assertEquals(("value " + i + appendStr).toUpperCase(Locale.ROOT), record.value()); + } + + // commit sync and verify onCommit is called + var commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue(); + consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L))); + assertEquals(2, consumer.committed(Set.of(TP)).get(TP).offset()); + assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()); + + // commit async and verify onCommit is called + var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L)); + sendAndAwaitAsyncCommit(consumer, Optional.of(offsetsToCommit)); + assertEquals(5, consumer.committed(Set.of(TP)).get(TP).offset()); + assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()); + } + // cleanup + MockConsumerInterceptor.resetCounters(); + MockProducerInterceptor.resetCounters(); + } + + @ClusterTest + public void testClassicConsumerInterceptorsWithWrongKeyValue() throws Exception { + testInterceptorsWithWrongKeyValue(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerInterceptorsWithWrongKeyValue() throws Exception { + testInterceptorsWithWrongKeyValue(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testInterceptorsWithWrongKeyValue(Map<String, Object> consumerConfig) throws Exception { + var appendStr = "mock"; + // create producer with interceptor that has different key and value types from the producer + Map<String, Object> producerConfig = Map.of( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName(), + "mock.interceptor.append", appendStr + ); + // create consumer with interceptor that has different key and value types from the consumer + Map<String, Object> consumerConfigOverride = new HashMap<>(consumerConfig); + consumerConfigOverride.put(INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + + try (Producer<byte[], byte[]> producer = cluster.producer(producerConfig); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfigOverride) + ) { + // producing records should succeed + producer.send(new ProducerRecord<>( + TP.topic(), + TP.partition(), + "key".getBytes(), + "value will not be modified".getBytes() + )); + + consumer.assign(List.of(TP)); + consumer.seek(TP, 0); + // consume and verify that values are not modified by interceptors -- their exceptions are caught and logged, but not propagated + var records = consumeRecords(consumer, 1); + var record = records.get(0); + assertEquals("value will not be modified", new String(record.value())); + } + } + + @ClusterTest + public void testClassicConsumerConsumeMessagesWithCreateTime() throws Exception { + testConsumeMessagesWithCreateTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerConsumeMessagesWithCreateTime() throws Exception { + testConsumeMessagesWithCreateTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testConsumeMessagesWithCreateTime(Map<String, Object> consumerConfig) throws Exception { + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + var numRecords = 50; + var tp2 = new TopicPartition(TOPIC, 1); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + // Test non-compressed messages + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + consumer.assign(List.of(TP)); + consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); + + // Test compressed messages + sendCompressedMessages(numRecords, tp2); + consumer.assign(List.of(tp2)); + consumeAndVerifyRecords(consumer, tp2, numRecords, 0, 0, 0); + } + } + + @ClusterTest + public void testClassicConsumerConsumeMessagesWithLogAppendTime() throws Exception { + testConsumeMessagesWithLogAppendTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerConsumeMessagesWithLogAppendTime() throws Exception { + testConsumeMessagesWithLogAppendTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testConsumeMessagesWithLogAppendTime(Map<String, Object> consumerConfig) throws Exception { + var topicName = "testConsumeMessagesWithLogAppendTime"; + var startTime = System.currentTimeMillis(); + var numRecords = 50; + cluster.createTopic(topicName, 2, (short) 2, Map.of(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")); + + try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) { + // Test non-compressed messages + var tp1 = new TopicPartition(topicName, 0); + sendRecords(cluster, tp1, numRecords); + consumer.assign(List.of(tp1)); + consumeAndVerifyRecordsWithTimeTypeLogAppend(consumer, tp1, numRecords, startTime); + + // Test compressed messages + var tp2 = new TopicPartition(topicName, 1); + sendCompressedMessages(numRecords, tp2); + consumer.assign(List.of(tp2)); + consumeAndVerifyRecordsWithTimeTypeLogAppend(consumer, tp2, numRecords, startTime); + } + } + + @ClusterTest + public void testClassicConsumerListTopics() throws Exception { + testListTopics(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerListTopics() throws Exception { + testListTopics(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testListTopics(Map<String, Object> consumerConfig) throws Exception { + var numParts = 2; + var topic1 = "part-test-topic-1"; + var topic2 = "part-test-topic-2"; + var topic3 = "part-test-topic-3"; + cluster.createTopic(topic1, numParts, (short) 1); + cluster.createTopic(topic2, numParts, (short) 1); + cluster.createTopic(topic3, numParts, (short) 1); + + sendRecords(cluster, new TopicPartition(topic1, 0), 1); + + try (var consumer = cluster.consumer(consumerConfig)) { + // consumer some messages, and we can list the internal topic __consumer_offsets + consumer.subscribe(List.of(topic1)); + consumer.poll(Duration.ofMillis(100)); + var topics = consumer.listTopics(); + assertNotNull(topics); + assertEquals(4, topics.size()); + assertEquals(2, topics.get(topic1).size()); + assertEquals(2, topics.get(topic2).size()); + assertEquals(2, topics.get(topic3).size()); + } + } + + @ClusterTest + public void testClassicConsumerPauseStateNotPreservedByRebalance() throws Exception { + testPauseStateNotPreservedByRebalance(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + SESSION_TIMEOUT_MS_CONFIG, 100, + HEARTBEAT_INTERVAL_MS_CONFIG, 30 + )); + } + + @ClusterTest + public void testAsyncConsumerPauseStateNotPreservedByRebalance() throws Exception { + testPauseStateNotPreservedByRebalance(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPauseStateNotPreservedByRebalance(Map<String, Object> consumerConfig) throws Exception { + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, 5, startingTimestamp); + consumer.subscribe(List.of(TOPIC)); + consumeAndVerifyRecords(consumer, TP, 5, 0, 0, startingTimestamp); + consumer.pause(List.of(TP)); + + // subscribe to a new topic to trigger a rebalance + consumer.subscribe(List.of("topic2")); + + // after rebalance, our position should be reset and our pause state lost, + // so we should be able to consume from the beginning + consumeAndVerifyRecords(consumer, TP, 0, 5, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLeadMetricsCleanUpWithSubscribe() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLeadMetricsCleanUpWithSubscribe"; + testPerPartitionLeadMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLeadMetricsCleanUpWithSubscribe() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLeadMetricsCleanUpWithSubscribe"; + testPerPartitionLeadMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + private void testPerPartitionLeadMetricsCleanUpWithSubscribe( + Map<String, Object> consumerConfig, + String consumerClientId + ) throws Exception { + var numMessages = 1000; + var topic2 = "topic2"; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(topic2, 2, (short) BROKER_COUNT); + + try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) { + // send some messages. + sendRecords(cluster, TP, numMessages); + + // Test subscribe + // Create a consumer and consumer some messages. + var listener = new TestConsumerReassignmentListener(); + consumer.subscribe(List.of(TOPIC, topic2), listener); + var records = awaitNonEmptyRecords(consumer, TP); + assertEquals(1, listener.callsToAssigned, "should be assigned once"); + + // Verify the metric exist. + Map<String, String> tags1 = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + Map<String, String> tags2 = Map.of( + "client-id", consumerClientId, + "topic", tp2.topic(), + "partition", String.valueOf(tp2.partition()) + ); + + var fetchLead0 = consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1)); + assertNotNull(fetchLead0); + assertEquals((double) records.count(), fetchLead0.metricValue(), "The lead should be " + records.count()); + + // Remove topic from subscription + consumer.subscribe(List.of(topic2), listener); + awaitRebalance(consumer, listener); + + // Verify the metric has gone + assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1))); + assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLagMetricsCleanUpWithSubscribe() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLagMetricsCleanUpWithSubscribe"; + testPerPartitionLagMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLagMetricsCleanUpWithSubscribe() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLagMetricsCleanUpWithSubscribe"; + testPerPartitionLagMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + private void testPerPartitionLagMetricsCleanUpWithSubscribe( + Map<String, Object> consumerConfig, + String consumerClientId + ) throws Exception { + int numMessages = 1000; + var topic2 = "topic2"; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(topic2, 2, (short) BROKER_COUNT); + + try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) { + // send some messages. + sendRecords(cluster, TP, numMessages); + + // Test subscribe + // Create a consumer and consumer some messages. + var listener = new TestConsumerReassignmentListener(); + consumer.subscribe(List.of(TOPIC, topic2), listener); + var records = awaitNonEmptyRecords(consumer, TP); + assertEquals(1, listener.callsToAssigned, "should be assigned once"); + + // Verify the metric exist. + Map<String, String> tags1 = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + Map<String, String> tags2 = Map.of( + "client-id", consumerClientId, + "topic", tp2.topic(), + "partition", String.valueOf(tp2.partition()) + ); + + var fetchLag0 = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1)); + assertNotNull(fetchLag0); + var expectedLag = numMessages - records.count(); + assertEquals(expectedLag, (double) fetchLag0.metricValue(), EPSILON, "The lag should be " + expectedLag); + + // Remove topic from subscription + consumer.subscribe(List.of(topic2), listener); + awaitRebalance(consumer, listener); + + // Verify the metric has gone + assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1))); + assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags2))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLeadMetricsCleanUpWithAssign() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLeadMetricsCleanUpWithAssign"; + testPerPartitionLeadMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLeadMetricsCleanUpWithAssign() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLeadMetricsCleanUpWithAssign"; + testPerPartitionLeadMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + private void testPerPartitionLeadMetricsCleanUpWithAssign( + Map<String, Object> consumerConfig, + String consumerClientId + ) throws Exception { + var numMessages = 1000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + // Test assign send some messages. + sendRecords(producer, TP, numMessages, System.currentTimeMillis()); + sendRecords(producer, tp2, numMessages, System.currentTimeMillis()); + + consumer.assign(List.of(TP)); + var records = awaitNonEmptyRecords(consumer, TP); + + // Verify the metric exist. + Map<String, String> tags = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + var fetchLead = consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)); + assertNotNull(fetchLead); + assertEquals((double) records.count(), fetchLead.metricValue(), "The lead should be " + records.count()); + + consumer.assign(List.of(tp2)); + awaitNonEmptyRecords(consumer, tp2); + assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLagMetricsCleanUpWithAssign() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLagMetricsCleanUpWithAssign"; + testPerPartitionLagMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLagMetricsCleanUpWithAssign() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLagMetricsCleanUpWithAssign"; + testPerPartitionLagMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + private void testPerPartitionLagMetricsCleanUpWithAssign( + Map<String, Object> consumerConfig, + String consumerClientId + ) throws Exception { + var numMessages = 1000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + // Test assign send some messages. + sendRecords(producer, TP, numMessages, System.currentTimeMillis()); + sendRecords(producer, tp2, numMessages, System.currentTimeMillis()); + + consumer.assign(List.of(TP)); + var records = awaitNonEmptyRecords(consumer, TP); + + // Verify the metric exist. + Map<String, String> tags = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + var fetchLag = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)); + assertNotNull(fetchLag); + + var expectedLag = numMessages - records.count(); + assertEquals(expectedLag, (double) fetchLag.metricValue(), EPSILON, "The lag should be " + expectedLag); + consumer.assign(List.of(tp2)); + awaitNonEmptyRecords(consumer, tp2); + assertNull(consumer.metrics().get(new MetricName(TP + ".records-lag", "consumer-fetch-manager-metrics", "", tags))); + assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLagMetricsWhenReadCommitted() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLagMetricsWhenReadCommitted"; + testPerPartitionLagMetricsWhenReadCommitted(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId, + ISOLATION_LEVEL_CONFIG, "read_committed" + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLagMetricsWhenReadCommitted() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLagMetricsWhenReadCommitted"; + testPerPartitionLagMetricsWhenReadCommitted(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId, + ISOLATION_LEVEL_CONFIG, "read_committed" + ), consumerClientId); + } + + private void testPerPartitionLagMetricsWhenReadCommitted( + Map<String, Object> consumerConfig, + String consumerClientId + ) throws Exception { + var numMessages = 1000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + // Test assign send some messages. + sendRecords(producer, TP, numMessages, System.currentTimeMillis()); + sendRecords(producer, tp2, numMessages, System.currentTimeMillis()); + + consumer.assign(List.of(TP)); + awaitNonEmptyRecords(consumer, TP); + + // Verify the metric exist. + Map<String, String> tags = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + var fetchLag = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)); + assertNotNull(fetchLag); + } + } + + @ClusterTest + public void testClassicConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured() throws Exception { + var consumerClientId = "testClassicConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured"; + testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId, + ISOLATION_LEVEL_CONFIG, "read_committed" + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured() throws Exception { + var consumerClientId = "testAsyncConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured"; + testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId, + ISOLATION_LEVEL_CONFIG, "read_committed" + ), consumerClientId); + } + + private void testQuotaMetricsNotCreatedIfNoQuotasConfigured( + Map<String, Object> consumerConfig, + String consumerClientId + ) throws Exception { + var producerClientId = UUID.randomUUID().toString(); + var numRecords = 1000; + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(Map.of(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + + consumer.assign(List.of(TP)); + consumer.seek(TP, 0); + consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); + + var brokers = cluster.brokers().values(); + brokers.forEach(broker -> assertNoMetric(broker, "byte-rate", QuotaType.PRODUCE, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.PRODUCE, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "byte-rate", QuotaType.FETCH, consumerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.FETCH, consumerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "request-time", QuotaType.REQUEST, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.REQUEST, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "request-time", QuotaType.REQUEST, consumerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.REQUEST, consumerClientId)); + } + } + + private void assertNoMetric(KafkaBroker broker, String name, QuotaType quotaType, String clientId) { + var metricName = broker.metrics().metricName(name, quotaType.toString(), "", "user", "", "client-id", clientId); + assertNull(broker.metrics().metric(metricName), "Metric should not have been created " + metricName); + } + + @ClusterTest + public void testClassicConsumerSeekThrowsIllegalStateIfPartitionsNotAssigned() throws Exception { + testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerSeekThrowsIllegalStateIfPartitionsNotAssigned() throws Exception { + testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map<String, Object> consumerConfig) throws Exception { + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + try (var consumer = cluster.consumer(consumerConfig)) { + var e = assertThrows(IllegalStateException.class, () -> consumer.seekToEnd(List.of(TP))); + assertEquals("No current assignment for partition " + TP, e.getMessage()); + } + } + + @ClusterTest + public void testClassicConsumingWithNullGroupId() throws Exception { + testConsumingWithNullGroupId(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers() + )); + } + + @ClusterTest + public void testAsyncConsumerConsumingWithNullGroupId() throws Exception { + testConsumingWithNullGroupId(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers() + )); + } + + private void testConsumingWithNullGroupId(Map<String, Object> consumerConfig) throws Exception { + var partition = 0; + cluster.createTopic(TOPIC, 1, (short) 1); + + // consumer 1 uses the default group id and consumes from earliest offset + Map<String, Object> consumer1Config = new HashMap<>(consumerConfig); + consumer1Config.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumer1Config.put(CLIENT_ID_CONFIG, "consumer1"); + + // consumer 2 uses the default group id and consumes from latest offset + Map<String, Object> consumer2Config = new HashMap<>(consumerConfig); + consumer2Config.put(AUTO_OFFSET_RESET_CONFIG, "latest"); + consumer2Config.put(CLIENT_ID_CONFIG, "consumer2"); + + // consumer 3 uses the default group id and starts from an explicit offset + Map<String, Object> consumer3Config = new HashMap<>(consumerConfig); + consumer3Config.put(CLIENT_ID_CONFIG, "consumer3"); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer1 = new KafkaConsumer<>(consumer1Config); + Consumer<byte[], byte[]> consumer2 = new KafkaConsumer<>(consumer2Config); + Consumer<byte[], byte[]> consumer3 = new KafkaConsumer<>(consumer3Config) + ) { + producer.send(new ProducerRecord<>(TOPIC, partition, "k1".getBytes(), "v1".getBytes())).get(); + producer.send(new ProducerRecord<>(TOPIC, partition, "k2".getBytes(), "v2".getBytes())).get(); + producer.send(new ProducerRecord<>(TOPIC, partition, "k3".getBytes(), "v3".getBytes())).get(); + + consumer1.assign(List.of(TP)); + consumer2.assign(List.of(TP)); + consumer3.assign(List.of(TP)); + consumer3.seek(TP, 1); + + var numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count(); + assertThrows(InvalidGroupIdException.class, consumer1::commitSync); + assertThrows(InvalidGroupIdException.class, () -> consumer2.committed(Set.of(TP))); + + var numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count(); + var numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count(); + + consumer1.unsubscribe(); + consumer2.unsubscribe(); + consumer3.unsubscribe(); + + assertTrue(consumer1.assignment().isEmpty()); + assertTrue(consumer2.assignment().isEmpty()); + assertTrue(consumer3.assignment().isEmpty()); + + consumer1.close(); + consumer2.close(); + consumer3.close(); + + assertEquals(3, numRecords1, "Expected consumer1 to consume from earliest offset"); + assertEquals(0, numRecords2, "Expected consumer2 to consume from latest offset"); + assertEquals(2, numRecords3, "Expected consumer3 to consume from offset 1"); + } + } + + @ClusterTest + public void testClassicConsumerNullGroupIdNotSupportedIfCommitting() throws Exception { + testNullGroupIdNotSupportedIfCommitting(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(), + AUTO_OFFSET_RESET_CONFIG, "earliest", + CLIENT_ID_CONFIG, "consumer1" + )); + } + + @ClusterTest + public void testAsyncConsumerNullGroupIdNotSupportedIfCommitting() throws Exception { + testNullGroupIdNotSupportedIfCommitting(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(), + AUTO_OFFSET_RESET_CONFIG, "earliest", + CLIENT_ID_CONFIG, "consumer1" + )); + } + + private void testNullGroupIdNotSupportedIfCommitting(Map<String, Object> consumerConfig) throws Exception { + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + try (var consumer = new KafkaConsumer<>(consumerConfig)) { + consumer.assign(List.of(TP)); + assertThrows(InvalidGroupIdException.class, consumer::commitSync); + } + } + + @ClusterTest + public void testClassicConsumerStaticConsumerDetectsNewPartitionCreatedAfterRestart() throws Exception { + testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "my-group-id", + GROUP_INSTANCE_ID_CONFIG, "my-instance-id", + METADATA_MAX_AGE_CONFIG, 100, + MAX_POLL_INTERVAL_MS_CONFIG, 6000 + )); + } + + @ClusterTest + public void testAsyncConsumerStaticConsumerDetectsNewPartitionCreatedAfterRestart() throws Exception { + testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "my-group-id", + GROUP_INSTANCE_ID_CONFIG, "my-instance-id", + METADATA_MAX_AGE_CONFIG, 100, + MAX_POLL_INTERVAL_MS_CONFIG, 6000 + )); + } + + private void testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map<String, Object> consumerConfig) throws Exception { + var foo = "foo"; + var foo0 = new TopicPartition(foo, 0); + var foo1 = new TopicPartition(foo, 1); + cluster.createTopic(foo, 1, (short) 1); + + try (Consumer<byte[], byte[]> consumer1 = cluster.consumer(consumerConfig); + Consumer<byte[], byte[]> consumer2 = cluster.consumer(consumerConfig); + var admin = cluster.admin() + ) { + consumer1.subscribe(List.of(foo)); + awaitAssignment(consumer1, Set.of(foo0)); + consumer1.close(); + + consumer2.subscribe(List.of(foo)); + awaitAssignment(consumer2, Set.of(foo0)); + + admin.createPartitions(Map.of(foo, NewPartitions.increaseTo(2))).all().get(); + awaitAssignment(consumer2, Set.of(foo0, foo1)); + } + } + + @ClusterTest + public void testClassicConsumerEndOffsets() throws Exception { + testEndOffsets(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + METADATA_MAX_AGE_CONFIG, 100, + MAX_POLL_INTERVAL_MS_CONFIG, 6000 + )); + } + + @ClusterTest + public void testAsyncConsumerEndOffsets() throws Exception { + testEndOffsets(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + METADATA_MAX_AGE_CONFIG, 100, + MAX_POLL_INTERVAL_MS_CONFIG, 6000 + )); + } + + private void testEndOffsets(Map<String, Object> consumerConfig) throws Exception { + var numRecords = 10000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + for (var i = 0; i < numRecords; i++) { + var timestamp = startingTimestamp + (long) i; + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>( + TP.topic(), + TP.partition(), + timestamp, + ("key " + i).getBytes(), + ("value " + i).getBytes() + ); + producer.send(record); + } + producer.flush(); + + consumer.subscribe(List.of(TOPIC)); + awaitAssignment(consumer, Set.of(TP, tp2)); + + var endOffsets = consumer.endOffsets(Set.of(TP)); + assertEquals(numRecords, endOffsets.get(TP)); + } + } + + @ClusterTest + public void testClassicConsumerFetchOffsetsForTime() throws Exception { + testFetchOffsetsForTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerFetchOffsetsForTime() throws Exception { + testFetchOffsetsForTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testFetchOffsetsForTime(Map<String, Object> consumerConfig) throws Exception { + var numPartitions = 2; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer<byte[], byte[]> producer = cluster.producer(); + Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig) + ) { + Map<TopicPartition, Long> timestampsToSearch = new HashMap<>(); + for (int part = 0, i = 0; part < numPartitions; part++, i++) { + var tp = new TopicPartition(TOPIC, part); + // key, val, and timestamp equal to the sequence number. + sendRecords(producer, tp, 100, 0); + timestampsToSearch.put(tp, i * 20L); + } + // Test negative target time + assertThrows(IllegalArgumentException.class, () -> consumer.offsetsForTimes(Map.of(TP, -1L))); + var timestampOffsets = consumer.offsetsForTimes(timestampsToSearch); + + var timestampTp0 = timestampOffsets.get(TP); + assertEquals(0, timestampTp0.offset()); + assertEquals(0, timestampTp0.timestamp()); + assertEquals(Optional.of(0), timestampTp0.leaderEpoch()); + + var timestampTp1 = timestampOffsets.get(tp2); + assertEquals(20, timestampTp1.offset()); + assertEquals(20, timestampTp1.timestamp()); + assertEquals(Optional.of(0), timestampTp1.leaderEpoch()); + } + } + + @ClusterTest + public void testClassicConsumerPositionRespectsTimeout() { + testPositionRespectsTimeout(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPositionRespectsTimeout() { + testPositionRespectsTimeout(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPositionRespectsTimeout(Map<String, Object> consumerConfig) { + var topicPartition = new TopicPartition(TOPIC, 15); + try (var consumer = cluster.consumer(consumerConfig)) { + consumer.assign(List.of(topicPartition)); + // When position() is called for a topic/partition that doesn't exist, the consumer will repeatedly update the + // local metadata. However, it should give up after the user-supplied timeout has past. + assertThrows(TimeoutException.class, () -> consumer.position(topicPartition, Duration.ofSeconds(3))); + } + } + + @ClusterTest + public void testClassicConsumerPositionRespectsWakeup() { + testPositionRespectsWakeup(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPositionRespectsWakeup() { + testPositionRespectsWakeup(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPositionRespectsWakeup(Map<String, Object> consumerConfig) { + var topicPartition = new TopicPartition(TOPIC, 15); + try (var consumer = cluster.consumer(consumerConfig)) { + consumer.assign(List.of(topicPartition)); + CompletableFuture.runAsync(() -> { + try { + TimeUnit.SECONDS.sleep(1); + consumer.wakeup(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + assertThrows(WakeupException.class, () -> consumer.position(topicPartition, Duration.ofSeconds(3))); + } + } + + @ClusterTest + public void testClassicConsumerPositionWithErrorConnectionRespectsWakeup() { + testPositionWithErrorConnectionRespectsWakeup(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + // make sure the connection fails + BOOTSTRAP_SERVERS_CONFIG, "localhost:12345" + )); + } + + @ClusterTest + public void testAsyncConsumerPositionWithErrorConnectionRespectsWakeup() { + testPositionWithErrorConnectionRespectsWakeup(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + // make sure the connection fails + BOOTSTRAP_SERVERS_CONFIG, "localhost:12345" + )); + } + + private void testPositionWithErrorConnectionRespectsWakeup(Map<String, Object> consumerConfig) { + var topicPartition = new TopicPartition(TOPIC, 15); + try (var consumer = cluster.consumer(consumerConfig)) { + consumer.assign(List.of(topicPartition)); + CompletableFuture.runAsync(() -> { + try { + TimeUnit.SECONDS.sleep(1); + consumer.wakeup(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + assertThrows(WakeupException.class, () -> consumer.position(topicPartition, Duration.ofSeconds(100))); + } + } + + @Flaky("KAFKA-18031") + @ClusterTest + public void testClassicConsumerCloseLeavesGroupOnInterrupt() throws Exception { + testCloseLeavesGroupOnInterrupt(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + AUTO_OFFSET_RESET_CONFIG, "earliest", + GROUP_ID_CONFIG, "group_test,", + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers() + )); + } + + @Flaky("KAFKA-18031") Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org