dajac commented on a change in pull request #11221: URL: https://github.com/apache/kafka/pull/11221#discussion_r690383569
########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -262,22 +263,29 @@ abstract class AbstractFetcherThread(name: String, val partitionsWithError = mutable.HashSet.empty[TopicPartition] fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) => - Errors.forCode(leaderEpochOffset.errorCode) match { - case Errors.NONE => - val offsetTruncationState = getOffsetTruncationState(tp, leaderEpochOffset) - info(s"Truncating partition $tp with $offsetTruncationState due to leader epoch and offset $leaderEpochOffset") - if (doTruncate(tp, offsetTruncationState)) - fetchOffsets.put(tp, offsetTruncationState) - - case Errors.FENCED_LEADER_EPOCH => - val currentLeaderEpoch = latestEpochsForPartitions.get(tp) - .map(epochEndOffset => Int.box(epochEndOffset.currentLeaderEpoch)).asJava - if (onPartitionFenced(tp, currentLeaderEpoch)) + if (partitionStates.contains(tp)) { + Errors.forCode(leaderEpochOffset.errorCode) match { + case Errors.NONE => + val offsetTruncationState = getOffsetTruncationState(tp, leaderEpochOffset) + info(s"Truncating partition $tp with $offsetTruncationState due to leader epoch and offset $leaderEpochOffset") + if (doTruncate(tp, offsetTruncationState)) + fetchOffsets.put(tp, offsetTruncationState) + + case Errors.FENCED_LEADER_EPOCH => + val currentLeaderEpoch = latestEpochsForPartitions.get(tp) + .map(epochEndOffset => Int.box(epochEndOffset.currentLeaderEpoch)).asJava + if (onPartitionFenced(tp, currentLeaderEpoch)) + partitionsWithError += tp + + case error => + info(s"Retrying leaderEpoch request for partition $tp as the leader reported an error: $error") partitionsWithError += tp - - case error => - info(s"Retrying leaderEpoch request for partition $tp as the leader reported an error: $error") - partitionsWithError += tp + } + } else { + // Partitions may have been removed from the fetcher while the thread was waiting for fetch + // response. Removed partitions are filtered out while holding `partitionMapLock` to ensure that we + // don't update state for any partition that may have already been migrated to another thread. + trace(s"Ignoring epoch offsets for partition '$tp' since it has been removed from this fetcher thread.") Review comment: nit: I would remove the simple quotes around `tp` to remain consistent with the other logs above (or the other way around). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org