dajac commented on code in PR #12187:
URL: https://github.com/apache/kafka/pull/12187#discussion_r880181278


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2170,33 +2179,47 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
 
-    // Stopping the fetchers must be done first in order to initialize the 
fetch
-    // position correctly.
-    
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet)
-    stateChangeLogger.info(s"Stopped fetchers as part of become-follower for 
${partitionsToMakeFollower.size} partitions")
-
-    val listenerName = config.interBrokerListenerName.value
-    val partitionAndOffsets = new mutable.HashMap[TopicPartition, 
InitialFetchState]
-    partitionsToMakeFollower.forKeyValue { (topicPartition, partition) =>
-      val node = partition.leaderReplicaIdOpt
-        .flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
-        .flatMap(_.node(listenerName).asScala)
-        .getOrElse(Node.noNode)
-      val log = partition.localLogOrException
-      partitionAndOffsets.put(topicPartition, InitialFetchState(
-        log.topicId,
-        new BrokerEndPoint(node.id, node.host, node.port),
-        partition.getLeaderEpoch,
-        initialFetchOffset(log)
-      ))
-    }
+    if (partitionsToStart.nonEmpty) {
+      // Stopping the fetchers must be done first in order to initialize the 
fetch
+      // position correctly.
+      
replicaFetcherManager.removeFetcherForPartitions(partitionsToStart.keySet)
+      stateChangeLogger.info(s"Stopped fetchers as part of become-follower for 
${partitionsToStart.size} partitions")
+
+      val listenerName = config.interBrokerListenerName.value
+      val partitionAndOffsets = new mutable.HashMap[TopicPartition, 
InitialFetchState]
+
+      partitionsToStart.forKeyValue { (topicPartition, partition) =>
+        val nodeOpt = partition.leaderReplicaIdOpt
+          .flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
+          .flatMap(_.node(listenerName).asScala)
+
+        nodeOpt match {
+          case Some(node) =>
+            val log = partition.localLogOrException
+            partitionAndOffsets.put(topicPartition, InitialFetchState(
+              log.topicId,
+              new BrokerEndPoint(node.id, node.host, node.port),
+              partition.getLeaderEpoch,
+              initialFetchOffset(log)
+            ))
+          case None =>
+            stateChangeLogger.trace(s"Unable to start fetching $topicPartition 
with topic ID ${partition.topicId} " +
+              s"from leader ${partition.leaderReplicaIdOpt} because it is not 
alive.")
+        }
+      }
 
-    replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
-    stateChangeLogger.info(s"Started fetchers as part of become-follower for 
${partitionsToMakeFollower.size} partitions")
+      replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
+      stateChangeLogger.info(s"Started fetchers as part of become-follower for 
${partitionsToStart.size} partitions")
 
-    
partitionsToMakeFollower.keySet.foreach(completeDelayedFetchOrProduceRequests)
+      partitionsToStart.keySet.foreach(completeDelayedFetchOrProduceRequests)

Review Comment:
   That seems like a reasonable ask.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to