satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1181518738


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,25 +622,208 @@ public String toString() {
         }
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws 
RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-        Optional<UnifiedLog> maybeLog = 
fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = 
log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = 
remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, 
epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = 
logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = 
epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is 
greater than or equal to the target offset
+            remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);

Review Comment:
   Sure, we can definitely look into that.  We did not see much of GC issues 
remote read throughputs ~750 MBps on a broker but there are plans to improve by 
exploring buffer pool mechanisms(variations of pool used in producers).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to