junrao commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r788279027



##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +393,138 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  /**
+   * Tries to build the required state for this partition from the leader and remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Unit = {
+
+    def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = {
+        val previousEpoch = epoch - 1
+        // Find the end offset of the epoch preceding the given epoch from the leader.
+        val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+          .setCurrentLeaderEpoch(currentLeaderEpoch)
+          .setLeaderEpoch(previousEpoch))
+        val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition)
+        if (maybeEpochEndOffset.isEmpty) {
+          throw new KafkaException("No response received for partition: " + partition)
+        }
+
+        val epochEndOffset = maybeEpochEndOffset.get
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+          throw Errors.forCode(epochEndOffset.errorCode()).exception()
+        }
+
+        epochEndOffset
+    }
+
+    replicaMgr.localLog(partition).foreach { log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach { rlm =>
+
+          // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader
+          // epoch cache up to that offset.
+          val highestOffsetInRemoteFromLeader = leaderLocalLogStartOffset - 1
+          val targetEpoch: Int = {
+            // If the existing epoch is 0, there is no need to fetch from an earlier epoch as the desired
+            // offset (leaderLocalLogStartOffset - 1) will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+              epochForLeaderLocalLogStartOffset
+            } else {
+              // Fetch the earlier epoch/end-offset (exclusive) from the leader.
+              val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+              // Check if the target offset lies within the range of the earlier epoch. Here, the epoch's
+              // end offset is exclusive.
+              if (earlierEpochEndOffset.endOffset > highestOffsetInRemoteFromLeader) {
+                // Always use the leader epoch from the returned earlierEpochEndOffset.
+                // This gives the respective leader epoch and handles any gaps in epochs.
+                // For example, the leader epoch cache contains:
+                // leader-epoch   start-offset
+                //  0                    20
+                //  1                    85
+                //  <2> - gap, no messages were appended in this leader epoch.
+                //  3                    90
+                //  4                    98
+                // There is a gap in the leader epochs. For leaderLocalLogStartOffset 90, the leader epoch is 3.
+                // fetchEarlierEpochEndOffset(3) will return leader-epoch 1 and end-offset 90.
+                // So, for offset 89, we should return leader epoch 1, as below.
+                earlierEpochEndOffset.leaderEpoch()
+              } else epochForLeaderLocalLogStartOffset
+            }
+          }
+
+          val maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, highestOffsetInRemoteFromLeader)
+
+          if (maybeRlsm.isPresent) {
+            val remoteLogSegmentMetadata = maybeRlsm.get()
+            // Build the leader epoch cache and producer snapshots until remoteLogSegmentMetadata.endOffset(),
+            // and start fetching segments from (remoteLogSegmentMetadata.endOffset() + 1).
+            val nextOffset = remoteLogSegmentMetadata.endOffset() + 1
+            val epochStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH)
+            val epochs = readLeaderEpochCheckpoint(epochStream)
+
+            // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+            truncateFullyAndStartAt(partition, nextOffset)

Review comment:
       We need to return nextOffset to the caller so that fetchOffsetAndApplyFun() can set the next fetch offset to it.
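       For illustration, a minimal standalone sketch (hypothetical names, not the PR's code) of the shape this suggests: the state-building step returns the next fetch offset instead of discarding it.

```scala
object NextOffsetSketch {
  // Stand-in for buildRemoteLogAuxState: after restoring the leader epoch cache
  // and producer snapshots up to the remote segment's end offset, it returns the
  // first offset the follower must fetch from the leader's local log.
  def buildAuxState(remoteSegmentEndOffset: Long): Long = {
    // ... restore leader epoch cache / producer snapshots here ...
    remoteSegmentEndOffset + 1 // nextOffset, returned instead of discarded
  }

  def main(args: Array[String]): Unit = {
    // The caller (fetchOffsetAndApplyFun() here) would use this as the new fetch offset.
    val nextFetchOffset = buildAuxState(remoteSegmentEndOffset = 99L)
    println(s"resume fetching at offset $nextFetchOffset") // prints 100
  }
}
```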

##########
File path: core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
##########
@@ -52,11 +52,11 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
     * Assigns the supplied Leader Epoch to the supplied Offset
     * Once the epoch is assigned it cannot be reassigned
     */
-  def assign(epoch: Int, startOffset: Long): Unit = {
+  def assign(epoch: Int, startOffset: Long, flushToFile: Boolean = true): Unit = {
     val entry = EpochEntry(epoch, startOffset)
     if (assign(entry)) {
       debug(s"Appended new epoch entry $entry. Cache now contains ${epochs.size} entries.")
-      flush()
+      if(flushToFile) flush()

Review comment:
       space after if
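       i.e. the fixed line would read:

```scala
if (flushToFile) flush()
```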

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -715,6 +727,58 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
+  /**
+   * Handle a partition whose offset is out of range and return a new fetch offset.
+   */
+  protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
+    fetchOffsetAndApplyFun(topicPartition, topicId, currentLeaderEpoch,
+      (epoch, leaderLogStartOffset) => truncateFullyAndStartAt(topicPartition, leaderLogStartOffset))

Review comment:
       If we hit OffsetOutOfRangeException, we call fetchEarliestOffsetFromLeader() inside fetchOffsetAndApplyFun(), which fetches the localStartOffset. We then truncate the whole log and start fetching from localStartOffset. If the leader has data in remote storage, the follower won't have the remote log metadata for consumer fetch and won't have the same state (e.g. producer state) as the leader.
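       To make the concern concrete, a standalone sketch (hypothetical, simplified names and types, not the PR's code) of the decision the follower needs to make on an out-of-range offset; the first branch corresponds to the handling this comment argues is missing here:

```scala
object OutOfRangeHandlingSketch {
  sealed trait Action
  case class TruncateFullyAndStartAt(offset: Long) extends Action
  case class BuildRemoteLogAuxState(upToOffset: Long) extends Action

  // If the leader's log starts before its local log (i.e. it has tiered data),
  // truncating to the local start offset skips the remote range entirely, leaving
  // the follower without remote log metadata and producer state for it.
  def handleOutOfRange(leaderLogStartOffset: Long, leaderLocalLogStartOffset: Long): Action =
    if (leaderLogStartOffset < leaderLocalLogStartOffset)
      BuildRemoteLogAuxState(upToOffset = leaderLocalLogStartOffset - 1)
    else
      TruncateFullyAndStartAt(leaderLocalLogStartOffset)

  def main(args: Array[String]): Unit = {
    println(handleOutOfRange(leaderLogStartOffset = 0L, leaderLocalLogStartOffset = 500L))
    println(handleOutOfRange(leaderLogStartOffset = 500L, leaderLocalLogStartOffset = 500L))
  }
}
```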

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +393,138 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  /**
+   * Tries to build the required state for this partition from the leader and remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Unit = {
+
+    def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = {
+        val previousEpoch = epoch - 1
+        // Find the end offset of the epoch preceding the given epoch from the leader.
+        val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+          .setCurrentLeaderEpoch(currentLeaderEpoch)
+          .setLeaderEpoch(previousEpoch))
+        val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition)
+        if (maybeEpochEndOffset.isEmpty) {
+          throw new KafkaException("No response received for partition: " + partition)
+        }
+
+        val epochEndOffset = maybeEpochEndOffset.get
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+          throw Errors.forCode(epochEndOffset.errorCode()).exception()
+        }
+
+        epochEndOffset
+    }
+
+    replicaMgr.localLog(partition).foreach { log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach { rlm =>
+
+          // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader
+          // epoch cache up to that offset.
+          val highestOffsetInRemoteFromLeader = leaderLocalLogStartOffset - 1

Review comment:
       It's possible for the local data to overlap a bit with what's in remote storage, so leaderLocalLogStartOffset - 1 is not necessarily the highest offset in remote storage. Could we give highestOffsetInRemoteFromLeader a more accurate name?
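       For example, something like the following rename (the name below is only a suggestion, not existing code):

```scala
// The offset just before the leader's local log start offset. Local and remote
// data can overlap, so this need not be the highest offset in remote storage.
val offsetBeforeLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1
```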

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +393,138 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  /**
+   * Tries to build the required state for this partition from the leader and remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Unit = {
+
+    def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = {
+        val previousEpoch = epoch - 1
+        // Find the end offset of the epoch preceding the given epoch from the leader.
+        val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+          .setCurrentLeaderEpoch(currentLeaderEpoch)
+          .setLeaderEpoch(previousEpoch))
+        val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition)
+        if (maybeEpochEndOffset.isEmpty) {
+          throw new KafkaException("No response received for partition: " + partition)
+        }
+
+        val epochEndOffset = maybeEpochEndOffset.get
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+          throw Errors.forCode(epochEndOffset.errorCode()).exception()
+        }
+
+        epochEndOffset
+    }
+
+    replicaMgr.localLog(partition).foreach { log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach { rlm =>
+
+          // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader
+          // epoch cache up to that offset.
+          val highestOffsetInRemoteFromLeader = leaderLocalLogStartOffset - 1
+          val targetEpoch: Int = {
+            // If the existing epoch is 0, there is no need to fetch from an earlier epoch as the desired
+            // offset (leaderLocalLogStartOffset - 1) will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+              epochForLeaderLocalLogStartOffset
+            } else {
+              // Fetch the earlier epoch/end-offset (exclusive) from the leader.
+              val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+              // Check if the target offset lies within the range of the earlier epoch. Here, the epoch's
+              // end offset is exclusive.
+              if (earlierEpochEndOffset.endOffset > highestOffsetInRemoteFromLeader) {
+                // Always use the leader epoch from the returned earlierEpochEndOffset.
+                // This gives the respective leader epoch and handles any gaps in epochs.
+                // For example, the leader epoch cache contains:
+                // leader-epoch   start-offset
+                //  0                    20
+                //  1                    85
+                //  <2> - gap, no messages were appended in this leader epoch.
+                //  3                    90
+                //  4                    98
+                // There is a gap in the leader epochs. For leaderLocalLogStartOffset 90, the leader epoch is 3.
+                // fetchEarlierEpochEndOffset(3) will return leader-epoch 1 and end-offset 90.

Review comment:
       fetchEarlierEpochEndOffset(3) => fetchEarlierEpochEndOffset(2) ?
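       For reference, a small standalone model (simplified and hypothetical, not the actual OffsetsForLeaderEpoch handling) of the lookup in question: fetchEarlierEpochEndOffset(3) sends a request for epoch 2 (= 3 - 1), and a leader with a gap at epoch 2 answers with the closest known epoch at or below it.

```scala
import scala.collection.immutable.TreeMap

object EpochGapLookupSketch {
  // Leader epoch -> start offset, with a gap at epoch 2, as in the code comment.
  val epochStartOffsets: TreeMap[Int, Long] = TreeMap(0 -> 20L, 1 -> 85L, 3 -> 90L, 4 -> 98L)

  // For a requested epoch that falls in a gap, answer with the largest known
  // epoch <= the requested one, plus that epoch's (exclusive) end offset.
  def endOffsetFor(requestedEpoch: Int): (Int, Long) = {
    val (epoch, _) = epochStartOffsets.rangeTo(requestedEpoch).last
    val endOffset = epochStartOffsets.rangeFrom(epoch + 1).headOption
      .map(_._2).getOrElse(Long.MaxValue) // last epoch: end offset unknown here
    (epoch, endOffset)
  }

  def main(args: Array[String]): Unit = {
    // fetchEarlierEpochEndOffset(3) asks the leader for epoch 2 (= 3 - 1):
    println(endOffsetFor(2)) // (1,90): leader epoch 1, end offset 90
  }
}
```

       So the question is whether the code comment should cite the outer call (3) or the epoch actually requested from the leader (2).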



