dajac commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r658512975
########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception { } } + @Test + public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})), + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof ListOffsetsRequest); + + ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp())); + + TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class); + } + } + + @Test + public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception { + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + List<PartitionInfo> pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); + pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node})); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + final TopicPartition tp1 = new TopicPartition("foo", 1); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, + AdminClientConfig.RETRIES_CONFIG, "2")) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + // ensure that the initial request contains max timestamp requests + request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream() + .flatMap(t -> t.partitions().stream()) + .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)); + + ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543); + ListOffsetsResponseData responseData = new ListOffsetsResponseData() + .setThrottleTimeMs(0) + .setTopics(Arrays.asList(topicResponse)); + env.kafkaClient().prepareResponseFrom( + // ensure that no max timestamp requests are retried + request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream() + .flatMap(t -> t.partitions().stream()) + .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP), + new ListOffsetsResponse(responseData), node); + + ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{ + put(tp0, OffsetSpec.maxTimestamp()); + put(tp1, OffsetSpec.latest()); + }}); + + TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class); + + ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get(); + assertEquals(345L, tp1Offset.offset()); + assertEquals(543, tp1Offset.leaderEpoch().get().intValue()); + assertEquals(-1L, tp1Offset.timestamp()); + } + } + + @Test + public void testListOffsetsUnsupportedNonMaxTimestamp() { + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + List<PartitionInfo> pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, + AdminClientConfig.RETRIES_CONFIG, "2")) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + // ensure that the initial request doesn't contain max timestamp requests + request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream() + .flatMap(t -> t.partitions().stream()) + .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)); + + ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{ + put(tp0, OffsetSpec.latest()); + }}); Review comment: nit: We could use `singletonMap` here. ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception { } } + @Test + public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})), + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof ListOffsetsRequest); + + ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp())); + + TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class); + } + } + + @Test + public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception { + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + List<PartitionInfo> pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); + pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node})); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + final TopicPartition tp1 = new TopicPartition("foo", 1); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, + AdminClientConfig.RETRIES_CONFIG, "2")) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + + // listoffsets response from broker 0 + env.kafkaClient().prepareUnsupportedVersionResponse( + // ensure that the initial request contains max timestamp requests + request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream() + .flatMap(t -> t.partitions().stream()) + .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)); Review comment: As discussed yesterday, the matcher is not called. Therefore, I think that we should remove the logic here as it is misleading. The condition does not bring much anyway. Please, check the other usages of `prepareUnsupportedVersionResponse`. ########## File path: core/src/main/scala/kafka/api/ApiVersion.scala ########## @@ -114,7 +114,9 @@ object ApiVersion { // Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516) KAFKA_2_8_IV1, // Introduce AllocateProducerIds (KIP-730) - KAFKA_3_0_IV0 + KAFKA_3_0_IV0, + // Introduce ListOffsets maxTimestamps (KIP-734) Review comment: Could we say `Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734)`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org