thomaskwscott commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r654459694
########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception { } } + @Test + public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { + + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})), + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof ListOffsetsRequest); + + ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp())); + + Exception exception = assertThrows(ExecutionException.class, () -> { + Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS); + }); + assertTrue(exception.getCause() instanceof UnsupportedVersionException); + } + } + + @Test + public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception { + + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + List<PartitionInfo> pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); + pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node})); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + final TopicPartition tp1 = new TopicPartition("foo", 1); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, + AdminClientConfig.RETRIES_CONFIG, "2")) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof ListOffsetsRequest); + + ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{ + put(tp0, OffsetSpec.maxTimestamp()); + put(tp1, OffsetSpec.latest()); + }}); Review comment: checkstyle is not letting me :-( ########## File path: core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package integration.kafka.admin + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.admin._ +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.Utils +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} + +import scala.collection.{Map, Seq} +import scala.jdk.CollectionConverters._ + +class ListOffsetsIntegrationTest extends KafkaServerTestHarness { + + val topicName = "foo" + var adminClient: Admin = null + + @BeforeEach + override def setUp(): Unit = { + super.setUp() + createTopic(topicName,1,1.asInstanceOf[Short]) + produceMessages() + adminClient = Admin.create(Map[String, Object]( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList + ).asJava) + } + + @AfterEach + override def tearDown(): Unit = { + Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") + super.tearDown() + } + + @Test + def testEarliestOffset(): Unit = { + val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) + assertEquals(0,earliestOffset.offset()) + } + + @Test + def testLatestOffset(): Unit = { + val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest()) + assertEquals(3,latestOffset.offset()) + } + + @Test + def testMaxTimestampOffset(): Unit = { + val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp()) + assertEquals(1,maxTimestampOffset.offset()) + } + + private def runFetchOffsets(adminClient: Admin, + offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = { + println(s"==> listOffsets(${topicName} -> ${offsetSpec}, new ListOffsetsOptions())") Review comment: removed ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception { } } + @Test + public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { + + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})), + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof ListOffsetsRequest); + + ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp())); + + Exception exception = assertThrows(ExecutionException.class, () -> { + Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS); + }); + assertTrue(exception.getCause() instanceof UnsupportedVersionException); Review comment: updated ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception { } } + @Test + public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { + + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})), + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof ListOffsetsRequest); + + ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp())); + + Exception exception = assertThrows(ExecutionException.class, () -> { + Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS); + }); + assertTrue(exception.getCause() instanceof UnsupportedVersionException); + } + } + + @Test + public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception { + + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + List<PartitionInfo> pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); + pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node})); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + final TopicPartition tp1 = new TopicPartition("foo", 1); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, + AdminClientConfig.RETRIES_CONFIG, "2")) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof ListOffsetsRequest); + + ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{ + put(tp0, OffsetSpec.maxTimestamp()); + put(tp1, OffsetSpec.latest()); + }}); + + ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543); + ListOffsetsResponseData responseData = new ListOffsetsResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(topicResponse)); + env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node); Review comment: done ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception { } } + @Test + public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { + + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})), + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof ListOffsetsRequest); + + ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp())); + + Exception exception = assertThrows(ExecutionException.class, () -> { + Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS); + }); + assertTrue(exception.getCause() instanceof UnsupportedVersionException); + } + } + + @Test + public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception { + + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + List<PartitionInfo> pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); + pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node})); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + final TopicPartition tp1 = new TopicPartition("foo", 1); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, + AdminClientConfig.RETRIES_CONFIG, "2")) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof ListOffsetsRequest); + + ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{ + put(tp0, OffsetSpec.maxTimestamp()); + put(tp1, OffsetSpec.latest()); + }}); + + ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543); + ListOffsetsResponseData responseData = new ListOffsetsResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(topicResponse)); + env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node); + + Exception exception = assertThrows(ExecutionException.class, () -> { + result.partitionResult(tp0).get(); + }); + assertTrue(exception.getCause() instanceof UnsupportedVersionException); + + ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get(); + assertEquals(345L, tp1Offset.offset()); + assertEquals(543, tp1Offset.leaderEpoch().get().intValue()); + assertEquals(-1L, tp1Offset.timestamp()); + } + } + + @Test + public void testListOffsetsMaxTimestampAndNoBrokerResponse() { + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + List<PartitionInfo> pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); + pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node})); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + final TopicPartition tp1 = new TopicPartition("foo", 1); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, + AdminClientConfig.RETRIES_CONFIG, "2")) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof ListOffsetsRequest); + + ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{ + put(tp0, OffsetSpec.maxTimestamp()); + put(tp1, OffsetSpec.latest()); + }}); + + ListOffsetsResponseData responseData = new ListOffsetsResponseData() + .setThrottleTimeMs(0) + .setTopics(new ArrayList<>()); + env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node); + + Exception maxTimestampException = assertThrows(ExecutionException.class, () -> { + result.partitionResult(tp0).get(); + }); + assertTrue(maxTimestampException.getCause() instanceof UnsupportedVersionException); + + Exception nopResponseException = assertThrows(ExecutionException.class, () -> { + result.partitionResult(tp1).get(); + }); + assertTrue(nopResponseException.getCause() instanceof ApiException); + } + } Review comment: I added it to the existing happy path test ########## File path: core/src/test/scala/unit/kafka/log/LogTest.scala ########## @@ -2072,6 +2072,30 @@ class LogTest { log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)) } + @Test + def testFetchOffsetByTimestampWithMaxTimestampIncludesTimestamp(): Unit = { + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) + val log = createLog(logDir, logConfig) + + assertEquals(None, log.fetchOffsetByTimestamp(0L)) + + val firstTimestamp = mockTime.milliseconds + val leaderEpoch = 0 + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = firstTimestamp), + leaderEpoch = leaderEpoch) + + val secondTimestamp = firstTimestamp + 1 Review comment: done ########## File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ########## @@ -92,6 +92,57 @@ class LogOffsetTest extends BaseRequestTest { assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets) } + @Test + def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = { + val topic = "kafka-" + val topicPartition = new TopicPartition(topic, 0) + + createTopic(topic, 1, 1) + + val logManager = server.getLogManager + TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, + "Log for partition [topic,0] should be created") + val log = logManager.getLog(topicPartition).get + + for (timestamp <- 0 until 20) + log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0) + log.flush() + + log.updateHighWatermark(log.logEndOffset) + + val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) + assertEquals(19L, firstOffset.get.offset) + + log.truncateTo(0) + + val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) + assertEquals(0L, secondOffset.get.offset) + + } + + @Test + def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(): Unit = { + val topic = "kafka-" + val topicPartition = new TopicPartition(topic, 0) + + createTopic(topic, 1, 1) + + val logManager = server.getLogManager + TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, + "Log for partition [topic,0] should be created") + val log = logManager.getLog(topicPartition).get + + for (timestamp <- List(0L, 1L, 2L, 3L, 4L, 6L, 5L)) + log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp), leaderEpoch = 0) + log.flush() + + log.updateHighWatermark(log.logEndOffset) + + val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP) + assertEquals(7L, log.logEndOffset) + assertEquals(5L, maxTimestampOffset.get.offset) Review comment: updated ########## File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ########## @@ -92,6 +92,57 @@ class LogOffsetTest extends BaseRequestTest { assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets) } + @Test + def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = { + val topic = "kafka-" + val topicPartition = new TopicPartition(topic, 0) + + createTopic(topic, 1, 1) + + val logManager = server.getLogManager + TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined, + "Log for partition [topic,0] should be created") + val log = logManager.getLog(topicPartition).get Review comment: sure, I refactored it out. ########## File path: core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala ########## @@ -162,11 +162,15 @@ class ListOffsetsRequestTest extends BaseRequestTest { val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers) val firstLeaderId = partitionToLeader(partition.partition) - TestUtils.generateAndProduceMessages(servers, topic, 10) + // produce in 2 batches to ensure the max timestamp matches the last message + TestUtils.generateAndProduceMessages(servers, topic, 9) + Thread.sleep(10) + TestUtils.generateAndProduceMessages(servers, topic, 1) Review comment: This test had become flaky because multiple messages could have the same timestamp. This pause ensured the last message had the highest timestamp. I've removed the sleep now and explicitly set the timestamp in the test messages. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org