dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r658512975



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                // ensure that the initial request contains max timestamp requests
+                request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
+                    .flatMap(t -> t.partitions().stream())
+                    .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP));
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(
+                // ensure that no max timestamp requests are retried
+                request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
+                    .flatMap(t -> t.partitions().stream())
+                    .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP),
+                new ListOffsetsResponse(responseData), node);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class);
+
+            ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get();
+            assertEquals(345L, tp1Offset.offset());
+            assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
+            assertEquals(-1L, tp1Offset.timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsUnsupportedNonMaxTimestamp() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                // ensure that the initial request doesn't contain max timestamp requests
+                request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
+                    .flatMap(t -> t.partitions().stream())
+                    .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP));
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.latest());
+                }});

Review comment:
       nit: We could use `singletonMap` here.
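
For illustration, a minimal sketch of the suggested simplification, reusing `env` and `tp0` from the quoted test (sketch only, not the committed code):

```java
// Sketch: build the single-entry spec map with Collections.singletonMap
// instead of an anonymous HashMap subclass with an instance initializer.
ListOffsetsResult result = env.adminClient().listOffsets(
    Collections.singletonMap(tp0, OffsetSpec.latest()));
```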

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                // ensure that the initial request contains max timestamp requests
+                request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
+                    .flatMap(t -> t.partitions().stream())
+                    .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP));

Review comment:
       As discussed yesterday, the matcher is not called, so I think we should 
remove the logic here as it is misleading. The condition does not add much 
anyway. Please check the other usages of `prepareUnsupportedVersionResponse`.
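
For illustration, with the matcher logic removed the call would look roughly like the variant already used in the single-spec test above (sketch only, `env` assumed from the quoted test):

```java
// Sketch: no per-partition timestamp check, since the matcher passed to
// prepareUnsupportedVersionResponse is not evaluated in this path anyway.
env.kafkaClient().prepareUnsupportedVersionResponse(
    request -> request instanceof ListOffsetsRequest);
```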

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
@@ -114,7 +114,9 @@ object ApiVersion {
    // Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516)
     KAFKA_2_8_IV1,
     // Introduce AllocateProducerIds (KIP-730)
-    KAFKA_3_0_IV0
+    KAFKA_3_0_IV0,
+    // Introduce ListOffsets maxTimestamps (KIP-734)

Review comment:
       Could we say `Introduce ListOffsets V7 which supports listing offsets by 
max timestamp (KIP-734)` instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

