apoorvmittal10 commented on code in PR #17573:
URL: https://github.com/apache/kafka/pull/17573#discussion_r1818848879


##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -128,4 +131,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition 
topicIdPartition, Replic
                 Optional.empty(), true).timestampAndOffsetOpt();
         return timestampAndOffset.isEmpty() ? (long) 0 : 
timestampAndOffset.get().offset;
     }
+
+    /**
+     * The method is used to get the offset for the latest timestamp for the 
topic-partition.
+     *
+     * @return The offset for the latest timestamp.
+     */
+    static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, 
ReplicaManager replicaManager) {
+        // Isolation level is set to READ_UNCOMMITTED, matching with that used 
in share fetch requests
+        Option<FileRecords.TimestampAndOffset> timestampAndOffset = 
replicaManager.fetchOffsetForTimestamp(
+            topicIdPartition.topicPartition(), 
ListOffsetsRequest.LATEST_TIMESTAMP, new 
Some<>(IsolationLevel.READ_UNCOMMITTED),
+            Optional.empty(), true).timestampAndOffsetOpt();
+        return timestampAndOffset.isEmpty() ? (long) 0 : 
timestampAndOffset.get().offset;

Review Comment:
   If we send 0 when timestamp is empty but if first fetch offset in log is at 
10 then will the fetch throw an exception? If yes, then what will happen next? 
And in waht cases the the timestamp can be empty from replica manager? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to