This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit e94a25173dc469da650f4d620ab312a1363242a2 Author: Colin Patrick McCabe <[email protected]> AuthorDate: Wed Dec 7 10:43:34 2022 -0800 MINOR: Move dynamic config logic to DynamicConfigPublisher (#12958) Split out the logic for applying dynamic configurations to a KafkaConfig object from BrokerMetadataPublisher into a new class, DynamicConfigPublisher. This will allow the ControllerServer to also run this code, in a follow-up change. Create separate KafkaConfig objects in BrokerServer versus ControllerServer. This is necessary because the controller will apply configuration changes as soon as its raft client catches up to the high water mark, whereas the broker will wait for the active controller to acknowledge it has caught up in a heartbeat response. So when running in combined mode, we want two separate KafkaConfig objects that are changed at different times. Minor changes: improve the error message when catching up broker metadata fails. Fix incorrect indentation in checkstyle/import-control.xml. Invoke AppInfoParser.unregisterAppInfo from SharedServer.stop so that it happens only when both the controller and broker have shut down. Reviewers: David Arthur <[email protected]> --- checkstyle/import-control.xml | 2 +- .../src/main/scala/kafka/server/BrokerServer.scala | 24 +++-- .../main/scala/kafka/server/ControllerServer.scala | 6 +- .../src/main/scala/kafka/server/SharedServer.scala | 22 +++-- .../server/metadata/BrokerMetadataPublisher.scala | 61 +----------- .../server/metadata/DynamicConfigPublisher.scala | 103 +++++++++++++++++++++ .../metadata/BrokerMetadataPublisherTest.scala | 45 +++------ 7 files changed, 150 insertions(+), 113 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 0609ee7fd6c..bd05521964e 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -306,7 +306,7 @@ </subpackage> <subpackage name="queue"> - <allow pkg="org.apache.kafka.test" /> + <allow pkg="org.apache.kafka.test" /> </subpackage> <subpackage name="clients"> diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index d6b4fa92c3a..f55ceebffcc 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -30,7 +30,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.raft.KafkaRaftManager import kafka.security.CredentialProvider import kafka.server.KafkaRaftServer.ControllerRole -import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder} +import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, DynamicConfigPublisher, KRaftMetadataCache, SnapshotWriterBuilder} import kafka.utils.{CoreUtils, KafkaScheduler} import org.apache.kafka.common.feature.SupportedVersionRange import org.apache.kafka.common.message.ApiMessageType.ListenerType @@ -39,7 +39,7 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache -import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils} +import org.apache.kafka.common.utils.{LogContext, Time, Utils} import org.apache.kafka.common.{ClusterResource, Endpoint} import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer import org.apache.kafka.metadata.{BrokerState, VersionRange} @@ -71,7 +71,7 @@ class BrokerServer( val initialOfflineDirs: Seq[String], ) extends KafkaBroker { val threadNamePrefix = sharedServer.threadNamePrefix - val config = sharedServer.config + val config = sharedServer.brokerConfig val time = sharedServer.time def metrics = sharedServer.metrics @@ -420,8 +420,13 @@ class BrokerServer( config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", DataPlaneAcceptor.ThreadPrefix) - // Block until we've caught up with the latest metadata from the controller quorum. - lifecycleManager.initialCatchUpFuture.get() + info("Waiting for broker metadata to catch up.") + try { + lifecycleManager.initialCatchUpFuture.get() + } catch { + case t: Throwable => throw new RuntimeException("Received a fatal error while " + + "waiting for the broker to catch up with the current cluster metadata.", t) + } // Apply the metadata log changes that we've accumulated. metadataPublisher = new BrokerMetadataPublisher(config, @@ -431,7 +436,11 @@ class BrokerServer( groupCoordinator, transactionCoordinator, clientQuotaMetadataManager, - dynamicConfigHandlers.toMap, + new DynamicConfigPublisher( + config, + sharedServer.metadataPublishingFaultHandler, + dynamicConfigHandlers.toMap, + "broker"), authorizer, sharedServer.initialBrokerMetadataLoadFaultHandler, sharedServer.metadataPublishingFaultHandler) @@ -567,9 +576,8 @@ class BrokerServer( isShuttingDown.set(false) CoreUtils.swallow(lifecycleManager.close(), this) + CoreUtils.swallow(config.dynamicConfig.clear(), this) sharedServer.stopForBroker() - - CoreUtils.swallow(AppInfoParser.unregisterAppInfo(MetricsPrefix, config.nodeId.toString, metrics), this) info("shut down completed") } catch { case e: Throwable => diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index f73088b30f0..2bd518cde2a 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -59,14 +59,12 @@ class ControllerServer( import kafka.server.Server._ - val config = sharedServer.config + val config = sharedServer.controllerConfig val time = sharedServer.time def metrics = sharedServer.metrics val threadNamePrefix = sharedServer.threadNamePrefix.getOrElse("") def raftManager: KafkaRaftManager[ApiMessageAndVersion] = sharedServer.raftManager - config.dynamicConfig.initialize(zkClientOpt = None) - val lock = new ReentrantLock() val awaitShutdownCond = lock.newCondition() var status: ProcessStatus = SHUTDOWN @@ -109,6 +107,7 @@ class ControllerServer( if (!maybeChangeStatus(SHUTDOWN, STARTING)) return try { info("Starting controller") + config.dynamicConfig.initialize(zkClientOpt = None) maybeChangeStatus(STARTING, STARTED) this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix() @@ -284,6 +283,7 @@ class ControllerServer( createTopicPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this)) alterConfigPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this)) socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down")) + CoreUtils.swallow(config.dynamicConfig.clear(), this) sharedServer.stopForController() } catch { case e: Throwable => diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index a420c9afa38..8b647e7464f 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -19,10 +19,11 @@ package kafka.server import kafka.raft.KafkaRaftManager import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole} +import kafka.server.Server.MetricsPrefix import kafka.server.metadata.BrokerServerMetrics import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.utils.{LogContext, Time} +import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time} import org.apache.kafka.controller.QuorumControllerMetrics import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.raft.RaftConfig.AddressSpec @@ -77,7 +78,7 @@ class StandardFaultHandlerFactory extends FaultHandlerFactory { * make debugging easier and reduce the chance of resource leaks. */ class SharedServer( - val config: KafkaConfig, + private val sharedServerConfig: KafkaConfig, val metaProps: MetaProperties, val time: Time, private val _metrics: Metrics, @@ -85,11 +86,13 @@ class SharedServer( val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]], val faultHandlerFactory: FaultHandlerFactory ) extends Logging { - private val logContext: LogContext = new LogContext(s"[SharedServer id=${config.nodeId}] ") + private val logContext: LogContext = new LogContext(s"[SharedServer id=${sharedServerConfig.nodeId}] ") this.logIdent = logContext.logPrefix private var started = false private var usedByBroker: Boolean = false private var usedByController: Boolean = false + val brokerConfig = new KafkaConfig(sharedServerConfig.props, false, None) + val controllerConfig = new KafkaConfig(sharedServerConfig.props, false, None) @volatile var metrics: Metrics = _metrics @volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _ @volatile var brokerMetrics: BrokerServerMetrics = _ @@ -143,7 +146,7 @@ class SharedServer( * The fault handler to use when metadata loading fails. */ def metadataLoaderFaultHandler: FaultHandler = faultHandlerFactory.build("metadata loading", - fatal = config.processRoles.contains(ControllerRole), + fatal = sharedServerConfig.processRoles.contains(ControllerRole), action = () => SharedServer.this.synchronized { if (brokerMetrics != null) brokerMetrics.metadataLoadErrorCount.getAndIncrement() if (controllerMetrics != null) controllerMetrics.incrementMetadataErrorCount() @@ -188,17 +191,17 @@ class SharedServer( // This is only done in tests. metrics = new Metrics() } - config.dynamicConfig.initialize(zkClientOpt = None) + sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None) - if (config.processRoles.contains(BrokerRole)) { + if (sharedServerConfig.processRoles.contains(BrokerRole)) { brokerMetrics = BrokerServerMetrics(metrics) } - if (config.processRoles.contains(ControllerRole)) { + if (sharedServerConfig.processRoles.contains(ControllerRole)) { controllerMetrics = new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), time) } raftManager = new KafkaRaftManager[ApiMessageAndVersion]( metaProps, - config, + sharedServerConfig, new MetadataRecordSerde, KafkaRaftServer.MetadataPartition, KafkaRaftServer.MetadataTopicId, @@ -248,8 +251,7 @@ class SharedServer( CoreUtils.swallow(metrics.close(), this) metrics = null } - // Clear all reconfigurable instances stored in DynamicBrokerConfig - CoreUtils.swallow(config.dynamicConfig.clear(), this) + CoreUtils.swallow(AppInfoParser.unregisterAppInfo(MetricsPrefix, sharedServerConfig.nodeId.toString, metrics), this) started = false } } diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 0192bb4afcf..933a6bf8924 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -22,11 +22,9 @@ import java.util.concurrent.atomic.AtomicLong import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.TransactionCoordinator import kafka.log.{LogManager, UnifiedLog} -import kafka.server.ConfigAdminManager.toLoggableProps -import kafka.server.{ConfigEntityName, ConfigHandler, ConfigType, KafkaConfig, ReplicaManager, RequestLocal} +import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal} import kafka.utils.Logging import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC} import org.apache.kafka.common.internals.Topic import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage} import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer @@ -103,7 +101,7 @@ class BrokerMetadataPublisher( groupCoordinator: GroupCoordinator, txnCoordinator: TransactionCoordinator, clientQuotaMetadataManager: ClientQuotaMetadataManager, - dynamicConfigHandlers: Map[String, ConfigHandler], + var dynamicConfigPublisher: DynamicConfigPublisher, private val _authorizer: Option[Authorizer], fatalFaultHandler: FaultHandler, metadataPublishingFaultHandler: FaultHandler @@ -211,61 +209,10 @@ class BrokerMetadataPublisher( } // Apply configuration deltas. - Option(delta.configsDelta()).foreach { configsDelta => - configsDelta.changes().keySet().forEach { resource => - val props = newImage.configs().configProperties(resource) - resource.`type`() match { - case TOPIC => - try { - // Apply changes to a topic's dynamic configuration. - info(s"Updating topic ${resource.name()} with new configuration : " + - toLoggableProps(resource, props).mkString(",")) - dynamicConfigHandlers(ConfigType.Topic). - processConfigChanges(resource.name(), props) - } catch { - case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating topic " + - s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + - s"in ${deltaName}", t) - } - case BROKER => - if (resource.name().isEmpty) { - try { - // Apply changes to "cluster configs" (also known as default BROKER configs). - // These are stored in KRaft with an empty name field. - info("Updating cluster configuration : " + - toLoggableProps(resource, props).mkString(",")) - dynamicConfigHandlers(ConfigType.Broker). - processConfigChanges(ConfigEntityName.Default, props) - } catch { - case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating " + - s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + - s"in ${deltaName}", t) - } - } else if (resource.name() == brokerId.toString) { - try { - // Apply changes to this broker's dynamic configuration. - info(s"Updating broker $brokerId with new configuration : " + - toLoggableProps(resource, props).mkString(",")) - dynamicConfigHandlers(ConfigType.Broker). - processConfigChanges(resource.name(), props) - // When applying a per broker config (not a cluster config), we also - // reload any associated file. For example, if the ssl.keystore is still - // set to /tmp/foo, we still want to reload /tmp/foo in case its contents - // have changed. This doesn't apply to topic configs or cluster configs. - reloadUpdatedFilesWithoutConfigChange(props) - } catch { - case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating " + - s"broker with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + - s"in ${deltaName}", t) - } - } - case _ => // nothing to do - } - } - } + dynamicConfigPublisher.publish(delta, newImage) + // Apply client quotas delta. try { - // Apply client quotas delta. Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta => clientQuotaMetadataManager.update(clientQuotasDelta) } diff --git a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala new file mode 100644 index 00000000000..12ff51d4039 --- /dev/null +++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.metadata + +import java.util.Properties +import kafka.server.ConfigAdminManager.toLoggableProps +import kafka.server.{ConfigEntityName, ConfigHandler, ConfigType, KafkaConfig} +import kafka.utils.Logging +import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC} +import org.apache.kafka.image.{MetadataDelta, MetadataImage} +import org.apache.kafka.server.fault.FaultHandler + + +class DynamicConfigPublisher( + conf: KafkaConfig, + faultHandler: FaultHandler, + dynamicConfigHandlers: Map[String, ConfigHandler], + nodeType: String +) extends Logging { + logIdent = s"[DynamicConfigPublisher nodeType=${nodeType} id=${conf.nodeId}] " + + def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = { + val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}" + try { + // Apply configuration deltas. + Option(delta.configsDelta()).foreach { configsDelta => + configsDelta.changes().keySet().forEach { resource => + val props = newImage.configs().configProperties(resource) + resource.`type`() match { + case TOPIC => + dynamicConfigHandlers.get(ConfigType.Topic).foreach(topicConfigHandler => + try { + // Apply changes to a topic's dynamic configuration. + info(s"Updating topic ${resource.name()} with new configuration : " + + toLoggableProps(resource, props).mkString(",")) + topicConfigHandler.processConfigChanges(resource.name(), props) + } catch { + case t: Throwable => faultHandler.handleFault("Error updating topic " + + s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + + s"in ${deltaName}", t) + } + ) + case BROKER => + dynamicConfigHandlers.get(ConfigType.Broker).foreach(nodeConfigHandler => + if (resource.name().isEmpty) { + try { + // Apply changes to "cluster configs" (also known as default BROKER configs). + // These are stored in KRaft with an empty name field. + info("Updating cluster configuration : " + + toLoggableProps(resource, props).mkString(",")) + nodeConfigHandler.processConfigChanges(ConfigEntityName.Default, props) + } catch { + case t: Throwable => faultHandler.handleFault("Error updating " + + s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + + s"in ${deltaName}", t) + } + } else if (resource.name() == conf.nodeId.toString) { + try { + // Apply changes to this node's dynamic configuration. + info(s"Updating node ${conf.nodeId} with new configuration : " + + toLoggableProps(resource, props).mkString(",")) + nodeConfigHandler.processConfigChanges(resource.name(), props) + // When applying a per node config (not a cluster config), we also + // reload any associated file. For example, if the ssl.keystore is still + // set to /tmp/foo, we still want to reload /tmp/foo in case its contents + // have changed. This doesn't apply to topic configs or cluster configs. + reloadUpdatedFilesWithoutConfigChange(props) + } catch { + case t: Throwable => faultHandler.handleFault("Error updating " + + s"node with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + + s"in ${deltaName}", t) + } + } + ) + case _ => // nothing to do + } + } + } + } catch { + case t: Throwable => faultHandler.handleFault("Uncaught exception while " + + s"publishing dynamic configuration changes from ${deltaName}", t) + } + } + + def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = { + conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props) + } +} diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index 8874a235a52..317da428888 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -17,14 +17,11 @@ package kafka.server.metadata -import kafka.coordinator.group.GroupCoordinator -import kafka.coordinator.transaction.TransactionCoordinator - import java.util.Collections.{singleton, singletonList, singletonMap} import java.util.Properties import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} -import kafka.log.{LogManager, UnifiedLog} -import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager} +import kafka.log.UnifiedLog +import kafka.server.{BrokerServer, KafkaConfig} import kafka.testkit.{KafkaClusterTestKit, TestKitNodes} import kafka.utils.TestUtils import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET @@ -36,7 +33,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage} import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.metadata.PartitionRegistration -import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler} +import org.apache.kafka.server.fault.FaultHandler import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.ArgumentMatchers.any @@ -179,29 +176,15 @@ class BrokerMetadataPublisherTest { new TopicsImage(idsMap.asJava, namesMap.asJava) } - private def newMockPublisher( + private def newMockDynamicConfigPublisher( broker: BrokerServer, - logManager: LogManager, - replicaManager: ReplicaManager, - groupCoordinator: GroupCoordinator, - txnCoordinator: TransactionCoordinator, - errorHandler: FaultHandler = new MockFaultHandler("publisher") - ): BrokerMetadataPublisher = { - val mockLogManager = Mockito.mock(classOf[LogManager]) - Mockito.when(mockLogManager.allLogs).thenReturn(Iterable.empty[UnifiedLog]) - Mockito.spy(new BrokerMetadataPublisher( + errorHandler: FaultHandler + ): DynamicConfigPublisher = { + Mockito.spy(new DynamicConfigPublisher( conf = broker.config, - metadataCache = broker.metadataCache, - logManager, - replicaManager, - groupCoordinator, - txnCoordinator, - clientQuotaMetadataManager = broker.clientQuotaMetadataManager, + faultHandler = errorHandler, dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap, - _authorizer = Option.empty, - errorHandler, - errorHandler - )) + nodeType = "broker")) } @Test @@ -215,20 +198,14 @@ class BrokerMetadataPublisherTest { cluster.startup() cluster.waitForReadyBrokers() val broker = cluster.brokers().values().iterator().next() - val mockLogManager = Mockito.mock(classOf[LogManager]) - Mockito.when(mockLogManager.allLogs).thenReturn(Iterable.empty[UnifiedLog]) - val mockReplicaManager = Mockito.mock(classOf[ReplicaManager]) - val mockGroupCoordinator = Mockito.mock(classOf[GroupCoordinator]) - val mockTxnCoordinator = Mockito.mock(classOf[TransactionCoordinator]) - - val publisher = newMockPublisher(broker, mockLogManager, mockReplicaManager, mockGroupCoordinator, mockTxnCoordinator) + val publisher = newMockDynamicConfigPublisher(broker, cluster.nonFatalFaultHandler()) val numTimesReloadCalled = new AtomicInteger(0) Mockito.when(publisher.reloadUpdatedFilesWithoutConfigChange(any[Properties]())). thenAnswer(new Answer[Unit]() { override def answer(invocation: InvocationOnMock): Unit = numTimesReloadCalled.addAndGet(1) }) - broker.metadataListener.alterPublisher(publisher).get() + broker.metadataPublisher.dynamicConfigPublisher = publisher val admin = Admin.create(cluster.clientProperties()) try { assertEquals(0, numTimesReloadCalled.get())
