This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 02a7bbce7eb [improve][client] Add OpenTelemetry metrics for client
memory buffer usage (#24647)
02a7bbce7eb is described below
commit 02a7bbce7eb107a3cc849d54e9d373b1d57659b9
Author: Penghui Li <[email protected]>
AuthorDate: Wed Aug 20 10:30:10 2025 -0700
[improve][client] Add OpenTelemetry metrics for client memory buffer usage
(#24647)
(cherry picked from commit a66e8068058664d65fe71d5d711a14a898840b46)
---
.../pulsar/client/metrics/ClientMetricsTest.java | 82 +++++++++++
.../pulsar/client/impl/MemoryLimitController.java | 4 +
.../pulsar/client/impl/PulsarClientImpl.java | 16 +++
.../client/impl/metrics/InstrumentProvider.java | 8 ++
.../client/impl/metrics/MemoryBufferStats.java | 63 +++++++++
.../impl/metrics/ObservableUpDownCounter.java | 79 +++++++++++
.../client/impl/metrics/MemoryBufferStatsTest.java | 152 +++++++++++++++++++++
7 files changed, 404 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java
index 02b38acf865..5728f3d0e01 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java
@@ -38,8 +38,10 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.SubscriptionType;
import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -343,4 +345,84 @@ public class ClientMetricsTest extends ProducerConsumerBase {
assertCounterValue(metrics, "pulsar.client.consumer.closed", 1,
nsAttrs);
assertCounterValue(metrics, "pulsar.client.connection.closed", 1,
Attributes.empty());
}
+
+ @Test
+ public void testMemoryBufferMetrics() throws Exception {
+ String topic = newTopicName();
+ long memoryLimit = 1024 * 1024; // 1MB
+
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(otel)
+ .memoryLimit(memoryLimit,
org.apache.pulsar.client.api.SizeUnit.BYTES)
+ .build();
+
+ Producer<byte[]> producer = client.newProducer()
+ .topic(topic)
+ .batchingMaxPublishDelay(1, TimeUnit.DAYS)
+ .batchingMaxBytes(1024 * 1024)
+ .create();
+
+ var metrics = collectMetrics();
+
+ // Verify memory buffer limit is reported correctly
+ assertCounterValue(metrics, "pulsar.client.memory.buffer.limit",
memoryLimit, Attributes.empty());
+
+ // Initially, memory usage should be 0 or very low
+ long initialUsage = getCounterValue(metrics,
"pulsar.client.memory.buffer.usage", Attributes.empty());
+
Assertions.assertThat(initialUsage).isGreaterThanOrEqualTo(0).isLessThan(memoryLimit
/ 4);
+
+ producer.sendAsync(new byte[512 * 1024]);
+
+ metrics = collectMetrics();
+
+ // Verify memory usage increased
+ long usageAfterSend = getCounterValue(metrics,
"pulsar.client.memory.buffer.usage", Attributes.empty());
+ Assertions.assertThat(usageAfterSend).isGreaterThan(initialUsage);
+
+ // Verify limit is still correct
+ assertCounterValue(metrics, "pulsar.client.memory.buffer.limit",
memoryLimit, Attributes.empty());
+
+ // Flush all pending messages
+ producer.flush();
+
+ Awaitility.await().untilAsserted(() -> {
+ var newMetrics = collectMetrics();
+ // Memory usage should be lower after flushing
+ long usageAfterFlush = getCounterValue(newMetrics,
"pulsar.client.memory.buffer.usage", Attributes.empty());
+
Assertions.assertThat(usageAfterFlush).isLessThanOrEqualTo(usageAfterSend);
+ });
+
+ producer.close();
+ client.close();
+ }
+
+ @Test
+ public void testMemoryBufferMetricsWithNoLimit() throws Exception {
+ // Create client without memory limit
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(otel)
+ .memoryLimit(0L, SizeUnit.BYTES)
+ .build();
+
+ String topic = newTopicName();
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ producer.send("test message");
+
+ var metrics = collectMetrics();
+
+ // When memory limiting is disabled, buffer metrics should not be
present at all
+ boolean hasUsageMetric =
metrics.containsKey("pulsar.client.memory.buffer.usage");
+ boolean hasLimitMetric =
metrics.containsKey("pulsar.client.memory.buffer.limit");
+
+ // Since memory limiting is disabled, these metrics should not exist
+ Assertions.assertThat(hasUsageMetric).isFalse();
+ Assertions.assertThat(hasLimitMetric).isFalse();
+ producer.close();
+ client.close();
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
index c15821c0543..d7acfd69128 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
@@ -145,4 +145,8 @@ public class MemoryLimitController {
public boolean isMemoryLimited() {
return memoryLimit > 0;
}
+
+    /**
+     * Returns the memory limit this controller was created with, in bytes.
+     *
+     * @return the configured limit; a value {@code <= 0} means memory limiting is disabled
+     *         (see {@link #isMemoryLimited()})
+     */
+    public long memoryLimit() {
+        return this.memoryLimit;
+    }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 950a01afb46..01b27f491e1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.client.impl.metrics.MemoryBufferStats;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
@@ -146,6 +147,7 @@ public class PulsarClientImpl implements PulsarClient {
protected final EventLoopGroup eventLoopGroup;
private final MemoryLimitController memoryLimitController;
+ /** OpenTelemetry memory buffer metrics; {@code null} when memory limiting is disabled, so guard before use. */
+ private final MemoryBufferStats memoryBufferStats;
private final LoadingCache<String, SchemaInfoProvider>
schemaProviderLoadingCache =
CacheBuilder.newBuilder().maximumSize(100000)
@@ -265,6 +267,12 @@ public class PulsarClientImpl implements PulsarClient {
memoryLimitController = new
MemoryLimitController(conf.getMemoryLimitBytes(),
(long) (conf.getMemoryLimitBytes() *
THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING),
this::reduceConsumerReceiverQueueSize);
+ // Only create memory buffer metrics if memory limiting is enabled
+ if (memoryLimitController.isMemoryLimited()) {
+ memoryBufferStats = new MemoryBufferStats(instrumentProvider,
memoryLimitController);
+ } else {
+ memoryBufferStats = null;
+ }
state.set(State.Open);
} catch (Throwable t) {
// Log the exception first, or it could be missed if there are any
subsequent exceptions in the
@@ -938,6 +946,14 @@ public class PulsarClientImpl implements PulsarClient {
} catch (PulsarClientException e) {
throwable = e;
}
+ if (memoryBufferStats != null) {
+ try {
+ memoryBufferStats.close();
+ } catch (Throwable t) {
+ log.warn("Failed to close memoryBufferStats", t);
+ throwable = t;
+ }
+ }
if (conf != null && conf.getAuthentication() != null) {
try {
conf.getAuthentication().close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
index 1e02af1fd37..a0bdd8b6fb6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
@@ -23,6 +23,8 @@ import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.function.Consumer;
import org.apache.pulsar.PulsarVersion;
public class InstrumentProvider {
@@ -55,4 +57,10 @@ public class InstrumentProvider {
public LatencyHistogram newLatencyHistogram(String name, String
description, String topic, Attributes attributes) {
return new LatencyHistogram(meter, name, description, topic,
attributes);
}
+
+    /**
+     * Creates an {@link ObservableUpDownCounter} whose current value is supplied by {@code callback}.
+     *
+     * @param name        metric name
+     * @param unit        unit of the reported value
+     * @param description human-readable metric description
+     * @param topic       topic whose attributes are attached, or {@code null} for a client-wide metric
+     * @param attributes  extra attributes attached to each measurement (may be {@code null} when
+     *                    {@code topic} is {@code null})
+     * @param callback    receives the measurement and records the current value on it
+     * @return the new counter; close it to release the underlying instrument
+     */
+    public ObservableUpDownCounter newObservableUpDownCounter(String name, Unit unit, String description,
+                                                              String topic, Attributes attributes,
+                                                              Consumer<ObservableLongMeasurement> callback) {
+        ObservableUpDownCounter counter =
+                new ObservableUpDownCounter(meter, name, unit, description, topic, attributes, callback);
+        return counter;
+    }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStats.java
new file mode 100644
index 00000000000..4868562ffd3
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStats.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+import org.apache.pulsar.client.impl.MemoryLimitController;
+
+/**
+ * Publishes the client-wide memory buffer metrics {@value #BUFFER_USAGE_COUNTER} and
+ * {@value #BUFFER_LIMIT_COUNTER}, read from a {@link MemoryLimitController}.
+ *
+ * <p>Both callbacks record nothing while the controller reports that memory limiting is disabled.
+ * {@link #close()} closes both underlying counters.
+ */
+public class MemoryBufferStats implements AutoCloseable {
+
+    public static final String BUFFER_USAGE_COUNTER = "pulsar.client.memory.buffer.usage";
+    private final ObservableUpDownCounter bufferUsageCounter;
+
+    public static final String BUFFER_LIMIT_COUNTER = "pulsar.client.memory.buffer.limit";
+    private final ObservableUpDownCounter bufferLimitCounter;
+
+    /**
+     * Registers the usage and limit counters.
+     *
+     * @param instrumentProvider    provider used to create the OpenTelemetry instruments
+     * @param memoryLimitController source of the current usage and the configured limit
+     */
+    public MemoryBufferStats(InstrumentProvider instrumentProvider, MemoryLimitController memoryLimitController) {
+        bufferUsageCounter = instrumentProvider.newObservableUpDownCounter(
+                BUFFER_USAGE_COUNTER,
+                Unit.Bytes,
+                "Current memory buffer usage by the client",
+                null, // no topic
+                null, // no extra attributes
+                measurement -> {
+                    // Record nothing when limiting is disabled, so the metric is absent rather than 0.
+                    if (memoryLimitController.isMemoryLimited()) {
+                        measurement.record(memoryLimitController.currentUsage());
+                    }
+                });
+
+        bufferLimitCounter = instrumentProvider.newObservableUpDownCounter(
+                BUFFER_LIMIT_COUNTER,
+                Unit.Bytes,
+                "Memory buffer limit configured for the client",
+                null, // no topic
+                null, // no extra attributes
+                measurement -> {
+                    // Same rule as the usage counter: no value when limiting is disabled.
+                    if (memoryLimitController.isMemoryLimited()) {
+                        measurement.record(memoryLimitController.memoryLimit());
+                    }
+                });
+    }
+
+    /**
+     * Closes both counters. The limit counter is closed even if closing the usage counter throws.
+     */
+    @Override
+    public void close() {
+        try {
+            bufferUsageCounter.close();
+        } finally {
+            bufferLimitCounter.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ObservableUpDownCounter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ObservableUpDownCounter.java
new file mode 100644
index 00000000000..c7ca2907be2
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ObservableUpDownCounter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+import static
org.apache.pulsar.client.impl.metrics.MetricsUtil.getDefaultAggregationLabels;
+import static
org.apache.pulsar.client.impl.metrics.MetricsUtil.getTopicAttributes;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.incubator.metrics.ExtendedLongUpDownCounterBuilder;
+import io.opentelemetry.api.metrics.LongUpDownCounterBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import java.util.function.Consumer;
+
+public class ObservableUpDownCounter implements AutoCloseable {
+
+ private final ObservableLongUpDownCounter counter;
+
+ ObservableUpDownCounter(Meter meter, String name, Unit unit, String
description, String topic,
+ Attributes attributes,
Consumer<ObservableLongMeasurement> callback) {
+ LongUpDownCounterBuilder builder = meter.upDownCounterBuilder(name)
+ .setDescription(description)
+ .setUnit(unit.toString());
+
+ if (topic != null) {
+ if (builder instanceof ExtendedLongUpDownCounterBuilder) {
+ ExtendedLongUpDownCounterBuilder eb =
(ExtendedLongUpDownCounterBuilder) builder;
+
eb.setAttributesAdvice(getDefaultAggregationLabels(attributes));
+ }
+
+ attributes = getTopicAttributes(topic, attributes);
+ }
+
+ final Attributes finalAttributes = attributes;
+ this.counter = builder.buildWithCallback(measurement -> {
+ if (finalAttributes != null && !finalAttributes.isEmpty()) {
+ callback.accept(new AttributeWrappedMeasurement(measurement,
finalAttributes));
+ } else {
+ callback.accept(measurement);
+ }
+ });
+ }
+
+ @Override
+ public void close() {
+ counter.close();
+ }
+
+ private record AttributeWrappedMeasurement(ObservableLongMeasurement
delegate,
+ Attributes attributes)
implements ObservableLongMeasurement {
+
+ @Override
+ public void record(long value) {
+ delegate.record(value, attributes);
+ }
+
+ @Override
+ public void record(long value, Attributes attributes) {
+ delegate.record(value,
this.attributes.toBuilder().putAll(attributes).build());
+ }
+ }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStatsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStatsTest.java
new file mode 100644
index 00000000000..81745201be6
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStatsTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Collection;
+import org.apache.pulsar.client.impl.MemoryLimitController;
+import org.testng.annotations.Test;
+
+public class MemoryBufferStatsTest {
+
+ @Test
+ public void testMemoryBufferStatsWithMemoryUsage() {
+ long memoryLimit = 1024 * 1024; // 1MB
+ MemoryLimitController memoryLimitController = new
MemoryLimitController(memoryLimit);
+
+ InMemoryMetricReader metricReader = InMemoryMetricReader.create();
+ SdkMeterProvider meterProvider = SdkMeterProvider.builder()
+ .registerMetricReader(metricReader)
+ .build();
+ OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
+ .setMeterProvider(meterProvider)
+ .build();
+
+ InstrumentProvider instrumentProvider = new
InstrumentProvider(openTelemetry);
+
+ try (MemoryBufferStats memoryBufferStats = new
MemoryBufferStats(instrumentProvider, memoryLimitController)) {
+ assertNotNull(memoryBufferStats);
+
+ // Test initial state - no memory used
+ Collection<MetricData> metrics = metricReader.collectAllMetrics();
+ assertUsageMetric(metrics, 0);
+ assertLimitMetric(metrics, memoryLimit);
+
+ // Reserve some memory
+ long reservedMemory = 512 * 1024; // 512KB
+ memoryLimitController.forceReserveMemory(reservedMemory);
+
+ // Collect metrics and verify
+ metrics = metricReader.collectAllMetrics();
+ assertUsageMetric(metrics, reservedMemory);
+ assertLimitMetric(metrics, memoryLimit);
+
+ // Reserve more memory
+ long additionalMemory = 256 * 1024; // 256KB
+ memoryLimitController.forceReserveMemory(additionalMemory);
+
+ // Verify total usage
+ metrics = metricReader.collectAllMetrics();
+ assertUsageMetric(metrics, reservedMemory + additionalMemory);
+ assertLimitMetric(metrics, memoryLimit);
+
+ // Release some memory
+ memoryLimitController.releaseMemory(additionalMemory);
+
+ // Verify usage decreased
+ metrics = metricReader.collectAllMetrics();
+ assertUsageMetric(metrics, reservedMemory);
+ assertLimitMetric(metrics, memoryLimit);
+
+ // Release all memory
+ memoryLimitController.releaseMemory(reservedMemory);
+
+ // Verify back to zero
+ metrics = metricReader.collectAllMetrics();
+ assertUsageMetric(metrics, 0);
+ assertLimitMetric(metrics, memoryLimit);
+ }
+ }
+
+ @Test
+ public void testMemoryBufferStatsWithNoMemoryLimit() {
+ MemoryLimitController memoryLimitController = new
MemoryLimitController(0); // No limit
+
+ InMemoryMetricReader metricReader = InMemoryMetricReader.create();
+ SdkMeterProvider meterProvider = SdkMeterProvider.builder()
+ .registerMetricReader(metricReader)
+ .build();
+ OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
+ .setMeterProvider(meterProvider)
+ .build();
+
+ InstrumentProvider instrumentProvider = new
InstrumentProvider(openTelemetry);
+
+ // When memory limiting is disabled, MemoryBufferStats should not be
created at all
+ // This test verifies that the callback correctly checks for memory
limiting
+ try (MemoryBufferStats memoryBufferStats = new
MemoryBufferStats(instrumentProvider, memoryLimitController)) {
+ assertNotNull(memoryBufferStats);
+
+ // When memory limiting is disabled, no metrics should be recorded
+ Collection<MetricData> metrics = metricReader.collectAllMetrics();
+ assertTrue(metrics.isEmpty() || metrics.stream().noneMatch(metric
->
+
metric.getName().equals(MemoryBufferStats.BUFFER_USAGE_COUNTER)
+ ||
metric.getName().equals(MemoryBufferStats.BUFFER_LIMIT_COUNTER)));
+ }
+ }
+
+ private void assertUsageMetric(Collection<MetricData> metrics, long
expectedValue) {
+ MetricData usageMetric = metrics.stream()
+ .filter(metric ->
metric.getName().equals(MemoryBufferStats.BUFFER_USAGE_COUNTER))
+ .findFirst()
+ .orElse(null);
+
+ assertNotNull(usageMetric, "Usage metric should be present");
+
+ Collection<LongPointData> points =
usageMetric.getLongSumData().getPoints();
+ assertEquals(points.size(), 1, "Should have exactly one data point");
+
+ LongPointData point = points.iterator().next();
+ assertEquals(point.getValue(), expectedValue, "Usage metric value
should match expected");
+ }
+
+ private void assertLimitMetric(Collection<MetricData> metrics, long
expectedValue) {
+ MetricData limitMetric = metrics.stream()
+ .filter(metric ->
metric.getName().equals(MemoryBufferStats.BUFFER_LIMIT_COUNTER))
+ .findFirst()
+ .orElse(null);
+
+ assertNotNull(limitMetric, "Limit metric should be present");
+
+ Collection<LongPointData> points =
limitMetric.getLongSumData().getPoints();
+ assertEquals(points.size(), 1, "Should have exactly one data point");
+
+ LongPointData point = points.iterator().next();
+ assertEquals(point.getValue(), expectedValue, "Limit metric value
should match expected");
+ }
+}
\ No newline at end of file