[
https://issues.apache.org/jira/browse/KAFKA-6680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405544#comment-16405544
]
ASF GitHub Bot commented on KAFKA-6680:
---
rajinisivaram closed pull request #4731: KAFKA-6680: Fix issues related to
Dynamic Broker configs
URL: https://github.com/apache/kafka/pull/4731
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 3236af01bbb..92fd5d73136 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -137,6 +137,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
private val dynamicConfigPasswordEncoder =
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
val adminZkClient = new AdminZkClient(zkClient)
updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker,
ConfigEntityName.Default))
val props = adminZkClient.fetchEntityConfig(ConfigType.Broker,
kafkaConfig.brokerId.toString)
@@ -719,8 +720,8 @@ class DynamicListenerConfig(server: KafkaServer) extends
BrokerReconfigurable wi
val oldListeners = listenersToMap(oldConfig.listeners)
if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
throw new ConfigException(s"Advertised listeners
'$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
-if (newListeners.keySet != newConfig.listenerSecurityProtocolMap.keySet)
- throw new ConfigException(s"Listeners '$newListeners' and listener map
'${newConfig.listenerSecurityProtocolMap}' don't match")
+if
(!newListeners.keySet.subsetOf(newConfig.listenerSecurityProtocolMap.keySet))
+ throw new ConfigException(s"Listeners '$newListeners' must be subset of
listener map '${newConfig.listenerSecurityProtocolMap}'")
newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName
=>
val prefix = listenerName.configPrefix
val newListenerProps = immutableListenerConfigs(newConfig, prefix)
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 2e3e2742dbc..bca98d2cee0 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -24,10 +24,12 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
+import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConverters._
+import scala.collection.Set
class DynamicBrokerConfigTest {
@@ -248,4 +250,56 @@ class DynamicBrokerConfigTest {
newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("staticLoginModule required;",
newConfigWithNewSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
}
+
+ @Test
+ def testDynamicListenerConfig(): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 9092)
+val oldConfig = KafkaConfig.fromProps(props)
+val kafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
+EasyMock.expect(kafkaServer.config).andReturn(oldConfig).anyTimes()
+EasyMock.replay(kafkaServer)
+
+props.put(KafkaConfig.ListenersProp,
"PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093")
+val newConfig = KafkaConfig(props)
+
+val dynamicListenerConfig = new DynamicListenerConfig(kafkaServer)
+dynamicListenerConfig.validateReconfiguration(newConfig)
+ }
+
+ @Test
+ def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = {
+val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient])
+EasyMock.expect(zkClient.getEntityConfigs(EasyMock.anyString(),
EasyMock.anyString())).andReturn(new java.util.Properties()).anyTimes()
+EasyMock.replay(zkClient)
+
+val oldConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0,
TestUtils.MockZkConnect, port = 9092))
+val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig)
+dynamicBrokerConfig.initialize(zkClient)
+dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool)
+
+val newprops = new Properties()
+newprops.put(KafkaConfig.NumIoThreadsProp,