This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9937d22ffea [improve][broker] Use atomic counter for ongoing transaction count (#25053)
9937d22ffea is described below

commit 9937d22ffea22b199fd5782153ebb4a9c2f25391
Author: Ruimin MA <[email protected]>
AuthorDate: Mon Dec 15 09:19:32 2025 +0800

    [improve][broker] Use atomic counter for ongoing transaction count (#25053)
---
 .../TransactionMetadataStoreServiceTest.java       | 47 +++++++++-------------
 .../impl/MLTransactionMetadataStore.java           | 27 ++++++++++---
 .../coordinator/impl/MLTransactionLogImplTest.java |  2 +-
 3 files changed, 40 insertions(+), 36 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index df61d37564c..9f3776ca7cb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -173,10 +174,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
                 (MLTransactionMetadataStore) 
pulsar.getTransactionMetadataStoreService()
                         .getStores().get(TransactionCoordinatorID.get(0));
         checkTransactionMetadataStoreReady(transactionMetadataStore);
-        Field field = 
MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
-        field.setAccessible(true);
-        ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
-                (ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) 
field.get(transactionMetadataStore);
+        ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap = 
transactionMetadataStore.getTxnMetaMap();
         int i = -1;
         while (++i < 1000) {
             try {
@@ -189,7 +187,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
         txnMap.forEach((txnID, txnMetaListPair) ->
                 Assert.assertEquals(txnMetaListPair.getLeft().status(), 
TxnStatus.OPEN));
         Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS)
-                .until(() -> txnMap.size() == 0);
+                .until(() -> 
transactionMetadataStore.getOnGoingTxnCount().intValue() == 0);
     }
 
     private TxnID newTransactionWithTimeoutOf(long timeout)
@@ -209,25 +207,23 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
                 (MLTransactionMetadataStore) 
pulsar.getTransactionMetadataStoreService()
                         .getStores().get(TransactionCoordinatorID.get(0));
         checkTransactionMetadataStoreReady(transactionMetadataStore);
-        Field field = 
MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
-        field.setAccessible(true);
-        ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
-                (ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) 
field.get(transactionMetadataStore);
+        ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap = 
transactionMetadataStore.getTxnMetaMap();
+        LongAdder onGoingTxtCount = 
transactionMetadataStore.getOnGoingTxnCount();
 
         newTransactionWithTimeoutOf(2000);
 
-        assertEquals(txnMap.size(), 1);
+        assertEquals(onGoingTxtCount.intValue(), 1);
 
         txnMap.forEach((txnID, txnMetaListPair) ->
                 Assert.assertEquals(txnMetaListPair.getLeft().status(), 
TxnStatus.OPEN));
-        Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> 
txnMap.size() == 0);
+        Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> 
onGoingTxtCount.intValue() == 0);
 
         newTransactionWithTimeoutOf(2000);
-        assertEquals(txnMap.size(), 1);
+        assertEquals(onGoingTxtCount.intValue(), 1);
 
         txnMap.forEach((txnID, txnMetaListPair) ->
                 Assert.assertEquals(txnMetaListPair.getLeft().status(), 
TxnStatus.OPEN));
-        Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> 
txnMap.size() == 0);
+        Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> 
onGoingTxtCount.intValue() == 0);
     }
 
     @Test
@@ -241,10 +237,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
                         .getStores().get(TransactionCoordinatorID.get(0));
 
         checkTransactionMetadataStoreReady(transactionMetadataStore);
-        Field field = 
MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
-        field.setAccessible(true);
-        ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
-                (ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) 
field.get(transactionMetadataStore);
+        LongAdder onGoingTxtCount = 
transactionMetadataStore.getOnGoingTxnCount();
         new Thread(() -> {
             int i = -1;
             while (++i < 100) {
@@ -289,15 +282,15 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
             }
         }).start();
 
-        checkoutTimeout(txnMap, 300);
-        checkoutTimeout(txnMap, 200);
-        checkoutTimeout(txnMap, 100);
-        checkoutTimeout(txnMap, 0);
+        checkoutTimeout(onGoingTxtCount, 300);
+        checkoutTimeout(onGoingTxtCount, 200);
+        checkoutTimeout(onGoingTxtCount, 100);
+        checkoutTimeout(onGoingTxtCount, 0);
     }
 
-    private void checkoutTimeout(ConcurrentSkipListMap<Long, Pair<TxnMeta, 
List<Position>>> txnMap, int time) {
+    private void checkoutTimeout(LongAdder onGoingTxtCount, int time) {
         Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS)
-                .until(() -> txnMap.size() == time);
+                .until(() -> onGoingTxtCount.intValue() == time);
     }
 
     @Test
@@ -326,12 +319,8 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
                         .getStores().get(TransactionCoordinatorID.get(0));
 
         checkTransactionMetadataStoreReady(transactionMetadataStore);
-
-        Field field = 
MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
-        field.setAccessible(true);
-        ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
-                (ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) 
field.get(transactionMetadataStore);
-        Awaitility.await().until(() -> txnMap.size() == 0);
+        LongAdder onGoingTxtCount = 
transactionMetadataStore.getOnGoingTxnCount();
+        Awaitility.await().until(() -> onGoingTxtCount.intValue() == 0);
 
     }
 
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 6bd7a947e38..526fe0fcb7f 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
+import lombok.Getter;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.StringUtils;
@@ -72,6 +73,7 @@ public class MLTransactionMetadataStore
     private final TransactionCoordinatorID tcID;
     private final MLTransactionLogImpl transactionLog;
     @VisibleForTesting
+    @Getter
     final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> 
txnMetaMap = new ConcurrentSkipListMap<>();
     private final TransactionTimeoutTracker timeoutTracker;
     private final TransactionMetadataStoreStats transactionMetadataStoreStats;
@@ -80,6 +82,9 @@ public class MLTransactionMetadataStore
     private final LongAdder abortedTransactionCount;
     private final LongAdder transactionTimeoutCount;
     private final LongAdder appendLogCount;
+    @Getter
+    @VisibleForTesting
+    private final LongAdder onGoingTxnCount;
     private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
     private final ExecutorService internalPinnedExecutor;
     public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
@@ -108,6 +113,7 @@ public class MLTransactionMetadataStore
         this.abortedTransactionCount = new LongAdder();
         this.transactionTimeoutCount = new LongAdder();
         this.appendLogCount = new LongAdder();
+        this.onGoingTxnCount = new LongAdder();
         DefaultThreadFactory threadFactory = new 
DefaultThreadFactory("transaction_coordinator_"
                 + tcID.toString() + "thread_factory");
         this.internalPinnedExecutor = 
Executors.newSingleThreadScheduledExecutor(threadFactory);
@@ -162,6 +168,7 @@ public class MLTransactionMetadataStore
                                     final TxnMetaImpl left = new 
TxnMetaImpl(txnID,
                                             openTimestamp, timeoutAt, owner);
                                     txnMetaMap.put(transactionId, 
MutablePair.of(left, positions));
+                                    onGoingTxnCount.increment();
                                     
recoverTracker.handleOpenStatusTransaction(txnSequenceId,
                                             timeoutAt + openTimestamp);
                                 }
@@ -197,8 +204,12 @@ public class MLTransactionMetadataStore
                                     
recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
                                     if (newStatus == TxnStatus.COMMITTED || 
newStatus == TxnStatus.ABORTED) {
                                         
transactionLog.deletePosition(txnMetaMap
-                                                
.get(transactionId).getRight()).thenAccept(v ->
-                                                
txnMetaMap.remove(transactionId).getLeft());
+                                                
.get(transactionId).getRight()).thenAccept(v -> {
+                                                    if 
(txnMetaMap.remove(transactionId) != null) {
+                                                        
onGoingTxnCount.decrement();
+                                                    }
+                                                }
+                                        );
                                     }
                                 }
                                 break;
@@ -237,7 +248,7 @@ public class MLTransactionMetadataStore
     @Override
     public CompletableFuture<TxnID> newTransaction(long timeOut, String owner) 
{
         if (this.maxActiveTransactionsPerCoordinator == 0
-                || this.maxActiveTransactionsPerCoordinator > 
txnMetaMap.size()) {
+                || this.maxActiveTransactionsPerCoordinator > 
onGoingTxnCount.longValue()) {
             CompletableFuture<TxnID> completableFuture = new 
CompletableFuture<>();
             FutureUtil.safeRunAsync(() -> {
                 if (!checkIfReady()) {
@@ -276,6 +287,7 @@ public class MLTransactionMetadataStore
                                 positions.add(position);
                                 Pair<TxnMeta, List<Position>> pair = 
MutablePair.of(txn, positions);
                                 txnMetaMap.put(leastSigBits, pair);
+                                onGoingTxnCount.increment();
                                 
this.timeoutTracker.addTransaction(leastSigBits, timeOut);
                                 createdTransactionCount.increment();
                                 completableFuture.complete(txnID);
@@ -422,7 +434,9 @@ public class MLTransactionMetadataStore
                                     } else {
                                         abortedTransactionCount.increment();
                                     }
-                                    txnMetaMap.remove(txnID.getLeastSigBits());
+                                    if 
(txnMetaMap.remove(txnID.getLeastSigBits()) != null) {
+                                        onGoingTxnCount.decrement();
+                                    }
                                     
transactionLog.deletePosition(txnMetaListPair.getRight()).exceptionally(ex -> {
                                         log.warn("Failed to delete transaction 
log position "
                                                 + "at end transaction [{}]", 
txnID);
@@ -466,7 +480,7 @@ public class MLTransactionMetadataStore
         transactionCoordinatorstats.setLowWaterMark(getLowWaterMark());
         transactionCoordinatorstats.setState(getState().name());
         
transactionCoordinatorstats.setLeastSigBits(sequenceIdGenerator.getCurrentSequenceId());
-        transactionCoordinatorstats.ongoingTxnSize = txnMetaMap.size();
+        transactionCoordinatorstats.ongoingTxnSize = 
onGoingTxnCount.longValue();
         transactionCoordinatorstats.recoverStartTime = 
recoverTime.getRecoverStartTime();
         transactionCoordinatorstats.recoverEndTime = 
recoverTime.getRecoverEndTime();
         return transactionCoordinatorstats;
@@ -490,6 +504,7 @@ public class MLTransactionMetadataStore
             internalPinnedExecutor.shutdown();
             return transactionLog.closeAsync().thenCompose(v -> {
                 txnMetaMap.clear();
+                onGoingTxnCount.reset();
                 this.timeoutTracker.close();
                 if (!this.changeToCloseState()) {
                     return FutureUtil.failedFuture(
@@ -508,7 +523,7 @@ public class MLTransactionMetadataStore
     @Override
     public TransactionMetadataStoreStats getMetadataStoreStats() {
         this.transactionMetadataStoreStats.setCoordinatorId(tcID.getId());
-        this.transactionMetadataStoreStats.setActives(txnMetaMap.size());
+        
this.transactionMetadataStoreStats.setActives(onGoingTxnCount.intValue());
         
this.transactionMetadataStoreStats.setCreatedCount(this.createdTransactionCount.longValue());
         
this.transactionMetadataStoreStats.setCommittedCount(this.committedTransactionCount.longValue());
         
this.transactionMetadataStoreStats.setAbortedCount(this.abortedTransactionCount.longValue());
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java
index 5355256d369..052b2d193b9 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java
@@ -171,7 +171,7 @@ public class MLTransactionLogImplTest extends 
MockedBookKeeperTestCase {
                 new MLTransactionMetadataStore(transactionCoordinatorID,
                 mlTransactionLogForRecover, timeoutTracker, 
sequenceIdGenerator, Integer.MAX_VALUE);
         transactionMetadataStoreForRecover.init(recoverTracker).get(2000, 
TimeUnit.SECONDS);
-        
Assert.assertEquals(transactionMetadataStoreForRecover.txnMetaMap.size(), 
expectedMapping.size());
+        
Assert.assertEquals(transactionMetadataStoreForRecover.getOnGoingTxnCount().intValue(),
 expectedMapping.size());
         Iterator<Integer> txnIdSet = expectedMapping.keySet().iterator();
         while (txnIdSet.hasNext()){
             int txnId = txnIdSet.next();

Reply via email to