This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f579a589411d63ec81c8b4e991fc554c9abe7d18 Author: Ruimin MA <[email protected]> AuthorDate: Mon Dec 15 09:19:32 2025 +0800 [improve][broker] Use atomic counter for ongoing transaction count (#25053) (cherry picked from commit 9937d22ffea22b199fd5782153ebb4a9c2f25391) --- .../TransactionMetadataStoreServiceTest.java | 47 +++++++++------------- .../impl/MLTransactionMetadataStore.java | 27 ++++++++++--- .../coordinator/impl/MLTransactionLogImplTest.java | 2 +- 3 files changed, 40 insertions(+), 36 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java index df61d37564c..9f3776ca7cb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.ServiceConfiguration; @@ -173,10 +174,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase { (MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService() .getStores().get(TransactionCoordinatorID.get(0)); checkTransactionMetadataStoreReady(transactionMetadataStore); - Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap"); - field.setAccessible(true); - ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap = - (ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore); + ConcurrentSkipListMap<Long, Pair<TxnMeta, 
List<Position>>> txnMap = transactionMetadataStore.getTxnMetaMap(); int i = -1; while (++i < 1000) { try { @@ -189,7 +187,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase { txnMap.forEach((txnID, txnMetaListPair) -> Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN)); Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS) - .until(() -> txnMap.size() == 0); + .until(() -> transactionMetadataStore.getOnGoingTxnCount().intValue() == 0); } private TxnID newTransactionWithTimeoutOf(long timeout) @@ -209,25 +207,23 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase { (MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService() .getStores().get(TransactionCoordinatorID.get(0)); checkTransactionMetadataStoreReady(transactionMetadataStore); - Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap"); - field.setAccessible(true); - ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap = - (ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore); + ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap = transactionMetadataStore.getTxnMetaMap(); + LongAdder onGoingTxtCount = transactionMetadataStore.getOnGoingTxnCount(); newTransactionWithTimeoutOf(2000); - assertEquals(txnMap.size(), 1); + assertEquals(onGoingTxtCount.intValue(), 1); txnMap.forEach((txnID, txnMetaListPair) -> Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN)); - Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> txnMap.size() == 0); + Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> onGoingTxtCount.intValue() == 0); newTransactionWithTimeoutOf(2000); - assertEquals(txnMap.size(), 1); + assertEquals(onGoingTxtCount.intValue(), 1); txnMap.forEach((txnID, txnMetaListPair) -> Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN)); - Awaitility.await().atLeast(1000, 
TimeUnit.MICROSECONDS).until(() -> txnMap.size() == 0); + Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> onGoingTxtCount.intValue() == 0); } @Test @@ -241,10 +237,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase { .getStores().get(TransactionCoordinatorID.get(0)); checkTransactionMetadataStoreReady(transactionMetadataStore); - Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap"); - field.setAccessible(true); - ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap = - (ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore); + LongAdder onGoingTxtCount = transactionMetadataStore.getOnGoingTxnCount(); new Thread(() -> { int i = -1; while (++i < 100) { @@ -289,15 +282,15 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase { } }).start(); - checkoutTimeout(txnMap, 300); - checkoutTimeout(txnMap, 200); - checkoutTimeout(txnMap, 100); - checkoutTimeout(txnMap, 0); + checkoutTimeout(onGoingTxtCount, 300); + checkoutTimeout(onGoingTxtCount, 200); + checkoutTimeout(onGoingTxtCount, 100); + checkoutTimeout(onGoingTxtCount, 0); } - private void checkoutTimeout(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap, int time) { + private void checkoutTimeout(LongAdder onGoingTxtCount, int time) { Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS) - .until(() -> txnMap.size() == time); + .until(() -> onGoingTxtCount.intValue() == time); } @Test @@ -326,12 +319,8 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase { .getStores().get(TransactionCoordinatorID.get(0)); checkTransactionMetadataStoreReady(transactionMetadataStore); - - Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap"); - field.setAccessible(true); - ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap = - (ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) 
field.get(transactionMetadataStore); - Awaitility.await().until(() -> txnMap.size() == 0); + LongAdder onGoingTxtCount = transactionMetadataStore.getOnGoingTxnCount(); + Awaitility.await().until(() -> onGoingTxtCount.intValue() == 0); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index 6bd7a947e38..526fe0fcb7f 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.LongAdder; +import lombok.Getter; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.StringUtils; @@ -72,6 +73,7 @@ public class MLTransactionMetadataStore private final TransactionCoordinatorID tcID; private final MLTransactionLogImpl transactionLog; @VisibleForTesting + @Getter final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentSkipListMap<>(); private final TransactionTimeoutTracker timeoutTracker; private final TransactionMetadataStoreStats transactionMetadataStoreStats; @@ -80,6 +82,9 @@ public class MLTransactionMetadataStore private final LongAdder abortedTransactionCount; private final LongAdder transactionTimeoutCount; private final LongAdder appendLogCount; + @Getter + @VisibleForTesting + private final LongAdder onGoingTxnCount; private final MLTransactionSequenceIdGenerator sequenceIdGenerator; private final ExecutorService internalPinnedExecutor; public 
final RecoverTimeRecord recoverTime = new RecoverTimeRecord(); @@ -108,6 +113,7 @@ public class MLTransactionMetadataStore this.abortedTransactionCount = new LongAdder(); this.transactionTimeoutCount = new LongAdder(); this.appendLogCount = new LongAdder(); + this.onGoingTxnCount = new LongAdder(); DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_" + tcID.toString() + "thread_factory"); this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); @@ -162,6 +168,7 @@ public class MLTransactionMetadataStore final TxnMetaImpl left = new TxnMetaImpl(txnID, openTimestamp, timeoutAt, owner); txnMetaMap.put(transactionId, MutablePair.of(left, positions)); + onGoingTxnCount.increment(); recoverTracker.handleOpenStatusTransaction(txnSequenceId, timeoutAt + openTimestamp); } @@ -197,8 +204,12 @@ public class MLTransactionMetadataStore recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus); if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) { transactionLog.deletePosition(txnMetaMap - .get(transactionId).getRight()).thenAccept(v -> - txnMetaMap.remove(transactionId).getLeft()); + .get(transactionId).getRight()).thenAccept(v -> { + if (txnMetaMap.remove(transactionId) != null) { + onGoingTxnCount.decrement(); + } + } + ); } } break; @@ -237,7 +248,7 @@ public class MLTransactionMetadataStore @Override public CompletableFuture<TxnID> newTransaction(long timeOut, String owner) { if (this.maxActiveTransactionsPerCoordinator == 0 - || this.maxActiveTransactionsPerCoordinator > txnMetaMap.size()) { + || this.maxActiveTransactionsPerCoordinator > onGoingTxnCount.longValue()) { CompletableFuture<TxnID> completableFuture = new CompletableFuture<>(); FutureUtil.safeRunAsync(() -> { if (!checkIfReady()) { @@ -276,6 +287,7 @@ public class MLTransactionMetadataStore positions.add(position); Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions); 
txnMetaMap.put(leastSigBits, pair); + onGoingTxnCount.increment(); this.timeoutTracker.addTransaction(leastSigBits, timeOut); createdTransactionCount.increment(); completableFuture.complete(txnID); @@ -422,7 +434,9 @@ public class MLTransactionMetadataStore } else { abortedTransactionCount.increment(); } - txnMetaMap.remove(txnID.getLeastSigBits()); + if (txnMetaMap.remove(txnID.getLeastSigBits()) != null) { + onGoingTxnCount.decrement(); + } transactionLog.deletePosition(txnMetaListPair.getRight()).exceptionally(ex -> { log.warn("Failed to delete transaction log position " + "at end transaction [{}]", txnID); @@ -466,7 +480,7 @@ public class MLTransactionMetadataStore transactionCoordinatorstats.setLowWaterMark(getLowWaterMark()); transactionCoordinatorstats.setState(getState().name()); transactionCoordinatorstats.setLeastSigBits(sequenceIdGenerator.getCurrentSequenceId()); - transactionCoordinatorstats.ongoingTxnSize = txnMetaMap.size(); + transactionCoordinatorstats.ongoingTxnSize = onGoingTxnCount.longValue(); transactionCoordinatorstats.recoverStartTime = recoverTime.getRecoverStartTime(); transactionCoordinatorstats.recoverEndTime = recoverTime.getRecoverEndTime(); return transactionCoordinatorstats; @@ -490,6 +504,7 @@ public class MLTransactionMetadataStore internalPinnedExecutor.shutdown(); return transactionLog.closeAsync().thenCompose(v -> { txnMetaMap.clear(); + onGoingTxnCount.reset(); this.timeoutTracker.close(); if (!this.changeToCloseState()) { return FutureUtil.failedFuture( @@ -508,7 +523,7 @@ public class MLTransactionMetadataStore @Override public TransactionMetadataStoreStats getMetadataStoreStats() { this.transactionMetadataStoreStats.setCoordinatorId(tcID.getId()); - this.transactionMetadataStoreStats.setActives(txnMetaMap.size()); + this.transactionMetadataStoreStats.setActives(onGoingTxnCount.intValue()); this.transactionMetadataStoreStats.setCreatedCount(this.createdTransactionCount.longValue()); 
this.transactionMetadataStoreStats.setCommittedCount(this.committedTransactionCount.longValue()); this.transactionMetadataStoreStats.setAbortedCount(this.abortedTransactionCount.longValue()); diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java index 5355256d369..052b2d193b9 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java @@ -171,7 +171,7 @@ public class MLTransactionLogImplTest extends MockedBookKeeperTestCase { new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLogForRecover, timeoutTracker, sequenceIdGenerator, Integer.MAX_VALUE); transactionMetadataStoreForRecover.init(recoverTracker).get(2000, TimeUnit.SECONDS); - Assert.assertEquals(transactionMetadataStoreForRecover.txnMetaMap.size(), expectedMapping.size()); + Assert.assertEquals(transactionMetadataStoreForRecover.getOnGoingTxnCount().intValue(), expectedMapping.size()); Iterator<Integer> txnIdSet = expectedMapping.keySet().iterator(); while (txnIdSet.hasNext()){ int txnId = txnIdSet.next();
