This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 24f38145e7b [feat][monitor] Add publish latency histogram as OTel 
metrics (#24810)
24f38145e7b is described below

commit 24f38145e7beeee35d28f871ac9be65aa65fb1bc
Author: Yuri Mizushima <[email protected]>
AuthorDate: Tue Oct 7 21:50:54 2025 +0900

    [feat][monitor] Add publish latency histogram as OTel metrics (#24810)
---
 .../pulsar/broker/service/AbstractTopic.java       | 12 +--------
 .../apache/pulsar/broker/service/PulsarStats.java  |  5 ++++
 .../broker/stats/BrokerOperabilityMetrics.java     | 30 ++++++++++++++++++++++
 .../broker/stats/BrokerOpenTelemetryTestUtil.java  | 15 +++++++++++
 .../OpenTelemetryBrokerOperabilityStatsTest.java   | 26 +++++++++++++++++++
 5 files changed, 77 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index e8253771ede..3ec6f5a0cd5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -67,7 +67,6 @@ import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
-import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -899,7 +898,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
     public void recordAddLatency(long latency, TimeUnit unit) {
         addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
 
-        PUBLISH_LATENCY.observe(latency, unit);
+        brokerService.getPulsarStats().recordPublishLatency(latency, unit);
     }
 
     @Override
@@ -908,15 +907,6 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener {
         return RATE_LIMITED_UPDATER.incrementAndGet(this);
     }
 
-    private static final Summary PUBLISH_LATENCY = 
Summary.build("pulsar_broker_publish_latency", "-")
-            .quantile(0.0)
-            .quantile(0.50)
-            .quantile(0.95)
-            .quantile(0.99)
-            .quantile(0.999)
-            .quantile(0.9999)
-            .quantile(1.0)
-            .register();
 
     @Override
     public void incrementPublishCount(Producer producer, int numOfMessages, 
long msgSizeInBytes) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
index b96e00a8909..45a0e8b42f0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import lombok.Getter;
@@ -280,4 +281,8 @@ public class PulsarStats implements Closeable {
     public void recordConnectionCreateFail() {
         brokerOperabilityMetrics.recordConnectionCreateFail();
     }
+
+    public void recordPublishLatency(long latency, TimeUnit unit) {
+        brokerOperabilityMetrics.recordPublishLatency(latency, unit);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
index 1855e1798b4..310c14d4afa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
@@ -18,18 +18,22 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import io.opentelemetry.api.metrics.DoubleHistogram;
 import io.opentelemetry.api.metrics.ObservableLongCounter;
 import io.prometheus.client.Counter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
 import org.apache.pulsar.common.stats.Metrics;
 import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionCreateStatus;
 import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionStatus;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
 
 /**
  */
@@ -54,6 +58,19 @@ public class BrokerOperabilityMetrics implements 
AutoCloseable {
             "pulsar.broker.connection.create.operation.count";
     private final ObservableLongCounter connectionCreateCounter;
 
+    public static final String TOPIC_PUBLISH_LATENCY_METRIC_NAME = 
"pulsar.broker.topic.publish.latency";
+    private final DoubleHistogram topicPublishLatencyHistogram;
+    @PulsarDeprecatedMetric(newMetricName = TOPIC_PUBLISH_LATENCY_METRIC_NAME)
+    private static final Summary PUBLISH_LATENCY = 
Summary.build("pulsar_broker_publish_latency", "-")
+            .quantile(0.0)
+            .quantile(0.50)
+            .quantile(0.95)
+            .quantile(0.99)
+            .quantile(0.999)
+            .quantile(0.9999)
+            .quantile(1.0)
+            .register();
+
     public BrokerOperabilityMetrics(PulsarService pulsar) {
         this.metricsList = new ArrayList<>();
         this.localCluster = pulsar.getConfiguration().getClusterName();
@@ -87,6 +104,14 @@ public class BrokerOperabilityMetrics implements 
AutoCloseable {
                     measurement.record(connectionCreateSuccessCount.sum(), 
ConnectionCreateStatus.SUCCESS.attributes);
                     measurement.record(connectionCreateFailCount.sum(), 
ConnectionCreateStatus.FAILURE.attributes);
                 });
+
+        this.topicPublishLatencyHistogram = 
pulsar.getOpenTelemetry().getMeter()
+                .histogramBuilder(TOPIC_PUBLISH_LATENCY_METRIC_NAME)
+                .setUnit("s")
+                .setDescription("The latency in seconds for publishing 
messages")
+                .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 
0.01, 0.02, 0.05, 0.1,
+                        0.2, 0.5, 1.0, 5.0, 30.0))
+                .build();
     }
 
     @Override
@@ -195,4 +220,9 @@ public class BrokerOperabilityMetrics implements 
AutoCloseable {
     public void recordHealthCheckStatusFail() {
         this.healthCheckStatus = 0;
     }
+
+    public void recordPublishLatency(long latency, TimeUnit unit) {
+        this.topicPublishLatencyHistogram.record(unit.toMillis(latency) / 
1000.0);
+        PUBLISH_LATENCY.observe(latency, unit);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
index 3bfbf2064e1..76cad804e1e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
@@ -121,4 +121,19 @@ public class BrokerOpenTelemetryTestUtil {
                                             
valueConsumer.accept(point.getValue());
                                         }))));
     }
+
+    public static void assertMetricHistogramValue(Collection<MetricData> 
metrics, String metricName,
+                                                  Attributes attributes, 
Consumer<Long> countConsumer,
+                                                  Consumer<Double> 
sumConsumer) {
+        final Map<AttributeKey<?>, Object> attributesMap = attributes.asMap();
+        assertThat(metrics).anySatisfy(metric -> assertThat(metric)
+                        .hasName(metricName)
+                        .hasHistogramSatisfying(histogram -> 
histogram.satisfies(
+                                histoData -> 
assertThat(histoData.getPoints()).anySatisfy(
+                                        point -> {
+                                            
assertThat(point.getAttributes().asMap()).isEqualTo(attributesMap);
+                                            
countConsumer.accept(point.getCount());
+                                            sumConsumer.accept(point.getSum());
+                                        }))));
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
index 4378e6b05b3..e197f3bc621 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import static 
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricHistogramValue;
 import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import io.opentelemetry.api.common.Attributes;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
@@ -101,4 +104,27 @@ public class OpenTelemetryBrokerOperabilityStatsTest 
extends BrokerTestBase {
         assertMetricLongSumValue(metrics, 
BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
                 ConnectionCreateStatus.FAILURE.attributes, 1);
     }
+
+    @Test
+    public void testPublishLatency() throws Exception {
+        final var topicName = 
BrokerTestUtil.newUniqueName("persistent://my-namespace/use/my-ns/testPublishLatency");
+        @Cleanup
+        final var producer = 
pulsarClient.newProducer().topic(topicName).create();
+
+        producer.send(("msg").getBytes());
+
+        var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricHistogramValue(metrics, 
BrokerOperabilityMetrics.TOPIC_PUBLISH_LATENCY_METRIC_NAME,
+                Attributes.empty(), count -> assertThat(count).isEqualTo(1L),
+                sum -> assertThat(sum).isGreaterThan(0.0));
+
+        for (int i = 0; i < 9; i++) {
+            producer.send(("msg-" + i).getBytes());
+        }
+
+        metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricHistogramValue(metrics, 
BrokerOperabilityMetrics.TOPIC_PUBLISH_LATENCY_METRIC_NAME,
+                Attributes.empty(), count -> assertThat(count).isEqualTo(10L),
+                sum -> assertThat(sum).isGreaterThan(0.0));
+    }
 }

Reply via email to