This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 7e3605024b IGNITE-21375 RO transactions should not update txnState local map (#3145) 7e3605024b is described below commit 7e3605024b18cf2099110fbd038409bcbee02ee5 Author: Vladislav Pyatkov <vldpyat...@gmail.com> AuthorDate: Fri Feb 9 12:41:18 2024 +0300 IGNITE-21375 RO transactions should not update txnState local map (#3145) --- .../internal/tx/impl/ReadOnlyTransactionImpl.java | 8 +----- .../ignite/internal/tx/impl/TxManagerImpl.java | 32 +++++++++++++++++----- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java index 3d80baba8f..34affae0b7 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.tx.impl; -import static org.apache.ignite.internal.tx.TxState.COMMITTED; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import java.util.UUID; @@ -27,7 +26,6 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.tx.HybridTimestampTracker; -import org.apache.ignite.internal.tx.TxStateMeta; import org.apache.ignite.network.ClusterNode; /** @@ -118,10 +116,6 @@ class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl { observableTsTracker.update(executionTimestamp); - return ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(new TxIdAndTimestamp(readTimestamp, id())) - .thenRun(() -> txManager.updateTxMeta( - id(), - old -> new TxStateMeta(COMMITTED, old.txCoordinatorId(), 
old.commitPartitionId(), old.commitTimestamp()) - )); + return ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(new TxIdAndTimestamp(readTimestamp, id())); } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index 757a7eaac5..26d4f1c9ab 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -52,6 +52,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongSupplier; @@ -156,6 +157,18 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { /** Prevents double stopping of the tracker. */ private final AtomicBoolean stopGuard = new AtomicBoolean(); + /** + * Total number of started transaction. + * TODO: IGNITE-21440 Implement transaction metrics. + */ + private final AtomicInteger startedTxs = new AtomicInteger(); + /** + * Total number of fixed transaction. + * TODO: IGNITE-21440 Implement transaction metrics. + */ + private final AtomicInteger fixedTxs = new AtomicInteger(); + + /** Busy lock to stop synchronously. */ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); @@ -282,9 +295,12 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { public InternalTransaction begin(HybridTimestampTracker timestampTracker, boolean readOnly, TxPriority priority) { HybridTimestamp beginTimestamp = readOnly ? 
clock.now() : createBeginTimestampWithIncrementRwTxCounter(); UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, priority); - updateTxMeta(txId, old -> new TxStateMeta(PENDING, localNodeId, null, null)); + + startedTxs.incrementAndGet(); if (!readOnly) { + updateTxMeta(txId, old -> new TxStateMeta(PENDING, localNodeId, null, null)); + return new ReadWriteTransactionImpl(this, timestampTracker, txId, localNodeId); } @@ -350,6 +366,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, boolean commit) { TxState finalState; + fixedTxs.incrementAndGet(); + if (commit) { timestampTracker.update(clock.now()); @@ -377,6 +395,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { ) { LOG.debug("Finish [commit={}, txId={}, groups={}].", commitIntent, txId, enlistedGroups); + fixedTxs.incrementAndGet(); + assert enlistedGroups != null; if (enlistedGroups.isEmpty()) { @@ -637,16 +657,12 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { @Override public int finished() { - return inBusyLock(busyLock, () -> (int) txStateVolatileStorage.states().stream() - .filter(e -> isFinalState(e.txState())) - .count()); + return fixedTxs.get(); } @Override public int pending() { - return inBusyLock(busyLock, () -> (int) txStateVolatileStorage.states().stream() - .filter(e -> e.txState() == PENDING) - .count()); + return startedTxs.get() - fixedTxs.get(); } @Override @@ -718,6 +734,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { } CompletableFuture<Void> completeReadOnlyTransactionFuture(TxIdAndTimestamp txIdAndTimestamp) { + fixedTxs.incrementAndGet(); + CompletableFuture<Void> readOnlyTxFuture = readOnlyTxFutureById.remove(txIdAndTimestamp); assert readOnlyTxFuture != null : txIdAndTimestamp;