This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 0f83c4c KAFKA-7976; Update config before notifying controller of unclean leader update (#6426) 0f83c4c is described below commit 0f83c4cdb76c2123a4c365ebcdd9cfb7bf711a45 Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Tue Mar 12 09:47:05 2019 +0000 KAFKA-7976; Update config before notifying controller of unclean leader update (#6426) When unclean leader election is enabled dynamically on brokers, we notify the controller of the update before updating KafkaConfig. When processing this event, the controller's decision to elect unclean leaders is based on the current KafkaConfig, so there is a small timing window when the controller may not elect an unclean leader because the server's KafkaConfig has not yet been updated. The commit fixes this timing window by using the existing BrokerReconfigurable interface used by other classes [...] Reviewers: Manikumar Reddy <manikumar.re...@gmail.com> --- .../scala/kafka/server/DynamicBrokerConfig.scala | 31 +++++++++++++++------- .../server/DynamicBrokerReconfigurationTest.scala | 4 ++- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 1c56572..b02b842 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -204,13 +204,25 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging brokerReconfigurables.clear() } + /** + * Add reconfigurables to be notified when a dynamic broker config is updated. + * + * `Reconfigurable` is the public API used by configurable plugins like metrics reporter + * and quota callbacks. These are reconfigured before `KafkaConfig` is updated so that + * the update can be aborted if `reconfigure()` fails with an exception. 
+ * + * `BrokerReconfigurable` is used for internal reconfigurable classes. These are + * reconfigured after `KafkaConfig` is updated so that they can access `KafkaConfig` + * directly. They are provided both old and new configs. + */ def addReconfigurables(kafkaServer: KafkaServer): Unit = { + addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer)) + addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer)) + addBrokerReconfigurable(new DynamicThreadPool(kafkaServer)) if (kafkaServer.logManager.cleaner != null) addBrokerReconfigurable(kafkaServer.logManager.cleaner) - addReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer)) - addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer)) - addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer)) + addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer)) addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer)) addBrokerReconfigurable(new DynamicConnectionQuota(kafkaServer)) } @@ -565,25 +577,24 @@ object DynamicLogConfig { val ReconfigurableConfigs = LogConfig.TopicConfigSynonyms.values.toSet -- ExcludedConfigs val KafkaConfigToLogConfigName = LogConfig.TopicConfigSynonyms.map { case (k, v) => (v, k) } } -class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Reconfigurable with Logging { - override def configure(configs: util.Map[String, _]): Unit = {} +class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends BrokerReconfigurable with Logging { - override def reconfigurableConfigs(): util.Set[String] = { - DynamicLogConfig.ReconfigurableConfigs.asJava + override def reconfigurableConfigs: Set[String] = { + DynamicLogConfig.ReconfigurableConfigs } - override def validateReconfiguration(configs: util.Map[String, _]): Unit = { + override def validateReconfiguration(newConfig: KafkaConfig): Unit = { // For update of topic config 
overrides, only config names and types are validated // Names and types have already been validated. For consistency with topic config // validation, no additional validation is performed. } - override def reconfigure(configs: util.Map[String, _]): Unit = { + override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { val currentLogConfig = logManager.currentDefaultConfig val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals) - configs.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach { case (k, v) => + newConfig.valuesFromThisConfig.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach { case (k, v) => if (v != null) { DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName => newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef]) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 798961e..c28fcea 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -391,7 +391,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet // Verify that configs of existing logs have been updated val newLogConfig = LogConfig(KafkaServer.copyKafkaConfigToLog(servers.head.config)) - assertEquals(newLogConfig, servers.head.logManager.currentDefaultConfig) + TestUtils.waitUntilTrue(() => servers.head.logManager.currentDefaultConfig == newLogConfig, + "Config not updated in LogManager") + val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found")) TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, 
"Existing topic config using defaults not updated") props.asScala.foreach { case (k, v) =>