This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fe726db49c3 [feat][broker] PIP-264: Add managed ledger cache metrics 
(#22898)
fe726db49c3 is described below

commit fe726db49c32eb539b6eb0b83c8735e48f742a35
Author: Dragos Misca <dragosvic...@users.noreply.github.com>
AuthorDate: Tue Jun 25 22:49:07 2024 -0700

    [feat][broker] PIP-264: Add managed ledger cache metrics (#22898)
---
 managed-ledger/pom.xml                             |  12 ++
 .../mledger/ManagedLedgerFactoryMXBean.java        |  25 +++
 .../OpenTelemetryManagedLedgerCacheStats.java      | 172 +++++++++++++++++++++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  20 ++-
 .../impl/ManagedLedgerFactoryMBeanImpl.java        |  25 +++
 .../impl/cache/PooledByteBufAllocatorStats.java    |  68 ++++++++
 .../pulsar/broker/ManagedLedgerClientFactory.java  |   8 +-
 .../org/apache/pulsar/broker/PulsarService.java    |   2 +-
 .../broker/stats/PulsarBrokerOpenTelemetry.java    |   3 +-
 .../stats/metrics/ManagedLedgerCacheMetrics.java   |  43 +-----
 .../broker/storage/ManagedLedgerStorage.java       |   9 +-
 .../OpenTelemetryManagedLedgerCacheStatsTest.java  | 127 +++++++++++++++
 .../broker/testcontext/PulsarTestContext.java      |   6 +-
 .../client/impl/SequenceIdWithErrorTest.java       |   3 +-
 .../org/apache/pulsar/opentelemetry/Constants.java |  28 ++++
 .../opentelemetry/OpenTelemetryAttributes.java     |  44 ++++++
 16 files changed, 544 insertions(+), 51 deletions(-)

diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index d8b31220d51..60a4edab95b 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -71,6 +71,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-opentelemetry</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
@@ -120,6 +126,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk-testing</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
index 35c26c5dfdb..43e8196daa9 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
@@ -47,26 +47,51 @@ public interface ManagedLedgerFactoryMXBean {
      */
     double getCacheHitsRate();
 
+    /**
+     * Cumulative number of cache hits.
+     */
+    long getCacheHitsTotal();
+
     /**
      * Get the number of cache misses per second.
      */
     double getCacheMissesRate();
 
+    /**
+     * Cumulative number of cache misses.
+     */
+    long getCacheMissesTotal();
+
     /**
      * Get the amount of data is retrieved from the cache in byte/s.
      */
     double getCacheHitsThroughput();
 
+    /**
+     * Cumulative amount of data retrieved from the cache in bytes.
+     */
+    long getCacheHitsBytesTotal();
+
     /**
      * Get the amount of data is retrieved from the bookkeeper in byte/s.
      */
     double getCacheMissesThroughput();
 
+    /**
+     * Cumulative amount of data retrieved from the bookkeeper in bytes.
+     */
+    long getCacheMissesBytesTotal();
+
     /**
      * Get the number of cache evictions during the last minute.
      */
     long getNumberOfCacheEvictions();
 
+    /**
+     * Cumulative number of cache evictions.
+     */
+    long getNumberOfCacheEvictionsTotal();
+
     /**
      * Cumulative number of entries inserted into the cache.
      */
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java
new file mode 100644
index 00000000000..13e7ed6ac67
--- /dev/null
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
+import org.apache.pulsar.opentelemetry.Constants;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheEntryStatus;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheOperationStatus;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolArenaType;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolChunkAllocationType;
+
+public class OpenTelemetryManagedLedgerCacheStats implements AutoCloseable {
+
+    // Replaces pulsar_ml_count
+    public static final String MANAGED_LEDGER_COUNTER = 
"pulsar.broker.managed_ledger.count";
+    private final ObservableLongMeasurement managedLedgerCounter;
+
+    // Replaces pulsar_ml_cache_evictions
+    public static final String CACHE_EVICTION_OPERATION_COUNTER = 
"pulsar.broker.managed_ledger.cache.eviction.count";
+    private final ObservableLongMeasurement cacheEvictionOperationCounter;
+
+    // Replaces 'pulsar_ml_cache_entries',
+    //          'pulsar_ml_cache_inserted_entries_total',
+    //          'pulsar_ml_cache_evicted_entries_total'
+    public static final String CACHE_ENTRY_COUNTER = 
"pulsar.broker.managed_ledger.cache.entry.count";
+    private final ObservableLongMeasurement cacheEntryCounter;
+
+    // Replaces pulsar_ml_cache_used_size
+    public static final String CACHE_SIZE_COUNTER = 
"pulsar.broker.managed_ledger.cache.entry.size";
+    private final ObservableLongMeasurement cacheSizeCounter;
+
+    // Replaces pulsar_ml_cache_hits_rate, pulsar_ml_cache_misses_rate
+    public static final String CACHE_OPERATION_COUNTER = 
"pulsar.broker.managed_ledger.cache.operation.count";
+    private final ObservableLongMeasurement cacheOperationCounter;
+
+    // Replaces pulsar_ml_cache_hits_throughput, 
pulsar_ml_cache_misses_throughput
+    public static final String CACHE_OPERATION_BYTES_COUNTER = 
"pulsar.broker.managed_ledger.cache.operation.size";
+    private final ObservableLongMeasurement cacheOperationBytesCounter;
+
+    // Replaces 'pulsar_ml_cache_pool_active_allocations',
+    //          'pulsar_ml_cache_pool_active_allocations_huge',
+    //          'pulsar_ml_cache_pool_active_allocations_normal',
+    //          'pulsar_ml_cache_pool_active_allocations_small'
+    public static final String CACHE_POOL_ACTIVE_ALLOCATION_COUNTER =
+            "pulsar.broker.managed_ledger.cache.pool.allocation.active.count";
+    private final ObservableLongMeasurement cachePoolActiveAllocationCounter;
+
+    // Replaces ['pulsar_ml_cache_pool_allocated', 'pulsar_ml_cache_pool_used']
+    public static final String CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER =
+            "pulsar.broker.managed_ledger.cache.pool.allocation.size";
+    private final ObservableLongMeasurement 
cachePoolActiveAllocationSizeCounter;
+
+    private final BatchCallback batchCallback;
+
+    public OpenTelemetryManagedLedgerCacheStats(OpenTelemetry openTelemetry, 
ManagedLedgerFactoryImpl factory) {
+        var meter = 
openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
+
+        managedLedgerCounter = meter
+                .upDownCounterBuilder(MANAGED_LEDGER_COUNTER)
+                .setUnit("{managed_ledger}")
+                .setDescription("The total number of managed ledgers.")
+                .buildObserver();
+
+        cacheEvictionOperationCounter = meter
+                .counterBuilder(CACHE_EVICTION_OPERATION_COUNTER)
+                .setUnit("{eviction}")
+                .setDescription("The total number of cache eviction 
operations.")
+                .buildObserver();
+
+        cacheEntryCounter = meter
+                .upDownCounterBuilder(CACHE_ENTRY_COUNTER)
+                .setUnit("{entry}")
+                .setDescription("The number of entries in the entry cache.")
+                .buildObserver();
+
+        cacheSizeCounter = meter
+                .upDownCounterBuilder(CACHE_SIZE_COUNTER)
+                .setUnit("{By}")
+                .setDescription("The byte amount of entries stored in the 
entry cache.")
+                .buildObserver();
+
+        cacheOperationCounter = meter
+                .counterBuilder(CACHE_OPERATION_COUNTER)
+                .setUnit("{entry}")
+                .setDescription("The number of cache operations.")
+                .buildObserver();
+
+        cacheOperationBytesCounter = meter
+                .counterBuilder(CACHE_OPERATION_BYTES_COUNTER)
+                .setUnit("{By}")
+                .setDescription("The byte amount of data retrieved from cache 
operations.")
+                .buildObserver();
+
+        cachePoolActiveAllocationCounter = meter
+                .upDownCounterBuilder(CACHE_POOL_ACTIVE_ALLOCATION_COUNTER)
+                .setUnit("{allocation}")
+                .setDescription("The number of currently active allocations in 
the direct arena.")
+                .buildObserver();
+
+        cachePoolActiveAllocationSizeCounter = meter
+                
.upDownCounterBuilder(CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER)
+                .setUnit("{By}")
+                .setDescription("The memory allocated in the direct arena.")
+                .buildObserver();
+
+
+        batchCallback = meter.batchCallback(() -> recordMetrics(factory),
+                managedLedgerCounter,
+                cacheEvictionOperationCounter,
+                cacheEntryCounter,
+                cacheSizeCounter,
+                cacheOperationCounter,
+                cacheOperationBytesCounter,
+                cachePoolActiveAllocationCounter,
+                cachePoolActiveAllocationSizeCounter);
+    }
+
+    @Override
+    public void close() {
+        batchCallback.close();
+    }
+
+    private void recordMetrics(ManagedLedgerFactoryImpl factory) {
+        var stats = factory.getCacheStats();
+
+        managedLedgerCounter.record(stats.getNumberOfManagedLedgers());
+        
cacheEvictionOperationCounter.record(stats.getNumberOfCacheEvictionsTotal());
+
+        var entriesOut = stats.getCacheEvictedEntriesCount();
+        var entriesIn = stats.getCacheInsertedEntriesCount();
+        var entriesActive = entriesIn - entriesOut;
+        cacheEntryCounter.record(entriesActive, 
CacheEntryStatus.ACTIVE.attributes);
+        cacheEntryCounter.record(entriesIn, 
CacheEntryStatus.INSERTED.attributes);
+        cacheEntryCounter.record(entriesOut, 
CacheEntryStatus.EVICTED.attributes);
+        cacheSizeCounter.record(stats.getCacheUsedSize());
+
+        cacheOperationCounter.record(stats.getCacheHitsTotal(), 
CacheOperationStatus.HIT.attributes);
+        cacheOperationBytesCounter.record(stats.getCacheHitsBytesTotal(), 
CacheOperationStatus.HIT.attributes);
+        cacheOperationCounter.record(stats.getCacheMissesTotal(), 
CacheOperationStatus.MISS.attributes);
+        cacheOperationBytesCounter.record(stats.getCacheMissesBytesTotal(), 
CacheOperationStatus.MISS.attributes);
+
+        var allocatorStats = new 
PooledByteBufAllocatorStats(RangeEntryCacheImpl.ALLOCATOR);
+        
cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsSmall, 
PoolArenaType.SMALL.attributes);
+        
cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsNormal,
+                PoolArenaType.NORMAL.attributes);
+        
cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsHuge, 
PoolArenaType.HUGE.attributes);
+        
cachePoolActiveAllocationSizeCounter.record(allocatorStats.totalAllocated,
+                PoolChunkAllocationType.ALLOCATED.attributes);
+        cachePoolActiveAllocationSizeCounter.record(allocatorStats.totalUsed, 
PoolChunkAllocationType.USED.attributes);
+    }
+}
\ No newline at end of file
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 0b0f66d14c9..fc291b801c8 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -24,6 +24,7 @@ import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowabl
 import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import io.opentelemetry.api.OpenTelemetry;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -67,6 +68,7 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo.PositionInfo;
 import org.apache.bookkeeper.mledger.MetadataCompressionConfig;
+import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback;
@@ -118,6 +120,8 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
     private volatile long cacheEvictionTimeThresholdNanos;
     private final MetadataStore metadataStore;
 
+    private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
+
     //indicate whether shutdown() is called.
     private volatile boolean closed;
 
@@ -149,7 +153,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                                     ManagedLedgerFactoryConfig config)
             throws Exception {
         this(metadataStore, new DefaultBkFactory(bkClientConfiguration),
-                true /* isBookkeeperManaged */, config, 
NullStatsLogger.INSTANCE);
+                true /* isBookkeeperManaged */, config, 
NullStatsLogger.INSTANCE, OpenTelemetry.noop());
     }
 
     public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, 
BookKeeper bookKeeper)
@@ -168,21 +172,24 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                                     ManagedLedgerFactoryConfig config)
             throws Exception {
         this(metadataStore, bookKeeperGroupFactory, false /* 
isBookkeeperManaged */,
-                config, NullStatsLogger.INSTANCE);
+                config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
     }
 
     public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
                                     
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
-                                    ManagedLedgerFactoryConfig config, 
StatsLogger statsLogger)
+                                    ManagedLedgerFactoryConfig config, 
StatsLogger statsLogger,
+                                    OpenTelemetry openTelemetry)
             throws Exception {
         this(metadataStore, bookKeeperGroupFactory, false /* 
isBookkeeperManaged */,
-                config, statsLogger);
+                config, statsLogger, openTelemetry);
     }
 
     private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
                                      
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
                                      boolean isBookkeeperManaged,
-                                     ManagedLedgerFactoryConfig config, 
StatsLogger statsLogger) throws Exception {
+                                     ManagedLedgerFactoryConfig config,
+                                     StatsLogger statsLogger,
+                                     OpenTelemetry openTelemetry) throws 
Exception {
         MetadataCompressionConfig compressionConfigForManagedLedgerInfo =
                 config.getCompressionConfigForManagedLedgerInfo();
         MetadataCompressionConfig compressionConfigForManagedCursorInfo =
@@ -220,6 +227,8 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         closed = false;
 
         
metadataStore.registerSessionListener(this::handleMetadataStoreNotification);
+
+        openTelemetryCacheStats = new 
OpenTelemetryManagedLedgerCacheStats(openTelemetry, this);
     }
 
     static class DefaultBkFactory implements 
BookkeeperFactoryForCustomEnsemblePlacementPolicy {
@@ -611,6 +620,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                     }));
                 }).thenAcceptAsync(__ -> {
                     //wait for tasks in scheduledExecutor executed.
+                    openTelemetryCacheStats.close();
                     scheduledExecutor.shutdownNow();
                     entryCacheManager.clear();
                 });
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
index cf3d7142d61..a3038a0e7ff 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
@@ -99,26 +99,51 @@ public class ManagedLedgerFactoryMBeanImpl implements 
ManagedLedgerFactoryMXBean
         return cacheHits.getRate();
     }
 
+    @Override
+    public long getCacheHitsTotal() {
+        return cacheHits.getTotalCount();
+    }
+
     @Override
     public double getCacheMissesRate() {
         return cacheMisses.getRate();
     }
 
+    @Override
+    public long getCacheMissesTotal() {
+        return cacheMisses.getTotalCount();
+    }
+
     @Override
     public double getCacheHitsThroughput() {
         return cacheHits.getValueRate();
     }
 
+    @Override
+    public long getCacheHitsBytesTotal() {
+        return cacheHits.getTotalValue();
+    }
+
     @Override
     public double getCacheMissesThroughput() {
         return cacheMisses.getValueRate();
     }
 
+    @Override
+    public long getCacheMissesBytesTotal() {
+        return cacheMisses.getTotalValue();
+    }
+
     @Override
     public long getNumberOfCacheEvictions() {
         return cacheEvictions.getCount();
     }
 
+    @Override
+    public long getNumberOfCacheEvictionsTotal() {
+        return cacheEvictions.getTotalCount();
+    }
+
     public long getCacheInsertedEntriesCount() {
         return insertedEntryCount.sum();
     }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java
new file mode 100644
index 00000000000..4f6a18cb5d9
--- /dev/null
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.PooledByteBufAllocator;
+import lombok.Value;
+
+@Value
+public class PooledByteBufAllocatorStats {
+
+    public long activeAllocations;
+    public long activeAllocationsSmall;
+    public long activeAllocationsNormal;
+    public long activeAllocationsHuge;
+
+    public long totalAllocated;
+    public long totalUsed;
+
+    public PooledByteBufAllocatorStats(PooledByteBufAllocator allocator) {
+        long activeAllocations = 0;
+        long activeAllocationsSmall = 0;
+        long activeAllocationsNormal = 0;
+        long activeAllocationsHuge = 0;
+        long totalAllocated = 0;
+        long totalUsed = 0;
+
+        for (var arena : allocator.metric().directArenas()) {
+            activeAllocations += arena.numActiveAllocations();
+            activeAllocationsSmall += arena.numActiveSmallAllocations();
+            activeAllocationsNormal += arena.numActiveNormalAllocations();
+            activeAllocationsHuge += arena.numActiveHugeAllocations();
+
+            for (var list : arena.chunkLists()) {
+                for (var chunk : list) {
+                    int size = chunk.chunkSize();
+                    int used = size - chunk.freeBytes();
+
+                    totalAllocated += size;
+                    totalUsed += used;
+                }
+            }
+        }
+
+        this.activeAllocations = activeAllocations;
+        this.activeAllocationsSmall = activeAllocationsSmall;
+        this.activeAllocationsNormal = activeAllocationsNormal;
+        this.activeAllocationsHuge = activeAllocationsHuge;
+
+        this.totalAllocated = totalAllocated;
+        this.totalUsed = totalUsed;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 6ed95f167a1..9bbc2857863 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.AsyncCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.EventLoopGroup;
+import io.opentelemetry.api.OpenTelemetry;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
@@ -54,9 +55,11 @@ public class ManagedLedgerClientFactory implements 
ManagedLedgerStorage {
             bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().buildAsync();
     private StatsProvider statsProvider = new NullStatsProvider();
 
+    @Override
     public void initialize(ServiceConfiguration conf, MetadataStoreExtended 
metadataStore,
                            BookKeeperClientFactory bookkeeperProvider,
-                           EventLoopGroup eventLoopGroup) throws Exception {
+                           EventLoopGroup eventLoopGroup,
+                           OpenTelemetry openTelemetry) throws Exception {
         ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new 
ManagedLedgerFactoryConfig();
         
managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 
1024L * 1024L);
         
managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
@@ -109,7 +112,8 @@ public class ManagedLedgerClientFactory implements 
ManagedLedgerStorage {
 
         try {
             this.managedLedgerFactory =
-                    new ManagedLedgerFactoryImpl(metadataStore, bkFactory, 
managedLedgerFactoryConfig, statsLogger);
+                    new ManagedLedgerFactoryImpl(metadataStore, bkFactory, 
managedLedgerFactoryConfig, statsLogger,
+                            openTelemetry);
         } catch (Exception e) {
             statsProvider.stop();
             defaultBkClient.close();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 617afc6e5d1..4fa773dace9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1069,7 +1069,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     protected ManagedLedgerStorage newManagedLedgerClientFactory() throws 
Exception {
         return ManagedLedgerStorage.create(
                 config, localMetadataStore,
-                bkClientFactory, ioEventLoopGroup
+                bkClientFactory, ioEventLoopGroup, 
openTelemetry.getOpenTelemetryService().getOpenTelemetry()
         );
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
index c1bcfadaf97..178da8b8498 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
@@ -26,6 +26,7 @@ import java.util.function.Consumer;
 import lombok.Getter;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.opentelemetry.Constants;
 import org.apache.pulsar.opentelemetry.OpenTelemetryService;
 
 public class PulsarBrokerOpenTelemetry implements Closeable {
@@ -46,7 +47,7 @@ public class PulsarBrokerOpenTelemetry implements Closeable {
                 .serviceVersion(PulsarVersion.getVersion())
                 .builderCustomizer(builderCustomizer)
                 .build();
-        meter = 
openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker");
+        meter = 
openTelemetryService.getOpenTelemetry().getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
index 890a37aa2d8..9eb4beb72fb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
@@ -18,13 +18,10 @@
  */
 package org.apache.pulsar.broker.stats.metrics;
 
-import io.netty.buffer.PoolArenaMetric;
-import io.netty.buffer.PoolChunkListMetric;
-import io.netty.buffer.PoolChunkMetric;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
+import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats;
 import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.stats.Metrics;
@@ -57,37 +54,13 @@ public class ManagedLedgerCacheMetrics extends 
AbstractMetrics {
         m.put("brk_ml_cache_hits_throughput", 
mlCacheStats.getCacheHitsThroughput());
         m.put("brk_ml_cache_misses_throughput", 
mlCacheStats.getCacheMissesThroughput());
 
-        PooledByteBufAllocator allocator = RangeEntryCacheImpl.ALLOCATOR;
-        long activeAllocations = 0;
-        long activeAllocationsSmall = 0;
-        long activeAllocationsNormal = 0;
-        long activeAllocationsHuge = 0;
-        long totalAllocated = 0;
-        long totalUsed = 0;
-
-        for (PoolArenaMetric arena : allocator.metric().directArenas()) {
-            activeAllocations += arena.numActiveAllocations();
-            activeAllocationsSmall += arena.numActiveSmallAllocations();
-            activeAllocationsNormal += arena.numActiveNormalAllocations();
-            activeAllocationsHuge += arena.numActiveHugeAllocations();
-
-            for (PoolChunkListMetric list : arena.chunkLists()) {
-                for (PoolChunkMetric chunk : list) {
-                    int size = chunk.chunkSize();
-                    int used = size - chunk.freeBytes();
-
-                    totalAllocated += size;
-                    totalUsed += used;
-                }
-            }
-        }
-
-        m.put("brk_ml_cache_pool_allocated", totalAllocated);
-        m.put("brk_ml_cache_pool_used", totalUsed);
-        m.put("brk_ml_cache_pool_active_allocations", activeAllocations);
-        m.put("brk_ml_cache_pool_active_allocations_small", 
activeAllocationsSmall);
-        m.put("brk_ml_cache_pool_active_allocations_normal", 
activeAllocationsNormal);
-        m.put("brk_ml_cache_pool_active_allocations_huge", 
activeAllocationsHuge);
+        var allocatorStats = new 
PooledByteBufAllocatorStats(RangeEntryCacheImpl.ALLOCATOR);
+        m.put("brk_ml_cache_pool_allocated", allocatorStats.totalAllocated);
+        m.put("brk_ml_cache_pool_used", allocatorStats.totalUsed);
+        m.put("brk_ml_cache_pool_active_allocations", 
allocatorStats.activeAllocations);
+        m.put("brk_ml_cache_pool_active_allocations_small", 
allocatorStats.activeAllocationsSmall);
+        m.put("brk_ml_cache_pool_active_allocations_normal", 
allocatorStats.activeAllocationsNormal);
+        m.put("brk_ml_cache_pool_active_allocations_huge", 
allocatorStats.activeAllocationsHuge);
 
         metrics.clear();
         metrics.add(m);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
index 0b5a102eed1..944d2badf75 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.storage;
 
 import io.netty.channel.EventLoopGroup;
+import io.opentelemetry.api.OpenTelemetry;
 import java.io.IOException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
@@ -47,7 +48,8 @@ public interface ManagedLedgerStorage extends AutoCloseable {
     void initialize(ServiceConfiguration conf,
                     MetadataStoreExtended metadataStore,
                     BookKeeperClientFactory bookkeeperProvider,
-                    EventLoopGroup eventLoopGroup) throws Exception;
+                    EventLoopGroup eventLoopGroup,
+                    OpenTelemetry openTelemetry) throws Exception;
 
     /**
      * Return the factory to create {@link ManagedLedgerFactory}.
@@ -87,11 +89,12 @@ public interface ManagedLedgerStorage extends AutoCloseable 
{
     static ManagedLedgerStorage create(ServiceConfiguration conf,
                                        MetadataStoreExtended metadataStore,
                                        BookKeeperClientFactory bkProvider,
-                                       EventLoopGroup eventLoopGroup) throws 
Exception {
+                                       EventLoopGroup eventLoopGroup,
+                                       OpenTelemetry openTelemetry) throws 
Exception {
         ManagedLedgerStorage storage =
                 
Reflections.createInstance(conf.getManagedLedgerStorageClassName(), 
ManagedLedgerStorage.class,
                         Thread.currentThread().getContextClassLoader());
-        storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup);
+        storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup, 
openTelemetry);
         return storage;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java
new file mode 100644
index 00000000000..c3a4a2e054e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static 
org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_ENTRY_COUNTER;
+import static 
org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_EVICTION_OPERATION_COUNTER;
+import static 
org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_BYTES_COUNTER;
+import static 
org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_COUNTER;
+import static 
org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_COUNTER;
+import static 
org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER;
+import static 
org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_SIZE_COUNTER;
+import static 
org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.MANAGED_LEDGER_COUNTER;
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThat;
+import io.opentelemetry.api.common.Attributes;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheEntryStatus;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheOperationStatus;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolArenaType;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolChunkAllocationType;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenTelemetryManagedLedgerCacheStatsTest extends BrokerTestBase {
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+        super.customizeMainPulsarTestContextBuilder(builder);
+        builder.enableOpenTelemetry(true);
+    }
+
+    @Test
+    public void testManagedLedgerCacheStats() throws Exception {
+        var topicName = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testManagedLedgerCacheStats");
+
+        @Cleanup
+        var producer = pulsarClient.newProducer().topic(topicName).create();
+
+        @Cleanup
+        var consumer1 = pulsarClient.newConsumer()
+                .topic(topicName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName(BrokerTestUtil.newUniqueName("sub"))
+                .subscribe();
+
+        @Cleanup
+        var consumer2 = pulsarClient.newConsumer()
+                .topic(topicName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName(BrokerTestUtil.newUniqueName("sub"))
+                .subscribe();
+
+        producer.send("test".getBytes());
+        consumer1.receive();
+
+        Awaitility.await().untilAsserted(() -> {
+            var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+            assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, 
CacheEntryStatus.ACTIVE.attributes,
+                    value -> assertThat(value).isNotNegative());
+            assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, 
CacheEntryStatus.INSERTED.attributes,
+                    value -> assertThat(value).isPositive());
+            assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, 
CacheEntryStatus.EVICTED.attributes,
+                    value -> assertThat(value).isPositive());
+            assertMetricLongSumValue(metrics, CACHE_SIZE_COUNTER, 
Attributes.empty(),
+                    value -> assertThat(value).isNotNegative());
+        });
+
+        var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+
+        assertMetricLongSumValue(metrics, MANAGED_LEDGER_COUNTER, 
Attributes.empty(), 2);
+        assertMetricLongSumValue(metrics, CACHE_EVICTION_OPERATION_COUNTER, 
Attributes.empty(), 0);
+
+        assertMetricLongSumValue(metrics, CACHE_OPERATION_COUNTER, 
CacheOperationStatus.HIT.attributes,
+                value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(metrics, CACHE_OPERATION_BYTES_COUNTER, 
CacheOperationStatus.HIT.attributes,
+                value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(metrics, CACHE_OPERATION_COUNTER, 
CacheOperationStatus.MISS.attributes,
+                value -> assertThat(value).isNotNegative());
+        assertMetricLongSumValue(metrics, CACHE_OPERATION_BYTES_COUNTER, 
CacheOperationStatus.MISS.attributes,
+                value -> assertThat(value).isNotNegative());
+
+        assertMetricLongSumValue(metrics, 
CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.SMALL.attributes,
+                value -> assertThat(value).isNotNegative());
+        assertMetricLongSumValue(metrics, 
CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.NORMAL.attributes,
+                value -> assertThat(value).isNotNegative());
+        assertMetricLongSumValue(metrics, 
CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.HUGE.attributes,
+                value -> assertThat(value).isNotNegative());
+        assertMetricLongSumValue(metrics, 
CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER,
+                PoolChunkAllocationType.ALLOCATED.attributes, value -> 
assertThat(value).isNotNegative());
+        assertMetricLongSumValue(metrics, 
CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER,
+                PoolChunkAllocationType.USED.attributes, value -> 
assertThat(value).isNotNegative());
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index 09cd4f7cb1a..3d79a17a90f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.testcontext;
 
 import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.channel.EventLoopGroup;
+import io.opentelemetry.api.OpenTelemetry;
 import 
io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
 import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
 import java.io.IOException;
@@ -843,9 +844,8 @@ public class PulsarTestContext implements AutoCloseable {
 
             @Override
             public void initialize(ServiceConfiguration conf, 
MetadataStoreExtended metadataStore,
-                                   BookKeeperClientFactory bookkeeperProvider, 
EventLoopGroup eventLoopGroup)
-                    throws Exception {
-
+                                   BookKeeperClientFactory bookkeeperProvider, 
EventLoopGroup eventLoopGroup,
+                                   OpenTelemetry openTelemetry) {
             }
 
             @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
index 7d330bb82ad..1395424b141 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static org.testng.Assert.assertEquals;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.opentelemetry.api.OpenTelemetry;
 import java.util.Collections;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -60,7 +61,7 @@ public class SequenceIdWithErrorTest extends 
BkEnsemblesTestBase {
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
         ManagedLedgerClientFactory clientFactory = new 
ManagedLedgerClientFactory();
         clientFactory.initialize(pulsar.getConfiguration(), 
pulsar.getLocalMetadataStore(),
-                pulsar.getBookKeeperClientFactory(), eventLoopGroup);
+                pulsar.getBookKeeperClientFactory(), eventLoopGroup, 
OpenTelemetry.noop());
         ManagedLedgerFactory mlFactory = 
clientFactory.getManagedLedgerFactory();
         ManagedLedger ml = 
mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding());
         ml.close();
diff --git 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java
 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java
new file mode 100644
index 00000000000..6d61cafb5a0
--- /dev/null
+++ 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.opentelemetry;
+
+/**
+ * Common OpenTelemetry constants to be used by Pulsar components.
+ */
+public interface Constants {
+
+    String BROKER_INSTRUMENTATION_SCOPE_NAME = "org.apache.pulsar.broker";
+
+}
diff --git 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index 31e527f0286..b530b50ee59 100644
--- 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++ 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -164,4 +164,48 @@ public interface OpenTelemetryAttributes {
         FAILURE;
         public final Attributes attributes = 
Attributes.of(PULSAR_CONNECTION_CREATE_STATUS, name().toLowerCase());
     }
+
+    /**
+     * The type of the pool arena.
+     */
+    AttributeKey<String> ML_POOL_ARENA_TYPE = 
AttributeKey.stringKey("pulsar.managed_ledger.pool.arena.type");
+    enum PoolArenaType {
+        SMALL,
+        NORMAL,
+        HUGE;
+        public final Attributes attributes = Attributes.of(ML_POOL_ARENA_TYPE, 
name().toLowerCase());
+    }
+
+    /**
+     * The type of the pool chunk allocation.
+     */
+    AttributeKey<String> ML_POOL_CHUNK_ALLOCATION_TYPE =
+            
AttributeKey.stringKey("pulsar.managed_ledger.pool.chunk.allocation.type");
+    enum PoolChunkAllocationType {
+        ALLOCATED,
+        USED;
+        public final Attributes attributes = 
Attributes.of(ML_POOL_CHUNK_ALLOCATION_TYPE, name().toLowerCase());
+    }
+
+    /**
+     * The status of the cache entry.
+     */
+    AttributeKey<String> ML_CACHE_ENTRY_STATUS = 
AttributeKey.stringKey("pulsar.managed_ledger.cache.entry.status");
+    enum CacheEntryStatus {
+        ACTIVE,
+        EVICTED,
+        INSERTED;
+        public final Attributes attributes = 
Attributes.of(ML_CACHE_ENTRY_STATUS, name().toLowerCase());
+    }
+
+    /**
+     * The result of the cache operation.
+     */
+    AttributeKey<String> ML_CACHE_OPERATION_STATUS =
+            
AttributeKey.stringKey("pulsar.managed_ledger.cache.operation.status");
+    enum CacheOperationStatus {
+        HIT,
+        MISS;
+        public final Attributes attributes = 
Attributes.of(ML_CACHE_OPERATION_STATUS, name().toLowerCase());
+    }
 }


Reply via email to