divijvaidya commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1182330923
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -600,25 +622,208 @@ public String toString() { } } - long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException { - Optional<Long> offset = Optional.empty(); - Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition()); - if (maybeLog.isPresent()) { - UnifiedLog log = maybeLog.get(); - Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache(); - if (maybeLeaderEpochFileCache.isDefined()) { - LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get(); - OptionalInt epoch = cache.latestEpoch(); - while (!offset.isPresent() && epoch.isPresent()) { - offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt()); - epoch = cache.previousEpoch(epoch.getAsInt()); + public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { + int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; + TopicPartition tp = remoteStorageFetchInfo.topicPartition; + FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + + boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + + long offset = fetchInfo.fetchOffset; + int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + + Optional<UnifiedLog> logOptional = fetchLog.apply(tp); + OptionalInt epoch = OptionalInt.empty(); + + if (logOptional.isPresent()) { + Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache(); + if (leaderEpochCache.isDefined()) { + epoch = leaderEpochCache.get().epochForOffset(offset); + } + } + + Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent() + ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) + : Optional.empty(); + + if (!rlsMetadataOptional.isPresent()) { + String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; + throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " + + epochStr + " and partition " + tp + " which does not exist in remote tier."); + } + + RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); + int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); + InputStream remoteSegInputStream = null; + try { + // Search forward for the position of the last offset that is greater than or equal to the target offset + remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); + RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); Review Comment: Thanks, I added a JIRA https://issues.apache.org/jira/browse/KAFKA-14954 to ensure that we are tracking it and don't forget about it. I agree that this is not a major blocker from a GC perspective, as JVMs are getting more intelligent about allocation/deallocation of byte[], but this adds up in situations where we have a large number of allocations. Another PR where we removed a data copy and allocation resulted in a 2% CPU improvement (measured from a flamegraph) with JDK 17. The impact on JDK 8 will probably be larger. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org