This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f3cc6c5d27 [feat][broker] PIP-264: Add topic messaging metrics (#22467)
4f3cc6c5d27 is described below

commit 4f3cc6c5d277b334b3a6868f9fc641648cd952a3
Author: Dragos Misca <dragosvic...@users.noreply.github.com>
AuthorDate: Wed May 1 11:17:19 2024 -0700

    [feat][broker] PIP-264: Add topic messaging metrics (#22467)
---
 .../bookkeeper/mledger/ManagedLedgerMXBean.java    |  10 +
 .../mledger/impl/ManagedLedgerMBeanImpl.java       |  10 +
 .../mledger/impl/ManagedLedgerMBeanTest.java       |   8 +
 .../org/apache/pulsar/broker/PulsarService.java    |   8 +
 .../pulsar/broker/service/AbstractTopic.java       |   8 +
 .../broker/stats/OpenTelemetryTopicStats.java      | 490 +++++++++++++++++++++
 .../pulsar/broker/stats/prometheus/TopicStats.java |  17 +
 .../apache/pulsar/compaction/CompactionRecord.java |  12 +
 .../pulsar/broker/admin/AdminApiOffloadTest.java   |  31 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  10 +
 .../broker/service/BacklogQuotaManagerTest.java    |  59 ++-
 .../service/BrokerServiceThrottlingTest.java       |  16 +-
 .../service/persistent/DelayedDeliveryTest.java    |  21 +
 .../broker/stats/BrokerOpenTelemetryTestUtil.java  |  92 ++++
 .../broker/stats/OpenTelemetryTopicStatsTest.java  | 145 ++++++
 .../broker/testcontext/PulsarTestContext.java      |  10 +-
 .../pulsar/broker/transaction/TransactionTest.java |  34 +-
 .../broker/transaction/TransactionTestBase.java    |   1 +
 .../pulsar/client/api/BrokerServiceLookupTest.java |   3 +
 .../apache/pulsar/compaction/CompactorTest.java    |  45 +-
 .../opentelemetry/OpenTelemetryAttributes.java     |  40 ++
 21 files changed, 1039 insertions(+), 31 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
index cb6d3700afe..44345c430b7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
@@ -85,6 +85,11 @@ public interface ManagedLedgerMXBean {
      */
     long getAddEntrySucceed();
 
+    /**
+     * @return the total number of addEntry requests that succeeded
+     */
+    long getAddEntrySucceedTotal();
+
     /**
      * @return the number of addEntry requests that failed
      */
@@ -100,6 +105,11 @@ public interface ManagedLedgerMXBean {
      */
     long getReadEntriesSucceeded();
 
+    /**
+     * @return the total number of readEntries requests that succeeded
+     */
+    long getReadEntriesSucceededTotal();
+
     /**
      * @return the number of readEntries requests that failed
      */
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index 3935828ff3d..5e5161a29ca 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -230,6 +230,11 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
         return addEntryOps.getCount();
     }
 
+    @Override
+    public long getAddEntrySucceedTotal() {
+        return addEntryOps.getTotalCount();
+    }
+
     @Override
     public long getAddEntryErrors() {
         return addEntryOpsFailed.getCount();
@@ -240,6 +245,11 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
         return readEntriesOps.getCount();
     }
 
+    @Override
+    public long getReadEntriesSucceededTotal() {
+        return readEntriesOps.getTotalCount();
+    }
+
     @Override
     public long getReadEntriesErrors() {
         return readEntriesOpsFailed.getCount();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
index 2505db6ec55..5f6bd0b7ae6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
@@ -77,10 +77,12 @@ public class ManagedLedgerMBeanTest extends MockedBookKeeperTestCase {
         assertEquals(mbean.getAddEntryWithReplicasBytesRate(), 0.0);
         assertEquals(mbean.getAddEntryMessagesRate(), 0.0);
         assertEquals(mbean.getAddEntrySucceed(), 0);
+        assertEquals(mbean.getAddEntrySucceedTotal(), 0);
         assertEquals(mbean.getAddEntryErrors(), 0);
         assertEquals(mbean.getReadEntriesBytesRate(), 0.0);
         assertEquals(mbean.getReadEntriesRate(), 0.0);
         assertEquals(mbean.getReadEntriesSucceeded(), 0);
+        assertEquals(mbean.getReadEntriesSucceededTotal(), 0);
         assertEquals(mbean.getReadEntriesErrors(), 0);
         assertEquals(mbean.getMarkDeleteRate(), 0.0);
 
@@ -105,10 +107,12 @@ public class ManagedLedgerMBeanTest extends MockedBookKeeperTestCase {
         assertEquals(mbean.getAddEntryWithReplicasBytesRate(), 1600.0);
         assertEquals(mbean.getAddEntryMessagesRate(), 2.0);
         assertEquals(mbean.getAddEntrySucceed(), 2);
+        assertEquals(mbean.getAddEntrySucceedTotal(), 2);
         assertEquals(mbean.getAddEntryErrors(), 0);
         assertEquals(mbean.getReadEntriesBytesRate(), 0.0);
         assertEquals(mbean.getReadEntriesRate(), 0.0);
         assertEquals(mbean.getReadEntriesSucceeded(), 0);
+        assertEquals(mbean.getReadEntriesSucceededTotal(), 0);
         assertEquals(mbean.getReadEntriesErrors(), 0);
         assertTrue(mbean.getMarkDeleteRate() > 0.0);
 
@@ -134,10 +138,14 @@ public class ManagedLedgerMBeanTest extends MockedBookKeeperTestCase {
         assertEquals(mbean.getReadEntriesBytesRate(), 600.0);
         assertEquals(mbean.getReadEntriesRate(), 1.0);
         assertEquals(mbean.getReadEntriesSucceeded(), 1);
+        assertEquals(mbean.getReadEntriesSucceededTotal(), 1);
         assertEquals(mbean.getReadEntriesErrors(), 0);
         assertEquals(mbean.getNumberOfMessagesInBacklog(), 1);
         assertEquals(mbean.getMarkDeleteRate(), 0.0);
 
+        assertEquals(mbean.getAddEntrySucceed(), 0);
+        assertEquals(mbean.getAddEntrySucceedTotal(), 2);
+
         factory.shutdown();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 96f3653ea99..8c910fb91e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -109,6 +109,7 @@ import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
+import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
 import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
@@ -252,6 +253,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
 
     private MetricsGenerator metricsGenerator;
     private final PulsarBrokerOpenTelemetry openTelemetry;
+    private OpenTelemetryTopicStats openTelemetryTopicStats;
 
     private TransactionMetadataStoreService transactionMetadataStoreService;
     private TransactionBufferProvider transactionBufferProvider;
@@ -631,6 +633,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             brokerClientSharedTimer.stop();
             monotonicSnapshotClock.close();
 
+            if (openTelemetryTopicStats != null) {
+                openTelemetryTopicStats.close();
+            }
+
             
asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup));
 
 
@@ -771,6 +777,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                         config.getDefaultRetentionTimeInMinutes() * 60));
             }
 
+            openTelemetryTopicStats = new OpenTelemetryTopicStats(this);
+
             localMetadataSynchronizer = 
StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
                     ? new PulsarMetadataEventSynchronizer(this, 
config.getMetadataSyncEventTopic())
                     : null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 44a4ca42cea..b6ce43b060c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -133,6 +133,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
     private static final AtomicLongFieldUpdater<AbstractTopic> 
RATE_LIMITED_UPDATER =
             AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, 
"publishRateLimitedTimes");
     protected volatile long publishRateLimitedTimes = 0L;
+    private static final AtomicLongFieldUpdater<AbstractTopic> 
TOTAL_RATE_LIMITED_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, 
"totalPublishRateLimitedCounter");
+    protected volatile long totalPublishRateLimitedCounter = 0L;
 
     private static final AtomicIntegerFieldUpdater<AbstractTopic> 
USER_CREATED_PRODUCER_COUNTER_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(AbstractTopic.class, 
"userCreatedProducerCount");
@@ -897,6 +900,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
 
     @Override
     public long increasePublishLimitedTimes() {
+        TOTAL_RATE_LIMITED_UPDATER.incrementAndGet(this);
         return RATE_LIMITED_UPDATER.incrementAndGet(this);
     }
 
@@ -1185,6 +1189,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
                 + sumSubscriptions(AbstractSubscription::getBytesOutCounter);
     }
 
+    public long getTotalPublishRateLimitCounter() {
+        return TOTAL_RATE_LIMITED_UPDATER.get(this);
+    }
+
     private long sumSubscriptions(ToLongFunction<AbstractSubscription> 
toCounter) {
         return getSubscriptions().values().stream()
                 .map(AbstractSubscription.class::cast)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java
new file mode 100644
index 00000000000..1f0735c0ec1
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.AbstractTopic;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.stats.MetricsUtil;
+import org.apache.pulsar.compaction.CompactedTopicContext;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+
+public class OpenTelemetryTopicStats implements AutoCloseable {
+
+    // Replaces pulsar_subscriptions_count
+    public static final String SUBSCRIPTION_COUNTER = 
"pulsar.broker.topic.subscription.count";
+    private final ObservableLongMeasurement subscriptionCounter;
+
+    // Replaces pulsar_producers_count
+    public static final String PRODUCER_COUNTER = 
"pulsar.broker.topic.producer.count";
+    private final ObservableLongMeasurement producerCounter;
+
+    // Replaces pulsar_consumers_count
+    public static final String CONSUMER_COUNTER = 
"pulsar.broker.topic.consumer.count";
+    private final ObservableLongMeasurement consumerCounter;
+
+    // Replaces ['pulsar_rate_in', 'pulsar_in_messages_total']
+    public static final String MESSAGE_IN_COUNTER = 
"pulsar.broker.topic.message.incoming.count";
+    private final ObservableLongMeasurement messageInCounter;
+
+    // Replaces ['pulsar_rate_out', 'pulsar_out_messages_total']
+    public static final String MESSAGE_OUT_COUNTER = 
"pulsar.broker.topic.message.outgoing.count";
+    private final ObservableLongMeasurement messageOutCounter;
+
+    // Replaces ['pulsar_throughput_in', 'pulsar_in_bytes_total']
+    public static final String BYTES_IN_COUNTER = 
"pulsar.broker.topic.message.incoming.size";
+    private final ObservableLongMeasurement bytesInCounter;
+
+    // Replaces ['pulsar_throughput_out', 'pulsar_out_bytes_total']
+    public static final String BYTES_OUT_COUNTER = 
"pulsar.broker.topic.message.outgoing.size";
+    private final ObservableLongMeasurement bytesOutCounter;
+
+    // Replaces pulsar_publish_rate_limit_times
+    public static final String PUBLISH_RATE_LIMIT_HIT_COUNTER = 
"pulsar.broker.topic.publish.rate.limit.count";
+    private final ObservableLongMeasurement publishRateLimitHitCounter;
+
+    // Omitted: pulsar_consumer_msg_ack_rate
+
+    // Replaces pulsar_storage_size
+    public static final String STORAGE_COUNTER = 
"pulsar.broker.topic.storage.size";
+    private final ObservableLongMeasurement storageCounter;
+
+    // Replaces pulsar_storage_logical_size
+    public static final String STORAGE_LOGICAL_COUNTER = 
"pulsar.broker.topic.storage.logical.size";
+    private final ObservableLongMeasurement storageLogicalCounter;
+
+    // Replaces pulsar_storage_backlog_size
+    public static final String STORAGE_BACKLOG_COUNTER = 
"pulsar.broker.topic.storage.backlog.size";
+    private final ObservableLongMeasurement storageBacklogCounter;
+
+    // Replaces pulsar_storage_offloaded_size
+    public static final String STORAGE_OFFLOADED_COUNTER = 
"pulsar.broker.topic.storage.offloaded.size";
+    private final ObservableLongMeasurement storageOffloadedCounter;
+
+    // Replaces pulsar_storage_backlog_quota_limit
+    public static final String BACKLOG_QUOTA_LIMIT_SIZE = 
"pulsar.broker.topic.storage.backlog.quota.limit.size";
+    private final ObservableLongMeasurement backlogQuotaLimitSize;
+
+    // Replaces pulsar_storage_backlog_quota_limit_time
+    public static final String BACKLOG_QUOTA_LIMIT_TIME = 
"pulsar.broker.topic.storage.backlog.quota.limit.time";
+    private final ObservableLongMeasurement backlogQuotaLimitTime;
+
+    // Replaces pulsar_storage_backlog_quota_exceeded_evictions_total
+    public static final String BACKLOG_EVICTION_COUNTER = 
"pulsar.broker.topic.storage.backlog.quota.eviction.count";
+    private final ObservableLongMeasurement backlogEvictionCounter;
+
+    // Replaces pulsar_storage_backlog_age_seconds
+    public static final String BACKLOG_QUOTA_AGE = 
"pulsar.broker.topic.storage.backlog.age";
+    private final ObservableLongMeasurement backlogQuotaAge;
+
+    // Replaces pulsar_storage_write_rate
+    public static final String STORAGE_OUT_COUNTER = 
"pulsar.broker.topic.storage.entry.outgoing.count";
+    private final ObservableLongMeasurement storageOutCounter;
+
+    // Replaces pulsar_storage_read_rate
+    public static final String STORAGE_IN_COUNTER = 
"pulsar.broker.topic.storage.entry.incoming.count";
+    private final ObservableLongMeasurement storageInCounter;
+
+    // Omitted: pulsar_storage_write_latency_le_*
+
+    // Omitted: pulsar_entry_size_le_*
+
+    // Replaces pulsar_compaction_removed_event_count
+    public static final String COMPACTION_REMOVED_COUNTER = 
"pulsar.broker.topic.compaction.removed.message.count";
+    private final ObservableLongMeasurement compactionRemovedCounter;
+
+    // Replaces ['pulsar_compaction_succeed_count', 
'pulsar_compaction_failed_count']
+    public static final String COMPACTION_OPERATION_COUNTER = 
"pulsar.broker.topic.compaction.operation.count";
+    private final ObservableLongMeasurement compactionOperationCounter;
+
+    // Replaces pulsar_compaction_duration_time_in_mills
+    public static final String COMPACTION_DURATION_SECONDS = 
"pulsar.broker.topic.compaction.duration";
+    private final ObservableDoubleMeasurement compactionDurationSeconds;
+
+    // Replaces pulsar_compaction_read_throughput
+    public static final String COMPACTION_BYTES_IN_COUNTER = 
"pulsar.broker.topic.compaction.incoming.size";
+    private final ObservableLongMeasurement compactionBytesInCounter;
+
+    // Replaces pulsar_compaction_write_throughput
+    public static final String COMPACTION_BYTES_OUT_COUNTER = 
"pulsar.broker.topic.compaction.outgoing.size";
+    private final ObservableLongMeasurement compactionBytesOutCounter;
+
+    // Omitted: pulsar_compaction_latency_le_*
+
+    // Replaces pulsar_compaction_compacted_entries_count
+    public static final String COMPACTION_ENTRIES_COUNTER = 
"pulsar.broker.topic.compaction.compacted.entry.count";
+    private final ObservableLongMeasurement compactionEntriesCounter;
+
+    // Replaces pulsar_compaction_compacted_entries_size
+    public static final String COMPACTION_BYTES_COUNTER = 
"pulsar.broker.topic.compaction.compacted.entry.size";
+    private final ObservableLongMeasurement compactionBytesCounter;
+
+    // Replaces ['pulsar_txn_tb_active_total', 'pulsar_txn_tb_aborted_total', 
'pulsar_txn_tb_committed_total']
+    public static final String TRANSACTION_COUNTER = 
"pulsar.broker.topic.transaction.count";
+    private final ObservableLongMeasurement transactionCounter;
+
+    // Replaces pulsar_subscription_delayed
+    public static final String DELAYED_SUBSCRIPTION_COUNTER = 
"pulsar.broker.topic.subscription.delayed.entry.count";
+    private final ObservableLongMeasurement delayedSubscriptionCounter;
+
+    // Omitted: pulsar_delayed_message_index_size_bytes
+
+    // Omitted: pulsar_delayed_message_index_bucket_total
+
+    // Omitted: pulsar_delayed_message_index_loaded
+
+    // Omitted: pulsar_delayed_message_index_bucket_snapshot_size_bytes
+
+    // Omitted: pulsar_delayed_message_index_bucket_op_count
+
+    // Omitted: pulsar_delayed_message_index_bucket_op_latency_ms
+
+
+    private final BatchCallback batchCallback;
+    private final PulsarService pulsar;
+
+    public OpenTelemetryTopicStats(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        var meter = pulsar.getOpenTelemetry().getMeter();
+
+        subscriptionCounter = meter
+                .upDownCounterBuilder(SUBSCRIPTION_COUNTER)
+                .setUnit("{subscription}")
+                .setDescription("The number of Pulsar subscriptions of the 
topic served by this broker.")
+                .buildObserver();
+
+        producerCounter = meter
+                .upDownCounterBuilder(PRODUCER_COUNTER)
+                .setUnit("{producer}")
+                .setDescription("The number of active producers of the topic 
connected to this broker.")
+                .buildObserver();
+
+        consumerCounter = meter
+                .upDownCounterBuilder(CONSUMER_COUNTER)
+                .setUnit("{consumer}")
+                .setDescription("The number of active consumers of the topic 
connected to this broker.")
+                .buildObserver();
+
+        messageInCounter = meter
+                .counterBuilder(MESSAGE_IN_COUNTER)
+                .setUnit("{message}")
+                .setDescription("The total number of messages received for 
this topic.")
+                .buildObserver();
+
+        messageOutCounter = meter
+                .counterBuilder(MESSAGE_OUT_COUNTER)
+                .setUnit("{message}")
+                .setDescription("The total number of messages read from this 
topic.")
+                .buildObserver();
+
+        bytesInCounter = meter
+                .counterBuilder(BYTES_IN_COUNTER)
+                .setUnit("By")
+                .setDescription("The total number of messages bytes received 
for this topic.")
+                .buildObserver();
+
+        bytesOutCounter = meter
+                .counterBuilder(BYTES_OUT_COUNTER)
+                .setUnit("By")
+                .setDescription("The total number of messages bytes read from 
this topic.")
+                .buildObserver();
+
+        publishRateLimitHitCounter = meter
+                .counterBuilder(PUBLISH_RATE_LIMIT_HIT_COUNTER)
+                .setUnit("{event}")
+                .setDescription("The number of times the publish rate limit is 
triggered.")
+                .buildObserver();
+
+        storageCounter = meter
+                .upDownCounterBuilder(STORAGE_COUNTER)
+                .setUnit("By")
+                .setDescription(
+                        "The total storage size of the messages in this topic, 
including storage used by replicas.")
+                .buildObserver();
+
+        storageLogicalCounter = meter
+                .upDownCounterBuilder(STORAGE_LOGICAL_COUNTER)
+                .setUnit("By")
+                .setDescription("The storage size of the messages in this 
topic, excluding storage used by replicas.")
+                .buildObserver();
+
+        storageBacklogCounter = meter
+                .upDownCounterBuilder(STORAGE_BACKLOG_COUNTER)
+                .setUnit("By")
+                .setDescription("The size of the backlog storage for this 
topic.")
+                .buildObserver();
+
+        storageOffloadedCounter = meter
+                .upDownCounterBuilder(STORAGE_OFFLOADED_COUNTER)
+                .setUnit("By")
+                .setDescription("The total amount of the data in this topic 
offloaded to the tiered storage.")
+                .buildObserver();
+
+        backlogQuotaLimitSize = meter
+                .upDownCounterBuilder(BACKLOG_QUOTA_LIMIT_SIZE)
+                .setUnit("By")
+                .setDescription("The size based backlog quota limit for this 
topic.")
+                .buildObserver();
+
+        backlogQuotaLimitTime = meter
+                .gaugeBuilder(BACKLOG_QUOTA_LIMIT_TIME)
+                .ofLongs()
+                .setUnit("s")
+                .setDescription("The time based backlog quota limit for this 
topic.")
+                .buildObserver();
+
+        backlogEvictionCounter = meter
+                .counterBuilder(BACKLOG_EVICTION_COUNTER)
+                .setUnit("{eviction}")
+                .setDescription("The number of times a backlog was evicted 
since it has exceeded its quota.")
+                .buildObserver();
+
+        backlogQuotaAge = meter
+                .gaugeBuilder(BACKLOG_QUOTA_AGE)
+                .ofLongs()
+                .setUnit("s")
+                .setDescription("The age of the oldest unacknowledged message 
(backlog).")
+                .buildObserver();
+
+        storageOutCounter = meter
+                .counterBuilder(STORAGE_OUT_COUNTER)
+                .setUnit("{entry}")
+                .setDescription("The total message batches (entries) written 
to the storage for this topic.")
+                .buildObserver();
+
+        storageInCounter = meter
+                .counterBuilder(STORAGE_IN_COUNTER)
+                .setUnit("{entry}")
+                .setDescription("The total message batches (entries) read from 
the storage for this topic.")
+                .buildObserver();
+
+        compactionRemovedCounter = meter
+                .counterBuilder(COMPACTION_REMOVED_COUNTER)
+                .setUnit("{message}")
+                .setDescription("The total number of messages removed by 
compaction.")
+                .buildObserver();
+
+        compactionOperationCounter = meter
+                .counterBuilder(COMPACTION_OPERATION_COUNTER)
+                .setUnit("{operation}")
+                .setDescription("The total number of compaction operations.")
+                .buildObserver();
+
+        compactionDurationSeconds = meter
+                .upDownCounterBuilder(COMPACTION_DURATION_SECONDS)
+                .ofDoubles()
+                .setUnit("s")
+                .setDescription("The total time duration of compaction 
operations on the topic.")
+                .buildObserver();
+
+        compactionBytesInCounter = meter
+                .counterBuilder(COMPACTION_BYTES_IN_COUNTER)
+                .setUnit("By")
+                .setDescription("The total count of bytes read by the 
compaction process for this topic.")
+                .buildObserver();
+
+        compactionBytesOutCounter = meter
+                .counterBuilder(COMPACTION_BYTES_OUT_COUNTER)
+                .setUnit("By")
+                .setDescription("The total count of bytes written by the 
compaction process for this topic.")
+                .buildObserver();
+
+        compactionEntriesCounter = meter
+                .counterBuilder(COMPACTION_ENTRIES_COUNTER)
+                .setUnit("{entry}")
+                .setDescription("The total number of compacted entries.")
+                .buildObserver();
+
+        compactionBytesCounter = meter
+                .counterBuilder(COMPACTION_BYTES_COUNTER)
+                .setUnit("By")
+                .setDescription("The total size of the compacted entries.")
+                .buildObserver();
+
+        transactionCounter = meter
+                .upDownCounterBuilder(TRANSACTION_COUNTER)
+                .setUnit("{transaction}")
+                .setDescription("The number of transactions on this topic.")
+                .buildObserver();
+
+        delayedSubscriptionCounter = meter
+                .upDownCounterBuilder(DELAYED_SUBSCRIPTION_COUNTER)
+                .setUnit("{entry}")
+                .setDescription("The total number of message batches (entries) 
delayed for dispatching.")
+                .buildObserver();
+
+        batchCallback = meter.batchCallback(() -> pulsar.getBrokerService()
+                        .getTopics()
+                        .values()
+                        .stream()
+                        .map(topicFuture -> 
topicFuture.getNow(Optional.empty()))
+                        .forEach(topic -> 
topic.ifPresent(this::recordMetricsForTopic)),
+                subscriptionCounter,
+                producerCounter,
+                consumerCounter,
+                messageInCounter,
+                messageOutCounter,
+                bytesInCounter,
+                bytesOutCounter,
+                publishRateLimitHitCounter,
+                storageCounter,
+                storageLogicalCounter,
+                storageBacklogCounter,
+                storageOffloadedCounter,
+                backlogQuotaLimitSize,
+                backlogQuotaLimitTime,
+                backlogEvictionCounter,
+                backlogQuotaAge,
+                storageOutCounter,
+                storageInCounter,
+                compactionRemovedCounter,
+                compactionOperationCounter,
+                compactionDurationSeconds,
+                compactionBytesInCounter,
+                compactionBytesOutCounter,
+                compactionEntriesCounter,
+                compactionBytesCounter,
+                transactionCounter,
+                delayedSubscriptionCounter);
+    }
+
+    @Override
+    public void close() {
+        batchCallback.close();
+    }
+
+    private void recordMetricsForTopic(Topic topic) {
+        var topicName = TopicName.get(topic.getName());
+        var builder = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_DOMAIN, 
topicName.getDomain().toString())
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, 
topicName.getTenant())
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, 
topicName.getNamespace())
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, 
topicName.getPartitionedTopicName());
+        if (topicName.isPartitioned()) {
+            builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, 
topicName.getPartitionIndex());
+        }
+        var attributes = builder.build();
+
+        if (topic instanceof AbstractTopic abstractTopic) {
+            
subscriptionCounter.record(abstractTopic.getSubscriptions().size(), attributes);
+            producerCounter.record(abstractTopic.getProducers().size(), 
attributes);
+            consumerCounter.record(abstractTopic.getNumberOfConsumers(), 
attributes);
+
+            messageInCounter.record(abstractTopic.getMsgInCounter(), 
attributes);
+            messageOutCounter.record(abstractTopic.getMsgOutCounter(), 
attributes);
+            bytesInCounter.record(abstractTopic.getBytesInCounter(), 
attributes);
+            bytesOutCounter.record(abstractTopic.getBytesOutCounter(), 
attributes);
+
+            
publishRateLimitHitCounter.record(abstractTopic.getTotalPublishRateLimitCounter(),
 attributes);
+
+            // Omitted: consumerMsgAckCounter
+        }
+
+        if (topic instanceof PersistentTopic persistentTopic) {
+            var managedLedger = persistentTopic.getManagedLedger();
+            var managedLedgerStats = 
persistentTopic.getManagedLedger().getStats();
+            storageCounter.record(managedLedgerStats.getStoredMessagesSize(), 
attributes);
+            
storageLogicalCounter.record(managedLedgerStats.getStoredMessagesLogicalSize(), 
attributes);
+            
storageBacklogCounter.record(managedLedger.getEstimatedBacklogSize(), 
attributes);
+            storageOffloadedCounter.record(managedLedger.getOffloadedSize(), 
attributes);
+            
storageInCounter.record(managedLedgerStats.getReadEntriesSucceededTotal(), 
attributes);
+            
storageOutCounter.record(managedLedgerStats.getAddEntrySucceedTotal(), 
attributes);
+
+            backlogQuotaLimitSize.record(
+                    
topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(),
+                    attributes);
+            backlogQuotaLimitTime.record(
+                    
topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime(),
+                    attributes);
+            
backlogQuotaAge.record(topic.getBestEffortOldestUnacknowledgedMessageAgeSeconds(),
 attributes);
+            var backlogQuotaMetrics = 
persistentTopic.getPersistentTopicMetrics().getBacklogQuotaMetrics();
+            
backlogEvictionCounter.record(backlogQuotaMetrics.getSizeBasedBacklogQuotaExceededEvictionCount(),
+                    Attributes.builder()
+                            .putAll(attributes)
+                            
.put(OpenTelemetryAttributes.PULSAR_BACKLOG_QUOTA_TYPE, "size")
+                            .build());
+            
backlogEvictionCounter.record(backlogQuotaMetrics.getTimeBasedBacklogQuotaExceededEvictionCount(),
+                    Attributes.builder()
+                            .putAll(attributes)
+                            
.put(OpenTelemetryAttributes.PULSAR_BACKLOG_QUOTA_TYPE, "time")
+                            .build());
+
+            var txnBuffer = persistentTopic.getTransactionBuffer();
+            transactionCounter.record(txnBuffer.getOngoingTxnCount(), 
Attributes.builder()
+                    .putAll(attributes)
+                    .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, 
"active")
+                    .build());
+            transactionCounter.record(txnBuffer.getCommittedTxnCount(), 
Attributes.builder()
+                    .putAll(attributes)
+                    .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, 
"committed")
+                    .build());
+            transactionCounter.record(txnBuffer.getAbortedTxnCount(), 
Attributes.builder()
+                    .putAll(attributes)
+                    .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, 
"aborted")
+                    .build());
+
+            Optional.ofNullable(pulsar.getNullableCompactor())
+                    .map(Compactor::getStats)
+                    .flatMap(compactorMXBean -> 
compactorMXBean.getCompactionRecordForTopic(topic.getName()))
+                    .ifPresent(compactionRecord -> {
+                        
compactionRemovedCounter.record(compactionRecord.getCompactionRemovedEventCount(),
 attributes);
+                        
compactionOperationCounter.record(compactionRecord.getCompactionSucceedCount(),
+                                Attributes.builder()
+                                        .putAll(attributes)
+                                        
.put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "success")
+                                        .build());
+                        
compactionOperationCounter.record(compactionRecord.getCompactionFailedCount(),
+                                Attributes.builder()
+                                        .putAll(attributes)
+                                        
.put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "failure")
+                                        .build());
+                        
compactionDurationSeconds.record(MetricsUtil.convertToSeconds(
+                            
compactionRecord.getCompactionDurationTimeInMills(), TimeUnit.MILLISECONDS), 
attributes);
+                        
compactionBytesInCounter.record(compactionRecord.getCompactionReadBytes(), 
attributes);
+                        
compactionBytesOutCounter.record(compactionRecord.getCompactionWriteBytes(), 
attributes);
+
+                        
persistentTopic.getCompactedTopicContext().map(CompactedTopicContext::getLedger)
+                                .ifPresent(ledger -> {
+                                    
compactionEntriesCounter.record(ledger.getLastAddConfirmed() + 1, attributes);
+                                    
compactionBytesCounter.record(ledger.getLength(), attributes);
+                                });
+                    });
+
+            var delayedMessages = topic.getSubscriptions().values().stream()
+                    .map(Subscription::getDispatcher)
+                    .filter(Objects::nonNull)
+                    .mapToLong(Dispatcher::getNumberOfDelayedMessages)
+                    .sum();
+            delayedSubscriptionCounter.record(delayedMessages, attributes);
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 27288291d29..e8ab7b095dc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -25,28 +25,45 @@ import java.util.Optional;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
 import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusLabels;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
 import org.apache.pulsar.compaction.CompactionRecord;
 import org.apache.pulsar.compaction.CompactorMXBean;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
 
 class TopicStats {
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.SUBSCRIPTION_COUNTER)
     int subscriptionsCount;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.PRODUCER_COUNTER)
     int producersCount;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.CONSUMER_COUNTER)
     int consumersCount;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.MESSAGE_IN_COUNTER)
     double rateIn;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.MESSAGE_OUT_COUNTER)
     double rateOut;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.BYTES_IN_COUNTER)
     double throughputIn;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.BYTES_OUT_COUNTER)
     double throughputOut;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.MESSAGE_IN_COUNTER)
     long msgInCounter;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.BYTES_IN_COUNTER)
     long bytesInCounter;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.MESSAGE_OUT_COUNTER)
     long msgOutCounter;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.BYTES_OUT_COUNTER)
     long bytesOutCounter;
+    @PulsarDeprecatedMetric // Can be derived from MESSAGE_IN_COUNTER and 
BYTES_IN_COUNTER
     double averageMsgSize;
 
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.TRANSACTION_COUNTER)
     long ongoingTxnCount;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.TRANSACTION_COUNTER)
     long abortedTxnCount;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.TRANSACTION_COUNTER)
     long committedTxnCount;
 
     public long msgBacklog;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
index 09f9f9b00ab..1d2af6638c3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
@@ -45,6 +45,8 @@ public class CompactionRecord {
     private final LongAdder compactionSucceedCount = new LongAdder();
     private final LongAdder compactionFailedCount = new LongAdder();
     private final LongAdder compactionDurationTimeInMills = new LongAdder();
+    private final LongAdder compactionReadBytes = new LongAdder();
+    private final LongAdder compactionWriteBytes = new LongAdder();
     public final StatsBuckets writeLatencyStats = new 
StatsBuckets(WRITE_LATENCY_BUCKETS_USEC);
     public final Rate writeRate = new Rate();
     public final Rate readRate = new Rate();
@@ -83,10 +85,12 @@ public class CompactionRecord {
 
     public void addCompactionReadOp(long readableBytes) {
         readRate.recordEvent(readableBytes);
+        compactionReadBytes.add(readableBytes);
     }
 
     public void addCompactionWriteOp(long writeableBytes) {
         writeRate.recordEvent(writeableBytes);
+        compactionWriteBytes.add(writeableBytes);
     }
 
     public void addCompactionLatencyOp(long latency, TimeUnit unit) {
@@ -123,8 +127,16 @@ public class CompactionRecord {
         return readRate.getValueRate();
     }
 
+    public long getCompactionReadBytes() {
+        return compactionReadBytes.sum();
+    }
+
     public double getCompactionWriteThroughput() {
         writeRate.calculateRate();
         return writeRate.getValueRate();
     }
+
+    public long getCompactionWriteBytes() {
+        return compactionWriteBytes.sum();
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 95b0d48c69a..eac816bd810 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -37,6 +37,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -48,6 +49,7 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import io.opentelemetry.api.common.Attributes;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -59,6 +61,9 @@ import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil;
+import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
 import org.apache.pulsar.client.api.MessageId;
@@ -71,6 +76,7 @@ import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -100,6 +106,12 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().createNamespace(myNamespace, Set.of("test"));
     }
 
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder 
pulsarTestContextBuilder) {
+        super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+        pulsarTestContextBuilder.enableOpenTelemetry(true);
+    }
+
     @AfterMethod(alwaysRun = true)
     @Override
     public void cleanup() throws Exception {
@@ -125,8 +137,18 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
         ManagedLedgerInfo info = 
pulsar.getManagedLedgerFactory().getManagedLedgerInfo(mlName);
         assertEquals(info.ledgers.size(), 2);
 
-        assertEquals(admin.topics().offloadStatus(topicName).getStatus(),
-                            LongRunningProcessStatus.Status.NOT_RUN);
+        assertEquals(admin.topics().offloadStatus(topicName).getStatus(), 
LongRunningProcessStatus.Status.NOT_RUN);
+        var topicNameObject = TopicName.get(topicName);
+        var attributes = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_DOMAIN, 
topicNameObject.getDomain().toString())
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, 
topicNameObject.getTenant())
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, 
topicNameObject.getNamespace())
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, 
topicNameObject.getPartitionedTopicName())
+                .build();
+        // Verify the respective metric is 0 before the offload begins.
+        var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.STORAGE_OFFLOADED_COUNTER,
+                attributes, actual -> assertThat(actual).isZero());
 
         admin.topics().triggerOffload(topicName, currentId);
 
@@ -164,6 +186,11 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(firstUnoffloadedMessage.getEntryId(), 0);
 
         verify(offloader, times(2)).offload(any(), any(), any());
+
+        // Verify the metrics have been updated.
+        metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.STORAGE_OFFLOADED_COUNTER,
+                attributes, actual -> assertThat(actual).isPositive());
     }
 
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 0bf096fb5d7..10d56ce2245 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -19,8 +19,10 @@
 package org.apache.pulsar.broker.auth;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocations;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Sets;
+import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
@@ -739,5 +741,13 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
         reconnectAllConnections((PulsarClientImpl) pulsarClient);
     }
 
+    protected void assertOtelMetricLongSumValue(String metricName, int value) {
+        
assertThat(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics())
+                .anySatisfy(metric -> 
OpenTelemetryAssertions.assertThat(metric)
+                        .hasName(metricName)
+                        .hasLongSumSatisfying(
+                                sum -> sum.hasPointsSatisfying(point -> 
point.hasValue(value))));
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index f30b7f12b01..6be7023b161 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.service;
 import static java.util.Map.entry;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongGaugeValue;
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static 
org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType.destination_storage;
 import static 
org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType.message_age;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -30,6 +32,8 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
 import java.net.URL;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -45,10 +49,13 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil;
+import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
 import 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
 import 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metrics;
@@ -70,6 +77,8 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
@@ -94,6 +103,7 @@ public class BacklogQuotaManagerTest {
 
     LocalBookkeeperEnsemble bkEnsemble;
     PrometheusMetricsClient prometheusMetricsClient;
+    InMemoryMetricReader openTelemetryMetricReader;
 
     private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 2;
     private static final int MAX_ENTRIES_PER_LEDGER = 5;
@@ -145,7 +155,9 @@ public class BacklogQuotaManagerTest {
             config.setTopicLevelPoliciesEnabled(true);
             config.setForceDeleteNamespaceAllowed(true);
 
-            pulsar = new PulsarService(config);
+            openTelemetryMetricReader = InMemoryMetricReader.create();
+            pulsar = new PulsarService(config, new WorkerConfig(), 
Optional.empty(), exitCode -> {
+            }, 
BrokerOpenTelemetryTestUtil.getOpenTelemetrySdkBuilderConsumer(openTelemetryMetricReader));
             pulsar.start();
 
             adminUrl = new URL("http://127.0.0.1" + ":" + 
pulsar.getListenPortHTTP().get());
@@ -709,16 +721,17 @@ public class BacklogQuotaManagerTest {
     public void testConsumerBacklogEvictionSizeQuota() throws Exception {
         assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
                 new HashMap<>());
+        var backlogSizeLimit = 10 * 1024;
         admin.namespaces().setBacklogQuota("prop/ns-quota",
                 BacklogQuota.builder()
-                        .limitSize(10 * 1024)
+                        .limitSize(backlogSizeLimit)
                         
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                         .build());
         @Cleanup
         PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, SECONDS)
                 .build();
 
-        final String topic1 = "persistent://prop/ns-quota/topic2" + 
UUID.randomUUID();
+        final String topic1 = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-quota/topic2");
         final String subName1 = "c1";
         final String subName2 = "c2";
         final int numMsgs = 20;
@@ -740,6 +753,21 @@ public class BacklogQuotaManagerTest {
         assertTrue(stats.getBacklogSize() < 10 * 1024, "Storage size is [" + 
stats.getStorageSize() + "]");
         assertThat(evictionCountMetric("prop/ns-quota", topic1, 
"size")).isEqualTo(1);
         assertThat(evictionCountMetric("size")).isEqualTo(1);
+
+        var attributes = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop")
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-quota")
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic1)
+                .build();
+        var metrics = openTelemetryMetricReader.collectAllMetrics();
+        assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.BACKLOG_QUOTA_LIMIT_SIZE, attributes,
+                backlogSizeLimit);
+        assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.BACKLOG_EVICTION_COUNTER, Attributes.builder()
+                        .putAll(attributes)
+                        
.put(OpenTelemetryAttributes.PULSAR_BACKLOG_QUOTA_TYPE, "size")
+                        .build(),
+                1);
     }
 
     @Test
@@ -812,16 +840,17 @@ public class BacklogQuotaManagerTest {
     public void testConsumerBacklogEvictionTimeQuota() throws Exception {
         assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
                 new HashMap<>());
+        var backlogTimeLimit = TIME_TO_CHECK_BACKLOG_QUOTA;
         admin.namespaces().setBacklogQuota("prop/ns-quota",
                 BacklogQuota.builder()
-                        .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
+                        .limitTime(backlogTimeLimit)
                         
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                         .build(), message_age);
         @Cleanup
         PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, SECONDS)
                 .build();
 
-        final String topic1 = "persistent://prop/ns-quota/topic3" + 
UUID.randomUUID();
+        final String topic1 = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-quota/topic3");
         final String subName1 = "c1";
         final String subName2 = "c2";
         final int numMsgs = 14;
@@ -844,7 +873,8 @@ public class BacklogQuotaManagerTest {
         ManagedLedgerImpl ml = (ManagedLedgerImpl) 
topic1Reference.getManagedLedger();
         Position slowConsumerReadPos = 
ml.getSlowestConsumer().getReadPosition();
 
-        Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000);
+        var delaySeconds = backlogTimeLimit * 2;
+        Thread.sleep(delaySeconds * 1000);
         rolloverStats();
 
         TopicStats stats2 = getTopicStats(topic1);
@@ -856,6 +886,23 @@ public class BacklogQuotaManagerTest {
         });
 
         assertEquals(ml.getSlowestConsumer().getReadPosition(), 
slowConsumerReadPos);
+
+        var attributes = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop")
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-quota")
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic1)
+                .build();
+        var metrics = openTelemetryMetricReader.collectAllMetrics();
+        assertMetricLongGaugeValue(metrics, 
OpenTelemetryTopicStats.BACKLOG_QUOTA_LIMIT_TIME, attributes,
+                backlogTimeLimit);
+        assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.BACKLOG_EVICTION_COUNTER, Attributes.builder()
+                        .putAll(attributes)
+                        
.put(OpenTelemetryAttributes.PULSAR_BACKLOG_QUOTA_TYPE, "time")
+                        .build(),
+                1);
+        assertMetricLongGaugeValue(metrics, 
OpenTelemetryTopicStats.BACKLOG_QUOTA_AGE, attributes,
+                value -> 
assertThat(value).isGreaterThanOrEqualTo(delaySeconds));
     }
 
     @Test(timeOut = 60000)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
index 0d517c014b3..312bfe0fc8a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
@@ -86,11 +86,11 @@ public class BrokerServiceThrottlingTest extends 
BrokerTestBase {
         var metricName = BrokerService.TOPIC_LOOKUP_LIMIT_METRIC_NAME;
         // Validate that the configuration has not been overridden.
         
assertThat(admin.brokers().getAllDynamicConfigurations()).doesNotContainKey(configName);
-        assertLongSumValue(metricName, 50_000);
+        assertOtelMetricLongSumValue(metricName, 50_000);
         
assertThat(lookupRequestSemaphore.get().availablePermits()).isNotEqualTo(0);
         admin.brokers().updateDynamicConfiguration(configName, 
Integer.toString(0));
         waitAtMost(1, TimeUnit.SECONDS).until(() -> 
lookupRequestSemaphore.get().availablePermits() == 0);
-        assertLongSumValue(metricName, 0);
+        assertOtelMetricLongSumValue(metricName, 0);
     }
 
     /**
@@ -104,19 +104,11 @@ public class BrokerServiceThrottlingTest extends 
BrokerTestBase {
         var metricName = BrokerService.TOPIC_LOAD_LIMIT_METRIC_NAME;
         // Validate that the configuration has not been overridden.
         
assertThat(admin.brokers().getAllDynamicConfigurations()).doesNotContainKey(configName);
-        assertLongSumValue(metricName, 5_000);
+        assertOtelMetricLongSumValue(metricName, 5_000);
         
assertThat(topicLoadRequestSemaphore.get().availablePermits()).isNotEqualTo(0);
         admin.brokers().updateDynamicConfiguration(configName, 
Integer.toString(0));
         waitAtMost(1, TimeUnit.SECONDS).until(() -> 
topicLoadRequestSemaphore.get().availablePermits() == 0);
-        assertLongSumValue(metricName, 0);
-    }
-
-    private void assertLongSumValue(String metricName, int value) {
-        
assertThat(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics())
-                .anySatisfy(metric -> assertThat(metric)
-                        .hasName(metricName)
-                        .hasLongSumSatisfying(
-                                sum -> sum.hasPointsSatisfying(point -> 
point.hasValue(value))));
+        assertOtelMetricLongSumValue(metricName, 0);
     }
 
     /**
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index ae7edde4496..3ca966d2108 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import io.opentelemetry.api.common.Attributes;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -36,6 +37,9 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil;
+import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -45,6 +49,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -69,6 +74,12 @@ public class DelayedDeliveryTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder 
pulsarTestContextBuilder) {
+        super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+        pulsarTestContextBuilder.enableOpenTelemetry(true);
+    }
+
     @Test
     public void testDelayedDelivery() throws Exception {
         String topic = BrokerTestUtil.newUniqueName("testNegativeAcks");
@@ -106,6 +117,16 @@ public class DelayedDeliveryTest extends 
ProducerConsumerBase {
         Message<String> msg = sharedConsumer.receive(100, 
TimeUnit.MILLISECONDS);
         assertNull(msg);
 
+        var attributes = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, "public")
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, 
"public/default")
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, 
"persistent://public/default/" + topic)
+                .build();
+        var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
+                OpenTelemetryTopicStats.DELAYED_SUBSCRIPTION_COUNTER, 
attributes, 10);
+
         for (int i = 0; i < 10; i++) {
             msg = failoverConsumer.receive(100, TimeUnit.MILLISECONDS);
             assertEquals(msg.getValue(), "msg-" + i);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
new file mode 100644
index 00000000000..cb61677ab95
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Consumer;
+import org.apache.pulsar.opentelemetry.OpenTelemetryService;
+
+public class BrokerOpenTelemetryTestUtil {
+    // Creates an OpenTelemetrySdkBuilder customizer for use in tests.
+    public static Consumer<AutoConfiguredOpenTelemetrySdkBuilder> getOpenTelemetrySdkBuilderConsumer(
+            InMemoryMetricReader reader) {
+        return sdkBuilder -> {
+            sdkBuilder.addMeterProviderCustomizer(
+                    (meterProviderBuilder, __) -> meterProviderBuilder.registerMetricReader(reader));
+            sdkBuilder.addPropertiesSupplier(
+                    () -> Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false",
+                            "otel.java.enabled.resource.providers", "none"));
+        };
+    }
+
+    public static void assertMetricDoubleSumValue(Collection<MetricData> metrics, String metricName,
+                                                  Attributes attributes, Consumer<Double> valueConsumer) {
+        assertThat(metrics)
+                .anySatisfy(metric -> assertThat(metric)
+                        .hasName(metricName)
+                        .hasDoubleSumSatisfying(sum -> sum.satisfies(
+                                sumData -> assertThat(sumData.getPoints()).anySatisfy(
+                                        point -> {
+                                            assertThat(point.getAttributes()).isEqualTo(attributes);
+                                            valueConsumer.accept(point.getValue());
+                                        }))));
+    }
+
+    public static void assertMetricLongSumValue(Collection<MetricData> metrics, String metricName,
+                                                Attributes attributes, long expected) {
+        assertMetricLongSumValue(metrics, metricName, attributes, actual -> assertThat(actual).isEqualTo(expected));
+    }
+
+    public static void assertMetricLongSumValue(Collection<MetricData> metrics, String metricName,
+                                                Attributes attributes, Consumer<Long> valueConsumer) {
+        assertThat(metrics)
+                .anySatisfy(metric -> assertThat(metric)
+                        .hasName(metricName)
+                        .hasLongSumSatisfying(sum -> sum.satisfies(
+                                sumData -> assertThat(sumData.getPoints()).anySatisfy(
+                                        point -> {
+                                            assertThat(point.getAttributes()).isEqualTo(attributes);
+                                            valueConsumer.accept(point.getValue());
+                                        }))));
+    }
+
+    public static void assertMetricLongGaugeValue(Collection<MetricData> metrics, String metricName,
+                                                  Attributes attributes, long expected) {
+        assertMetricLongGaugeValue(metrics, metricName, attributes, actual -> assertThat(actual).isEqualTo(expected));
+    }
+
+    public static void assertMetricLongGaugeValue(Collection<MetricData> metrics, String metricName,
+                                                  Attributes attributes, Consumer<Long> valueConsumer) {
+        assertThat(metrics)
+                .anySatisfy(metric -> assertThat(metric)
+                        .hasName(metricName)
+                        .hasLongGaugeSatisfying(gauge -> gauge.satisfies(
+                                pointData -> assertThat(pointData.getPoints()).anySatisfy(
+                                        point -> {
+                                            assertThat(point.getAttributes()).isEqualTo(attributes);
+                                            valueConsumer.accept(point.getValue());
+                                        }))));
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStatsTest.java
new file mode 100644
index 00000000000..c6d07c018c8
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStatsTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThat;
+import io.opentelemetry.api.common.Attributes;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenTelemetryTopicStatsTest extends BrokerTestBase {
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+        super.customizeMainPulsarTestContextBuilder(builder);
+        builder.enableOpenTelemetry(true);
+    }
+
+    @Test(timeOut = 30_000)
+    public void testMessagingMetrics() throws Exception {
+        var topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testMessagingMetrics");
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        var producerCount = 5;
+        var messagesPerProducer = 2;
+        var consumerCount = 3;
+        var messageCount = producerCount * messagesPerProducer;
+
+        for (int i = 0; i < producerCount; i++) {
+            var producer = registerCloseable(pulsarClient.newProducer().topic(topicName).create());
+            for (int j = 0; j < messagesPerProducer; j++) {
+                producer.send(String.format("producer-%d-msg-%d", i, j).getBytes());
+            }
+        }
+
+        var cdl = new CountDownLatch(consumerCount);
+        for (int i = 0; i < consumerCount; i++) {
+            var consumer = registerCloseable(pulsarClient.newConsumer().topic(topicName)
+                    .subscriptionName("test")
+                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                    .subscriptionType(SubscriptionType.Shared)
+                    .subscribe());
+            consumer.receiveAsync().orTimeout(100, TimeUnit.MILLISECONDS).handle((__, ex) -> {
+                cdl.countDown();
+                return null;
+            });
+        }
+        cdl.await();
+
+        var attributes = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop")
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-abc")
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName)
+                .build();
+
+        var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.SUBSCRIPTION_COUNTER, attributes, 1);
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.PRODUCER_COUNTER, attributes, producerCount);
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.CONSUMER_COUNTER, attributes, consumerCount);
+
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.MESSAGE_IN_COUNTER, attributes, messageCount);
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.MESSAGE_OUT_COUNTER, attributes, messageCount);
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.BYTES_IN_COUNTER, attributes,
+                actual -> assertThat(actual).isPositive());
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.BYTES_OUT_COUNTER, attributes,
+                actual -> assertThat(actual).isPositive());
+
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_COUNTER, attributes,
+                actual -> assertThat(actual).isPositive());
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_LOGICAL_COUNTER,  attributes,
+                actual -> assertThat(actual).isPositive());
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_BACKLOG_COUNTER,  attributes,
+                actual -> assertThat(actual).isPositive());
+
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_OUT_COUNTER, attributes, messageCount);
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_IN_COUNTER, attributes, messageCount);
+    }
+
+    @Test(timeOut = 30_000)
+    public void testPublishRateLimitMetric() throws Exception {
+        var topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testPublishRateLimitMetric");
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        var publishRate = new PublishRate(1, -1);
+        admin.topicPolicies().setPublishRate(topicName, publishRate);
+        Awaitility.await().until(() -> Objects.equals(publishRate, admin.topicPolicies().getPublishRate(topicName)));
+
+        @Cleanup
+        var producer = pulsarClient.newProducer().topic(topicName).create();
+        producer.send("msg".getBytes());
+
+        var attributes = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop")
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-abc")
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName)
+                .build();
+
+        var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.PUBLISH_RATE_LIMIT_HIT_COUNTER, attributes, 1);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index 13209ccfce7..dceb18cbeaa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -28,7 +28,6 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -55,6 +54,7 @@ import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.TopicResources;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
 import org.apache.pulsar.common.util.PortManager;
@@ -67,7 +67,6 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.apache.pulsar.opentelemetry.OpenTelemetryService;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.MockZooKeeperSession;
@@ -746,13 +745,8 @@ public class PulsarTestContext implements AutoCloseable {
             Consumer<AutoConfiguredOpenTelemetrySdkBuilder> openTelemetrySdkBuilderCustomizer;
             if (builder.enableOpenTelemetry) {
                 var reader = InMemoryMetricReader.create();
-                openTelemetrySdkBuilderCustomizer = sdkBuilder -> {
-                    sdkBuilder.addMeterProviderCustomizer(
-                            (meterProviderBuilder, __) -> meterProviderBuilder.registerMetricReader(reader));
-                    sdkBuilder.addPropertiesSupplier(
-                            () -> Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false"));
-                };
                 openTelemetryMetricReader(reader);
+                openTelemetrySdkBuilderCustomizer = BrokerOpenTelemetryTestUtil.getOpenTelemetrySdkBuilderConsumer(reader);
             } else {
                 openTelemetrySdkBuilderCustomizer = null;
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index e45924e8bb4..ed1b74c46e0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -40,6 +40,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import io.opentelemetry.api.common.Attributes;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
@@ -79,6 +80,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.reflect.MethodUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
@@ -91,6 +93,8 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil;
+import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
@@ -142,6 +146,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactionServiceFactory;
 import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
@@ -182,7 +187,7 @@ public class TransactionTest extends TransactionTestBase {
 
     @Test
     public void testTopicTransactionMetrics() throws Exception {
-        final String topic = "persistent://tnx/ns1/test_transaction_topic";
+        final String topic = BrokerTestUtil.newUniqueName("persistent://tnx/ns1/test_transaction_topic");
 
         @Cleanup
         Producer<byte[]> producer = this.pulsarClient.newProducer()
@@ -216,6 +221,33 @@ public class TransactionTest extends TransactionTestBase {
         assertEquals(stats.committedTxnCount, 1);
         assertEquals(stats.abortedTxnCount, 1);
         assertEquals(stats.ongoingTxnCount, 1);
+
+        var attributes = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, "tnx")
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tnx/ns1")
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic)
+                .build();
+
+        var metrics = pulsarTestContexts.get(0).getOpenTelemetryMetricReader().collectAllMetrics();
+        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.TRANSACTION_COUNTER,
+                Attributes.builder()
+                        .putAll(attributes)
+                        .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "committed")
+                        .build(),
+                1);
+        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.TRANSACTION_COUNTER,
+                Attributes.builder()
+                        .putAll(attributes)
+                        .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "aborted")
+                        .build(),
+                1);
+        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.TRANSACTION_COUNTER,
+                Attributes.builder()
+                        .putAll(attributes)
+                        .put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "active")
+                        .build(),
+                1);
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 1ff835732aa..4ab886492a4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -169,6 +169,7 @@ public abstract class TransactionTestBase extends TestRetrySupport {
                     PulsarTestContext.builder()
                             .brokerInterceptor(new CounterBrokerInterceptor())
                             .spyByDefault()
+                            .enableOpenTelemetry(true)
                             .config(conf);
             if (i > 0) {
                 testContextBuilder.reuseMockBookkeeperAndMetadataStores(pulsarTestContexts.get(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 7a527a16889..2d2019b38ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -189,6 +189,9 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
         doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
         loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
 
+        // Disable collecting topic stats during this test, as it deadlocks on access to map BrokerService.topics.
+        pulsar2.getOpenTelemetryTopicStats().close();
+
         var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
         var lookupRequestSemaphoreField = BrokerService.class.getDeclaredField("lookupRequestSemaphore");
         lookupRequestSemaphoreField.setAccessible(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 71700ef83a4..debc3dd5e3f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -18,13 +18,17 @@
  */
 package org.apache.pulsar.compaction;
 
+import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricDoubleSumValue;
+import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
+import io.opentelemetry.api.common.Attributes;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -37,7 +41,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -45,9 +48,12 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -63,6 +69,7 @@ import org.apache.pulsar.client.impl.RawMessageImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -106,6 +113,12 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
         compactionScheduler.shutdownNow();
     }
 
+    @Override
+    protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) {
+        super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+        pulsarTestContextBuilder.enableOpenTelemetry(true);
+    }
+
     protected long compact(String topic) throws ExecutionException, InterruptedException {
         return compactor.compact(topic).get();
     }
@@ -186,7 +199,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testAllCompactedOut() throws Exception {
-        String topicName = "persistent://my-property/use/my-ns/testAllCompactedOut";
+        String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/use/my-ns/testAllCompactedOut");
         // set retain null key to true
         boolean oldRetainNullKey = pulsar.getConfig().isTopicCompactionRetainNullKey();
         pulsar.getConfig().setTopicCompactionRetainNullKey(true);
@@ -208,6 +221,34 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
                     LongRunningProcessStatus.Status.SUCCESS);
         });
 
+        var attributes = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, "my-property")
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "my-property/use/my-ns")
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName)
+                .build();
+        var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_REMOVED_COUNTER, attributes, 1);
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_OPERATION_COUNTER, Attributes.builder()
+                        .putAll(attributes)
+                        .put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "success")
+                        .build(),
+                1);
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_OPERATION_COUNTER, Attributes.builder()
+                        .putAll(attributes)
+                        .put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "failure")
+                        .build(),
+                0);
+        assertMetricDoubleSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_DURATION_SECONDS, attributes,
+                actual -> assertThat(actual).isPositive());
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_BYTES_IN_COUNTER, attributes,
+                actual -> assertThat(actual).isPositive());
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_BYTES_OUT_COUNTER, attributes,
+                actual -> assertThat(actual).isPositive());
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_ENTRIES_COUNTER, attributes, 1);
+        assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_BYTES_COUNTER, attributes,
+                actual -> assertThat(actual).isPositive());
+
         producer.newMessage().key("K1").value(null).sendAsync();
         producer.flush();
 
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index bdb002cb359..6088f52f72c 100644
--- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -29,4 +29,44 @@ public interface OpenTelemetryAttributes {
      * {@link OpenTelemetryService}.
      */
     AttributeKey<String> PULSAR_CLUSTER = AttributeKey.stringKey("pulsar.cluster");
+
+    /**
+     * The name of the Pulsar namespace.
+     */
+    AttributeKey<String> PULSAR_NAMESPACE = AttributeKey.stringKey("pulsar.namespace");
+
+    /**
+     * The name of the Pulsar tenant.
+     */
+    AttributeKey<String> PULSAR_TENANT = AttributeKey.stringKey("pulsar.tenant");
+
+    /**
+     * The Pulsar topic domain.
+     */
+    AttributeKey<String> PULSAR_DOMAIN = AttributeKey.stringKey("pulsar.domain");
+
+    /**
+     * The name of the Pulsar topic.
+     */
+    AttributeKey<String> PULSAR_TOPIC = AttributeKey.stringKey("pulsar.topic");
+
+    /**
+     * The partition index of a Pulsar topic.
+     */
+    AttributeKey<Long> PULSAR_PARTITION_INDEX = AttributeKey.longKey("pulsar.partition.index");
+
+    /**
+     * The status of the Pulsar transaction.
+     */
+    AttributeKey<String> PULSAR_TRANSACTION_STATUS = AttributeKey.stringKey("pulsar.transaction.status");
+
+    /**
+     * The status of the Pulsar compaction operation.
+     */
+    AttributeKey<String> PULSAR_COMPACTION_STATUS = AttributeKey.stringKey("pulsar.compaction.status");
+
+    /**
+     * The type of the backlog quota.
+     */
+    AttributeKey<String> PULSAR_BACKLOG_QUOTA_TYPE = AttributeKey.stringKey("pulsar.backlog.quota.type");
 }


Reply via email to