chirag-wadhwa5 commented on code in PR #17573:
URL: https://github.com/apache/kafka/pull/17573#discussion_r1814625508


##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -117,4 +119,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition 
topicIdPartition, Replic
                 Optional.empty(), true).timestampAndOffsetOpt();
         return timestampAndOffset.isEmpty() ? (long) 0 : 
timestampAndOffset.get().offset;
     }
+
+    /**
+     * The method is used to get the offset for the latest timestamp for the 
topic-partition.
+     *
+     * @return The offset for the latest timestamp.
+     */
+    static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, 
ReplicaManager replicaManager) {
+        // Isolation level is set to READ_UNCOMMITTED, matching with that used 
in share fetch requests
+        Option<FileRecords.TimestampAndOffset> timestampAndOffset = 
replicaManager.fetchOffsetForTimestamp(
+            topicIdPartition.topicPartition(), 
ListOffsetsRequest.LATEST_TIMESTAMP, new 
Some<>(IsolationLevel.READ_UNCOMMITTED),
+            Optional.empty(), true).timestampAndOffsetOpt();
+        return timestampAndOffset.get().offset;

Review Comment:
   Thanks for the review. As per the fetchOffsetForTimestamp method in 
Partition.scala, the method actually never returns a None when we try to fetch 
the latest offset, but I will still add the check just to be sure. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to