This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 155f2c0 KAFKA-10803: Fix improper removal of bad dynamic config (#9682)
155f2c0 is described below
commit 155f2c06fbadc7a1a7a15cd2a2d5c2b7e72de0eb
Author: Prateek Agarwal <[email protected]>
AuthorDate: Fri Dec 4 09:29:39 2020 +0530
KAFKA-10803: Fix improper removal of bad dynamic config (#9682)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../scala/kafka/server/DynamicBrokerConfig.scala | 2 +-
.../kafka/server/DynamicBrokerConfigTest.scala | 27 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 6bec3c2..7563b38 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -467,7 +467,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
case _: Exception => true
}
}
- invalidProps.foreach(props.remove)
+ invalidProps.keys.foreach(props.remove)
val configSource = if (perBrokerConfig) "broker" else "default cluster"
error(s"Dynamic $configSource config contains invalid values: $invalidProps, these configs will be ignored", e)
}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 0692bf3..ef115db 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -400,6 +400,33 @@ class DynamicBrokerConfigTest {
newprops.put(KafkaConfig.BackgroundThreadsProp, "100")
dynamicBrokerConfig.updateBrokerConfig(0, newprops)
}
+
+ @Test
+ def testImproperConfigsAreRemoved(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+ val configs = KafkaConfig(props)
+
+ assertEquals(Defaults.MaxConnections, configs.maxConnections)
+ assertEquals(Defaults.MessageMaxBytes, configs.messageMaxBytes)
+
+ var newProps = new Properties()
+ newProps.put(KafkaConfig.MaxConnectionsProp, "9999")
+ newProps.put(KafkaConfig.MessageMaxBytesProp, "2222")
+
+ configs.dynamicConfig.updateDefaultConfig(newProps)
+ assertEquals(9999, configs.maxConnections)
+ assertEquals(2222, configs.messageMaxBytes)
+
+ newProps = new Properties()
+ newProps.put(KafkaConfig.MaxConnectionsProp, "INVALID_INT")
+ newProps.put(KafkaConfig.MessageMaxBytesProp, "1111")
+
+ configs.dynamicConfig.updateDefaultConfig(newProps)
+ // Invalid value should be skipped and reassigned as default value
+ assertEquals(Defaults.MaxConnections, configs.maxConnections)
+ // Even if One property is invalid, the below should get correctly updated.
+ assertEquals(1111, configs.messageMaxBytes)
+ }
}
class TestDynamicThreadPool() extends BrokerReconfigurable {