satishd commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r786517360



##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +397,109 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if 
it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if 
it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: 
PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && 
quota.isQuotaExceeded
   }
 
+  /**
+   * It tries to build the required state for this partition from leader and 
remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: 
Long,
+                                                
epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): 
Unit = {
+
+    def fetchEarlierEpochEndOffset(epoch:Int): EpochEndOffset = {
+        val previousEpoch = epoch - 1
+        // Find the end-offset for the epoch earlier to the given epoch from 
the leader
+        val partitionsWithEpochs = Map(partition -> new 
EpochData().setPartition(partition.partition())
+          .setCurrentLeaderEpoch(currentLeaderEpoch)
+          .setLeaderEpoch(previousEpoch))
+        val maybeEpochEndOffset = 
fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition)
+        if (maybeEpochEndOffset.isEmpty) {
+          throw new KafkaException("No response received for partition: " + 
partition);
+        }
+
+        val epochEndOffset = maybeEpochEndOffset.get
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+          throw Errors.forCode(epochEndOffset.errorCode()).exception()
+        }
+
+        epochEndOffset
+    }
+
+    replicaMgr.localLog(partition).foreach { log =>
+      if (log.remoteStorageSystemEnable && 
log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach { rlm =>
+
+          // Find the respective leader epoch for (leaderLogStartOffset - 1)
+          val highestOffsetInRemoteFromLeader = leaderLogStartOffset - 1
+          val targetEpoch: Int = {
+            // If the existing epoch is 0, no need to fetch from earlier epoch 
as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if(epochForLeaderLocalLogStartOffset == 0) {
+              epochForLeaderLocalLogStartOffset
+            } else {
+              // Fetch the earlier epoch/end-offset from the leader.
+              val earlierEpochEndOffset = 
fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+              // Check if the target offset lies with in the range of earlier 
epoch
+              if (earlierEpochEndOffset.endOffset >= 
highestOffsetInRemoteFromLeader)
+                earlierEpochEndOffset.leaderEpoch() // This gives the 
respective leader epoch, will handle any gaps in epochs
+              else epochForLeaderLocalLogStartOffset
+            }
+          }
+
+          val rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, 
targetEpoch, highestOffsetInRemoteFromLeader)
+
+          if (rlsMetadata.isPresent) {
+            val epochStream = 
rlm.storageManager().fetchIndex(rlsMetadata.get(), 
RemoteStorageManager.IndexType.LEADER_EPOCH)
+            val epochs = readLeaderEpochCheckpoint(epochStream)
+
+            // Truncate the existing local log before restoring the leader 
epoch cache and producer snapshots.
+            truncateFullyAndStartAt(partition, leaderLocalLogStartOffset)
+
+            log.maybeIncrementLogStartOffset(leaderLogStartOffset, 
LeaderOffsetIncremented)
+            epochs.foreach { epochEntry =>
+              log.leaderEpochCache.map(cache => cache.assign(epochEntry.epoch, 
epochEntry.startOffset))
+            }

Review comment:
       I refactored `assign` to take a flag that controls whether to flush, 
defaulting to true for backward compatibility. In this case, I assign all the 
entries without flushing, and then flush to the file at the end.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to