This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 424e7251d69 KAFKA-19207 Move ForwardingManagerMetrics and
ForwardingManagerMetricsTest to server module (#19574)
424e7251d69 is described below
commit 424e7251d69e65bfd6155e526d476a4eb07d93e1
Author: PoAn Yang <[email protected]>
AuthorDate: Tue May 6 07:03:17 2025 -0500
KAFKA-19207 Move ForwardingManagerMetrics and ForwardingManagerMetricsTest
to server module (#19574)
1. Move `ForwardingManagerMetrics` and `ForwardingManagerMetricsTest` to
server module.
2. Rewrite them in Java.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../scala/kafka/server/ForwardingManager.scala | 9 +-
.../kafka/server/ForwardingManagerMetrics.scala | 100 ---------------
.../server/ForwardingManagerMetricsTest.scala | 114 -----------------
.../unit/kafka/server/ForwardingManagerTest.scala | 2 +-
.../server/metrics/ForwardingManagerMetrics.java | 138 +++++++++++++++++++++
.../metrics/ForwardingManagerMetricsTest.java | 129 +++++++++++++++++++
6 files changed, 273 insertions(+), 219 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala
b/core/src/main/scala/kafka/server/ForwardingManager.scala
index c067000bf0c..7737d2d2171 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
+import org.apache.kafka.server.metrics.ForwardingManagerMetrics
import java.util.Optional
import java.util.concurrent.TimeUnit
@@ -117,7 +118,7 @@ class ForwardingManagerImpl(
metrics: Metrics
) extends ForwardingManager with AutoCloseable with Logging {
- val forwardingManagerMetrics: ForwardingManagerMetrics =
ForwardingManagerMetrics(metrics, channelManager.getTimeoutMs)
+ val forwardingManagerMetrics: ForwardingManagerMetrics = new
ForwardingManagerMetrics(metrics, channelManager.getTimeoutMs)
override def forwardRequest(
requestContext: RequestContext,
@@ -133,7 +134,7 @@ class ForwardingManagerImpl(
class ForwardingResponseHandler extends ControllerRequestCompletionHandler
{
override def onComplete(clientResponse: ClientResponse): Unit = {
- forwardingManagerMetrics.queueLength.getAndDecrement()
+ forwardingManagerMetrics.decrementQueueLength()
forwardingManagerMetrics.remoteTimeMsHist.record(clientResponse.requestLatencyMs())
forwardingManagerMetrics.queueTimeMsHist.record(clientResponse.receivedTimeMs()
- clientResponse.requestLatencyMs() - requestCreationTimeMs)
@@ -174,14 +175,14 @@ class ForwardingManagerImpl(
override def onTimeout(): Unit = {
debug(s"Forwarding of the request ${requestToString()} failed due to
timeout exception")
- forwardingManagerMetrics.queueLength.getAndDecrement()
+ forwardingManagerMetrics.decrementQueueLength()
forwardingManagerMetrics.queueTimeMsHist.record(channelManager.getTimeoutMs)
val response = requestBody.getErrorResponse(new TimeoutException())
responseCallback(Option(response))
}
}
- forwardingManagerMetrics.queueLength.getAndIncrement()
+ forwardingManagerMetrics.incrementQueueLength()
channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler)
}
diff --git a/core/src/main/scala/kafka/server/ForwardingManagerMetrics.scala
b/core/src/main/scala/kafka/server/ForwardingManagerMetrics.scala
deleted file mode 100644
index a846f8c4955..00000000000
--- a/core/src/main/scala/kafka/server/ForwardingManagerMetrics.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.{Gauge, MetricConfig, Metrics}
-import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing
-import org.apache.kafka.common.metrics.stats.{Percentile, Percentiles}
-
-import java.util.concurrent.atomic.AtomicInteger
-
-final class ForwardingManagerMetrics private (
- metrics: Metrics,
- timeoutMs: Long,
-) extends AutoCloseable {
- import ForwardingManagerMetrics._
-
- /**
- * A histogram describing the amount of time in milliseconds each admin
request spends in the broker's forwarding manager queue, waiting to be sent to
the controller.
- * This does not include the time that the request spends waiting for a
response from the controller.
- */
- val queueTimeMsHist: LatencyHistogram = new LatencyHistogram(metrics,
queueTimeMsName, metricGroupName, timeoutMs)
-
- /**
- * A histogram describing the amount of time in milliseconds each request
sent by the ForwardingManager spends waiting for a response.
- * This does not include the time spent in the queue.
- */
- val remoteTimeMsHist: LatencyHistogram = new LatencyHistogram(metrics,
remoteTimeMsName, metricGroupName, timeoutMs)
-
- val queueLengthName: MetricName = metrics.metricName(
- "QueueLength",
- metricGroupName,
- "The current number of RPCs that are waiting in the broker's forwarding
manager queue, waiting to be sent to the controller."
- )
- val queueLength: AtomicInteger = new AtomicInteger(0)
- metrics.addMetric(queueLengthName, new FuncGauge(_ => queueLength.get()))
-
- override def close(): Unit = {
- queueTimeMsHist.close()
- remoteTimeMsHist.close()
- metrics.removeMetric(queueLengthName)
- }
-}
-
-object ForwardingManagerMetrics {
-
- val metricGroupName = "ForwardingManager"
- val queueTimeMsName = "QueueTimeMs"
- val remoteTimeMsName = "RemoteTimeMs"
-
- final class LatencyHistogram (
- metrics: Metrics,
- name: String,
- group: String,
- maxLatency: Long
- ) extends AutoCloseable {
- private val sensor = metrics.sensor(name)
- val latencyP99Name: MetricName = metrics.metricName(s"$name.p99", group)
- val latencyP999Name: MetricName = metrics.metricName(s"$name.p999", group)
-
- sensor.add(new Percentiles(
- 4000,
- maxLatency,
- BucketSizing.CONSTANT,
- new Percentile(latencyP99Name, 99),
- new Percentile(latencyP999Name, 99.9)
- ))
-
- override def close(): Unit = {
- metrics.removeSensor(name)
- metrics.removeMetric(latencyP99Name)
- metrics.removeMetric(latencyP999Name)
- }
-
- def record(latencyMs: Long): Unit = sensor.record(latencyMs)
- }
-
- private final class FuncGauge[T](func: Long => T) extends Gauge[T] {
- override def value(config: MetricConfig, now: Long): T = {
- func(now)
- }
- }
-
- def apply(metrics: Metrics, timeoutMs: Long): ForwardingManagerMetrics = new
ForwardingManagerMetrics(metrics, timeoutMs)
-}
diff --git
a/core/src/test/scala/unit/kafka/server/ForwardingManagerMetricsTest.scala
b/core/src/test/scala/unit/kafka/server/ForwardingManagerMetricsTest.scala
deleted file mode 100644
index 2c10decb3ed..00000000000
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerMetricsTest.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package unit.kafka.server
-
-import kafka.server.ForwardingManagerMetrics
-import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.Metrics
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.Test
-
-import java.util.Collections
-import scala.jdk.CollectionConverters._
-
-final class ForwardingManagerMetricsTest {
- @Test
- def testMetricsNames(): Unit = {
- val metrics = new Metrics()
- val expectedGroup = "ForwardingManager"
-
- val expectedMetrics = Set(
- new MetricName("QueueTimeMs.p99", expectedGroup, "",
Collections.emptyMap()),
- new MetricName("QueueTimeMs.p999", expectedGroup, "",
Collections.emptyMap()),
- new MetricName("QueueLength", expectedGroup, "", Collections.emptyMap()),
- new MetricName("RemoteTimeMs.p99", expectedGroup, "",
Collections.emptyMap()),
- new MetricName("RemoteTimeMs.p999", expectedGroup, "",
Collections.emptyMap())
- )
-
- var metricsMap = metrics.metrics().asScala.filter { case (name, _) =>
name.group == expectedGroup }
- assertEquals(0, metricsMap.size)
-
- ForwardingManagerMetrics(metrics, 1000)
- metricsMap = metrics.metrics().asScala.filter { case (name, _) =>
name.group == expectedGroup }
- assertEquals(metricsMap.size, expectedMetrics.size)
- metricsMap.foreach { case (name, _) =>
- assertTrue(expectedMetrics.contains(name))
- }
- }
-
- @Test
- def testQueueTimeMs(): Unit = {
- val metrics = new Metrics()
-
- val forwardingManagerMetrics = ForwardingManagerMetrics(metrics, 1000)
- val queueTimeMsP99 =
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist.latencyP99Name)
- val queueTimeMsP999 =
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist.latencyP999Name)
- assertEquals(Double.NaN, queueTimeMsP99.metricValue.asInstanceOf[Double])
- assertEquals(Double.NaN, queueTimeMsP999.metricValue.asInstanceOf[Double])
- for(i <- 0 to 999) {
- forwardingManagerMetrics.queueTimeMsHist.record(i)
- }
- assertEquals(990.0, queueTimeMsP99.metricValue.asInstanceOf[Double])
- assertEquals(999.0, queueTimeMsP999.metricValue.asInstanceOf[Double])
- }
-
- @Test
- def testQueueLength(): Unit = {
- val metrics = new Metrics()
-
- val forwardingManagerMetrics = ForwardingManagerMetrics(metrics, 1000)
- val queueLength =
metrics.metrics().get(forwardingManagerMetrics.queueLengthName)
- assertEquals(0, queueLength.metricValue.asInstanceOf[Int])
- forwardingManagerMetrics.queueLength.getAndIncrement()
- assertEquals(1, queueLength.metricValue.asInstanceOf[Int])
- }
-
- @Test
- def testRemoteTimeMs(): Unit = {
- val metrics = new Metrics()
-
- val forwardingManagerMetrics = ForwardingManagerMetrics(metrics, 1000)
- val remoteTimeMsP99 =
metrics.metrics().get(forwardingManagerMetrics.remoteTimeMsHist.latencyP99Name)
- val remoteTimeMsP999 =
metrics.metrics().get(forwardingManagerMetrics.remoteTimeMsHist.latencyP999Name)
- assertEquals(Double.NaN, remoteTimeMsP99.metricValue.asInstanceOf[Double])
- assertEquals(Double.NaN, remoteTimeMsP999.metricValue.asInstanceOf[Double])
- for (i <- 0 to 999) {
- forwardingManagerMetrics.remoteTimeMsHist.record(i)
- }
- assertEquals(990.0, remoteTimeMsP99.metricValue.asInstanceOf[Double])
- assertEquals(999.0, remoteTimeMsP999.metricValue.asInstanceOf[Double])
- }
-
- @Test
- def testTimeoutMs(): Unit = {
- val metrics = new Metrics()
- val timeoutMs = 500
- val forwardingManagerMetrics = ForwardingManagerMetrics(metrics, timeoutMs)
- val queueTimeMsP99 =
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist.latencyP99Name)
- val queueTimeMsP999 =
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist.latencyP999Name)
- assertEquals(Double.NaN, queueTimeMsP99.metricValue.asInstanceOf[Double])
- assertEquals(Double.NaN, queueTimeMsP999.metricValue.asInstanceOf[Double])
- for(i <- 0 to 99) {
- forwardingManagerMetrics.queueTimeMsHist.record(i)
- }
- forwardingManagerMetrics.queueTimeMsHist.record(1000)
-
- assertEquals(99, queueTimeMsP99.metricValue.asInstanceOf[Double])
- assertEquals(timeoutMs * 0.999,
queueTimeMsP999.metricValue.asInstanceOf[Double])
- }
-}
diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
index d2d8d3e0382..16e4b2bcb66 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -53,7 +53,7 @@ class ForwardingManagerTest {
private val forwardingManager = new
ForwardingManagerImpl(brokerToController, metrics)
private val principalBuilder = new DefaultKafkaPrincipalBuilder(null, null)
private val queueTimeMsP999 =
metrics.metrics().get(forwardingManager.forwardingManagerMetrics.queueTimeMsHist.latencyP999Name)
- private val queueLength =
metrics.metrics().get(forwardingManager.forwardingManagerMetrics.queueLengthName)
+ private val queueLength =
metrics.metrics().get(forwardingManager.forwardingManagerMetrics.queueLengthName())
private val remoteTimeMsP999 =
metrics.metrics().get(forwardingManager.forwardingManagerMetrics.remoteTimeMsHist.latencyP999Name)
private def controllerApiVersions: NodeApiVersions = {
diff --git
a/server/src/main/java/org/apache/kafka/server/metrics/ForwardingManagerMetrics.java
b/server/src/main/java/org/apache/kafka/server/metrics/ForwardingManagerMetrics.java
new file mode 100644
index 00000000000..95dd84d09fb
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/metrics/ForwardingManagerMetrics.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Percentile;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class ForwardingManagerMetrics implements AutoCloseable {
+
+ private final Metrics metrics;
+
+ private static final String METRIC_GROUP_NAME = "ForwardingManager";
+ private static final String QUEUE_TIME_MS_NAME = "QueueTimeMs";
+ private static final String REMOTE_TIME_MS_NAME = "RemoteTimeMs";
+
+ /**
+ * A histogram describing the amount of time in milliseconds each admin
request spends in the broker's forwarding manager queue, waiting to be sent to
the controller.
+ * This does not include the time that the request spends waiting for a
response from the controller.
+ */
+ private final LatencyHistogram queueTimeMsHist;
+
+ /**
+ * A histogram describing the amount of time in milliseconds each request
sent by the ForwardingManager spends waiting for a response.
+ * This does not include the time spent in the queue.
+ */
+ private final LatencyHistogram remoteTimeMsHist;
+
+ private final MetricName queueLengthName;
+ private final AtomicInteger queueLength = new AtomicInteger(0);
+
+ public ForwardingManagerMetrics(Metrics metrics, long timeoutMs) {
+ this.metrics = metrics;
+
+ this.queueTimeMsHist = new LatencyHistogram(metrics,
QUEUE_TIME_MS_NAME, METRIC_GROUP_NAME, timeoutMs);
+ this.remoteTimeMsHist = new LatencyHistogram(metrics,
REMOTE_TIME_MS_NAME, METRIC_GROUP_NAME, timeoutMs);
+
+ this.queueLengthName = metrics.metricName(
+ "QueueLength",
+ METRIC_GROUP_NAME,
+ "The current number of RPCs that are waiting in the broker's
forwarding manager queue, waiting to be sent to the controller."
+ );
+ metrics.addMetric(queueLengthName, (Gauge<Integer>) (config, now) ->
queueLength.get());
+ }
+
+ @Override
+ public void close() {
+ queueTimeMsHist.close();
+ remoteTimeMsHist.close();
+ metrics.removeMetric(queueLengthName);
+ }
+
+ public LatencyHistogram queueTimeMsHist() {
+ return queueTimeMsHist;
+ }
+
+ public LatencyHistogram remoteTimeMsHist() {
+ return remoteTimeMsHist;
+ }
+
+ public MetricName queueLengthName() {
+ return queueLengthName;
+ }
+
+ public void incrementQueueLength() {
+ queueLength.getAndIncrement();
+ }
+
+ public void decrementQueueLength() {
+ queueLength.getAndDecrement();
+ }
+
+ public static final class LatencyHistogram implements AutoCloseable {
+ private static final int SIZE_IN_BYTES = 4000;
+ private final Metrics metrics;
+ private final String name;
+ private final Sensor sensor;
+ private final MetricName latencyP99Name;
+ private final MetricName latencyP999Name;
+
+ private LatencyHistogram(Metrics metrics, String name, String group,
long maxLatency) {
+ this.metrics = metrics;
+ this.name = name;
+ this.sensor = metrics.sensor(name);
+ this.latencyP99Name = metrics.metricName(name + ".p99", group);
+ this.latencyP999Name = metrics.metricName(name + ".p999", group);
+
+ sensor.add(new Percentiles(
+ SIZE_IN_BYTES,
+ maxLatency,
+ BucketSizing.CONSTANT,
+ new Percentile(latencyP99Name, 99),
+ new Percentile(latencyP999Name, 99.9)
+ ));
+ }
+
+ @Override
+ public void close() {
+ metrics.removeSensor(name);
+ metrics.removeMetric(latencyP99Name);
+ metrics.removeMetric(latencyP999Name);
+ }
+
+ public void record(long latencyMs) {
+ sensor.record(latencyMs);
+ }
+
+ // visible for test
+ public MetricName latencyP99Name() {
+ return latencyP99Name;
+ }
+
+ // visible for test
+ public MetricName latencyP999Name() {
+ return latencyP999Name;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/server/src/test/java/org/apache/kafka/server/metrics/ForwardingManagerMetricsTest.java
b/server/src/test/java/org/apache/kafka/server/metrics/ForwardingManagerMetricsTest.java
new file mode 100644
index 00000000000..d3716366f90
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/metrics/ForwardingManagerMetricsTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ForwardingManagerMetricsTest {
+ @Test
+ void testMetricsNames() {
+ String expectedGroup = "ForwardingManager";
+ Set<MetricName> expectedMetrics = Set.of(
+ new MetricName("QueueTimeMs.p99", expectedGroup, "", Map.of()),
+ new MetricName("QueueTimeMs.p999", expectedGroup, "", Map.of()),
+ new MetricName("QueueLength", expectedGroup, "", Map.of()),
+ new MetricName("RemoteTimeMs.p99", expectedGroup, "", Map.of()),
+ new MetricName("RemoteTimeMs.p999", expectedGroup, "", Map.of())
+ );
+
+ try (Metrics metrics = new Metrics()) {
+ Map<MetricName, ?> metricsMap =
metrics.metrics().entrySet().stream()
+ .filter(entry -> entry.getKey().group().equals(expectedGroup))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ assertEquals(0, metricsMap.size());
+
+ try (ForwardingManagerMetrics ignore = new
ForwardingManagerMetrics(metrics, 1000)) {
+ metricsMap = metrics.metrics().entrySet().stream()
+ .filter(entry ->
entry.getKey().group().equals(expectedGroup))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+
+ assertEquals(expectedMetrics.size(), metricsMap.size());
+ metricsMap.keySet().forEach(name ->
+ assertTrue(expectedMetrics.contains(name), "Metric " +
name + " not found in expected set")
+ );
+ } finally {
+ metricsMap = metrics.metrics().entrySet().stream()
+ .filter(entry ->
entry.getKey().group().equals(expectedGroup))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ assertEquals(0, metricsMap.size());
+ }
+ }
+ }
+
+ @Test
+ void testQueueTimeMs() {
+ try (Metrics metrics = new Metrics();
+ ForwardingManagerMetrics forwardingManagerMetrics = new
ForwardingManagerMetrics(metrics, 1000)) {
+ KafkaMetric queueTimeMsP99 =
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist().latencyP99Name());
+ KafkaMetric queueTimeMsP999 =
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist().latencyP999Name());
+ assertEquals(Double.NaN, (Double) queueTimeMsP99.metricValue());
+ assertEquals(Double.NaN, (Double) queueTimeMsP999.metricValue());
+ for (int i = 0; i < 1000; i++) {
+ forwardingManagerMetrics.queueTimeMsHist().record(i);
+ }
+ assertEquals(990.0, (Double) queueTimeMsP99.metricValue(), 0.1);
+ assertEquals(999.0, (Double) queueTimeMsP999.metricValue(), 0.1);
+ }
+ }
+
+ @Test
+ void testQueueLength() {
+ try (Metrics metrics = new Metrics();
+ ForwardingManagerMetrics forwardingManagerMetrics = new
ForwardingManagerMetrics(metrics, 1000)) {
+ KafkaMetric queueLength =
metrics.metrics().get(forwardingManagerMetrics.queueLengthName());
+ assertEquals(0, (Integer) queueLength.metricValue());
+ forwardingManagerMetrics.incrementQueueLength();
+ assertEquals(1, (Integer) queueLength.metricValue());
+ }
+ }
+
+ @Test
+ void testRemoteTimeMs() {
+ try (Metrics metrics = new Metrics();
+ ForwardingManagerMetrics forwardingManagerMetrics = new
ForwardingManagerMetrics(metrics, 1000)) {
+ KafkaMetric remoteTimeMsP99 =
metrics.metrics().get(forwardingManagerMetrics.remoteTimeMsHist().latencyP99Name());
+ KafkaMetric remoteTimeMsP999 =
metrics.metrics().get(forwardingManagerMetrics.remoteTimeMsHist().latencyP999Name());
+ assertEquals(Double.NaN, (Double) remoteTimeMsP99.metricValue());
+ assertEquals(Double.NaN, (Double) remoteTimeMsP999.metricValue());
+ for (int i = 0; i < 1000; i++) {
+ forwardingManagerMetrics.remoteTimeMsHist().record(i);
+ }
+ assertEquals(990.0, (Double) remoteTimeMsP99.metricValue(), 0.1);
+ assertEquals(999.0, (Double) remoteTimeMsP999.metricValue(), 0.1);
+ }
+ }
+
+ @Test
+ void testTimeoutMs() {
+ long timeoutMs = 500;
+ try (Metrics metrics = new Metrics();
+ ForwardingManagerMetrics forwardingManagerMetrics = new
ForwardingManagerMetrics(metrics, timeoutMs)) {
+ KafkaMetric queueTimeMsP99 =
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist().latencyP99Name());
+ KafkaMetric queueTimeMsP999 =
metrics.metrics().get(forwardingManagerMetrics.queueTimeMsHist().latencyP999Name());
+ assertEquals(Double.NaN, (Double) queueTimeMsP99.metricValue());
+ assertEquals(Double.NaN, (Double) queueTimeMsP999.metricValue());
+ for (int i = 0; i < 100; i++) {
+ forwardingManagerMetrics.queueTimeMsHist().record(i);
+ }
+ forwardingManagerMetrics.queueTimeMsHist().record(1000);
+
+ assertEquals(99.0, (Double) queueTimeMsP99.metricValue(), 0.1);
+ assertEquals(timeoutMs * 0.999, (Double)
queueTimeMsP999.metricValue(), 0.1);
+ }
+ }
+}
\ No newline at end of file