dajac commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1203673361
########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -1335,6 +1433,50 @@ class ReplicaManager(val config: KafkaConfig, result } + private def handleOffsetOutOfRangeError(tp: TopicIdPartition, params: FetchParams, fetchInfo: PartitionData, + adjustedMaxBytes: Int, minOneMessage: + Boolean, log: UnifiedLog, fetchTimeMs: Long, + exception: OffsetOutOfRangeException): LogReadResult = { + val offset = fetchInfo.fetchOffset + // In case of offset out of range errors, handle it for tiered storage only if all the below conditions are true. + // 1) remote log manager is enabled and it is available + // 2) `log` instance should not be null here as that would have been caught earlier with NotLeaderForPartitionException or ReplicaNotAvailableException. + // 3) fetch offset is within the offset range of the remote storage layer + if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() && + log.logStartOffset <= offset && offset < log.localLogStartOffset()) + { + val highWatermark = log.highWatermark + val leaderLogStartOffset = log.logStartOffset + val leaderLogEndOffset = log.logEndOffset + + if (params.isFromFollower) { + // If it is from a follower then send the offset metadata only as the data is already available in remote + // storage and throw an error saying that this offset is moved to tiered storage. + createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset, + new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage")) + } else { + // For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information. + // For the first topic-partition that needs remote data, we will use this information to read the data in another thread. + val fetchDataInfo = + new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(), Review Comment: nit: `new FetchDataInfo` should be on previous line or indented. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org