hachikuji commented on a change in pull request #11221:
URL: https://github.com/apache/kafka/pull/11221#discussion_r689886267



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -268,7 +262,10 @@ abstract class AbstractFetcherThread(name: String,
     val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
     val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
-    fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) =>
+    // Partitions may have been removed from the fetcher while the thread was waiting for fetch
+    // response. Filter out removed partitions while holding `partitionMapLock` to ensure that we
+    // don't update state for any partition that may have already been migrated to another thread.
+    fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) }.forKeyValue { (tp, leaderEpochOffset) =>

Review comment:
       This is related to David's comment: I think the `filter` here still builds an
intermediate collection. Alternatively, we could move the
`partitionStates.contains(tp)` check into the `forKeyValue` body. It might even
be useful to log a `trace`-level message in the `else` case when we ignore the
result.
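
A rough sketch of what moving the check into the loop could look like. This is not the actual `AbstractFetcherThread` code: `TopicPartition` and `EpochEndOffset` below are hypothetical stand-ins for the Kafka types, `processFetchedEpochs` and the `trace` callback are made-up names, and plain `foreach` is used in place of Kafka's `forKeyValue` helper so the snippet compiles on its own.

```scala
import scala.collection.mutable

object FilterInsideLoopSketch {
  // Hypothetical stand-ins for the Kafka types; not the real classes.
  final case class TopicPartition(topic: String, partition: Int)
  final case class EpochEndOffset(leaderEpoch: Int, endOffset: Long)

  /** Returns the partitions that were processed; skipped ones are only logged. */
  def processFetchedEpochs(
      fetchedEpochs: Map[TopicPartition, EpochEndOffset],
      partitionStates: mutable.Set[TopicPartition],
      trace: String => Unit
  ): Seq[TopicPartition] = {
    val processed = mutable.ArrayBuffer.empty[TopicPartition]
    // Iterate over the map directly, so no filtered copy is allocated.
    fetchedEpochs.foreach { case (tp, leaderEpochOffset) =>
      if (partitionStates.contains(tp)) {
        // The real code would update truncation/fetch state here.
        processed += tp
      } else {
        // Partition was removed while the request was in flight.
        trace(s"Ignoring epoch end offset $leaderEpochOffset for $tp " +
          "since the partition is no longer assigned to this fetcher")
      }
    }
    processed.toSeq
  }
}
```

In the real method this would still need to run while holding `partitionMapLock`, as the code comment in the diff describes; the sketch leaves locking out.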




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


