This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5e7445a6d63 KAFKA-17516 Synonyms for client metrics configs (#17264)
5e7445a6d63 is described below

commit 5e7445a6d63f02033f1f158063c6cf7161170e71
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Mar 14 08:05:40 2025 +0000

    KAFKA-17516 Synonyms for client metrics configs (#17264)
    
    This PR brings client metrics configuration resources in line with the
    other config resources in terms of handling synonyms and defaults.
    Specifically, configs which are not explicitly set take their hard-coded
    default values, and these are reported by `kafka-configs.sh --describe`
    and `Kafka-client-metrics.sh --describe`. Previously, they were omitted
    which means the administrator needed to know the default values.
    
    The ConfigHelper was changed so that the handling of client metrics
    configuration matches that of group configuration.
    
    Reviewers: poorv Mittal <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../src/main/scala/kafka/server/ConfigHelper.scala | 45 +++++++++-----
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  4 --
 .../ControllerConfigurationValidatorTest.scala     | 12 ++--
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  4 +-
 .../apache/kafka/server/ClientMetricsManager.java  | 12 ++--
 .../kafka/server/metrics/ClientMetricsConfigs.java | 70 +++++++++++++++-------
 .../kafka/server/ClientMetricsManagerTest.java     | 58 +++++++++---------
 .../server/metrics/ClientMetricsInstanceTest.java  |  8 +--
 .../server/metrics/ClientMetricsTestUtils.java     | 14 ++---
 .../kafka/tools/ConfigCommandIntegrationTest.java  | 20 +++----
 10 files changed, 141 insertions(+), 106 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala 
b/core/src/main/scala/kafka/server/ConfigHelper.scala
index be5f8aca243..47b17a251b1 100644
--- a/core/src/main/scala/kafka/server/ConfigHelper.scala
+++ b/core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -35,9 +35,9 @@ import 
org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
 import org.apache.kafka.coordinator.group.GroupConfig
 import org.apache.kafka.metadata.ConfigRepository
 import org.apache.kafka.server.config.ServerTopicConfigSynonyms
+import org.apache.kafka.server.metrics.ClientMetricsConfigs
 import org.apache.kafka.storage.internals.log.LogConfig
 
-import scala.collection.mutable.ListBuffer
 import scala.collection.{Map, mutable}
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.RichOptional
@@ -136,21 +136,12 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
                   
.setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
 
           case ConfigResource.Type.CLIENT_METRICS =>
-            val subscriptionName = resource.resourceName
-            if (subscriptionName == null || subscriptionName.isEmpty) {
+            if (resource.resourceName == null || 
resource.resourceName.isEmpty) {
               throw new InvalidRequestException("Client metrics subscription 
name must not be empty")
             } else {
-              val entityProps = configRepository.config(new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName))
-              val configEntries = new 
ListBuffer[DescribeConfigsResponseData.DescribeConfigsResourceResult]()
-              entityProps.forEach((name, value) => {
-                configEntries += new 
DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name.toString)
-                  
.setValue(value.toString).setConfigSource(ConfigSource.CLIENT_METRICS_CONFIG.id())
-                  
.setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava)
-              })
-
-              new DescribeConfigsResponseData.DescribeConfigsResult()
-                .setErrorCode(Errors.NONE.code)
-                .setConfigs(configEntries.asJava)
+              val clientMetricsProps = configRepository.config(new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, resource.resourceName))
+              val clientMetricsConfig = 
ClientMetricsConfigs.fromProps(ClientMetricsConfigs.defaultConfigsMap(), 
clientMetricsProps)
+              createResponseConfig(allConfigs(clientMetricsConfig), 
createClientMetricsConfigEntry(clientMetricsConfig, clientMetricsProps, 
includeSynonyms, includeDocumentation))
             }
 
           case ConfigResource.Type.GROUP =>
@@ -185,8 +176,8 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
     }
   }
 
-  def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: Properties, 
includeSynonyms: Boolean, includeDocumentation: Boolean)
-                            (name: String, value: Any): 
DescribeConfigsResponseData.DescribeConfigsResourceResult = {
+  private def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: 
Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
+                                    (name: String, value: Any): 
DescribeConfigsResponseData.DescribeConfigsResourceResult = {
     val allNames = brokerSynonyms(name)
     val configEntryType = GroupConfig.configType(name).toScala
     val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
@@ -210,6 +201,28 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
       .setDocumentation(configDocumentation).setConfigType(dataType.id)
   }
 
+  private def createClientMetricsConfigEntry(clientMetricsConfig: 
ClientMetricsConfigs, clientMetricsProps: Properties, includeSynonyms: Boolean, 
includeDocumentation: Boolean)
+                                            (name: String, value: Any): 
DescribeConfigsResponseData.DescribeConfigsResourceResult = {
+    val configEntryType = ClientMetricsConfigs.configType(name).toScala
+    val valueAsString = ConfigDef.convertToString(value, 
configEntryType.orNull)
+    val allSynonyms = {
+      if (!clientMetricsProps.containsKey(name)) {
+        List.empty
+      } else {
+        List(new 
DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString)
+          .setSource(ConfigSource.CLIENT_METRICS_CONFIG.id))
+      }
+    }
+    val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG.id else 
allSynonyms.head.source
+    val synonyms = if (!includeSynonyms) List.empty else allSynonyms
+    val dataType = configResponseType(configEntryType)
+    val configDocumentation = if (includeDocumentation) 
clientMetricsConfig.documentationOf(name) else null
+    new DescribeConfigsResponseData.DescribeConfigsResourceResult()
+      .setName(name).setValue(valueAsString).setConfigSource(source)
+      .setIsSensitive(false).setReadOnly(false).setSynonyms(synonyms.asJava)
+      .setDocumentation(configDocumentation).setConfigType(dataType.id)
+  }
+
   def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, 
includeSynonyms: Boolean, includeDocumentation: Boolean)
                             (name: String, value: Any): 
DescribeConfigsResponseData.DescribeConfigsResourceResult = {
     val configEntryType = LogConfig.configType(name).toScala
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 18c5c2bbd05..af2a27fdf4f 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -944,10 +944,6 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val groupResource = new ConfigResource(ConfigResource.Type.GROUP, 
"none_group")
     val groupResult = 
client.describeConfigs(Seq(groupResource).asJava).all().get().get(groupResource)
     assertNotEquals(0, groupResult.entries().size())
-
-    val metricResource = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, "none_metric")
-    val metricResult = 
client.describeConfigs(Seq(metricResource).asJava).all().get().get(metricResource)
-    assertEquals(0, metricResult.entries().size())
   }
 
   @Test
diff --git 
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
 
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index 9f903336d23..3056753f53b 100644
--- 
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -128,9 +128,9 @@ class ControllerConfigurationValidatorTest {
   @Test
   def testValidClientMetricsConfig(): Unit = {
     val config = new util.TreeMap[String, String]()
-    config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "2000")
-    config.put(ClientMetricsConfigs.SUBSCRIPTION_METRICS, 
"org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency")
-    config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, 
"client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" +
+    config.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "2000")
+    config.put(ClientMetricsConfigs.METRICS_CONFIG, 
"org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency")
+    config.put(ClientMetricsConfigs.MATCH_CONFIG, 
"client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" +
       
",client_software_name=apache-kafka-java,client_software_version=2.8.0-SNAPSHOT,client_source_address=127.0.0.1,"
 +
       "client_source_port=1234")
     validator.validate(new ConfigResource(CLIENT_METRICS, "subscription-1"), 
config, emptyMap())
@@ -147,12 +147,12 @@ class ControllerConfigurationValidatorTest {
   @Test
   def testInvalidIntervalClientMetricsConfig(): Unit = {
     val config = new util.TreeMap[String, String]()
-    config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "10")
+    config.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "10")
     assertEquals("Invalid value 10 for interval.ms, interval must be between 
100 and 3600000 (1 hour)",
       assertThrows(classOf[InvalidRequestException], () => validator.validate(
         new ConfigResource(CLIENT_METRICS, "subscription-1"), config, 
emptyMap())). getMessage)
 
-    config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "3600001")
+    config.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "3600001")
     assertEquals("Invalid value 3600001 for interval.ms, interval must be 
between 100 and 3600000 (1 hour)",
       assertThrows(classOf[InvalidRequestException], () => validator.validate(
         new ConfigResource(CLIENT_METRICS, "subscription-1"), config, 
emptyMap())). getMessage)
@@ -170,7 +170,7 @@ class ControllerConfigurationValidatorTest {
   @Test
   def testInvalidMatchClientMetricsConfig(): Unit = {
     val config = new util.TreeMap[String, String]()
-    config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, "10")
+    config.put(ClientMetricsConfigs.MATCH_CONFIG, "10")
     assertEquals("Illegal client matching pattern: 10",
       assertThrows(classOf[InvalidConfigurationException], () => 
validator.validate(
         new ConfigResource(CLIENT_METRICS, "subscription-1"), config, 
emptyMap())). getMessage)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 9325c92bd59..337dc9a62f3 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -373,7 +373,7 @@ class KafkaApisTest extends Logging {
     val subscriptionName = "client_metric_subscription_1"
     val authorizedResource = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName)
 
-    val props = ClientMetricsTestUtils.defaultProperties
+    val props = ClientMetricsTestUtils.defaultTestProperties
     val configEntries = new util.ArrayList[AlterConfigsRequest.ConfigEntry]()
     props.forEach((x, y) =>
       configEntries.add(new 
AlterConfigsRequest.ConfigEntry(x.asInstanceOf[String], 
y.asInstanceOf[String])))
@@ -444,7 +444,7 @@ class KafkaApisTest extends Logging {
 
     val resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, 
subscriptionName)
     val configRepository: ConfigRepository = mock(classOf[ConfigRepository])
-    val cmConfigs = ClientMetricsTestUtils.defaultProperties
+    val cmConfigs = ClientMetricsTestUtils.defaultTestProperties
     when(configRepository.config(resource)).thenReturn(cmConfigs)
 
     metadataCache = mock(classOf[KRaftMetadataCache])
diff --git 
a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java 
b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index 5b77004d0ae..238e189202b 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -244,9 +244,9 @@ public class ClientMetricsManager implements AutoCloseable {
     }
 
     private void updateClientSubscription(String subscriptionName, 
ClientMetricsConfigs configs) {
-        List<String> metrics = 
configs.getList(ClientMetricsConfigs.SUBSCRIPTION_METRICS);
-        int pushInterval = 
configs.getInt(ClientMetricsConfigs.PUSH_INTERVAL_MS);
-        List<String> clientMatchPattern = 
configs.getList(ClientMetricsConfigs.CLIENT_MATCH_PATTERN);
+        List<String> metrics = 
configs.getList(ClientMetricsConfigs.METRICS_CONFIG);
+        int pushInterval = 
configs.getInt(ClientMetricsConfigs.INTERVAL_MS_CONFIG);
+        List<String> clientMatchPattern = 
configs.getList(ClientMetricsConfigs.MATCH_CONFIG);
 
         SubscriptionInfo newSubscription =
             new SubscriptionInfo(subscriptionName, metrics, pushInterval,
@@ -329,7 +329,7 @@ public class ClientMetricsManager implements AutoCloseable {
 
     private ClientMetricsInstance createClientInstance(Uuid clientInstanceId, 
ClientMetricsInstanceMetadata instanceMetadata) {
 
-        int pushIntervalMs = ClientMetricsConfigs.DEFAULT_INTERVAL_MS;
+        int pushIntervalMs = ClientMetricsConfigs.INTERVAL_MS_DEFAULT;
         // Keep a set of metrics to avoid duplicates in case of overlapping 
subscriptions.
         Set<String> subscribedMetrics = new HashSet<>();
         boolean allMetricsSubscribed = false;
@@ -338,7 +338,7 @@ public class ClientMetricsManager implements AutoCloseable {
         for (SubscriptionInfo info : subscriptionMap.values()) {
             if (instanceMetadata.isMatch(info.matchPattern())) {
                 allMetricsSubscribed = allMetricsSubscribed || 
info.metrics().contains(
-                    ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+                    ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
                 subscribedMetrics.addAll(info.metrics());
                 pushIntervalMs = Math.min(pushIntervalMs, info.intervalMs());
             }
@@ -351,7 +351,7 @@ public class ClientMetricsManager implements AutoCloseable {
         if (allMetricsSubscribed) {
             // Only add an * to indicate that all metrics are subscribed.
             subscribedMetrics.clear();
-            
subscribedMetrics.add(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+            subscribedMetrics.add(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
         }
 
         int subscriptionId = computeSubscriptionId(subscribedMetrics, 
pushIntervalMs, clientInstanceId);
diff --git 
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
 
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
index ba94486e83c..2560854eb55 100644
--- 
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
+++ 
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
@@ -23,11 +23,10 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -52,7 +51,8 @@ import java.util.regex.PatternSyntaxException;
  * <ul>
  *    <li> "name" is a unique name for the subscription. This is used to 
identify the subscription in
  *          the broker. Ex: "METRICS-SUB"
- *    <li> "metrics" value should be comma separated metrics list. A prefix 
match on the requested metrics
+ *
+ *    <li> "metrics" value should be comma-separated metrics list. A prefix 
match on the requested metrics
  *          is performed in clients to determine subscribed metrics. An empty 
list means no metrics subscribed.
  *          A list containing just an empty string means all metrics 
subscribed.
  *          Ex: 
"org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
@@ -60,19 +60,19 @@ import java.util.regex.PatternSyntaxException;
  *    <li> "interval.ms" should be between 100 and 3600000 (1 hour). This is 
the interval at which the client
  *          should push the metrics to the broker.
  *
- *    <li> "match" is a comma separated list of client match patterns, in case 
if there is no matching
+ *    <li> "match" is a comma-separated list of client match patterns, in case 
if there is no matching
  *          pattern specified then broker considers that as all match which 
means the associated metrics
  *          applies to all the clients. Ex: "client_software_name = Java, 
client_software_version = 11.1.*"
  *          which means all Java clients with any sub versions of 11.1 will be 
matched i.e. 11.1.1, 11.1.2 etc.
  * </ul>
- * For more information please look at kip-714:
- * 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
+ * For more information please look at
+ * <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration";>KIP-714</a>
  */
 public class ClientMetricsConfigs extends AbstractConfig {
 
-    public static final String SUBSCRIPTION_METRICS = "metrics";
-    public static final String PUSH_INTERVAL_MS = "interval.ms";
-    public static final String CLIENT_MATCH_PATTERN = "match";
+    public static final String METRICS_CONFIG = "metrics";
+    public static final String INTERVAL_MS_CONFIG = "interval.ms";
+    public static final String MATCH_CONFIG = "match";
 
     public static final String CLIENT_INSTANCE_ID = "client_instance_id";
     public static final String CLIENT_ID = "client_id";
@@ -82,34 +82,50 @@ public class ClientMetricsConfigs extends AbstractConfig {
     public static final String CLIENT_SOURCE_PORT = "client_source_port";
 
     // '*' in client-metrics resource configs indicates that all the metrics 
are subscribed.
-    public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "*";
+    public static final String ALL_SUBSCRIBED_METRICS = "*";
+
+    public static final List<String> METRICS_DEFAULT = List.of();
 
-    public static final int DEFAULT_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
+    public static final int INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
     private static final int MIN_INTERVAL_MS = 100; // 100ms
     private static final int MAX_INTERVAL_MS = 3600000; // 1 hour
 
-    private static final Set<String> ALLOWED_MATCH_PARAMS = new 
HashSet<>(Arrays.asList(
+    public static final List<String> MATCH_DEFAULT = List.of();
+
+    private static final Set<String> ALLOWED_MATCH_PARAMS = Set.of(
         CLIENT_INSTANCE_ID,
         CLIENT_ID,
         CLIENT_SOFTWARE_NAME,
         CLIENT_SOFTWARE_VERSION,
         CLIENT_SOURCE_ADDRESS,
         CLIENT_SOURCE_PORT
-    ));
+    );
 
     private static final ConfigDef CONFIG = new ConfigDef()
-        .define(SUBSCRIPTION_METRICS, Type.LIST, List.of(), Importance.MEDIUM, 
"Subscription metrics list")
-        .define(PUSH_INTERVAL_MS, Type.INT, DEFAULT_INTERVAL_MS, 
Importance.MEDIUM, "Push interval in milliseconds")
-        .define(CLIENT_MATCH_PATTERN, Type.LIST, List.of(), Importance.MEDIUM, 
"Client match pattern list");
+        .define(METRICS_CONFIG, Type.LIST, METRICS_DEFAULT, Importance.MEDIUM, 
"Telemetry metric name prefix list")
+        .define(INTERVAL_MS_CONFIG, Type.INT, INTERVAL_MS_DEFAULT, 
Importance.MEDIUM, "Metrics push interval in milliseconds")
+        .define(MATCH_CONFIG, Type.LIST, MATCH_DEFAULT, Importance.MEDIUM, 
"Client match criteria");
 
     public ClientMetricsConfigs(Properties props) {
-        super(CONFIG, props);
+        super(CONFIG, props, false);
     }
 
     public static ConfigDef configDef() {
         return CONFIG;
     }
 
+    public static Optional<Type> configType(String configName) {
+        return Optional.ofNullable(CONFIG.configKeys().get(configName)).map(c 
-> c.type);
+    }
+
+    public static Map<String, Object> defaultConfigsMap() {
+        Map<String, Object> clientMetricsProps = new HashMap<>();
+        clientMetricsProps.put(METRICS_CONFIG, METRICS_DEFAULT);
+        clientMetricsProps.put(INTERVAL_MS_CONFIG, INTERVAL_MS_DEFAULT);
+        clientMetricsProps.put(MATCH_CONFIG, MATCH_DEFAULT);
+        return clientMetricsProps;
+    }
+
     public static Set<String> names() {
         return CONFIG.names();
     }
@@ -134,18 +150,18 @@ public class ClientMetricsConfigs extends AbstractConfig {
         Map<String, Object> parsed = CONFIG.parse(properties);
 
         // Make sure that push interval is between 100ms and 1 hour.
-        if (properties.containsKey(PUSH_INTERVAL_MS)) {
-            Integer pushIntervalMs = (Integer) parsed.get(PUSH_INTERVAL_MS);
+        if (properties.containsKey(INTERVAL_MS_CONFIG)) {
+            int pushIntervalMs = (Integer) parsed.get(INTERVAL_MS_CONFIG);
             if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > 
MAX_INTERVAL_MS) {
                 String msg = String.format("Invalid value %s for %s, interval 
must be between 100 and 3600000 (1 hour)",
-                    pushIntervalMs, PUSH_INTERVAL_MS);
+                    pushIntervalMs, INTERVAL_MS_CONFIG);
                 throw new InvalidRequestException(msg);
             }
         }
 
         // Make sure that client match patterns are valid by parsing them.
-        if (properties.containsKey(CLIENT_MATCH_PATTERN)) {
-            List<String> patterns = (List<String>) 
parsed.get(CLIENT_MATCH_PATTERN);
+        if (properties.containsKey(MATCH_CONFIG)) {
+            List<String> patterns = (List<String>) parsed.get(MATCH_CONFIG);
             // Parse the client matching patterns to validate if the patterns 
are valid.
             parseMatchingPatterns(patterns);
         }
@@ -193,4 +209,14 @@ public class ClientMetricsConfigs extends AbstractConfig {
     private static boolean isValidParam(String paramName) {
         return ALLOWED_MATCH_PARAMS.contains(paramName);
     }
+
+    /**
+     * Create a client metrics config instance using the given properties and 
defaults.
+     */
+    public static ClientMetricsConfigs fromProps(Map<?, ?> defaults, 
Properties overrides) {
+        Properties props = new Properties();
+        props.putAll(defaults);
+        props.putAll(overrides);
+        return new ClientMetricsConfigs(props);
+    }
 }
diff --git 
a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java 
b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
index f01776f4488..4adb501cef5 100644
--- a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
@@ -109,7 +109,7 @@ public class ClientMetricsManagerTest {
         assertTrue(clientMetricsManager.subscriptions().isEmpty());
 
         assertEquals(0, clientMetricsManager.subscriptionUpdateVersion());
-        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultProperties());
+        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultTestProperties());
 
         assertEquals(1, clientMetricsManager.subscriptions().size());
         assertNotNull(clientMetricsManager.subscriptionInfo("sub-1"));
@@ -118,17 +118,17 @@ public class ClientMetricsManagerTest {
         Set<String> metrics = subscriptionInfo.metrics();
 
         // Validate metrics.
-        assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, 
metrics.size());
-        
Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric 
->
+        
assertEquals(ClientMetricsTestUtils.METRICS_TEST_DEFAULT.split(",").length, 
metrics.size());
+        
Arrays.stream(ClientMetricsTestUtils.METRICS_TEST_DEFAULT.split(",")).forEach(metric
 ->
             assertTrue(metrics.contains(metric)));
         // Validate push interval.
-        
assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS),
+        
assertEquals(ClientMetricsTestUtils.defaultTestProperties().getProperty(ClientMetricsConfigs.INTERVAL_MS_CONFIG),
             String.valueOf(subscriptionInfo.intervalMs()));
 
         // Validate match patterns.
-        
assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(),
+        assertEquals(ClientMetricsTestUtils.MATCH_TEST_DEFAULT.size(),
             subscriptionInfo.matchPattern().size());
-        ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern 
-> {
+        ClientMetricsTestUtils.MATCH_TEST_DEFAULT.forEach(pattern -> {
             String[] split = pattern.split("=");
             assertTrue(subscriptionInfo.matchPattern().containsKey(split[0]));
             assertEquals(split[1], 
subscriptionInfo.matchPattern().get(split[0]).pattern());
@@ -191,7 +191,7 @@ public class ClientMetricsManagerTest {
 
     @Test
     public void testGetTelemetry() throws Exception {
-        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultProperties());
+        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultTestProperties());
         assertEquals(1, clientMetricsManager.subscriptions().size());
 
         GetTelemetrySubscriptionsRequest request = new 
GetTelemetrySubscriptionsRequest.Builder(
@@ -203,8 +203,8 @@ public class ClientMetricsManagerTest {
         assertNotNull(response.data().clientInstanceId());
         assertTrue(response.data().subscriptionId() != 0);
 
-        assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, 
response.data().requestedMetrics().size());
-        
Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric 
->
+        
assertEquals(ClientMetricsTestUtils.METRICS_TEST_DEFAULT.split(",").length, 
response.data().requestedMetrics().size());
+        
Arrays.stream(ClientMetricsTestUtils.METRICS_TEST_DEFAULT.split(",")).forEach(metric
 ->
             assertTrue(response.data().requestedMetrics().contains(metric)));
 
         assertEquals(4, response.data().acceptedCompressionTypes().size());
@@ -213,7 +213,7 @@ public class ClientMetricsManagerTest {
         assertEquals(CompressionType.LZ4.id, 
response.data().acceptedCompressionTypes().get(1));
         assertEquals(CompressionType.GZIP.id, 
response.data().acceptedCompressionTypes().get(2));
         assertEquals(CompressionType.SNAPPY.id, 
response.data().acceptedCompressionTypes().get(3));
-        assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS, 
response.data().pushIntervalMs());
+        assertEquals(ClientMetricsTestUtils.INTERVAL_MS_TEST_DEFAULT, 
response.data().pushIntervalMs());
         assertTrue(response.data().deltaTemporality());
         assertEquals(100, response.data().telemetryMaxBytes());
         assertEquals(Errors.NONE, response.error());
@@ -252,7 +252,7 @@ public class ClientMetricsManagerTest {
         assertTrue(response.data().subscriptionId() != 0);
         assertTrue(response.data().requestedMetrics().isEmpty());
         assertEquals(4, response.data().acceptedCompressionTypes().size());
-        assertEquals(ClientMetricsConfigs.DEFAULT_INTERVAL_MS, 
response.data().pushIntervalMs());
+        assertEquals(ClientMetricsConfigs.INTERVAL_MS_DEFAULT, 
response.data().pushIntervalMs());
         assertTrue(response.data().deltaTemporality());
         assertEquals(100, response.data().telemetryMaxBytes());
         assertEquals(Errors.NONE, response.error());
@@ -273,7 +273,7 @@ public class ClientMetricsManagerTest {
         assertNotNull(response.data().clientInstanceId());
         assertEquals(Errors.NONE, response.error());
 
-        time.sleep(ClientMetricsConfigs.DEFAULT_INTERVAL_MS);
+        time.sleep(ClientMetricsConfigs.INTERVAL_MS_DEFAULT);
 
         request = new GetTelemetrySubscriptionsRequest.Builder(
             new 
GetTelemetrySubscriptionsRequestData().setClientInstanceId(response.data().clientInstanceId()),
 true).build();
@@ -286,9 +286,9 @@ public class ClientMetricsManagerTest {
 
     @Test
     public void testGetTelemetryAllMetricSubscribedSubscription() throws 
UnknownHostException {
-        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultProperties());
+        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultTestProperties());
         Properties properties = new Properties();
-        properties.put("metrics", 
ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+        properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
         clientMetricsManager.updateSubscription("sub-2", properties);
 
         assertEquals(2, clientMetricsManager.subscriptions().size());
@@ -303,10 +303,10 @@ public class ClientMetricsManagerTest {
         assertTrue(response.data().subscriptionId() != 0);
 
         assertEquals(1, response.data().requestedMetrics().size());
-        
assertTrue(response.data().requestedMetrics().contains(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG));
+        
assertTrue(response.data().requestedMetrics().contains(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS));
 
         assertEquals(4, response.data().acceptedCompressionTypes().size());
-        assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS, 
response.data().pushIntervalMs());
+        assertEquals(ClientMetricsTestUtils.INTERVAL_MS_TEST_DEFAULT, 
response.data().pushIntervalMs());
         assertTrue(response.data().deltaTemporality());
         assertEquals(100, response.data().telemetryMaxBytes());
         assertEquals(Errors.NONE, response.error());
@@ -387,7 +387,7 @@ public class ClientMetricsManagerTest {
 
     @Test
     public void testGetTelemetryUpdateSubscription() throws 
UnknownHostException {
-        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultProperties());
+        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultTestProperties());
         assertEquals(1, clientMetricsManager.subscriptions().size());
 
         GetTelemetrySubscriptionsRequest request = new 
GetTelemetrySubscriptionsRequest.Builder(
@@ -404,7 +404,7 @@ public class ClientMetricsManagerTest {
 
         // Update subscription
         Properties properties = new Properties();
-        properties.put("metrics", 
ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+        properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
         clientMetricsManager.updateSubscription("sub-2", properties);
         assertEquals(2, clientMetricsManager.subscriptions().size());
 
@@ -490,7 +490,7 @@ public class ClientMetricsManagerTest {
         CountDownLatch lock = new CountDownLatch(2);
         List<GetTelemetrySubscriptionsResponse> responses = 
Collections.synchronizedList(new ArrayList<>());
 
-        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultProperties());
+        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultTestProperties());
         assertEquals(1, clientMetricsManager.subscriptions().size());
 
         Thread thread = new Thread(() -> {
@@ -543,7 +543,7 @@ public class ClientMetricsManagerTest {
 
     @Test
     public void testPushTelemetry() throws Exception {
-        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultProperties());
+        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultTestProperties());
         assertEquals(1, clientMetricsManager.subscriptions().size());
 
         GetTelemetrySubscriptionsRequest subscriptionsRequest = new 
GetTelemetrySubscriptionsRequest.Builder(
@@ -627,7 +627,7 @@ public class ClientMetricsManagerTest {
 
     @Test
     public void testPushTelemetryAfterPushIntervalTime() throws 
UnknownHostException {
-        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultProperties());
+        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultTestProperties());
         assertEquals(1, clientMetricsManager.subscriptions().size());
 
         GetTelemetrySubscriptionsRequest subscriptionsRequest = new 
GetTelemetrySubscriptionsRequest.Builder(
@@ -648,7 +648,7 @@ public class ClientMetricsManagerTest {
 
         assertEquals(Errors.NONE, response.error());
 
-        time.sleep(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS);
+        time.sleep(ClientMetricsTestUtils.INTERVAL_MS_TEST_DEFAULT);
 
         response = clientMetricsManager.processPushTelemetryRequest(
             request, ClientMetricsTestUtils.requestContext());
@@ -1016,7 +1016,7 @@ public class ClientMetricsManagerTest {
                 .setCompressionType(CompressionType.NONE.id)
                 
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), 
true).build();
 
-        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultProperties());
+        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultTestProperties());
         assertEquals(1, clientMetricsManager.subscriptions().size());
 
         CountDownLatch lock = new CountDownLatch(2);
@@ -1087,7 +1087,7 @@ public class ClientMetricsManagerTest {
                 ClientMetricsManager clientMetricsManager = new 
ClientMetricsManager(receiverPlugin, 100, time, 100, kafkaMetrics)
         ) {
 
-            clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultProperties());
+            clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultTestProperties());
             assertEquals(1, clientMetricsManager.subscriptions().size());
 
             GetTelemetrySubscriptionsRequest subscriptionsRequest = new 
GetTelemetrySubscriptionsRequest.Builder(
@@ -1127,8 +1127,8 @@ public class ClientMetricsManagerTest {
     @Test
     public void testCacheEviction() throws Exception {
         Properties properties = new Properties();
-        properties.put("metrics", 
ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
-        properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100");
+        properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
+        properties.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "100");
         clientMetricsManager.updateSubscription("sub-1", properties);
 
         GetTelemetrySubscriptionsRequest request = new 
GetTelemetrySubscriptionsRequest.Builder(
@@ -1167,8 +1167,8 @@ public class ClientMetricsManagerTest {
     @Test
     public void testCacheEvictionWithMultipleClients() throws Exception {
         Properties properties = new Properties();
-        properties.put("metrics", 
ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
-        properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100");
+        properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
+        properties.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "100");
         clientMetricsManager.updateSubscription("sub-1", properties);
 
         GetTelemetrySubscriptionsRequest request = new 
GetTelemetrySubscriptionsRequest.Builder(
@@ -1227,7 +1227,7 @@ public class ClientMetricsManagerTest {
         assertNotNull(instance.expirationTimerTask());
 
         // Update subscription
-        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultProperties());
+        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultTestProperties());
         assertEquals(1, clientMetricsManager.subscriptions().size());
 
         request = new GetTelemetrySubscriptionsRequest.Builder(
diff --git 
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java
 
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java
index 01826ac1648..d965c40ce3f 100644
--- 
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java
@@ -35,7 +35,7 @@ public class ClientMetricsInstanceTest {
         Uuid uuid = Uuid.randomUuid();
         ClientMetricsInstanceMetadata instanceMetadata = new 
ClientMetricsInstanceMetadata(uuid,
             ClientMetricsTestUtils.requestContext());
-        clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0, 
0, null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS);
+        clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0, 
0, null, ClientMetricsConfigs.INTERVAL_MS_DEFAULT);
     }
 
     @Test
@@ -47,7 +47,7 @@ public class ClientMetricsInstanceTest {
 
     @Test
     public void testMaybeUpdateGetRequestAfterElapsedTimeValid() {
-        
assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis()
 - ClientMetricsConfigs.DEFAULT_INTERVAL_MS));
+        
assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis()
 - ClientMetricsConfigs.INTERVAL_MS_DEFAULT));
         // Second request should be accepted as time since last request is 
greater than the push interval.
         
assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis()));
     }
@@ -61,7 +61,7 @@ public class ClientMetricsInstanceTest {
 
     @Test
     public void testMaybeUpdatePushRequestAfterElapsedTimeValid() {
-        
assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis()
 - ClientMetricsConfigs.DEFAULT_INTERVAL_MS));
+        
assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis()
 - ClientMetricsConfigs.INTERVAL_MS_DEFAULT));
         // Second request should be accepted as time since last request is 
greater than the push interval.
         
assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis()));
     }
@@ -82,7 +82,7 @@ public class ClientMetricsInstanceTest {
 
     @Test
     public void testMaybeUpdatePushRequestWithImmediateRetryAfterGetValid() {
-        
assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis()
 - ClientMetricsConfigs.DEFAULT_INTERVAL_MS));
+        
assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis()
 - ClientMetricsConfigs.INTERVAL_MS_DEFAULT));
         
assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis()));
         // Next request after get should be accepted.
         
assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis()
 + 1));
diff --git 
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
 
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
index 46fb24902af..30c0e5f67b4 100644
--- 
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
+++ 
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
@@ -38,20 +38,20 @@ import java.util.Properties;
 
 public class ClientMetricsTestUtils {
 
-    public static final String DEFAULT_METRICS =
+    public static final String METRICS_TEST_DEFAULT =
         
"org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency";
-    public static final int DEFAULT_PUSH_INTERVAL_MS = 30 * 1000; // 30 seconds
-    public static final List<String> DEFAULT_CLIENT_MATCH_PATTERNS = List.of(
+    public static final int INTERVAL_MS_TEST_DEFAULT = 30 * 1000; // 30 seconds
+    public static final List<String> MATCH_TEST_DEFAULT = List.of(
         ClientMetricsConfigs.CLIENT_SOFTWARE_NAME + "=apache-kafka-java",
         ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION + "=3.5.*"
     );
     public static final int CLIENT_PORT = 56078;
 
-    public static Properties defaultProperties() {
+    public static Properties defaultTestProperties() {
         Properties props = new Properties();
-        props.put(ClientMetricsConfigs.SUBSCRIPTION_METRICS, DEFAULT_METRICS);
-        props.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, 
Integer.toString(DEFAULT_PUSH_INTERVAL_MS));
-        props.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, String.join(",", 
DEFAULT_CLIENT_MATCH_PATTERNS));
+        props.put(ClientMetricsConfigs.METRICS_CONFIG, METRICS_TEST_DEFAULT);
+        props.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, 
Integer.toString(INTERVAL_MS_TEST_DEFAULT));
+        props.put(ClientMetricsConfigs.MATCH_CONFIG, String.join(",", 
MATCH_TEST_DEFAULT));
         return props;
     }
 
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index 05670b63b1f..8cb15c148ed 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -37,7 +37,6 @@ import org.apache.kafka.test.TestUtils;
 
 import org.mockito.Mockito;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -151,7 +150,7 @@ public class ConfigCommandIntegrationTest {
     public void testAddConfigKeyValuesUsingCommand() throws Exception {
         try (Admin client = cluster.admin()) {
             NewTopic newTopic = new NewTopic("topic", 1, (short) 1);
-            client.createTopics(Collections.singleton(newTopic)).all().get();
+            client.createTopics(Set.of(newTopic)).all().get();
             cluster.waitForTopic("topic", 1);
             Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
                     "--entity-type", "topics",
@@ -251,12 +250,14 @@ public class ConfigCommandIntegrationTest {
         try (Admin client = cluster.admin()) {
             // Add config
             Map<String, String> configs = new HashMap<>();
-            configs.put("metrics", "");
+            configs.put("metrics", "org.apache.kafka.producer.");
             configs.put("interval.ms", "6000");
             alterAndVerifyClientMetricsConfig(client, 
defaultClientMetricsName, configs, alterOpts);
 
             // Delete config
-            deleteAndVerifyClientMetricsConfigValue(client, 
defaultClientMetricsName, configs.keySet(), alterOpts);
+            configs.put("metrics", "");
+            configs.put("interval.ms", "300000");
+            deleteAndVerifyClientMetricsConfigValue(client, 
defaultClientMetricsName, configs, alterOpts);
 
             // Unknown config configured should fail
             assertThrows(ExecutionException.class, () -> 
alterConfigWithAdmin(client, singletonMap("unknown.config", "20000"), 
alterOpts));
@@ -337,7 +338,7 @@ public class ConfigCommandIntegrationTest {
     private void updateAndCheckInvalidBrokerConfig(Optional<String> 
brokerIdOrDefault) {
         List<String> alterOpts = 
generateDefaultAlterOpts(cluster.bootstrapServers());
         try (Admin client = cluster.admin()) {
-            alterConfigWithAdmin(client, brokerIdOrDefault, 
Collections.singletonMap("invalid", "2"), alterOpts);
+            alterConfigWithAdmin(client, brokerIdOrDefault, Map.of("invalid", 
"2"), alterOpts);
 
             Stream<String> describeCommand = Stream.concat(
                     Stream.concat(
@@ -357,7 +358,7 @@ public class ConfigCommandIntegrationTest {
     public void testUpdateInvalidTopicConfigs() throws ExecutionException, 
InterruptedException {
         List<String> alterOpts = asList("--bootstrap-server", 
cluster.bootstrapServers(), "--entity-type", "topics", "--alter");
         try (Admin client = cluster.admin()) {
-            client.createTopics(Collections.singletonList(new 
NewTopic("test-config-topic", 1, (short) 1))).all().get();
+            client.createTopics(List.of(new NewTopic("test-config-topic", 1, 
(short) 1))).all().get();
             assertInstanceOf(
                     InvalidConfigurationException.class,
                     assertThrows(
@@ -575,17 +576,16 @@ public class ConfigCommandIntegrationTest {
 
     private void deleteAndVerifyClientMetricsConfigValue(Admin client,
                                                          String 
clientMetricsName,
-                                                         Set<String> 
defaultConfigs,
+                                                         Map<String, String> 
defaultConfigs,
                                                          List<String> 
alterOpts) throws Exception {
         List<String> bootstrapOpts = quorumArgs().collect(Collectors.toList());
         ConfigCommand.ConfigCommandOptions deleteOpts =
             new ConfigCommand.ConfigCommandOptions(toArray(bootstrapOpts,
                     alterOpts,
-                    asList("--delete-config", String.join(",", 
defaultConfigs))));
+                    asList("--delete-config", String.join(",", 
defaultConfigs.keySet()))));
         deleteOpts.checkArgs();
         ConfigCommand.alterConfig(client, deleteOpts);
-        // There are no default configs returned for client metrics
-        verifyClientMetricsConfig(client, clientMetricsName, 
Collections.emptyMap());
+        verifyClientMetricsConfig(client, clientMetricsName, defaultConfigs);
     }
 
     private void verifyPerBrokerConfigValue(Admin client,


Reply via email to