rajinisivaram commented on a change in pull request #11221:
URL: https://github.com/apache/kafka/pull/11221#discussion_r689860826



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -229,9 +229,16 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
-  protected def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, 
EpochEndOffset]): Unit = {
+  // Visibility for unit tests
+  protected[server] def truncateOnFetchResponse(epochEndOffsets: 
Map[TopicPartition, EpochEndOffset]): Unit = {
     inLock(partitionMapLock) {
-      val ResultWithPartitions(fetchOffsets, partitionsWithError) = 
maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
+      // Partitions may have been removed from the fetcher while the thread 
was waiting for fetch
+      // response. Filter out removed partitions while holding 
`partitionMapLock` to ensure that we
+      // don't update state for any partition that may have already been 
migrated to another thread.
+      val filteredEpochEndOffsets = epochEndOffsets.filter { case (tp, _) =>
+        partitionStates.contains(tp)
+      }

Review comment:
       @dajac Thanks for the review, updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to