dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r656222177



##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -93,16 +87,52 @@ class LogOffsetTest extends BaseRequestTest {
   }
 
   @Test
-  def testGetOffsetsBeforeLatestTime(): Unit = {
+  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, 0)
+    val log = createTopicAndGetLog(topic, topicPartition)
 
-    createTopic(topic, 1, 1)
+    for (timestamp <- 0 until 20)
+      log.appendAsLeader(TestUtils.singletonRecords(value = 
Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0)
+    log.flush()
 
-    val logManager = server.getLogManager
-    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
-      s"Log for partition $topicPartition should be created")
-    val log = logManager.getLog(topicPartition).get
+    log.updateHighWatermark(log.logEndOffset)
+
+    val firstOffset = 
log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(19L, firstOffset.get.offset)
+    assertEquals(19L, firstOffset.get.timestamp)
+
+    log.truncateTo(0)
+
+    val secondOffset = 
log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(0L, secondOffset.get.offset)
+    assertEquals(-1L, secondOffset.get.timestamp)
+

Review comment:
       nit: Empty line could be removed.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -302,10 +302,16 @@ public void testSerialization() throws Exception {
         checkErrorResponse(createDeleteGroupsRequest(), 
unknownServerException, true);
         checkResponse(createDeleteGroupsResponse(), 0, true);
         for (short version : LIST_OFFSETS.allVersions()) {
-            checkRequest(createListOffsetRequest(version), true);
-            checkErrorResponse(createListOffsetRequest(version), 
unknownServerException, true);
+            checkRequest(createListOffsetRequest(version, 1000000L), true);
+            checkErrorResponse(createListOffsetRequest(version, 1000000L), 
unknownServerException, true);
             checkResponse(createListOffsetResponse(version), version, true);
         }
+        LIST_OFFSETS.allVersions().stream().filter(version -> version >= 
(short) 7).forEach(
+            version -> {
+                checkRequest(createListOffsetRequest(version, 
ListOffsetsRequest.MAX_TIMESTAMP), true);
+                checkErrorResponse(createListOffsetRequest(version, 
ListOffsetsRequest.MAX_TIMESTAMP), unknownServerException, true);
+            }
+        );

Review comment:
       This does not bring much in this test suite. I think that we can remove 
it. The only important point is to test all the versions here.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
##########
@@ -54,9 +55,11 @@ public static Builder forReplica(short allowedVersion, int 
replicaId) {
             return new Builder((short) 0, allowedVersion, replicaId, 
IsolationLevel.READ_UNCOMMITTED);
         }
 
-        public static Builder forConsumer(boolean requireTimestamp, 
IsolationLevel isolationLevel) {
+        public static Builder forConsumer(boolean requireTimestamp, 
IsolationLevel isolationLevel, boolean requireMaxTimestamp) {

Review comment:
       Should we add a small unit test for this change in 
`clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java`?

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1444,43 +1450,63 @@ private DeleteGroupsResponse 
createDeleteGroupsResponse() {
         );
     }
 
-    private ListOffsetsRequest createListOffsetRequest(int version) {
+    private ListOffsetsRequest createListOffsetRequest(int version, long 
timestamp) {
         if (version == 0) {
             ListOffsetsTopic topic = new ListOffsetsTopic()
                     .setName("test")
                     .setPartitions(Arrays.asList(new ListOffsetsPartition()
                             .setPartitionIndex(0)
-                            .setTimestamp(1000000L)
+                            .setTimestamp(timestamp)
                             .setMaxNumOffsets(10)
                             .setCurrentLeaderEpoch(5)));
             return ListOffsetsRequest.Builder
-                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
                     .setTargetTimes(Collections.singletonList(topic))
                     .build((short) version);
         } else if (version == 1) {
             ListOffsetsTopic topic = new ListOffsetsTopic()
                     .setName("test")
                     .setPartitions(Arrays.asList(new ListOffsetsPartition()
                             .setPartitionIndex(0)
-                            .setTimestamp(1000000L)
+                            .setTimestamp(timestamp)
                             .setCurrentLeaderEpoch(5)));
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
+                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
                     .setTargetTimes(Collections.singletonList(topic))
                     .build((short) version);
-        } else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) {
+        } else if (version >= 2 && version <= 6) {
             ListOffsetsPartition partition = new ListOffsetsPartition()
                     .setPartitionIndex(0)
-                    .setTimestamp(1000000L)
+                    .setTimestamp(timestamp)
                     .setCurrentLeaderEpoch(5);
 
             ListOffsetsTopic topic = new ListOffsetsTopic()
                     .setName("test")
                     .setPartitions(Arrays.asList(partition));
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_COMMITTED)
+                    .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
                     .setTargetTimes(Collections.singletonList(topic))
                     .build((short) version);
+        } else if (version >= 7 && version <= LIST_OFFSETS.latestVersion()) {
+            ListOffsetsPartition partition = new ListOffsetsPartition()
+                .setPartitionIndex(0)
+                .setTimestamp(timestamp)
+                .setCurrentLeaderEpoch(5);
+
+            ListOffsetsTopic topic = new ListOffsetsTopic()
+                .setName("test")
+                .setPartitions(Arrays.asList(partition));
+            if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+                return ListOffsetsRequest.Builder
+                        .forConsumer(true, IsolationLevel.READ_COMMITTED, 
false)
+                        .setTargetTimes(Collections.singletonList(topic))
+                        .build((short) version);
+            } else {

Review comment:
       We could also remove this. Note that `.forConsumer(true, 
IsolationLevel.READ_COMMITTED, false)` is used here. The last argument should 
have been `true`. It shows that it does not really bring any value.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws 
Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new 
Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, 
AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = 
env.adminClient().listOffsets(Collections.singletonMap(tp0, 
OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), 
UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() 
throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new 
Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new 
Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsTopicResponse topicResponse = 
ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 
345L, 543);
+            ListOffsetsResponseData responseData = new 
ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(new 
ListOffsetsResponse(responseData), node);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new 
HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            TestUtils.assertFutureThrows(result.partitionResult(tp0), 
UnsupportedVersionException.class);
+
+            ListOffsetsResultInfo tp1Offset = 
result.partitionResult(tp1).get();
+            assertEquals(345L, tp1Offset.offset());
+            assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
+            assertEquals(-1L, tp1Offset.timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampAndNoBrokerResponse() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new 
Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new 
Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);

Review comment:
       This does not really align with the name of the test. Is it intentional 
to have it? The test basically verifies that the first request fails with 
an `UnsupportedVersionResponse` error and that the second request does not get 
any response.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4296,39 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean 
handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+
+                        // fail any unsupported futures and remove partitions 
from the downgraded retry
+                        List<ListOffsetsTopic> topicsToRemove = new 
ArrayList<>();
+                        partitionsToQuery.stream().forEach(
+                            t -> {
+                                List<ListOffsetsPartition> partitionsToRemove 
= new ArrayList<>();
+                                t.partitions().stream()
+                                    .filter(p -> p.timestamp() == 
ListOffsetsRequest.MAX_TIMESTAMP)
+                                    .forEach(
+                                        p -> {
+                                            futures.get(new 
TopicPartition(t.name(), p.partitionIndex()))
+                                                .completeExceptionally(
+                                                    new 
UnsupportedVersionException(
+                                                        "Broker " + brokerId
+                                                            + " does not 
support MAX_TIMESTAMP offset spec"));
+                                            partitionsToRemove.add(p);
+
+                                        });
+                                t.partitions().removeAll(partitionsToRemove);
+                                if (t.partitions().isEmpty()) 
topicsToRemove.add(t);
+                            }
+                        );
+                        partitionsToQuery.removeAll(topicsToRemove);

Review comment:
       Did you consider using iterators here? This would allow us to remove 
elements while iterating over the collections. It seems to me that this could 
make the code a bit more understandable.
   
   I am thinking about something like this:
   ```
   // fail any unsupported futures and remove partitions from the downgraded 
retry
   Iterator<ListOffsetsTopic> topicIterator = partitionsToQuery.iterator();
   while (topicIterator.hasNext()) {
       ListOffsetsTopic topic = topicIterator.next();
       Iterator<ListOffsetsPartition> partitionIterator = 
topic.partitions().iterator();
       while (partitionIterator.hasNext()) {
           ListOffsetsPartition partition = partitionIterator.next();
           if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) {
               futures.get(new TopicPartition(topic.name(), 
partition.partitionIndex()))
                   .completeExceptionally(new UnsupportedVersionException(
                       "Broker " + brokerId + " does not support MAX_TIMESTAMP 
offset spec"));
               partitionIterator.remove();
           }
       }
       if (topic.partitions().isEmpty()) {
           topicIterator.remove();
       }
   }
   ```
   
   What do you think?

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -94,7 +94,8 @@ class ReplicaFetcherThread(name: String,
 
   // Visible for testing
   private[server] val listOffsetRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_8_IV0) 6
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_0_IV0) 7

Review comment:
       We should use `KAFKA_3_0_IV1` here.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -432,8 +438,8 @@ public void testSerialization() throws Exception {
         checkRequest(createUpdateMetadataRequest(5, null), false);
         checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), 
unknownServerException, true);
         checkResponse(createUpdateMetadataResponse(), 0, true);
-        checkRequest(createListOffsetRequest(0), true);
-        checkErrorResponse(createListOffsetRequest(0), unknownServerException, 
true);
+        checkRequest(createListOffsetRequest(0, 1000000L), true);
+        checkErrorResponse(createListOffsetRequest(0, 1000000L), 
unknownServerException, true);

Review comment:
       Is this redundant with what is already tested above?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws 
Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new 
Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, 
AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = 
env.adminClient().listOffsets(Collections.singletonMap(tp0, 
OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), 
UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() 
throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new 
Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new 
Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);

Review comment:
       Could we verify that the request contains only the specs which must be 
retried?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4296,39 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean 
handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+
+                        // fail any unsupported futures and remove partitions 
from the downgraded retry
+                        List<ListOffsetsTopic> topicsToRemove = new 
ArrayList<>();
+                        partitionsToQuery.stream().forEach(
+                            t -> {
+                                List<ListOffsetsPartition> partitionsToRemove 
= new ArrayList<>();
+                                t.partitions().stream()
+                                    .filter(p -> p.timestamp() == 
ListOffsetsRequest.MAX_TIMESTAMP)
+                                    .forEach(
+                                        p -> {
+                                            futures.get(new 
TopicPartition(t.name(), p.partitionIndex()))
+                                                .completeExceptionally(
+                                                    new 
UnsupportedVersionException(
+                                                        "Broker " + brokerId
+                                                            + " does not 
support MAX_TIMESTAMP offset spec"));
+                                            partitionsToRemove.add(p);
+
+                                        });
+                                t.partitions().removeAll(partitionsToRemove);
+                                if (t.partitions().isEmpty()) 
topicsToRemove.add(t);
+                            }
+                        );
+                        partitionsToQuery.removeAll(topicsToRemove);
+
+                        return !partitionsToQuery.isEmpty();

Review comment:
       Don't we need to avoid retrying if there were no `MAX_TIMESTAMP` in the 
collection? In this case, it would mean that the `UnsupportedVersionException` 
is not retryable. This could happen if the admin client talks to a really old 
broker which does not support v1, for instance.

##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -149,6 +179,21 @@ class LogOffsetTest extends BaseRequestTest {
     assertFalse(offsetChanged)
   }
 
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+    val log = createTopicAndGetLog(topic, topicPartition)
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    val maxTimestampOffset = 
log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(0L, log.logEndOffset)
+    assertEquals(0L, maxTimestampOffset.get.offset)
+    assertEquals(-1L, maxTimestampOffset.get.timestamp)
+

Review comment:
       nit: Empty line could be removed.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws 
Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new 
Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, 
AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = 
env.adminClient().listOffsets(Collections.singletonMap(tp0, 
OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), 
UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() 
throws Exception {
+

Review comment:
       nit: Empty line could be removed.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws 
Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+

Review comment:
       nit: Empty line could be removed.

##########
File path: clients/src/main/resources/common/message/ListOffsetsRequest.json
##########
@@ -30,7 +30,9 @@
   // Version 5 is the same as version 4.
   //
   // Version 6 enables flexible versions.
-  "validVersions": "0-6",
+  //
+  // Version 7 enables listing offsets by max timestamp.

Review comment:
       Could we add the KIP as well?

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1316,6 +1316,16 @@ class Log(@volatile private var _dir: File,
         val latestEpochOpt = 
leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
         val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
         Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, 
epochOptional))
+      } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+        // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
+        // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
+        val segmentsCopy = logSegments.toBuffer
+        val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
+        val latestEpochOpt = 
leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
+        val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+        Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar,
+          latestTimestampSegment.offsetOfMaxTimestampSoFar,
+          epochOptional))

Review comment:
       Could we file a Jira as a subtask of the KIP's Jira so that we do not 
forget about it?

##########
File path: clients/src/main/resources/common/message/ListOffsetsResponse.json
##########
@@ -29,7 +29,9 @@
   // Version 5 adds a new error code, OFFSET_NOT_AVAILABLE.
   //
   // Version 6 enables flexible versions.
-  "validVersions": "0-6",
+  //
+  // Version 7 is the same as version 6.

Review comment:
       Could we add the KIP as well?

##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -266,4 +311,14 @@ class LogOffsetTest extends BaseRequestTest {
       .partitions.asScala.find(_.partitionIndex == tp.partition).get
   }
 
+  private def createTopicAndGetLog(topic: String, topicPartition: 
TopicPartition): Log = {
+

Review comment:
       nit: Empty line could be removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to