dajac commented on a change in pull request #11221:
URL: https://github.com/apache/kafka/pull/11221#discussion_r690383569



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -262,22 +263,29 @@ abstract class AbstractFetcherThread(name: String,
     val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
     fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) =>
-      Errors.forCode(leaderEpochOffset.errorCode) match {
-        case Errors.NONE =>
-          val offsetTruncationState = getOffsetTruncationState(tp, 
leaderEpochOffset)
-          info(s"Truncating partition $tp with $offsetTruncationState due to 
leader epoch and offset $leaderEpochOffset")
-          if (doTruncate(tp, offsetTruncationState))
-            fetchOffsets.put(tp, offsetTruncationState)
-
-        case Errors.FENCED_LEADER_EPOCH =>
-          val currentLeaderEpoch = latestEpochsForPartitions.get(tp)
-            .map(epochEndOffset => 
Int.box(epochEndOffset.currentLeaderEpoch)).asJava
-          if (onPartitionFenced(tp, currentLeaderEpoch))
+      if (partitionStates.contains(tp)) {
+        Errors.forCode(leaderEpochOffset.errorCode) match {
+          case Errors.NONE =>
+            val offsetTruncationState = getOffsetTruncationState(tp, 
leaderEpochOffset)
+            info(s"Truncating partition $tp with $offsetTruncationState due to 
leader epoch and offset $leaderEpochOffset")
+            if (doTruncate(tp, offsetTruncationState))
+              fetchOffsets.put(tp, offsetTruncationState)
+
+          case Errors.FENCED_LEADER_EPOCH =>
+            val currentLeaderEpoch = latestEpochsForPartitions.get(tp)
+              .map(epochEndOffset => 
Int.box(epochEndOffset.currentLeaderEpoch)).asJava
+            if (onPartitionFenced(tp, currentLeaderEpoch))
+              partitionsWithError += tp
+
+          case error =>
+            info(s"Retrying leaderEpoch request for partition $tp as the 
leader reported an error: $error")
             partitionsWithError += tp
-
-        case error =>
-          info(s"Retrying leaderEpoch request for partition $tp as the leader 
reported an error: $error")
-          partitionsWithError += tp
+        }
+      } else {
+        // Partitions may have been removed from the fetcher while the thread 
was waiting for fetch
+        // response. Removed partitions are filtered out while holding 
`partitionMapLock` to ensure that we
+        // don't update state for any partition that may have already been 
migrated to another thread.
+        trace(s"Ignoring epoch offsets for partition '$tp' since it has been 
removed from this fetcher thread.")

Review comment:
       nit: I would remove the simple quotes around `tp` to remain consistent 
with the other logs above (or the other way around).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to