This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 81765194631a8f99685394ddfcc97105b8591a46
Author: Chia-Ping Tsai <chia7...@gmail.com>
AuthorDate: Tue Aug 16 02:05:28 2022 +0800

    MINOR: Appending value to LIST config should not generate empty string with 
… (#12503)
    
    Reviewers: dengziming <dengziming1...@gmail.com>, Luke Chen 
<show...@gmail.com>
---
 .../scala/kafka/server/ConfigAdminManager.scala    |  5 ++--
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 32 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala 
b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index cc7a98179dd..c6ea4e13953 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -497,8 +497,9 @@ object ConfigAdminManager {
             throw new InvalidConfigurationException(s"Config value append is 
not allowed for config key: ${alterConfigOp.configEntry.name}")
           val oldValueList = 
Option(configProps.getProperty(alterConfigOp.configEntry.name))
             
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue,
 ConfigDef.Type.LIST)))
-            .getOrElse("")
-            .split(",").toList
+            .filter(s => s.nonEmpty)
+            .map(_.split(",").toList)
+            .getOrElse(List.empty)
           val appendingValueList = 
alterConfigOp.configEntry.value.split(",").toList.filter(value => 
!oldValueList.contains(value))
           val newValueList = oldValueList ::: appendingValueList
           configProps.setProperty(alterConfigOp.configEntry.name, 
newValueList.mkString(","))
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 203c04a68a7..7121f98bb9c 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2480,6 +2480,38 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAppendConfigToEmptyDefaultValue(ignored: String): Unit = {
+    testAppendConfig(new Properties(), "0:0", "0:0")
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAppendConfigToExistentValue(ignored: String): Unit = {
+    val props = new Properties();
+    props.setProperty(LogConfig.LeaderReplicationThrottledReplicasProp, "1:1")
+    testAppendConfig(props, "0:0", "1:1,0:0")
+  }
+
+  private def testAppendConfig(props: Properties, append: String, expected: 
String): Unit = {
+    client = Admin.create(createConfig)
+    createTopic(topic, topicConfig = props)
+    val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
+    val topicAlterConfigs = Seq(
+      new AlterConfigOp(new 
ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, append), 
AlterConfigOp.OpType.APPEND),
+    ).asJavaCollection
+
+    val alterResult = client.incrementalAlterConfigs(Map(
+      topicResource -> topicAlterConfigs
+    ).asJava)
+    alterResult.all().get()
+
+    ensureConsistentKRaftMetadata()
+    val config = 
client.describeConfigs(List(topicResource).asJava).all().get().get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp)
+    assertEquals(expected, config.value())
+  }
+
   /**
    * Test that createTopics returns the dynamic configurations of the topics 
that were created.
    *

Reply via email to