cmccabe commented on a change in pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#discussion_r663274842



##########
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##########
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.log.{Log, LogManager}
+import kafka.server.ConfigType
+import kafka.server.{ConfigHandler, FinalizedFeatureCache, KafkaConfig, 
ReplicaManager, RequestLocal}
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
+
+import scala.collection.mutable
+
+
+object BrokerMetadataPublisher {
+  /**
+   * Given a topic name, find out if it changed. Note: if a topic named X was 
deleted and
+   * then re-created, this method will return just the re-creation. The 
deletion will show
+   * up in deletedTopicIds and must be handled separately.
+   *
+   * @param topicName   The topic name.
+   * @param newImage    The new metadata image.
+   * @param delta       The metadata delta to search.
+   *
+   * @return            The delta, or None if appropriate.
+   */
+  def getTopicDelta(topicName: String,
+                    newImage: MetadataImage,
+                    delta: MetadataDelta): Option[TopicDelta] = {
+    Option(newImage.topics().getTopic(topicName)).map {
+      topicImage => delta.topicsDelta().changedTopic(topicImage.id())
+    }
+  }
+
+  /**
+   * Find logs which should not be on the current broker, according to the 
metadata image.
+   *
+   * @param brokerId  The ID of the current broker.
+   * @param newImage  The metadata image.
+   * @param logs      A collection of Log objects.
+   *
+   * @return          The topic partitions which are no longer needed on this 
broker.
+   */
+  def findGhostReplicas(brokerId: Int,
+                        newImage: MetadataImage,
+                        logs: Iterable[Log]): Iterable[TopicPartition] = {
+    logs.flatMap { log =>
+      log.topicId match {
+        case None => throw new RuntimeException(s"Topic ${log.name} does not 
have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+        case Some(topicId) =>
+          val partitionId = log.topicPartition.partition()
+          Option(newImage.topics().getPartition(topicId, partitionId)) match {
+            case None => None
+            case Some(partition) => if (partition.replicas.contains(brokerId)) 
{
+              Some(log.topicPartition)
+            } else {
+              None
+            }
+          }
+      }
+    }
+  }
+}
+
+class BrokerMetadataPublisher(conf: KafkaConfig,
+                              metadataCache: KRaftMetadataCache,
+                              logManager: LogManager,
+                              replicaManager: ReplicaManager,
+                              groupCoordinator: GroupCoordinator,
+                              txnCoordinator: TransactionCoordinator,
+                              clientQuotaMetadataManager: 
ClientQuotaMetadataManager,
+                              featureCache: FinalizedFeatureCache,
+                              dynamicConfigHandlers: Map[String, 
ConfigHandler]) extends MetadataPublisher with Logging {
+  logIdent = s"[BrokerMetadataPublisher id=${conf.nodeId}] "
+
+  import BrokerMetadataPublisher._
+
+  /**
+   * The broker ID.
+   */
+  val brokerId = conf.nodeId
+
+  /**
+   * True if this is the first time we have published metadata.
+   */
+  var _firstPublish = false
+
+  override def publish(newHighestMetadataOffset: Long,
+                       delta: MetadataDelta,
+                       newImage: MetadataImage): Unit = {
+    try {
+      // Publish the new metadata image to the metadata cache.
+      metadataCache.setImage(newImage)
+
+      if (_firstPublish) {
+        info(s"Publishing initial metadata at offset 
${newHighestMetadataOffset}.")
+
+        // If this is the first metadata update we are applying, initialize 
the managers
+        // first (but after setting up the metadata cache).
+        initializeManagers()
+      } else if (isDebugEnabled) {
+        debug(s"Publishing metadata at offset ${newHighestMetadataOffset}.")
+      }
+
+      // Apply feature deltas.
+      Option(delta.featuresDelta()).foreach { featuresDelta =>
+        featureCache.update(featuresDelta, newHighestMetadataOffset)
+      }
+
+      // Apply topic deltas.
+      Option(delta.topicsDelta()).foreach { topicsDelta =>
+        // Notify the replica manager about changes to topics.
+        replicaManager.applyDelta(newImage, topicsDelta)
+
+        // Handle the case where the old consumer offsets topic was deleted.
+        if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) {
+          
topicsDelta.image().getTopic(Topic.GROUP_METADATA_TOPIC_NAME).partitions().entrySet().forEach
 {
+            entry =>
+              if (entry.getValue().leader == brokerId) {
+                groupCoordinator.onResignation(entry.getKey(), 
Some(entry.getValue().leaderEpoch))
+              }
+          }
+        }
+        // Handle the case where we have new local leaders or followers for 
the consumer
+        // offsets topic.
+        getTopicDelta(Topic.GROUP_METADATA_TOPIC_NAME, newImage, 
delta).foreach { topicDelta =>
+          topicDelta.newLocalLeaders(brokerId).forEach {
+            entry => groupCoordinator.onElection(entry.getKey(), 
entry.getValue().leaderEpoch)
+          }
+          topicDelta.newLocalFollowers(brokerId).forEach {
+            entry => groupCoordinator.onResignation(entry.getKey(), 
Some(entry.getValue().leaderEpoch))
+          }
+        }
+
+        // Handle the case where the old transaction state topic was deleted.
+        if (topicsDelta.topicWasDeleted(Topic.TRANSACTION_STATE_TOPIC_NAME)) {
+          
topicsDelta.image().getTopic(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions().entrySet().forEach
 {
+            entry =>
+              if (entry.getValue().leader == brokerId) {
+                txnCoordinator.onResignation(entry.getKey(), 
Some(entry.getValue().leaderEpoch))
+              }
+          }
+        }
+        // If the transaction state topic changed in a way that's relevant to 
this broker,
+        // notify the transaction coordinator.
+        getTopicDelta(Topic.TRANSACTION_STATE_TOPIC_NAME, newImage, 
delta).foreach { topicDelta =>
+          topicDelta.newLocalLeaders(brokerId).forEach {
+            entry => txnCoordinator.onElection(entry.getKey(), 
entry.getValue().leaderEpoch)
+          }
+          topicDelta.newLocalFollowers(brokerId).forEach {
+            entry => txnCoordinator.onResignation(entry.getKey(), 
Some(entry.getValue().leaderEpoch))
+          }
+        }
+
+        // Notify the group coordinator about deleted topics.
+        val deletedTopicPartitions = new mutable.ArrayBuffer[TopicPartition]()
+        topicsDelta.deletedTopicIds().forEach { id =>
+          val topicImage = topicsDelta.image().getTopic(id)
+          topicImage.partitions().keySet().forEach {
+            id => deletedTopicPartitions += new 
TopicPartition(topicImage.name(), id)
+          }
+        }
+        if (!deletedTopicPartitions.isEmpty) {
+          groupCoordinator.handleDeletedPartitions(deletedTopicPartitions, 
RequestLocal.NoCaching)
+        }
+      }
+
+      // Apply configuration deltas.
+      Option(delta.configsDelta()).foreach { configsDelta =>
+        configsDelta.changes().keySet().forEach { configResource =>
+          val tag = configResource.`type`() match {
+            case ConfigResource.Type.TOPIC => Some(ConfigType.Topic)
+            case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
+            case _ => None
+          }
+          tag.foreach { t =>
+            val newProperties = 
newImage.configs().configProperties(configResource)
+            
dynamicConfigHandlers(t).processConfigChanges(configResource.name(), 
newProperties)
+          }
+        }
+      }
+
+      // Apply client quotas delta.
+      Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
+        clientQuotaMetadataManager.update(clientQuotasDelta)
+      }
+
+      if (_firstPublish) {
+        finishInitializingReplicaManager(newImage)
+      }
+    } catch {
+      case t: Throwable => error(s"Error publishing broker metadata at 
${newHighestMetadataOffset}", t)
+        throw t
+    } finally {
+      _firstPublish = false
+    }
+  }
+
+  private def initializeManagers(): Unit = {
+    // Start log manager, which will perform (potentially lengthy)
+    // recovery-from-unclean-shutdown if required.
+    logManager.startup(metadataCache.getAllTopics())
+
+    // Start the replica manager.
+    replicaManager.startup()

Review comment:
       I was thinking about that too. If I can find a way to condense the 
initialization into a single startup callback, I'll try to put the code back 
into `BrokerServer`. It still needs to happen in the context of the metadata 
listener thread, though.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to