satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1188479417
########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -1288,17 +1373,46 @@ class ReplicaManager(val config: KafkaConfig, _: FencedLeaderEpochException | _: ReplicaNotAvailableException | _: KafkaStorageException | - _: OffsetOutOfRangeException | _: InconsistentTopicIdException) => - LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), - divergingEpoch = None, - highWatermark = UnifiedLog.UnknownOffset, - leaderLogStartOffset = UnifiedLog.UnknownOffset, - leaderLogEndOffset = UnifiedLog.UnknownOffset, - followerLogStartOffset = UnifiedLog.UnknownOffset, - fetchTimeMs = -1L, - lastStableOffset = None, - exception = Some(e)) + createLogReadResult(e) + case e: OffsetOutOfRangeException => + // In case of offset out of range errors, check for remote log manager for non-compacted topics + // to fetch from remote storage. `log` instance should not be null here as that would have been caught earlier + // with NotLeaderForPartitionException or ReplicaNotAvailableException. + // If it is from a follower then send the offset metadata only as the data is already available in remote + // storage. + if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() && + // Check that the fetch offset is within the offset range within the remote storage layer. + log.logStartOffset <= offset && offset < log.localLogStartOffset()) { Review Comment: That should work fine because it will eventually throw an offset-out-of-range error if the target offset does not exist. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org