iit2009060 commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1435445309
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
         }

         RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
-        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
         InputStream remoteSegInputStream = null;
         try {
-            // Search forward for the position of the last offset that is greater than or equal to the target offset
-            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
-
-            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
-
+            int startPos = 0;
+            RecordBatch firstBatch = null;
+            while (firstBatch == null && rlsMetadataOptional.isPresent()) {

Review Comment:
   @clolov We need a while loop here because the next segment in the iteration may also be fully log compacted, in which case we have to keep moving forward until we find a segment that contains a batch. See the example I attached in the PR description, where 0.log and 6.log are both fully log compacted and the next batch exists only in 07.log. Similar logic is already used when fetching data from the log segments:
   https://github.com/apache/kafka/blob/317f61dfd9a7f1443cf75b6a32568d1c81984d08/core/src/main/scala/kafka/log/LocalLog.scala#L425
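To illustrate why a single lookup is not enough, here is a minimal sketch of the forward scan described in the comment. It is not the code from this PR or from `RemoteLogManager`: the class `CompactedSegmentScan`, the method `findFirstBatch(TreeMap, long)`, and the in-memory model (a map from segment base offset to the last offsets of the batches that survived compaction) are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical in-memory model, not Kafka's classes: each segment is keyed by its
// base offset and holds the last offsets of the batches left after compaction.
final class CompactedSegmentScan {

    // Returns the last offset of the first batch at or after targetOffset,
    // moving to the next segment whenever the current one yields no batch.
    static Optional<Long> findFirstBatch(TreeMap<Long, List<Long>> segments, long targetOffset) {
        // Start at the segment with the greatest base offset <= targetOffset.
        Map.Entry<Long, List<Long>> segment = segments.floorEntry(targetOffset);
        Optional<Long> firstBatch = Optional.empty();
        while (!firstBatch.isPresent() && segment != null) {
            firstBatch = segment.getValue().stream()
                    .filter(lastOffset -> lastOffset >= targetOffset)
                    .findFirst();
            if (!firstBatch.isPresent()) {
                // Segment is fully compacted for this offset: try the next one.
                segment = segments.higherEntry(segment.getKey());
            }
        }
        return firstBatch;
    }

    public static void main(String[] args) {
        // Mirrors the example in the comment: the first two segments are empty.
        TreeMap<Long, List<Long>> segments = new TreeMap<>();
        segments.put(0L, Collections.<Long>emptyList());
        segments.put(6L, Collections.<Long>emptyList());
        segments.put(7L, Arrays.asList(7L, 9L));
        System.out.println(findFirstBatch(segments, 2L)); // prints "Optional[7]"
    }
}
```

With an `if` instead of the `while`, the scan would stop at the second empty segment and report no batch even though one exists in the third.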