This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b7728d904d1 KAFKA-18984: Reset interval.ms By Using
kafka-client-metrics.sh (#19213)
b7728d904d1 is described below
commit b7728d904d1c81f421efc8378af3a01c9e94df45
Author: Parker Chang <[email protected]>
AuthorDate: Mon Mar 24 18:53:49 2025 +0800
KAFKA-18984: Reset interval.ms By Using kafka-client-metrics.sh (#19213)
kafka-client-metrics.sh cannot reset the interval using `--interval=`.
Reviewers: Andrew Schofield <[email protected]>
---
.../apache/kafka/tools/ClientMetricsCommand.java | 29 ++++++++++----
.../kafka/tools/ClientMetricsCommandTest.java | 46 ++++++++++++++++++++++
2 files changed, 67 insertions(+), 8 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
index 1010184eb01..f9b02f78397 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
@@ -120,7 +120,7 @@ public class ClientMetricsCommand {
String entityName = opts.hasGenerateNameOption() ?
Uuid.randomUuid().toString() : opts.name().get();
Map<String, String> configsToBeSet = new HashMap<>();
- opts.interval().map(intervalVal ->
configsToBeSet.put("interval.ms", intervalVal.toString()));
+ opts.interval().map(intervalVal ->
configsToBeSet.put("interval.ms", intervalVal));
opts.metrics().map(metricslist -> configsToBeSet.put("metrics",
String.join(",", metricslist)));
opts.match().map(matchlist -> configsToBeSet.put("match",
String.join(",", matchlist)));
@@ -211,7 +211,7 @@ public class ClientMetricsCommand {
private final OptionSpecBuilder generateNameOpt;
- private final ArgumentAcceptingOptionSpec<Integer> intervalOpt;
+ private final ArgumentAcceptingOptionSpec<String> intervalOpt;
private final ArgumentAcceptingOptionSpec<String> matchOpt;
@@ -238,24 +238,25 @@ public class ClientMetricsCommand {
.describedAs("name")
.ofType(String.class);
generateNameOpt = parser.accepts("generate-name", "Generate a UUID
to use as the name.");
- intervalOpt = parser.accepts("interval", "The metrics push
interval in milliseconds.")
+ String nl = System.lineSeparator();
+
+ intervalOpt = parser.accepts("interval", "The metrics push
interval in milliseconds." + nl + "Leave empty to reset the interval.")
.withRequiredArg()
.describedAs("push interval")
- .ofType(java.lang.Integer.class);
+ .ofType(String.class);
- String nl = System.lineSeparator();
String[] matchSelectors = new String[] {
"client_id", "client_instance_id", "client_software_name",
"client_software_version", "client_source_address",
"client_source_port"
};
String matchSelectorNames =
Arrays.stream(matchSelectors).map(config -> "\t" +
config).collect(Collectors.joining(nl));
- matchOpt = parser.accepts("match", "Matching selector
'k1=v1,k2=v2'. The following is a list of valid selector names: " + nl +
matchSelectorNames)
+ matchOpt = parser.accepts("match", "Matching selector
'k1=v1,k2=v2'. The following is a list of valid selector names: " + nl +
matchSelectorNames)
.withRequiredArg()
.describedAs("k1=v1,k2=v2")
.ofType(String.class)
.withValuesSeparatedBy(',');
- metricsOpt = parser.accepts("metrics", "Telemetry metric name
prefixes 'm1,m2'.")
+ metricsOpt = parser.accepts("metrics", "Telemetry metric name
prefixes 'm1,m2'.")
.withRequiredArg()
.describedAs("m1,m2")
.ofType(String.class)
@@ -334,7 +335,7 @@ public class ClientMetricsCommand {
return valuesAsOption(metricsOpt);
}
- public Optional<Integer> interval() {
+ public Optional<String> interval() {
return valueAsOption(intervalOpt);
}
@@ -367,6 +368,18 @@ public class ClientMetricsCommand {
if (has(alterOpt)) {
if ((isNamePresent && has(generateNameOpt)) || (!isNamePresent
&& !has(generateNameOpt)))
throw new IllegalArgumentException("One of --name or
--generate-name must be specified with --alter.");
+
+ interval().ifPresent(intervalStr -> {
+ if (!intervalStr.isEmpty()) {
+ try {
+ Integer.parseInt(intervalStr);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ "Invalid interval value. Enter an integer,
or leave empty to reset.");
+ }
+ }
+
+ });
}
if (has(deleteOpt) && !isNamePresent)
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java
index 804c70fd07c..c58748bf3c0 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
@@ -29,11 +30,15 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -114,6 +119,14 @@ public class ClientMetricsCommandTest {
}
+ @Test
+ public void testOptionsAlterInvalidInterval() {
+ Exception exception = assertThrows(IllegalArgumentException.class, ()
-> new ClientMetricsCommand.ClientMetricsCommandOptions(
+ new String[]{"--bootstrap-server", bootstrapServer, "--alter",
"--name", clientMetricsName,
+ "--interval", "abc"}));
+ assertEquals("Invalid interval value. Enter an integer, or leave empty
to reset.", exception.getMessage());
+ }
+
@Test
public void testAlter() {
Admin adminClient = mock(Admin.class);
@@ -156,6 +169,39 @@ public class ClientMetricsCommandTest {
assertTrue(capturedOutput.contains("Altered client metrics config"));
}
+ @Test
+ public void testAlterResetConfigs() {
+ Admin adminClient = mock(Admin.class);
+ ClientMetricsCommand.ClientMetricsService service = new
ClientMetricsCommand.ClientMetricsService(adminClient);
+
+ AlterConfigsResult result =
AdminClientTestUtils.alterConfigsResult(new
ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetricsName));
+ @SuppressWarnings("unchecked")
+ final ArgumentCaptor<Map<ConfigResource, Collection<AlterConfigOp>>>
configCaptor = ArgumentCaptor.forClass(Map.class);
+ when(adminClient.incrementalAlterConfigs(configCaptor.capture(),
any())).thenReturn(result);
+
+ String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
+ try {
+ service.alterClientMetrics(new
ClientMetricsCommand.ClientMetricsCommandOptions(
+ new String[]{"--bootstrap-server", bootstrapServer,
"--alter",
+ "--name", clientMetricsName, "--metrics",
"",
+ "--interval", "", "--match", ""}));
+ } catch (Throwable t) {
+ fail(t);
+ }
+ });
+ Map<ConfigResource, Collection<AlterConfigOp>> alteredConfigOps =
configCaptor.getValue();
+ assertNotNull(alteredConfigOps, "alteredConfigOps should not be null");
+ assertEquals(1, alteredConfigOps.size(), "Should have exactly one
ConfigResource");
+ assertEquals(3, alteredConfigOps.values().iterator().next().size(),
"Should have exactly 3 operations");
+ for (Collection<AlterConfigOp> operations : alteredConfigOps.values())
{
+ for (AlterConfigOp op : operations) {
+ assertEquals(AlterConfigOp.OpType.DELETE, op.opType(),
+ "Expected DELETE operation for config: " +
op.configEntry().name());
+ }
+ }
+ assertTrue(capturedOutput.contains("Altered client metrics config for
" + clientMetricsName + "."));
+ }
+
@Test
public void testDelete() {
Admin adminClient = mock(Admin.class);