hachikuji commented on code in PR #12187:
URL: https://github.com/apache/kafka/pull/12187#discussion_r879971283


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2141,23 +2141,22 @@ class ReplicaManager(val config: KafkaConfig,
             stateChangeLogger.trace(s"Unable to start fetching $tp with topic 
" +
               s"ID ${info.topicId} because the replica manager is shutting 
down.")
           } else {
-            if (isInControlledShutdown && 
!info.partition.isr.contains(config.brokerId)) {
-              // If we are in controlled shutdown and the replica is not in 
the ISR,
-              // we stop the replica.
+            // We always update the follower state.

Review Comment:
   Good change here. I do like keeping the state up-to-date even if we cannot start fetching. It's hard to know when a dependency on that state will emerge.
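   
   To make the flow concrete, here is a minimal, self-contained Scala sketch of the decision order this change introduces: the follower state is always applied first, and only the fetcher action is conditional. `PartitionInfo`, `fetcherAction`, and the returned strings are illustrative stand-ins, not the actual `ReplicaManager`/`Partition` APIs.
   
```scala
// Simplified stand-ins for illustration only; not the real Kafka types.
object FollowerTransitionSketch {
  val NoLeader: Int = -1

  case class PartitionInfo(leader: Int, isr: Set[Int], isNewLeaderEpoch: Boolean = false)

  // Decide what happens to the fetcher *after* the follower state has been
  // updated (the makeFollower call in the real code); only this is conditional.
  def fetcherAction(
    info: PartitionInfo,
    brokerId: Int,
    isInControlledShutdown: Boolean
  ): String = {
    if (isInControlledShutdown && (info.leader == NoLeader || !info.isr.contains(brokerId)))
      "stop"      // shutting down and the replica is leaderless or out of the ISR
    else if (info.isNewLeaderEpoch)
      "restart"   // leader epoch bumped: restart fetching from the (possibly new) leader
    else
      "keep"      // nothing relevant changed: leave the fetcher alone
  }

  def main(args: Array[String]): Unit = {
    // Controlled shutdown, no leader: stop.
    println(fetcherAction(PartitionInfo(leader = NoLeader, isr = Set(1)), brokerId = 1, isInControlledShutdown = true))
    // Normal operation with an epoch bump: restart.
    println(fetcherAction(PartitionInfo(leader = 2, isr = Set(1, 2), isNewLeaderEpoch = true), brokerId = 1, isInControlledShutdown = false))
  }
}
```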



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2134,20 +2141,22 @@ class ReplicaManager(val config: KafkaConfig,
             stateChangeLogger.trace(s"Unable to start fetching $tp with topic 
" +
               s"ID ${info.topicId} because the replica manager is shutting 
down.")
           } else {
-            val leader = info.partition.leader
-            if (newImage.cluster.broker(leader) == null) {
-              stateChangeLogger.trace(s"Unable to start fetching $tp with 
topic ID ${info.topicId} " +
-                s"from leader $leader because it is not alive.")
-
-              // Create the local replica even if the leader is unavailable. 
This is required
-              // to ensure that we include the partition's high watermark in 
the checkpoint
-              // file (see KAFKA-1647).
-              partition.createLogIfNotExists(isNew, false, offsetCheckpoints, 
Some(info.topicId))
-            } else {
-              val state = info.partition.toLeaderAndIsrPartitionState(tp, 
isNew)
-              if (partition.makeFollower(state, offsetCheckpoints, 
Some(info.topicId))) {
-                partitionsToMakeFollower.put(tp, partition)
-              }
+            // We always update the follower state.
+            // - This ensure that a replica with no leader can step down;
+            // - This also ensures that the local replica is created even if 
the leader
+            //   is unavailable. This is required to ensure that we include 
the partition's
+            //   high watermark in the checkpoint file (see KAFKA-1647).
+            val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
+            val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId))
+
+            if (isInControlledShutdown && (info.partition.leader == NO_LEADER 
||
+                !info.partition.isr.contains(config.brokerId))) {
+              // During controlled shutdown, replica with no leaders and 
replica
+              // where this broker is not in the ISR are stopped.
+              partitionsToStop.put(tp, false)
+            } else if (isNewLeaderEpoch) {
+              // Otherwise, fetcher is restarted if the leader epoch has 
changed.
+              partitionsToStart.put(tp, partition)
             }
           }
           changedPartitions.add(partition)

Review Comment:
   We're maintaining this collection for the start of the log dir fetchers. Similar to the replica fetchers, I guess we only want to update the log dir fetchers when there is an epoch bump (leader or follower). Since we don't yet support JBOD in KRaft anyway, it seems tempting to get rid of this logic.
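   
   As a speculative sketch of the gating suggested above (only touch the log dir fetcher on an epoch bump); `LogDirFetchers` and its `restart` method are hypothetical stand-ins, not the real `ReplicaAlterLogDirsManager` API.
   
```scala
// Hypothetical stand-in API; illustrates only the "no epoch bump, no fetcher churn" rule.
object LogDirFetcherGatingSketch {
  trait LogDirFetchers {
    def restart(partition: String): Unit
  }

  def maybeUpdateLogDirFetcher(partition: String, isNewLeaderEpoch: Boolean, fetchers: LogDirFetchers): Unit = {
    // Mirror the replica-fetcher rule: only restart when the leader epoch actually bumped.
    if (isNewLeaderEpoch) fetchers.restart(partition)
  }

  def main(args: Array[String]): Unit = {
    val fetchers = new LogDirFetchers {
      def restart(partition: String): Unit = println(s"restarting log dir fetcher for $partition")
    }
    maybeUpdateLogDirFetcher("foo-0", isNewLeaderEpoch = true, fetchers)  // restarts
    maybeUpdateLogDirFetcher("bar-0", isNewLeaderEpoch = false, fetchers) // no-op
  }
}
```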



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -706,7 +706,7 @@ class Partition(val topicPartition: TopicPartition,
           s"and partition state $partitionState since it is already a follower 
with leader epoch $leaderEpoch.")
       }
 
-      leaderReplicaIdOpt = Some(partitionState.leader)
+      leaderReplicaIdOpt = Option(partitionState.leader)

Review Comment:
   Is it worth adding the current leader to the "Follower $topicPartition starts at ..." log message above?
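   
   For illustration, one possible shape of that message with the leader included; the helper and the exact wording here are hypothetical, not the actual Partition.scala log line.
   
```scala
// Hypothetical sketch of the suggested log message; names and fields are stand-ins.
object FollowerLogMessageSketch {
  def followerStartMessage(topicPartition: String, leaderEpoch: Int, startOffset: Long, leader: Int): String =
    s"Follower $topicPartition starts at leader epoch $leaderEpoch from " +
      s"offset $startOffset. Current leader is $leader."

  def main(args: Array[String]): Unit =
    println(followerStartMessage("foo-0", leaderEpoch = 5, startOffset = 100L, leader = 2))
}
```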



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2170,33 +2179,47 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
 
-    // Stopping the fetchers must be done first in order to initialize the fetch
-    // position correctly.
-    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet)
-    stateChangeLogger.info(s"Stopped fetchers as part of become-follower for ${partitionsToMakeFollower.size} partitions")
-
-    val listenerName = config.interBrokerListenerName.value
-    val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState]
-    partitionsToMakeFollower.forKeyValue { (topicPartition, partition) =>
-      val node = partition.leaderReplicaIdOpt
-        .flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
-        .flatMap(_.node(listenerName).asScala)
-        .getOrElse(Node.noNode)
-      val log = partition.localLogOrException
-      partitionAndOffsets.put(topicPartition, InitialFetchState(
-        log.topicId,
-        new BrokerEndPoint(node.id, node.host, node.port),
-        partition.getLeaderEpoch,
-        initialFetchOffset(log)
-      ))
-    }
+    if (partitionsToStart.nonEmpty) {
+      // Stopping the fetchers must be done first in order to initialize the fetch
+      // position correctly.
+      replicaFetcherManager.removeFetcherForPartitions(partitionsToStart.keySet)
+      stateChangeLogger.info(s"Stopped fetchers as part of become-follower for ${partitionsToStart.size} partitions")
+
+      val listenerName = config.interBrokerListenerName.value
+      val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState]
+
+      partitionsToStart.forKeyValue { (topicPartition, partition) =>
+        val nodeOpt = partition.leaderReplicaIdOpt
+          .flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
+          .flatMap(_.node(listenerName).asScala)
+
+        nodeOpt match {
+          case Some(node) =>
+            val log = partition.localLogOrException
+            partitionAndOffsets.put(topicPartition, InitialFetchState(
+              log.topicId,
+              new BrokerEndPoint(node.id, node.host, node.port),
+              partition.getLeaderEpoch,
+              initialFetchOffset(log)
+            ))
+          case None =>
+            stateChangeLogger.trace(s"Unable to start fetching $topicPartition with topic ID ${partition.topicId} " +
+              s"from leader ${partition.leaderReplicaIdOpt} because it is not alive.")
+        }
+      }

-    replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
-    stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToMakeFollower.size} partitions")
+      replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
+      stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToStart.size} partitions")

-    partitionsToMakeFollower.keySet.foreach(completeDelayedFetchOrProduceRequests)
+      partitionsToStart.keySet.foreach(completeDelayedFetchOrProduceRequests)

Review Comment:
   Borderline too verbose perhaps, but how about `partitionsToStartFetching`?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2123,7 +2129,8 @@ class ReplicaManager(val config: KafkaConfig,
     stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size} 
partition(s) to " +
       "local followers.")
     val shuttingDown = isShuttingDown.get()
-    val partitionsToMakeFollower = new mutable.HashMap[TopicPartition, 
Partition]
+    val partitionsToStart = new mutable.HashMap[TopicPartition, Partition]
+    val partitionsToStop = new mutable.HashMap[TopicPartition, Boolean]
     val newFollowerTopicSet = new mutable.HashSet[String]

Review Comment:
   Not from this PR, but I find the naming of `newLocalFollowers` in the parameter list confusing given the presence of the `isNew` flag. I guess it really represents new or updated followers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
