This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 06a744ac3c1 KAFKA-19980 ConfigCommand#validatePropsKey should accept `$` symbol (#21125)
06a744ac3c1 is described below

commit 06a744ac3c166fc9444a4fd03336f043dd6a272f
Author: Lan Ding <[email protected]>
AuthorDate: Fri Dec 12 22:52:25 2025 +0800

    KAFKA-19980 ConfigCommand#validatePropsKey should accept `$` symbol (#21125)
    
    1. Enabled ConfigCommand#validatePropsKey to support the `$` symbol.
    2. Added UT and IT for the change.
    
    Reviewers: Chih-Yuan Chien <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../src/main/scala/kafka/admin/ConfigCommand.scala |  3 +-
 .../kafka/tools/ConfigCommandIntegrationTest.java  | 73 +++++++++++++++++-----
 .../org/apache/kafka/tools/ConfigCommandTest.java  |  5 +-
 3 files changed, 64 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index f004b9956c8..e731b566f9e 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -137,7 +137,8 @@ object ConfigCommand extends Logging {
 
   private def validatePropsKey(props: Properties): Unit = {
     props.keySet.forEach { propsKey =>
-      if (!propsKey.toString.matches("[a-zA-Z0-9._-]*")) {
+      // Allows the '$' symbol to support valid logger names for internal classes (e.g. org.apache.kafka.server.quota.ClientQuotaManager$ThrottledChannelReaper)
+      if (!propsKey.toString.matches("[$a-zA-Z0-9._-]*")) {
         throw new IllegalArgumentException(
           s"Invalid character found for config key: $propsKey"
         )
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index bdaa6ee5934..e6d45079d97 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -371,6 +371,30 @@ public class ConfigCommandIntegrationTest {
         }
     }
 
+    @ClusterTest
+    public void testBrokerLoggerConfigUpdate() throws Exception {
+        List<String> alterOpts = 
Stream.concat(entityOp(Optional.of(defaultBrokerId)).stream(),
+            Stream.of("--entity-type", "broker-loggers", "--alter")).toList();
+
+        verifyBrokerLoggerConfigUpdate(alterOpts);
+
+        // Test for the --broker-logger alias
+        verifyBrokerLoggerConfigUpdate(List.of("--broker-logger", 
defaultBrokerId, "--alter"));
+    }
+
+    private void verifyBrokerLoggerConfigUpdate(List<String> alterOpts) throws 
Exception {
+        try (Admin client = cluster.admin()) {
+            // Add config
+            Map<String, String> configs = new HashMap<>();
+            
configs.put("org.apache.kafka.server.quota.ClientQuotaManager$ThrottledChannelReaper",
 "DEBUG");
+            alterAndVerifyBrokerLoggerConfig(client, defaultBrokerId, configs, 
alterOpts);
+
+            // Delete config
+            
configs.put("org.apache.kafka.server.quota.ClientQuotaManager$ThrottledChannelReaper",
 "ERROR");
+            deleteAndVerifyBrokerLoggerConfigValue(client, defaultBrokerId, 
configs, alterOpts);
+        }
+    }
+
     @ClusterTest
     public void testAlterReadOnlyConfigInKRaftThenShouldFail() {
         List<String> alterOpts = 
generateDefaultAlterOpts(cluster.bootstrapServers());
@@ -593,7 +617,7 @@ public class ConfigCommandIntegrationTest {
                                       Map<String, String> config,
                                       List<String> alterOpts) throws Exception 
{
         alterConfigWithAdmin(client, brokerId, config, alterOpts);
-        verifyConfig(client, brokerId, config);
+        verifyBrokerConfig(client, brokerId, config);
     }
 
     private void alterAndVerifyGroupConfig(Admin client,
@@ -612,6 +636,14 @@ public class ConfigCommandIntegrationTest {
         verifyClientMetricsConfig(client, clientMetricsName, config);
     }
 
+    private void alterAndVerifyBrokerLoggerConfig(Admin client,
+                                                  String brokerId,
+                                                  Map<String, String> config,
+                                                  List<String> alterOpts) 
throws Exception {
+        alterConfigWithAdmin(client, config, alterOpts);
+        verifyBrokerLoggerConfig(client, brokerId, config);
+    }
+
     private void alterConfigWithAdmin(Admin client, Optional<String> 
resourceName, Map<String, String> config, List<String> alterOpts) {
         String configStr = transferConfigMapToString(config);
         List<String> bootstrapOpts = quorumArgs().toList();
@@ -635,28 +667,27 @@ public class ConfigCommandIntegrationTest {
         ConfigCommand.alterConfig(client, addOpts);
     }
 
-    private void verifyConfig(Admin client, Optional<String> brokerId, 
Map<String, String> config) throws Exception {
+    private void verifyBrokerConfig(Admin client, Optional<String> brokerId, 
Map<String, String> config) throws Exception {
         ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(""));
-        TestUtils.waitForCondition(() -> {
-            Map<String, String> current = getConfigEntryStream(client, 
configResource)
-                    .filter(configEntry -> 
Objects.nonNull(configEntry.value()))
-                    .collect(Collectors.toMap(ConfigEntry::name, 
ConfigEntry::value));
-            return config.entrySet().stream().allMatch(e -> 
e.getValue().equals(current.get(e.getKey())));
-        }, 10000, config + " are not updated");
+        verifyConfig(client, config, configResource);
     }
 
     private void verifyGroupConfig(Admin client, String groupName, Map<String, 
String> config) throws Exception {
         ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.GROUP, groupName);
-        TestUtils.waitForCondition(() -> {
-            Map<String, String> current = getConfigEntryStream(client, 
configResource)
-                .filter(configEntry -> Objects.nonNull(configEntry.value()))
-                .collect(Collectors.toMap(ConfigEntry::name, 
ConfigEntry::value));
-            return config.entrySet().stream().allMatch(e -> 
e.getValue().equals(current.get(e.getKey())));
-        }, 10000, config + " are not updated");
+        verifyConfig(client, config, configResource);
     }
 
     private void verifyClientMetricsConfig(Admin client, String 
clientMetricsName, Map<String, String> config) throws Exception {
         ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetricsName);
+        verifyConfig(client, config, configResource);
+    }
+
+    private void verifyBrokerLoggerConfig(Admin client, String brokerId, 
Map<String, String> config) throws Exception {
+        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER_LOGGER, brokerId);
+        verifyConfig(client, config, configResource);
+    }
+
+    private void verifyConfig(Admin client, Map<String, String> config, 
ConfigResource configResource) throws InterruptedException {
         TestUtils.waitForCondition(() -> {
             Map<String, String> current = getConfigEntryStream(client, 
configResource)
                     .filter(configEntry -> 
Objects.nonNull(configEntry.value()))
@@ -718,6 +749,20 @@ public class ConfigCommandIntegrationTest {
         verifyClientMetricsConfig(client, clientMetricsName, defaultConfigs);
     }
 
+    private void deleteAndVerifyBrokerLoggerConfigValue(Admin client,
+                                                        String brokerId,
+                                                        Map<String, String> 
defaultConfigs,
+                                                        List<String> 
alterOpts) throws Exception {
+        List<String> bootstrapOpts = quorumArgs().toList();
+        ConfigCommand.ConfigCommandOptions deleteOpts =
+            new ConfigCommand.ConfigCommandOptions(toArray(bootstrapOpts,
+                    alterOpts,
+                    List.of("--delete-config", String.join(",", 
defaultConfigs.keySet()))));
+        deleteOpts.checkArgs();
+        ConfigCommand.alterConfig(client, deleteOpts);
+        verifyBrokerLoggerConfig(client, brokerId, defaultConfigs);
+    }
+
     private void verifyPerBrokerConfigValue(Admin client,
                                             String brokerId,
                                             Set<String> config,
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
index 13cfb281111..1109098500d 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
@@ -295,13 +295,14 @@ public class ConfigCommandTest {
         createOpts = new 
ConfigCommand.ConfigCommandOptions(toArray(connectOpts1, connectOpts2,
             shortFlag, "1",
             "--alter",
-            "--add-config", "a._-c=b,c=,d=e,f="));
+            "--add-config", "a._-c=b,c=,d=e,a$b=c,f="));
         createOpts.checkArgs();
 
         Properties addedProps2 = 
ConfigCommand.parseConfigsToBeAdded(createOpts);
-        assertEquals(4, addedProps2.size());
+        assertEquals(5, addedProps2.size());
         assertEquals("b", addedProps2.getProperty("a._-c"));
         assertEquals("e", addedProps2.getProperty("d"));
+        assertEquals("c", addedProps2.getProperty("a$b"));
         assertTrue(addedProps2.getProperty("c").isEmpty());
         assertTrue(addedProps2.getProperty("f").isEmpty());
 

Reply via email to