hachikuji commented on a change in pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#discussion_r661670006



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -264,19 +266,13 @@ class BrokerServer(
      /* Add all reconfigurables for config change notification before starting the metadata listener */
       config.dynamicConfig.addReconfigurables(this)
 
-      val clientQuotaMetadataManager = new ClientQuotaMetadataManager(
-        quotaManagers, socketServer.connectionQuotas, quotaCache)
+      // TODO: check that we're properly handling "ConfigType" Client, User, and Ip (which are not

Review comment:
       Do you want to finish this, or should we file a JIRA?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
##########
@@ -84,6 +84,7 @@ public static DescribeClientQuotasResponse parse(ByteBuffer buffer, short versio
         return new DescribeClientQuotasResponse(new DescribeClientQuotasResponseData(new ByteBufferAccessor(buffer), version));
     }
 
+    // TODO: remove this function

Review comment:
       Do you want to do this?

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -96,7 +87,7 @@ object MetadataCache {
     new ZkMetadataCache(brokerId)
   }
 
-  def raftMetadataCache(brokerId: Int): RaftMetadataCache = {
-    new RaftMetadataCache(brokerId)
+  def raftMetadataCache(brokerId: Int): KRaftMetadataCache = {

Review comment:
       nit: perhaps we may as well change the name to `kraftMetadataCache` too
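
       For illustration, a minimal sketch of the suggested rename (same body as in the PR, only the method name changed):

           def kraftMetadataCache(brokerId: Int): KRaftMetadataCache = {
             new KRaftMetadataCache(brokerId)
           }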

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1302,7 +1278,8 @@ class ReplicaManager(val config: KafkaConfig,
         stateChangeLogger.warn(stateControllerEpochErrorMessage)
         throw new ControllerMovedException(stateChangeLogger.messageWithPrefix(stateControllerEpochErrorMessage))
       } else {
-        val deletedPartitions = metadataCache.updateMetadata(correlationId, updateMetadataRequest)
+        val zkMetadataCache = metadataCache.asInstanceOf[ZkMetadataCache]

Review comment:
       Could we use `MetadataSupport` in `ReplicaManager`?
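
       For example, a pattern match would at least make the expectation explicit (a sketch only; whether this should instead route through `MetadataSupport` is the question above):

           metadataCache match {
             case zkMetadataCache: ZkMetadataCache =>
               // UpdateMetadataRequest is only expected when running with ZK metadata.
               zkMetadataCache.updateMetadata(correlationId, updateMetadataRequest)
             case _ =>
               throw new IllegalStateException("UpdateMetadataRequest is not supported in KRaft mode")
           }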

##########
File path: core/src/main/scala/kafka/server/ConfigHandler.scala
##########
@@ -51,7 +51,8 @@ trait ConfigHandler {
   * The TopicConfigHandler will process topic config changes in ZK.
   * The callback provides the topic name and the full properties set read from ZK
   */
-class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers, kafkaController: KafkaController) extends ConfigHandler with Logging  {
+class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig,

Review comment:
       nit: can we generalize the doc above to cover events from the metadata log?
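
       For instance, the doc could read something like (suggested wording only):

           /**
            * The TopicConfigHandler will process topic config changes from ZK or from the metadata log.
            * The callback provides the topic name and the full properties set for that topic.
            */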

##########
File path: core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.controller.StateChangeLogger
+import kafka.server.MetadataCache
+import kafka.utils.Logging
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
+import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.MetadataResponse
+import org.apache.kafka.image.MetadataImage
+import java.util
+import java.util.{Collections, Properties}
+import java.util.concurrent.ThreadLocalRandom
+
+import kafka.admin.BrokerMetadata
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData}
+import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
+
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+import scala.compat.java8.OptionConverters._
+
+
+class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging with ConfigRepository {
+  this.logIdent = s"[MetadataCache brokerId=$brokerId] "
+
+  // This is the cache state. Every MetadataImage instance is immutable, and updates
+  // replace this value with a completely new one. This means reads (which are not under
+  // any lock) need to grab the value of this variable once, and retain that read copy for
+  // the /duration of their operation. Multiple reads of this value risk getting different

Review comment:
       nit: drop slash before duration
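
       For context, the read pattern described in that comment usually boils down to a single volatile field that readers snapshot once, e.g. (a sketch with illustrative names, assuming `MetadataImage.EMPTY` as the initial value):

           @volatile private var _currentImage: MetadataImage = MetadataImage.EMPTY

           // Readers grab one snapshot and use it for the whole operation.
           private def currentImage(): MetadataImage = _currentImage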

##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -331,46 +327,40 @@ class BrokerServer(
         new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
           KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
-      // Start processing requests once we've caught up on the metadata log, recovered logs if necessary,
-      // and started all services that we previously delayed starting.
-      val raftSupport = RaftSupport(forwardingManager, metadataCache, quotaCache)
+      // Create the request processor objects.
+      val raftSupport = RaftSupport(forwardingManager, metadataCache)
       dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
         replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
-        config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
+        config.nodeId, config, metadataCache, metadataCache, metrics, authorizer, quotaManagers,
         fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
 
-      dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
-        config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
-
-      socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
-        controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, raftSupport,
-          replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
-          config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
-          fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
+      dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
+        socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
+        config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
+        SocketServer.DataPlaneThreadPrefix)
 
-        controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
-          1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
+      if (socketServer.controlPlaneRequestChannelOpt.isDefined) {
+        throw new RuntimeException("KIP-291 endpoints are not supported when in KRaft mode.")

Review comment:
       nit: It would be better to refer to the configuration. No one will know what KIP-291 is.
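
       For example, something like (assuming the KIP-291 config name, `control.plane.listener.name`):

           if (socketServer.controlPlaneRequestChannelOpt.isDefined) {
             throw new RuntimeException("control.plane.listener.name is not supported in KRaft mode.")
           }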

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##########
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.log.{Log, LogManager}
+import kafka.server.ConfigType
+import kafka.server.{ConfigHandler, FinalizedFeatureCache, KafkaConfig, ReplicaManager, RequestLocal}
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
+
+import scala.collection.mutable
+
+
+object BrokerMetadataPublisher {
+  /**
+   * Given a topic name, find out if it changed. Note: if a topic named X was deleted and
+   * then re-created, this method will return just the re-creation. The deletion will show
+   * up in deletedTopicIds and must be handled separately.
+   *
+   * @param topicName   The topic name.
+   * @param newImage    The new metadata image.
+   * @param delta       The metadata delta to search.
+   *
+   * @return            The delta, or None if appropriate.
+   */
+  def getTopicDelta(topicName: String,
+                    newImage: MetadataImage,
+                    delta: MetadataDelta): Option[TopicDelta] = {
+    Option(newImage.topics().getTopic(topicName)).map {
+      topicImage => delta.topicsDelta().changedTopic(topicImage.id())
+    }
+  }
+
+  /**
+   * Find logs which should not be on the current broker, according to the metadata image.
+   *
+   * @param brokerId  The ID of the current broker.
+   * @param newImage  The metadata image.
+   * @param logs      A collection of Log objects.
+   *
+   * @return          The topic partitions which are no longer needed on this broker.
+   */
+  def findGhostReplicas(brokerId: Int,
+                        newImage: MetadataImage,
+                        logs: Iterable[Log]): Iterable[TopicPartition] = {
+    logs.flatMap { log =>
+      log.topicId match {
+        case None => throw new RuntimeException(s"Topic ${log.name} does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+        case Some(topicId) =>
+          val partitionId = log.topicPartition.partition()
+          Option(newImage.topics().getPartition(topicId, partitionId)) match {
+            case None => None
+            case Some(partition) => if (partition.replicas.contains(brokerId)) {
+              Some(log.topicPartition)
+            } else {
+              None
+            }
+          }
+      }
+    }
+  }
+}
+
+class BrokerMetadataPublisher(conf: KafkaConfig,
+                              metadataCache: KRaftMetadataCache,
+                              logManager: LogManager,
+                              replicaManager: ReplicaManager,
+                              groupCoordinator: GroupCoordinator,
+                              txnCoordinator: TransactionCoordinator,
+                              clientQuotaMetadataManager: ClientQuotaMetadataManager,
+                              featureCache: FinalizedFeatureCache,
+                              dynamicConfigHandlers: Map[String, ConfigHandler]) extends MetadataPublisher with Logging {
+  logIdent = s"[BrokerMetadataPublisher id=${conf.nodeId}] "
+
+  import BrokerMetadataPublisher._
+
+  /**
+   * The broker ID.
+   */
+  val brokerId = conf.nodeId
+
+  /**
+   * True if this is the first time we have published metadata.
+   */
+  var _firstPublish = false
+
+  override def publish(newHighestMetadataOffset: Long,
+                       delta: MetadataDelta,
+                       newImage: MetadataImage): Unit = {
+    try {
+      // Publish the new metadata image to the metadata cache.
+      metadataCache.setImage(newImage)
+
+      if (_firstPublish) {
+        info(s"Publishing initial metadata at offset 
${newHighestMetadataOffset}.")
+
+        // If this is the first metadata update we are applying, initialize 
the managers
+        // first (but after setting up the metadata cache).
+        initializeManagers()
+      } else if (isDebugEnabled) {
+        debug(s"Publishing metadata at offset ${newHighestMetadataOffset}.")
+      }
+
+      // Apply feature deltas.
+      Option(delta.featuresDelta()).foreach { featuresDelta =>
+        featureCache.update(featuresDelta, newHighestMetadataOffset)
+      }
+
+      // Apply topic deltas.
+      Option(delta.topicsDelta()).foreach { topicsDelta =>
+        // Notify the replica manager about changes to topics.
+        replicaManager.applyDelta(newImage, topicsDelta)
+
+        // Handle the case where the old consumer offsets topic was deleted.
+        if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) {
+          topicsDelta.image().getTopic(Topic.GROUP_METADATA_TOPIC_NAME).partitions().entrySet().forEach {
+            entry =>
+              if (entry.getValue().leader == brokerId) {
+                groupCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch))
+              }
+          }
+        }
+        // Handle the case where we have new local leaders or followers for the consumer
+        // offsets topic.
+        getTopicDelta(Topic.GROUP_METADATA_TOPIC_NAME, newImage, delta).foreach { topicDelta =>
+          topicDelta.newLocalLeaders(brokerId).forEach {
+            entry => groupCoordinator.onElection(entry.getKey(), entry.getValue().leaderEpoch)
+          }
+          topicDelta.newLocalFollowers(brokerId).forEach {
+            entry => groupCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch))
+          }
+        }
+
+        // Handle the case where the old transaction state topic was deleted.
+        if (topicsDelta.topicWasDeleted(Topic.TRANSACTION_STATE_TOPIC_NAME)) {
+          topicsDelta.image().getTopic(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions().entrySet().forEach {
+            entry =>
+              if (entry.getValue().leader == brokerId) {
+                txnCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch))
+              }
+          }
+        }
+        // If the transaction state topic changed in a way that's relevant to this broker,
+        // notify the transaction coordinator.
+        getTopicDelta(Topic.TRANSACTION_STATE_TOPIC_NAME, newImage, delta).foreach { topicDelta =>
+          topicDelta.newLocalLeaders(brokerId).forEach {
+            entry => txnCoordinator.onElection(entry.getKey(), entry.getValue().leaderEpoch)
+          }
+          topicDelta.newLocalFollowers(brokerId).forEach {
+            entry => txnCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch))
+          }
+        }
+
+        // Notify the group coordinator about deleted topics.
+        val deletedTopicPartitions = new mutable.ArrayBuffer[TopicPartition]()
+        topicsDelta.deletedTopicIds().forEach { id =>
+          val topicImage = topicsDelta.image().getTopic(id)
+          topicImage.partitions().keySet().forEach {
+            id => deletedTopicPartitions += new TopicPartition(topicImage.name(), id)
+          }
+        }
+        if (!deletedTopicPartitions.isEmpty) {

Review comment:
       nit: use nonEmpty
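
       i.e. (same behavior, just the idiomatic form):

           if (deletedTopicPartitions.nonEmpty) {
             groupCoordinator.handleDeletedPartitions(deletedTopicPartitions, RequestLocal.NoCaching)
           }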

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##########
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.log.{Log, LogManager}
+import kafka.server.ConfigType
+import kafka.server.{ConfigHandler, FinalizedFeatureCache, KafkaConfig, ReplicaManager, RequestLocal}
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
+
+import scala.collection.mutable
+
+
+object BrokerMetadataPublisher {
+  /**
+   * Given a topic name, find out if it changed. Note: if a topic named X was deleted and
+   * then re-created, this method will return just the re-creation. The deletion will show
+   * up in deletedTopicIds and must be handled separately.
+   *
+   * @param topicName   The topic name.
+   * @param newImage    The new metadata image.
+   * @param delta       The metadata delta to search.
+   *
+   * @return            The delta, or None if appropriate.
+   */
+  def getTopicDelta(topicName: String,
+                    newImage: MetadataImage,
+                    delta: MetadataDelta): Option[TopicDelta] = {
+    Option(newImage.topics().getTopic(topicName)).map {
+      topicImage => delta.topicsDelta().changedTopic(topicImage.id())
+    }
+  }
+
+  /**
+   * Find logs which should not be on the current broker, according to the metadata image.
+   *
+   * @param brokerId  The ID of the current broker.
+   * @param newImage  The metadata image.
+   * @param logs      A collection of Log objects.
+   *
+   * @return          The topic partitions which are no longer needed on this broker.
+   */
+  def findGhostReplicas(brokerId: Int,
+                        newImage: MetadataImage,
+                        logs: Iterable[Log]): Iterable[TopicPartition] = {
+    logs.flatMap { log =>
+      log.topicId match {
+        case None => throw new RuntimeException(s"Topic ${log.name} does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+        case Some(topicId) =>
+          val partitionId = log.topicPartition.partition()
+          Option(newImage.topics().getPartition(topicId, partitionId)) match {
+            case None => None
+            case Some(partition) => if (partition.replicas.contains(brokerId)) {
+              Some(log.topicPartition)
+            } else {
+              None
+            }
+          }
+      }
+    }
+  }
+}
+
+class BrokerMetadataPublisher(conf: KafkaConfig,
+                              metadataCache: KRaftMetadataCache,
+                              logManager: LogManager,
+                              replicaManager: ReplicaManager,
+                              groupCoordinator: GroupCoordinator,
+                              txnCoordinator: TransactionCoordinator,
+                              clientQuotaMetadataManager: ClientQuotaMetadataManager,
+                              featureCache: FinalizedFeatureCache,
+                              dynamicConfigHandlers: Map[String, ConfigHandler]) extends MetadataPublisher with Logging {
+  logIdent = s"[BrokerMetadataPublisher id=${conf.nodeId}] "
+
+  import BrokerMetadataPublisher._
+
+  /**
+   * The broker ID.
+   */
+  val brokerId = conf.nodeId
+
+  /**
+   * True if this is the first time we have published metadata.
+   */
+  var _firstPublish = false

Review comment:
       Do you mean to initialize this to `true`? Otherwise it is always false.
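
       i.e. presumably:

           /**
            * True if this is the first time we have published metadata.
            */
           var _firstPublish = true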

##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -31,9 +31,9 @@ import kafka.metrics.KafkaYammerMetrics
 import kafka.network.SocketServer
 import kafka.raft.RaftManager
 import kafka.security.CredentialProvider
-import kafka.server.metadata.{BrokerMetadataListener, CachedConfigRepository, ClientQuotaCache, ClientQuotaMetadataManager, RaftMetadataCache}
+import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, ClientQuotaMetadataManager, KRaftMetadataCache}
 import kafka.utils.{CoreUtils, KafkaScheduler}
-import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
+//import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}

Review comment:
       nit: remove this?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -127,6 +131,22 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends
     }
   }
 
+  def update(featuresDelta: FeaturesDelta, highestMetadataOffset: Long): Unit = {
+    val features = featuresAndEpoch.getOrElse(
+      FinalizedFeaturesAndEpoch(Features.emptyFinalizedFeatures(), -1))
+    val newFeatures = new util.HashMap[String, FinalizedVersionRange]()
+    newFeatures.putAll(features.features.features())

Review comment:
       features.features.features eh?
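
       A local rename would make this read less oddly, e.g. (sketch only):

           val existing = featuresAndEpoch.getOrElse(
             FinalizedFeaturesAndEpoch(Features.emptyFinalizedFeatures(), -1))
           val newFeatures = new util.HashMap[String, FinalizedVersionRange]()
           newFeatures.putAll(existing.features.features())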

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##########
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.log.{Log, LogManager}
+import kafka.server.ConfigType
+import kafka.server.{ConfigHandler, FinalizedFeatureCache, KafkaConfig, ReplicaManager, RequestLocal}
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
+
+import scala.collection.mutable
+
+
+object BrokerMetadataPublisher {
+  /**
+   * Given a topic name, find out if it changed. Note: if a topic named X was deleted and
+   * then re-created, this method will return just the re-creation. The deletion will show
+   * up in deletedTopicIds and must be handled separately.
+   *
+   * @param topicName   The topic name.
+   * @param newImage    The new metadata image.
+   * @param delta       The metadata delta to search.
+   *
+   * @return            The delta, or None if appropriate.
+   */
+  def getTopicDelta(topicName: String,
+                    newImage: MetadataImage,
+                    delta: MetadataDelta): Option[TopicDelta] = {
+    Option(newImage.topics().getTopic(topicName)).map {
+      topicImage => delta.topicsDelta().changedTopic(topicImage.id())
+    }
+  }
+
+  /**
+   * Find logs which should not be on the current broker, according to the metadata image.
+   *
+   * @param brokerId  The ID of the current broker.
+   * @param newImage  The metadata image.
+   * @param logs      A collection of Log objects.
+   *
+   * @return          The topic partitions which are no longer needed on this broker.
+   */
+  def findGhostReplicas(brokerId: Int,
+                        newImage: MetadataImage,
+                        logs: Iterable[Log]): Iterable[TopicPartition] = {
+    logs.flatMap { log =>
+      log.topicId match {
+        case None => throw new RuntimeException(s"Topic ${log.name} does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+        case Some(topicId) =>
+          val partitionId = log.topicPartition.partition()
+          Option(newImage.topics().getPartition(topicId, partitionId)) match {
+            case None => None
+            case Some(partition) => if (partition.replicas.contains(brokerId)) {
+              Some(log.topicPartition)
+            } else {
+              None
+            }
+          }
+      }
+    }
+  }
+}
+
+class BrokerMetadataPublisher(conf: KafkaConfig,
+                              metadataCache: KRaftMetadataCache,
+                              logManager: LogManager,
+                              replicaManager: ReplicaManager,
+                              groupCoordinator: GroupCoordinator,
+                              txnCoordinator: TransactionCoordinator,
+                              clientQuotaMetadataManager: ClientQuotaMetadataManager,
+                              featureCache: FinalizedFeatureCache,
+                              dynamicConfigHandlers: Map[String, ConfigHandler]) extends MetadataPublisher with Logging {
+  logIdent = s"[BrokerMetadataPublisher id=${conf.nodeId}] "
+
+  import BrokerMetadataPublisher._
+
+  /**
+   * The broker ID.
+   */
+  val brokerId = conf.nodeId
+
+  /**
+   * True if this is the first time we have published metadata.
+   */
+  var _firstPublish = false
+
+  override def publish(newHighestMetadataOffset: Long,
+                       delta: MetadataDelta,
+                       newImage: MetadataImage): Unit = {
+    try {
+      // Publish the new metadata image to the metadata cache.
+      metadataCache.setImage(newImage)
+
+      if (_firstPublish) {
+        info(s"Publishing initial metadata at offset 
${newHighestMetadataOffset}.")
+
+        // If this is the first metadata update we are applying, initialize 
the managers
+        // first (but after setting up the metadata cache).
+        initializeManagers()
+      } else if (isDebugEnabled) {
+        debug(s"Publishing metadata at offset ${newHighestMetadataOffset}.")
+      }
+
+      // Apply feature deltas.
+      Option(delta.featuresDelta()).foreach { featuresDelta =>
+        featureCache.update(featuresDelta, newHighestMetadataOffset)
+      }
+
+      // Apply topic deltas.
+      Option(delta.topicsDelta()).foreach { topicsDelta =>
+        // Notify the replica manager about changes to topics.
+        replicaManager.applyDelta(newImage, topicsDelta)
+
+        // Handle the case where the old consumer offsets topic was deleted.
+        if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) {
+          topicsDelta.image().getTopic(Topic.GROUP_METADATA_TOPIC_NAME).partitions().entrySet().forEach {
+            entry =>
+              if (entry.getValue().leader == brokerId) {
+                groupCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch))
+              }
+          }
+        }
+        // Handle the case where we have new local leaders or followers for the consumer
+        // offsets topic.
+        getTopicDelta(Topic.GROUP_METADATA_TOPIC_NAME, newImage, delta).foreach { topicDelta =>
+          topicDelta.newLocalLeaders(brokerId).forEach {
+            entry => groupCoordinator.onElection(entry.getKey(), entry.getValue().leaderEpoch)
+          }
+          topicDelta.newLocalFollowers(brokerId).forEach {
+            entry => groupCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch))
+          }
+        }
+
+        // Handle the case where the old transaction state topic was deleted.
+        if (topicsDelta.topicWasDeleted(Topic.TRANSACTION_STATE_TOPIC_NAME)) {
+          topicsDelta.image().getTopic(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions().entrySet().forEach {
+            entry =>
+              if (entry.getValue().leader == brokerId) {
+                txnCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch))
+              }
+          }
+        }
+        // If the transaction state topic changed in a way that's relevant to this broker,
+        // notify the transaction coordinator.
+        getTopicDelta(Topic.TRANSACTION_STATE_TOPIC_NAME, newImage, delta).foreach { topicDelta =>
+          topicDelta.newLocalLeaders(brokerId).forEach {
+            entry => txnCoordinator.onElection(entry.getKey(), entry.getValue().leaderEpoch)
+          }
+          topicDelta.newLocalFollowers(brokerId).forEach {
+            entry => txnCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch))
+          }
+        }
+
+        // Notify the group coordinator about deleted topics.
+        val deletedTopicPartitions = new mutable.ArrayBuffer[TopicPartition]()
+        topicsDelta.deletedTopicIds().forEach { id =>
+          val topicImage = topicsDelta.image().getTopic(id)
+          topicImage.partitions().keySet().forEach {
+            id => deletedTopicPartitions += new TopicPartition(topicImage.name(), id)
+          }
+        }
+        if (!deletedTopicPartitions.isEmpty) {
+          groupCoordinator.handleDeletedPartitions(deletedTopicPartitions, RequestLocal.NoCaching)
+        }
+      }
+
+      // Apply configuration deltas.
+      Option(delta.configsDelta()).foreach { configsDelta =>
+        configsDelta.changes().keySet().forEach { configResource =>
+          val tag = configResource.`type`() match {
+            case ConfigResource.Type.TOPIC => Some(ConfigType.Topic)
+            case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
+            case _ => None
+          }
+          tag.foreach { t =>
+            val newProperties = newImage.configs().configProperties(configResource)
+            dynamicConfigHandlers(t).processConfigChanges(configResource.name(), newProperties)
+          }
+        }
+      }
+
+      // Apply client quotas delta.
+      Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
+        clientQuotaMetadataManager.update(clientQuotasDelta)
+      }
+
+      if (_firstPublish) {
+        finishInitializingReplicaManager(newImage)
+      }
+    } catch {
+      case t: Throwable => error(s"Error publishing broker metadata at ${newHighestMetadataOffset}", t)
+        throw t
+    } finally {
+      _firstPublish = false
+    }
+  }
+
+  private def initializeManagers(): Unit = {
+    // Start log manager, which will perform (potentially lengthy)
+    // recovery-from-unclean-shutdown if required.
+    logManager.startup(metadataCache.getAllTopics())
+
+    // Start the replica manager.
+    replicaManager.startup()

Review comment:
       A little surprising to find this here given `BrokerServer` owns the rest of the lifecycle for these managers. Not sure if it would be any better with a callback to `BrokerServer`?
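
       For example, the publisher could take a hook that `BrokerServer` wires up (a sketch only; `onFirstPublish` is hypothetical, and the real constructor has more parameters):

           class BrokerMetadataPublisher(conf: KafkaConfig,
                                         metadataCache: KRaftMetadataCache,
                                         onFirstPublish: () => Unit) extends MetadataPublisher with Logging {
             var _firstPublish = true

             override def publish(newHighestMetadataOffset: Long,
                                  delta: MetadataDelta,
                                  newImage: MetadataImage): Unit = {
               metadataCache.setImage(newImage)
               if (_firstPublish) {
                 // BrokerServer, not the publisher, would start logManager/replicaManager here.
                 onFirstPublish()
                 _firstPublish = false
               }
             }
           }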

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -127,6 +131,22 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends
     }
   }
 
+  def update(featuresDelta: FeaturesDelta, highestMetadataOffset: Long): Unit = {
+    val features = featuresAndEpoch.getOrElse(
+      FinalizedFeaturesAndEpoch(Features.emptyFinalizedFeatures(), -1))
+    val newFeatures = new util.HashMap[String, FinalizedVersionRange]()
+    newFeatures.putAll(features.features.features())
+    featuresDelta.changes().entrySet().forEach { e =>
+      e.getValue().asScala match {
+        case None => newFeatures.remove(e.getKey)
+        case Some(feature) => newFeatures.put(e.getKey,
+          new FinalizedVersionRange(feature.min(), feature.max()))
+      }
+    }
+    featuresAndEpoch = Some(FinalizedFeaturesAndEpoch(
+      Features.finalizedFeatures(newFeatures), highestMetadataOffset))

Review comment:
       Wonder if it's worth wrapping `newFeatures` in an unmodifiable map.
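
       e.g. (assuming `Features.finalizedFeatures` keeps a reference to the map rather than copying it; `Collections` is `java.util.Collections`):

           featuresAndEpoch = Some(FinalizedFeaturesAndEpoch(
             Features.finalizedFeatures(Collections.unmodifiableMap(newFeatures)),
             highestMetadataOffset))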




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

