hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r512370043
########## File path: core/src/main/scala/kafka/log/Log.scala ##########
@@ -1388,6 +1390,7 @@ class Log(@volatile private var _dir: File,
     var validBytesCount = 0
     var firstOffset: Option[Long] = None
     var lastOffset = -1L
+    var lastLeaderEpoch: Option[Int] = None

Review comment:
nit: not sure how much it matters, but maybe we can avoid the extra garbage and just use an integer until we're ready to build the result?

########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ##########
@@ -426,21 +451,34 @@ abstract class AbstractFetcherThread(name: String,
     warn(s"Partition $topicPartition marked as failed")
   }

-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+    if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) {
+      currentState
+    } else if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && initialFetchState.lastFetchedEpoch.nonEmpty &&
+      (currentState == null || currentState.state == Fetching)) {
+      PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
+        state = Fetching, initialFetchState.lastFetchedEpoch)

Review comment:
I am wondering in what situation we would find `currentState` non-null here. The current logic in `ReplicaManager.makeFollowers` always calls `removeFetcherForPartitions` before adding the partition back. The reason I ask is that I wasn't sure whether we should take the last fetched epoch from the initial state or keep the current one. It seems like the latter might be more current?

########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ##########
@@ -221,7 +223,15 @@ abstract class AbstractFetcherThread(name: String,
       val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
       handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")
-      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
+      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets, isTruncationOnFetchSupported)
+    }
+  }
+
+  private def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = {
+    inLock(partitionMapLock) {
+      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
+      handlePartitionsWithErrors(partitionsWithError, "truncateOnFetchResponse")
+      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets, maySkipTruncation = false)

Review comment:
It's not clear to me why we set `maySkipTruncation` to false here. If the truncation is not complete, wouldn't that put us in the `Truncating` state?
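To make the `Log.scala` nit above concrete, here is a minimal, standalone sketch of the idea (illustrative names only, not code from the PR): keep the last seen leader epoch as a plain `Int` with a sentinel value while scanning, and build the `Option` once at the end.

```scala
// Standalone sketch of the Log.scala nit above: track the last leader epoch as a
// plain Int while scanning record batches and only wrap it in an Option at the end,
// avoiding a Some allocation per batch. RecordBatchLike, ScanResult and NoEpoch are
// illustrative names, not types from the PR.
object EpochScanSketch {
  final val NoEpoch: Int = -1

  final case class RecordBatchLike(sizeInBytes: Int, partitionLeaderEpoch: Int)
  final case class ScanResult(validBytes: Int, lastLeaderEpoch: Option[Int])

  def scan(batches: Iterable[RecordBatchLike]): ScanResult = {
    var validBytes = 0
    var lastLeaderEpoch = NoEpoch // plain Int while iterating, no per-batch Option allocation

    batches.foreach { batch =>
      validBytes += batch.sizeInBytes
      lastLeaderEpoch = batch.partitionLeaderEpoch
    }

    // Wrap exactly once, when the result is built
    ScanResult(validBytes, if (lastLeaderEpoch == NoEpoch) None else Some(lastLeaderEpoch))
  }
}
```

The per-batch `Some(...)` allocation is presumably the "extra garbage" the comment refers to; with a sentinel, nothing is allocated on the scan path.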
########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ##########
@@ -454,15 +492,23 @@ abstract class AbstractFetcherThread(name: String,
    * truncation completed if their offsetTruncationState indicates truncation completed
    *
    * @param fetchOffsets the partitions to update fetch offset and maybe mark truncation complete
+   * @param maySkipTruncation true if we can stay in Fetching mode and perform truncation later based on
+   *                          diverging epochs from fetch responses.
    */
-  private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState]): Unit = {
+  private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState],
+                                                              maySkipTruncation: Boolean): Unit = {
     val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStateMap.asScala
       .map { case (topicPartition, currentFetchState) =>
         val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
           case Some(offsetTruncationState) =>
-            val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
+            val (state, lastFetchedEpoch) = if (offsetTruncationState.truncationCompleted)
+              (Fetching, latestEpoch(topicPartition))
+            else if (maySkipTruncation && currentFetchState.lastFetchedEpoch.nonEmpty)
+              (Fetching, currentFetchState.lastFetchedEpoch)

Review comment:
I'm a little uncertain about this case. If we have truncated to an earlier offset, wouldn't we also need to reset the last fetched epoch? I am thinking we should remove this check and modify the first one:
```scala
val (state, lastFetchedEpoch) = if (maySkipTruncation || offsetTruncationState.truncationCompleted)
  (Fetching, latestEpoch(topicPartition))
```
I might be missing something though.

########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##########
@@ -1667,8 +1667,9 @@ class ReplicaManager(val config: KafkaConfig,
         val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
           val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
             .brokerEndPoint(config.interBrokerListenerName)
-          val fetchOffset = partition.localLogOrException.highWatermark
-          partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
+          val log = partition.localLogOrException
+          val (fetchOffset, lastFetchedEpoch) = initialFetchOffsetAndEpoch(log)
+          partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset, lastFetchedEpoch)

Review comment:
nit: the line below is misaligned

########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##########
@@ -770,7 +770,7 @@ class ReplicaManager(val config: KafkaConfig,
     logManager.abortAndPauseCleaning(topicPartition)
     val initialFetchState = InitialFetchState(BrokerEndPoint(config.brokerId, "localhost", -1),
-      partition.getLeaderEpoch, futureLog.highWatermark)
+      partition.getLeaderEpoch, futureLog.highWatermark, lastFetchedEpoch = None)

Review comment:
Sounds fair.
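As a side note on the `updateFetchOffsetAndMaybeMarkTruncationComplete` suggestion above, here is a minimal, standalone sketch of the proposed shape of that decision. The types below are simplified local stand-ins rather than the PR's actual classes, and the `else` branch is an assumption about what would remain after the suggested change.

```scala
// Standalone sketch only: simplified stand-ins for the fetcher's state types,
// illustrating the reviewer's suggestion to re-derive lastFetchedEpoch from the
// log (latestEpoch) whenever the partition ends up in the Fetching state.
object FetchStateSketch {
  sealed trait ReplicaState
  case object Fetching extends ReplicaState
  case object Truncating extends ReplicaState

  final case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean)

  def nextStateAndEpoch(offsetTruncationState: OffsetTruncationState,
                        maySkipTruncation: Boolean,
                        latestEpoch: => Option[Int]): (ReplicaState, Option[Int]) =
    if (maySkipTruncation || offsetTruncationState.truncationCompleted)
      (Fetching, latestEpoch) // take the epoch from the log, not the previous fetch state
    else
      (Truncating, None)      // assumption: clear the epoch while truncation is still pending
}
```

The point of the suggestion, as I read it, is that after truncating to an earlier offset the previously recorded `lastFetchedEpoch` may refer to data the local log no longer contains, so reading `latestEpoch(topicPartition)` again is the safer source.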