apoorvmittal10 commented on code in PR #17573:
URL: https://github.com/apache/kafka/pull/17573#discussion_r1818848879
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -128,4 +131,17 @@ static long offsetForEarliestTimestamp(TopicIdPartition
topicIdPartition, Replic
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 :
timestampAndOffset.get().offset;
}
+
+ /**
+ * The method is used to get the offset for the latest timestamp for the
topic-partition.
+ *
+ * @return The offset for the latest timestamp.
+ */
+ static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition,
ReplicaManager replicaManager) {
+ // Isolation level is set to READ_UNCOMMITTED, matching with that used
in share fetch requests
+ Option<FileRecords.TimestampAndOffset> timestampAndOffset =
replicaManager.fetchOffsetForTimestamp(
+ topicIdPartition.topicPartition(),
ListOffsetsRequest.LATEST_TIMESTAMP, new
Some<>(IsolationLevel.READ_UNCOMMITTED),
+ Optional.empty(), true).timestampAndOffsetOpt();
+ return timestampAndOffset.isEmpty() ? (long) 0 :
timestampAndOffset.get().offset;
Review Comment:
If we send 0 when timestamp is empty but if first fetch offset in log is at
10 then will the fetch throw an exception? If yes, then what will happen next?
And in waht cases the the timestamp can be empty from replica manager?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]