dajac commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1203673361


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1335,6 +1433,50 @@ class ReplicaManager(val config: KafkaConfig,
     result
   }
 
+  private def handleOffsetOutOfRangeError(tp: TopicIdPartition, params: 
FetchParams, fetchInfo: PartitionData,
+                                          adjustedMaxBytes: Int, minOneMessage:
+                                          Boolean, log: UnifiedLog, 
fetchTimeMs: Long,
+                                          exception: 
OffsetOutOfRangeException): LogReadResult = {
+    val offset = fetchInfo.fetchOffset
+    // In case of offset out of range errors, handle it for tiered storage 
only if all the below conditions are true.
+    //   1) remote log manager is enabled and it is available
+    //   2) `log` instance should not be null here as that would have been 
caught earlier with NotLeaderForPartitionException or 
ReplicaNotAvailableException.
+    //   3) fetch offset is within the offset range of the remote storage layer
+    if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() &&
+      log.logStartOffset <= offset && offset < log.localLogStartOffset())
+    {
+      val highWatermark = log.highWatermark
+      val leaderLogStartOffset = log.logStartOffset
+      val leaderLogEndOffset = log.logEndOffset
+
+      if (params.isFromFollower) {
+        // If it is from a follower then send the offset metadata only as the 
data is already available in remote
+        // storage and throw an error saying that this offset is moved to 
tiered storage.
+        createLogReadResult(highWatermark, leaderLogStartOffset, 
leaderLogEndOffset,
+          new OffsetMovedToTieredStorageException("Given offset" + offset + " 
is moved to tiered storage"))
+      } else {
+        // For consume fetch requests, create a dummy FetchDataInfo with the 
remote storage fetch information.
+        // For the first topic-partition that needs remote data, we will use 
this information to read the data in another thread.
+        val fetchDataInfo =
+        new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, 
false, Optional.empty(),

Review Comment:
   nit: `new FetchDataInfo` should be on previous line or indented.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to