This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 24f38145e7b [feat][monitor] Add publish latency histogram as OTel
metrics (#24810)
24f38145e7b is described below
commit 24f38145e7beeee35d28f871ac9be65aa65fb1bc
Author: Yuri Mizushima <[email protected]>
AuthorDate: Tue Oct 7 21:50:54 2025 +0900
[feat][monitor] Add publish latency histogram as OTel metrics (#24810)
---
.../pulsar/broker/service/AbstractTopic.java | 12 +--------
.../apache/pulsar/broker/service/PulsarStats.java | 5 ++++
.../broker/stats/BrokerOperabilityMetrics.java | 30 ++++++++++++++++++++++
.../broker/stats/BrokerOpenTelemetryTestUtil.java | 15 +++++++++++
.../OpenTelemetryBrokerOperabilityStatsTest.java | 26 +++++++++++++++++++
5 files changed, 77 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index e8253771ede..3ec6f5a0cd5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -67,7 +67,6 @@ import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
-import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -899,7 +898,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
public void recordAddLatency(long latency, TimeUnit unit) {
addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
- PUBLISH_LATENCY.observe(latency, unit);
+ // Broker-wide publish latency (legacy Prometheus summary + OTel histogram) is now recorded via PulsarStats.
+ brokerService.getPulsarStats().recordPublishLatency(latency, unit);
}
@Override
@@ -908,15 +907,6 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
return RATE_LIMITED_UPDATER.incrementAndGet(this);
}
- private static final Summary PUBLISH_LATENCY =
Summary.build("pulsar_broker_publish_latency", "-")
- .quantile(0.0)
- .quantile(0.50)
- .quantile(0.95)
- .quantile(0.99)
- .quantile(0.999)
- .quantile(0.9999)
- .quantile(1.0)
- .register();
@Override
public void incrementPublishCount(Producer producer, int numOfMessages,
long msgSizeInBytes) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
index b96e00a8909..45a0e8b42f0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import lombok.Getter;
@@ -280,4 +281,8 @@ public class PulsarStats implements Closeable {
public void recordConnectionCreateFail() {
brokerOperabilityMetrics.recordConnectionCreateFail();
}
+
+ /**
+  * Records the latency of a single publish operation by forwarding it to the
+  * broker operability metrics.
+  *
+  * @param latency the measured latency, expressed in {@code unit}
+  * @param unit    the time unit of {@code latency}
+  */
+ public void recordPublishLatency(long latency, TimeUnit unit) {
+     this.brokerOperabilityMetrics.recordPublishLatency(latency, unit);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
index 1855e1798b4..310c14d4afa 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
@@ -18,18 +18,22 @@
*/
package org.apache.pulsar.broker.stats;
+import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.ObservableLongCounter;
import io.prometheus.client.Counter;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.stats.Metrics;
import
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionCreateStatus;
import
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionStatus;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
/**
*/
@@ -54,6 +58,19 @@ public class BrokerOperabilityMetrics implements
AutoCloseable {
"pulsar.broker.connection.create.operation.count";
private final ObservableLongCounter connectionCreateCounter;
+ // Name of the OpenTelemetry histogram for publish latency; values are recorded in seconds.
+ public static final String TOPIC_PUBLISH_LATENCY_METRIC_NAME =
"pulsar.broker.topic.publish.latency";
+ private final DoubleHistogram topicPublishLatencyHistogram;
+ // Legacy Prometheus summary (moved here from AbstractTopic), marked deprecated in favour of the
+ // OpenTelemetry histogram above; it is still updated alongside it in recordPublishLatency.
+ // NOTE(review): the help text "-" is a placeholder carried over unchanged from AbstractTopic.
+ @PulsarDeprecatedMetric(newMetricName = TOPIC_PUBLISH_LATENCY_METRIC_NAME)
+ private static final Summary PUBLISH_LATENCY =
Summary.build("pulsar_broker_publish_latency", "-")
+ .quantile(0.0)
+ .quantile(0.50)
+ .quantile(0.95)
+ .quantile(0.99)
+ .quantile(0.999)
+ .quantile(0.9999)
+ .quantile(1.0)
+ .register();
+
public BrokerOperabilityMetrics(PulsarService pulsar) {
this.metricsList = new ArrayList<>();
this.localCluster = pulsar.getConfiguration().getClusterName();
@@ -87,6 +104,14 @@ public class BrokerOperabilityMetrics implements
AutoCloseable {
measurement.record(connectionCreateSuccessCount.sum(),
ConnectionCreateStatus.SUCCESS.attributes);
measurement.record(connectionCreateFailCount.sum(),
ConnectionCreateStatus.FAILURE.attributes);
});
+
+ this.topicPublishLatencyHistogram =
pulsar.getOpenTelemetry().getMeter()
+ .histogramBuilder(TOPIC_PUBLISH_LATENCY_METRIC_NAME)
+ .setUnit("s")
+ .setDescription("The latency in seconds for publishing
messages")
+ .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005,
0.01, 0.02, 0.05, 0.1,
+ 0.2, 0.5, 1.0, 5.0, 30.0))
+ .build();
}
@Override
@@ -195,4 +220,9 @@ public class BrokerOperabilityMetrics implements
AutoCloseable {
public void recordHealthCheckStatusFail() {
this.healthCheckStatus = 0;
}
+
+ /**
+  * Records the latency of one publish operation in both the OpenTelemetry
+  * histogram (in seconds) and the deprecated Prometheus summary.
+  *
+  * @param latency the measured latency, expressed in {@code unit}
+  * @param unit    the time unit of {@code latency}
+  */
+ public void recordPublishLatency(long latency, TimeUnit unit) {
+     // Convert through nanoseconds: unit.toMillis() would truncate, recording every
+     // sub-millisecond publish as 0 s and dropping the sub-ms part of all other samples.
+     this.topicPublishLatencyHistogram.record(unit.toNanos(latency) / 1_000_000_000.0);
+     PUBLISH_LATENCY.observe(latency, unit);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
index 3bfbf2064e1..76cad804e1e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
@@ -121,4 +121,19 @@ public class BrokerOpenTelemetryTestUtil {
valueConsumer.accept(point.getValue());
}))));
}
+
+ /**
+  * Asserts that {@code metrics} contains a histogram named {@code metricName} with at least
+  * one point that has exactly {@code attributes} and whose count and sum pass the supplied
+  * consumers (which are expected to perform their own assertions).
+  */
+ public static void assertMetricHistogramValue(Collection<MetricData> metrics, String metricName,
+                                               Attributes attributes, Consumer<Long> countConsumer,
+                                               Consumer<Double> sumConsumer) {
+     final Map<AttributeKey<?>, Object> expectedAttributes = attributes.asMap();
+     assertThat(metrics).anySatisfy(metricData -> assertThat(metricData)
+             .hasName(metricName)
+             .hasHistogramSatisfying(histogramAssert -> histogramAssert.satisfies(histogramData ->
+                     assertThat(histogramData.getPoints()).anySatisfy(pointData -> {
+                         assertThat(pointData.getAttributes().asMap()).isEqualTo(expectedAttributes);
+                         countConsumer.accept(pointData.getCount());
+                         sumConsumer.accept(pointData.getSum());
+                     }))));
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
index 4378e6b05b3..e197f3bc621 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
@@ -18,8 +18,11 @@
*/
package org.apache.pulsar.broker.stats;
+import static
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricHistogramValue;
import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import io.opentelemetry.api.common.Attributes;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -101,4 +104,27 @@ public class OpenTelemetryBrokerOperabilityStatsTest
extends BrokerTestBase {
assertMetricLongSumValue(metrics,
BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
ConnectionCreateStatus.FAILURE.attributes, 1);
}
+
+ @Test
+ public void testPublishLatency() throws Exception {
+     final var topicName =
+             BrokerTestUtil.newUniqueName("persistent://my-namespace/use/my-ns/testPublishLatency");
+     @Cleanup
+     final var producer = pulsarClient.newProducer().topic(topicName).create();
+
+     // A single publish must yield exactly one histogram sample with a positive sum.
+     producer.send("msg".getBytes());
+     assertMetricHistogramValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(),
+             BrokerOperabilityMetrics.TOPIC_PUBLISH_LATENCY_METRIC_NAME, Attributes.empty(),
+             count -> assertThat(count).isEqualTo(1L),
+             sum -> assertThat(sum).isGreaterThan(0.0));
+
+     // Nine further publishes; the sample count is expected to accumulate to ten.
+     for (int n = 0; n < 9; n++) {
+         producer.send(("msg-" + n).getBytes());
+     }
+     assertMetricHistogramValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(),
+             BrokerOperabilityMetrics.TOPIC_PUBLISH_LATENCY_METRIC_NAME, Attributes.empty(),
+             count -> assertThat(count).isEqualTo(10L),
+             sum -> assertThat(sum).isGreaterThan(0.0));
+ }
}