satishd commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r784801502



##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +397,109 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if 
it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if 
it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: 
PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && 
quota.isQuotaExceeded
   }
 
+  /**
+   * It tries to build the required state for this partition from leader and 
remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: 
Long,
+                                                
epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): 
Unit = {
+
+    def fetchEarlierEpochEndOffset(epoch:Int): EpochEndOffset = {
+        val previousEpoch = epoch - 1
+        // Find the end-offset for the epoch earlier to the given epoch from 
the leader
+        val partitionsWithEpochs = Map(partition -> new 
EpochData().setPartition(partition.partition())
+          .setCurrentLeaderEpoch(currentLeaderEpoch)
+          .setLeaderEpoch(previousEpoch))
+        val maybeEpochEndOffset = 
fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition)
+        if (maybeEpochEndOffset.isEmpty) {
+          throw new KafkaException("No response received for partition: " + 
partition);
+        }
+
+        val epochEndOffset = maybeEpochEndOffset.get
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+          throw Errors.forCode(epochEndOffset.errorCode()).exception()
+        }
+
+        epochEndOffset
+    }
+
+    replicaMgr.localLog(partition).foreach { log =>
+      if (log.remoteStorageSystemEnable && 
log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach { rlm =>
+
+          // Find the respective leader epoch for (leaderLogStartOffset - 1)
+          val highestOffsetInRemoteFromLeader = leaderLogStartOffset - 1
+          val targetEpoch: Int = {
+            // If the existing epoch is 0, no need to fetch from earlier epoch 
as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if(epochForLeaderLocalLogStartOffset == 0) {
+              epochForLeaderLocalLogStartOffset
+            } else {
+              // Fetch the earlier epoch/end-offset from the leader.
+              val earlierEpochEndOffset = 
fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+              // Check if the target offset lies with in the range of earlier 
epoch
+              if (earlierEpochEndOffset.endOffset >= 
highestOffsetInRemoteFromLeader)
+                earlierEpochEndOffset.leaderEpoch() // This gives the 
respective leader epoch, will handle any gaps in epochs
+              else epochForLeaderLocalLogStartOffset
+            }
+          }
+
+          val rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, 
targetEpoch, highestOffsetInRemoteFromLeader)

Review comment:
       If it does not catchup then it returns `Optional.EMPTY`, and that we 
throw a RemoteStorageException. As the caller receives RemoteStorageException, 
it will be retried again.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to