dajac commented on code in PR #12187:
URL: https://github.com/apache/kafka/pull/12187#discussion_r880181278
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2170,33 +2179,47 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
 
-    // Stopping the fetchers must be done first in order to initialize the fetch
-    // position correctly.
-    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet)
-    stateChangeLogger.info(s"Stopped fetchers as part of become-follower for ${partitionsToMakeFollower.size} partitions")
-
-    val listenerName = config.interBrokerListenerName.value
-    val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState]
-    partitionsToMakeFollower.forKeyValue { (topicPartition, partition) =>
-      val node = partition.leaderReplicaIdOpt
-        .flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
-        .flatMap(_.node(listenerName).asScala)
-        .getOrElse(Node.noNode)
-      val log = partition.localLogOrException
-      partitionAndOffsets.put(topicPartition, InitialFetchState(
-        log.topicId,
-        new BrokerEndPoint(node.id, node.host, node.port),
-        partition.getLeaderEpoch,
-        initialFetchOffset(log)
-      ))
-    }
+    if (partitionsToStart.nonEmpty) {
+      // Stopping the fetchers must be done first in order to initialize the fetch
+      // position correctly.
+      replicaFetcherManager.removeFetcherForPartitions(partitionsToStart.keySet)
+      stateChangeLogger.info(s"Stopped fetchers as part of become-follower for ${partitionsToStart.size} partitions")
+
+      val listenerName = config.interBrokerListenerName.value
+      val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState]
+
+      partitionsToStart.forKeyValue { (topicPartition, partition) =>
+        val nodeOpt = partition.leaderReplicaIdOpt
+          .flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
+          .flatMap(_.node(listenerName).asScala)
+
+        nodeOpt match {
+          case Some(node) =>
+            val log = partition.localLogOrException
+            partitionAndOffsets.put(topicPartition, InitialFetchState(
+              log.topicId,
+              new BrokerEndPoint(node.id, node.host, node.port),
+              partition.getLeaderEpoch,
+              initialFetchOffset(log)
+            ))
+          case None =>
+            stateChangeLogger.trace(s"Unable to start fetching $topicPartition with topic ID ${partition.topicId} " +
+              s"from leader ${partition.leaderReplicaIdOpt} because it is not alive.")
+        }
+      }
 
-    replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
-    stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToMakeFollower.size} partitions")
+      replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
+      stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToStart.size} partitions")
 
-    partitionsToMakeFollower.keySet.foreach(completeDelayedFetchOrProduceRequests)
+      partitionsToStart.keySet.foreach(completeDelayedFetchOrProduceRequests)

Review Comment:
   That seems like a reasonable ask.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org