This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 06a744ac3c1 KAFKA-19980 ConfigCommand#validatePropsKey should accept
`$` symbol (#21125)
06a744ac3c1 is described below
commit 06a744ac3c166fc9444a4fd03336f043dd6a272f
Author: Lan Ding <[email protected]>
AuthorDate: Fri Dec 12 22:52:25 2025 +0800
KAFKA-19980 ConfigCommand#validatePropsKey should accept `$` symbol (#21125)
1. Enabled ConfigCommand#validatePropsKey to accept the `$` symbol in config keys.
2. Added a unit test (UT) and an integration test (IT) for the change.
Reviewers: Chih-Yuan Chien <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../src/main/scala/kafka/admin/ConfigCommand.scala | 3 +-
.../kafka/tools/ConfigCommandIntegrationTest.java | 73 +++++++++++++++++-----
.../org/apache/kafka/tools/ConfigCommandTest.java | 5 +-
3 files changed, 64 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index f004b9956c8..e731b566f9e 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -137,7 +137,8 @@ object ConfigCommand extends Logging {
private def validatePropsKey(props: Properties): Unit = {
props.keySet.forEach { propsKey =>
- if (!propsKey.toString.matches("[a-zA-Z0-9._-]*")) {
+ // Allows the '$' symbol to support valid logger names for internal
classes (e.g.
org.apache.kafka.server.quota.ClientQuotaManager$ThrottledChannelReaper)
+ if (!propsKey.toString.matches("[$a-zA-Z0-9._-]*")) {
throw new IllegalArgumentException(
s"Invalid character found for config key: $propsKey"
)
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index bdaa6ee5934..e6d45079d97 100644
---
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -371,6 +371,30 @@ public class ConfigCommandIntegrationTest {
}
}
+ @ClusterTest
+ public void testBrokerLoggerConfigUpdate() throws Exception {
+ List<String> alterOpts =
Stream.concat(entityOp(Optional.of(defaultBrokerId)).stream(),
+ Stream.of("--entity-type", "broker-loggers", "--alter")).toList();
+
+ verifyBrokerLoggerConfigUpdate(alterOpts);
+
+ // Test for the --broker-logger alias
+ verifyBrokerLoggerConfigUpdate(List.of("--broker-logger",
defaultBrokerId, "--alter"));
+ }
+
+ private void verifyBrokerLoggerConfigUpdate(List<String> alterOpts) throws
Exception {
+ try (Admin client = cluster.admin()) {
+ // Add config
+ Map<String, String> configs = new HashMap<>();
+
configs.put("org.apache.kafka.server.quota.ClientQuotaManager$ThrottledChannelReaper",
"DEBUG");
+ alterAndVerifyBrokerLoggerConfig(client, defaultBrokerId, configs,
alterOpts);
+
+ // Delete config
+
configs.put("org.apache.kafka.server.quota.ClientQuotaManager$ThrottledChannelReaper",
"ERROR");
+ deleteAndVerifyBrokerLoggerConfigValue(client, defaultBrokerId,
configs, alterOpts);
+ }
+ }
+
@ClusterTest
public void testAlterReadOnlyConfigInKRaftThenShouldFail() {
List<String> alterOpts =
generateDefaultAlterOpts(cluster.bootstrapServers());
@@ -593,7 +617,7 @@ public class ConfigCommandIntegrationTest {
Map<String, String> config,
List<String> alterOpts) throws Exception
{
alterConfigWithAdmin(client, brokerId, config, alterOpts);
- verifyConfig(client, brokerId, config);
+ verifyBrokerConfig(client, brokerId, config);
}
private void alterAndVerifyGroupConfig(Admin client,
@@ -612,6 +636,14 @@ public class ConfigCommandIntegrationTest {
verifyClientMetricsConfig(client, clientMetricsName, config);
}
+ private void alterAndVerifyBrokerLoggerConfig(Admin client,
+ String brokerId,
+ Map<String, String> config,
+ List<String> alterOpts)
throws Exception {
+ alterConfigWithAdmin(client, config, alterOpts);
+ verifyBrokerLoggerConfig(client, brokerId, config);
+ }
+
private void alterConfigWithAdmin(Admin client, Optional<String>
resourceName, Map<String, String> config, List<String> alterOpts) {
String configStr = transferConfigMapToString(config);
List<String> bootstrapOpts = quorumArgs().toList();
@@ -635,28 +667,27 @@ public class ConfigCommandIntegrationTest {
ConfigCommand.alterConfig(client, addOpts);
}
- private void verifyConfig(Admin client, Optional<String> brokerId,
Map<String, String> config) throws Exception {
+ private void verifyBrokerConfig(Admin client, Optional<String> brokerId,
Map<String, String> config) throws Exception {
ConfigResource configResource = new
ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(""));
- TestUtils.waitForCondition(() -> {
- Map<String, String> current = getConfigEntryStream(client,
configResource)
- .filter(configEntry ->
Objects.nonNull(configEntry.value()))
- .collect(Collectors.toMap(ConfigEntry::name,
ConfigEntry::value));
- return config.entrySet().stream().allMatch(e ->
e.getValue().equals(current.get(e.getKey())));
- }, 10000, config + " are not updated");
+ verifyConfig(client, config, configResource);
}
private void verifyGroupConfig(Admin client, String groupName, Map<String,
String> config) throws Exception {
ConfigResource configResource = new
ConfigResource(ConfigResource.Type.GROUP, groupName);
- TestUtils.waitForCondition(() -> {
- Map<String, String> current = getConfigEntryStream(client,
configResource)
- .filter(configEntry -> Objects.nonNull(configEntry.value()))
- .collect(Collectors.toMap(ConfigEntry::name,
ConfigEntry::value));
- return config.entrySet().stream().allMatch(e ->
e.getValue().equals(current.get(e.getKey())));
- }, 10000, config + " are not updated");
+ verifyConfig(client, config, configResource);
}
private void verifyClientMetricsConfig(Admin client, String
clientMetricsName, Map<String, String> config) throws Exception {
ConfigResource configResource = new
ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetricsName);
+ verifyConfig(client, config, configResource);
+ }
+
+ private void verifyBrokerLoggerConfig(Admin client, String brokerId,
Map<String, String> config) throws Exception {
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.BROKER_LOGGER, brokerId);
+ verifyConfig(client, config, configResource);
+ }
+
+ private void verifyConfig(Admin client, Map<String, String> config,
ConfigResource configResource) throws InterruptedException {
TestUtils.waitForCondition(() -> {
Map<String, String> current = getConfigEntryStream(client,
configResource)
.filter(configEntry ->
Objects.nonNull(configEntry.value()))
@@ -718,6 +749,20 @@ public class ConfigCommandIntegrationTest {
verifyClientMetricsConfig(client, clientMetricsName, defaultConfigs);
}
+ private void deleteAndVerifyBrokerLoggerConfigValue(Admin client,
+ String brokerId,
+ Map<String, String>
defaultConfigs,
+ List<String>
alterOpts) throws Exception {
+ List<String> bootstrapOpts = quorumArgs().toList();
+ ConfigCommand.ConfigCommandOptions deleteOpts =
+ new ConfigCommand.ConfigCommandOptions(toArray(bootstrapOpts,
+ alterOpts,
+ List.of("--delete-config", String.join(",",
defaultConfigs.keySet()))));
+ deleteOpts.checkArgs();
+ ConfigCommand.alterConfig(client, deleteOpts);
+ verifyBrokerLoggerConfig(client, brokerId, defaultConfigs);
+ }
+
private void verifyPerBrokerConfigValue(Admin client,
String brokerId,
Set<String> config,
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
index 13cfb281111..1109098500d 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
@@ -295,13 +295,14 @@ public class ConfigCommandTest {
createOpts = new
ConfigCommand.ConfigCommandOptions(toArray(connectOpts1, connectOpts2,
shortFlag, "1",
"--alter",
- "--add-config", "a._-c=b,c=,d=e,f="));
+ "--add-config", "a._-c=b,c=,d=e,a$b=c,f="));
createOpts.checkArgs();
Properties addedProps2 =
ConfigCommand.parseConfigsToBeAdded(createOpts);
- assertEquals(4, addedProps2.size());
+ assertEquals(5, addedProps2.size());
assertEquals("b", addedProps2.getProperty("a._-c"));
assertEquals("e", addedProps2.getProperty("d"));
+ assertEquals("c", addedProps2.getProperty("a$b"));
assertTrue(addedProps2.getProperty("c").isEmpty());
assertTrue(addedProps2.getProperty("f").isEmpty());