This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8351c079d8e [feat][broker] PIP-264: Add OpenTelemetry managed cursor metrics (#23000)
8351c079d8e is described below

commit 8351c079d8e8b162f964ed6a735edf76459070ec
Author: Dragos Misca <dragosvic...@users.noreply.github.com>
AuthorDate: Fri Jul 5 02:45:55 2024 -0700

    [feat][broker] PIP-264: Add OpenTelemetry managed cursor metrics (#23000)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |   8 ++
 .../mledger/ManagedCursorAttributes.java           |  51 ++++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  14 +++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |   3 +
 .../impl/OpenTelemetryManagedCursorStats.java      | 136 +++++++++++++++++++++
 .../broker/stats/ManagedCursorMetricsTest.java     |  98 +++++++++++++--
 .../opentelemetry/OpenTelemetryAttributes.java     |  23 ++++
 7 files changed, 321 insertions(+), 12 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 4aa3226a4dc..f6345e7b9ec 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -877,4 +877,12 @@ public interface ManagedCursor {
         return false;
     }
 
+    /**
+     * Get the attributes associated with the cursor.
+     *
+     * @return the attributes associated with the cursor
+     */
+    default ManagedCursorAttributes getManagedCursorAttributes() {
+        return new ManagedCursorAttributes(this);
+    }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java
new file mode 100644
index 00000000000..6c06e68d75e
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import io.opentelemetry.api.common.Attributes;
+import lombok.Getter;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ManagedCursorOperationStatus;
+
+@Getter
+public class ManagedCursorAttributes {
+
+    private final Attributes attributes;
+    private final Attributes attributesOperationSucceed;
+    private final Attributes attributesOperationFailure;
+
+    public ManagedCursorAttributes(ManagedCursor cursor) {
+        var mlName = cursor.getManagedLedger().getName();
+        var topicName = 
TopicName.get(TopicName.fromPersistenceNamingEncoding(mlName));
+        attributes = Attributes.of(
+                OpenTelemetryAttributes.ML_CURSOR_NAME, cursor.getName(),
+                OpenTelemetryAttributes.ML_LEDGER_NAME, mlName,
+                OpenTelemetryAttributes.PULSAR_NAMESPACE, 
topicName.getNamespace()
+        );
+        attributesOperationSucceed = Attributes.builder()
+                .putAll(attributes)
+                .putAll(ManagedCursorOperationStatus.SUCCESS.attributes)
+                .build();
+        attributesOperationFailure = Attributes.builder()
+                .putAll(attributes)
+                .putAll(ManagedCursorOperationStatus.FAILURE.attributes)
+                .build();
+    }
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 98ba722ba1c..4ef9678f3e1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -77,6 +77,7 @@ import 
org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedCursorAttributes;
 import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -286,6 +287,11 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     protected final ManagedCursorMXBean mbean;
 
+    private volatile ManagedCursorAttributes managedCursorAttributes;
+    private static final AtomicReferenceFieldUpdater<ManagedCursorImpl, 
ManagedCursorAttributes> ATTRIBUTES_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, 
ManagedCursorAttributes.class,
+                    "managedCursorAttributes");
+
     @SuppressWarnings("checkstyle:javadoctype")
     public interface VoidCallback {
         void operationComplete();
@@ -3719,4 +3725,12 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
         return newNonDurableCursor;
     }
+
+    @Override
+    public ManagedCursorAttributes getManagedCursorAttributes() {
+        if (managedCursorAttributes != null) {
+            return managedCursorAttributes;
+        }
+        return ATTRIBUTES_UPDATER.updateAndGet(this, old -> old != null ? old 
: new ManagedCursorAttributes(this));
+    }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index b1939f40e93..00afb85a9d4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -122,6 +122,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
 
     private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
     private final OpenTelemetryManagedLedgerStats 
openTelemetryManagedLedgerStats;
+    private final OpenTelemetryManagedCursorStats 
openTelemetryManagedCursorStats;
 
     //indicate whether shutdown() is called.
     private volatile boolean closed;
@@ -231,6 +232,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
 
         openTelemetryCacheStats = new 
OpenTelemetryManagedLedgerCacheStats(openTelemetry, this);
         openTelemetryManagedLedgerStats = new 
OpenTelemetryManagedLedgerStats(openTelemetry, this);
+        openTelemetryManagedCursorStats = new 
OpenTelemetryManagedCursorStats(openTelemetry, this);
     }
 
     static class DefaultBkFactory implements 
BookkeeperFactoryForCustomEnsemblePlacementPolicy {
@@ -622,6 +624,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                     }));
                 }).thenAcceptAsync(__ -> {
                     //wait for tasks in scheduledExecutor executed.
+                    openTelemetryManagedCursorStats.close();
                     openTelemetryManagedLedgerStats.close();
                     openTelemetryCacheStats.close();
                     scheduledExecutor.shutdownNow();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java
new file mode 100644
index 00000000000..93a749d4aef
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.collect.Streams;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.opentelemetry.Constants;
+
+public class OpenTelemetryManagedCursorStats implements AutoCloseable {
+
+    // Replaces ['pulsar_ml_cursor_persistLedgerSucceed', 
'pulsar_ml_cursor_persistLedgerErrors']
+    public static final String PERSIST_OPERATION_COUNTER = 
"pulsar.broker.managed_ledger.persist.operation.count";
+    private final ObservableLongMeasurement persistOperationCounter;
+
+    // Replaces ['pulsar_ml_cursor_persistZookeeperSucceed', 
'pulsar_ml_cursor_persistZookeeperErrors']
+    public static final String PERSIST_OPERATION_METADATA_STORE_COUNTER =
+            "pulsar.broker.managed_ledger.persist.mds.operation.count";
+    private final ObservableLongMeasurement 
persistOperationMetadataStoreCounter;
+
+    // Replaces pulsar_ml_cursor_nonContiguousDeletedMessagesRange
+    public static final String NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER =
+            "pulsar.broker.managed_ledger.message_range.count";
+    private final ObservableLongMeasurement nonContiguousMessageRangeCounter;
+
+    // Replaces pulsar_ml_cursor_writeLedgerSize
+    public static final String OUTGOING_BYTE_COUNTER = 
"pulsar.broker.managed_ledger.cursor.outgoing.size";
+    private final ObservableLongMeasurement outgoingByteCounter;
+
+    // Replaces pulsar_ml_cursor_writeLedgerLogicalSize
+    public static final String OUTGOING_BYTE_LOGICAL_COUNTER =
+            "pulsar.broker.managed_ledger.cursor.outgoing.logical.size";
+    private final ObservableLongMeasurement outgoingByteLogicalCounter;
+
+    // Replaces pulsar_ml_cursor_readLedgerSize
+    public static final String INCOMING_BYTE_COUNTER = 
"pulsar.broker.managed_ledger.cursor.incoming.size";
+    private final ObservableLongMeasurement incomingByteCounter;
+
+    private final BatchCallback batchCallback;
+
+    public OpenTelemetryManagedCursorStats(OpenTelemetry openTelemetry, 
ManagedLedgerFactoryImpl factory) {
+        var meter = 
openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
+
+        persistOperationCounter = meter
+                .counterBuilder(PERSIST_OPERATION_COUNTER)
+                .setUnit("{operation}")
+                .setDescription("The number of acknowledgment operations on 
the ledger.")
+                .buildObserver();
+
+        persistOperationMetadataStoreCounter = meter
+                .counterBuilder(PERSIST_OPERATION_METADATA_STORE_COUNTER)
+                .setUnit("{operation}")
+                .setDescription("The number of acknowledgment operations in 
the metadata store.")
+                .buildObserver();
+
+        nonContiguousMessageRangeCounter = meter
+                .upDownCounterBuilder(NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER)
+                .setUnit("{range}")
+                .setDescription("The number of non-contiguous deleted messages 
ranges.")
+                .buildObserver();
+
+        outgoingByteCounter = meter
+                .counterBuilder(OUTGOING_BYTE_COUNTER)
+                .setUnit("{By}")
+                .setDescription("The total amount of data written to the 
ledger.")
+                .buildObserver();
+
+        outgoingByteLogicalCounter = meter
+                .counterBuilder(OUTGOING_BYTE_LOGICAL_COUNTER)
+                .setUnit("{By}")
+                .setDescription("The total amount of data written to the 
ledger, not including replicas.")
+                .buildObserver();
+
+        incomingByteCounter = meter
+                .counterBuilder(INCOMING_BYTE_COUNTER)
+                .setUnit("{By}")
+                .setDescription("The total amount of data read from the 
ledger.")
+                .buildObserver();
+
+        batchCallback = meter.batchCallback(() -> factory.getManagedLedgers()
+                        .values()
+                        .stream()
+                        .map(ManagedLedgerImpl::getCursors)
+                        .flatMap(Streams::stream)
+                        .forEach(this::recordMetrics),
+                persistOperationCounter,
+                persistOperationMetadataStoreCounter,
+                nonContiguousMessageRangeCounter,
+                outgoingByteCounter,
+                outgoingByteLogicalCounter,
+                incomingByteCounter);
+    }
+
+    @Override
+    public void close() {
+        batchCallback.close();
+    }
+
+    private void recordMetrics(ManagedCursor cursor) {
+        var stats = cursor.getStats();
+        var cursorAttributesSet = cursor.getManagedCursorAttributes();
+        var attributes = cursorAttributesSet.getAttributes();
+        var attributesSucceed = 
cursorAttributesSet.getAttributesOperationSucceed();
+        var attributesFailed = 
cursorAttributesSet.getAttributesOperationFailure();
+
+        persistOperationCounter.record(stats.getPersistLedgerSucceed(), 
attributesSucceed);
+        persistOperationCounter.record(stats.getPersistLedgerErrors(), 
attributesFailed);
+
+        
persistOperationMetadataStoreCounter.record(stats.getPersistZookeeperSucceed(), 
attributesSucceed);
+        
persistOperationMetadataStoreCounter.record(stats.getPersistZookeeperErrors(), 
attributesFailed);
+
+        
nonContiguousMessageRangeCounter.record(cursor.getTotalNonContiguousDeletedMessagesRange(),
 attributes);
+
+        outgoingByteCounter.record(stats.getWriteCursorLedgerSize(), 
attributes);
+        
outgoingByteLogicalCounter.record(stats.getWriteCursorLedgerLogicalSize(), 
attributes);
+        incomingByteCounter.record(stats.getReadCursorLedgerSize(), 
attributes);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index baa4bea5701..8ddb5320588 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -18,20 +18,24 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThat;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.ManagedCursorAttributes;
+import org.apache.bookkeeper.mledger.impl.OpenTelemetryManagedCursorStats;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
@@ -80,6 +84,12 @@ public class ManagedCursorMetricsTest extends 
MockedPulsarServiceBaseTest {
         return PulsarTestClient.create(clientBuilder);
     }
 
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder 
pulsarTestContextBuilder) {
+        super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+        pulsarTestContextBuilder.enableOpenTelemetry(true);
+    }
+
     /***
      * This method has overridden these case:
      *    brk_ml_cursor_persistLedgerSucceed
@@ -115,10 +125,7 @@ public class ManagedCursorMetricsTest extends 
MockedPulsarServiceBaseTest {
                 .topic(topicName)
                 .enableBatching(false)
                 .create();
-        final PersistentSubscription persistentSubscription =
-                (PersistentSubscription) pulsar.getBrokerService()
-                        .getTopic(topicName, 
false).get().get().getSubscription(subName);
-        final ManagedCursorImpl managedCursor = (ManagedCursorImpl) 
persistentSubscription.getCursor();
+        var managedCursor = getManagedCursor(topicName, subName);
         ManagedCursorMXBean managedCursorMXBean = managedCursor.getStats();
         // Assert.
         metricsList = metrics.generate();
@@ -128,6 +135,19 @@ public class ManagedCursorMetricsTest extends 
MockedPulsarServiceBaseTest {
         
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"),
 0L);
         
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"),
 0L);
         
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"),
 0L);
+        // Validate OpenTelemetry metrics as well
+        var attributesSet = new ManagedCursorAttributes(managedCursor);
+        var otelMetrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+                attributesSet.getAttributesOperationSucceed(), 0);
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+                attributesSet.getAttributesOperationFailure(), 0);
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+                attributesSet.getAttributesOperationSucceed(), 0);
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+                attributesSet.getAttributesOperationFailure(), 0);
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER,
+                attributesSet.getAttributes(), 0);
         /**
          * 1. Send many messages, and only ack half. After the cursor data is 
written to BK,
          *    verify "brk_ml_cursor_persistLedgerSucceed" and 
"brk_ml_cursor_nonContiguousDeletedMessagesRange".
@@ -156,6 +176,17 @@ public class ManagedCursorMetricsTest extends 
MockedPulsarServiceBaseTest {
         
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"),
 0L);
         
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"),
                 0L);
+        otelMetrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+                attributesSet.getAttributesOperationSucceed(), value -> 
assertThat(value).isPositive());
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+                attributesSet.getAttributesOperationFailure(), 0);
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+                attributesSet.getAttributesOperationSucceed(), 0);
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+                attributesSet.getAttributesOperationFailure(), 0);
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER,
+                attributesSet.getAttributes(), value -> 
assertThat(value).isPositive());
         // Ack another half.
         for (MessageId messageId : keepsMessageIdList){
             consumer.acknowledge(messageId);
@@ -171,6 +202,17 @@ public class ManagedCursorMetricsTest extends 
MockedPulsarServiceBaseTest {
         
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"),
 0L);
         
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"),
 0L);
         
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"),
 0L);
+        otelMetrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+                attributesSet.getAttributesOperationSucceed(), value -> 
assertThat(value).isPositive());
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+                attributesSet.getAttributesOperationFailure(), 0);
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+                attributesSet.getAttributesOperationSucceed(), 0);
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+                attributesSet.getAttributesOperationFailure(), 0);
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER,
+                attributesSet.getAttributes(), 0);
         /**
          * Make BK error, and send many message, then wait cursor persistent 
finish.
          * After the cursor data is written to ZK, verify 
"brk_ml_cursor_persistLedgerErrors" and
@@ -196,6 +238,17 @@ public class ManagedCursorMetricsTest extends 
MockedPulsarServiceBaseTest {
         
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"),
 0L);
         
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"),
 0L);
         
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"),
 0L);
+        otelMetrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+                attributesSet.getAttributesOperationSucceed(), value -> 
assertThat(value).isPositive());
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+                attributesSet.getAttributesOperationFailure(), value -> 
assertThat(value).isPositive());
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+                attributesSet.getAttributesOperationSucceed(), value -> 
assertThat(value).isPositive());
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+                attributesSet.getAttributesOperationFailure(), 0);
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER,
+                attributesSet.getAttributes(), 0);
         /**
          * TODO verify "brk_ml_cursor_persistZookeeperErrors".
          * This is not easy to implement, we can use {@link #mockZooKeeper} to 
fail ZK, but we cannot identify whether
@@ -210,13 +263,16 @@ public class ManagedCursorMetricsTest extends 
MockedPulsarServiceBaseTest {
         admin.topics().delete(topicName, true);
     }
 
-    private ManagedCursorMXBean getManagedCursorMXBean(String topicName, 
String subscriptionName)
-            throws ExecutionException, InterruptedException {
+    private ManagedCursorMXBean getManagedCursorMXBean(String topicName, 
String subscriptionName) throws Exception {
+        var managedCursor = getManagedCursor(topicName, subscriptionName);
+        return managedCursor.getStats();
+    }
+
+    private ManagedCursor getManagedCursor(String topicName, String 
subscriptionName) throws Exception {
         final PersistentSubscription persistentSubscription =
                 (PersistentSubscription) pulsar.getBrokerService()
                         .getTopic(topicName, 
false).get().get().getSubscription(subscriptionName);
-        final ManagedCursorImpl managedCursor = (ManagedCursorImpl) 
persistentSubscription.getCursor();
-        return managedCursor.getStats();
+        return persistentSubscription.getCursor();
     }
 
     @Test
@@ -265,9 +321,11 @@ public class ManagedCursorMetricsTest extends 
MockedPulsarServiceBaseTest {
             }
         }
 
+        var managedCursor1 = getManagedCursor(topicName, subName1);
+        var cursorMXBean1 = managedCursor1.getStats();
+        var managedCursor2 = getManagedCursor(topicName, subName2);
+        var cursorMXBean2 = managedCursor2.getStats();
         // Wait for persistent cursor meta.
-        ManagedCursorMXBean cursorMXBean1 = getManagedCursorMXBean(topicName, 
subName1);
-        ManagedCursorMXBean cursorMXBean2 = getManagedCursorMXBean(topicName, 
subName2);
         Awaitility.await().until(() -> 
cursorMXBean1.getWriteCursorLedgerLogicalSize() > 0);
         Awaitility.await().until(() -> 
cursorMXBean2.getWriteCursorLedgerLogicalSize() > 0);
 
@@ -281,6 +339,22 @@ public class ManagedCursorMetricsTest extends 
MockedPulsarServiceBaseTest {
         
Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"),
 0L);
         
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"),
 0L);
 
+        var otelMetrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        var attributes1 = new 
ManagedCursorAttributes(managedCursor1).getAttributes();
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.OUTGOING_BYTE_COUNTER,
+                attributes1, value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.OUTGOING_BYTE_LOGICAL_COUNTER,
+                attributes1, value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.INCOMING_BYTE_COUNTER,
+                attributes1, 0);
+
+        var attributes2 = new 
ManagedCursorAttributes(managedCursor2).getAttributes();
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.OUTGOING_BYTE_COUNTER,
+                attributes2, value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.OUTGOING_BYTE_LOGICAL_COUNTER,
+                attributes2, value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedCursorStats.INCOMING_BYTE_COUNTER,
+                attributes2, 0);
         // cleanup.
         consumer.close();
         consumer2.close();
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index 24dd1be8509..41358a72c0d 100644
--- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -116,6 +116,7 @@ public interface OpenTelemetryAttributes {
      * The status of the Pulsar transaction.
      */
     AttributeKey<String> PULSAR_TRANSACTION_STATUS = 
AttributeKey.stringKey("pulsar.transaction.status");
+
     enum TransactionStatus {
         ABORTED,
         ACTIVE,
@@ -174,6 +175,28 @@ public interface OpenTelemetryAttributes {
         public final Attributes attributes = 
Attributes.of(PULSAR_BACKLOG_QUOTA_TYPE, name().toLowerCase());
     }
 
+    // Managed Ledger Attributes
+    /**
+     * The name of the managed ledger.
+     */
+    AttributeKey<String> ML_LEDGER_NAME = 
AttributeKey.stringKey("pulsar.managed_ledger.name");
+
+    /**
+     * The name of the managed cursor.
+     */
+    AttributeKey<String> ML_CURSOR_NAME = 
AttributeKey.stringKey("pulsar.managed_ledger.cursor.name");
+
+    /**
+     * The status of the managed cursor operation.
+     */
+    AttributeKey<String> ML_CURSOR_OPERATION_STATUS =
+            
AttributeKey.stringKey("pulsar.managed_ledger.cursor.operation.status");
+    enum ManagedCursorOperationStatus {
+        SUCCESS,
+        FAILURE;
+        public final Attributes attributes = 
Attributes.of(ML_CURSOR_OPERATION_STATUS, name().toLowerCase());
+    }
+
     /**
      * The name of the remote cluster for a Pulsar replicator.
      */

Reply via email to