This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6f1a6ae8687 MINOR: Clean up coordinator runtime metrics isolation tests (#21043)
6f1a6ae8687 is described below
commit 6f1a6ae8687e377a937f7bd8756f1003a5b959f0
Author: Sean Quah <[email protected]>
AuthorDate: Thu Dec 4 15:54:55 2025 +0000
MINOR: Clean up coordinator runtime metrics isolation tests (#21043)
When we fixed KAFKA-19445 to prevent the group coordinator and share
coordinator from sharing metrics, we added a separate isolation test for
each metric. That approach is unwieldy, and it is easy to forget to add
an isolation test when a new metric is introduced. Replace the per-metric
isolation tests with a single test that automatically collects the
sensors and metrics registered by two different
CoordinatorRuntimeMetricsImpl instances and checks them for overlap.
Reviewers: David Jacot <[email protected]>
---
.../runtime/CoordinatorRuntimeMetricsImplTest.java | 193 ++++++++-------------
1 file changed, 73 insertions(+), 120 deletions(-)
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
index e6fccd4ca2e..b243f12466b 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -26,8 +27,9 @@ import
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.Coordinato
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
-import java.util.List;
+import java.util.HashSet;
import java.util.Set;
import java.util.stream.IntStream;
@@ -40,19 +42,20 @@ import static
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetr
import static
org.apache.kafka.coordinator.common.runtime.KafkaMetricHistogram.MAX_LATENCY_MS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
public class CoordinatorRuntimeMetricsImplTest {
private static final String METRICS_GROUP = "test-runtime-metrics";
private static final String OTHER_METRICS_GROUP = "test-runtime-metrics-2";
- @Test
- public void testMetricNames() {
- Metrics metrics = new Metrics();
-
- Set<org.apache.kafka.common.MetricName> expectedMetrics = Set.of(
+ private static Set<MetricName> expectedMetricNames(Metrics metrics) {
+ return Set.of(
kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state",
"loading"),
kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state",
"active"),
kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state",
"failed"),
@@ -87,6 +90,13 @@ public class CoordinatorRuntimeMetricsImplTest {
kafkaMetricName(metrics, "batch-flush-time-ms-p999"),
kafkaMetricName(metrics, "batch-flush-rate")
);
+ }
+
+ @Test
+ public void testMetricNames() {
+ Metrics metrics = new Metrics();
+
+ Set<MetricName> expectedMetrics = expectedMetricNames(metrics);
try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
@@ -99,6 +109,62 @@ public class CoordinatorRuntimeMetricsImplTest {
));
}
+ @Test
+ public void testMetricsGroupIsolation() {
+ Metrics metrics = spy(new Metrics());
+
+ // Create first CoordinatorRuntimeMetricsImpl instance and capture
sensor and metric names.
+ Set<String> sensorNames;
+ Set<MetricName> metricNames;
+ try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
+ runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
+
+ ArgumentCaptor<String> sensorCaptor =
ArgumentCaptor.forClass(String.class);
+ verify(metrics, atLeastOnce()).sensor(sensorCaptor.capture());
+ sensorNames = new HashSet<>(sensorCaptor.getAllValues());
+
+ ArgumentCaptor<MetricName> metricNameCaptor =
ArgumentCaptor.forClass(MetricName.class);
+ verify(metrics,
atLeastOnce()).addMetric(metricNameCaptor.capture(),
any(MetricValueProvider.class));
+ metricNames = new HashSet<>(metricNameCaptor.getAllValues());
+
+ // Check that all gauges were registered.
+ expectedMetricNames(metrics).forEach(metricName ->
assertTrue(metrics.metrics().containsKey(metricName)));
+ }
+
+ clearInvocations(metrics);
+
+ // Create second CoordinatorRuntimeMetricsImpl instance and capture
sensor and metric names.
+ Set<String> otherSensorNames;
+ Set<MetricName> otherMetricNames;
+ try (CoordinatorRuntimeMetricsImpl otherRuntimeMetrics = new
CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
+ otherRuntimeMetrics.registerEventQueueSizeGauge(() -> 0);
+
+ ArgumentCaptor<String> sensorCaptor =
ArgumentCaptor.forClass(String.class);
+ verify(metrics, atLeastOnce()).sensor(sensorCaptor.capture());
+ otherSensorNames = new HashSet<>(sensorCaptor.getAllValues());
+
+ ArgumentCaptor<MetricName> metricNameCaptor =
ArgumentCaptor.forClass(MetricName.class);
+ verify(metrics,
atLeastOnce()).addMetric(metricNameCaptor.capture(),
any(MetricValueProvider.class));
+ otherMetricNames = new HashSet<>(metricNameCaptor.getAllValues());
+
+ // Check that all gauges were registered.
+ assertEquals(sensorNames.size(), otherSensorNames.size());
+ assertEquals(metricNames.size(), otherMetricNames.size());
+ }
+
+ // Check for shared sensors.
+ Set<String> sharedSensorNames = new HashSet<>(sensorNames);
+ sharedSensorNames.retainAll(otherSensorNames);
+ assertTrue(sharedSensorNames.isEmpty(),
+ "Found shared sensors between two CoordinatorRuntimeMetricsImpl
instances: " + sharedSensorNames);
+
+ // Check for shared metrics.
+ Set<MetricName> sharedMetricNames = new HashSet<>(metricNames);
+ sharedMetricNames.retainAll(otherMetricNames);
+ assertTrue(sharedMetricNames.isEmpty(),
+ "Found shared metrics between two CoordinatorRuntimeMetricsImpl
instances: " + sharedMetricNames);
+ }
+
@Test
public void testUpdateNumPartitionsMetrics() {
Metrics metrics = new Metrics();
@@ -119,25 +185,6 @@ public class CoordinatorRuntimeMetricsImplTest {
}
}
- @Test
- public void testNumPartitionsMetricsGroupIsolation() {
- Metrics metrics = new Metrics();
-
- try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
- CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new
CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
- IntStream.range(0, 3)
- .forEach(__ ->
runtimeMetrics.recordPartitionStateChange(CoordinatorState.INITIAL,
CoordinatorState.LOADING));
- IntStream.range(0, 2)
- .forEach(__ ->
runtimeMetrics.recordPartitionStateChange(CoordinatorState.LOADING,
CoordinatorState.ACTIVE));
- IntStream.range(0, 1)
- .forEach(__ ->
runtimeMetrics.recordPartitionStateChange(CoordinatorState.ACTIVE,
CoordinatorState.FAILED));
-
- for (String state : List.of("loading", "active", "failed")) {
- assertMetricGauge(metrics, kafkaMetricName(metrics,
NUM_PARTITIONS_METRIC_NAME, "state", state), 1);
- assertMetricGauge(metrics, otherGroupKafkaMetricName(metrics,
NUM_PARTITIONS_METRIC_NAME, "state", state), 0);
- }
- }
- }
@Test
public void testPartitionLoadSensorMetrics() {
@@ -160,28 +207,6 @@ public class CoordinatorRuntimeMetricsImplTest {
}
}
- @ParameterizedTest
- @ValueSource(strings = {
- "partition-load-time-avg",
- "partition-load-time-max"
- })
- public void testPartitionLoadSensorMetricsGroupIsolation(String name) {
- Time time = new MockTime();
- Metrics metrics = new Metrics(time);
-
- try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
- CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new
CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
- long startTimeMs = time.milliseconds();
- runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs
+ 1000);
-
- org.apache.kafka.common.MetricName metricName =
kafkaMetricName(metrics, name);
- org.apache.kafka.common.MetricName otherGroupMetricName =
otherGroupKafkaMetricName(metrics, name);
- KafkaMetric metric = metrics.metrics().get(metricName);
- KafkaMetric otherMetric =
metrics.metrics().get(otherGroupMetricName);
- assertNotEquals(Double.NaN, metric.metricValue());
- assertEquals(Double.NaN, otherMetric.metricValue());
- }
- }
@Test
public void testThreadIdleSensor() {
@@ -196,21 +221,6 @@ public class CoordinatorRuntimeMetricsImplTest {
assertEquals(6 / 30.0, metric.metricValue()); // 'total_ms / window_ms'
}
- @Test
- public void testThreadIdleSensorMetricsGroupIsolation() {
- Time time = new MockTime();
- Metrics metrics = new Metrics(time);
-
- try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
- CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new
CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
- runtimeMetrics.recordThreadIdleTime(1000.0);
-
- org.apache.kafka.common.MetricName metricName =
kafkaMetricName(metrics, "thread-idle-ratio-avg");
- org.apache.kafka.common.MetricName otherGroupMetricName =
otherGroupKafkaMetricName(metrics, "thread-idle-ratio-avg");
- assertNotEquals(0.0,
metrics.metrics().get(metricName).metricValue());
- assertEquals(0.0,
metrics.metrics().get(otherGroupMetricName).metricValue());
- }
- }
@Test
public void testEventQueueSize() {
@@ -223,21 +233,6 @@ public class CoordinatorRuntimeMetricsImplTest {
}
}
- @Test
- public void testEventQueueSizeMetricsGroupIsolation() {
- Time time = new MockTime();
- Metrics metrics = new Metrics(time);
-
- try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
- CoordinatorRuntimeMetricsImpl otherRuntimeMetrics = new
CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
- runtimeMetrics.registerEventQueueSizeGauge(() -> 5);
- otherRuntimeMetrics.registerEventQueueSizeGauge(() -> 0);
-
- assertMetricGauge(metrics, kafkaMetricName(metrics,
"event-queue-size"), 5);
- assertMetricGauge(metrics, otherGroupKafkaMetricName(metrics,
"event-queue-size"), 0);
- }
- }
-
@ParameterizedTest
@ValueSource(strings = {
EVENT_QUEUE_TIME_METRIC_NAME,
@@ -292,44 +287,6 @@ public class CoordinatorRuntimeMetricsImplTest {
assertEquals(999.0, metric.metricValue());
}
- @ParameterizedTest
- @ValueSource(strings = {
- EVENT_QUEUE_TIME_METRIC_NAME,
- EVENT_PROCESSING_TIME_METRIC_NAME,
- EVENT_PURGATORY_TIME_METRIC_NAME,
- BATCH_FLUSH_TIME_METRIC_NAME
- })
- public void testHistogramMetricsGroupIsolation(String metricNamePrefix) {
- Time time = new MockTime();
- Metrics metrics = new Metrics(time);
-
- try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
- CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new
CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
- switch (metricNamePrefix) {
- case EVENT_QUEUE_TIME_METRIC_NAME:
- runtimeMetrics.recordEventQueueTime(1000);
- break;
- case EVENT_PROCESSING_TIME_METRIC_NAME:
- runtimeMetrics.recordEventProcessingTime(1000);
- break;
- case EVENT_PURGATORY_TIME_METRIC_NAME:
- runtimeMetrics.recordEventPurgatoryTime(1000);
- break;
- case BATCH_FLUSH_TIME_METRIC_NAME:
- runtimeMetrics.recordFlushTime(1000);
- }
-
- // Check metric group isolation
- for (String suffix : List.of("-max", "-p50", "-p95", "-p99",
"-p999")) {
- org.apache.kafka.common.MetricName metricName =
kafkaMetricName(metrics, metricNamePrefix + suffix);
- org.apache.kafka.common.MetricName otherGroupMetricName =
otherGroupKafkaMetricName(metrics, metricNamePrefix + suffix);
- KafkaMetric metric = metrics.metrics().get(metricName);
- KafkaMetric otherMetric =
metrics.metrics().get(otherGroupMetricName);
- assertNotEquals(0.0, metric.metricValue());
- assertEquals(0.0, otherMetric.metricValue());
- }
- }
- }
@Test
public void testRecordEventPurgatoryTimeLimit() {
@@ -368,8 +325,4 @@ public class CoordinatorRuntimeMetricsImplTest {
private static MetricName kafkaMetricName(Metrics metrics, String name,
String... keyValue) {
return metrics.metricName(name, METRICS_GROUP, "", keyValue);
}
-
- private static MetricName otherGroupKafkaMetricName(Metrics metrics,
String name, String... keyValue) {
- return metrics.metricName(name, OTHER_METRICS_GROUP, "", keyValue);
- }
}