cmccabe commented on a change in pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#discussion_r663275737
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2032,4 +2001,207 @@ class ReplicaManager(val config: KafkaConfig,
}
}
+  private[kafka] def getOrCreatePartition(tp: TopicPartition,
+                                          delta: TopicsDelta,
+                                          topicId: Uuid): Option[(Partition, Boolean)] = {
+    getPartition(tp) match {
+      case HostedPartition.Offline =>
+        stateChangeLogger.warn(s"Unable to bring up new local leader ${tp} " +
+          s"with topic id ${topicId} because it resides in an offline log " +
+          "directory.")
+        None
+
+      case HostedPartition.Online(partition) => Some(partition, false)
+
+      case HostedPartition.None =>
+        if (delta.image().topicsById().containsKey(topicId)) {
+          stateChangeLogger.error(s"Expected partition ${tp} with topic id " +
+            s"${topicId} to exist, but it was missing. Creating...")
+        } else {
+          stateChangeLogger.info(s"Creating new partition ${tp} with topic id " +
+            s"${topicId}.")
+        }
+        // it's a partition that we don't know about yet, so create it and mark it online
+        val partition = Partition(tp, time, this)
+        allPartitions.put(tp, HostedPartition.Online(partition))
+        Some(partition, true)
+    }
+  }
+
+  private[kafka] def calculateDeltaChanges(delta: TopicsDelta)
+    : (mutable.HashMap[TopicPartition, Boolean],
+       mutable.HashMap[TopicPartition, LocalLeaderInfo],
+       mutable.HashMap[TopicPartition, LocalLeaderInfo]) = {
+    val deleted = new mutable.HashMap[TopicPartition, Boolean]()
+    delta.deletedTopicIds().forEach { topicId =>
+      val topicImage = delta.image().getTopic(topicId)
+      topicImage.partitions().keySet().forEach { partitionId =>
+        deleted.put(new TopicPartition(topicImage.name(), partitionId), true)
+      }
+    }
+    val newLocalLeaders = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
+    val newLocalFollowers = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
+    delta.changedTopics().values().forEach { topicDelta =>
+      topicDelta.newLocalLeaders(config.nodeId).forEach { e =>
+        newLocalLeaders.put(new TopicPartition(topicDelta.name(), e.getKey),
+          LocalLeaderInfo(topicDelta.id(), e.getValue))
+      }
+      topicDelta.newLocalFollowers(config.nodeId).forEach { e =>
+        newLocalFollowers.put(new TopicPartition(topicDelta.name(), e.getKey),
+          LocalLeaderInfo(topicDelta.id(), e.getValue))
+      }
+    }
+    (deleted, newLocalLeaders, newLocalFollowers)
+  }
+
+  /**
+   * Apply a KRaft topic change delta.
+   *
+   * @param newImage The new metadata image.
+   * @param delta    The delta to apply.
+   */
+  def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = {
+    // Before taking the lock, build some hash maps that we will need.
+    val (deleted, newLocalLeaders, newLocalFollowers) = calculateDeltaChanges(delta)
+
+    replicaStateChangeLock.synchronized {
+      // Handle deleted partitions. We need to do this first because we might subsequently
+      // create new partitions with the same names as the ones we are deleting here.
+      if (!deleted.isEmpty) {
+        stateChangeLogger.info(s"Deleting ${deleted.size} partition(s).")
+        stopPartitions(deleted).foreach { case (topicPartition, e) =>
+          if (e.isInstanceOf[KafkaStorageException]) {
+            stateChangeLogger.error(s"Unable to delete replica ${topicPartition} because " +
+              "the local replica for the partition is in an offline log directory")
+          } else {
+            stateChangeLogger.error(s"Unable to delete replica ${topicPartition} because " +
+              s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}")
+          }
+        }
+      }
+      // Handle partitions which we are now the leader or follower for.
+      if (!newLocalLeaders.isEmpty || !newLocalFollowers.isEmpty) {
+        val lazyOffsetCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+        val changedPartitions = new mutable.HashSet[Partition]
+        if (!newLocalLeaders.isEmpty) {
+          applyLocalLeadersDelta(changedPartitions, delta, lazyOffsetCheckpoints, newLocalLeaders)
+        }
+        if (!newLocalFollowers.isEmpty) {
+          applyLocalFollowersDelta(changedPartitions, newImage, delta, lazyOffsetCheckpoints, newLocalFollowers)
+        }
+        maybeAddLogDirFetchers(changedPartitions, lazyOffsetCheckpoints,
+          name => Option(newImage.topics().getTopic(name)).map(_.id()))
+
+        def markPartitionOfflineIfNeeded(tp: TopicPartition): Unit = {
+          /*
+           * If there is offline log directory, a Partition object may have been created by
+           * getOrCreatePartition() before getOrCreateReplica() failed to create local replica
+           * due to KafkaStorageException. In this case ReplicaManager.allPartitions will map
+           * this topic-partition to an empty Partition object. We need to map this
+           * topic-partition to OfflinePartition instead.
+           */
+          if (localLog(tp).isEmpty)
+            markPartitionOffline(tp)
+        }
+        newLocalLeaders.keySet.foreach(markPartitionOfflineIfNeeded(_))
+        newLocalFollowers.keySet.foreach(markPartitionOfflineIfNeeded(_))
+
+        replicaFetcherManager.shutdownIdleFetcherThreads()
+        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+      }
+    }
+  }
+
+  private def applyLocalLeadersDelta(changedPartitions: mutable.HashSet[Partition],
+                                     delta: TopicsDelta,
+                                     offsetCheckpoints: OffsetCheckpoints,
+                                     newLocalLeaders: mutable.HashMap[TopicPartition, LocalLeaderInfo]): Unit = {
+    stateChangeLogger.info(s"Transitioning ${newLocalLeaders.size} partition(s) to " +
+      "local leaders.")
+    replicaFetcherManager.removeFetcherForPartitions(newLocalLeaders.keySet)
+    newLocalLeaders.forKeyValue { case (tp, info) =>
+      getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
+        try {
+          val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
+          if (!partition.makeLeader(state, offsetCheckpoints, Some(info.topicId))) {
+            stateChangeLogger.info("Skipped the become-leader state change for " +
+              s"${tp} with topic id ${info.topicId} because this partition is " +
+              "already a local leader.")
+          }
+          changedPartitions.add(partition)
+        } catch {
+          case e: KafkaStorageException =>
+            stateChangeLogger.info(s"Skipped the become-leader state change for ${tp} " +
+              s"with topic id ${info.topicId} due to disk error ${e}")
+            val dirOpt = getLogDir(tp)
+            error(s"Error while making broker the leader for partition ${tp} in dir " +
+              s"${dirOpt}", e)
+        }
+      }
+    }
+  }
+
+  private def applyLocalFollowersDelta(changedPartitions: mutable.HashSet[Partition],
+                                       newImage: MetadataImage,
+                                       delta: TopicsDelta,
+                                       offsetCheckpoints: OffsetCheckpoints,
+                                       newLocalFollowers: mutable.HashMap[TopicPartition, LocalLeaderInfo]): Unit = {
+    stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size} partition(s) to " +
+      "local followers.")
+    replicaFetcherManager.removeFetcherForPartitions(newLocalFollowers.keySet)
+    val shuttingDown = isShuttingDown.get()
+    val partitionsToMakeFollower = new mutable.HashMap[TopicPartition, InitialFetchState]
+    val newFollowerTopicSet = new mutable.HashSet[String]
+    newLocalFollowers.forKeyValue { case (tp, info) =>
+      getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
+        try {
+          newFollowerTopicSet.add(tp.topic())
+
+          completeDelayedFetchOrProduceRequests(tp)
+
+          // Create the local replica even if the leader is unavailable. This is required
+          // to ensure that we include the partition's high watermark in the checkpoint
+          // file (see KAFKA-1647)
+          partition.createLogIfNotExists(isNew, false, offsetCheckpoints, Some(info.topicId))
+
+          if (shuttingDown) {
+            stateChangeLogger.trace(s"Unable to start fetching ${tp} with topic " +
+              s"ID ${info.topicId} because the replica manager is shutting down.")
+          } else {
+            val listenerName = config.interBrokerListenerName.value()
+            val leader = info.partition.leader
+            Option(newImage.cluster().broker(leader)).flatMap(_.node(listenerName).asScala) match {
+              case None => stateChangeLogger.trace(s"Unable to start fetching ${tp} " +
+                s"with topic ID ${info.topicId} from leader ${leader} because it is not " +
+                "alive.")
+              case Some(node) =>
+                val leaderEndPoint = new BrokerEndPoint(node.id(), node.host(), node.port())
+                val log = partition.localLogOrException
+                val fetchOffset = initialFetchOffset(log)
+                partitionsToMakeFollower.put(tp,
+                  InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset))
+            }
+          }
+          changedPartitions.add(partition)
+        } catch {
+          case e: Throwable => stateChangeLogger.error(s"Unable to start fetching ${tp} " +
Review comment:
Seems reasonable. The JavaDoc does say the partitions that had
"unexpected errors" should end up there. I'll add it.