This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0f83c4c  KAFKA-7976; Update config before notifying controller of 
unclean leader update (#6426)
0f83c4c is described below

commit 0f83c4cdb76c2123a4c365ebcdd9cfb7bf711a45
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Tue Mar 12 09:47:05 2019 +0000

    KAFKA-7976; Update config before notifying controller of unclean leader 
update (#6426)
    
    When unclean leader election is enabled dynamically on brokers, we notify 
controller of the update before updating KafkaConfig. When processing this 
event, controller's decision to elect unclean leaders is based on the current 
KafkaConfig, so there is a small timing window when the controller may not 
elect unclean leader because KafkaConfig of the server was not yet updated. The 
commit fixes this timing window by using the existing BrokerReconfigurable 
interface used by other classes  [...]
    
    Reviewers: Manikumar Reddy <manikumar.re...@gmail.com>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 31 +++++++++++++++-------
 .../server/DynamicBrokerReconfigurationTest.scala  |  4 ++-
 2 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 1c56572..b02b842 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -204,13 +204,25 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     brokerReconfigurables.clear()
   }
 
+  /**
+   * Add reconfigurables to be notified when a dynamic broker config is 
updated.
+   *
+   * `Reconfigurable` is the public API used by configurable plugins like 
metrics reporter
+   * and quota callbacks. These are reconfigured before `KafkaConfig` is 
updated so that
+   * the update can be aborted if `reconfigure()` fails with an exception.
+   *
+   * `BrokerReconfigurable` is used for internal reconfigurable classes. These 
are
+   * reconfigured after `KafkaConfig` is updated so that they can access 
`KafkaConfig`
+   * directly. They are provided both old and new configs.
+   */
   def addReconfigurables(kafkaServer: KafkaServer): Unit = {
+    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, 
kafkaServer))
+    addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, 
kafkaServer))
+
     addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
     if (kafkaServer.logManager.cleaner != null)
       addBrokerReconfigurable(kafkaServer.logManager.cleaner)
-    addReconfigurable(new DynamicLogConfig(kafkaServer.logManager, 
kafkaServer))
-    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, 
kafkaServer))
-    addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, 
kafkaServer))
+    addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, 
kafkaServer))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
     addBrokerReconfigurable(new DynamicConnectionQuota(kafkaServer))
   }
@@ -565,25 +577,24 @@ object DynamicLogConfig {
   val ReconfigurableConfigs = LogConfig.TopicConfigSynonyms.values.toSet -- 
ExcludedConfigs
   val KafkaConfigToLogConfigName = LogConfig.TopicConfigSynonyms.map { case 
(k, v) => (v, k) }
 }
-class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends 
Reconfigurable with Logging {
 
-  override def configure(configs: util.Map[String, _]): Unit = {}
+class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends 
BrokerReconfigurable with Logging {
 
-  override def reconfigurableConfigs(): util.Set[String] = {
-    DynamicLogConfig.ReconfigurableConfigs.asJava
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicLogConfig.ReconfigurableConfigs
   }
 
-  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
     // For update of topic config overrides, only config names and types are 
validated
     // Names and types have already been validated. For consistency with topic 
config
     // validation, no additional validation is performed.
   }
 
-  override def reconfigure(configs: util.Map[String, _]): Unit = {
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
     val currentLogConfig = logManager.currentDefaultConfig
     val origUncleanLeaderElectionEnable = 
logManager.currentDefaultConfig.uncleanLeaderElectionEnable
     val newBrokerDefaults = new util.HashMap[String, 
Object](currentLogConfig.originals)
-    
configs.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach
 { case (k, v) =>
+    
newConfig.valuesFromThisConfig.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach
 { case (k, v) =>
       if (v != null) {
         DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { 
configName =>
           newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 798961e..c28fcea 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -391,7 +391,9 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
 
     // Verify that configs of existing logs have been updated
     val newLogConfig = 
LogConfig(KafkaServer.copyKafkaConfigToLog(servers.head.config))
-    assertEquals(newLogConfig, servers.head.logManager.currentDefaultConfig)
+    TestUtils.waitUntilTrue(() => servers.head.logManager.currentDefaultConfig 
== newLogConfig,
+      "Config not updated in LogManager")
+
     val log = servers.head.logManager.getLog(new TopicPartition(topic, 
0)).getOrElse(throw new IllegalStateException("Log not found"))
     TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing 
topic config using defaults not updated")
     props.asScala.foreach { case (k, v) =>

Reply via email to