[GitHub] [kafka] hachikuji commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response
hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r534424325

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -426,21 +451,35 @@ abstract class AbstractFetcherThread(name: String,
           warn(s"Partition $topicPartition marked as failed")
         }
    -  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
    +  /**
    +   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
    +   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
    +   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
    +   */
    +  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
    +    if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) {
    +      currentState
    +    } else if (initialFetchState.initOffset < 0) {
    +      fetchOffsetAndTruncate(tp, initialFetchState.currentLeaderEpoch)
    +    } else if (isTruncationOnFetchSupported) {
    +      val lastFetchedEpoch = latestEpoch(tp)
    +      val state = if (lastFetchedEpoch.exists(_ != EpochEndOffset.UNDEFINED_EPOCH)) Fetching else Truncating

Review comment:
Hmm.. Do we actually return `Some(EpochEndOffset.UNDEFINED_EPOCH)` from `latestEpoch`? That seems surprising. It might be worth adding a comment here noting that we still go through the `Truncating` state when the message format is old.
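To make the `Fetching`/`Truncating` choice in the truncation-on-fetch branch concrete, here is a minimal, self-contained sketch. The types are hypothetical simplifications, not Kafka's classes. The value `-1` for `EpochEndOffset.UNDEFINED_EPOCH` is an assumption. The sketch normalizes `Some(UNDEFINED_EPOCH)` to `None` up front, so the state check only needs `isDefined`. It also makes the `Truncating` fallback explicit for logs without leader epochs, such as old message formats.

```scala
// Hypothetical, simplified model of the initial-state choice; not Kafka's actual classes.
sealed trait ReplicaState
case object Fetching extends ReplicaState
case object Truncating extends ReplicaState

object InitialStateSketch {
  // Assumed value of the "undefined epoch" sentinel (mirrors EpochEndOffset.UNDEFINED_EPOCH).
  val UndefinedEpoch: Int = -1

  /**
   * Picks the initial state when truncation on fetch is supported, together with
   * the last fetched epoch to send in the next fetch request.
   */
  def initialState(latestEpoch: Option[Int]): (ReplicaState, Option[Int]) = {
    // Treat Some(UndefinedEpoch) exactly like None: callers see "usable epoch or nothing".
    val lastFetchedEpoch = latestEpoch.filter(_ != UndefinedEpoch)
    // No usable epoch (e.g. an old message format): fall back to Truncating,
    // which truncates to the high watermark before fetching.
    val state: ReplicaState = if (lastFetchedEpoch.isDefined) Fetching else Truncating
    (state, lastFetchedEpoch)
  }
}
```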
## File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala

    @@ -64,8 +64,8 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
       def resizeThreadPool(newSize: Int): Unit = {
         def migratePartitions(newSize: Int): Unit = {
           fetcherThreadMap.forKeyValue { (id, thread) =>
    -        val removedPartitions = thread.partitionsAndOffsets
    -        removeFetcherForPartitions(removedPartitions.keySet)
    +        val removedPartitions = thread.removeAllPartitions()
    +        removeFetcherForPartitions(removedPartitions.keySet) // clear state for removed partitions

Review comment:
This reads a bit odd following `removeAllPartitions`. I guess what we get from `removeFetcherForPartitions` is the clearing of `failedPartitions` and de-registration from `fetcherLagStats`. Not super important, but I wonder if it's worth trying to consolidate a little. Maybe `removeFetcherForPartitions` could return the initial fetch offsets or something.

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala

    @@ -1691,6 +1692,18 @@ class ReplicaManager(val config: KafkaConfig,
         partitionsToMakeFollower
       }
    +  /**
    +   * From IBP 2.7 onwards, we send latest fetch epoch in the request and truncate if a
    +   * diverging epoch is returned in the response, avoiding the need for a separate
    +   * OffsetForLeaderEpoch request.
    +   */
    +  private def initialFetchOffset(log: Log): Long = {

Review comment:
I think this could be saved for a follow-up, but I wonder if we should consider similarly letting the initial offset be determined by the fetcher thread on initialization rather than being passed in. I find it confusing that we expect this to be the high watermark in some cases. It seems a little slippery the way we rely on it in `AbstractFetcherThread.truncateToHighWatermark`.
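A minimal sketch of the offset choice discussed above. It assumes, since the body of `initialFetchOffset` is not shown, that a follower able to send a last fetched epoch under truncation-on-fetch starts at its log end offset, and that "some cases" means every other case starting at the high watermark. `LocalLog` and the boolean flag are hypothetical stand-ins for `Log` and the IBP check, not the PR's code.

```scala
// Hypothetical stand-in for the follower's local log; not Kafka's Log class.
final case class LocalLog(logEndOffset: Long, highWatermark: Long, latestEpoch: Option[Int])

object InitialOffsetSketch {
  /**
   * Assumed rule: if the leader can report diverging epochs in fetch responses and the
   * local log has an epoch to send, fetch from the log end offset and let the response
   * trigger any truncation. Otherwise start from the high watermark, which the older
   * truncation path (e.g. truncateToHighWatermark) relies on.
   */
  def initialFetchOffset(log: LocalLog, truncationOnFetchSupported: Boolean): Long =
    if (truncationOnFetchSupported && log.latestEpoch.nonEmpty) log.logEndOffset
    else log.highWatermark
}
```

Computing this inside the fetcher thread, as the comment suggests, would keep the "sometimes the high watermark" case in one place instead of splitting it between `ReplicaManager` and `AbstractFetcherThread`.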
## File path: core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala

    @@ -453,6 +466,107 @@ class ReplicaFetcherThreadTest {
           truncateToCapture.getValues.asScala.contains(101))
       }
    +  @Test
    +  def shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower(): Unit = {
    +
    +    // Create a capture to track what partitions/offsets are truncated
    +    val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
    +
    +    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
    +
    +    // Setup all dependencies
    +    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
    +    val logManager: LogManager = createMock(classOf[LogManager])
    +    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
    +    val log: Log = createNiceMock(classOf[Log])
    +    val partition: Partition = createNiceMock(classOf[Partition])
    +    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
    +
    +    val initialLEO = 200
    +    var latestLogEpoch: Option[Int] = Some(5)
    +
    +    // Stubs
    +    expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes()
    +    expect(partition.localLogOrException).andReturn(log).anyTimes()
    +    expect(log.highWatermark).andReturn(115).anyTimes()
    +    expect(log.latestEpoch).andAnswer(() =>
hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r533648722

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -426,21 +451,34 @@ abstract class AbstractFetcherThread(name: String,
           warn(s"Partition $topicPartition marked as failed")
         }
    -  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
    +  /**
    +   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
    +   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
    +   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
    +   */
    +  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
    +    if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) {
    +      currentState
    +    } else if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && initialFetchState.lastFetchedEpoch.nonEmpty &&
    +      (currentState == null || currentState.state == Fetching)) {
    +      PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
    +        state = Fetching, initialFetchState.lastFetchedEpoch)

Review comment:
This check is still a little hard to follow. I think we expect that if `initOffset` is negative, then `lastFetchedEpoch` will be empty and we will hit the `fetchOffsetAndTruncate` case below. Is that right? On the other hand, if `lastFetchedEpoch` is empty, then `initOffset` could still be non-negative if we have an old message format, which means we need to enter `Truncating` so that we can truncate to the high watermark.

One case that is not so clear is when `currentState` is non-null. Then we will enter the `Truncating` state below regardless of whether `isTruncationOnFetchSupported` is set or not. Is that what we want?

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -669,11 +714,18 @@ abstract class AbstractFetcherThread(name: String,
         Option(partitionStates.stateValue(topicPartition))
       }
    +  /**
    +   * Returns current fetch state for each partition assigned to this thread. This is used to reassign
    +   * partitions when thread pool is resized. We return `lastFetchedEpoch=None` to ensure we go through

Review comment:
This is probably ok. I guess an alternative would be to not take the initial last fetched epoch from `InitialFetchState`, but instead use `latestEpoch`.

## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala

    @@ -102,6 +103,7 @@ class ReplicaFetcherThread(name: String,
       private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
       private val fetchSize = brokerConfig.replicaFetchMaxBytes
       private val brokerSupportsLeaderEpochRequest = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
    +  private val brokerSupportsTruncationOnFetch = ApiVersion.isTruncationOnFetchSupported(brokerConfig.interBrokerProtocolVersion)

Review comment:
nit: I don't think we need this. We can override `isTruncationOnFetchSupported` with a `val`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
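The `val` override suggested in the `ReplicaFetcherThread` nit relies on a Scala rule: a `val` may override a parameterless `def`. The sketch below uses hypothetical class names and a simplified integer version check in place of the real `ApiVersion` comparison.

```scala
// Hypothetical, trimmed-down fetcher classes illustrating the suggested override.
abstract class FetcherThreadSketch(val name: String) {
  // Whether the leader can report diverging epochs in fetch responses.
  protected def isTruncationOnFetchSupported: Boolean

  def describe: String = s"$name truncationOnFetch=$isTruncationOnFetchSupported"
}

// `ibpVersion` is a hypothetical integer encoding of the IBP (27 stands for 2.7).
final class ReplicaFetcherSketch(threadName: String, ibpVersion: Int)
    extends FetcherThreadSketch(threadName) {
  // A stable `val` overrides the abstract `def`, so no extra private field is needed.
  override protected val isTruncationOnFetchSupported: Boolean = ibpVersion >= 27
}
```

One caveat with this pattern: the subclass `val` is assigned only after the superclass constructor has run. Code in the superclass constructor that read the flag would therefore see `false`. Reading it from methods, as `describe` does, is safe.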
hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r512370043

## File path: core/src/main/scala/kafka/log/Log.scala

    @@ -1388,6 +1390,7 @@ class Log(@volatile private var _dir: File,
         var validBytesCount = 0
         var firstOffset: Option[Long] = None
         var lastOffset = -1L
    +    var lastLeaderEpoch: Option[Int] = None

Review comment:
nit: not sure how much it matters, but maybe we can avoid the extra garbage and just use an integer until we're ready to build the result?

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -426,21 +451,34 @@ abstract class AbstractFetcherThread(name: String,
           warn(s"Partition $topicPartition marked as failed")
         }
    -  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
    +  /**
    +   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
    +   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
    +   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
    +   */
    +  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
    +    if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) {
    +      currentState
    +    } else if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && initialFetchState.lastFetchedEpoch.nonEmpty &&
    +      (currentState == null || currentState.state == Fetching)) {
    +      PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
    +        state = Fetching, initialFetchState.lastFetchedEpoch)

Review comment:
I am wondering in what situation we would find `currentState` non-null here. The current logic in `ReplicaManager.makeFollowers` always calls `removeFetcherForPartitions` before adding the partition back.
The reason I ask is that I wasn't sure we should be taking the last fetched epoch from the initial state or if we should keep the current one. It seems like the latter might be more current?

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -221,7 +223,15 @@ abstract class AbstractFetcherThread(name: String,
           val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
           handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")
    -      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
    +      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets, isTruncationOnFetchSupported)
    +    }
    +  }
    +
    +  private def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = {
    +    inLock(partitionMapLock) {
    +      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
    +      handlePartitionsWithErrors(partitionsWithError, "truncateOnFetchResponse")
    +      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets, maySkipTruncation = false)

Review comment:
It's not clear to me why we set `maySkipTruncation` to false here. If the truncation is not complete, wouldn't that put us in the `Truncating` state?

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -454,15 +492,23 @@ abstract class AbstractFetcherThread(name: String,
        * truncation completed if their offsetTruncationState indicates truncation completed
        *
        * @param fetchOffsets the partitions to update fetch offset and maybe mark truncation complete
    +   * @param maySkipTruncation true if we can stay in Fetching mode and perform truncation later based on
    +   *                          diverging epochs from fetch responses.
        */
    -  private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState]): Unit = {
    +  private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState],
    +                                                              maySkipTruncation: Boolean): Unit = {
         val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStateMap.asScala
           .map { case (topicPartition, currentFetchState) =>
             val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
               case Some(offsetTruncationState) =>
    -            val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
    +            val (state, lastFetchedEpoch) = if (offsetTruncationState.truncationCompleted)
    +              (Fetching, latestEpoch(topicPartition))
    +            else if
hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r509482332

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -461,8 +510,9 @@ abstract class AbstractFetcherThread(name: String,
             val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
               case Some(offsetTruncationState) =>
                 val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
    +            // Resetting `lastFetchedEpoch` since we are truncating and don't expect diverging epoch in the next fetch

Review comment:
This is a little unclear to me. I guess it is safe to reset `lastFetchedEpoch` as long as we reinitialize it after the next leader change. On the other hand, it seems safer to always retain the value.

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -426,21 +454,42 @@ abstract class AbstractFetcherThread(name: String,
           warn(s"Partition $topicPartition marked as failed")
         }
    -  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
    +  /**
    +   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
    +   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
    +   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
    +   */
    +  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
    +    if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && initialFetchState.lastFetchedEpoch.nonEmpty) {
    +      if (currentState == null) {
    +        return PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
    +          state = Fetching, initialFetchState.lastFetchedEpoch)
    +      }
    +      // If we are in `Fetching` state can continue to fetch regardless of current leader epoch and truncate
    +      // if necessary based on diverging epochs returned by the leader. If we are currently in Truncating state,
    +      // fall through and handle based on current epoch.
    +      if (currentState.state == Fetching) {
    +        return currentState

Review comment:
Is it not possible that the `InitialFetchState` has a bump to the current leader epoch? We will still need the latest epoch in order to continue fetching.

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -341,11 +352,18 @@ abstract class AbstractFetcherThread(name: String,
               // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
               if (validBytes > 0 && partitionStates.contains(topicPartition)) {
                 // Update partitionStates only if there is no exception during processPartitionData
    -            val newFetchState = PartitionFetchState(nextOffset, Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching)
    +            val newFetchState = PartitionFetchState(nextOffset, Some(lag),
    +              currentFetchState.currentLeaderEpoch, state = Fetching,
    +              Some(currentFetchState.currentLeaderEpoch))

Review comment:
This doesn't seem right. The last fetched epoch is supposed to represent the epoch of the last fetched batch. The fetcher could be fetching the data from an older epoch here.
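The rule in the last comment can be sketched as follows: the last fetched epoch is taken from the data that was appended, not from `currentLeaderEpoch`. `BatchInfo` is a hypothetical summary type, not Kafka's record batch API, and the "keep the previous value when nothing was appended" fallback is an assumption.

```scala
// Hypothetical summary of an appended record batch; not Kafka's RecordBatch API.
final case class BatchInfo(baseOffset: Long, lastOffset: Long, partitionLeaderEpoch: Int)

object LastFetchedEpochSketch {
  /**
   * The epoch to remember after an append is the leader epoch of the last batch actually
   * appended, which can be older than the partition's current leader epoch.
   * If nothing was appended, keep the previous value (an assumed fallback).
   */
  def lastFetchedEpoch(appended: Seq[BatchInfo], previous: Option[Int]): Option[Int] =
    appended.lastOption.map(_.partitionLeaderEpoch).orElse(previous)
}
```

For example, suppose the current leader epoch is 7 but the fetched batches were written in epochs 4 and 5. This yields `Some(5)`, whereas the diff under review would record `Some(7)`.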
## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -461,8 +510,9 @@ abstract class AbstractFetcherThread(name: String,
             val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
               case Some(offsetTruncationState) =>
                 val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating

Review comment:
Do we need to adjust this? I think we want to remain in the `Fetching` state if truncation detection is through `Fetch`.

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -629,7 +680,9 @@ abstract class AbstractFetcherThread(name: String,
           val initialLag = leaderEndOffset - offsetToFetch
           fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag
    -      PartitionFetchState(offsetToFetch, Some(initialLag), currentLeaderEpoch, state = Fetching)
    +      // We don't expect diverging epochs from the next fetch request, so resetting `lastFetchedEpoch`

Review comment:
Again it seems safe to keep `lastFetchedEpoch` in sync with the local log. If we have done a full truncation above, then `lastFetchedEpoch` will be `None`, but otherwise it seems like we should set it.

## File path:
hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r504331562

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -813,8 +852,9 @@ case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) {
       override def toString: String = "offset:%d-truncationCompleted:%b".format(offset, truncationCompleted)
     }
    -case class OffsetAndEpoch(offset: Long, leaderEpoch: Int) {
    +case class OffsetAndEpoch(offset: Long, leaderEpoch: Int, lastFetchedEpoch: Option[Int] = None) {

Review comment:
Wondering if it might be better not to change this type since it is used in contexts where `lastFetchedEpoch` is not relevant. Following the types through here, we first use `InitialFetchState` in `AbstractFetcherManager`:

```scala
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState])
```

We then convert to `OffsetAndEpoch`, which gets passed down to `AbstractFetcherThread`:

```scala
def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition]
```

Then this gets converted to `PartitionFetchState`. I wonder if it's possible to skip the conversion through `OffsetAndEpoch` and use `InitialFetchState` consistently? Perhaps the only reason the current code doesn't do that is that `InitialFetchState` includes the broker end point, which is not really relevant to the fetcher thread. Maybe that's not such a big deal?

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -432,14 +455,22 @@ abstract class AbstractFetcherThread(name: String,
         failedPartitions.removeAll(initialFetchStates.keySet)
         initialFetchStates.forKeyValue { (tp, initialFetchState) =>
    -      // We can skip the truncation step iff the leader epoch matches the existing epoch
    +      // For IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
    +      // For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch
           val currentState = partitionStates.stateValue(tp)
    -      val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) {
    +      val updatedState = if (initialFetchState.offset >= 0 && isTruncationOnFetchSupported && initialFetchState.lastFetchedEpoch.nonEmpty) {
    +        if (currentState != null)
    +          currentState
    +        else
    +          PartitionFetchState(initialFetchState.offset, None, initialFetchState.leaderEpoch,
    +            state = Fetching, initialFetchState.lastFetchedEpoch)
    +      } else if (currentState != null && (currentState.currentLeaderEpoch == initialFetchState.leaderEpoch)) {

Review comment:
nit: unnecessary parentheses

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -225,6 +227,20 @@ abstract class AbstractFetcherThread(name: String,
           }
         }
    +  private def truncateOnFetchResponse(responseData: Map[TopicPartition, FetchData]): Unit = {
    +    val epochEndOffsets = responseData
    +      .filter { case (tp, fetchData) => fetchData.error == Errors.NONE && fetchData.divergingEpoch.isPresent }
    +      .map { case (tp, fetchData) =>
    +        val divergingEpoch = fetchData.divergingEpoch.get
    +        tp -> new EpochEndOffset(Errors.NONE, divergingEpoch.epoch, divergingEpoch.endOffset)
    +      }.toMap
    +    inLock(partitionMapLock) {

Review comment:
Borderline overkill perhaps, but we could check if `epochEndOffsets` is non-empty before acquiring the lock.

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala

    @@ -225,6 +227,20 @@ abstract class AbstractFetcherThread(name: String,
           }
         }
    +  private def truncateOnFetchResponse(responseData: Map[TopicPartition, FetchData]): Unit = {
    +    val epochEndOffsets = responseData

Review comment:
Rather than doing an additional pass over the response partitions, would it be reasonable to build `epochEndOffsets` inline with the other error handling in `processFetchRequest`?
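The last two suggestions combine naturally: collect diverging epochs in the same pass as the other per-partition handling, and skip the lock when nothing diverged. The sketch below uses hypothetical types and method names, not the PR's. A `ReentrantLock` stands in for `partitionMapLock`, and the real `processFetchRequest` does much more per partition.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical, simplified fetch-response types; not Kafka's FetchResponse classes.
final case class DivergingEpoch(epoch: Int, endOffset: Long)
final case class PartitionData(hasError: Boolean, divergingEpoch: Option[DivergingEpoch])

final class FetchResponseHandlerSketch {
  private val partitionMapLock = new ReentrantLock()
  // Stand-ins for the real truncation logic, so the behaviour is observable.
  val truncations = mutable.Map.empty[String, DivergingEpoch]
  var lockAcquisitions = 0

  def processFetchResponse(responseData: Map[String, PartitionData]): Unit = {
    val epochEndOffsets = mutable.Map.empty[String, DivergingEpoch]
    responseData.foreach { case (tp, data) =>
      // Error handling and appending records would also happen in this loop.
      // Collect diverging epochs here rather than in a second pass over the response.
      if (!data.hasError) data.divergingEpoch.foreach(d => epochEndOffsets(tp) = d)
    }
    // Only take the lock when at least one partition actually diverged.
    if (epochEndOffsets.nonEmpty) {
      partitionMapLock.lock()
      try {
        lockAcquisitions += 1
        truncations ++= epochEndOffsets
      } finally partitionMapLock.unlock()
    }
  }
}
```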
## File path: core/src/main/scala/kafka/server/DelayedFetch.scala

    @@ -77,6 +78,7 @@ class DelayedFetch(delayMs: Long,
        * Case E: This broker is the leader, but the requested epoch is now fenced
        * Case F: The fetch offset locates not on the last segment of the log
        * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
    +   * Case H: A diverging epoch was found, return response to trigger truncation

Review comment:
Good catch here and in `FetchSession`. Do you think we should consider doing these fixes separately so that we can get them into 2.7? Otherwise it might be difficult to tie this behavior to the 2.7 IBP.

## File path: