This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6f1a6ae8687 MINOR: Clean up coordinator runtime metrics isolation 
tests (#21043)
6f1a6ae8687 is described below

commit 6f1a6ae8687e377a937f7bd8756f1003a5b959f0
Author: Sean Quah <[email protected]>
AuthorDate: Thu Dec 4 15:54:55 2025 +0000

    MINOR: Clean up coordinator runtime metrics isolation tests (#21043)
    
    When we fixed KAFKA-19445 to prevent sharing of metrics between the
    group coordinator and share coordinator, we added an isolation test for
    each metric. This approach is unwieldy and it is very easy to forget to
    add isolation tests for new metrics. Replace the isolation tests with  a
    single test that automatically collects the list of sensors and  metrics
    from two different CoordinatorRuntimeMetricsImpl instances and  checks
    them for overlap.
    
    Reviewers: David Jacot <[email protected]>
---
 .../runtime/CoordinatorRuntimeMetricsImplTest.java | 193 ++++++++-------------
 1 file changed, 73 insertions(+), 120 deletions(-)

diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
index e6fccd4ca2e..b243f12466b 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.common.runtime;
 
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricValueProvider;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -26,8 +27,9 @@ import 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.Coordinato
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
 
-import java.util.List;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.IntStream;
 
@@ -40,19 +42,20 @@ import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetr
 import static 
org.apache.kafka.coordinator.common.runtime.KafkaMetricHistogram.MAX_LATENCY_MS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 public class CoordinatorRuntimeMetricsImplTest {
 
     private static final String METRICS_GROUP = "test-runtime-metrics";
     private static final String OTHER_METRICS_GROUP = "test-runtime-metrics-2";
 
-    @Test
-    public void testMetricNames() {
-        Metrics metrics = new Metrics();
-
-        Set<org.apache.kafka.common.MetricName> expectedMetrics = Set.of(
+    private static Set<MetricName> expectedMetricNames(Metrics metrics) {
+        return Set.of(
             kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", 
"loading"),
             kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", 
"active"),
             kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", 
"failed"),
@@ -87,6 +90,13 @@ public class CoordinatorRuntimeMetricsImplTest {
             kafkaMetricName(metrics, "batch-flush-time-ms-p999"),
             kafkaMetricName(metrics, "batch-flush-rate")
         );
+    }
+
+    @Test
+    public void testMetricNames() {
+        Metrics metrics = new Metrics();
+
+        Set<MetricName> expectedMetrics = expectedMetricNames(metrics);
 
         try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
             runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
@@ -99,6 +109,62 @@ public class CoordinatorRuntimeMetricsImplTest {
         ));
     }
 
+    @Test
+    public void testMetricsGroupIsolation() {
+        Metrics metrics = spy(new Metrics());
+
+        // Create first CoordinatorRuntimeMetricsImpl instance and capture 
sensor and metric names.
+        Set<String> sensorNames;
+        Set<MetricName> metricNames;
+        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP)) {
+            runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
+
+            ArgumentCaptor<String> sensorCaptor = 
ArgumentCaptor.forClass(String.class);
+            verify(metrics, atLeastOnce()).sensor(sensorCaptor.capture());
+            sensorNames = new HashSet<>(sensorCaptor.getAllValues());
+
+            ArgumentCaptor<MetricName> metricNameCaptor = 
ArgumentCaptor.forClass(MetricName.class);
+            verify(metrics, 
atLeastOnce()).addMetric(metricNameCaptor.capture(), 
any(MetricValueProvider.class));
+            metricNames = new HashSet<>(metricNameCaptor.getAllValues());
+
+            // Check that all gauges were registered.
+            expectedMetricNames(metrics).forEach(metricName -> 
assertTrue(metrics.metrics().containsKey(metricName)));
+        }
+
+        clearInvocations(metrics);
+
+        // Create second CoordinatorRuntimeMetricsImpl instance and capture 
sensor and metric names.
+        Set<String> otherSensorNames;
+        Set<MetricName> otherMetricNames;
+        try (CoordinatorRuntimeMetricsImpl otherRuntimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
+            otherRuntimeMetrics.registerEventQueueSizeGauge(() -> 0);
+
+            ArgumentCaptor<String> sensorCaptor = 
ArgumentCaptor.forClass(String.class);
+            verify(metrics, atLeastOnce()).sensor(sensorCaptor.capture());
+            otherSensorNames = new HashSet<>(sensorCaptor.getAllValues());
+
+            ArgumentCaptor<MetricName> metricNameCaptor = 
ArgumentCaptor.forClass(MetricName.class);
+            verify(metrics, 
atLeastOnce()).addMetric(metricNameCaptor.capture(), 
any(MetricValueProvider.class));
+            otherMetricNames = new HashSet<>(metricNameCaptor.getAllValues());
+
+            // Check that all gauges were registered.
+            assertEquals(sensorNames.size(), otherSensorNames.size());
+            assertEquals(metricNames.size(), otherMetricNames.size());
+        }
+
+        // Check for shared sensors.
+        Set<String> sharedSensorNames = new HashSet<>(sensorNames);
+        sharedSensorNames.retainAll(otherSensorNames);
+        assertTrue(sharedSensorNames.isEmpty(),
+            "Found shared sensors between two CoordinatorRuntimeMetricsImpl 
instances: " + sharedSensorNames);
+
+        // Check for shared metrics.
+        Set<MetricName> sharedMetricNames = new HashSet<>(metricNames);
+        sharedMetricNames.retainAll(otherMetricNames);
+        assertTrue(sharedMetricNames.isEmpty(),
+            "Found shared metrics between two CoordinatorRuntimeMetricsImpl 
instances: " + sharedMetricNames);
+    }
+
     @Test
     public void testUpdateNumPartitionsMetrics() {
         Metrics metrics = new Metrics();
@@ -119,25 +185,6 @@ public class CoordinatorRuntimeMetricsImplTest {
         }
     }
 
-    @Test
-    public void testNumPartitionsMetricsGroupIsolation() {
-        Metrics metrics = new Metrics();
-
-        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
-             CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new 
CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
-            IntStream.range(0, 3)
-                .forEach(__ -> 
runtimeMetrics.recordPartitionStateChange(CoordinatorState.INITIAL, 
CoordinatorState.LOADING));
-            IntStream.range(0, 2)
-                .forEach(__ -> 
runtimeMetrics.recordPartitionStateChange(CoordinatorState.LOADING, 
CoordinatorState.ACTIVE));
-            IntStream.range(0, 1)
-                .forEach(__ -> 
runtimeMetrics.recordPartitionStateChange(CoordinatorState.ACTIVE, 
CoordinatorState.FAILED));
-
-            for (String state : List.of("loading", "active", "failed")) {
-                assertMetricGauge(metrics, kafkaMetricName(metrics, 
NUM_PARTITIONS_METRIC_NAME, "state", state), 1);
-                assertMetricGauge(metrics, otherGroupKafkaMetricName(metrics, 
NUM_PARTITIONS_METRIC_NAME, "state", state), 0);
-            }
-        }
-    }
 
     @Test
     public void testPartitionLoadSensorMetrics() {
@@ -160,28 +207,6 @@ public class CoordinatorRuntimeMetricsImplTest {
         }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {
-        "partition-load-time-avg",
-        "partition-load-time-max"
-    })
-    public void testPartitionLoadSensorMetricsGroupIsolation(String name) {
-        Time time = new MockTime();
-        Metrics metrics = new Metrics(time);
-
-        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
-             CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new 
CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
-            long startTimeMs = time.milliseconds();
-            runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs 
+ 1000);
-
-            org.apache.kafka.common.MetricName metricName = 
kafkaMetricName(metrics, name);
-            org.apache.kafka.common.MetricName otherGroupMetricName = 
otherGroupKafkaMetricName(metrics, name);
-            KafkaMetric metric = metrics.metrics().get(metricName);
-            KafkaMetric otherMetric = 
metrics.metrics().get(otherGroupMetricName);
-            assertNotEquals(Double.NaN, metric.metricValue());
-            assertEquals(Double.NaN, otherMetric.metricValue());
-        }
-    }
 
     @Test
     public void testThreadIdleSensor() {
@@ -196,21 +221,6 @@ public class CoordinatorRuntimeMetricsImplTest {
         assertEquals(6 / 30.0, metric.metricValue()); // 'total_ms / window_ms'
     }
 
-    @Test
-    public void testThreadIdleSensorMetricsGroupIsolation() {
-        Time time = new MockTime();
-        Metrics metrics = new Metrics(time);
-
-        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
-             CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new 
CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
-            runtimeMetrics.recordThreadIdleTime(1000.0);
-
-            org.apache.kafka.common.MetricName metricName = 
kafkaMetricName(metrics, "thread-idle-ratio-avg");
-            org.apache.kafka.common.MetricName otherGroupMetricName = 
otherGroupKafkaMetricName(metrics, "thread-idle-ratio-avg");
-            assertNotEquals(0.0, 
metrics.metrics().get(metricName).metricValue());
-            assertEquals(0.0, 
metrics.metrics().get(otherGroupMetricName).metricValue());
-        }
-    }
 
     @Test
     public void testEventQueueSize() {
@@ -223,21 +233,6 @@ public class CoordinatorRuntimeMetricsImplTest {
         }
     }
 
-    @Test
-    public void testEventQueueSizeMetricsGroupIsolation() {
-        Time time = new MockTime();
-        Metrics metrics = new Metrics(time);
-
-        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
-             CoordinatorRuntimeMetricsImpl otherRuntimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
-            runtimeMetrics.registerEventQueueSizeGauge(() -> 5);
-            otherRuntimeMetrics.registerEventQueueSizeGauge(() -> 0);
-
-            assertMetricGauge(metrics, kafkaMetricName(metrics, 
"event-queue-size"), 5);
-            assertMetricGauge(metrics, otherGroupKafkaMetricName(metrics, 
"event-queue-size"), 0);
-        }
-    }
-
     @ParameterizedTest
     @ValueSource(strings = {
         EVENT_QUEUE_TIME_METRIC_NAME,
@@ -292,44 +287,6 @@ public class CoordinatorRuntimeMetricsImplTest {
         assertEquals(999.0, metric.metricValue());
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {
-        EVENT_QUEUE_TIME_METRIC_NAME,
-        EVENT_PROCESSING_TIME_METRIC_NAME,
-        EVENT_PURGATORY_TIME_METRIC_NAME,
-        BATCH_FLUSH_TIME_METRIC_NAME
-    })
-    public void testHistogramMetricsGroupIsolation(String metricNamePrefix) {
-        Time time = new MockTime();
-        Metrics metrics = new Metrics(time);
-
-        try (CoordinatorRuntimeMetricsImpl runtimeMetrics = new 
CoordinatorRuntimeMetricsImpl(metrics, METRICS_GROUP);
-             CoordinatorRuntimeMetricsImpl runtimeMetrics2 = new 
CoordinatorRuntimeMetricsImpl(metrics, OTHER_METRICS_GROUP)) {
-            switch (metricNamePrefix) {
-                case EVENT_QUEUE_TIME_METRIC_NAME:
-                    runtimeMetrics.recordEventQueueTime(1000);
-                    break;
-                case EVENT_PROCESSING_TIME_METRIC_NAME:
-                    runtimeMetrics.recordEventProcessingTime(1000);
-                    break;
-                case EVENT_PURGATORY_TIME_METRIC_NAME:
-                    runtimeMetrics.recordEventPurgatoryTime(1000);
-                    break;
-                case BATCH_FLUSH_TIME_METRIC_NAME:
-                    runtimeMetrics.recordFlushTime(1000);
-            }
-
-            // Check metric group isolation
-            for (String suffix : List.of("-max", "-p50", "-p95", "-p99", 
"-p999")) {
-                org.apache.kafka.common.MetricName metricName = 
kafkaMetricName(metrics, metricNamePrefix + suffix);
-                org.apache.kafka.common.MetricName otherGroupMetricName = 
otherGroupKafkaMetricName(metrics, metricNamePrefix + suffix);
-                KafkaMetric metric = metrics.metrics().get(metricName);
-                KafkaMetric otherMetric = 
metrics.metrics().get(otherGroupMetricName);
-                assertNotEquals(0.0, metric.metricValue());
-                assertEquals(0.0, otherMetric.metricValue());
-            }
-        }
-    }
 
     @Test
     public void testRecordEventPurgatoryTimeLimit() {
@@ -368,8 +325,4 @@ public class CoordinatorRuntimeMetricsImplTest {
     private static MetricName kafkaMetricName(Metrics metrics, String name, 
String... keyValue) {
         return metrics.metricName(name, METRICS_GROUP, "", keyValue);
     }
-
-    private static MetricName otherGroupKafkaMetricName(Metrics metrics, 
String name, String... keyValue) {
-        return metrics.metricName(name, OTHER_METRICS_GROUP, "", keyValue);
-    }
 }

Reply via email to