hachikuji commented on a change in pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#discussion_r663154812



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2032,4 +2001,207 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  private[kafka] def getOrCreatePartition(tp: TopicPartition,
+                                          delta: TopicsDelta,
+                                          topicId: Uuid): Option[(Partition, Boolean)] = {
+    getPartition(tp) match {
+      case HostedPartition.Offline =>
+        stateChangeLogger.warn(s"Unable to bring up new local leader ${tp} " +
+          s"with topic id ${topicId} because it resides in an offline log " +
+          "directory.")
+        None
+
+      case HostedPartition.Online(partition) => Some(partition, false)

Review comment:
       Should we validate the topicId matches?
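
       For illustration, such a check might look roughly like this (a sketch
       only; it assumes `Partition` exposes the topic id it was created with
       as an `Option[Uuid]`):

           case HostedPartition.Online(partition) =>
             // Hypothetical validation: the hosted Partition should carry the
             // same topic id as the delta being applied.
             partition.topicId.filter(_ != topicId).foreach { currentId =>
               throw new IllegalStateException(s"Topic id mismatch for ${tp}: " +
                 s"expected ${topicId} but the hosted partition has ${currentId}")
             }
             Some(partition, false)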

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2032,4 +2001,207 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  private[kafka] def getOrCreatePartition(tp: TopicPartition,
+                                          delta: TopicsDelta,
+                                          topicId: Uuid): Option[(Partition, Boolean)] = {
+    getPartition(tp) match {
+      case HostedPartition.Offline =>
+        stateChangeLogger.warn(s"Unable to bring up new local leader ${tp} " +
+          s"with topic id ${topicId} because it resides in an offline log " +
+          "directory.")
+        None
+
+      case HostedPartition.Online(partition) => Some(partition, false)
+
+      case HostedPartition.None =>
+        if (delta.image().topicsById().containsKey(topicId)) {
+          stateChangeLogger.error(s"Expected partition ${tp} with topic id " +
+            s"${topicId} to exist, but it was missing. Creating...")
+        } else {
+          stateChangeLogger.info(s"Creating new partition ${tp} with topic id " +
+            s"${topicId}.")
+        }
+        // it's a partition that we don't know about yet, so create it and mark it online
+        val partition = Partition(tp, time, this)
+        allPartitions.put(tp, HostedPartition.Online(partition))
+        Some(partition, true)
+    }
+  }
+
+  private[kafka] def calculateDeltaChanges(delta: TopicsDelta)
+    : (mutable.HashMap[TopicPartition, Boolean],
+       mutable.HashMap[TopicPartition, LocalLeaderInfo],
+       mutable.HashMap[TopicPartition, LocalLeaderInfo]) = {
+    val deleted = new mutable.HashMap[TopicPartition, Boolean]()
+    delta.deletedTopicIds().forEach { topicId =>
+      val topicImage = delta.image().getTopic(topicId)
+      topicImage.partitions().keySet().forEach { partitionId =>
+        deleted.put(new TopicPartition(topicImage.name(), partitionId), true)
+      }
+    }
+    val newLocalLeaders = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
+    val newLocalFollowers = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
+    delta.changedTopics().values().forEach { topicDelta =>
+      topicDelta.newLocalLeaders(config.nodeId).forEach { e =>
+        newLocalLeaders.put(new TopicPartition(topicDelta.name(), e.getKey),
+          LocalLeaderInfo(topicDelta.id(), e.getValue))
+      }
+      topicDelta.newLocalFollowers(config.nodeId).forEach { e =>
+        newLocalFollowers.put(new TopicPartition(topicDelta.name(), e.getKey),
+          LocalLeaderInfo(topicDelta.id(), e.getValue))
+      }
+    }
+    (deleted, newLocalLeaders, newLocalFollowers)
+  }
+
+  /**
+   * Apply a KRaft topic change delta.
+   *
+   * @param newImage        The new metadata image.
+   * @param delta           The delta to apply.
+   */
+  def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = {
+    // Before taking the lock, build some hash maps that we will need.
+    val (deleted, newLocalLeaders, newLocalFollowers) = calculateDeltaChanges(delta)
+
+    replicaStateChangeLock.synchronized {
+      // Handle deleted partitions. We need to do this first because we might subsequently
+      // create new partitions with the same names as the ones we are deleting here.
+      if (!deleted.isEmpty) {
+        stateChangeLogger.info(s"Deleting ${deleted.size} partition(s).")
+        stopPartitions(deleted).foreach { case (topicPartition, e) =>
+          if (e.isInstanceOf[KafkaStorageException]) {
+            stateChangeLogger.error(s"Unable to delete replica ${topicPartition} because " +
+              "the local replica for the partition is in an offline log directory")
+          } else {
+            stateChangeLogger.error(s"Unable to delete replica ${topicPartition} because " +
+              s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}")
+          }
+        }
+      }
+      // Handle partitions which we are now the leader or follower for.
+      if (!newLocalLeaders.isEmpty || !newLocalFollowers.isEmpty) {
+        val lazyOffsetCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
+        val changedPartitions = new mutable.HashSet[Partition]
+        if (!newLocalLeaders.isEmpty) {
+          applyLocalLeadersDelta(changedPartitions, delta, lazyOffsetCheckpoints, newLocalLeaders)
+        }
+        if (!newLocalFollowers.isEmpty) {
+          applyLocalFollowersDelta(changedPartitions, newImage, delta, lazyOffsetCheckpoints, newLocalFollowers)
+        }
+        maybeAddLogDirFetchers(changedPartitions, lazyOffsetCheckpoints,
+          name => Option(newImage.topics().getTopic(name)).map(_.id()))
+
+        def markPartitionOfflineIfNeeded(tp: TopicPartition): Unit = {
+          /*
+           * If there is an offline log directory, a Partition object may have been created by
+           * getOrCreatePartition() before getOrCreateReplica() failed to create the local
+           * replica due to a KafkaStorageException. In this case ReplicaManager.allPartitions
+           * will map this topic-partition to an empty Partition object. We need to map this
+           * topic-partition to OfflinePartition instead.
+           */
+          if (localLog(tp).isEmpty)
+            markPartitionOffline(tp)
+        }
+        newLocalLeaders.keySet.foreach(markPartitionOfflineIfNeeded(_))
+        newLocalFollowers.keySet.foreach(markPartitionOfflineIfNeeded(_))
+
+        replicaFetcherManager.shutdownIdleFetcherThreads()
+        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
+      }
+    }
+  }
+
+  private def applyLocalLeadersDelta(changedPartitions: mutable.HashSet[Partition],
+                                     delta: TopicsDelta,
+                                     offsetCheckpoints: OffsetCheckpoints,
+                                     newLocalLeaders: mutable.HashMap[TopicPartition, LocalLeaderInfo]): Unit = {
+    stateChangeLogger.info(s"Transitioning ${newLocalLeaders.size} partition(s) to " +
+      "local leaders.")
+    replicaFetcherManager.removeFetcherForPartitions(newLocalLeaders.keySet)
+    newLocalLeaders.forKeyValue { case (tp, info) =>
+      getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
+        try {
+          val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
+          if (!partition.makeLeader(state, offsetCheckpoints, Some(info.topicId))) {
+            stateChangeLogger.info("Skipped the become-leader state change for " +
+              s"${tp} with topic id ${info.topicId} because this partition is " +
+              "already a local leader.")
+          }
+          changedPartitions.add(partition)
+        } catch {
+          case e: KafkaStorageException =>
+            stateChangeLogger.info(s"Skipped the become-leader state change for ${tp} " +
+              s"with topic id ${info.topicId} due to disk error ${e}")
+            val dirOpt = getLogDir(tp)
+            error(s"Error while making broker the leader for partition ${tp} in dir " +
+              s"${dirOpt}", e)
+        }
+      }
+    }
+  }
+
+  private def applyLocalFollowersDelta(changedPartitions: mutable.HashSet[Partition],
+                                       newImage: MetadataImage,
+                                       delta: TopicsDelta,
+                                       offsetCheckpoints: OffsetCheckpoints,
+                                       newLocalFollowers: mutable.HashMap[TopicPartition, LocalLeaderInfo]): Unit = {
+    stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size} partition(s) to " +
+      "local followers.")
+    replicaFetcherManager.removeFetcherForPartitions(newLocalFollowers.keySet)
+    val shuttingDown = isShuttingDown.get()
+    val partitionsToMakeFollower = new mutable.HashMap[TopicPartition, InitialFetchState]
+    val newFollowerTopicSet = new mutable.HashSet[String]
+    newLocalFollowers.forKeyValue { case (tp, info) =>
+      getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
+        try {
+          newFollowerTopicSet.add(tp.topic())
+
+          completeDelayedFetchOrProduceRequests(tp)
+
+          // Create the local replica even if the leader is unavailable. This is required
+          // to ensure that we include the partition's high watermark in the checkpoint
+          // file (see KAFKA-1647)
+          partition.createLogIfNotExists(isNew, false, offsetCheckpoints, Some(info.topicId))
+
+          if (shuttingDown) {
+            stateChangeLogger.trace(s"Unable to start fetching ${tp} with topic " +
+              s"ID ${info.topicId} because the replica manager is shutting down.")
+          } else {
+            val listenerName = config.interBrokerListenerName.value()
+            val leader = info.partition.leader
+            Option(newImage.cluster().broker(leader)).flatMap(_.node(listenerName).asScala) match {
+              case None => stateChangeLogger.trace(s"Unable to start fetching ${tp} " +
+                s"with topic ID ${info.topicId} from leader ${leader} because it is not " +
+                "alive.")
+              case Some(node) =>
+                val leaderEndPoint = new BrokerEndPoint(node.id(), node.host(), node.port())
+                val log = partition.localLogOrException
+                val fetchOffset = initialFetchOffset(log)
+                partitionsToMakeFollower.put(tp,
+                  InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset))
+            }
+          }
+          changedPartitions.add(partition)
+        } catch {
+          case e: Throwable => stateChangeLogger.error(s"Unable to start fetching ${tp} " +

Review comment:
       Perhaps we should consider adding this partition to the failed partitions
       that are tracked in the ReplicaFetcherManager.
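
       For example, the catch block could hand the partition off for retry. A
       sketch only, assuming a `markPartitionFailed`-style hook on the fetcher
       manager (hypothetical name, not an existing API):

           } catch {
             case e: Throwable =>
               stateChangeLogger.error(s"Unable to start fetching ${tp} " +
                 s"with topic ID ${info.topicId}", e)
               // Hypothetical hook: record the partition as failed so the
               // fetcher manager's retry path can pick it up later.
               replicaFetcherManager.markPartitionFailed(tp)
           }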

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -70,230 +58,188 @@ class BrokerMetadataListener(
    */
   @volatile private var _highestMetadataOffset = -1L
 
+  /**
+   * The current broker metadata image. Accessed only from the event queue thread.
+   */
+  private var _image = MetadataImage.EMPTY
+
+  /**
+   * The current metadata delta. Accessed only from the event queue thread.
+   */
+  private var _delta = new MetadataDelta(_image)
+
+  /**
+   * The object to use to publish new metadata changes, or None if this listener has not
+   * been activated yet.
+   */
+  private var _publisher: Option[MetadataPublisher] = None
+
+  /**
+   * The event queue which runs this listener.
+   */
   val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
 
+  /**
+   * Returns the highest metadata-offset. Thread-safe.
+   */
   def highestMetadataOffset(): Long = _highestMetadataOffset
 
   /**
    * Handle new metadata records.
    */
-  override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit = {
+  override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit =
     eventQueue.append(new HandleCommitsEvent(reader))
+
+  class HandleCommitsEvent(reader: BatchReader[ApiMessageAndVersion])
+      extends EventQueue.FailureLoggingEvent(log) {
+    override def run(): Unit = {
+      val results = try {
+        val loadResults = loadBatches(_delta, reader)
+        if (isDebugEnabled) {
+          debug(s"Loaded new commits: ${loadResults}")
+        }
+        loadResults
+      } finally {
+        reader.close()
+      }
+      maybePublish(results.highestMetadataOffset)
+    }
   }
 
   /**
    * Handle metadata snapshots
    */
-  override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit = {
-    // Loading snapshot on the broker is currently not supported.
-    reader.close();
-    throw new UnsupportedOperationException(s"Loading snapshot (${reader.snapshotId()}) is not supported")
-  }
-
-  // Visible for testing. It's useful to execute events synchronously in order
-  // to make tests deterministic. This object is responsible for closing the reader.
-  private[metadata] def execCommits(batchReader: BatchReader[ApiMessageAndVersion]): Unit = {
-    new HandleCommitsEvent(batchReader).run()
-  }
+  override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit =
+    eventQueue.append(new HandleSnapshotEvent(reader))
 
-  class HandleCommitsEvent(
-    reader: BatchReader[ApiMessageAndVersion]
-  ) extends EventQueue.FailureLoggingEvent(log) {
+  class HandleSnapshotEvent(reader: SnapshotReader[ApiMessageAndVersion])
+    extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
-      try {
-        while (reader.hasNext()) {
-          apply(reader.next())
-        }
+      val results = try {
+        info(s"Loading snapshot ${reader.snapshotId().offset}-${reader.snapshotId().epoch}.")
+        _delta = new MetadataDelta(_image) // Discard any previous deltas.
+        val loadResults = loadBatches(_delta, reader)
+        _delta.finishSnapshot()

Review comment:
       I found this part to be confusing at first, but I think I get it now.
       Basically the purpose of `finishSnapshot` is to ensure that any keys
       present in the old image but not the new image get removed in the delta.
       Do I have that right? I suspect we can improve the name, but I'm not sure
       I have any great suggestions. At least it would be useful to document
       somewhere how the API works.
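
       In other words, conceptually something like the following sketch of the
       intended semantics at the TopicsDelta level (not the real
       implementation; the `handledTopicIds` set and `replay` helper are
       assumptions here):

           def finishSnapshot(): Unit = {
             // After replaying every record in the snapshot, anything present
             // in the base image that the snapshot never mentioned must be
             // gone, so synthesize a removal for it in the delta.
             image.topicsById().keySet().forEach { topicId =>
               if (!handledTopicIds.contains(topicId)) {
                 replay(new RemoveTopicRecord().setTopicId(topicId))
               }
             }
           }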

##########
File path: core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
##########
@@ -115,36 +117,28 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
      case _ => throw new IllegalStateException("Should only handle IP quota entities here")
     }
 
-    // The connection quota only understands the connection rate limit
-    if (quotaRecord.key() != QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG) {
-      warn(s"Ignoring unexpected quota key ${quotaRecord.key()} for entity $ipEntity")
-      return
-    }
-
-    // Convert the value to an appropriate Option for the quota manager
-    val newValue = if (quotaRecord.remove()) {
-      None
-    } else {
-      Some(quotaRecord.value).map(_.toInt)
-    }
-    try {
-      connectionQuotas.updateIpConnectionRateQuota(inetAddress, newValue)
-    } catch {
-      case t: Throwable => error(s"Failed to update IP quota $ipEntity", t)
+    quotaDelta.changes().entrySet().forEach { e =>
+      // The connection quota only understands the connection rate limit
+      if (!e.getKey().equals(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG)) {

Review comment:
       nit: it would make the code a little more readable if use used vals with 
good names for `e.getKey` and `e.getValue`
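
       e.g. something like:

           quotaDelta.changes().entrySet().forEach { e =>
             val configKey = e.getKey()
             val newValue = e.getValue()
             // The connection quota only understands the connection rate limit
             if (!configKey.equals(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG)) {
               warn(s"Ignoring unexpected quota key $configKey for entity $ipEntity")
             } else {
               // ... apply newValue to the connection quota as before ...
             }
           }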

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -70,230 +58,188 @@ class BrokerMetadataListener(
    */
   @volatile private var _highestMetadataOffset = -1L
 
+  /**
+   * The current broker metadata image. Accessed only from the event queue thread.
+   */
+  private var _image = MetadataImage.EMPTY
+
+  /**
+   * The current metadata delta. Accessed only from the event queue thread.
+   */
+  private var _delta = new MetadataDelta(_image)
+
+  /**
+   * The object to use to publish new metadata changes, or None if this listener has not
+   * been activated yet.
+   */
+  private var _publisher: Option[MetadataPublisher] = None
+
+  /**
+   * The event queue which runs this listener.
+   */
   val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))
 
+  /**
+   * Returns the highest metadata-offset. Thread-safe.
+   */
   def highestMetadataOffset(): Long = _highestMetadataOffset
 
   /**
    * Handle new metadata records.
    */
-  override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit = {
+  override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit =
     eventQueue.append(new HandleCommitsEvent(reader))
+
+  class HandleCommitsEvent(reader: BatchReader[ApiMessageAndVersion])
+      extends EventQueue.FailureLoggingEvent(log) {
+    override def run(): Unit = {
+      val results = try {
+        val loadResults = loadBatches(_delta, reader)
+        if (isDebugEnabled) {
+          debug(s"Loaded new commits: ${loadResults}")
+        }
+        loadResults
+      } finally {
+        reader.close()
+      }
+      maybePublish(results.highestMetadataOffset)
+    }
   }
 
   /**
    * Handle metadata snapshots
    */
-  override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit = {
-    // Loading snapshot on the broker is currently not supported.
-    reader.close();
-    throw new UnsupportedOperationException(s"Loading snapshot (${reader.snapshotId()}) is not supported")
-  }
-
-  // Visible for testing. It's useful to execute events synchronously in order
-  // to make tests deterministic. This object is responsible for closing the reader.
-  private[metadata] def execCommits(batchReader: BatchReader[ApiMessageAndVersion]): Unit = {
-    new HandleCommitsEvent(batchReader).run()
-  }
+  override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit =
+    eventQueue.append(new HandleSnapshotEvent(reader))
 
-  class HandleCommitsEvent(
-    reader: BatchReader[ApiMessageAndVersion]
-  ) extends EventQueue.FailureLoggingEvent(log) {
+  class HandleSnapshotEvent(reader: SnapshotReader[ApiMessageAndVersion])
+    extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
-      try {
-        while (reader.hasNext()) {
-          apply(reader.next())
-        }
+      val results = try {
+        info(s"Loading snapshot ${reader.snapshotId().offset}-${reader.snapshotId().epoch}.")
+        _delta = new MetadataDelta(_image) // Discard any previous deltas.
+        val loadResults = loadBatches(_delta, reader)
+        _delta.finishSnapshot()
+        info(s"Loaded snapshot ${reader.snapshotId().offset}-${reader.snapshotId().epoch}: " +
+          s"${loadResults}")
+        loadResults
       } finally {
         reader.close()
       }
+      maybePublish(results.highestMetadataOffset)
     }
+  }
 
-    private def apply(batch: Batch[ApiMessageAndVersion]): Unit = {
-      val records = batch.records
-      val lastOffset = batch.lastOffset
+  case class BatchLoadResults(numBatches: Int,
+                              numRecords: Int,
+                              elapsedUs: Long,
+                              highestMetadataOffset: Long) {
+    override def toString(): String = {
+      s"${numBatches} batch(es) with ${numRecords} record(s) ending at offset " +
+      s"${highestMetadataOffset} in ${elapsedUs} microseconds"
+    }
+  }
 
-      if (isDebugEnabled) {
-        debug(s"Metadata batch $lastOffset: handling ${records.size()} record(s).")
-      }
-      val imageBuilder =
-        MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
-      val startNs = time.nanoseconds()
+  private def loadBatches(delta: MetadataDelta,
+                         iterator: util.Iterator[Batch[ApiMessageAndVersion]]): BatchLoadResults = {

Review comment:
       nit: misaligned
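
       i.e. aligning the second parameter with the first:

           private def loadBatches(delta: MetadataDelta,
                                   iterator: util.Iterator[Batch[ApiMessageAndVersion]]): BatchLoadResults = {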

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
##########
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.metadata
+
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+
+trait MetadataPublisher {
+  def publish(newHighestMetadataOffset: Long,

Review comment:
       Might be helpful to have a doc here. Specifically I think it's useful to
       note that the new image is the result of applying the passed delta to the
       previous image.
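
       Something along these lines, perhaps. The parameter list after the
       offset is cut off above, so the `delta` and `newImage` names below are
       assumed:

           trait MetadataPublisher {
             /**
              * Publish a new metadata image.
              *
              * @param newHighestMetadataOffset  The highest offset in the new image.
              * @param delta                     The delta that was applied to the
              *                                  previous image.
              * @param newImage                  The new image, i.e. the result of
              *                                  applying the delta to the previous
              *                                  image.
              */
             def publish(newHighestMetadataOffset: Long,
                         delta: MetadataDelta,           // assumed parameter
                         newImage: MetadataImage): Unit  // assumed parameter
           }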




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

