This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e535cb3f4a [feat][broker] PIP-264: Add transaction metrics (#22970)
4e535cb3f4a is described below

commit 4e535cb3f4a3482b0d5dc5a3a0a63c87490704e3
Author: Dragos Misca <dragosvic...@users.noreply.github.com>
AuthorDate: Thu Jun 27 02:54:43 2024 -0700

    [feat][broker] PIP-264: Add transaction metrics (#22970)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 15 ++++
 .../broker/service/PersistentTopicAttributes.java  | 30 ++++++++
 .../service/persistent/PersistentSubscription.java |  7 +-
 .../service/persistent/PersistentTopicMetrics.java | 14 +++-
 .../broker/stats/OpenTelemetryTopicStats.java      | 27 ++++++-
 .../OpenTelemetryTransactionCoordinatorStats.java  | 87 ++++++++++++++++++++++
 ...enTelemetryTransactionPendingAckStoreStats.java | 72 ++++++++++++++++++
 .../buffer/TransactionBufferClientStats.java       |  7 +-
 .../buffer/impl/TransactionBufferClientImpl.java   |  9 ++-
 .../impl/TransactionBufferClientStatsImpl.java     | 61 +++++++++++++--
 .../transaction/pendingack/PendingAckHandle.java   |  7 ++
 .../pendingack/PendingAckHandleAttributes.java     | 63 ++++++++++++++++
 .../pendingack/PendingAckHandleStats.java          |  7 ++
 .../pendingack/impl/PendingAckHandleDisabled.java  |  6 ++
 .../pendingack/impl/PendingAckHandleImpl.java      | 28 ++++---
 .../pendingack/impl/PendingAckHandleStatsImpl.java | 56 +++++++++++++-
 .../pulsar/broker/transaction/TransactionTest.java | 24 +++++-
 .../buffer/TopicTransactionBufferTest.java         | 22 +++++-
 .../pendingack/PendingAckPersistentTest.java       | 40 ++++++++++
 .../opentelemetry/OpenTelemetryAttributes.java     | 33 +++++++-
 pulsar-transaction/coordinator/pom.xml             |  6 ++
 .../coordinator/TransactionMetadataStore.java      |  9 +++
 .../TransactionMetadataStoreAttributes.java        | 56 ++++++--------
 .../impl/InMemTransactionMetadataStore.java        | 16 ++++
 .../impl/MLTransactionMetadataStore.java           | 16 ++++
 25 files changed, 640 insertions(+), 78 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 0d8bc571c57..848484fe376 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -116,6 +116,8 @@ import 
org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
+import org.apache.pulsar.broker.stats.OpenTelemetryTransactionCoordinatorStats;
+import 
org.apache.pulsar.broker.stats.OpenTelemetryTransactionPendingAckStoreStats;
 import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
@@ -263,6 +265,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private OpenTelemetryConsumerStats openTelemetryConsumerStats;
     private OpenTelemetryProducerStats openTelemetryProducerStats;
     private OpenTelemetryReplicatorStats openTelemetryReplicatorStats;
+    private OpenTelemetryTransactionCoordinatorStats 
openTelemetryTransactionCoordinatorStats;
+    private OpenTelemetryTransactionPendingAckStoreStats 
openTelemetryTransactionPendingAckStoreStats;
 
     private TransactionMetadataStoreService transactionMetadataStoreService;
     private TransactionBufferProvider transactionBufferProvider;
@@ -684,6 +688,14 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             brokerClientSharedTimer.stop();
             monotonicSnapshotClock.close();
 
+            if (openTelemetryTransactionPendingAckStoreStats != null) {
+                openTelemetryTransactionPendingAckStoreStats.close();
+                openTelemetryTransactionPendingAckStoreStats = null;
+            }
+            if (openTelemetryTransactionCoordinatorStats != null) {
+                openTelemetryTransactionCoordinatorStats.close();
+                openTelemetryTransactionCoordinatorStats = null;
+            }
             if (openTelemetryReplicatorStats != null) {
                 openTelemetryReplicatorStats.close();
                 openTelemetryReplicatorStats = null;
@@ -996,6 +1008,9 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         
.newProvider(config.getTransactionBufferProviderClassName());
                 transactionPendingAckStoreProvider = 
TransactionPendingAckStoreProvider
                         
.newProvider(config.getTransactionPendingAckStoreProviderClassName());
+
+                openTelemetryTransactionCoordinatorStats = new 
OpenTelemetryTransactionCoordinatorStats(this);
+                openTelemetryTransactionPendingAckStoreStats = new 
OpenTelemetryTransactionPendingAckStoreStats(this);
             }
 
             this.metricsGenerator = new MetricsGenerator(this);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
index 048edafe884..51f5bdb354d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
@@ -36,6 +36,11 @@ public class PersistentTopicAttributes extends 
TopicAttributes {
     private final Attributes transactionCommittedAttributes;
     private final Attributes transactionAbortedAttributes;
 
+    private final Attributes transactionBufferClientCommitSucceededAttributes;
+    private final Attributes transactionBufferClientCommitFailedAttributes;
+    private final Attributes transactionBufferClientAbortSucceededAttributes;
+    private final Attributes transactionBufferClientAbortFailedAttributes;
+
     public PersistentTopicAttributes(TopicName topicName) {
         super(topicName);
 
@@ -61,6 +66,31 @@ public class PersistentTopicAttributes extends 
TopicAttributes {
                 
.putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes)
                 .build();
 
+        transactionBufferClientCommitSucceededAttributes = Attributes.builder()
+                .putAll(commonAttributes)
+                .remove(OpenTelemetryAttributes.PULSAR_DOMAIN)
+                
.putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes)
+                
.putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.SUCCESS.attributes)
+                .build();
+        transactionBufferClientCommitFailedAttributes = Attributes.builder()
+                .putAll(commonAttributes)
+                .remove(OpenTelemetryAttributes.PULSAR_DOMAIN)
+                
.putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes)
+                
.putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.FAILURE.attributes)
+                .build();
+        transactionBufferClientAbortSucceededAttributes = Attributes.builder()
+                .putAll(commonAttributes)
+                .remove(OpenTelemetryAttributes.PULSAR_DOMAIN)
+                
.putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes)
+                
.putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.SUCCESS.attributes)
+                .build();
+        transactionBufferClientAbortFailedAttributes = Attributes.builder()
+                .putAll(commonAttributes)
+                .remove(OpenTelemetryAttributes.PULSAR_DOMAIN)
+                
.putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes)
+                
.putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.FAILURE.attributes)
+                .build();
+
         compactionSuccessAttributes = Attributes.builder()
                 .putAll(commonAttributes)
                 
.putAll(OpenTelemetryAttributes.CompactionStatus.SUCCESS.attributes)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 7da339a420c..a1d51668ca8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import lombok.Getter;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -128,6 +129,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
     private static final Map<String, Long> 
NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Map.of();
 
     private volatile ReplicatedSubscriptionSnapshotCache 
replicatedSubscriptionSnapshotCache;
+    @Getter
     private final PendingAckHandle pendingAckHandle;
     private volatile Map<String, String> subscriptionProperties;
     private volatile CompletableFuture<Void> fenceFuture;
@@ -1439,11 +1441,6 @@ public class PersistentSubscription extends 
AbstractSubscription {
         return cursor;
     }
 
-    @VisibleForTesting
-    public PendingAckHandle getPendingAckHandle() {
-        return pendingAckHandle;
-    }
-
     public void syncBatchPositionBitSetForPendingAck(Position position) {
         this.pendingAckHandle.syncBatchPositionAckSetForTransaction(position);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java
index f79d053a979..d8ebece7a51 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java
@@ -21,12 +21,13 @@ package org.apache.pulsar.broker.service.persistent;
 import java.util.concurrent.atomic.LongAdder;
 import lombok.Getter;
 
-@SuppressWarnings("LombokGetterMayBeUsed")
+@Getter
 public class PersistentTopicMetrics {
 
-    @Getter
     private final BacklogQuotaMetrics backlogQuotaMetrics = new 
BacklogQuotaMetrics();
 
+    private final TransactionBufferClientMetrics 
transactionBufferClientMetrics = new TransactionBufferClientMetrics();
+
     public static class BacklogQuotaMetrics {
         private final LongAdder timeBasedBacklogQuotaExceededEvictionCount = 
new LongAdder();
         private final LongAdder sizeBasedBacklogQuotaExceededEvictionCount = 
new LongAdder();
@@ -47,4 +48,13 @@ public class PersistentTopicMetrics {
             return timeBasedBacklogQuotaExceededEvictionCount.longValue();
         }
     }
+
+    @Getter
+    public static class TransactionBufferClientMetrics {
+        private final LongAdder commitSucceededCount = new LongAdder();
+        private final LongAdder commitFailedCount = new LongAdder();
+
+        private final LongAdder abortSucceededCount = new LongAdder();
+        private final LongAdder abortFailedCount = new LongAdder();
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java
index b6d3f089077..0274cb7a7d4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java
@@ -149,6 +149,12 @@ public class OpenTelemetryTopicStats implements 
AutoCloseable {
     public static final String TRANSACTION_COUNTER = 
"pulsar.broker.topic.transaction.count";
     private final ObservableLongMeasurement transactionCounter;
 
+    // Replaces ['pulsar_txn_tb_client_abort_failed_total', 
'pulsar_txn_tb_client_commit_failed_total',
+    //           'pulsar_txn_tb_client_abort_latency', 
'pulsar_txn_tb_client_commit_latency']
+    public static final String TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER =
+            "pulsar.broker.topic.transaction.buffer.client.operation.count";
+    private final ObservableLongMeasurement 
transactionBufferClientOperationCounter;
+
     // Replaces pulsar_subscription_delayed
     public static final String DELAYED_SUBSCRIPTION_COUNTER = 
"pulsar.broker.topic.subscription.delayed.entry.count";
     private final ObservableLongMeasurement delayedSubscriptionCounter;
@@ -333,6 +339,12 @@ public class OpenTelemetryTopicStats implements 
AutoCloseable {
                 .setDescription("The number of transactions on this topic.")
                 .buildObserver();
 
+        transactionBufferClientOperationCounter = meter
+                .counterBuilder(TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER)
+                .setUnit("{operation}")
+                .setDescription("The number of operations on the transaction 
buffer client.")
+                .buildObserver();
+
         delayedSubscriptionCounter = meter
                 .upDownCounterBuilder(DELAYED_SUBSCRIPTION_COUNTER)
                 .setUnit("{entry}")
@@ -371,6 +383,7 @@ public class OpenTelemetryTopicStats implements 
AutoCloseable {
                 compactionEntriesCounter,
                 compactionBytesCounter,
                 transactionCounter,
+                transactionBufferClientOperationCounter,
                 delayedSubscriptionCounter);
     }
 
@@ -399,6 +412,8 @@ public class OpenTelemetryTopicStats implements 
AutoCloseable {
         }
 
         if (topic instanceof PersistentTopic persistentTopic) {
+            var persistentTopicMetrics = 
persistentTopic.getPersistentTopicMetrics();
+
             var persistentTopicAttributes = 
persistentTopic.getTopicAttributes();
             var managedLedger = persistentTopic.getManagedLedger();
             var managedLedgerStats = 
persistentTopic.getManagedLedger().getStats();
@@ -416,7 +431,7 @@ public class OpenTelemetryTopicStats implements 
AutoCloseable {
                     
topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime(),
                     attributes);
             
backlogQuotaAge.record(topic.getBestEffortOldestUnacknowledgedMessageAgeSeconds(),
 attributes);
-            var backlogQuotaMetrics = 
persistentTopic.getPersistentTopicMetrics().getBacklogQuotaMetrics();
+            var backlogQuotaMetrics = 
persistentTopicMetrics.getBacklogQuotaMetrics();
             
backlogEvictionCounter.record(backlogQuotaMetrics.getSizeBasedBacklogQuotaExceededEvictionCount(),
                     persistentTopicAttributes.getSizeBasedQuotaAttributes());
             
backlogEvictionCounter.record(backlogQuotaMetrics.getTimeBasedBacklogQuotaExceededEvictionCount(),
@@ -430,6 +445,16 @@ public class OpenTelemetryTopicStats implements 
AutoCloseable {
             transactionCounter.record(txnBuffer.getAbortedTxnCount(),
                     
persistentTopicAttributes.getTransactionAbortedAttributes());
 
+            var txnBufferClientMetrics = 
persistentTopicMetrics.getTransactionBufferClientMetrics();
+            
transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getCommitSucceededCount().sum(),
+                    
persistentTopicAttributes.getTransactionBufferClientCommitSucceededAttributes());
+            
transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getCommitFailedCount().sum(),
+                    
persistentTopicAttributes.getTransactionBufferClientCommitFailedAttributes());
+            
transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getAbortSucceededCount().sum(),
+                    
persistentTopicAttributes.getTransactionBufferClientAbortSucceededAttributes());
+            
transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getAbortFailedCount().sum(),
+                    
persistentTopicAttributes.getTransactionBufferClientAbortFailedAttributes());
+
             Optional.ofNullable(pulsar.getNullableCompactor())
                     .map(Compactor::getStats)
                     .flatMap(compactorMXBean -> 
compactorMXBean.getCompactionRecordForTopic(topic.getName()))
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionCoordinatorStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionCoordinatorStats.java
new file mode 100644
index 00000000000..ab73b2390b3
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionCoordinatorStats.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+
+public class OpenTelemetryTransactionCoordinatorStats implements AutoCloseable 
{
+
+    // Replaces ['pulsar_txn_aborted_total',
+    //           'pulsar_txn_committed_total',
+    //           'pulsar_txn_created_total',
+    //           'pulsar_txn_timeout_total',
+    //           'pulsar_txn_active_count']
+    public static final String TRANSACTION_COUNTER = 
"pulsar.broker.transaction.coordinator.transaction.count";
+    private final ObservableLongMeasurement transactionCounter;
+
+    // Replaces pulsar_txn_append_log_total
+    public static final String APPEND_LOG_COUNTER = 
"pulsar.broker.transaction.coordinator.append.log.count";
+    private final ObservableLongMeasurement appendLogCounter;
+
+    private final BatchCallback batchCallback;
+
+    public OpenTelemetryTransactionCoordinatorStats(PulsarService pulsar) {
+        var meter = pulsar.getOpenTelemetry().getMeter();
+
+        transactionCounter = meter
+                .upDownCounterBuilder(TRANSACTION_COUNTER)
+                .setUnit("{transaction}")
+                .setDescription("The number of transactions handled by the 
coordinator.")
+                .buildObserver();
+
+        appendLogCounter = meter
+                .counterBuilder(APPEND_LOG_COUNTER)
+                .setUnit("{entry}")
+                .setDescription("The number of transaction metadata entries 
appended by the coordinator.")
+                .buildObserver();
+
+        batchCallback = meter.batchCallback(() -> {
+                    var transactionMetadataStoreService = 
pulsar.getTransactionMetadataStoreService();
+                    // Avoid NPE during Pulsar shutdown.
+                    if (transactionMetadataStoreService != null) {
+                        transactionMetadataStoreService.getStores()
+                                .values()
+                                
.forEach(this::recordMetricsForTransactionMetadataStore);
+                    }
+                },
+                transactionCounter,
+                appendLogCounter);
+    }
+
+    @Override
+    public void close() {
+        batchCallback.close();
+    }
+
+    private void 
recordMetricsForTransactionMetadataStore(TransactionMetadataStore 
transactionMetadataStore) {
+        var attributes = transactionMetadataStore.getAttributes();
+        var stats = transactionMetadataStore.getMetadataStoreStats();
+
+        transactionCounter.record(stats.getAbortedCount(), 
attributes.getTxnAbortedAttributes());
+        transactionCounter.record(stats.getActives(), 
attributes.getTxnActiveAttributes());
+        transactionCounter.record(stats.getCommittedCount(), 
attributes.getTxnCommittedAttributes());
+        transactionCounter.record(stats.getCreatedCount(), 
attributes.getTxnCreatedAttributes());
+        transactionCounter.record(stats.getTimeoutCount(), 
attributes.getTxnTimeoutAttributes());
+
+        appendLogCounter.record(stats.getAppendLogCount(), 
attributes.getCommonAttributes());
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionPendingAckStoreStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionPendingAckStoreStats.java
new file mode 100644
index 00000000000..562ad56e44d
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionPendingAckStoreStats.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import io.opentelemetry.api.metrics.ObservableLongCounter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+
+public class OpenTelemetryTransactionPendingAckStoreStats implements 
AutoCloseable {
+
+    // Replaces ['pulsar_txn_tp_committed_count_total', 
'pulsar_txn_tp_aborted_count_total']
+    public static final String ACK_COUNTER = 
"pulsar.broker.transaction.pending.ack.store.transaction.count";
+    private final ObservableLongCounter ackCounter;
+
+    public OpenTelemetryTransactionPendingAckStoreStats(PulsarService pulsar) {
+        var meter = pulsar.getOpenTelemetry().getMeter();
+
+        ackCounter = meter
+                .counterBuilder(ACK_COUNTER)
+                .setUnit("{transaction}")
+                .setDescription("The number of transactions handled by the 
persistent ack store.")
+                .buildWithCallback(measurement -> pulsar.getBrokerService()
+                        .getTopics()
+                        .values()
+                        .stream()
+                        .filter(topicFuture -> topicFuture.isDone() && 
!topicFuture.isCompletedExceptionally())
+                        .map(CompletableFuture::join)
+                        .filter(Optional::isPresent)
+                        .map(Optional::get)
+                        .filter(Topic::isPersistent)
+                        .map(Topic::getSubscriptions)
+                        .forEach(subs -> subs.forEach((__, sub) -> 
recordMetricsForSubscription(measurement, sub))));
+    }
+
+    @Override
+    public void close() {
+        ackCounter.close();
+    }
+
+    private void recordMetricsForSubscription(ObservableLongMeasurement 
measurement, Subscription subscription) {
+        assert subscription instanceof PersistentSubscription; // The topics 
have already been filtered for persistence.
+        var stats = ((PersistentSubscription) 
subscription).getPendingAckHandle().getPendingAckHandleStats();
+        if (stats != null) {
+            var attributes = stats.getAttributes();
+            measurement.record(stats.getCommitSuccessCount(), 
attributes.getCommitSuccessAttributes());
+            measurement.record(stats.getCommitFailedCount(), 
attributes.getCommitFailureAttributes());
+            measurement.record(stats.getAbortSuccessCount(), 
attributes.getAbortSuccessAttributes());
+            measurement.record(stats.getAbortFailedCount(), 
attributes.getAbortFailureAttributes());
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java
index 8fda233ff1d..c21b212f981 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import org.apache.pulsar.broker.PulsarService;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientStatsImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
 
@@ -34,10 +35,10 @@ public interface TransactionBufferClientStats {
     void close();
 
 
-    static TransactionBufferClientStats create(boolean exposeTopicMetrics, 
TransactionBufferHandler handler,
-                                               boolean enableTxnCoordinator) {
+    static TransactionBufferClientStats create(PulsarService pulsarService, 
boolean exposeTopicMetrics,
+                                               TransactionBufferHandler 
handler, boolean enableTxnCoordinator) {
         return enableTxnCoordinator
-                ? 
TransactionBufferClientStatsImpl.getInstance(exposeTopicMetrics, handler) : 
NOOP;
+                ? TransactionBufferClientStatsImpl.getInstance(pulsarService, 
exposeTopicMetrics, handler) : NOOP;
     }
 
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
index 382d640ca86..96ad0203900 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
@@ -39,10 +39,11 @@ public class TransactionBufferClientImpl implements 
TransactionBufferClient {
     private final TransactionBufferHandler tbHandler;
     private final TransactionBufferClientStats stats;
 
-    private TransactionBufferClientImpl(TransactionBufferHandler tbHandler, 
boolean exposeTopicLevelMetrics,
-                                        boolean enableTxnCoordinator) {
+    private TransactionBufferClientImpl(PulsarService pulsarService, 
TransactionBufferHandler tbHandler,
+                                        boolean exposeTopicLevelMetrics, 
boolean enableTxnCoordinator) {
         this.tbHandler = tbHandler;
-        this.stats = 
TransactionBufferClientStats.create(exposeTopicLevelMetrics, tbHandler, 
enableTxnCoordinator);
+        this.stats = TransactionBufferClientStats.create(pulsarService, 
exposeTopicLevelMetrics, tbHandler,
+                enableTxnCoordinator);
     }
 
     public static TransactionBufferClient create(PulsarService pulsarService, 
HashedWheelTimer timer,
@@ -53,7 +54,7 @@ public class TransactionBufferClientImpl implements 
TransactionBufferClient {
         ServiceConfiguration config = pulsarService.getConfig();
         boolean exposeTopicLevelMetrics = 
config.isExposeTopicLevelMetricsInPrometheus();
         boolean enableTxnCoordinator = 
config.isTransactionCoordinatorEnabled();
-        return new TransactionBufferClientImpl(handler, 
exposeTopicLevelMetrics, enableTxnCoordinator);
+        return new TransactionBufferClientImpl(pulsarService, handler, 
exposeTopicLevelMetrics, enableTxnCoordinator);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java
index a447f707893..4f1c2ca30cf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java
@@ -18,31 +18,55 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Summary;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.NonNull;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopicMetrics;
+import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
 import 
org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientStats;
 import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
 
 public final class TransactionBufferClientStatsImpl implements 
TransactionBufferClientStats {
     private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999, 
0.9999, 1};
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER)
     private final Counter abortFailed;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER)
     private final Counter commitFailed;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER)
     private final Summary abortLatency;
+    @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER)
     private final Summary commitLatency;
+
+    public static final String PENDING_TRANSACTION_COUNTER = 
"pulsar.broker.transaction.buffer.client.pending.count";
+    private final ObservableLongUpDownCounter pendingTransactionCounter;
+
+    @PulsarDeprecatedMetric(newMetricName = PENDING_TRANSACTION_COUNTER)
     private final Gauge pendingRequests;
 
     private final boolean exposeTopicLevelMetrics;
 
+    private final BrokerService brokerService;
+
     private static TransactionBufferClientStats instance;
 
-    private TransactionBufferClientStatsImpl(boolean exposeTopicLevelMetrics,
-                                             TransactionBufferHandler handler) 
{
+    private TransactionBufferClientStatsImpl(@NonNull PulsarService 
pulsarService,
+                                             boolean exposeTopicLevelMetrics,
+                                             @NonNull TransactionBufferHandler 
handler) {
+        this.brokerService = 
Objects.requireNonNull(pulsarService.getBrokerService());
         this.exposeTopicLevelMetrics = exposeTopicLevelMetrics;
         String[] labelNames = exposeTopicLevelMetrics
                 ? new String[]{"namespace", "topic"} : new 
String[]{"namespace"};
@@ -63,9 +87,14 @@ public final class TransactionBufferClientStatsImpl 
implements TransactionBuffer
                 .setChild(new Gauge.Child() {
                     @Override
                     public double get() {
-                        return null == handler ? 0 : 
handler.getPendingRequestsCount();
+                        return handler.getPendingRequestsCount();
                     }
                 });
+        this.pendingTransactionCounter = 
pulsarService.getOpenTelemetry().getMeter()
+                .upDownCounterBuilder(PENDING_TRANSACTION_COUNTER)
+                .setDescription("The number of pending transactions in the 
transaction buffer client.")
+                .setUnit("{transaction}")
+                .buildWithCallback(measurement -> 
measurement.record(handler.getPendingRequestsCount()));
     }
 
     private Summary buildSummary(String name, String help, String[] 
labelNames) {
@@ -77,33 +106,52 @@ public final class TransactionBufferClientStatsImpl 
implements TransactionBuffer
         return builder.register();
     }
 
-    public static synchronized TransactionBufferClientStats 
getInstance(boolean exposeTopicLevelMetrics,
+    public static synchronized TransactionBufferClientStats 
getInstance(PulsarService pulsarService,
+                                                                        
boolean exposeTopicLevelMetrics,
                                                                         
TransactionBufferHandler handler) {
         if (null == instance) {
-            instance = new 
TransactionBufferClientStatsImpl(exposeTopicLevelMetrics, handler);
+            instance = new TransactionBufferClientStatsImpl(pulsarService, 
exposeTopicLevelMetrics, handler);
         }
-
         return instance;
     }
 
     @Override
     public void recordAbortFailed(String topic) {
         this.abortFailed.labels(labelValues(topic)).inc();
+        getTransactionBufferClientMetrics(topic)
+                
.map(PersistentTopicMetrics.TransactionBufferClientMetrics::getAbortFailedCount)
+                .ifPresent(LongAdder::increment);
     }
 
     @Override
     public void recordCommitFailed(String topic) {
         this.commitFailed.labels(labelValues(topic)).inc();
+        getTransactionBufferClientMetrics(topic)
+                
.map(PersistentTopicMetrics.TransactionBufferClientMetrics::getCommitFailedCount)
+                .ifPresent(LongAdder::increment);
     }
 
     @Override
     public void recordAbortLatency(String topic, long nanos) {
         this.abortLatency.labels(labelValues(topic)).observe(nanos);
+        getTransactionBufferClientMetrics(topic)
+                
.map(PersistentTopicMetrics.TransactionBufferClientMetrics::getAbortSucceededCount)
+                .ifPresent(LongAdder::increment);
     }
 
     @Override
     public void recordCommitLatency(String topic, long nanos) {
         this.commitLatency.labels(labelValues(topic)).observe(nanos);
+        getTransactionBufferClientMetrics(topic)
+                
.map(PersistentTopicMetrics.TransactionBufferClientMetrics::getCommitSucceededCount)
+                .ifPresent(LongAdder::increment);
+    }
+
+    private Optional<PersistentTopicMetrics.TransactionBufferClientMetrics> 
getTransactionBufferClientMetrics(
+            String topic) {
+        return brokerService.getTopicReference(topic)
+                .filter(t -> t instanceof PersistentTopic)
+                .map(t -> ((PersistentTopic) 
t).getPersistentTopicMetrics().getTransactionBufferClientMetrics());
     }
 
     private String[] labelValues(String topic) {
@@ -125,6 +173,7 @@ public final class TransactionBufferClientStatsImpl 
implements TransactionBuffer
             CollectorRegistry.defaultRegistry.unregister(this.abortLatency);
             CollectorRegistry.defaultRegistry.unregister(this.commitLatency);
             CollectorRegistry.defaultRegistry.unregister(this.pendingRequests);
+            pendingTransactionCounter.close();
         }
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
index 168a6b1483f..dcebbb2829e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
@@ -145,6 +145,13 @@ public interface PendingAckHandle {
      */
     TransactionPendingAckStats getStats(boolean lowWaterMarks);
 
+    /**
+     * Get the raw pending ack handle stats.
+     *
+     * @return the raw stats of this pending ack handle.
+     */
+    PendingAckHandleStats getPendingAckHandleStats();
+
     /**
      * Close the pending ack handle.
      *
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleAttributes.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleAttributes.java
new file mode 100644
index 00000000000..87363b673e1
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleAttributes.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack;
+
+import io.opentelemetry.api.common.Attributes;
+import lombok.Getter;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.TransactionPendingAckOperationStatus;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.TransactionStatus;
+
+@Getter
+public class PendingAckHandleAttributes {
+
+    private final Attributes commitSuccessAttributes;
+    private final Attributes commitFailureAttributes;
+    private final Attributes abortSuccessAttributes;
+    private final Attributes abortFailureAttributes;
+
+    public PendingAckHandleAttributes(String topic, String subscription) {
+        var topicName = TopicName.get(topic);
+        commitSuccessAttributes = getAttributes(topicName, subscription, 
TransactionStatus.COMMITTED,
+                TransactionPendingAckOperationStatus.SUCCESS);
+        commitFailureAttributes = getAttributes(topicName, subscription, 
TransactionStatus.COMMITTED,
+                TransactionPendingAckOperationStatus.FAILURE);
+        abortSuccessAttributes = getAttributes(topicName, subscription, 
TransactionStatus.ABORTED,
+                TransactionPendingAckOperationStatus.SUCCESS);
+        abortFailureAttributes = getAttributes(topicName, subscription, 
TransactionStatus.ABORTED,
+                TransactionPendingAckOperationStatus.FAILURE);
+    }
+
+    private static Attributes getAttributes(TopicName topicName, String 
subscriptionName,
+                                            TransactionStatus txStatus,
+                                            
TransactionPendingAckOperationStatus txAckStoreStatus) {
+        var builder = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, 
subscriptionName)
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, 
topicName.getTenant())
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, 
topicName.getNamespace())
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, 
topicName.getPartitionedTopicName())
+                .putAll(txStatus.attributes)
+                .putAll(txAckStoreStatus.attributes);
+        if (topicName.isPartitioned()) {
+            builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, 
topicName.getPartitionIndex());
+        }
+        return builder.build();
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java
index 7f01b9b69f9..855651c9116 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java
@@ -28,6 +28,13 @@ public interface PendingAckHandleStats {
 
     void close();
 
+    long getCommitSuccessCount();
+    long getCommitFailedCount();
+    long getAbortSuccessCount();
+    long getAbortFailedCount();
+
+    PendingAckHandleAttributes getAttributes();
+
     static PendingAckHandleStats create(String topic, String subName, boolean 
exposeTopicLevelMetrics) {
         return new PendingAckHandleStatsImpl(topic, subName, 
exposeTopicLevelMetrics);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
index 4d5852ea33d..fb633f7af65 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.tuple.MutablePair;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
@@ -91,6 +92,11 @@ public class PendingAckHandleDisabled implements 
PendingAckHandle {
         return null;
     }
 
+    @Override
+    public PendingAckHandleStats getPendingAckHandleStats() {
+        return null;
+    }
+
     @Override
     public CompletableFuture<Void> closeAsync() {
         return CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 98d0d3bf1b9..6a071c891ff 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -50,7 +50,6 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -155,20 +154,14 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
         this.topicName = persistentSubscription.getTopicName();
         this.subName = persistentSubscription.getName();
         this.persistentSubscription = persistentSubscription;
-        internalPinnedExecutor = persistentSubscription
-                .getTopic()
-                .getBrokerService()
-                .getPulsar()
-                .getTransactionExecutorProvider()
-                .getExecutor(this);
-
-        ServiceConfiguration config = 
persistentSubscription.getTopic().getBrokerService().pulsar().getConfig();
-        boolean exposeTopicLevelMetrics = 
config.isExposeTopicLevelMetricsInPrometheus();
-        this.handleStats = PendingAckHandleStats.create(topicName, subName, 
exposeTopicLevelMetrics);
-
-        this.pendingAckStoreProvider = this.persistentSubscription.getTopic()
-                        
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
-        transactionOpTimer = 
persistentSubscription.getTopic().getBrokerService().getPulsar().getTransactionTimer();
+        var pulsar = 
persistentSubscription.getTopic().getBrokerService().getPulsar();
+        internalPinnedExecutor = 
pulsar.getTransactionExecutorProvider().getExecutor(this);
+
+        this.handleStats = PendingAckHandleStats.create(
+                topicName, subName, 
pulsar.getConfig().isExposeTopicLevelMetricsInPrometheus());
+
+        this.pendingAckStoreProvider = 
pulsar.getTransactionPendingAckStoreProvider();
+        transactionOpTimer = pulsar.getTransactionTimer();
         init();
     }
 
@@ -1021,6 +1014,11 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
         return transactionInPendingAckStats;
     }
 
+    @Override
+    public PendingAckHandleStats getPendingAckHandleStats() {
+        return handleStats;
+    }
+
     @Override
     public CompletableFuture<Void> closeAsync() {
         changeToCloseState();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java
index f30c233af59..a89b582b838 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java
@@ -22,7 +22,10 @@ import io.prometheus.client.Counter;
 import io.prometheus.client.Summary;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.commons.lang3.StringUtils;
+import 
org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleAttributes;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
 import org.apache.pulsar.common.naming.TopicName;
 
@@ -37,6 +40,19 @@ public class PendingAckHandleStatsImpl implements 
PendingAckHandleStats {
     private final String[] labelFailed;
     private final String[] commitLatencyLabel;
 
+    private final String topic;
+    private final String subscription;
+
+    private final LongAdder commitTxnSucceedCounter = new LongAdder();
+    private final LongAdder commitTxnFailedCounter = new LongAdder();
+    private final LongAdder abortTxnSucceedCounter = new LongAdder();
+    private final LongAdder abortTxnFailedCounter = new LongAdder();
+
+    private volatile PendingAckHandleAttributes attributes = null;
+    private static final 
AtomicReferenceFieldUpdater<PendingAckHandleStatsImpl, 
PendingAckHandleAttributes>
+            ATTRIBUTES_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+                    PendingAckHandleStatsImpl.class, 
PendingAckHandleAttributes.class, "attributes");
+
     public PendingAckHandleStatsImpl(String topic, String subscription, 
boolean exposeTopicLevelMetrics) {
         initialize(exposeTopicLevelMetrics);
 
@@ -51,6 +67,9 @@ public class PendingAckHandleStatsImpl implements 
PendingAckHandleStats {
             }
         }
 
+        this.topic = topic;
+        this.subscription = subscription;
+
         labelSucceed = exposeTopicLevelMetrics0
                 ? new String[]{namespace, topic, subscription, "succeed"} : 
new String[]{namespace, "succeed"};
         labelFailed = exposeTopicLevelMetrics0
@@ -62,18 +81,24 @@ public class PendingAckHandleStatsImpl implements 
PendingAckHandleStats {
     @Override
     public void recordCommitTxn(boolean success, long nanos) {
         String[] labels;
+        LongAdder counter;
         if (success) {
             labels = labelSucceed;
+            counter = commitTxnSucceedCounter;
             
commitTxnLatency.labels(commitLatencyLabel).observe(TimeUnit.NANOSECONDS.toMicros(nanos));
         } else {
             labels = labelFailed;
+            counter = commitTxnFailedCounter;
         }
         commitTxnCounter.labels(labels).inc();
+        counter.increment();
     }
 
     @Override
     public void recordAbortTxn(boolean success) {
         abortTxnCounter.labels(success ? labelSucceed : labelFailed).inc();
+        var counter = success ? abortTxnSucceedCounter : abortTxnFailedCounter;
+        counter.increment();
     }
 
     @Override
@@ -81,11 +106,40 @@ public class PendingAckHandleStatsImpl implements 
PendingAckHandleStats {
         if (exposeTopicLevelMetrics0) {
             commitTxnCounter.remove(this.labelSucceed);
             commitTxnCounter.remove(this.labelFailed);
+            abortTxnCounter.remove(this.labelSucceed);
             abortTxnCounter.remove(this.labelFailed);
-            abortTxnCounter.remove(this.labelFailed);
         }
     }
 
+    @Override
+    public long getCommitSuccessCount() {
+        return commitTxnSucceedCounter.sum();
+    }
+
+    @Override
+    public long getCommitFailedCount() {
+        return commitTxnFailedCounter.sum();
+    }
+
+    @Override
+    public long getAbortSuccessCount() {
+        return abortTxnSucceedCounter.sum();
+    }
+
+    @Override
+    public long getAbortFailedCount() {
+        return abortTxnFailedCounter.sum();
+    }
+
+    @Override
+    public PendingAckHandleAttributes getAttributes() {
+        if (attributes != null) {
+            return attributes;
+        }
+        return ATTRIBUTES_UPDATER.updateAndGet(PendingAckHandleStatsImpl.this,
+                old -> old != null ? old : new 
PendingAckHandleAttributes(topic, subscription));
+    }
+
     static void initialize(boolean exposeTopicLevelMetrics) {
         if (INITIALIZED.compareAndSet(false, true)) {
             exposeTopicLevelMetrics0 = exposeTopicLevelMetrics;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 14d4375b7bf..2a928084e64 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.transaction;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_CURSOR_NAME;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
 import static 
org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
@@ -94,7 +95,6 @@ import org.apache.pulsar.broker.service.Topic;
 import 
org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil;
 import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
@@ -231,19 +231,35 @@ public class TransactionTest extends TransactionTestBase {
                 .build();
 
         var metrics = 
pulsarTestContexts.get(0).getOpenTelemetryMetricReader().collectAllMetrics();
-        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.TRANSACTION_COUNTER,
+        assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER,
+                Attributes.builder()
+                        .putAll(attributes)
+                        .remove(OpenTelemetryAttributes.PULSAR_DOMAIN)
+                        
.putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes)
+                        
.putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.SUCCESS.attributes)
+                        .build(),
+                1);
+        assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER,
+                Attributes.builder()
+                        .putAll(attributes)
+                        .remove(OpenTelemetryAttributes.PULSAR_DOMAIN)
+                        
.putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes)
+                        
.putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.SUCCESS.attributes)
+                        .build(),
+                1);
+        assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.TRANSACTION_COUNTER,
                 Attributes.builder()
                         .putAll(attributes)
                         
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "committed")
                         .build(),
                 1);
-        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.TRANSACTION_COUNTER,
+        assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.TRANSACTION_COUNTER,
                 Attributes.builder()
                         .putAll(attributes)
                         
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "aborted")
                         .build(),
                 1);
-        BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.TRANSACTION_COUNTER,
+        assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.TRANSACTION_COUNTER,
                 Attributes.builder()
                         .putAll(attributes)
                         
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "active")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index af12caf1efd..dea79f391e3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.when;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
 import static org.testng.AssertJUnit.fail;
+import io.opentelemetry.api.common.Attributes;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
@@ -44,6 +46,7 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
@@ -57,6 +60,7 @@ import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
@@ -70,7 +74,6 @@ import org.testng.annotations.Test;
 
 public class TopicTransactionBufferTest extends TransactionTestBase {
 
-
     @BeforeMethod(alwaysRun = true)
     protected void setup() throws Exception {
         setBrokerCount(1);
@@ -101,10 +104,19 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
     @Test
     public void testTransactionBufferAppendMarkerWriteFailState() throws 
Exception {
         final String topic = "persistent://" + NAMESPACE1 + 
"/testPendingAckManageLedgerWriteFailState";
+        var attributes = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, "tnx")
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tnx/ns1")
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic)
+                
.putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes)
+                
.putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.FAILURE.attributes)
+                .build();
+
         Transaction txn = pulsarClient.newTransaction()
                 .withTransactionTimeout(5, TimeUnit.SECONDS)
                 .build().get();
 
+        @Cleanup
         Producer<byte[]> producer = pulsarClient
                 .newProducer()
                 .topic(topic)
@@ -112,11 +124,19 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
                 .enableBatching(false)
                 .create();
 
+        assertMetricLongSumValue(
+                
pulsarTestContexts.get(0).getOpenTelemetryMetricReader().collectAllMetrics(),
+                
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER, 
attributes, 0);
+
         producer.newMessage(txn).value("test".getBytes()).send();
         PersistentTopic persistentTopic = (PersistentTopic) 
getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), 
false).get().get();
         FieldUtils.writeField(persistentTopic.getManagedLedger(), "state", 
ManagedLedgerImpl.State.WriteFailed, true);
         txn.commit().get();
+
+        assertMetricLongSumValue(
+                
pulsarTestContexts.get(0).getOpenTelemetryMetricReader().collectAllMetrics(),
+                
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER, 
attributes, 1);
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 00cdb4162f0..9487e3d3746 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.transaction.pendingack;
 
 
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import static org.mockito.ArgumentMatchers.any;
@@ -31,6 +32,7 @@ import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.assertTrue;
 import static org.testng.AssertJUnit.fail;
 import com.google.common.collect.Multimap;
+import io.opentelemetry.api.common.Attributes;
 import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -56,6 +58,7 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import 
org.apache.pulsar.broker.stats.OpenTelemetryTransactionPendingAckStoreStats;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import 
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
@@ -78,6 +81,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -366,6 +370,42 @@ public class PendingAckPersistentTest extends 
TransactionTestBase {
                 assertTrue(metric.value > 0);
             }
         }
+
+        var otelMetrics = 
pulsarTestContexts.get(0).getOpenTelemetryMetricReader().collectAllMetrics();
+        var commonAttributes = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, "tnx")
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tnx/ns1")
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, 
TopicName.get(PENDING_ACK_REPLAY_TOPIC).toString())
+                .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, subName)
+                .build();
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryTransactionPendingAckStoreStats.ACK_COUNTER,
+                Attributes.builder()
+                        .putAll(commonAttributes)
+                        
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "committed")
+                        
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS, 
"success")
+                        .build(),
+                50);
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryTransactionPendingAckStoreStats.ACK_COUNTER,
+                Attributes.builder()
+                        .putAll(commonAttributes)
+                        
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "committed")
+                        
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS, 
"failure")
+                        .build(),
+                0);
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryTransactionPendingAckStoreStats.ACK_COUNTER,
+                Attributes.builder()
+                        .putAll(commonAttributes)
+                        
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "aborted")
+                        
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS, 
"success")
+                        .build(),
+                50);
+        assertMetricLongSumValue(otelMetrics, 
OpenTelemetryTransactionPendingAckStoreStats.ACK_COUNTER,
+                Attributes.builder()
+                        .putAll(commonAttributes)
+                        
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "aborted")
+                        
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS, 
"failure")
+                        .build(),
+                0);
     }
 
     @Test
diff --git 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index b530b50ee59..f485e300926 100644
--- 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++ 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -117,12 +117,43 @@ public interface OpenTelemetryAttributes {
      */
     AttributeKey<String> PULSAR_TRANSACTION_STATUS = 
AttributeKey.stringKey("pulsar.transaction.status");
     enum TransactionStatus {
+        ABORTED,
         ACTIVE,
         COMMITTED,
-        ABORTED;
+        CREATED,
+        TIMEOUT;
         public final Attributes attributes = 
Attributes.of(PULSAR_TRANSACTION_STATUS, name().toLowerCase());
     }
 
+    /**
+     * The status of the Pulsar transaction ack store operation.
+     */
+    AttributeKey<String> PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS =
+            
AttributeKey.stringKey("pulsar.transaction.pending.ack.store.operation.status");
+    enum TransactionPendingAckOperationStatus {
+        SUCCESS,
+        FAILURE;
+        public final Attributes attributes =
+                Attributes.of(PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS, 
name().toLowerCase());
+    }
+
+    /**
+     * The ID of the Pulsar transaction coordinator.
+     */
+    AttributeKey<Long> PULSAR_TRANSACTION_COORDINATOR_ID = 
AttributeKey.longKey("pulsar.transaction.coordinator.id");
+
+    /**
+     * The status of the Pulsar transaction buffer client operation.
+     */
+    AttributeKey<String> PULSAR_TRANSACTION_BUFFER_CLIENT_OPERATION_STATUS =
+            
AttributeKey.stringKey("pulsar.transaction.buffer.client.operation.status");
+    enum TransactionBufferClientOperationStatus {
+        SUCCESS,
+        FAILURE;
+        public final Attributes attributes =
+                
Attributes.of(PULSAR_TRANSACTION_BUFFER_CLIENT_OPERATION_STATUS, 
name().toLowerCase());
+    }
+
     /**
      * The status of the Pulsar compaction operation.
      */
diff --git a/pulsar-transaction/coordinator/pom.xml 
b/pulsar-transaction/coordinator/pom.xml
index 4728cd40634..fc326d9e9ba 100644
--- a/pulsar-transaction/coordinator/pom.xml
+++ b/pulsar-transaction/coordinator/pom.xml
@@ -41,6 +41,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-opentelemetry</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>managed-ledger</artifactId>
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
index ff5adb4d409..850fcfb4d19 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
@@ -133,6 +133,15 @@ public interface TransactionMetadataStore {
      */
     TransactionMetadataStoreStats getMetadataStoreStats();
 
+    /**
+     * Get the transaction metadata store OpenTelemetry attributes.
+     *
+     * @return TransactionMetadataStoreAttributes {@link 
TransactionMetadataStoreAttributes}
+     */
+    default TransactionMetadataStoreAttributes getAttributes() {
+        return new TransactionMetadataStoreAttributes(this);
+    }
+
     /**
      * Get the transactions witch timeout is bigger than given timeout.
      *
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreAttributes.java
similarity index 50%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
copy to 
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreAttributes.java
index 048edafe884..e8ae0f6d039 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreAttributes.java
@@ -16,58 +16,44 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.service;
+package org.apache.pulsar.transaction.coordinator;
 
 import io.opentelemetry.api.common.Attributes;
 import lombok.Getter;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 
 @Getter
-public class PersistentTopicAttributes extends TopicAttributes {
-
-    private final Attributes timeBasedQuotaAttributes;
-    private final Attributes sizeBasedQuotaAttributes;
-
-    private final Attributes compactionSuccessAttributes;
-    private final Attributes compactionFailureAttributes;
-
-    private final Attributes transactionActiveAttributes;
-    private final Attributes transactionCommittedAttributes;
-    private final Attributes transactionAbortedAttributes;
-
-    public PersistentTopicAttributes(TopicName topicName) {
-        super(topicName);
-
-        timeBasedQuotaAttributes = Attributes.builder()
-                .putAll(commonAttributes)
-                
.putAll(OpenTelemetryAttributes.BacklogQuotaType.TIME.attributes)
-                .build();
-        sizeBasedQuotaAttributes = Attributes.builder()
+public class TransactionMetadataStoreAttributes {
+
+    private final Attributes commonAttributes;
+    private final Attributes txnAbortedAttributes;
+    private final Attributes txnActiveAttributes;
+    private final Attributes txnCommittedAttributes;
+    private final Attributes txnCreatedAttributes;
+    private final Attributes txnTimeoutAttributes;
+
+    public TransactionMetadataStoreAttributes(TransactionMetadataStore store) {
+        this.commonAttributes = Attributes.of(
+                OpenTelemetryAttributes.PULSAR_TRANSACTION_COORDINATOR_ID, 
store.getTransactionCoordinatorID().getId());
+        this.txnAbortedAttributes = Attributes.builder()
                 .putAll(commonAttributes)
-                
.putAll(OpenTelemetryAttributes.BacklogQuotaType.SIZE.attributes)
+                
.putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes)
                 .build();
-
-        transactionActiveAttributes =  Attributes.builder()
+        this.txnActiveAttributes = Attributes.builder()
                 .putAll(commonAttributes)
                 
.putAll(OpenTelemetryAttributes.TransactionStatus.ACTIVE.attributes)
                 .build();
-        transactionCommittedAttributes =  Attributes.builder()
+        this.txnCommittedAttributes = Attributes.builder()
                 .putAll(commonAttributes)
                 
.putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes)
                 .build();
-        transactionAbortedAttributes =  Attributes.builder()
-                .putAll(commonAttributes)
-                
.putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes)
-                .build();
-
-        compactionSuccessAttributes = Attributes.builder()
+        this.txnCreatedAttributes = Attributes.builder()
                 .putAll(commonAttributes)
-                
.putAll(OpenTelemetryAttributes.CompactionStatus.SUCCESS.attributes)
+                
.putAll(OpenTelemetryAttributes.TransactionStatus.CREATED.attributes)
                 .build();
-        compactionFailureAttributes = Attributes.builder()
+        this.txnTimeoutAttributes = Attributes.builder()
                 .putAll(commonAttributes)
-                
.putAll(OpenTelemetryAttributes.CompactionStatus.FAILURE.attributes)
+                
.putAll(OpenTelemetryAttributes.TransactionStatus.TIMEOUT.attributes)
                 .build();
     }
 }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
index 0f3c5e42d7a..7817d484875 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
@@ -23,12 +23,14 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import 
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreAttributes;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
@@ -49,6 +51,11 @@ class InMemTransactionMetadataStore implements 
TransactionMetadataStore {
     private final LongAdder abortTransactionCount;
     private final LongAdder transactionTimeoutCount;
 
+    private volatile TransactionMetadataStoreAttributes attributes = null;
+    private static final 
AtomicReferenceFieldUpdater<InMemTransactionMetadataStore, 
TransactionMetadataStoreAttributes>
+            ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+                    InMemTransactionMetadataStore.class, 
TransactionMetadataStoreAttributes.class, "attributes");
+
     InMemTransactionMetadataStore(TransactionCoordinatorID tcID) {
         this.tcID = tcID;
         this.localID = new AtomicLong(0L);
@@ -165,4 +172,13 @@ class InMemTransactionMetadataStore implements 
TransactionMetadataStore {
     public List<TxnMeta> getSlowTransactions(long timeout) {
         return null;
     }
+
+    @Override
+    public TransactionMetadataStoreAttributes getAttributes() {
+        if (attributes != null) {
+            return attributes;
+        }
+        return ATTRIBUTES_FIELD_UPDATER.updateAndGet(this,
+                old -> old != null ? old : new 
TransactionMetadataStoreAttributes(this));
+    }
 }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index b6eaad2e3e3..6bd7a947e38 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
@@ -45,6 +46,7 @@ import org.apache.pulsar.common.util.RecoverTimeRecord;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import 
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreAttributes;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
@@ -83,6 +85,11 @@ public class MLTransactionMetadataStore
     public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
     private final long maxActiveTransactionsPerCoordinator;
 
+    private volatile TransactionMetadataStoreAttributes attributes = null;
+    private static final 
AtomicReferenceFieldUpdater<MLTransactionMetadataStore, 
TransactionMetadataStoreAttributes>
+            ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+                    MLTransactionMetadataStore.class, 
TransactionMetadataStoreAttributes.class, "attributes");
+
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
                                       MLTransactionLogImpl mlTransactionLog,
                                       TransactionTimeoutTracker timeoutTracker,
@@ -549,4 +556,13 @@ public class MLTransactionMetadataStore
     public ManagedLedger getManagedLedger() {
         return this.transactionLog.getManagedLedger();
     }
+
+    @Override
+    public TransactionMetadataStoreAttributes getAttributes() {
+        if (attributes != null) {
+            return attributes;
+        }
+        return ATTRIBUTES_FIELD_UPDATER.updateAndGet(this,
+                old -> old != null ? old : new 
TransactionMetadataStoreAttributes(this));
+    }
 }

Reply via email to