This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f4d1d05ee38 [feat][broker] PIP-264: Add OpenTelemetry managed ledger 
metrics (#22987)
f4d1d05ee38 is described below

commit f4d1d05ee385bd730cbb4fa09a287614a00400a3
Author: Dragos Misca <dragosvic...@users.noreply.github.com>
AuthorDate: Wed Jul 3 12:25:39 2024 -0700

    [feat][broker] PIP-264: Add OpenTelemetry managed ledger metrics (#22987)
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   |   7 +
 .../mledger/ManagedLedgerAttributes.java           |  57 ++++++++
 .../bookkeeper/mledger/ManagedLedgerMXBean.java    |  35 +++++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |   3 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   6 +
 .../mledger/impl/ManagedLedgerMBeanImpl.java       |  35 +++++
 .../impl/OpenTelemetryManagedLedgerStats.java      | 153 +++++++++++++++++++++
 .../broker/stats/ManagedLedgerMetricsTest.java     | 100 +++++++++++++-
 .../opentelemetry/OpenTelemetryAttributes.java     |  17 +++
 9 files changed, 406 insertions(+), 7 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 955a0d78502..a9242d5cc65 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -696,4 +696,11 @@ public interface ManagedLedger {
      * Check if managed ledger should cache backlog reads.
      */
     void checkCursorsToCacheEntries();
+
+    /**
+     * Get the metric attributes of this managed ledger. This default creates a new instance per call.
+     */
+    default ManagedLedgerAttributes getManagedLedgerAttributes() {
+        return new ManagedLedgerAttributes(this);
+    }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
new file mode 100644
index 00000000000..c3759a533a5
--- /dev/null
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import io.opentelemetry.api.common.Attributes;
+import lombok.Getter;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ManagedLedgerOperationStatus;
+
+@Getter
+public class ManagedLedgerAttributes {
+
+    private final Attributes attributes; // ML_NAME + PULSAR_NAMESPACE of the ledger
+    private final Attributes attributesOperationSucceed; // attributes + ManagedLedgerOperationStatus.SUCCESS
+    private final Attributes attributesOperationFailure; // attributes + ManagedLedgerOperationStatus.FAILURE
+
+    public ManagedLedgerAttributes(ManagedLedger ml) { // all three sets are computed once, from ml.getName()
+        var mlName = ml.getName();
+        attributes = Attributes.of(
+                OpenTelemetryAttributes.ML_NAME, mlName,
+                OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName) // may be null, see getNamespace
+        );
+        attributesOperationSucceed = Attributes.builder()
+                .putAll(attributes)
+                .putAll(ManagedLedgerOperationStatus.SUCCESS.attributes)
+                .build();
+        attributesOperationFailure = Attributes.builder()
+                .putAll(attributes)
+                .putAll(ManagedLedgerOperationStatus.FAILURE.attributes)
+                .build();
+    }
+
+    private static String getNamespace(String mlName) { // best effort: namespace of the topic encoded in mlName
+        try {
+            return 
TopicName.get(TopicName.fromPersistenceNamingEncoding(mlName)).getNamespace();
+        } catch (RuntimeException e) { // mlName could not be resolved to a topic name
+            return null; // NOTE(review): relies on Attributes.of(...) accepting a null value - confirm
+        }
+    }
+}
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
index 44345c430b7..1d978e23785 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
@@ -60,11 +60,21 @@ public interface ManagedLedgerMXBean {
      */
     double getAddEntryBytesRate();
 
+    /**
+     * @return the total number of bytes written
+     */
+    long getAddEntryBytesTotal();
+
     /**
      * @return the bytes/s rate of messages added with replicas
      */
     double getAddEntryWithReplicasBytesRate();
 
+    /**
+     * @return the total number of bytes written, including replicas
+     */
+    long getAddEntryWithReplicasBytesTotal();
+
     /**
      * @return the msg/s rate of messages read
      */
@@ -75,11 +85,21 @@ public interface ManagedLedgerMXBean {
      */
     double getReadEntriesBytesRate();
 
+    /**
+     * @return the total number of bytes read
+     */
+    long getReadEntriesBytesTotal();
+
     /**
      * @return the rate of mark-delete ops/s
      */
     double getMarkDeleteRate();
 
+    /**
+     * @return the total number of mark-delete ops
+     */
+    long getMarkDeleteTotal();
+
     /**
      * @return the number of addEntry requests that succeeded
      */
@@ -95,6 +115,11 @@ public interface ManagedLedgerMXBean {
      */
     long getAddEntryErrors();
 
+    /**
+     * @return the total number of addEntry requests that failed
+     */
+    long getAddEntryErrorsTotal();
+
     /**
      * @return the number of entries read from the managed ledger (from cache 
or BK)
      */
@@ -115,11 +140,21 @@ public interface ManagedLedgerMXBean {
      */
     long getReadEntriesErrors();
 
+    /**
+     * @return the total number of readEntries requests that failed
+     */
+    long getReadEntriesErrorsTotal();
+
     /**
      * @return the number of readEntries requests that cache miss Rate
      */
     double getReadEntriesOpsCacheMissesRate();
 
+    /**
+     * @return the total number of readEntries requests that missed the cache
+     */
+    long getReadEntriesOpsCacheMissesTotal();
+
     // Entry size statistics
 
     double getEntrySizeAverage();
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index fc291b801c8..b1939f40e93 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -121,6 +121,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
     private final MetadataStore metadataStore;
 
     private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
+    private final OpenTelemetryManagedLedgerStats 
openTelemetryManagedLedgerStats;
 
     //indicate whether shutdown() is called.
     private volatile boolean closed;
@@ -229,6 +230,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         
metadataStore.registerSessionListener(this::handleMetadataStoreNotification);
 
         openTelemetryCacheStats = new 
OpenTelemetryManagedLedgerCacheStats(openTelemetry, this);
+        openTelemetryManagedLedgerStats = new 
OpenTelemetryManagedLedgerStats(openTelemetry, this);
     }
 
     static class DefaultBkFactory implements 
BookkeeperFactoryForCustomEnsemblePlacementPolicy {
@@ -620,6 +622,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                     }));
                 }).thenAcceptAsync(__ -> {
                     //wait for tasks in scheduledExecutor executed.
+                    openTelemetryManagedLedgerStats.close();
                     openTelemetryCacheStats.close();
                     scheduledExecutor.shutdownNow();
                     entryCacheManager.clear();
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 8d1919dd052..b7734906f75 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -96,6 +96,7 @@ import 
org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerAttributes;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
@@ -326,6 +327,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
      */
     final ConcurrentLinkedQueue<OpAddEntry> pendingAddEntries = new 
ConcurrentLinkedQueue<>();
 
+    @Getter
+    private final ManagedLedgerAttributes managedLedgerAttributes;
+
     /**
      * This variable is used for testing the tests.
      * ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()
@@ -338,6 +342,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             final String name) {
         this(factory, bookKeeper, store, config, scheduledExecutor, name, 
null);
     }
+
     public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper 
bookKeeper, MetaStore store,
             ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
             final String name, final Supplier<CompletableFuture<Boolean>> 
mlOwnershipChecker) {
@@ -373,6 +378,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         this.minBacklogCursorsForCaching = 
config.getMinimumBacklogCursorsForCaching();
         this.minBacklogEntriesForCaching = 
config.getMinimumBacklogEntriesForCaching();
         this.maxBacklogBetweenCursorsForCaching = 
config.getMaxBacklogBetweenCursorsForCaching();
+        this.managedLedgerAttributes = new ManagedLedgerAttributes(this);
     }
 
     synchronized void initialize(final ManagedLedgerInitializeLedgerCallback 
callback, final Object ctx) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index 5e5161a29ca..86320f92924 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -210,11 +210,21 @@ public class ManagedLedgerMBeanImpl implements 
ManagedLedgerMXBean {
         return addEntryOps.getValueRate();
     }
 
+    @Override
+    public long getAddEntryBytesTotal() {
+        return addEntryOps.getTotalValue(); // total counterpart of the addEntryOps.getValueRate() rate getter
+    }
+
     @Override
     public double getAddEntryWithReplicasBytesRate() {
         return addEntryWithReplicasOps.getValueRate();
     }
 
+    @Override
+    public long getAddEntryWithReplicasBytesTotal() {
+        return addEntryWithReplicasOps.getTotalValue(); // total counterpart of getAddEntryWithReplicasBytesRate()
+    }
+
     @Override
     public double getReadEntriesRate() {
         return readEntriesOps.getRate();
@@ -225,6 +235,11 @@ public class ManagedLedgerMBeanImpl implements 
ManagedLedgerMXBean {
         return readEntriesOps.getValueRate();
     }
 
+    @Override
+    public long getReadEntriesBytesTotal() {
+        return readEntriesOps.getTotalValue(); // total counterpart of the readEntriesOps.getValueRate() rate getter
+    }
+
     @Override
     public long getAddEntrySucceed() {
         return addEntryOps.getCount();
@@ -240,6 +255,11 @@ public class ManagedLedgerMBeanImpl implements 
ManagedLedgerMXBean {
         return addEntryOpsFailed.getCount();
     }
 
+    @Override
+    public long getAddEntryErrorsTotal() {
+        return addEntryOpsFailed.getTotalCount(); // getTotalCount(), whereas the preceding getter reads getCount()
+    }
+
     @Override
     public long getReadEntriesSucceeded() {
         return readEntriesOps.getCount();
@@ -255,16 +275,31 @@ public class ManagedLedgerMBeanImpl implements 
ManagedLedgerMXBean {
         return readEntriesOpsFailed.getCount();
     }
 
+    @Override
+    public long getReadEntriesErrorsTotal() {
+        return readEntriesOpsFailed.getTotalCount(); // getTotalCount(), whereas the preceding getter reads getCount()
+    }
+
     @Override
     public double getReadEntriesOpsCacheMissesRate() {
         return readEntriesOpsCacheMisses.getRate();
     }
 
+    @Override
+    public long getReadEntriesOpsCacheMissesTotal() {
+        return readEntriesOpsCacheMisses.getTotalCount(); // total counterpart of getReadEntriesOpsCacheMissesRate()
+    }
+
     @Override
     public double getMarkDeleteRate() {
         return markDeleteOps.getRate();
     }
 
+    @Override
+    public long getMarkDeleteTotal() {
+        return markDeleteOps.getTotalCount(); // total counterpart of getMarkDeleteRate()
+    }
+
     @Override
     public double getEntrySizeAverage() {
         return entryStats.getAvg();
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
new file mode 100644
index 00000000000..f7b9d91dff6
--- /dev/null
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import org.apache.pulsar.opentelemetry.Constants;
+
+public class OpenTelemetryManagedLedgerStats implements AutoCloseable {
+
+    // Replaces pulsar_ml_AddEntryMessagesRate
+    public static final String ADD_ENTRY_COUNTER = 
"pulsar.broker.managed_ledger.message.outgoing.count";
+    private final ObservableLongMeasurement addEntryCounter;
+
+    // Replaces pulsar_ml_AddEntryBytesRate
+    public static final String BYTES_OUT_COUNTER = 
"pulsar.broker.managed_ledger.message.outgoing.logical.size";
+    private final ObservableLongMeasurement bytesOutCounter;
+
+    // Replaces pulsar_ml_AddEntryWithReplicasBytesRate
+    public static final String BYTES_OUT_WITH_REPLICAS_COUNTER =
+            "pulsar.broker.managed_ledger.message.outgoing.replicated.size";
+    private final ObservableLongMeasurement bytesOutWithReplicasCounter;
+
+    // Replaces pulsar_ml_NumberOfMessagesInBacklog
+    public static final String BACKLOG_COUNTER = 
"pulsar.broker.managed_ledger.backlog.count";
+    private final ObservableLongMeasurement backlogCounter;
+
+    // Replaces pulsar_ml_ReadEntriesRate
+    public static final String READ_ENTRY_COUNTER = 
"pulsar.broker.managed_ledger.message.incoming.count";
+    private final ObservableLongMeasurement readEntryCounter;
+
+    // Replaces pulsar_ml_ReadEntriesBytesRate
+    public static final String BYTES_IN_COUNTER = 
"pulsar.broker.managed_ledger.message.incoming.size";
+    private final ObservableLongMeasurement bytesInCounter;
+
+    // Replaces brk_ml_ReadEntriesOpsCacheMissesRate
+    public static final String READ_ENTRY_CACHE_MISS_COUNTER =
+            "pulsar.broker.managed_ledger.message.incoming.cache.miss.count";
+    private final ObservableLongMeasurement readEntryCacheMissCounter;
+
+    // Replaces pulsar_ml_MarkDeleteRate
+    public static final String MARK_DELETE_COUNTER = 
"pulsar.broker.managed_ledger.mark_delete.count";
+    private final ObservableLongMeasurement markDeleteCounter;
+
+    private final BatchCallback batchCallback; // closed in close()
+
+    public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, 
ManagedLedgerFactoryImpl factory) { // builds the instruments, then registers one batch callback for all of them
+        var meter = 
openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
+
+        addEntryCounter = meter
+                .upDownCounterBuilder(ADD_ENTRY_COUNTER) // NOTE(review): fed from *Total getters in recordMetrics; confirm upDownCounter (vs counter) is intended
+                .setUnit("{operation}")
+                .setDescription("The number of write operations to this 
ledger.")
+                .buildObserver();
+
+        bytesOutCounter = meter
+                .counterBuilder(BYTES_OUT_COUNTER)
+                .setUnit("By")
+                .setDescription("The total number of messages bytes written to 
this ledger, excluding replicas.")
+                .buildObserver();
+
+        bytesOutWithReplicasCounter = meter
+                .counterBuilder(BYTES_OUT_WITH_REPLICAS_COUNTER)
+                .setUnit("By")
+                .setDescription("The total number of messages bytes written to 
this ledger, including replicas.")
+                .buildObserver();
+
+        backlogCounter = meter
+                .upDownCounterBuilder(BACKLOG_COUNTER) // fed from getNumberOfMessagesInBacklog(), not from a *Total getter
+                .setUnit("{message}")
+                .setDescription("The number of messages in backlog for all 
consumers from this ledger.")
+                .buildObserver();
+
+        readEntryCounter = meter
+                .upDownCounterBuilder(READ_ENTRY_COUNTER) // NOTE(review): fed from *Total getters; confirm upDownCounter is intended
+                .setUnit("{operation}")
+                .setDescription("The number of read operations from this 
ledger.")
+                .buildObserver();
+
+        bytesInCounter = meter
+                .counterBuilder(BYTES_IN_COUNTER)
+                .setUnit("By")
+                .setDescription("The total number of messages bytes read from 
this ledger.")
+                .buildObserver();
+
+        readEntryCacheMissCounter = meter
+                .upDownCounterBuilder(READ_ENTRY_CACHE_MISS_COUNTER) // NOTE(review): fed from a *Total getter; confirm upDownCounter is intended
+                .setUnit("{operation}")
+                .setDescription("The number of cache misses during read 
operations from this ledger.")
+                .buildObserver();
+
+        markDeleteCounter = meter
+                .counterBuilder(MARK_DELETE_COUNTER)
+                .setUnit("{operation}")
+                .setDescription("The total number of mark delete operations 
for this ledger.")
+                .buildObserver();
+
+        batchCallback = meter.batchCallback(() -> factory.getManagedLedgers() // callback records every ledger currently held by the factory
+                        .values()
+                        .forEach(this::recordMetrics),
+                addEntryCounter,
+                bytesOutCounter,
+                bytesOutWithReplicasCounter,
+                backlogCounter,
+                readEntryCounter,
+                bytesInCounter,
+                readEntryCacheMissCounter,
+                markDeleteCounter);
+    }
+
+    @Override
+    public void close() {
+        batchCallback.close(); // closes the callback registered in the constructor
+    }
+
+    private void recordMetrics(ManagedLedgerImpl ml) { // records one ledger's MBean stats under its attribute sets
+        var stats = ml.getMbean();
+        var ledgerAttributeSet = ml.getManagedLedgerAttributes();
+        var attributes = ledgerAttributeSet.getAttributes();
+        var attributesSucceed = 
ledgerAttributeSet.getAttributesOperationSucceed();
+        var attributesFailure = 
ledgerAttributeSet.getAttributesOperationFailure();
+
+        addEntryCounter.record(stats.getAddEntrySucceedTotal(), 
attributesSucceed); // same instrument, split by operation status
+        addEntryCounter.record(stats.getAddEntryErrorsTotal(), 
attributesFailure);
+        bytesOutCounter.record(stats.getAddEntryBytesTotal(), attributes);
+        
bytesOutWithReplicasCounter.record(stats.getAddEntryWithReplicasBytesTotal(), 
attributes);
+
+        readEntryCounter.record(stats.getReadEntriesSucceededTotal(), 
attributesSucceed); // same instrument, split by operation status
+        readEntryCounter.record(stats.getReadEntriesErrorsTotal(), 
attributesFailure);
+        bytesInCounter.record(stats.getReadEntriesBytesTotal(), attributes);
+
+        backlogCounter.record(stats.getNumberOfMessagesInBacklog(), 
attributes); // the only value here not read from a *Total getter
+        markDeleteCounter.record(stats.getMarkDeleteTotal(), attributes);
+        
readEntryCacheMissCounter.record(stats.getReadEntriesOpsCacheMissesTotal(), 
attributes);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
index bec73121e48..b9c0ab08e4e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
@@ -18,27 +18,38 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static 
org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
+import static org.assertj.core.api.Assertions.assertThat;
+import com.google.common.collect.Sets;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import io.opentelemetry.api.common.Attributes;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Sets;
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
+import org.apache.bookkeeper.mledger.impl.OpenTelemetryManagedLedgerStats;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import 
org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -68,6 +79,12 @@ public class ManagedLedgerMetricsTest extends BrokerTestBase 
{
         super.internalCleanup();
     }
 
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder 
pulsarTestContextBuilder) {
+        super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+        pulsarTestContextBuilder.enableOpenTelemetry(true); // presumably required by the getOpenTelemetryMetricReader() assertions in testManagedLedgerMetrics - confirm
+    }
+
     @Test
     public void testManagedLedgerMetrics() throws Exception {
         ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar);
@@ -76,15 +93,20 @@ public class ManagedLedgerMetricsTest extends 
BrokerTestBase {
         List<Metrics> list1 = metrics.generate();
         Assert.assertTrue(list1.isEmpty());
 
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
-                .create();
+        var topicName = "persistent://my-property/use/my-ns/my-topic1";
+        @Cleanup
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+
+        @Cleanup
+        var consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("sub1").subscribe();
+
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
 
-        for (Entry<String, ManagedLedgerImpl> ledger : 
((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory())
-                .getManagedLedgers().entrySet()) {
+        var managedLedgerFactory = (ManagedLedgerFactoryImpl) 
pulsar.getManagedLedgerFactory();
+        for (Entry<String, ManagedLedgerImpl> ledger : 
managedLedgerFactory.getManagedLedgers().entrySet()) {
             ManagedLedgerMBeanImpl stats = (ManagedLedgerMBeanImpl) 
ledger.getValue().getStats();
             stats.refreshStats(1, TimeUnit.SECONDS);
         }
@@ -96,14 +118,78 @@ public class ManagedLedgerMetricsTest extends 
BrokerTestBase {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
         }
-        for (Entry<String, ManagedLedgerImpl> ledger : 
((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory())
-                .getManagedLedgers().entrySet()) {
+        for (Entry<String, ManagedLedgerImpl> ledger : 
managedLedgerFactory.getManagedLedgers().entrySet()) {
             ManagedLedgerMBeanImpl stats = (ManagedLedgerMBeanImpl) 
ledger.getValue().getStats();
             stats.refreshStats(1, TimeUnit.SECONDS);
         }
         List<Metrics> list3 = metrics.generate();
         Assert.assertEquals(list3.get(0).getMetrics().get(addEntryRateKey), 
5.0D);
 
+        // Validate OpenTelemetry metrics.
+        var ledgers = managedLedgerFactory.getManagedLedgers();
+        var topicNameObj = TopicName.get(topicName);
+        var mlName = topicNameObj.getPersistenceNamingEncoding();
+        assertThat(ledgers).containsKey(mlName);
+        var ml = ledgers.get(mlName);
+        var attribCommon = Attributes.of(
+                OpenTelemetryAttributes.ML_NAME, mlName,
+                OpenTelemetryAttributes.PULSAR_NAMESPACE, 
topicNameObj.getNamespace()
+        );
+        var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
+
+        Awaitility.await().untilAsserted(() -> {
+            var otelMetrics = metricReader.collectAllMetrics();
+            assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.BACKLOG_COUNTER, attribCommon, 15);
+            assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.MARK_DELETE_COUNTER, attribCommon, 0);
+        });
+
+        for (int i = 0; i < 10; i++) {
+            var msg = consumer.receive(1, TimeUnit.SECONDS);
+            consumer.acknowledge(msg);
+        }
+
+        Awaitility.await().untilAsserted(() -> {
+            var otelMetrics = metricReader.collectAllMetrics();
+            assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.BACKLOG_COUNTER, attribCommon, 5);
+            assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.MARK_DELETE_COUNTER, attribCommon,
+                    value -> assertThat(value).isPositive());
+        });
+
+        Awaitility.await().untilAsserted(() -> {
+            @Cleanup
+            var cons = pulsarClient.newConsumer()
+                    .topic(topicName)
+                    .subscriptionName(BrokerTestUtil.newUniqueName("sub"))
+                    
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                    .subscribe();
+            cons.receive(1, TimeUnit.SECONDS);
+
+            var attribSucceed = Attributes.of(
+                    OpenTelemetryAttributes.ML_NAME, mlName,
+                    OpenTelemetryAttributes.PULSAR_NAMESPACE, 
topicNameObj.getNamespace(),
+                    OpenTelemetryAttributes.ML_OPERATION_STATUS, "success"
+            );
+            var attribFailed = Attributes.of(
+                    OpenTelemetryAttributes.ML_NAME, mlName,
+                    OpenTelemetryAttributes.PULSAR_NAMESPACE, 
topicNameObj.getNamespace(),
+                    OpenTelemetryAttributes.ML_OPERATION_STATUS, "failure"
+            );
+            var otelMetrics = metricReader.collectAllMetrics();
+            assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.ADD_ENTRY_COUNTER, attribSucceed, 15);
+            assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.ADD_ENTRY_COUNTER, attribFailed, 0);
+            assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.BYTES_OUT_COUNTER, attribCommon,
+                    value -> assertThat(value).isPositive());
+            assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.BYTES_OUT_WITH_REPLICAS_COUNTER,
+                    attribCommon, value -> assertThat(value).isPositive());
+
+            assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.READ_ENTRY_COUNTER, attribSucceed,
+                    value -> assertThat(value).isPositive());
+            assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.READ_ENTRY_COUNTER, attribFailed, 0);
+            assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.BYTES_IN_COUNTER, attribCommon,
+                    value -> assertThat(value).isPositive());
+            assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.READ_ENTRY_CACHE_MISS_COUNTER,
+                    attribCommon, value -> assertThat(value).isPositive());
+        });
     }
 
     @Test
diff --git 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index f485e300926..24dd1be8509 100644
--- 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++ 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -196,6 +196,23 @@ public interface OpenTelemetryAttributes {
         public final Attributes attributes = 
Attributes.of(PULSAR_CONNECTION_CREATE_STATUS, name().toLowerCase());
     }
 
+    // Managed Ledger Attributes
+
+    /**
+     * The name of the managed ledger.
+     */
+    AttributeKey<String> ML_NAME = 
AttributeKey.stringKey("pulsar.managed_ledger.name");
+
+    /**
+     * The status of the managed ledger operation.
+     */
+    AttributeKey<String> ML_OPERATION_STATUS = 
AttributeKey.stringKey("pulsar.managed_ledger.operation.status");
+    enum ManagedLedgerOperationStatus {
+        SUCCESS,
+        FAILURE;
+        public final Attributes attributes = 
Attributes.of(ML_OPERATION_STATUS, name().toLowerCase()); // "success" / "failure"; NOTE(review): toLowerCase() is locale-sensitive, consider Locale.ROOT
+    }; // NOTE(review): the ';' after the enum body is redundant
+
+
     /**
      * The type of the pool arena.
      */

Reply via email to