satishd commented on a change in pull request #11390: URL: https://github.com/apache/kafka/pull/11390#discussion_r786517360
########## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ########## @@ -386,11 +397,109 @@ class ReplicaFetcherThread(name: String, } /** - * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, - * the quota is exceeded and the replica is not in sync. + * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, + * the quota is exceeded and the replica is not in sync. */ private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = { !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded } + /** + * It tries to build the required state for this partition from leader and remote storage so that it can start + * fetching records from the leader. + */ + override protected def buildRemoteLogAuxState(partition: TopicPartition, + currentLeaderEpoch: Int, + leaderLocalLogStartOffset: Long, + epochForLeaderLocalLogStartOffset: Int, + leaderLogStartOffset: Long): Unit = { + + def fetchEarlierEpochEndOffset(epoch:Int): EpochEndOffset = { + val previousEpoch = epoch - 1 + // Find the end-offset for the epoch earlier to the given epoch from the leader + val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition()) + .setCurrentLeaderEpoch(currentLeaderEpoch) + .setLeaderEpoch(previousEpoch)) + val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition) + if (maybeEpochEndOffset.isEmpty) { + throw new KafkaException("No response received for partition: " + partition); + } + + val epochEndOffset = maybeEpochEndOffset.get + if (epochEndOffset.errorCode() != Errors.NONE.code()) { + throw Errors.forCode(epochEndOffset.errorCode()).exception() + } + + epochEndOffset + } + + replicaMgr.localLog(partition).foreach { log => + if (log.remoteStorageSystemEnable && 
log.config.remoteLogConfig.remoteStorageEnable) { + replicaMgr.remoteLogManager.foreach { rlm => + + // Find the respective leader epoch for (leaderLogStartOffset - 1) + val highestOffsetInRemoteFromLeader = leaderLogStartOffset - 1 + val targetEpoch: Int = { + // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1) + // will have the same epoch. + if(epochForLeaderLocalLogStartOffset == 0) { + epochForLeaderLocalLogStartOffset + } else { + // Fetch the earlier epoch/end-offset from the leader. + val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset) + // Check if the target offset lies with in the range of earlier epoch + if (earlierEpochEndOffset.endOffset >= highestOffsetInRemoteFromLeader) + earlierEpochEndOffset.leaderEpoch() // This gives the respective leader epoch, will handle any gaps in epochs + else epochForLeaderLocalLogStartOffset + } + } + + val rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, highestOffsetInRemoteFromLeader) + + if (rlsMetadata.isPresent) { + val epochStream = rlm.storageManager().fetchIndex(rlsMetadata.get(), RemoteStorageManager.IndexType.LEADER_EPOCH) + val epochs = readLeaderEpochCheckpoint(epochStream) + + // Truncate the existing local log before restoring the leader epoch cache and producer snapshots. + truncateFullyAndStartAt(partition, leaderLocalLogStartOffset) + + log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented) + epochs.foreach { epochEntry => + log.leaderEpochCache.map(cache => cache.assign(epochEntry.epoch, epochEntry.startOffset)) + } Review comment: I refactored `assign` to take a flag that controls whether it flushes, defaulting to true for backward compatibility. In this case, I assign all the entries without flushing and then flush to the file once at the end. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org