This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 81765194631a8f99685394ddfcc97105b8591a46 Author: Chia-Ping Tsai <chia7...@gmail.com> AuthorDate: Tue Aug 16 02:05:28 2022 +0800 MINOR: Appending value to LIST config should not generate empty string with … (#12503) Reviewers: dengziming <dengziming1...@gmail.com>, Luke Chen <show...@gmail.com> --- .../scala/kafka/server/ConfigAdminManager.scala | 5 ++-- .../kafka/api/PlaintextAdminIntegrationTest.scala | 32 ++++++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala index cc7a98179dd..c6ea4e13953 100644 --- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala +++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala @@ -497,8 +497,9 @@ object ConfigAdminManager { throw new InvalidConfigurationException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry.name}") val oldValueList = Option(configProps.getProperty(alterConfigOp.configEntry.name)) .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST))) - .getOrElse("") - .split(",").toList + .filter(s => s.nonEmpty) + .map(_.split(",").toList) + .getOrElse(List.empty) val appendingValueList = alterConfigOp.configEntry.value.split(",").toList.filter(value => !oldValueList.contains(value)) val newValueList = oldValueList ::: appendingValueList configProps.setProperty(alterConfigOp.configEntry.name, newValueList.mkString(",")) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 203c04a68a7..7121f98bb9c 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -2480,6 +2480,38 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAppendConfigToEmptyDefaultValue(ignored: String): Unit = { + testAppendConfig(new Properties(), "0:0", "0:0") + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAppendConfigToExistentValue(ignored: String): Unit = { + val props = new Properties(); + props.setProperty(LogConfig.LeaderReplicationThrottledReplicasProp, "1:1") + testAppendConfig(props, "0:0", "1:1,0:0") + } + + private def testAppendConfig(props: Properties, append: String, expected: String): Unit = { + client = Admin.create(createConfig) + createTopic(topic, topicConfig = props) + val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic) + val topicAlterConfigs = Seq( + new AlterConfigOp(new ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, append), AlterConfigOp.OpType.APPEND), + ).asJavaCollection + + val alterResult = client.incrementalAlterConfigs(Map( + topicResource -> topicAlterConfigs + ).asJava) + alterResult.all().get() + + ensureConsistentKRaftMetadata() + val config = client.describeConfigs(List(topicResource).asJava).all().get().get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp) + assertEquals(expected, config.value()) + } + /** * Test that createTopics returns the dynamic configurations of the topics that were created. *