cmccabe commented on a change in pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#discussion_r663275737



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2032,4 +2001,207 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  private[kafka] def getOrCreatePartition(tp: TopicPartition,
+                                          delta: TopicsDelta,
+                                          topicId: Uuid): Option[(Partition, 
Boolean)] = {
+    getPartition(tp) match {
+      case HostedPartition.Offline =>
+        stateChangeLogger.warn(s"Unable to bring up new local leader ${tp} " +
+          s"with topic id ${topicId} because it resides in an offline log " +
+          "directory.")
+        None
+
+      case HostedPartition.Online(partition) => Some(partition, false)
+
+      case HostedPartition.None =>
+        if (delta.image().topicsById().containsKey(topicId)) {
+          stateChangeLogger.error(s"Expected partition ${tp} with topic id " +
+            s"${topicId} to exist, but it was missing. Creating...")
+        } else {
+          stateChangeLogger.info(s"Creating new partition ${tp} with topic id 
" +
+            s"${topicId}.")
+        }
+        // it's a partition that we don't know about yet, so create it and 
mark it online
+        val partition = Partition(tp, time, this)
+        allPartitions.put(tp, HostedPartition.Online(partition))
+        Some(partition, true)
+    }
+  }
+
+  private[kafka] def calculateDeltaChanges(delta: TopicsDelta)
+    : (mutable.HashMap[TopicPartition, Boolean],
+       mutable.HashMap[TopicPartition, LocalLeaderInfo],
+       mutable.HashMap[TopicPartition, LocalLeaderInfo]) = {
+    val deleted = new mutable.HashMap[TopicPartition, Boolean]()
+    delta.deletedTopicIds().forEach { topicId =>
+      val topicImage = delta.image().getTopic(topicId)
+      topicImage.partitions().keySet().forEach { partitionId =>
+        deleted.put(new TopicPartition(topicImage.name(), partitionId), true)
+      }
+    }
+    val newLocalLeaders = new mutable.HashMap[TopicPartition, 
LocalLeaderInfo]()
+    val newLocalFollowers = new mutable.HashMap[TopicPartition, 
LocalLeaderInfo]()
+    delta.changedTopics().values().forEach { topicDelta =>
+      topicDelta.newLocalLeaders(config.nodeId).forEach { e =>
+        newLocalLeaders.put(new TopicPartition(topicDelta.name(), e.getKey),
+          LocalLeaderInfo(topicDelta.id(), e.getValue))
+      }
+      topicDelta.newLocalFollowers(config.nodeId).forEach { e =>
+        newLocalFollowers.put(new TopicPartition(topicDelta.name(), e.getKey),
+          LocalLeaderInfo(topicDelta.id(), e.getValue))
+      }
+    }
+    (deleted, newLocalLeaders, newLocalFollowers)
+  }
+
+  /**
+   * Apply a KRaft topic change delta.
+   *
+   * @param newImage        The new metadata image.
+   * @param delta           The delta to apply.
+   */
+  def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = {
+    // Before taking the lock, build some hash maps that we will need.
+    val (deleted, newLocalLeaders, newLocalFollowers) = 
calculateDeltaChanges(delta)
+
+    replicaStateChangeLock.synchronized {
+      // Handle deleted partitions. We need to do this first because we might 
subsequently
+      // create new partitions with the same names as the ones we are deleting 
here.
+      if (!deleted.isEmpty) {
+        stateChangeLogger.info(s"Deleting ${deleted.size} partition(s).")
+        stopPartitions(deleted).foreach { case (topicPartition, e) =>
+          if (e.isInstanceOf[KafkaStorageException]) {
+            stateChangeLogger.error(s"Unable to delete replica 
${topicPartition} because " +
+              "the local replica for the partition is in an offline log 
directory")
+          } else {
+            stateChangeLogger.error(s"Unable to delete replica 
${topicPartition} because " +
+              s"we got an unexpected ${e.getClass.getName} exception: 
${e.getMessage}")
+          }
+        }
+      }
+      // Handle partitions which we are now the leader or follower for.
+      if (!newLocalLeaders.isEmpty || !newLocalFollowers.isEmpty) {
+        val lazyOffsetCheckpoints = new 
LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+        val changedPartitions = new mutable.HashSet[Partition]
+        if (!newLocalLeaders.isEmpty) {
+          applyLocalLeadersDelta(changedPartitions, delta, 
lazyOffsetCheckpoints, newLocalLeaders)
+        }
+        if (!newLocalFollowers.isEmpty) {
+          applyLocalFollowersDelta(changedPartitions, newImage, delta, 
lazyOffsetCheckpoints, newLocalFollowers)
+        }
+        maybeAddLogDirFetchers(changedPartitions, lazyOffsetCheckpoints,
+          name => Option(newImage.topics().getTopic(name)).map(_.id()))
+
+        def markPartitionOfflineIfNeeded(tp: TopicPartition): Unit = {
+          /*
+           * If there is offline log directory, a Partition object may have 
been created by getOrCreatePartition()
+           * before getOrCreateReplica() failed to create local replica due to 
KafkaStorageException.
+           * In this case ReplicaManager.allPartitions will map this 
topic-partition to an empty Partition object.
+           * we need to map this topic-partition to OfflinePartition instead.
+           */
+          if (localLog(tp).isEmpty)
+            markPartitionOffline(tp)
+        }
+        newLocalLeaders.keySet.foreach(markPartitionOfflineIfNeeded(_))
+        newLocalFollowers.keySet.foreach(markPartitionOfflineIfNeeded(_))
+
+        replicaFetcherManager.shutdownIdleFetcherThreads()
+        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+      }
+    }
+  }
+
+  private def applyLocalLeadersDelta(changedPartitions: 
mutable.HashSet[Partition],
+                                     delta: TopicsDelta,
+                                     offsetCheckpoints: OffsetCheckpoints,
+                                     newLocalLeaders: 
mutable.HashMap[TopicPartition, LocalLeaderInfo]): Unit = {
+    stateChangeLogger.info(s"Transitioning ${newLocalLeaders.size} 
partition(s) to " +
+      "local leaders.")
+    replicaFetcherManager.removeFetcherForPartitions(newLocalLeaders.keySet)
+    newLocalLeaders.forKeyValue { case (tp, info) =>
+      getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
+        try {
+          val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
+          if (!partition.makeLeader(state, offsetCheckpoints, 
Some(info.topicId))) {
+            stateChangeLogger.info("Skipped the become-leader state change for 
" +
+              s"${tp} with topic id ${info.topicId} because this partition is 
" +
+              "already a local leader.")
+          }
+          changedPartitions.add(partition)
+        } catch {
+          case e: KafkaStorageException =>
+            stateChangeLogger.info(s"Skipped the become-leader state change 
for ${tp} " +
+              s"with topic id ${info.topicId} due to disk error ${e}")
+            val dirOpt = getLogDir(tp)
+            error(s"Error while making broker the leader for partition ${tp} 
in dir " +
+              s"${dirOpt}", e)
+        }
+      }
+    }
+  }
+
+  private def applyLocalFollowersDelta(changedPartitions: 
mutable.HashSet[Partition],
+                                       newImage: MetadataImage,
+                                       delta: TopicsDelta,
+                                       offsetCheckpoints: OffsetCheckpoints,
+                                       newLocalFollowers: 
mutable.HashMap[TopicPartition, LocalLeaderInfo]): Unit = {
+    stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size} 
partition(s) to " +
+      "local followers.")
+    replicaFetcherManager.removeFetcherForPartitions(newLocalFollowers.keySet)
+    val shuttingDown = isShuttingDown.get()
+    val partitionsToMakeFollower = new mutable.HashMap[TopicPartition, 
InitialFetchState]
+    val newFollowerTopicSet = new mutable.HashSet[String]
+    newLocalFollowers.forKeyValue { case (tp, info) =>
+      getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
+        try {
+          newFollowerTopicSet.add(tp.topic())
+
+          completeDelayedFetchOrProduceRequests(tp)
+
+          // Create the local replica even if the leader is unavailable. This 
is required
+          // to ensure that we include the partition's high watermark in the 
checkpoint
+          // file (see KAFKA-1647)
+          partition.createLogIfNotExists(isNew, false, offsetCheckpoints, 
Some(info.topicId))
+
+          if (shuttingDown) {
+            stateChangeLogger.trace(s"Unable to start fetching ${tp} with 
topic " +
+              s"ID ${info.topicId} because the replica manager is shutting 
down.")
+          } else {
+            val listenerName = config.interBrokerListenerName.value()
+            val leader = info.partition.leader
+            
Option(newImage.cluster().broker(leader)).flatMap(_.node(listenerName).asScala) 
match {
+              case None => stateChangeLogger.trace(s"Unable to start fetching 
${tp} " +
+                s"with topic ID ${info.topicId} from leader ${leader} because 
it is not " +
+                "alive.")
+              case Some(node) =>
+                val leaderEndPoint = new BrokerEndPoint(node.id(), 
node.host(), node.port())
+                val log = partition.localLogOrException
+                val fetchOffset = initialFetchOffset(log)
+                partitionsToMakeFollower.put(tp,
+                  InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, 
fetchOffset))
+            }
+          }
+          changedPartitions.add(partition)
+        } catch {
+          case e: Throwable => stateChangeLogger.error(s"Unable to start 
fetching ${tp} " +

Review comment:
       Seems reasonable. It does say the partitions that had "unexpected 
errors" should end up there.
   
   I'll add it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to