This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 424e7251d69 KAFKA-19207 Move ForwardingManagerMetrics and ForwardingManagerMetricsTest to server module (#19574)
424e7251d69 is described below

commit 424e7251d69e65bfd6155e526d476a4eb07d93e1
Author: PoAn Yang <[email protected]>
AuthorDate: Tue May 6 07:03:17 2025 -0500

    KAFKA-19207 Move ForwardingManagerMetrics and ForwardingManagerMetricsTest to server module (#19574)
    
    1. Move `ForwardingManagerMetrics` and `ForwardingManagerMetricsTest` to
    server module.
    2. Rewrite them in Java.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../scala/kafka/server/ForwardingManager.scala     |   9 +-
 .../kafka/server/ForwardingManagerMetrics.scala    | 100 ---------------
 .../server/ForwardingManagerMetricsTest.scala      | 114 -----------------
 .../unit/kafka/server/ForwardingManagerTest.scala  |   2 +-
 .../server/metrics/ForwardingManagerMetrics.java   | 138 +++++++++++++++++++++
 .../metrics/ForwardingManagerMetricsTest.java      | 129 +++++++++++++++++++
 6 files changed, 273 insertions(+), 219 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala 
b/core/src/main/scala/kafka/server/ForwardingManager.scala
index c067000bf0c..7737d2d2171 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader}
 import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
+import org.apache.kafka.server.metrics.ForwardingManagerMetrics
 
 import java.util.Optional
 import java.util.concurrent.TimeUnit
@@ -117,7 +118,7 @@ class ForwardingManagerImpl(
   metrics: Metrics
 ) extends ForwardingManager with AutoCloseable with Logging {
 
-  val forwardingManagerMetrics: ForwardingManagerMetrics = 
ForwardingManagerMetrics(metrics, channelManager.getTimeoutMs)
+  val forwardingManagerMetrics: ForwardingManagerMetrics = new 
ForwardingManagerMetrics(metrics, channelManager.getTimeoutMs)
 
   override def forwardRequest(
     requestContext: RequestContext,
@@ -133,7 +134,7 @@ class ForwardingManagerImpl(
     class ForwardingResponseHandler extends ControllerRequestCompletionHandler 
{
       override def onComplete(clientResponse: ClientResponse): Unit = {
 
-        forwardingManagerMetrics.queueLength.getAndDecrement()
+        forwardingManagerMetrics.decrementQueueLength()
         
forwardingManagerMetrics.remoteTimeMsHist.record(clientResponse.requestLatencyMs())
         
forwardingManagerMetrics.queueTimeMsHist.record(clientResponse.receivedTimeMs() 
- clientResponse.requestLatencyMs() - requestCreationTimeMs)
 
@@ -174,14 +175,14 @@ class ForwardingManagerImpl(
 
       override def onTimeout(): Unit = {
         debug(s"Forwarding of the request ${requestToString()} failed due to 
timeout exception")
-        forwardingManagerMetrics.queueLength.getAndDecrement()
+        forwardingManagerMetrics.decrementQueueLength()
         
forwardingManagerMetrics.queueTimeMsHist.record(channelManager.getTimeoutMs)
         val response = requestBody.getErrorResponse(new TimeoutException())
         responseCallback(Option(response))
       }
     }
 
-    forwardingManagerMetrics.queueLength.getAndIncrement()
+    forwardingManagerMetrics.incrementQueueLength()
     channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler)
   }
 
diff --git a/core/src/main/scala/kafka/server/ForwardingManagerMetrics.scala 
b/core/src/main/scala/kafka/server/ForwardingManagerMetrics.scala
deleted file mode 100644
index a846f8c4955..00000000000
--- a/core/src/main/scala/kafka/server/ForwardingManagerMetrics.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.{Gauge, MetricConfig, Metrics}
-import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing
-import org.apache.kafka.common.metrics.stats.{Percentile, Percentiles}
-
-import java.util.concurrent.atomic.AtomicInteger
-
-final class ForwardingManagerMetrics private (
-  metrics: Metrics,
-  timeoutMs: Long,
-) extends AutoCloseable {
-  import ForwardingManagerMetrics._
-
-  /**
-   * A histogram describing the amount of time in milliseconds each admin request spends in the broker's forwarding manager queue, waiting to be sent to the controller.
-   * This does not include the time that the request spends waiting for a response from the controller.
-   */
-  val queueTimeMsHist: LatencyHistogram = new LatencyHistogram(metrics, 
queueTimeMsName, metricGroupName, timeoutMs)
-
-  /**
-   * A histogram describing the amount of time in milliseconds each request sent by the ForwardingManager spends waiting for a response.
-   * This does not include the time spent in the queue.
-   */
-  val remoteTimeMsHist: LatencyHistogram = new LatencyHistogram(metrics, 
remoteTimeMsName, metricGroupName, timeoutMs)
-
-  val queueLengthName: MetricName = metrics.metricName(
-    "QueueLength",
-    metricGroupName,
-    "The current number of RPCs that are waiting in the broker's forwarding 
manager queue, waiting to be sent to the controller."
-  )
-  val queueLength: AtomicInteger = new AtomicInteger(0)
-  metrics.addMetric(queueLengthName, new FuncGauge(_ => queueLength.get()))
-
-  override def close(): Unit = {
-    queueTimeMsHist.close()
-    remoteTimeMsHist.close()
-    metrics.removeMetric(queueLengthName)
-  }
-}
-
-object ForwardingManagerMetrics {
-
-  val metricGroupName = "ForwardingManager"
-  val queueTimeMsName = "QueueTimeMs"
-  val remoteTimeMsName = "RemoteTimeMs"
-
-  final class LatencyHistogram (
-    metrics: Metrics,
-    name: String,
-    group: String,
-    maxLatency: Long
-  ) extends AutoCloseable {
-    private val sensor = metrics.sensor(name)
-    val latencyP99Name: MetricName = metrics.metricName(s"$name.p99", group)
-    val latencyP999Name: MetricName = metrics.metricName(s"$name.p999", group)
-
-    sensor.add(new Percentiles(
-      4000,
-      maxLatency,
-      BucketSizing.CONSTANT,
-      new Percentile(latencyP99Name, 99),
-      new Percentile(latencyP999Name, 99.9)
-    ))
-
-    override def close(): Unit = {
-      metrics.removeSensor(name)
-      metrics.removeMetric(latencyP99Name)
-      metrics.removeMetric(latencyP999Name)
-    }
-
-    def record(latencyMs: Long): Unit = sensor.record(latencyMs)
-  }
-
-  private final class FuncGauge[T](func: Long => T) extends Gauge[T] {
-    override def value(config: MetricConfig, now: Long): T = {
-      func(now)
-    }
-  }
-
-  def apply(metrics: Metrics, timeoutMs: Long): ForwardingManagerMetrics = new 
ForwardingManagerMetrics(metrics, timeoutMs)
-}
diff --git 
a/core/src/test/scala/unit/kafka/server/ForwardingManagerMetricsTest.scala 
b/core/src/test/scala/unit/kafka/server/ForwardingManagerMetricsTest.scala
deleted file mode 100644
index 2c10decb3ed..00000000000
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerMetricsTest.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package unit.kafka.server
-
-import kafka.server.ForwardingManagerMetrics
-import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.Metrics
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.Test
-
-import java.util.Collections
-import scala.jdk.CollectionConverters._
-
-final class ForwardingManagerMetricsTest {
-  @Test
-  def testMetricsNames(): Unit = {
-    val metrics = new Metrics()
-    val expectedGroup = "ForwardingManager"
-
-    val expectedMetrics = Set(
-      new MetricName("QueueTimeMs.p99", expectedGroup, "", 
Collections.emptyMap()),
-      new MetricName("QueueTimeMs.p999", expectedGroup, "", 
Collections.emptyMap()),
-      new MetricName("QueueLength", expectedGroup, "", Collections.emptyMap()),
-      new MetricName("RemoteTimeMs.p99", expectedGroup, "", 
Collections.emptyMap()),
-      new MetricName("RemoteTimeMs.p999", expectedGroup, "", 
Collections.emptyMap())
-    )
-
-    var metricsMap = metrics.metrics().asScala.filter { case (name, _) => 
name.group == expectedGroup }
-    assertEquals(0, metricsMap.size)
-
-    ForwardingManagerMetrics(metrics, 1000)
-    metricsMap = metrics.metrics().asScala.filter { case (name, _) => 
name.group == expectedGroup }
-    assertEquals(metricsMap.size, expectedMetrics.size)
-    metricsMap.foreach { case (name, _) =>
-      assertTrue(expectedMetrics.contains(name))
-    }
-  }
-
-  @Test
-  def testQueueTimeMs(): Unit = {
-    val metrics = new Metrics()
-
-    val forwardingManagerMetrics = ForwardingManagerMetrics(metrics, 1000)
-    val queueTimeMsP99 = 
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist.latencyP99Name)
-    val queueTimeMsP999 = 
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist.latencyP999Name)
-    assertEquals(Double.NaN, queueTimeMsP99.metricValue.asInstanceOf[Double])
-    assertEquals(Double.NaN, queueTimeMsP999.metricValue.asInstanceOf[Double])
-    for(i <- 0 to 999) {
-      forwardingManagerMetrics.queueTimeMsHist.record(i)
-    }
-    assertEquals(990.0, queueTimeMsP99.metricValue.asInstanceOf[Double])
-    assertEquals(999.0, queueTimeMsP999.metricValue.asInstanceOf[Double])
-  }
-
-  @Test
-  def testQueueLength(): Unit = {
-    val metrics = new Metrics()
-
-    val forwardingManagerMetrics = ForwardingManagerMetrics(metrics, 1000)
-    val queueLength = 
metrics.metrics().get(forwardingManagerMetrics.queueLengthName)
-    assertEquals(0, queueLength.metricValue.asInstanceOf[Int])
-    forwardingManagerMetrics.queueLength.getAndIncrement()
-    assertEquals(1, queueLength.metricValue.asInstanceOf[Int])
-  }
-
-  @Test
-  def testRemoteTimeMs(): Unit = {
-    val metrics = new Metrics()
-
-    val forwardingManagerMetrics = ForwardingManagerMetrics(metrics, 1000)
-    val remoteTimeMsP99 = 
metrics.metrics().get(forwardingManagerMetrics.remoteTimeMsHist.latencyP99Name)
-    val remoteTimeMsP999 = 
metrics.metrics().get(forwardingManagerMetrics.remoteTimeMsHist.latencyP999Name)
-    assertEquals(Double.NaN, remoteTimeMsP99.metricValue.asInstanceOf[Double])
-    assertEquals(Double.NaN, remoteTimeMsP999.metricValue.asInstanceOf[Double])
-    for (i <- 0 to 999) {
-      forwardingManagerMetrics.remoteTimeMsHist.record(i)
-    }
-    assertEquals(990.0, remoteTimeMsP99.metricValue.asInstanceOf[Double])
-    assertEquals(999.0, remoteTimeMsP999.metricValue.asInstanceOf[Double])
-  }
-
-  @Test
-  def testTimeoutMs(): Unit = {
-    val metrics = new Metrics()
-    val timeoutMs = 500
-    val forwardingManagerMetrics = ForwardingManagerMetrics(metrics, timeoutMs)
-    val queueTimeMsP99 = 
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist.latencyP99Name)
-    val queueTimeMsP999 = 
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist.latencyP999Name)
-    assertEquals(Double.NaN, queueTimeMsP99.metricValue.asInstanceOf[Double])
-    assertEquals(Double.NaN, queueTimeMsP999.metricValue.asInstanceOf[Double])
-    for(i <- 0 to 99) {
-      forwardingManagerMetrics.queueTimeMsHist.record(i)
-    }
-    forwardingManagerMetrics.queueTimeMsHist.record(1000)
-
-    assertEquals(99, queueTimeMsP99.metricValue.asInstanceOf[Double])
-    assertEquals(timeoutMs * 0.999, 
queueTimeMsP999.metricValue.asInstanceOf[Double])
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
index d2d8d3e0382..16e4b2bcb66 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -53,7 +53,7 @@ class ForwardingManagerTest {
   private val forwardingManager = new 
ForwardingManagerImpl(brokerToController, metrics)
   private val principalBuilder = new DefaultKafkaPrincipalBuilder(null, null)
   private val queueTimeMsP999 = 
metrics.metrics().get(forwardingManager.forwardingManagerMetrics.queueTimeMsHist.latencyP999Name)
-  private val queueLength = 
metrics.metrics().get(forwardingManager.forwardingManagerMetrics.queueLengthName)
+  private val queueLength = 
metrics.metrics().get(forwardingManager.forwardingManagerMetrics.queueLengthName())
   private val remoteTimeMsP999 = 
metrics.metrics().get(forwardingManager.forwardingManagerMetrics.remoteTimeMsHist.latencyP999Name)
 
   private def controllerApiVersions: NodeApiVersions = {
diff --git 
a/server/src/main/java/org/apache/kafka/server/metrics/ForwardingManagerMetrics.java
 
b/server/src/main/java/org/apache/kafka/server/metrics/ForwardingManagerMetrics.java
new file mode 100644
index 00000000000..95dd84d09fb
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/metrics/ForwardingManagerMetrics.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Percentile;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class ForwardingManagerMetrics implements AutoCloseable {
+
+    private final Metrics metrics;
+
+    private static final String METRIC_GROUP_NAME = "ForwardingManager";
+    private static final String QUEUE_TIME_MS_NAME = "QueueTimeMs";
+    private static final String REMOTE_TIME_MS_NAME = "RemoteTimeMs";
+
+    /**
+     * A histogram describing the amount of time in milliseconds each admin request spends in the broker's forwarding manager queue, waiting to be sent to the controller.
+     * This does not include the time that the request spends waiting for a response from the controller.
+     */
+    private final LatencyHistogram queueTimeMsHist;
+
+    /**
+     * A histogram describing the amount of time in milliseconds each request sent by the ForwardingManager spends waiting for a response.
+     * This does not include the time spent in the queue.
+     */
+    private final LatencyHistogram remoteTimeMsHist;
+
+    private final MetricName queueLengthName;
+    private final AtomicInteger queueLength = new AtomicInteger(0);
+
+    public ForwardingManagerMetrics(Metrics metrics, long timeoutMs) {
+        this.metrics = metrics;
+
+        this.queueTimeMsHist = new LatencyHistogram(metrics, 
QUEUE_TIME_MS_NAME, METRIC_GROUP_NAME, timeoutMs);
+        this.remoteTimeMsHist = new LatencyHistogram(metrics, 
REMOTE_TIME_MS_NAME, METRIC_GROUP_NAME, timeoutMs);
+
+        this.queueLengthName = metrics.metricName(
+            "QueueLength",
+            METRIC_GROUP_NAME,
+            "The current number of RPCs that are waiting in the broker's 
forwarding manager queue, waiting to be sent to the controller."
+        );
+        metrics.addMetric(queueLengthName, (Gauge<Integer>) (config, now) -> 
queueLength.get());
+    }
+
+    @Override
+    public void close() {
+        queueTimeMsHist.close();
+        remoteTimeMsHist.close();
+        metrics.removeMetric(queueLengthName);
+    }
+
+    public LatencyHistogram queueTimeMsHist() {
+        return queueTimeMsHist;
+    }
+
+    public LatencyHistogram remoteTimeMsHist() {
+        return remoteTimeMsHist;
+    }
+
+    public MetricName queueLengthName() {
+        return queueLengthName;
+    }
+
+    public void incrementQueueLength() {
+        queueLength.getAndIncrement();
+    }
+
+    public void decrementQueueLength() {
+        queueLength.getAndDecrement();
+    }
+
+    public static final class LatencyHistogram implements AutoCloseable {
+        private static final int SIZE_IN_BYTES = 4000;
+        private final Metrics metrics;
+        private final String name;
+        private final Sensor sensor;
+        private final MetricName latencyP99Name;
+        private final MetricName latencyP999Name;
+
+        private LatencyHistogram(Metrics metrics, String name, String group, 
long maxLatency) {
+            this.metrics = metrics;
+            this.name = name;
+            this.sensor = metrics.sensor(name);
+            this.latencyP99Name = metrics.metricName(name + ".p99", group);
+            this.latencyP999Name = metrics.metricName(name + ".p999", group);
+
+            sensor.add(new Percentiles(
+                SIZE_IN_BYTES,
+                maxLatency,
+                BucketSizing.CONSTANT,
+                new Percentile(latencyP99Name, 99),
+                new Percentile(latencyP999Name, 99.9)
+            ));
+        }
+
+        @Override
+        public void close() {
+            metrics.removeSensor(name);
+            metrics.removeMetric(latencyP99Name);
+            metrics.removeMetric(latencyP999Name);
+        }
+
+        public void record(long latencyMs) {
+            sensor.record(latencyMs);
+        }
+
+        // visible for test
+        public MetricName latencyP99Name() {
+            return latencyP99Name;
+        }
+
+        // visible for test
+        public MetricName latencyP999Name() {
+            return latencyP999Name;
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/server/src/test/java/org/apache/kafka/server/metrics/ForwardingManagerMetricsTest.java
 
b/server/src/test/java/org/apache/kafka/server/metrics/ForwardingManagerMetricsTest.java
new file mode 100644
index 00000000000..d3716366f90
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/server/metrics/ForwardingManagerMetricsTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ForwardingManagerMetricsTest {
+    @Test
+    void testMetricsNames() {
+        String expectedGroup = "ForwardingManager";
+        Set<MetricName> expectedMetrics = Set.of(
+            new MetricName("QueueTimeMs.p99", expectedGroup, "", Map.of()),
+            new MetricName("QueueTimeMs.p999", expectedGroup, "", Map.of()),
+            new MetricName("QueueLength", expectedGroup, "", Map.of()),
+            new MetricName("RemoteTimeMs.p99", expectedGroup, "", Map.of()),
+            new MetricName("RemoteTimeMs.p999", expectedGroup, "", Map.of())
+        );
+
+        try (Metrics metrics = new Metrics()) {
+            Map<MetricName, ?> metricsMap = 
metrics.metrics().entrySet().stream()
+                .filter(entry -> entry.getKey().group().equals(expectedGroup))
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+            assertEquals(0, metricsMap.size());
+
+            try (ForwardingManagerMetrics ignore = new 
ForwardingManagerMetrics(metrics, 1000)) {
+                metricsMap = metrics.metrics().entrySet().stream()
+                    .filter(entry -> 
entry.getKey().group().equals(expectedGroup))
+                    .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+                assertEquals(expectedMetrics.size(), metricsMap.size());
+                metricsMap.keySet().forEach(name ->
+                    assertTrue(expectedMetrics.contains(name), "Metric " + 
name + " not found in expected set")
+                );
+            } finally {
+                metricsMap = metrics.metrics().entrySet().stream()
+                    .filter(entry -> 
entry.getKey().group().equals(expectedGroup))
+                    .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+                assertEquals(0, metricsMap.size());
+            }
+        }
+    }
+
+    @Test
+    void testQueueTimeMs() {
+        try (Metrics metrics = new Metrics();
+             ForwardingManagerMetrics forwardingManagerMetrics = new 
ForwardingManagerMetrics(metrics, 1000)) {
+            KafkaMetric queueTimeMsP99 = 
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist().latencyP99Name());
+            KafkaMetric queueTimeMsP999 = 
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist().latencyP999Name());
+            assertEquals(Double.NaN, (Double) queueTimeMsP99.metricValue());
+            assertEquals(Double.NaN, (Double) queueTimeMsP999.metricValue());
+            for (int i = 0; i < 1000; i++) {
+                forwardingManagerMetrics.queueTimeMsHist().record(i);
+            }
+            assertEquals(990.0, (Double) queueTimeMsP99.metricValue(), 0.1);
+            assertEquals(999.0, (Double) queueTimeMsP999.metricValue(), 0.1);
+        }
+    }
+
+    @Test
+    void testQueueLength() {
+        try (Metrics metrics = new Metrics();
+             ForwardingManagerMetrics forwardingManagerMetrics = new 
ForwardingManagerMetrics(metrics, 1000)) {
+            KafkaMetric queueLength = 
metrics.metrics().get(forwardingManagerMetrics.queueLengthName());
+            assertEquals(0, (Integer) queueLength.metricValue());
+            forwardingManagerMetrics.incrementQueueLength();
+            assertEquals(1, (Integer) queueLength.metricValue());
+        }
+    }
+
+    @Test
+    void testRemoteTimeMs() {
+        try (Metrics metrics = new Metrics();
+             ForwardingManagerMetrics forwardingManagerMetrics = new 
ForwardingManagerMetrics(metrics, 1000)) {
+            KafkaMetric remoteTimeMsP99 = 
metrics.metrics().get(forwardingManagerMetrics.remoteTimeMsHist().latencyP99Name());
+            KafkaMetric remoteTimeMsP999 = 
metrics.metrics().get(forwardingManagerMetrics.remoteTimeMsHist().latencyP999Name());
+            assertEquals(Double.NaN, (Double) remoteTimeMsP99.metricValue());
+            assertEquals(Double.NaN, (Double) remoteTimeMsP999.metricValue());
+            for (int i = 0; i < 1000; i++) {
+                forwardingManagerMetrics.remoteTimeMsHist().record(i);
+            }
+            assertEquals(990.0, (Double) remoteTimeMsP99.metricValue(), 0.1);
+            assertEquals(999.0, (Double) remoteTimeMsP999.metricValue(), 0.1);
+        }
+    }
+
+    @Test
+    void testTimeoutMs() {
+        long timeoutMs = 500;
+        try (Metrics metrics = new Metrics();
+             ForwardingManagerMetrics forwardingManagerMetrics = new 
ForwardingManagerMetrics(metrics, timeoutMs)) {
+            KafkaMetric queueTimeMsP99 = 
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist().latencyP99Name());
+            KafkaMetric queueTimeMsP999 = 
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist().latencyP999Name());
+            assertEquals(Double.NaN, (Double) queueTimeMsP99.metricValue());
+            assertEquals(Double.NaN, (Double) queueTimeMsP999.metricValue());
+            for (int i = 0; i < 100; i++) {
+                forwardingManagerMetrics.queueTimeMsHist().record(i);
+            }
+            forwardingManagerMetrics.queueTimeMsHist().record(1000);
+
+            assertEquals(99.0, (Double) queueTimeMsP99.metricValue(), 0.1);
+            assertEquals(timeoutMs * 0.999, (Double) 
queueTimeMsP999.metricValue(), 0.1);
+        }
+    }
+}
\ No newline at end of file

Reply via email to