showuon commented on code in PR #13487:

@@ -272,15 +364,270 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
+  trait CancellableRunnable extends Runnable {
+    @volatile private var cancelled = false
+    def cancel(): Unit = {
+      cancelled = true
+    }
+    def isCancelled(): Boolean = {
+      cancelled
+    }
+  }
+  /**
+   * Returns the leader epoch checkpoint by truncating with the given 
start[exclusive] and end[inclusive] offset
+   *
+   * @param log         The actual log from where to take the leader-epoch 
+   * @param startOffset The start offset of the checkpoint file (exclusive in 
the truncation).
+   *                    If start offset is 6, then it will retain an entry at 
offset 6.
+   * @param endOffset   The end offset of the checkpoint file (inclusive in 
the truncation)
+   *                    If end offset is 100, then it will remove the entries 
greater than or equal to 100.
+   * @return the truncated leader epoch checkpoint
+   */
+  private[remote] def getLeaderEpochCheckpoint(log: UnifiedLog, startOffset: 
Long, endOffset: Long): InMemoryLeaderEpochCheckpoint = {
+    val checkpoint = new InMemoryLeaderEpochCheckpoint()
+    log.leaderEpochCache
+      .map(cache => cache.writeTo(checkpoint))
+      .foreach { x =>
+        if (startOffset >= 0) {
+          x.truncateFromStart(startOffset)
+        }
+        x.truncateFromEnd(endOffset)
+      }
+    checkpoint
+  }
+  private[remote] class RLMTask(tpId: TopicIdPartition) extends 
CancellableRunnable with Logging {
+    this.logIdent = s"[RemoteLogManager=$brokerId partition=$tpId] "
+    @volatile private var leaderEpoch: Int = -1
+    private def isLeader(): Boolean = leaderEpoch >= 0
+    // The readOffset is None initially for a new leader RLMTask, and needs to 
be fetched inside the task's run() method.
+    @volatile private var copiedOffsetOption: Option[Long] = None
+    def convertToLeader(leaderEpochVal: Int): Unit = {
+      if (leaderEpochVal < 0) {
+        throw new KafkaException(s"leaderEpoch value for topic partition $tpId 
can not be negative")
+      }
+      if (this.leaderEpoch != leaderEpochVal) {
+        leaderEpoch = leaderEpochVal
+      }
+      // Reset readOffset, so that it is set in next run of RLMTask
+      copiedOffsetOption = None
+    }
+    def convertToFollower(): Unit = {
+      leaderEpoch = -1
+    }
+    def handleCopyLogSegmentsToRemote(): Unit = {
+      if (isCancelled())
+        return
+      def maybeUpdateReadOffset(): Unit = {
+        if (copiedOffsetOption.isEmpty) {
+          info(s"Find the highest remote offset for partition: $tpId after 
becoming leader, leaderEpoch: $leaderEpoch")
+          // This is found by traversing from the latest leader epoch from 
leader epoch history and find the highest offset
+          // of a segment with that epoch copied into remote storage. If it 
can not find an entry then it checks for the
+          // previous leader epoch till it finds an entry, If there are no 
entries till the earliest leader epoch in leader
+          // epoch cache then it starts copying the segments from the earliest 
epoch entry’s offset.
+          copiedOffsetOption = Some(findHighestRemoteOffset(tpId))
+        }
+      }
+      try {
+        maybeUpdateReadOffset()
+        val copiedOffset = copiedOffsetOption.get
+        fetchLog(tpId.topicPartition()).foreach { log =>
+          // LSO indicates the offset below are ready to be 
consumed(high-watermark or committed)
+          val lso = log.lastStableOffset
+          if (lso < 0) {
+            warn(s"lastStableOffset for partition $tpId is $lso, which should 
not be negative.")
+          } else if (lso > 0 && copiedOffset < lso) {
+            // Copy segments only till the min of high-watermark or 
stable-offset as remote storage should contain
+            // only committed/acked messages
+            val toOffset = lso
+            debug(s"Checking for segments to copy, copiedOffset: $copiedOffset 
and toOffset: $toOffset")
+            val activeSegBaseOffset = log.activeSegment.baseOffset
+            // log-start-offset can be ahead of the read-offset, when:
+            // 1) log-start-offset gets incremented via delete-records API (or)
+            // 2) enabling the remote log for the first time
+            val fromOffset = Math.max(copiedOffset + 1, log.logStartOffset)
+            val sortedSegments = log.logSegments(fromOffset, 
+            val activeSegIndex: Int = => 
x.baseOffset).search(activeSegBaseOffset) match {
+              case Found(x) => x
+              case InsertionPoint(y) => y - 1
+            }
+            // sortedSegments becomes empty list when fromOffset and toOffset 
are same, and activeSegIndex becomes -1
+            if (activeSegIndex < 0) {
+              debug(s"No segments found to be copied for partition $tpId with 
copiedOffset: $copiedOffset and " +
+                s"active segment's base-offset: $activeSegBaseOffset")
+            } else {
+              sortedSegments.slice(0, activeSegIndex).foreach { segment =>
+                if (isCancelled() || !isLeader()) {
+                  info(s"Skipping copying log segments as the current task 
state is changed, cancelled: " +
+                    s"${isCancelled()} leader:${isLeader()}")
+                  return
+                }
+                val logFile = segment.log.file()
+                val logFileName = logFile.getName
+                info(s"Copying $logFileName to remote storage.")
+                val id = new RemoteLogSegmentId(tpId, Uuid.randomUuid())
+                val nextOffset = segment.readNextOffset
+                val endOffset = nextOffset - 1
+                val producerIdSnapshotFile: File = 
+                val epochEntries = getLeaderEpochCheckpoint(log, 
segment.baseOffset, nextOffset).read()
+                val segmentLeaderEpochs: util.Map[Integer, lang.Long] = new 
util.HashMap[Integer, lang.Long](epochEntries.size())
+                epochEntries.forEach(entry =>
+                  segmentLeaderEpochs.put(Integer.valueOf(entry.epoch), 
+                )
+                val remoteLogSegmentMetadata = new 
RemoteLogSegmentMetadata(id, segment.baseOffset, endOffset,
+                  segment.largestTimestamp, brokerId, time.milliseconds(), 
+                  segmentLeaderEpochs)
+                val leaderEpochsIndex = getLeaderEpochCheckpoint(log, 
startOffset = -1, nextOffset).readAsByteBuffer()
+                val segmentData = new LogSegmentData(logFile.toPath, 
+                  toPathIfExists(segment.lazyTimeIndex.get.file), 
+                  producerIdSnapshotFile.toPath, leaderEpochsIndex)
+                val rlsmAfterSegmentCopy = new 
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
+                  RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId)
+                copiedOffsetOption = Some(endOffset)
+                log.updateHighestOffsetInRemoteStorage(endOffset)
+                info(s"Copied $logFileName to remote storage with segment-id: 
+              }
+            }
+          } else {
+            debug(s"Skipping copying segments, current 
read-offset:$copiedOffset, and LSO:$lso")
+          }
+        }
+      } catch {
+        case ex: Exception =>
+          if (!isCancelled()) {
+            error(s"Error occurred while copying log segments of partition: 
$tpId", ex)
+          }
+      }
+    }
+    private def toPathIfExists(file: File): Path = {
+      if (file.exists()) file.toPath else null
+    }
+    override def run(): Unit = {
+      if (isCancelled())
+        return
+      try {
+        if (isLeader()) {
+          // Copy log segments to remote storage
+          handleCopyLogSegmentsToRemote()
+        }
+      } catch {
+        case ex: InterruptedException =>
+          if (!isCancelled()) {
+            warn(s"Current thread for topic-partition-id $tpId is interrupted, 
this task won't be rescheduled. " +
+              s"Reason: ${ex.getMessage}")
+          }
+        case ex: Exception =>
+          if (!isCancelled()) {
+            warn(s"Current task for topic-partition $tpId received error but 
it will be scheduled. " +
+              s"Reason: ${ex.getMessage}")
+          }
+      }
+    }
+    override def toString: String = {
+      this.getClass.toString + s"[$tpId]"
+    }
+  }
+  def findHighestRemoteOffset(topicIdPartition: TopicIdPartition): Long = {
+    var offset: Optional[lang.Long] = Optional.empty()
+    fetchLog(topicIdPartition.topicPartition()).foreach { log =>
+      log.leaderEpochCache.foreach(cache => {
+        var epoch = cache.latestEpoch
+        while (!offset.isPresent && epoch.isPresent) {
+          offset = 
remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt)
+          epoch = cache.previousEpoch(epoch.getAsInt)
+        }
+      })
+    }
+    offset.orElse(-1L)
+  }
+  private def doHandleLeaderOrFollowerPartitions(topicPartition: 
+                                                 convertToLeaderOrFollower: 
RLMTask => Unit): Unit = {
+    val rlmTaskWithFuture = 
leaderOrFollowerTasks.computeIfAbsent(topicPartition, (tp: TopicIdPartition) => 
+      val task = new RLMTask(tp)
+      // set this upfront when it is getting initialized instead of doing it 
after scheduling.
+      convertToLeaderOrFollower(task)
+      info(s"Created a new task: $task and getting scheduled")
+      val future = rlmScheduledThreadPool.scheduleWithFixedDelay(task, 0, 
delayInMs, TimeUnit.MILLISECONDS)
+      RLMTaskWithFuture(task, future)
+    })
+    convertToLeaderOrFollower(rlmTaskWithFuture.rlmTask)

Review Comment:
   Will there be race condition when the topic partition is new inserted, so 
it'll set `convertToLeaderOrFollower` once before scheduling. And then the 
scheduled thread could be run immediately, later, run this line 
`convertToLeaderOrFollower(rlmTaskWithFuture.rlmTask)`, so it reset the leader 
epoch if it got updated. 
   Could we add a null condition check for `rlmTaskWithFuture` here? that is:
   if (rlmTaskWithFuture != null) {
     // not new created task, should convert it here

@@ -272,15 +364,270 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
+  trait CancellableRunnable extends Runnable {
+    @volatile private var cancelled = false
+    def cancel(): Unit = {
+      cancelled = true
+    }
+    def isCancelled(): Boolean = {
+      cancelled
+    }
+  }
+  /**
+   * Returns the leader epoch checkpoint by truncating with the given 
start[exclusive] and end[inclusive] offset
+   *
+   * @param log         The actual log from where to take the leader-epoch 
+   * @param startOffset The start offset of the checkpoint file (exclusive in 
the truncation).
+   *                    If start offset is 6, then it will retain an entry at 
offset 6.
+   * @param endOffset   The end offset of the checkpoint file (inclusive in 
the truncation)
+   *                    If end offset is 100, then it will remove the entries 
greater than or equal to 100.
+   * @return the truncated leader epoch checkpoint
+   */
+  private[remote] def getLeaderEpochCheckpoint(log: UnifiedLog, startOffset: 
Long, endOffset: Long): InMemoryLeaderEpochCheckpoint = {
+    val checkpoint = new InMemoryLeaderEpochCheckpoint()
+    log.leaderEpochCache
+      .map(cache => cache.writeTo(checkpoint))
+      .foreach { x =>
+        if (startOffset >= 0) {
+          x.truncateFromStart(startOffset)
+        }
+        x.truncateFromEnd(endOffset)
+      }
+    checkpoint
+  }
+  private[remote] class RLMTask(tpId: TopicIdPartition) extends 
CancellableRunnable with Logging {
+    this.logIdent = s"[RemoteLogManager=$brokerId partition=$tpId] "
+    @volatile private var leaderEpoch: Int = -1
+    private def isLeader(): Boolean = leaderEpoch >= 0
+    // The readOffset is None initially for a new leader RLMTask, and needs to 
be fetched inside the task's run() method.
+    @volatile private var copiedOffsetOption: Option[Long] = None
+    def convertToLeader(leaderEpochVal: Int): Unit = {
+      if (leaderEpochVal < 0) {
+        throw new KafkaException(s"leaderEpoch value for topic partition $tpId 
can not be negative")
+      }
+      if (this.leaderEpoch != leaderEpochVal) {
+        leaderEpoch = leaderEpochVal
+      }
+      // Reset readOffset, so that it is set in next run of RLMTask
+      copiedOffsetOption = None
+    }
+    def convertToFollower(): Unit = {
+      leaderEpoch = -1
+    }
+    def handleCopyLogSegmentsToRemote(): Unit = {
+      if (isCancelled())
+        return
+      def maybeUpdateReadOffset(): Unit = {
+        if (copiedOffsetOption.isEmpty) {
+          info(s"Find the highest remote offset for partition: $tpId after 
becoming leader, leaderEpoch: $leaderEpoch")
+          // This is found by traversing from the latest leader epoch from 
leader epoch history and find the highest offset
+          // of a segment with that epoch copied into remote storage. If it 
can not find an entry then it checks for the
+          // previous leader epoch till it finds an entry, If there are no 
entries till the earliest leader epoch in leader
+          // epoch cache then it starts copying the segments from the earliest 
epoch entry’s offset.
+          copiedOffsetOption = Some(findHighestRemoteOffset(tpId))
+        }
+      }
+      try {
+        maybeUpdateReadOffset()
+        val copiedOffset = copiedOffsetOption.get
+        fetchLog(tpId.topicPartition()).foreach { log =>
+          // LSO indicates the offset below are ready to be 
consumed(high-watermark or committed)
+          val lso = log.lastStableOffset
+          if (lso < 0) {
+            warn(s"lastStableOffset for partition $tpId is $lso, which should 
not be negative.")
+          } else if (lso > 0 && copiedOffset < lso) {
+            // Copy segments only till the min of high-watermark or 
stable-offset as remote storage should contain
+            // only committed/acked messages
+            val toOffset = lso
+            debug(s"Checking for segments to copy, copiedOffset: $copiedOffset 
and toOffset: $toOffset")
+            val activeSegBaseOffset = log.activeSegment.baseOffset
+            // log-start-offset can be ahead of the read-offset, when:
+            // 1) log-start-offset gets incremented via delete-records API (or)
+            // 2) enabling the remote log for the first time
+            val fromOffset = Math.max(copiedOffset + 1, log.logStartOffset)
+            val sortedSegments = log.logSegments(fromOffset, 
+            val activeSegIndex: Int = => 
x.baseOffset).search(activeSegBaseOffset) match {
+              case Found(x) => x
+              case InsertionPoint(y) => y - 1
+            }
+            // sortedSegments becomes empty list when fromOffset and toOffset 
are same, and activeSegIndex becomes -1
+            if (activeSegIndex < 0) {
+              debug(s"No segments found to be copied for partition $tpId with 
copiedOffset: $copiedOffset and " +
+                s"active segment's base-offset: $activeSegBaseOffset")
+            } else {
+              sortedSegments.slice(0, activeSegIndex).foreach { segment =>
+                if (isCancelled() || !isLeader()) {
+                  info(s"Skipping copying log segments as the current task 
state is changed, cancelled: " +
+                    s"${isCancelled()} leader:${isLeader()}")
+                  return
+                }
+                val logFile = segment.log.file()
+                val logFileName = logFile.getName
+                info(s"Copying $logFileName to remote storage.")
+                val id = new RemoteLogSegmentId(tpId, Uuid.randomUuid())
+                val nextOffset = segment.readNextOffset
+                val endOffset = nextOffset - 1
+                val producerIdSnapshotFile: File = 
+                val epochEntries = getLeaderEpochCheckpoint(log, 
segment.baseOffset, nextOffset).read()
+                val segmentLeaderEpochs: util.Map[Integer, lang.Long] = new 
util.HashMap[Integer, lang.Long](epochEntries.size())
+                epochEntries.forEach(entry =>
+                  segmentLeaderEpochs.put(Integer.valueOf(entry.epoch), 
+                )
+                val remoteLogSegmentMetadata = new 
RemoteLogSegmentMetadata(id, segment.baseOffset, endOffset,
+                  segment.largestTimestamp, brokerId, time.milliseconds(), 
+                  segmentLeaderEpochs)
+                val leaderEpochsIndex = getLeaderEpochCheckpoint(log, 
startOffset = -1, nextOffset).readAsByteBuffer()
+                val segmentData = new LogSegmentData(logFile.toPath, 
+                  toPathIfExists(segment.lazyTimeIndex.get.file), 
+                  producerIdSnapshotFile.toPath, leaderEpochsIndex)
+                val rlsmAfterSegmentCopy = new 
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
+                  RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId)
+                copiedOffsetOption = Some(endOffset)
+                log.updateHighestOffsetInRemoteStorage(endOffset)
+                info(s"Copied $logFileName to remote storage with segment-id: 
+              }
+            }
+          } else {
+            debug(s"Skipping copying segments, current 
read-offset:$copiedOffset, and LSO:$lso")
+          }
+        }
+      } catch {
+        case ex: Exception =>
+          if (!isCancelled()) {
+            error(s"Error occurred while copying log segments of partition: 
$tpId", ex)
+          }
+      }
+    }
+    private def toPathIfExists(file: File): Path = {
+      if (file.exists()) file.toPath else null
+    }
+    override def run(): Unit = {
+      if (isCancelled())
+        return
+      try {
+        if (isLeader()) {
+          // Copy log segments to remote storage
+          handleCopyLogSegmentsToRemote()
+        }
+      } catch {
+        case ex: InterruptedException =>
+          if (!isCancelled()) {
+            warn(s"Current thread for topic-partition-id $tpId is interrupted, 
this task won't be rescheduled. " +
+              s"Reason: ${ex.getMessage}")
+          }
+        case ex: Exception =>
+          if (!isCancelled()) {
+            warn(s"Current task for topic-partition $tpId received error but 
it will be scheduled. " +
+              s"Reason: ${ex.getMessage}")
+          }
+      }
+    }
+    override def toString: String = {
+      this.getClass.toString + s"[$tpId]"
+    }
+  }
+  def findHighestRemoteOffset(topicIdPartition: TopicIdPartition): Long = {
+    var offset: Optional[lang.Long] = Optional.empty()
+    fetchLog(topicIdPartition.topicPartition()).foreach { log =>
+      log.leaderEpochCache.foreach(cache => {
+        var epoch = cache.latestEpoch
+        while (!offset.isPresent && epoch.isPresent) {
+          offset = 
remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt)
+          epoch = cache.previousEpoch(epoch.getAsInt)
+        }
+      })
+    }
+    offset.orElse(-1L)
+  }
+  private def doHandleLeaderOrFollowerPartitions(topicPartition: 
+                                                 convertToLeaderOrFollower: 
RLMTask => Unit): Unit = {
+    val rlmTaskWithFuture = 
leaderOrFollowerTasks.computeIfAbsent(topicPartition, (tp: TopicIdPartition) => 
+      val task = new RLMTask(tp)
+      // set this upfront when it is getting initialized instead of doing it 
after scheduling.
+      convertToLeaderOrFollower(task)
+      info(s"Created a new task: $task and getting scheduled")
+      val future = rlmScheduledThreadPool.scheduleWithFixedDelay(task, 0, 
delayInMs, TimeUnit.MILLISECONDS)
+      RLMTaskWithFuture(task, future)
+    })
+    convertToLeaderOrFollower(rlmTaskWithFuture.rlmTask)

Review Comment:
   Will there be race condition when the topic partition is new inserted, so 
it'll set `convertToLeaderOrFollower` once before scheduling. And then the 
scheduled thread could be run immediately, later, run this line 
`convertToLeaderOrFollower(rlmTaskWithFuture.rlmTask)`, so it reset the leader 
epoch if it got updated. 
   Could we add a null condition check for `rlmTaskWithFuture` here? that is:
   if (rlmTaskWithFuture != null) {
     // not new created task, should convert it here

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail:

For queries about this service, please contact Infrastructure at:

Reply via email to