This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b856aad93a7 IGNITE-25978 Optimize implicit RO transaction flow
b856aad93a7 is described below

commit b856aad93a789cdbcbacb41b144173d1c012cf14
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Tue Jul 29 17:43:45 2025 +0300

    IGNITE-25978 Optimize implicit RO transaction flow
---
 .../handler/requests/table/ClientTableCommon.java  |  17 +-
 modules/runner/build.gradle                        |  10 -
 .../benchmark/AbstractMultiNodeBenchmark.java      |  11 +-
 .../internal/benchmark/ClientKvBenchmark.java      |  17 +-
 .../internal/benchmark/RemoteKvBenchmark.java      |  51 -----
 .../internal/table/ItReadOnlyTransactionTest.java  |   3 +-
 .../ignite/internal/table/AbstractTableView.java   |   6 +-
 .../internal/table/distributed/TableUtils.java     |   2 +-
 .../replicator/PartitionReplicaListener.java       |  24 ++-
 .../distributed/storage/InternalTableImpl.java     |  24 +--
 .../ignite/internal/table/TxAbstractTest.java      |   4 +-
 .../internal/tx/ItTransactionMetricsTest.java      |   4 +-
 ...l.java => ReadOnlyImplicitTransactionImpl.java} | 133 ++++++-------
 .../internal/tx/impl/ReadOnlyTransactionImpl.java  |   6 +-
 .../internal/tx/impl/ReadWriteTransactionImpl.java |  10 +
 .../ignite/internal/tx/impl/TxManagerImpl.java     | 208 +++++++++++----------
 .../apache/ignite/internal/tx/TxManagerTest.java   |  14 ++
 .../tx/impl/ReadOnlyTransactionImplTest.java       |   1 -
 18 files changed, 261 insertions(+), 284 deletions(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 3732906dc04..1c6630d1135 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -471,7 +471,7 @@ public class ClientTableCommon {
         if (tx == null) {
             // Implicit transactions do not use an observation timestamp 
because RW never depends on it, and implicit RO is always direct.
             // The direct transaction uses a current timestamp on the primary 
replica by definition.
-            tx = startImplicitTx(readTs, txManager, null, readOnly);
+            tx = startImplicitTx(readTs, txManager, readOnly);
         }
 
         return tx;
@@ -494,23 +494,20 @@ public class ClientTableCommon {
             boolean readOnly,
             InternalTxOptions options
     ) {
-        tsTracker.update(currentTs);
+        if (readOnly) {
+            tsTracker.update(currentTs);
 
-        return txManager.beginExplicit(
-                tsTracker,
-                readOnly,
-                options
-        );
+            return txManager.beginExplicitRo(tsTracker, options);
+        } else {
+            return txManager.beginExplicitRw(tsTracker, options);
+        }
     }
 
     private static InternalTransaction startImplicitTx(
             HybridTimestampTracker tsTracker,
             TxManager txManager,
-            @Nullable HybridTimestamp currentTs,
             boolean readOnly
     ) {
-        tsTracker.update(currentTs);
-
         return txManager.beginImplicit(tsTracker, readOnly);
     }
 
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index e1f20d4fe06..6411441821d 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -283,16 +283,6 @@ tasks.register('runClientGetBenchmark', JavaExec) {
     enableAssertions = true
 }
 
-tasks.register('runRemoteBenchmark', JavaExec) {
-    mainClass = 'org.apache.ignite.internal.benchmark.RemoteKvBenchmark'
-
-    jvmArgs += addOpens + ['-Dio.netty.tryReflectionSetAccessible=true', 
'-Xmx16g']
-
-    classpath = sourceSets.integrationTest.runtimeClasspath
-
-    enableAssertions = true
-}
-
 jar {
     manifest {
         attributes(
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index ee653459bf1..eef5c815306 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -73,6 +73,9 @@ public class AbstractMultiNodeBenchmark {
     protected static Ignite publicIgnite;
     protected static IgniteImpl igniteImpl;
 
+    @Param({"false"})
+    protected boolean remote;
+
     @Param({"false"})
     private boolean fsync;
 
@@ -87,7 +90,7 @@ public class AbstractMultiNodeBenchmark {
     @Setup
     public void nodeSetUp() throws Exception {
         System.setProperty("jraft.available_processors", "2");
-        if (!remote()) {
+        if (!remote) {
             startCluster();
         }
 
@@ -192,7 +195,7 @@ public class AbstractMultiNodeBenchmark {
     }
 
     private void startCluster() throws Exception {
-        if (remote()) {
+        if (remote) {
             throw new AssertionError("Can't start the cluster in remote mode");
         }
 
@@ -292,8 +295,4 @@ public class AbstractMultiNodeBenchmark {
     protected int replicaCount() {
         return CatalogUtils.DEFAULT_REPLICA_COUNT;
     }
-
-    protected boolean remote() {
-        return false;
-    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
index e3fb22ba783..ded0f63f34d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvBenchmark.java
@@ -65,9 +65,6 @@ public abstract class ClientKvBenchmark extends 
AbstractMultiNodeBenchmark {
     @Param({"0"})
     protected int offset; // 1073741824 for second client to ensure unique keys
 
-    @Param({"false"})
-    private boolean fsync;
-
     @Param({"32"})
     private int partitionCount;
 
@@ -90,8 +87,13 @@ public abstract class ClientKvBenchmark extends 
AbstractMultiNodeBenchmark {
 
     @Override
     public void nodeSetUp() throws Exception {
-        
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK, 
"false");
-        
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK,
 "false");
+        if (remote) {
+            client = IgniteClient.builder().addresses(addresses()).build();
+            publicIgnite = client;
+        } else {
+            
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK, 
"false");
+            
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK,
 "false");
+        }
         super.nodeSetUp();
     }
 
@@ -153,11 +155,6 @@ public abstract class ClientKvBenchmark extends 
AbstractMultiNodeBenchmark {
         new Runner(builder.build()).run();
     }
 
-    @Override
-    protected boolean fsync() {
-        return fsync;
-    }
-
     @Override
     protected int nodes() {
         return 1;
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/RemoteKvBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/RemoteKvBenchmark.java
deleted file mode 100644
index 8f3cae627d3..00000000000
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/RemoteKvBenchmark.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.benchmark;
-
-import org.apache.ignite.client.IgniteClient;
-import org.openjdk.jmh.runner.RunnerException;
-
-/**
- * Benchmark for a single upsert operation using externally enabled cluster.
- */
-public class RemoteKvBenchmark extends ClientKvBenchmark {
-    @Override
-    protected boolean remote() {
-        return true;
-    }
-
-    @Override
-    protected String[] addresses() {
-        return new String[]{};
-    }
-
-    @Override
-    public void nodeSetUp() throws Exception {
-        client = IgniteClient.builder().addresses(addresses()).build();
-        publicIgnite = client;
-
-        super.nodeSetUp();
-    }
-
-    /**
-     * Benchmark's entry point.
-     */
-    public static void main(String[] args) throws RunnerException {
-        runBenchmark(RemoteKvBenchmark.class, args);
-    }
-}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
index 00501cb5fa7..183b7a6c4b6 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
@@ -123,7 +123,8 @@ public class ItReadOnlyTransactionTest extends 
ClusterPerClassIntegrationTest {
             // but we check that the new transaction does not appear.
             assertFalse(txRwStatesAfter > txRwStatesBefore, "RW transaction 
was stated unexpectedly.");
 
-            assertEquals(2, txFinishedAfter - txFinishedBefore, format(
+            // Implicit RO operations are not counted as transactions.
+            assertEquals(0, txFinishedAfter - txFinishedBefore, format(
                     "Unexpected finished transaction quantity [i={}, 
beforeOp={}, afterOp={}]",
                     i,
                     txFinishedBefore,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 916ba0259f5..f67fc3af5f4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -21,7 +21,7 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.function.Function.identity;
 import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
 import static 
org.apache.ignite.internal.table.criteria.CriteriaExceptionMapperUtil.mapToPublicCriteriaException;
-import static 
org.apache.ignite.internal.table.distributed.TableUtils.isDirectFlowApplicableTx;
+import static 
org.apache.ignite.internal.table.distributed.TableUtils.isDirectFlowApplicable;
 import static org.apache.ignite.internal.util.ExceptionUtils.isOrCausedBy;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.internal.util.ViewUtils.sync;
@@ -139,7 +139,7 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
             @Nullable Integer previousSchemaVersion,
             KvAction<T> action
     ) {
-        CompletableFuture<Integer> schemaVersionFuture = 
isDirectFlowApplicableTx(tx)
+        CompletableFuture<Integer> schemaVersionFuture = tx == null
                 ? schemaVersions.schemaVersionAtCurrentTime(tbl.tableId())
                 : schemaVersions.schemaVersionAt(tx.schemaTimestamp(), 
tbl.tableId());
 
@@ -157,7 +157,7 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
                                 // version corresponding to the transaction 
creation moment, so this mismatch is not tolerable: we need
                                 // to retry the operation here.
 
-                                assert isDirectFlowApplicableTx(tx) : "Only 
for direct flow applicable tx a retry might be requested";
+                                assert isDirectFlowApplicable(tx) : "Only for 
direct flow applicable tx a retry might be requested";
                                 
assertSchemaVersionIncreased(previousSchemaVersion, schemaVersion);
 
                                 // Repeat.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
index 6e4502a3d7f..8f3cd2d2b06 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java
@@ -114,7 +114,7 @@ public class TableUtils {
      * @param tx Transaction of {@code null}.
      * @return True of direct flow is applicable for the transaction.
      */
-    public static boolean isDirectFlowApplicableTx(@Nullable 
InternalTransaction tx) {
+    public static boolean isDirectFlowApplicable(@Nullable InternalTransaction 
tx) {
         // TODO https://issues.apache.org/jira/browse/IGNITE-24218 Remove this 
method ot use tx == null || tx.direct() instead.
         return tx == null || (tx.implicit() && tx.isReadOnly());
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 8db22fcc6eb..b08c54b7548 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -56,6 +56,7 @@ import static 
org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Replicator.CURSOR_CLOSE_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -2019,20 +2020,29 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
             ReadOnlyDirectMultiRowReplicaRequest request,
             HybridTimestamp opStartTimestamp) {
         List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys());
-        HybridTimestamp readTimestamp = opStartTimestamp;
 
-        if (request.requestType() != RO_GET_ALL) {
-            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
-                    format("Unknown single request [actionType={}]", 
request.requestType()));
-        }
+        assert request.requestType() == RO_GET_ALL;
 
         CompletableFuture<BinaryRow>[] resolutionFuts = new 
CompletableFuture[primaryKeys.size()];
 
         for (int i = 0; i < primaryKeys.size(); i++) {
-            resolutionFuts[i] = resolveRowByPkForReadOnly(primaryKeys.get(i), 
readTimestamp);
+            resolutionFuts[i] = resolveRowByPkForReadOnly(primaryKeys.get(i), 
opStartTimestamp);
         }
 
-        return allOfToList(resolutionFuts);
+        return allOfToList(resolutionFuts).thenApply(rows -> {
+            // Validate read correctness.
+            HybridTimestamp lwm = lowWatermark.getLowWatermark();
+
+            if (lwm != null && opStartTimestamp.compareTo(lwm) < 0) {
+                throw new IgniteInternalException(
+                        TX_READ_ONLY_TOO_OLD_ERR,
+                        "Attempted to read data below the garbage collection 
watermark: [readTimestamp={}, gcTimestamp={}]",
+                        opStartTimestamp,
+                        lowWatermark.getLowWatermark());
+            }
+
+            return rows;
+        });
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 980f6211799..7a42adea027 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -42,7 +42,7 @@ import static 
org.apache.ignite.internal.partition.replicator.network.replicatio
 import static 
org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_UPSERT;
 import static 
org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_UPSERT_ALL;
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage;
-import static 
org.apache.ignite.internal.table.distributed.TableUtils.isDirectFlowApplicableTx;
+import static 
org.apache.ignite.internal.table.distributed.TableUtils.isDirectFlowApplicable;
 import static 
org.apache.ignite.internal.table.distributed.storage.RowBatch.allResultFutures;
 import static 
org.apache.ignite.internal.util.CompletableFutures.completedOrFailedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
@@ -760,7 +760,7 @@ public class InternalTableImpl implements InternalTable {
      * @param <T> Operation return type.
      * @return The future.
      */
-    private <T> CompletableFuture<T> postEnlist(
+    private static <T> CompletableFuture<T> postEnlist(
             CompletableFuture<T> fut, boolean autoCommit, InternalTransaction 
tx0, boolean full
     ) {
         assert !(autoCommit && full) : "Invalid combination of flags";
@@ -796,7 +796,7 @@ public class InternalTableImpl implements InternalTable {
     }
 
     /**
-     * Evaluates the single-row request to the cluster for a read-only 
single-partition transaction.
+     * Evaluates the single-row request to the cluster for a read-only 
single-partition implicit transaction.
      *
      * @param tx Transaction or {@code null} if it is not started yet.
      * @param row Binary row.
@@ -819,20 +819,18 @@ public class InternalTableImpl implements InternalTable {
     }
 
     /**
-     * Evaluates the multi-row request to the cluster for a read-only 
single-partition transaction.
+     * Evaluates the multi-row request to the cluster for a read-only 
single-partition implicit transaction.
      *
-     * @param tx Transaction or {@code null} if it is not started yet.
      * @param rows Rows.
      * @param op Replica requests factory.
      * @param <R> Result type.
      * @return The future.
      */
     private <R> CompletableFuture<R> evaluateReadOnlyPrimaryNode(
-            @Nullable InternalTransaction tx,
             Collection<BinaryRowEx> rows,
             BiFunction<ReplicationGroupId, Long, ReplicaRequest> op
     ) {
-        InternalTransaction actualTx = startImplicitRoTxIfNeeded(tx);
+        InternalTransaction actualTx = 
txManager.beginImplicitRo(observableTimestampTracker);
 
         int partId = partitionId(rows.iterator().next());
 
@@ -902,12 +900,13 @@ public class InternalTableImpl implements InternalTable {
                 return tx.finish(false, clockService.current(), false, false)
                         .handle((ignored, err) -> {
                             if (err != null) {
+                                // Preserve failed state.
                                 e.addSuppressed(err);
                             }
 
                             sneakyThrow(e);
                             return null;
-                        }); // Preserve failed state.
+                        });
             }
 
             return tx.finish(true, clockService.current(), false, 
false).thenApply(ignored -> r);
@@ -919,7 +918,7 @@ public class InternalTableImpl implements InternalTable {
     public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable 
InternalTransaction tx) {
         checkTransactionFinishStarted(tx);
 
-        if (isDirectFlowApplicableTx(tx)) {
+        if (isDirectFlowApplicable(tx)) {
             return evaluateReadOnlyPrimaryNode(
                     tx,
                     keyRow,
@@ -1014,9 +1013,8 @@ public class InternalTableImpl implements InternalTable {
             return emptyListCompletedFuture();
         }
 
-        if (isDirectFlowApplicableTx(tx) && isSinglePartitionBatch(keyRows)) {
+        if (tx == null && isSinglePartitionBatch(keyRows)) {
             return evaluateReadOnlyPrimaryNode(
-                    tx,
                     keyRows,
                     (groupId, consistencyToken) -> 
TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
                             .groupId(serializeReplicationGroupId(groupId))
@@ -1030,6 +1028,8 @@ public class InternalTableImpl implements InternalTable {
         }
 
         if (tx != null && tx.isReadOnly()) {
+            assert !tx.implicit() : "implicit RO getAll not supported";
+
             BinaryRowEx firstRow = keyRows.iterator().next();
 
             return evaluateReadOnlyRecipientNode(partitionId(firstRow), 
tx.readTimestamp())
@@ -2382,7 +2382,7 @@ public class InternalTableImpl implements InternalTable {
         return replicaMeta.getStartTime().longValue();
     }
 
-    private void checkTransactionFinishStarted(@Nullable InternalTransaction 
transaction) {
+    private static void checkTransactionFinishStarted(@Nullable 
InternalTransaction transaction) {
         if (transaction != null && transaction.isFinishingOrFinished()) {
             boolean isFinishedDueToTimeout = 
transaction.isRolledBackWithTimeoutExceeded();
             throw new TransactionException(
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 8743aa60df0..58f3da7f5c4 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -458,7 +458,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         assertEquals(BALANCE_1 - DELTA, view.get(null, 
makeKey(1)).doubleValue("balance"));
         assertEquals(BALANCE_2 + DELTA, view.get(null, 
makeKey(2)).doubleValue("balance"));
 
-        assertEquals(5, clientTxManager().finished());
+        assertEquals(3, clientTxManager().finished());
         assertEquals(0, clientTxManager().pending());
     }
 
@@ -483,7 +483,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         assertEquals(BALANCE_1 - DELTA, accounts.recordView().get(null, 
makeKey(1)).doubleValue("balance"));
         assertEquals(BALANCE_2 + DELTA, accounts.recordView().get(null, 
makeKey(2)).doubleValue("balance"));
 
-        assertEquals(5, clientTxManager().finished());
+        assertEquals(3, clientTxManager().finished());
         assertEquals(0, clientTxManager().pending());
     }
 
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
index 82f90c45a02..a5923f777e1 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTransactionMetricsTest.java
@@ -157,8 +157,8 @@ public class ItTransactionMetricsTest extends 
ClusterPerClassIntegrationTest {
         // Check that all transaction metrics ere not changed except 
TotalCommits and RoCommits.
         testMetricValues(metrics0, actualMetrics0, "TotalCommits", 
"RoCommits");
 
-        assertThat(actualMetrics0.get("TotalCommits"), 
is(metrics0.get("TotalCommits") + 1));
-        assertThat(actualMetrics0.get("RoCommits"), 
is(metrics0.get("RoCommits") + 1));
+        assertThat(actualMetrics0.get("TotalCommits"), 
is(metrics0.get("TotalCommits") + (implicit ? 0 : 1)));
+        assertThat(actualMetrics0.get("RoCommits"), 
is(metrics0.get("RoCommits") + (implicit ? 0 : 1)));
     }
 
     /**
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyImplicitTransactionImpl.java
similarity index 51%
copy from 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
copy to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyImplicitTransactionImpl.java
index cd3705215a5..0833cd3be94 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyImplicitTransactionImpl.java
@@ -18,56 +18,78 @@
 package org.apache.ignite.internal.tx.impl;
 
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * The read-only implementation of an internal transaction.
+ * The special lightweight implementation for read-only implicit transaction.
  */
-public class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl {
-    /** The read timestamp. */
-    private final HybridTimestamp readTimestamp;
+public class ReadOnlyImplicitTransactionImpl implements InternalTransaction {
+    private static final UUID FAKE_ID = new UUID(0, 0);
 
-    /** Prevents double finish of the transaction. */
-    private final AtomicBoolean finishGuard = new AtomicBoolean();
+    private final HybridTimestampTracker observableTsTracker;
 
-    /** Transaction future. */
-    private final CompletableFuture<Void> txFuture;
+    private final HybridTimestamp createTs;
 
     /**
      * The constructor.
      *
-     * @param txManager The tx manager.
      * @param observableTsTracker Observable timestamp tracker.
-     * @param id The id.
-     * @param txCoordinatorId Transaction coordinator inconsistent ID.
-     * @param implicit True for an implicit transaction, false for an ordinary 
one.
-     * @param timeout The timeout.
-     * @param readTimestamp The read timestamp.
+     * @param createTs Create timestamp.
      */
-    ReadOnlyTransactionImpl(
-            TxManagerImpl txManager,
-            HybridTimestampTracker observableTsTracker,
-            UUID id,
-            UUID txCoordinatorId,
-            boolean implicit,
-            long timeout,
-            HybridTimestamp readTimestamp,
-            CompletableFuture<Void> txFuture
-    ) {
-        super(txManager, observableTsTracker, id, txCoordinatorId, implicit, 
timeout);
+    ReadOnlyImplicitTransactionImpl(HybridTimestampTracker 
observableTsTracker, HybridTimestamp createTs) {
+        this.observableTsTracker = observableTsTracker;
+        this.createTs = createTs;
+    }
+
+    @Override
+    public UUID id() {
+        return FAKE_ID;
+    }
+
+    @Override
+    public TxState state() {
+        return null;
+    }
+
+    @Override
+    public UUID coordinatorId() {
+        return null;
+    }
+
+    @Override
+    public boolean implicit() {
+        return true;
+    }
+
+    @Override
+    public boolean remote() {
+        return false;
+    }
+
+    @Override
+    public long getTimeout() {
+        return 0;
+    }
 
-        this.readTimestamp = readTimestamp;
-        this.txFuture = txFuture;
+    @Override
+    public boolean isRolledBackWithTimeoutExceeded() {
+        return false;
+    }
+
+    @Override
+    public void processDelayedAck(Object val, @Nullable Throwable err) {
+        // No-op.
     }
 
     @Override
@@ -77,12 +99,12 @@ public class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
 
     @Override
     public HybridTimestamp readTimestamp() {
-        return readTimestamp;
+        return null;
     }
 
     @Override
     public HybridTimestamp schemaTimestamp() {
-        return readTimestamp;
+        return createTs;
     }
 
     @Override
@@ -112,26 +134,17 @@ public class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
 
     @Override
     public CompletableFuture<Void> commitAsync() {
-        return TransactionsExceptionMapperUtil.convertToPublicFuture(
-                finish(true, readTimestamp, false, false),
-                TX_COMMIT_ERR
-        );
+        return nullCompletedFuture();
     }
 
     @Override
     public CompletableFuture<Void> rollbackAsync() {
-        return TransactionsExceptionMapperUtil.convertToPublicFuture(
-                finish(false, readTimestamp, false, false),
-                TX_ROLLBACK_ERR
-        );
+        return nullCompletedFuture();
     }
 
     @Override
     public CompletableFuture<Void> rollbackTimeoutExceededAsync() {
-        return TransactionsExceptionMapperUtil.convertToPublicFuture(
-                finish(false, readTimestamp, false, true),
-                TX_ROLLBACK_ERR
-        );
+        return nullCompletedFuture();
     }
 
     @Override
@@ -141,34 +154,28 @@ public class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
             boolean full,
             boolean timeoutExceeded
     ) {
-        assert !full : "Read-only transactions cannot be full.";
-        assert !(commitIntent && timeoutExceeded) : "Transaction cannot commit 
with timeout exceeded.";
-
-        if (!finishGuard.compareAndSet(false, true)) {
-            return nullCompletedFuture();
-        }
-
         observableTsTracker.update(executionTimestamp);
 
-        txFuture.complete(null);
-
-        ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(
-                commitIntent,
-                new TxIdAndTimestamp(readTimestamp, id()),
-                timeoutExceeded);
-
-        this.timeoutExceeded = timeoutExceeded;
-
-        return txFuture;
+        return nullCompletedFuture();
     }
 
     @Override
     public boolean isFinishingOrFinished() {
-        return finishGuard.get();
+        return false;
     }
 
     @Override
     public CompletableFuture<Void> kill() {
-        return finish(false, readTimestamp, false, false);
+        return nullCompletedFuture();
+    }
+
+    @Override
+    public void commit() throws TransactionException {
+        // No-op.
+    }
+
+    @Override
+    public void rollback() throws TransactionException {
+        // No-op.
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index cd3705215a5..44e2c014a78 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -50,7 +50,6 @@ public class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
      * @param observableTsTracker Observable timestamp tracker.
      * @param id The id.
      * @param txCoordinatorId Transaction coordinator inconsistent ID.
-     * @param implicit True for an implicit transaction, false for an ordinary 
one.
      * @param timeout The timeout.
      * @param readTimestamp The read timestamp.
      */
@@ -59,12 +58,11 @@ public class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
             HybridTimestampTracker observableTsTracker,
             UUID id,
             UUID txCoordinatorId,
-            boolean implicit,
             long timeout,
             HybridTimestamp readTimestamp,
             CompletableFuture<Void> txFuture
     ) {
-        super(txManager, observableTsTracker, id, txCoordinatorId, implicit, 
timeout);
+        super(txManager, observableTsTracker, id, txCoordinatorId, false, 
timeout);
 
         this.readTimestamp = readTimestamp;
         this.txFuture = txFuture;
@@ -152,7 +150,7 @@ public class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
 
         txFuture.complete(null);
 
-        ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(
+        ((TxManagerImpl) txManager).onCompleteReadOnlyTransaction(
                 commitIntent,
                 new TxIdAndTimestamp(readTimestamp, id()),
                 timeoutExceeded);
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 1b9ef100004..20b77239fa3 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -302,4 +302,14 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
     public CompletableFuture<Void> kill() {
         return finishInternal(false, null, false, false, false);
     }
+
+    /**
+     * Fail the transaction with exception so finishing it is not possible.
+     *
+     * @param e Fail reason.
+     */
+    public void fail(TransactionException e) {
+        // Thread safety is not needed.
+        finishFuture = failedFuture(e);
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 40a60feaecd..91ad9d5dc63 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -37,17 +37,17 @@ import static org.apache.ignite.internal.tx.TxState.PENDING;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
-import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -119,8 +119,9 @@ import 
org.apache.ignite.internal.tx.views.LocksViewProvider;
 import org.apache.ignite.internal.tx.views.TransactionsViewProvider;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.ExceptionUtils;
-import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.ErrorGroups.Common;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -184,9 +185,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
     /** Prevents double stopping of the tracker. */
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
-    /** Busy lock to stop synchronously. */
-    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
-
     /** Detector of transactions that lost the coordinator. */
     private final OrphanDetector orphanDetector;
 
@@ -241,6 +239,10 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
 
     private final TransactionMetricsSource txMetrics;
 
+    private volatile boolean isStopping;
+
+    private final ConcurrentLinkedQueue<CompletableFuture<?>> stopFuts = new 
ConcurrentLinkedQueue<>();
+
     /**
      * Test-only constructor.
      *
@@ -404,18 +406,16 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
             PrimaryReplicaEventParameters eventParameters,
             Consumer<ReplicationGroupId> action
     ) {
-        return inBusyLock(busyLock, () -> {
-            assertReplicationGroupType(eventParameters.groupId());
+        assertReplicationGroupType(eventParameters.groupId());
 
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 - 
remove check for TablePartitionId.
-            if (!(eventParameters.groupId() instanceof TablePartitionId) && 
!(eventParameters.groupId() instanceof ZonePartitionId)) {
-                return falseCompletedFuture();
-            }
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 - remove 
check for TablePartitionId.
+        if (!(eventParameters.groupId() instanceof TablePartitionId) && 
!(eventParameters.groupId() instanceof ZonePartitionId)) {
+            return falseCompletedFuture();
+        }
 
-            action.accept(eventParameters.groupId());
+        action.accept(eventParameters.groupId());
 
-            return falseCompletedFuture();
-        });
+        return falseCompletedFuture();
     }
 
     private CompletableFuture<Boolean> 
primaryReplicaElectedListener(PrimaryReplicaEventParameters eventParameters) {
@@ -434,41 +434,32 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
 
     @Override
     public InternalTransaction beginImplicit(HybridTimestampTracker 
timestampTracker, boolean readOnly) {
-        return begin(timestampTracker, true, readOnly, 
InternalTxOptions.defaults());
-    }
+        if (readOnly) {
+            return new ReadOnlyImplicitTransactionImpl(timestampTracker, 
clockService.current());
+        }
 
-    @Override
-    public InternalTransaction beginExplicit(HybridTimestampTracker 
timestampTracker, boolean readOnly, InternalTxOptions txOptions) {
-        return begin(timestampTracker, false, readOnly, txOptions);
-    }
+        HybridTimestamp beginTimestamp = 
createBeginTimestampWithIncrementRwTxCounter();
+        var tx = beginReadWriteTransaction(timestampTracker, beginTimestamp, 
true, InternalTxOptions.defaults());
 
-    private InternalTransaction begin(
-            HybridTimestampTracker timestampTracker,
-            boolean implicit,
-            boolean readOnly,
-            InternalTxOptions options
-    ) {
-        return inBusyLock(busyLock, () -> beginBusy(timestampTracker, 
implicit, readOnly, options));
+        txStateVolatileStorage.initialize(tx);
+        txMetrics.onTransactionStarted();
+
+        return tx;
     }
 
-    private InternalTransaction beginBusy(
-            HybridTimestampTracker timestampTracker,
-            boolean implicit,
-            boolean readOnly,
-            InternalTxOptions options
-    ) {
+    @Override
+    public InternalTransaction beginExplicit(HybridTimestampTracker 
timestampTracker, boolean readOnly, InternalTxOptions txOptions) {
         InternalTransaction tx;
 
         if (readOnly) {
-            HybridTimestamp beginTimestamp = clockService.now();
-            tx = beginReadOnlyTransaction(timestampTracker, beginTimestamp, 
implicit, options);
+            HybridTimestamp beginTimestamp = clockService.now(); // Tick to 
generate new unique id.
+            tx = beginReadOnlyTransaction(timestampTracker, beginTimestamp, 
txOptions);
         } else {
             HybridTimestamp beginTimestamp = 
createBeginTimestampWithIncrementRwTxCounter();
-            tx = beginReadWriteTransaction(timestampTracker, beginTimestamp, 
implicit, options);
+            tx = beginReadWriteTransaction(timestampTracker, beginTimestamp, 
false, txOptions);
         }
 
         txStateVolatileStorage.initialize(tx);
-
         txMetrics.onTransactionStarted();
 
         return tx;
@@ -497,10 +488,13 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
         // Implicit transactions are finished as soon as their operation/query 
is finished, they cannot be abandoned, so there is
         // no need to register them.
         // TODO: https://issues.apache.org/jira/browse/IGNITE-24229 - schedule 
expiration for multi-key implicit transactions?
-        boolean scheduleExpiration = !implicit;
-
-        if (scheduleExpiration) {
+        if (!implicit) {
             transactionExpirationRegistry.register(transaction);
+
+            if (isStopping) {
+                transaction.fail(new 
TransactionException(Common.NODE_STOPPING_ERR,
+                        "Failed to finish the transaction because a node is 
stopping: [txId=" + txId + ']'));
+            }
         }
 
         return transaction;
@@ -513,7 +507,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
     private ReadOnlyTransactionImpl beginReadOnlyTransaction(
             HybridTimestampTracker timestampTracker,
             HybridTimestamp beginTimestamp,
-            boolean implicit,
             InternalTxOptions options
     ) {
         UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, 
options.priority());
@@ -532,7 +525,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
         if (!lockAcquired) {
             throw new IgniteInternalException(
                     TX_READ_ONLY_TOO_OLD_ERR,
-                    "Timestamp of read-only transaction must be greater than 
the low watermark: [txTimestamp={}, lowWatermark={}]",
+                    "Attempted to read data below the garbage collection 
watermark: [readTimestamp={}, gcTimestamp={}]",
                     readTimestamp,
                     lowWatermark.getLowWatermark());
         }
@@ -543,25 +536,15 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
             long timeout = getTimeoutOrDefault(options, 
txConfig.readOnlyTimeoutMillis().value());
 
             var transaction = new ReadOnlyTransactionImpl(
-                    this, timestampTracker, txId, localNodeId, implicit, 
timeout, readTimestamp, txFuture
+                    this, timestampTracker, txId, localNodeId, timeout, 
readTimestamp, txFuture
             );
 
-            // Implicit transactions are finished as soon as their 
operation/query is finished, they cannot be abandoned, so there is
-            // no need to register them.
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-24229 - 
schedule expiration for multi-key implicit transactions?
-            boolean scheduleExpiration = !implicit;
-
-            if (scheduleExpiration) {
-                transactionExpirationRegistry.register(transaction);
-            }
+            transactionExpirationRegistry.register(transaction);
 
             txFuture.whenComplete((unused, throwable) -> {
                 lowWatermark.unlock(txId);
 
-                // We only register explicit transactions, so we only 
unregister them as well.
-                if (scheduleExpiration) {
-                    transactionExpirationRegistry.unregister(transaction);
-                }
+                transactionExpirationRegistry.unregister(transaction);
             });
 
             return transaction;
@@ -578,9 +561,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
      * @return Current read timestamp.
      */
     private HybridTimestamp currentReadTimestamp(HybridTimestamp beginTx) {
-        return beginTx.subtractPhysicalTime(
-                idleSafeTimePropagationPeriodMsSupplier.getAsLong() + 
clockService.maxClockSkewMillis()
-        );
+        return 
beginTx.subtractPhysicalTime(idleSafeTimePropagationPeriodMsSupplier.getAsLong()
 + clockService.maxClockSkewMillis());
     }
 
     @Override
@@ -780,6 +761,19 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
                 .thenCompose(r -> verificationFuture);
     }
 
+    private <T> CompletableFuture<T> trackFuture(CompletableFuture<T> fut) {
+        if (fut.isDone()) {
+            return fut;
+        }
+
+        if (isStopping) {
+            // Track finish future.
+            stopFuts.add(fut);
+        }
+
+        return fut;
+    }
+
     /**
      * Durable finish request.
      */
@@ -792,7 +786,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
             HybridTimestamp commitTimestamp,
             CompletableFuture<TransactionMeta> txFinishFuture
     ) {
-        return inBusyLockAsync(busyLock, () -> 
placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(commitPartition)
+        return 
trackFuture(placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(commitPartition)
                 .thenCompose(meta ->
                         sendFinishRequest(
                                 observableTimestampTracker,
@@ -993,49 +987,47 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
 
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
-        return inBusyLockAsync(busyLock, () -> {
-            var deadlockPreventionPolicy = new 
DeadlockPreventionPolicyImpl(DEFAULT_TX_ID_COMPARATOR, DEFAULT_LOCK_TIMEOUT);
+        var deadlockPreventionPolicy = new 
DeadlockPreventionPolicyImpl(DEFAULT_TX_ID_COMPARATOR, DEFAULT_LOCK_TIMEOUT);
 
-            // TODO https://issues.apache.org/jira/browse/IGNITE-23539
-            lockManager.start(deadlockPreventionPolicy);
+        // TODO https://issues.apache.org/jira/browse/IGNITE-23539
+        lockManager.start(deadlockPreventionPolicy);
 
-            localNodeId = topologyService.localMember().id();
+        localNodeId = topologyService.localMember().id();
 
-            messagingService.addMessageHandler(ReplicaMessageGroup.class, 
this);
+        messagingService.addMessageHandler(ReplicaMessageGroup.class, this);
 
-            persistentTxStateVacuumizer = new PersistentTxStateVacuumizer(
-                    replicaService,
-                    topologyService.localMember(),
-                    clockService,
-                    placementDriver,
-                    failureProcessor
-            );
+        persistentTxStateVacuumizer = new PersistentTxStateVacuumizer(
+                replicaService,
+                topologyService.localMember(),
+                clockService,
+                placementDriver,
+                failureProcessor
+        );
 
-            txStateVolatileStorage.start();
+        txStateVolatileStorage.start();
 
-            txViewProvider.init(localNodeId, 
txStateVolatileStorage.statesMap());
+        txViewProvider.init(localNodeId, txStateVolatileStorage.statesMap());
 
-            orphanDetector.start(txStateVolatileStorage,
-                    () -> longProperty(systemCfg, ABANDONED_CHECK_TS_PROP, 
ABANDONED_CHECK_TS_PROP_DEFAULT_VALUE));
+        orphanDetector.start(txStateVolatileStorage,
+                () -> longProperty(systemCfg, ABANDONED_CHECK_TS_PROP, 
ABANDONED_CHECK_TS_PROP_DEFAULT_VALUE));
 
-            txCleanupRequestSender.start();
+        txCleanupRequestSender.start();
 
-            txCleanupRequestHandler.start();
+        txCleanupRequestHandler.start();
 
-            
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
primaryReplicaExpiredListener);
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
primaryReplicaExpiredListener);
 
-            
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
primaryReplicaElectedListener);
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
primaryReplicaElectedListener);
 
-            transactionExpirationJobFuture = 
commonScheduler.scheduleAtFixedRate(this::expireTransactionsUpToNow,
-                    EXPIRE_FREQ_MILLIS, EXPIRE_FREQ_MILLIS, MILLISECONDS);
+        transactionExpirationJobFuture = 
commonScheduler.scheduleAtFixedRate(this::expireTransactionsUpToNow,
+                EXPIRE_FREQ_MILLIS, EXPIRE_FREQ_MILLIS, MILLISECONDS);
 
-            lockRetryCount = toIntExact(longProperty(systemCfg, 
LOCK_RETRY_COUNT_PROP, LOCK_RETRY_COUNT_PROP_DEFAULT_VALUE));
+        lockRetryCount = toIntExact(longProperty(systemCfg, 
LOCK_RETRY_COUNT_PROP, LOCK_RETRY_COUNT_PROP_DEFAULT_VALUE));
 
-            metricsManager.registerSource(txMetrics);
-            metricsManager.enable(txMetrics);
+        metricsManager.registerSource(txMetrics);
+        metricsManager.enable(txMetrics);
 
-            return nullCompletedFuture();
-        });
+        return nullCompletedFuture();
     }
 
     private void expireTransactionsUpToNow() {
@@ -1051,6 +1043,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
 
     @Override
     public void beforeNodeStop() {
+        isStopping = true;
         orphanDetector.stop();
     }
 
@@ -1060,26 +1053,39 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
             return nullCompletedFuture();
         }
 
-        busyLock.block();
+        // Wait all pending finish futures.
+        List<CompletableFuture<?>> toWait = new ArrayList<>();
+
+        for (CompletableFuture<?> stopFut : stopFuts) {
+            if (!stopFut.isDone()) {
+                toWait.add(stopFut);
+            }
+        }
+
+        stopFuts.clear();
 
-        txStateVolatileStorage.stop();
+        LOG.debug("Waiting for tx finish futures on shutdown [cnt=" + 
toWait.size() + ']');
 
-        txCleanupRequestHandler.stop();
+        return CompletableFutures.allOf(toWait).handle((r, e) -> {
+            txStateVolatileStorage.stop();
 
-        
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
primaryReplicaExpiredListener);
+            txCleanupRequestHandler.stop();
 
-        
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
primaryReplicaElectedListener);
+            
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
primaryReplicaExpiredListener);
 
-        ScheduledFuture<?> expirationJobFuture = 
transactionExpirationJobFuture;
-        if (expirationJobFuture != null) {
-            expirationJobFuture.cancel(false);
-        }
+            
placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
primaryReplicaElectedListener);
+
+            ScheduledFuture<?> expirationJobFuture = 
transactionExpirationJobFuture;
+            if (expirationJobFuture != null) {
+                expirationJobFuture.cancel(false);
+            }
 
-        transactionExpirationRegistry.abortAllRegistered();
+            transactionExpirationRegistry.abortAllRegistered();
 
-        shutdownAndAwaitTermination(writeIntentSwitchPool, 10, 
TimeUnit.SECONDS);
+            shutdownAndAwaitTermination(writeIntentSwitchPool, 10, 
TimeUnit.SECONDS);
 
-        return nullCompletedFuture();
+            return null;
+        });
     }
 
     @Override
@@ -1163,7 +1169,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
         return runAsync(runnable, writeIntentSwitchPool);
     }
 
-    void completeReadOnlyTransactionFuture(boolean commitIntent, 
TxIdAndTimestamp txIdAndTimestamp, boolean timeoutExceeded) {
+    void onCompleteReadOnlyTransaction(boolean commitIntent, TxIdAndTimestamp 
txIdAndTimestamp, boolean timeoutExceeded) {
         UUID txId = txIdAndTimestamp.getTxId();
 
         txMetrics.onReadOnlyTransactionFinished(txId, commitIntent);
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 44bdbe3a6a8..48d339d57bc 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -244,12 +244,26 @@ public class TxManagerTest extends IgniteAbstractTest {
 
         ReplicationGroupId partitionIdForEnlistment = 
targetReplicationGroupId(1, 0);
 
+        tx.assignCommitPartition(partitionIdForEnlistment);
         tx.enlist(partitionIdForEnlistment, 10, REMOTE_NODE.name(), 1L);
 
         PendingTxPartitionEnlistment actual = 
tx.enlistedPartition(partitionIdForEnlistment);
         assertEquals(REMOTE_NODE.name(), actual.primaryNodeConsistentId());
         assertEquals(1L, actual.consistencyToken());
         assertEquals(Set.of(10), actual.tableIds());
+
+        // Avoid NPE on finish.
+        var meta = mock(ReplicaMeta.class);
+        when(meta.getStartTime()).thenReturn(clock.current());
+        when(meta.getExpirationTime()).thenReturn(clock.current());
+        when(meta.getLeaseholder()).thenReturn("test");
+        when(meta.getLeaseholderId()).thenReturn(randomUUID());
+
+        when(placementDriver.awaitPrimaryReplica(any(), any(), anyLong(), 
any())).thenReturn(completedFuture(meta));
+
+        HybridTimestamp commitTimestamp = clockService.now();
+        when(replicaService.invoke(anyString(), 
any(TxFinishReplicaRequest.class)))
+                .thenReturn(completedFuture(new 
TransactionResult(TxState.COMMITTED, commitTimestamp)));
     }
 
     // TODO: IGNITE-22522 - inline this after switching to ZonePartitionId.
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
index bda33912133..4ce5d3a18a6 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
@@ -47,7 +47,6 @@ class ReadOnlyTransactionImplTest extends 
BaseIgniteAbstractTest {
                 HybridTimestampTracker.atomicTracker(null),
                 txId,
                 new UUID(1, 2),
-                false,
                 10_000,
                 readTimestamp,
                 new CompletableFuture<>()

Reply via email to