This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5e7445a6d63 KAFKA-17516 Synonyms for client metrics configs (#17264)
5e7445a6d63 is described below
commit 5e7445a6d63f02033f1f158063c6cf7161170e71
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Mar 14 08:05:40 2025 +0000
KAFKA-17516 Synonyms for client metrics configs (#17264)
This PR brings client metrics configuration resources in line with the
other config resources in terms of handling synonyms and defaults.
Specifically, configs which are not explicitly set take their hard-coded
default values, and these are reported by `kafka-configs.sh --describe`
and `kafka-client-metrics.sh --describe`. Previously, they were omitted
which means the administrator needed to know the default values.
The ConfigHelper was changed so that the handling of client metrics
configuration matches that of group configuration.
Reviewers: Apoorv Mittal <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../src/main/scala/kafka/server/ConfigHelper.scala | 45 +++++++++-----
.../kafka/api/PlaintextAdminIntegrationTest.scala | 4 --
.../ControllerConfigurationValidatorTest.scala | 12 ++--
.../scala/unit/kafka/server/KafkaApisTest.scala | 4 +-
.../apache/kafka/server/ClientMetricsManager.java | 12 ++--
.../kafka/server/metrics/ClientMetricsConfigs.java | 70 +++++++++++++++-------
.../kafka/server/ClientMetricsManagerTest.java | 58 +++++++++---------
.../server/metrics/ClientMetricsInstanceTest.java | 8 +--
.../server/metrics/ClientMetricsTestUtils.java | 14 ++---
.../kafka/tools/ConfigCommandIntegrationTest.java | 20 +++----
10 files changed, 141 insertions(+), 106 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala
b/core/src/main/scala/kafka/server/ConfigHelper.scala
index be5f8aca243..47b17a251b1 100644
--- a/core/src/main/scala/kafka/server/ConfigHelper.scala
+++ b/core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -35,9 +35,9 @@ import
org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.metadata.ConfigRepository
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
+import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.apache.kafka.storage.internals.log.LogConfig
-import scala.collection.mutable.ListBuffer
import scala.collection.{Map, mutable}
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
@@ -136,21 +136,12 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
.setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
case ConfigResource.Type.CLIENT_METRICS =>
- val subscriptionName = resource.resourceName
- if (subscriptionName == null || subscriptionName.isEmpty) {
+ if (resource.resourceName == null ||
resource.resourceName.isEmpty) {
throw new InvalidRequestException("Client metrics subscription
name must not be empty")
} else {
- val entityProps = configRepository.config(new
ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName))
- val configEntries = new
ListBuffer[DescribeConfigsResponseData.DescribeConfigsResourceResult]()
- entityProps.forEach((name, value) => {
- configEntries += new
DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name.toString)
-
.setValue(value.toString).setConfigSource(ConfigSource.CLIENT_METRICS_CONFIG.id())
-
.setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava)
- })
-
- new DescribeConfigsResponseData.DescribeConfigsResult()
- .setErrorCode(Errors.NONE.code)
- .setConfigs(configEntries.asJava)
+ val clientMetricsProps = configRepository.config(new
ConfigResource(ConfigResource.Type.CLIENT_METRICS, resource.resourceName))
+ val clientMetricsConfig =
ClientMetricsConfigs.fromProps(ClientMetricsConfigs.defaultConfigsMap(),
clientMetricsProps)
+ createResponseConfig(allConfigs(clientMetricsConfig),
createClientMetricsConfigEntry(clientMetricsConfig, clientMetricsProps,
includeSynonyms, includeDocumentation))
}
case ConfigResource.Type.GROUP =>
@@ -185,8 +176,8 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
}
}
- def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: Properties,
includeSynonyms: Boolean, includeDocumentation: Boolean)
- (name: String, value: Any):
DescribeConfigsResponseData.DescribeConfigsResourceResult = {
+ private def createGroupConfigEntry(groupConfig: GroupConfig, groupProps:
Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
+ (name: String, value: Any):
DescribeConfigsResponseData.DescribeConfigsResourceResult = {
val allNames = brokerSynonyms(name)
val configEntryType = GroupConfig.configType(name).toScala
val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
@@ -210,6 +201,28 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
.setDocumentation(configDocumentation).setConfigType(dataType.id)
}
+ private def createClientMetricsConfigEntry(clientMetricsConfig:
ClientMetricsConfigs, clientMetricsProps: Properties, includeSynonyms: Boolean,
includeDocumentation: Boolean)
+ (name: String, value: Any):
DescribeConfigsResponseData.DescribeConfigsResourceResult = {
+ val configEntryType = ClientMetricsConfigs.configType(name).toScala
+ val valueAsString = ConfigDef.convertToString(value,
configEntryType.orNull)
+ val allSynonyms = {
+ if (!clientMetricsProps.containsKey(name)) {
+ List.empty
+ } else {
+ List(new
DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString)
+ .setSource(ConfigSource.CLIENT_METRICS_CONFIG.id))
+ }
+ }
+ val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG.id else
allSynonyms.head.source
+ val synonyms = if (!includeSynonyms) List.empty else allSynonyms
+ val dataType = configResponseType(configEntryType)
+ val configDocumentation = if (includeDocumentation)
clientMetricsConfig.documentationOf(name) else null
+ new DescribeConfigsResponseData.DescribeConfigsResourceResult()
+ .setName(name).setValue(valueAsString).setConfigSource(source)
+ .setIsSensitive(false).setReadOnly(false).setSynonyms(synonyms.asJava)
+ .setDocumentation(configDocumentation).setConfigType(dataType.id)
+ }
+
def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties,
includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any):
DescribeConfigsResponseData.DescribeConfigsResourceResult = {
val configEntryType = LogConfig.configType(name).toScala
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 18c5c2bbd05..af2a27fdf4f 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -944,10 +944,6 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val groupResource = new ConfigResource(ConfigResource.Type.GROUP,
"none_group")
val groupResult =
client.describeConfigs(Seq(groupResource).asJava).all().get().get(groupResource)
assertNotEquals(0, groupResult.entries().size())
-
- val metricResource = new
ConfigResource(ConfigResource.Type.CLIENT_METRICS, "none_metric")
- val metricResult =
client.describeConfigs(Seq(metricResource).asJava).all().get().get(metricResource)
- assertEquals(0, metricResult.entries().size())
}
@Test
diff --git
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index 9f903336d23..3056753f53b 100644
---
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -128,9 +128,9 @@ class ControllerConfigurationValidatorTest {
@Test
def testValidClientMetricsConfig(): Unit = {
val config = new util.TreeMap[String, String]()
- config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "2000")
- config.put(ClientMetricsConfigs.SUBSCRIPTION_METRICS,
"org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency")
- config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN,
"client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" +
+ config.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "2000")
+ config.put(ClientMetricsConfigs.METRICS_CONFIG,
"org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency")
+ config.put(ClientMetricsConfigs.MATCH_CONFIG,
"client_instance_id=b69cc35a-7a54-4790-aa69-cc2bd4ee4538,client_id=1" +
",client_software_name=apache-kafka-java,client_software_version=2.8.0-SNAPSHOT,client_source_address=127.0.0.1,"
+
"client_source_port=1234")
validator.validate(new ConfigResource(CLIENT_METRICS, "subscription-1"),
config, emptyMap())
@@ -147,12 +147,12 @@ class ControllerConfigurationValidatorTest {
@Test
def testInvalidIntervalClientMetricsConfig(): Unit = {
val config = new util.TreeMap[String, String]()
- config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "10")
+ config.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "10")
assertEquals("Invalid value 10 for interval.ms, interval must be between
100 and 3600000 (1 hour)",
assertThrows(classOf[InvalidRequestException], () => validator.validate(
new ConfigResource(CLIENT_METRICS, "subscription-1"), config,
emptyMap())). getMessage)
- config.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "3600001")
+ config.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "3600001")
assertEquals("Invalid value 3600001 for interval.ms, interval must be
between 100 and 3600000 (1 hour)",
assertThrows(classOf[InvalidRequestException], () => validator.validate(
new ConfigResource(CLIENT_METRICS, "subscription-1"), config,
emptyMap())). getMessage)
@@ -170,7 +170,7 @@ class ControllerConfigurationValidatorTest {
@Test
def testInvalidMatchClientMetricsConfig(): Unit = {
val config = new util.TreeMap[String, String]()
- config.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, "10")
+ config.put(ClientMetricsConfigs.MATCH_CONFIG, "10")
assertEquals("Illegal client matching pattern: 10",
assertThrows(classOf[InvalidConfigurationException], () =>
validator.validate(
new ConfigResource(CLIENT_METRICS, "subscription-1"), config,
emptyMap())). getMessage)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 9325c92bd59..337dc9a62f3 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -373,7 +373,7 @@ class KafkaApisTest extends Logging {
val subscriptionName = "client_metric_subscription_1"
val authorizedResource = new
ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName)
- val props = ClientMetricsTestUtils.defaultProperties
+ val props = ClientMetricsTestUtils.defaultTestProperties
val configEntries = new util.ArrayList[AlterConfigsRequest.ConfigEntry]()
props.forEach((x, y) =>
configEntries.add(new
AlterConfigsRequest.ConfigEntry(x.asInstanceOf[String],
y.asInstanceOf[String])))
@@ -444,7 +444,7 @@ class KafkaApisTest extends Logging {
val resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS,
subscriptionName)
val configRepository: ConfigRepository = mock(classOf[ConfigRepository])
- val cmConfigs = ClientMetricsTestUtils.defaultProperties
+ val cmConfigs = ClientMetricsTestUtils.defaultTestProperties
when(configRepository.config(resource)).thenReturn(cmConfigs)
metadataCache = mock(classOf[KRaftMetadataCache])
diff --git
a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index 5b77004d0ae..238e189202b 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -244,9 +244,9 @@ public class ClientMetricsManager implements AutoCloseable {
}
private void updateClientSubscription(String subscriptionName,
ClientMetricsConfigs configs) {
- List<String> metrics =
configs.getList(ClientMetricsConfigs.SUBSCRIPTION_METRICS);
- int pushInterval =
configs.getInt(ClientMetricsConfigs.PUSH_INTERVAL_MS);
- List<String> clientMatchPattern =
configs.getList(ClientMetricsConfigs.CLIENT_MATCH_PATTERN);
+ List<String> metrics =
configs.getList(ClientMetricsConfigs.METRICS_CONFIG);
+ int pushInterval =
configs.getInt(ClientMetricsConfigs.INTERVAL_MS_CONFIG);
+ List<String> clientMatchPattern =
configs.getList(ClientMetricsConfigs.MATCH_CONFIG);
SubscriptionInfo newSubscription =
new SubscriptionInfo(subscriptionName, metrics, pushInterval,
@@ -329,7 +329,7 @@ public class ClientMetricsManager implements AutoCloseable {
private ClientMetricsInstance createClientInstance(Uuid clientInstanceId,
ClientMetricsInstanceMetadata instanceMetadata) {
- int pushIntervalMs = ClientMetricsConfigs.DEFAULT_INTERVAL_MS;
+ int pushIntervalMs = ClientMetricsConfigs.INTERVAL_MS_DEFAULT;
// Keep a set of metrics to avoid duplicates in case of overlapping
subscriptions.
Set<String> subscribedMetrics = new HashSet<>();
boolean allMetricsSubscribed = false;
@@ -338,7 +338,7 @@ public class ClientMetricsManager implements AutoCloseable {
for (SubscriptionInfo info : subscriptionMap.values()) {
if (instanceMetadata.isMatch(info.matchPattern())) {
allMetricsSubscribed = allMetricsSubscribed ||
info.metrics().contains(
- ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+ ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
subscribedMetrics.addAll(info.metrics());
pushIntervalMs = Math.min(pushIntervalMs, info.intervalMs());
}
@@ -351,7 +351,7 @@ public class ClientMetricsManager implements AutoCloseable {
if (allMetricsSubscribed) {
// Only add an * to indicate that all metrics are subscribed.
subscribedMetrics.clear();
-
subscribedMetrics.add(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+ subscribedMetrics.add(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
}
int subscriptionId = computeSubscriptionId(subscribedMetrics,
pushIntervalMs, clientInstanceId);
diff --git
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
index ba94486e83c..2560854eb55 100644
---
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
+++
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
@@ -23,11 +23,10 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRequestException;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
@@ -52,7 +51,8 @@ import java.util.regex.PatternSyntaxException;
* <ul>
* <li> "name" is a unique name for the subscription. This is used to
identify the subscription in
* the broker. Ex: "METRICS-SUB"
- * <li> "metrics" value should be comma separated metrics list. A prefix
match on the requested metrics
+ *
+ * <li> "metrics" value should be comma-separated metrics list. A prefix
match on the requested metrics
* is performed in clients to determine subscribed metrics. An empty
list means no metrics subscribed.
* A list containing just an empty string means all metrics
subscribed.
* Ex:
"org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
@@ -60,19 +60,19 @@ import java.util.regex.PatternSyntaxException;
* <li> "interval.ms" should be between 100 and 3600000 (1 hour). This is
the interval at which the client
* should push the metrics to the broker.
*
- * <li> "match" is a comma separated list of client match patterns, in case
if there is no matching
+ * <li> "match" is a comma-separated list of client match patterns, in case
if there is no matching
* pattern specified then broker considers that as all match which
means the associated metrics
* applies to all the clients. Ex: "client_software_name = Java,
client_software_version = 11.1.*"
* which means all Java clients with any sub versions of 11.1 will be
matched i.e. 11.1.1, 11.1.2 etc.
* </ul>
- * For more information please look at kip-714:
- *
https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
+ * For more information please look at
+ * <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration">KIP-714</a>
*/
public class ClientMetricsConfigs extends AbstractConfig {
- public static final String SUBSCRIPTION_METRICS = "metrics";
- public static final String PUSH_INTERVAL_MS = "interval.ms";
- public static final String CLIENT_MATCH_PATTERN = "match";
+ public static final String METRICS_CONFIG = "metrics";
+ public static final String INTERVAL_MS_CONFIG = "interval.ms";
+ public static final String MATCH_CONFIG = "match";
public static final String CLIENT_INSTANCE_ID = "client_instance_id";
public static final String CLIENT_ID = "client_id";
@@ -82,34 +82,50 @@ public class ClientMetricsConfigs extends AbstractConfig {
public static final String CLIENT_SOURCE_PORT = "client_source_port";
// '*' in client-metrics resource configs indicates that all the metrics
are subscribed.
- public static final String ALL_SUBSCRIBED_METRICS_CONFIG = "*";
+ public static final String ALL_SUBSCRIBED_METRICS = "*";
+
+ public static final List<String> METRICS_DEFAULT = List.of();
- public static final int DEFAULT_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
+ public static final int INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
private static final int MIN_INTERVAL_MS = 100; // 100ms
private static final int MAX_INTERVAL_MS = 3600000; // 1 hour
- private static final Set<String> ALLOWED_MATCH_PARAMS = new
HashSet<>(Arrays.asList(
+ public static final List<String> MATCH_DEFAULT = List.of();
+
+ private static final Set<String> ALLOWED_MATCH_PARAMS = Set.of(
CLIENT_INSTANCE_ID,
CLIENT_ID,
CLIENT_SOFTWARE_NAME,
CLIENT_SOFTWARE_VERSION,
CLIENT_SOURCE_ADDRESS,
CLIENT_SOURCE_PORT
- ));
+ );
private static final ConfigDef CONFIG = new ConfigDef()
- .define(SUBSCRIPTION_METRICS, Type.LIST, List.of(), Importance.MEDIUM,
"Subscription metrics list")
- .define(PUSH_INTERVAL_MS, Type.INT, DEFAULT_INTERVAL_MS,
Importance.MEDIUM, "Push interval in milliseconds")
- .define(CLIENT_MATCH_PATTERN, Type.LIST, List.of(), Importance.MEDIUM,
"Client match pattern list");
+ .define(METRICS_CONFIG, Type.LIST, METRICS_DEFAULT, Importance.MEDIUM,
"Telemetry metric name prefix list")
+ .define(INTERVAL_MS_CONFIG, Type.INT, INTERVAL_MS_DEFAULT,
Importance.MEDIUM, "Metrics push interval in milliseconds")
+ .define(MATCH_CONFIG, Type.LIST, MATCH_DEFAULT, Importance.MEDIUM,
"Client match criteria");
public ClientMetricsConfigs(Properties props) {
- super(CONFIG, props);
+ super(CONFIG, props, false);
}
public static ConfigDef configDef() {
return CONFIG;
}
+ public static Optional<Type> configType(String configName) {
+ return Optional.ofNullable(CONFIG.configKeys().get(configName)).map(c
-> c.type);
+ }
+
+ public static Map<String, Object> defaultConfigsMap() {
+ Map<String, Object> clientMetricsProps = new HashMap<>();
+ clientMetricsProps.put(METRICS_CONFIG, METRICS_DEFAULT);
+ clientMetricsProps.put(INTERVAL_MS_CONFIG, INTERVAL_MS_DEFAULT);
+ clientMetricsProps.put(MATCH_CONFIG, MATCH_DEFAULT);
+ return clientMetricsProps;
+ }
+
public static Set<String> names() {
return CONFIG.names();
}
@@ -134,18 +150,18 @@ public class ClientMetricsConfigs extends AbstractConfig {
Map<String, Object> parsed = CONFIG.parse(properties);
// Make sure that push interval is between 100ms and 1 hour.
- if (properties.containsKey(PUSH_INTERVAL_MS)) {
- Integer pushIntervalMs = (Integer) parsed.get(PUSH_INTERVAL_MS);
+ if (properties.containsKey(INTERVAL_MS_CONFIG)) {
+ int pushIntervalMs = (Integer) parsed.get(INTERVAL_MS_CONFIG);
if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs >
MAX_INTERVAL_MS) {
String msg = String.format("Invalid value %s for %s, interval
must be between 100 and 3600000 (1 hour)",
- pushIntervalMs, PUSH_INTERVAL_MS);
+ pushIntervalMs, INTERVAL_MS_CONFIG);
throw new InvalidRequestException(msg);
}
}
// Make sure that client match patterns are valid by parsing them.
- if (properties.containsKey(CLIENT_MATCH_PATTERN)) {
- List<String> patterns = (List<String>)
parsed.get(CLIENT_MATCH_PATTERN);
+ if (properties.containsKey(MATCH_CONFIG)) {
+ List<String> patterns = (List<String>) parsed.get(MATCH_CONFIG);
// Parse the client matching patterns to validate if the patterns
are valid.
parseMatchingPatterns(patterns);
}
@@ -193,4 +209,14 @@ public class ClientMetricsConfigs extends AbstractConfig {
private static boolean isValidParam(String paramName) {
return ALLOWED_MATCH_PARAMS.contains(paramName);
}
+
+ /**
+ * Create a client metrics config instance using the given properties and
defaults.
+ */
+ public static ClientMetricsConfigs fromProps(Map<?, ?> defaults,
Properties overrides) {
+ Properties props = new Properties();
+ props.putAll(defaults);
+ props.putAll(overrides);
+ return new ClientMetricsConfigs(props);
+ }
}
diff --git
a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
index f01776f4488..4adb501cef5 100644
--- a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
@@ -109,7 +109,7 @@ public class ClientMetricsManagerTest {
assertTrue(clientMetricsManager.subscriptions().isEmpty());
assertEquals(0, clientMetricsManager.subscriptionUpdateVersion());
- clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultProperties());
+ clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultTestProperties());
assertEquals(1, clientMetricsManager.subscriptions().size());
assertNotNull(clientMetricsManager.subscriptionInfo("sub-1"));
@@ -118,17 +118,17 @@ public class ClientMetricsManagerTest {
Set<String> metrics = subscriptionInfo.metrics();
// Validate metrics.
- assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length,
metrics.size());
-
Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric
->
+
assertEquals(ClientMetricsTestUtils.METRICS_TEST_DEFAULT.split(",").length,
metrics.size());
+
Arrays.stream(ClientMetricsTestUtils.METRICS_TEST_DEFAULT.split(",")).forEach(metric
->
assertTrue(metrics.contains(metric)));
// Validate push interval.
-
assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS),
+
assertEquals(ClientMetricsTestUtils.defaultTestProperties().getProperty(ClientMetricsConfigs.INTERVAL_MS_CONFIG),
String.valueOf(subscriptionInfo.intervalMs()));
// Validate match patterns.
-
assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(),
+ assertEquals(ClientMetricsTestUtils.MATCH_TEST_DEFAULT.size(),
subscriptionInfo.matchPattern().size());
- ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern
-> {
+ ClientMetricsTestUtils.MATCH_TEST_DEFAULT.forEach(pattern -> {
String[] split = pattern.split("=");
assertTrue(subscriptionInfo.matchPattern().containsKey(split[0]));
assertEquals(split[1],
subscriptionInfo.matchPattern().get(split[0]).pattern());
@@ -191,7 +191,7 @@ public class ClientMetricsManagerTest {
@Test
public void testGetTelemetry() throws Exception {
- clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultProperties());
+ clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultTestProperties());
assertEquals(1, clientMetricsManager.subscriptions().size());
GetTelemetrySubscriptionsRequest request = new
GetTelemetrySubscriptionsRequest.Builder(
@@ -203,8 +203,8 @@ public class ClientMetricsManagerTest {
assertNotNull(response.data().clientInstanceId());
assertTrue(response.data().subscriptionId() != 0);
- assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length,
response.data().requestedMetrics().size());
-
Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric
->
+
assertEquals(ClientMetricsTestUtils.METRICS_TEST_DEFAULT.split(",").length,
response.data().requestedMetrics().size());
+
Arrays.stream(ClientMetricsTestUtils.METRICS_TEST_DEFAULT.split(",")).forEach(metric
->
assertTrue(response.data().requestedMetrics().contains(metric)));
assertEquals(4, response.data().acceptedCompressionTypes().size());
@@ -213,7 +213,7 @@ public class ClientMetricsManagerTest {
assertEquals(CompressionType.LZ4.id,
response.data().acceptedCompressionTypes().get(1));
assertEquals(CompressionType.GZIP.id,
response.data().acceptedCompressionTypes().get(2));
assertEquals(CompressionType.SNAPPY.id,
response.data().acceptedCompressionTypes().get(3));
- assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS,
response.data().pushIntervalMs());
+ assertEquals(ClientMetricsTestUtils.INTERVAL_MS_TEST_DEFAULT,
response.data().pushIntervalMs());
assertTrue(response.data().deltaTemporality());
assertEquals(100, response.data().telemetryMaxBytes());
assertEquals(Errors.NONE, response.error());
@@ -252,7 +252,7 @@ public class ClientMetricsManagerTest {
assertTrue(response.data().subscriptionId() != 0);
assertTrue(response.data().requestedMetrics().isEmpty());
assertEquals(4, response.data().acceptedCompressionTypes().size());
- assertEquals(ClientMetricsConfigs.DEFAULT_INTERVAL_MS,
response.data().pushIntervalMs());
+ assertEquals(ClientMetricsConfigs.INTERVAL_MS_DEFAULT,
response.data().pushIntervalMs());
assertTrue(response.data().deltaTemporality());
assertEquals(100, response.data().telemetryMaxBytes());
assertEquals(Errors.NONE, response.error());
@@ -273,7 +273,7 @@ public class ClientMetricsManagerTest {
assertNotNull(response.data().clientInstanceId());
assertEquals(Errors.NONE, response.error());
- time.sleep(ClientMetricsConfigs.DEFAULT_INTERVAL_MS);
+ time.sleep(ClientMetricsConfigs.INTERVAL_MS_DEFAULT);
request = new GetTelemetrySubscriptionsRequest.Builder(
new
GetTelemetrySubscriptionsRequestData().setClientInstanceId(response.data().clientInstanceId()),
true).build();
@@ -286,9 +286,9 @@ public class ClientMetricsManagerTest {
@Test
public void testGetTelemetryAllMetricSubscribedSubscription() throws
UnknownHostException {
- clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultProperties());
+ clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultTestProperties());
Properties properties = new Properties();
- properties.put("metrics",
ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+ properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
clientMetricsManager.updateSubscription("sub-2", properties);
assertEquals(2, clientMetricsManager.subscriptions().size());
@@ -303,10 +303,10 @@ public class ClientMetricsManagerTest {
assertTrue(response.data().subscriptionId() != 0);
assertEquals(1, response.data().requestedMetrics().size());
-
assertTrue(response.data().requestedMetrics().contains(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG));
+
assertTrue(response.data().requestedMetrics().contains(ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS));
assertEquals(4, response.data().acceptedCompressionTypes().size());
- assertEquals(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS,
response.data().pushIntervalMs());
+ assertEquals(ClientMetricsTestUtils.INTERVAL_MS_TEST_DEFAULT,
response.data().pushIntervalMs());
assertTrue(response.data().deltaTemporality());
assertEquals(100, response.data().telemetryMaxBytes());
assertEquals(Errors.NONE, response.error());
@@ -387,7 +387,7 @@ public class ClientMetricsManagerTest {
@Test
public void testGetTelemetryUpdateSubscription() throws
UnknownHostException {
- clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultProperties());
+ clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultTestProperties());
assertEquals(1, clientMetricsManager.subscriptions().size());
GetTelemetrySubscriptionsRequest request = new
GetTelemetrySubscriptionsRequest.Builder(
@@ -404,7 +404,7 @@ public class ClientMetricsManagerTest {
// Update subscription
Properties properties = new Properties();
- properties.put("metrics",
ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+ properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
clientMetricsManager.updateSubscription("sub-2", properties);
assertEquals(2, clientMetricsManager.subscriptions().size());
@@ -490,7 +490,7 @@ public class ClientMetricsManagerTest {
CountDownLatch lock = new CountDownLatch(2);
List<GetTelemetrySubscriptionsResponse> responses =
Collections.synchronizedList(new ArrayList<>());
- clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultProperties());
+ clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultTestProperties());
assertEquals(1, clientMetricsManager.subscriptions().size());
Thread thread = new Thread(() -> {
@@ -543,7 +543,7 @@ public class ClientMetricsManagerTest {
@Test
public void testPushTelemetry() throws Exception {
- clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultProperties());
+ clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultTestProperties());
assertEquals(1, clientMetricsManager.subscriptions().size());
GetTelemetrySubscriptionsRequest subscriptionsRequest = new
GetTelemetrySubscriptionsRequest.Builder(
@@ -627,7 +627,7 @@ public class ClientMetricsManagerTest {
@Test
public void testPushTelemetryAfterPushIntervalTime() throws
UnknownHostException {
- clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultProperties());
+ clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultTestProperties());
assertEquals(1, clientMetricsManager.subscriptions().size());
GetTelemetrySubscriptionsRequest subscriptionsRequest = new
GetTelemetrySubscriptionsRequest.Builder(
@@ -648,7 +648,7 @@ public class ClientMetricsManagerTest {
assertEquals(Errors.NONE, response.error());
- time.sleep(ClientMetricsTestUtils.DEFAULT_PUSH_INTERVAL_MS);
+ time.sleep(ClientMetricsTestUtils.INTERVAL_MS_TEST_DEFAULT);
response = clientMetricsManager.processPushTelemetryRequest(
request, ClientMetricsTestUtils.requestContext());
@@ -1016,7 +1016,7 @@ public class ClientMetricsManagerTest {
.setCompressionType(CompressionType.NONE.id)
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))),
true).build();
- clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultProperties());
+ clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultTestProperties());
assertEquals(1, clientMetricsManager.subscriptions().size());
CountDownLatch lock = new CountDownLatch(2);
@@ -1087,7 +1087,7 @@ public class ClientMetricsManagerTest {
ClientMetricsManager clientMetricsManager = new
ClientMetricsManager(receiverPlugin, 100, time, 100, kafkaMetrics)
) {
- clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultProperties());
+ clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultTestProperties());
assertEquals(1, clientMetricsManager.subscriptions().size());
GetTelemetrySubscriptionsRequest subscriptionsRequest = new
GetTelemetrySubscriptionsRequest.Builder(
@@ -1127,8 +1127,8 @@ public class ClientMetricsManagerTest {
@Test
public void testCacheEviction() throws Exception {
Properties properties = new Properties();
- properties.put("metrics",
ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
- properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100");
+ properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
+ properties.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "100");
clientMetricsManager.updateSubscription("sub-1", properties);
GetTelemetrySubscriptionsRequest request = new
GetTelemetrySubscriptionsRequest.Builder(
@@ -1167,8 +1167,8 @@ public class ClientMetricsManagerTest {
@Test
public void testCacheEvictionWithMultipleClients() throws Exception {
Properties properties = new Properties();
- properties.put("metrics",
ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
- properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100");
+ properties.put("metrics", ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS);
+ properties.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG, "100");
clientMetricsManager.updateSubscription("sub-1", properties);
GetTelemetrySubscriptionsRequest request = new
GetTelemetrySubscriptionsRequest.Builder(
@@ -1227,7 +1227,7 @@ public class ClientMetricsManagerTest {
assertNotNull(instance.expirationTimerTask());
// Update subscription
- clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultProperties());
+ clientMetricsManager.updateSubscription("sub-1",
ClientMetricsTestUtils.defaultTestProperties());
assertEquals(1, clientMetricsManager.subscriptions().size());
request = new GetTelemetrySubscriptionsRequest.Builder(
diff --git
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java
index 01826ac1648..d965c40ce3f 100644
---
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java
+++
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceTest.java
@@ -35,7 +35,7 @@ public class ClientMetricsInstanceTest {
Uuid uuid = Uuid.randomUuid();
ClientMetricsInstanceMetadata instanceMetadata = new
ClientMetricsInstanceMetadata(uuid,
ClientMetricsTestUtils.requestContext());
- clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0,
0, null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS);
+ clientInstance = new ClientMetricsInstance(uuid, instanceMetadata, 0,
0, null, ClientMetricsConfigs.INTERVAL_MS_DEFAULT);
}
@Test
@@ -47,7 +47,7 @@ public class ClientMetricsInstanceTest {
@Test
public void testMaybeUpdateGetRequestAfterElapsedTimeValid() {
-
assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis()
- ClientMetricsConfigs.DEFAULT_INTERVAL_MS));
+
assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis()
- ClientMetricsConfigs.INTERVAL_MS_DEFAULT));
// Second request should be accepted as time since last request is
greater than the push interval.
assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis()));
}
@@ -61,7 +61,7 @@ public class ClientMetricsInstanceTest {
@Test
public void testMaybeUpdatePushRequestAfterElapsedTimeValid() {
-
assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis()
- ClientMetricsConfigs.DEFAULT_INTERVAL_MS));
+
assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis()
- ClientMetricsConfigs.INTERVAL_MS_DEFAULT));
// Second request should be accepted as time since last request is
greater than the push interval.
assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis()));
}
@@ -82,7 +82,7 @@ public class ClientMetricsInstanceTest {
@Test
public void testMaybeUpdatePushRequestWithImmediateRetryAfterGetValid() {
-
assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis()
- ClientMetricsConfigs.DEFAULT_INTERVAL_MS));
+
assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis()
- ClientMetricsConfigs.INTERVAL_MS_DEFAULT));
assertTrue(clientInstance.maybeUpdateGetRequestTimestamp(System.currentTimeMillis()));
// Next request after get should be accepted.
assertTrue(clientInstance.maybeUpdatePushRequestTimestamp(System.currentTimeMillis()
+ 1));
diff --git
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
index 46fb24902af..30c0e5f67b4 100644
---
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
+++
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
@@ -38,20 +38,20 @@ import java.util.Properties;
public class ClientMetricsTestUtils {
- public static final String DEFAULT_METRICS =
+ public static final String METRICS_TEST_DEFAULT =
"org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency";
- public static final int DEFAULT_PUSH_INTERVAL_MS = 30 * 1000; // 30 seconds
- public static final List<String> DEFAULT_CLIENT_MATCH_PATTERNS = List.of(
+ public static final int INTERVAL_MS_TEST_DEFAULT = 30 * 1000; // 30 seconds
+ public static final List<String> MATCH_TEST_DEFAULT = List.of(
ClientMetricsConfigs.CLIENT_SOFTWARE_NAME + "=apache-kafka-java",
ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION + "=3.5.*"
);
public static final int CLIENT_PORT = 56078;
- public static Properties defaultProperties() {
+ public static Properties defaultTestProperties() {
Properties props = new Properties();
- props.put(ClientMetricsConfigs.SUBSCRIPTION_METRICS, DEFAULT_METRICS);
- props.put(ClientMetricsConfigs.PUSH_INTERVAL_MS,
Integer.toString(DEFAULT_PUSH_INTERVAL_MS));
- props.put(ClientMetricsConfigs.CLIENT_MATCH_PATTERN, String.join(",",
DEFAULT_CLIENT_MATCH_PATTERNS));
+ props.put(ClientMetricsConfigs.METRICS_CONFIG, METRICS_TEST_DEFAULT);
+ props.put(ClientMetricsConfigs.INTERVAL_MS_CONFIG,
Integer.toString(INTERVAL_MS_TEST_DEFAULT));
+ props.put(ClientMetricsConfigs.MATCH_CONFIG, String.join(",",
MATCH_TEST_DEFAULT));
return props;
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index 05670b63b1f..8cb15c148ed 100644
---
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -37,7 +37,6 @@ import org.apache.kafka.test.TestUtils;
import org.mockito.Mockito;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -151,7 +150,7 @@ public class ConfigCommandIntegrationTest {
public void testAddConfigKeyValuesUsingCommand() throws Exception {
try (Admin client = cluster.admin()) {
NewTopic newTopic = new NewTopic("topic", 1, (short) 1);
- client.createTopics(Collections.singleton(newTopic)).all().get();
+ client.createTopics(Set.of(newTopic)).all().get();
cluster.waitForTopic("topic", 1);
Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "topics",
@@ -251,12 +250,14 @@ public class ConfigCommandIntegrationTest {
try (Admin client = cluster.admin()) {
// Add config
Map<String, String> configs = new HashMap<>();
- configs.put("metrics", "");
+ configs.put("metrics", "org.apache.kafka.producer.");
configs.put("interval.ms", "6000");
alterAndVerifyClientMetricsConfig(client,
defaultClientMetricsName, configs, alterOpts);
// Delete config
- deleteAndVerifyClientMetricsConfigValue(client,
defaultClientMetricsName, configs.keySet(), alterOpts);
+ configs.put("metrics", "");
+ configs.put("interval.ms", "300000");
+ deleteAndVerifyClientMetricsConfigValue(client,
defaultClientMetricsName, configs, alterOpts);
// Unknown config configured should fail
assertThrows(ExecutionException.class, () ->
alterConfigWithAdmin(client, singletonMap("unknown.config", "20000"),
alterOpts));
@@ -337,7 +338,7 @@ public class ConfigCommandIntegrationTest {
private void updateAndCheckInvalidBrokerConfig(Optional<String>
brokerIdOrDefault) {
List<String> alterOpts =
generateDefaultAlterOpts(cluster.bootstrapServers());
try (Admin client = cluster.admin()) {
- alterConfigWithAdmin(client, brokerIdOrDefault,
Collections.singletonMap("invalid", "2"), alterOpts);
+ alterConfigWithAdmin(client, brokerIdOrDefault, Map.of("invalid",
"2"), alterOpts);
Stream<String> describeCommand = Stream.concat(
Stream.concat(
@@ -357,7 +358,7 @@ public class ConfigCommandIntegrationTest {
public void testUpdateInvalidTopicConfigs() throws ExecutionException,
InterruptedException {
List<String> alterOpts = asList("--bootstrap-server",
cluster.bootstrapServers(), "--entity-type", "topics", "--alter");
try (Admin client = cluster.admin()) {
- client.createTopics(Collections.singletonList(new
NewTopic("test-config-topic", 1, (short) 1))).all().get();
+ client.createTopics(List.of(new NewTopic("test-config-topic", 1,
(short) 1))).all().get();
assertInstanceOf(
InvalidConfigurationException.class,
assertThrows(
@@ -575,17 +576,16 @@ public class ConfigCommandIntegrationTest {
private void deleteAndVerifyClientMetricsConfigValue(Admin client,
String
clientMetricsName,
- Set<String>
defaultConfigs,
+ Map<String, String>
defaultConfigs,
List<String>
alterOpts) throws Exception {
List<String> bootstrapOpts = quorumArgs().collect(Collectors.toList());
ConfigCommand.ConfigCommandOptions deleteOpts =
new ConfigCommand.ConfigCommandOptions(toArray(bootstrapOpts,
alterOpts,
- asList("--delete-config", String.join(",",
defaultConfigs))));
+ asList("--delete-config", String.join(",",
defaultConfigs.keySet()))));
deleteOpts.checkArgs();
ConfigCommand.alterConfig(client, deleteOpts);
- // There are no default configs returned for client metrics
- verifyClientMetricsConfig(client, clientMetricsName,
Collections.emptyMap());
+ verifyClientMetricsConfig(client, clientMetricsName, defaultConfigs);
}
private void verifyPerBrokerConfigValue(Admin client,