hachikuji commented on a change in pull request #10931: URL: https://github.com/apache/kafka/pull/10931#discussion_r661670006
########## File path: core/src/main/scala/kafka/server/BrokerServer.scala ########## @@ -264,19 +266,13 @@ class BrokerServer( /* Add all reconfigurables for config change notification before starting the metadata listener */ config.dynamicConfig.addReconfigurables(this) - val clientQuotaMetadataManager = new ClientQuotaMetadataManager( - quotaManagers, socketServer.connectionQuotas, quotaCache) + // TODO : check that we're properly handling "ConfigType" Client, User, and Ip (which are not Review comment: Do you want to finish this or should we file a jira? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java ########## @@ -84,6 +84,7 @@ public static DescribeClientQuotasResponse parse(ByteBuffer buffer, short versio return new DescribeClientQuotasResponse(new DescribeClientQuotasResponseData(new ByteBufferAccessor(buffer), version)); } + // TODO: remove this function Review comment: Do you want to do this? ########## File path: core/src/main/scala/kafka/server/MetadataCache.scala ########## @@ -96,7 +87,7 @@ object MetadataCache { new ZkMetadataCache(brokerId) } - def raftMetadataCache(brokerId: Int): RaftMetadataCache = { - new RaftMetadataCache(brokerId) + def raftMetadataCache(brokerId: Int): KRaftMetadataCache = { Review comment: nit: perhaps we should also rename the method to `kraftMetadataCache` ########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -1302,7 +1278,8 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.warn(stateControllerEpochErrorMessage) throw new ControllerMovedException(stateChangeLogger.messageWithPrefix(stateControllerEpochErrorMessage)) } else { - val deletedPartitions = metadataCache.updateMetadata(correlationId, updateMetadataRequest) + val zkMetadataCache = metadataCache.asInstanceOf[ZkMetadataCache] Review comment: Could we use `MetadataSupport` in `ReplicaManager`? 
########## File path: core/src/main/scala/kafka/server/ConfigHandler.scala ########## @@ -51,7 +51,8 @@ trait ConfigHandler { * The TopicConfigHandler will process topic config changes in ZK. * The callback provides the topic name and the full properties set read from ZK */ -class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers, kafkaController: KafkaController) extends ConfigHandler with Logging { +class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, Review comment: nit: can we generalize the doc above to cover events from the metadata log? ########## File path: core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala ########## @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server.metadata + +import kafka.controller.StateChangeLogger +import kafka.server.MetadataCache +import kafka.utils.Logging +import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} +import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} +import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.MetadataResponse +import org.apache.kafka.image.MetadataImage +import java.util +import java.util.{Collections, Properties} +import java.util.concurrent.ThreadLocalRandom + +import kafka.admin.BrokerMetadata +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData} +import org.apache.kafka.metadata.{PartitionRegistration, Replicas} + +import scala.collection.{Seq, Set, mutable} +import scala.jdk.CollectionConverters._ +import scala.compat.java8.OptionConverters._ + + +class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging with ConfigRepository { + this.logIdent = s"[MetadataCache brokerId=$brokerId] " + + // This is the cache state. Every MetadataImage instance is immutable, and updates + // replace this value with a completely new one. This means reads (which are not under + // any lock) need to grab the value of this variable once, and retain that read copy for + // the /duration of their operation. 
Multiple reads of this value risk getting different Review comment: nit: drop slash before duration ########## File path: core/src/main/scala/kafka/server/BrokerServer.scala ########## @@ -331,46 +327,40 @@ class BrokerServer( new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) - // Start processing requests once we've caught up on the metadata log, recovered logs if necessary, - // and started all services that we previously delayed starting. - val raftSupport = RaftSupport(forwardingManager, metadataCache, quotaCache) + // Create the request processor objects. + val raftSupport = RaftSupport(forwardingManager, metadataCache) dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport, replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager, - config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers, + config.nodeId, config, metadataCache, metadataCache, metrics, authorizer, quotaManagers, fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager) - dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, - config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix) - - socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel => - controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, raftSupport, - replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager, - config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers, - fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager) + dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, + socketServer.dataPlaneRequestChannel, 
dataPlaneRequestProcessor, time, + config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", + SocketServer.DataPlaneThreadPrefix) - controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time, - 1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix) + if (socketServer.controlPlaneRequestChannelOpt.isDefined) { + throw new RuntimeException("KIP-291 endpoints are not supported when in KRaft mode.") Review comment: nit: It would be better to refer to the configuration. No one will know what KIP-291 is. ########## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala ########## @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server.metadata + +import kafka.coordinator.group.GroupCoordinator +import kafka.coordinator.transaction.TransactionCoordinator +import kafka.log.{Log, LogManager} +import kafka.server.ConfigType +import kafka.server.{ConfigHandler, FinalizedFeatureCache, KafkaConfig, ReplicaManager, RequestLocal} +import kafka.utils.Logging +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.internals.Topic +import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta} + +import scala.collection.mutable + + +object BrokerMetadataPublisher { + /** + * Given a topic name, find out if it changed. Note: if a topic named X was deleted and + * then re-created, this method will return just the re-creation. The deletion will show + * up in deletedTopicIds and must be handled separately. + * + * @param topicName The topic name. + * @param newImage The new metadata image. + * @param delta The metadata delta to search. + * + * @return The delta, or None if appropriate. + */ + def getTopicDelta(topicName: String, + newImage: MetadataImage, + delta: MetadataDelta): Option[TopicDelta] = { + Option(newImage.topics().getTopic(topicName)).map { + topicImage => delta.topicsDelta().changedTopic(topicImage.id()) + } + } + + /** + * Find logs which should not be on the current broker, according to the metadata image. + * + * @param brokerId The ID of the current broker. + * @param newImage The metadata image. + * @param logs A collection of Log objects. + * + * @return The topic partitions which are no longer needed on this broker. 
+ */ + def findGhostReplicas(brokerId: Int, + newImage: MetadataImage, + logs: Iterable[Log]): Iterable[TopicPartition] = { + logs.flatMap { log => + log.topicId match { + case None => throw new RuntimeException(s"Topic ${log.name} does not have a topic ID, " + + "which is not allowed when running in KRaft mode.") + case Some(topicId) => + val partitionId = log.topicPartition.partition() + Option(newImage.topics().getPartition(topicId, partitionId)) match { + case None => None + case Some(partition) => if (partition.replicas.contains(brokerId)) { + Some(log.topicPartition) + } else { + None + } + } + } + } + } +} + +class BrokerMetadataPublisher(conf: KafkaConfig, + metadataCache: KRaftMetadataCache, + logManager: LogManager, + replicaManager: ReplicaManager, + groupCoordinator: GroupCoordinator, + txnCoordinator: TransactionCoordinator, + clientQuotaMetadataManager: ClientQuotaMetadataManager, + featureCache: FinalizedFeatureCache, + dynamicConfigHandlers: Map[String, ConfigHandler]) extends MetadataPublisher with Logging { + logIdent = s"[BrokerMetadataPublisher id=${conf.nodeId}] " + + import BrokerMetadataPublisher._ + + /** + * The broker ID. + */ + val brokerId = conf.nodeId + + /** + * True if this is the first time we have published metadata. + */ + var _firstPublish = false + + override def publish(newHighestMetadataOffset: Long, + delta: MetadataDelta, + newImage: MetadataImage): Unit = { + try { + // Publish the new metadata image to the metadata cache. + metadataCache.setImage(newImage) + + if (_firstPublish) { + info(s"Publishing initial metadata at offset ${newHighestMetadataOffset}.") + + // If this is the first metadata update we are applying, initialize the managers + // first (but after setting up the metadata cache). + initializeManagers() + } else if (isDebugEnabled) { + debug(s"Publishing metadata at offset ${newHighestMetadataOffset}.") + } + + // Apply feature deltas. 
+ Option(delta.featuresDelta()).foreach { featuresDelta => + featureCache.update(featuresDelta, newHighestMetadataOffset) + } + + // Apply topic deltas. + Option(delta.topicsDelta()).foreach { topicsDelta => + // Notify the replica manager about changes to topics. + replicaManager.applyDelta(newImage, topicsDelta) + + // Handle the case where the old consumer offsets topic was deleted. + if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) { + topicsDelta.image().getTopic(Topic.GROUP_METADATA_TOPIC_NAME).partitions().entrySet().forEach { + entry => + if (entry.getValue().leader == brokerId) { + groupCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch)) + } + } + } + // Handle the case where we have new local leaders or followers for the consumer + // offsets topic. + getTopicDelta(Topic.GROUP_METADATA_TOPIC_NAME, newImage, delta).foreach { topicDelta => + topicDelta.newLocalLeaders(brokerId).forEach { + entry => groupCoordinator.onElection(entry.getKey(), entry.getValue().leaderEpoch) + } + topicDelta.newLocalFollowers(brokerId).forEach { + entry => groupCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch)) + } + } + + // Handle the case where the old transaction state topic was deleted. + if (topicsDelta.topicWasDeleted(Topic.TRANSACTION_STATE_TOPIC_NAME)) { + topicsDelta.image().getTopic(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions().entrySet().forEach { + entry => + if (entry.getValue().leader == brokerId) { + txnCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch)) + } + } + } + // If the transaction state topic changed in a way that's relevant to this broker, + // notify the transaction coordinator. 
+ getTopicDelta(Topic.TRANSACTION_STATE_TOPIC_NAME, newImage, delta).foreach { topicDelta => + topicDelta.newLocalLeaders(brokerId).forEach { + entry => txnCoordinator.onElection(entry.getKey(), entry.getValue().leaderEpoch) + } + topicDelta.newLocalFollowers(brokerId).forEach { + entry => txnCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch)) + } + } + + // Notify the group coordinator about deleted topics. + val deletedTopicPartitions = new mutable.ArrayBuffer[TopicPartition]() + topicsDelta.deletedTopicIds().forEach { id => + val topicImage = topicsDelta.image().getTopic(id) + topicImage.partitions().keySet().forEach { + id => deletedTopicPartitions += new TopicPartition(topicImage.name(), id) + } + } + if (!deletedTopicPartitions.isEmpty) { Review comment: nit: use nonEmpty ########## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala ########## @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server.metadata + +import kafka.coordinator.group.GroupCoordinator +import kafka.coordinator.transaction.TransactionCoordinator +import kafka.log.{Log, LogManager} +import kafka.server.ConfigType +import kafka.server.{ConfigHandler, FinalizedFeatureCache, KafkaConfig, ReplicaManager, RequestLocal} +import kafka.utils.Logging +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.internals.Topic +import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta} + +import scala.collection.mutable + + +object BrokerMetadataPublisher { + /** + * Given a topic name, find out if it changed. Note: if a topic named X was deleted and + * then re-created, this method will return just the re-creation. The deletion will show + * up in deletedTopicIds and must be handled separately. + * + * @param topicName The topic name. + * @param newImage The new metadata image. + * @param delta The metadata delta to search. + * + * @return The delta, or None if appropriate. + */ + def getTopicDelta(topicName: String, + newImage: MetadataImage, + delta: MetadataDelta): Option[TopicDelta] = { + Option(newImage.topics().getTopic(topicName)).map { + topicImage => delta.topicsDelta().changedTopic(topicImage.id()) + } + } + + /** + * Find logs which should not be on the current broker, according to the metadata image. + * + * @param brokerId The ID of the current broker. + * @param newImage The metadata image. + * @param logs A collection of Log objects. + * + * @return The topic partitions which are no longer needed on this broker. 
+ */ + def findGhostReplicas(brokerId: Int, + newImage: MetadataImage, + logs: Iterable[Log]): Iterable[TopicPartition] = { + logs.flatMap { log => + log.topicId match { + case None => throw new RuntimeException(s"Topic ${log.name} does not have a topic ID, " + + "which is not allowed when running in KRaft mode.") + case Some(topicId) => + val partitionId = log.topicPartition.partition() + Option(newImage.topics().getPartition(topicId, partitionId)) match { + case None => None + case Some(partition) => if (partition.replicas.contains(brokerId)) { + Some(log.topicPartition) + } else { + None + } + } + } + } + } +} + +class BrokerMetadataPublisher(conf: KafkaConfig, + metadataCache: KRaftMetadataCache, + logManager: LogManager, + replicaManager: ReplicaManager, + groupCoordinator: GroupCoordinator, + txnCoordinator: TransactionCoordinator, + clientQuotaMetadataManager: ClientQuotaMetadataManager, + featureCache: FinalizedFeatureCache, + dynamicConfigHandlers: Map[String, ConfigHandler]) extends MetadataPublisher with Logging { + logIdent = s"[BrokerMetadataPublisher id=${conf.nodeId}] " + + import BrokerMetadataPublisher._ + + /** + * The broker ID. + */ + val brokerId = conf.nodeId + + /** + * True if this is the first time we have published metadata. + */ + var _firstPublish = false Review comment: Do you mean to initialize this to `true`? Otherwise it is always false. 
########## File path: core/src/main/scala/kafka/server/BrokerServer.scala ########## @@ -31,9 +31,9 @@ import kafka.metrics.KafkaYammerMetrics import kafka.network.SocketServer import kafka.raft.RaftManager import kafka.security.CredentialProvider -import kafka.server.metadata.{BrokerMetadataListener, CachedConfigRepository, ClientQuotaCache, ClientQuotaMetadataManager, RaftMetadataCache} +import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, ClientQuotaMetadataManager, KRaftMetadataCache} import kafka.utils.{CoreUtils, KafkaScheduler} -import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME} +//import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME} Review comment: nit: remove this? ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -127,6 +131,22 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends } } + def update(featuresDelta: FeaturesDelta, highestMetadataOffset: Long): Unit = { + val features = featuresAndEpoch.getOrElse( + FinalizedFeaturesAndEpoch(Features.emptyFinalizedFeatures(), -1)) + val newFeatures = new util.HashMap[String, FinalizedVersionRange]() + newFeatures.putAll(features.features.features()) Review comment: features.features.features eh? ########## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala ########## @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.metadata + +import kafka.coordinator.group.GroupCoordinator +import kafka.coordinator.transaction.TransactionCoordinator +import kafka.log.{Log, LogManager} +import kafka.server.ConfigType +import kafka.server.{ConfigHandler, FinalizedFeatureCache, KafkaConfig, ReplicaManager, RequestLocal} +import kafka.utils.Logging +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.internals.Topic +import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta} + +import scala.collection.mutable + + +object BrokerMetadataPublisher { + /** + * Given a topic name, find out if it changed. Note: if a topic named X was deleted and + * then re-created, this method will return just the re-creation. The deletion will show + * up in deletedTopicIds and must be handled separately. + * + * @param topicName The topic name. + * @param newImage The new metadata image. + * @param delta The metadata delta to search. + * + * @return The delta, or None if appropriate. + */ + def getTopicDelta(topicName: String, + newImage: MetadataImage, + delta: MetadataDelta): Option[TopicDelta] = { + Option(newImage.topics().getTopic(topicName)).map { + topicImage => delta.topicsDelta().changedTopic(topicImage.id()) + } + } + + /** + * Find logs which should not be on the current broker, according to the metadata image. + * + * @param brokerId The ID of the current broker. + * @param newImage The metadata image. + * @param logs A collection of Log objects. 
+ * + * @return The topic partitions which are no longer needed on this broker. + */ + def findGhostReplicas(brokerId: Int, + newImage: MetadataImage, + logs: Iterable[Log]): Iterable[TopicPartition] = { + logs.flatMap { log => + log.topicId match { + case None => throw new RuntimeException(s"Topic ${log.name} does not have a topic ID, " + + "which is not allowed when running in KRaft mode.") + case Some(topicId) => + val partitionId = log.topicPartition.partition() + Option(newImage.topics().getPartition(topicId, partitionId)) match { + case None => None + case Some(partition) => if (partition.replicas.contains(brokerId)) { + Some(log.topicPartition) + } else { + None + } + } + } + } + } +} + +class BrokerMetadataPublisher(conf: KafkaConfig, + metadataCache: KRaftMetadataCache, + logManager: LogManager, + replicaManager: ReplicaManager, + groupCoordinator: GroupCoordinator, + txnCoordinator: TransactionCoordinator, + clientQuotaMetadataManager: ClientQuotaMetadataManager, + featureCache: FinalizedFeatureCache, + dynamicConfigHandlers: Map[String, ConfigHandler]) extends MetadataPublisher with Logging { + logIdent = s"[BrokerMetadataPublisher id=${conf.nodeId}] " + + import BrokerMetadataPublisher._ + + /** + * The broker ID. + */ + val brokerId = conf.nodeId + + /** + * True if this is the first time we have published metadata. + */ + var _firstPublish = false + + override def publish(newHighestMetadataOffset: Long, + delta: MetadataDelta, + newImage: MetadataImage): Unit = { + try { + // Publish the new metadata image to the metadata cache. + metadataCache.setImage(newImage) + + if (_firstPublish) { + info(s"Publishing initial metadata at offset ${newHighestMetadataOffset}.") + + // If this is the first metadata update we are applying, initialize the managers + // first (but after setting up the metadata cache). 
+ initializeManagers() + } else if (isDebugEnabled) { + debug(s"Publishing metadata at offset ${newHighestMetadataOffset}.") + } + + // Apply feature deltas. + Option(delta.featuresDelta()).foreach { featuresDelta => + featureCache.update(featuresDelta, newHighestMetadataOffset) + } + + // Apply topic deltas. + Option(delta.topicsDelta()).foreach { topicsDelta => + // Notify the replica manager about changes to topics. + replicaManager.applyDelta(newImage, topicsDelta) + + // Handle the case where the old consumer offsets topic was deleted. + if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) { + topicsDelta.image().getTopic(Topic.GROUP_METADATA_TOPIC_NAME).partitions().entrySet().forEach { + entry => + if (entry.getValue().leader == brokerId) { + groupCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch)) + } + } + } + // Handle the case where we have new local leaders or followers for the consumer + // offsets topic. + getTopicDelta(Topic.GROUP_METADATA_TOPIC_NAME, newImage, delta).foreach { topicDelta => + topicDelta.newLocalLeaders(brokerId).forEach { + entry => groupCoordinator.onElection(entry.getKey(), entry.getValue().leaderEpoch) + } + topicDelta.newLocalFollowers(brokerId).forEach { + entry => groupCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch)) + } + } + + // Handle the case where the old transaction state topic was deleted. + if (topicsDelta.topicWasDeleted(Topic.TRANSACTION_STATE_TOPIC_NAME)) { + topicsDelta.image().getTopic(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions().entrySet().forEach { + entry => + if (entry.getValue().leader == brokerId) { + txnCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch)) + } + } + } + // If the transaction state topic changed in a way that's relevant to this broker, + // notify the transaction coordinator. 
+ getTopicDelta(Topic.TRANSACTION_STATE_TOPIC_NAME, newImage, delta).foreach { topicDelta => + topicDelta.newLocalLeaders(brokerId).forEach { + entry => txnCoordinator.onElection(entry.getKey(), entry.getValue().leaderEpoch) + } + topicDelta.newLocalFollowers(brokerId).forEach { + entry => txnCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch)) + } + } + + // Notify the group coordinator about deleted topics. + val deletedTopicPartitions = new mutable.ArrayBuffer[TopicPartition]() + topicsDelta.deletedTopicIds().forEach { id => + val topicImage = topicsDelta.image().getTopic(id) + topicImage.partitions().keySet().forEach { + id => deletedTopicPartitions += new TopicPartition(topicImage.name(), id) + } + } + if (!deletedTopicPartitions.isEmpty) { + groupCoordinator.handleDeletedPartitions(deletedTopicPartitions, RequestLocal.NoCaching) + } + } + + // Apply configuration deltas. + Option(delta.configsDelta()).foreach { configsDelta => + configsDelta.changes().keySet().forEach { configResource => + val tag = configResource.`type`() match { + case ConfigResource.Type.TOPIC => Some(ConfigType.Topic) + case ConfigResource.Type.BROKER => Some(ConfigType.Broker) + case _ => None + } + tag.foreach { t => + val newProperties = newImage.configs().configProperties(configResource) + dynamicConfigHandlers(t).processConfigChanges(configResource.name(), newProperties) + } + } + } + + // Apply client quotas delta. + Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta => + clientQuotaMetadataManager.update(clientQuotasDelta) + } + + if (_firstPublish) { + finishInitializingReplicaManager(newImage) + } + } catch { + case t: Throwable => error(s"Error publishing broker metadata at ${newHighestMetadataOffset}", t) + throw t + } finally { + _firstPublish = false + } + } + + private def initializeManagers(): Unit = { + // Start log manager, which will perform (potentially lengthy) + // recovery-from-unclean-shutdown if required. 
+ logManager.startup(metadataCache.getAllTopics()) + + // Start the replica manager. + replicaManager.startup() Review comment: A little surprising to find this here given `BrokerServer` owns the rest of the lifecycle for these managers. Not sure if it would be any better with a callback to `BrokerServer`? ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -127,6 +131,22 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends } } + def update(featuresDelta: FeaturesDelta, highestMetadataOffset: Long): Unit = { + val features = featuresAndEpoch.getOrElse( + FinalizedFeaturesAndEpoch(Features.emptyFinalizedFeatures(), -1)) + val newFeatures = new util.HashMap[String, FinalizedVersionRange]() + newFeatures.putAll(features.features.features()) + featuresDelta.changes().entrySet().forEach { e => + e.getValue().asScala match { + case None => newFeatures.remove(e.getKey) + case Some(feature) => newFeatures.put(e.getKey, + new FinalizedVersionRange(feature.min(), feature.max())) + } + } + featuresAndEpoch = Some(FinalizedFeaturesAndEpoch( + Features.finalizedFeatures(newFeatures), highestMetadataOffset)) Review comment: Wonder if it's worth wrapping `newFeatures` in an unmodifiable map. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org