This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 8849484b557 [fix][broker]pulsar_ml_reads_inflight_bytes and
pulsar_ml_reads_available_inflight_bytes are 0 at the same time (#25105)
8849484b557 is described below
commit 8849484b5578ec08a7af9a8a4d9e2133ac7fd33c
Author: Penghui Li <[email protected]>
AuthorDate: Mon Dec 22 23:07:25 2025 -0800
[fix][broker]pulsar_ml_reads_inflight_bytes and
pulsar_ml_reads_available_inflight_bytes are 0 at the same time (#25105)
---
.../bookkeeper/mledger/impl/cache/InflightReadsLimiter.java | 7 +++++--
.../mledger/impl/cache/InflightReadsLimiterTest.java | 12 ++++++++++++
2 files changed, 17 insertions(+), 2 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
index cda8e975544..616accf8b4f 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
@@ -42,7 +42,8 @@ public class InflightReadsLimiter implements AutoCloseable {
@PulsarDeprecatedMetric(newMetricName =
INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME)
@Deprecated
- private static final Gauge PULSAR_ML_READS_BUFFER_SIZE = Gauge
+ @VisibleForTesting
+ static final Gauge PULSAR_ML_READS_BUFFER_SIZE = Gauge
.build()
.name("pulsar_ml_reads_inflight_bytes")
.help("Estimated number of bytes retained by data read from
storage or cache")
@@ -55,7 +56,8 @@ public class InflightReadsLimiter implements AutoCloseable {
@PulsarDeprecatedMetric(newMetricName =
INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME)
@Deprecated
- private static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge
+ @VisibleForTesting
+ static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge
.build()
.name("pulsar_ml_reads_available_inflight_bytes")
.help("Available space for inflight data read from storage or
cache")
@@ -87,6 +89,7 @@ public class InflightReadsLimiter implements AutoCloseable {
if (maxReadsInFlightSize > 0) {
enabled = true;
this.queuedHandles = new ArrayDeque<>();
+ updateMetrics();
} else {
enabled = false;
this.queuedHandles = null;
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
index 7475b620f57..98c9e939cc6 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
@@ -538,6 +538,18 @@ public class InflightReadsLimiterTest {
.isEqualTo(maxReadsInFlightSize);
}
+ @Test
+ public void testPrometheusMetrics() throws Exception {
+ long maxReadsInFlightSize = 100;
+ @Cleanup
+ InflightReadsLimiter limiter = new
InflightReadsLimiter(maxReadsInFlightSize, ACQUIRE_QUEUE_SIZE,
+ ACQUIRE_TIMEOUT_MILLIS, mock(ScheduledExecutorService.class),
OpenTelemetry.noop());
+
+
Assertions.assertThat(limiter.PULSAR_ML_READS_BUFFER_SIZE.get()).isZero();
+
Assertions.assertThat(limiter.PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE.get())
+ .isEqualTo(maxReadsInFlightSize);
+ }
+
private Pair<OpenTelemetrySdk, InMemoryMetricReader>
buildOpenTelemetryAndReader() {
var metricReader = InMemoryMetricReader.create();
var openTelemetry = AutoConfiguredOpenTelemetrySdk.builder()