This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 765c7ee992f IGNITE-26458 Skip commit partition+cleanup path for RW-R 
transactions
765c7ee992f is described below

commit 765c7ee992fdc8a3addb7c5049f968383e0576f7
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Fri Oct 3 12:54:14 2025 +0300

    IGNITE-26458 Skip commit partition+cleanup path for RW-R transactions
---
 .../handler/ClientInboundMessageHandler.java       |   2 +-
 .../apache/ignite/client/fakes/FakeTxManager.java  |   1 +
 .../partition/replicator/TxRecoveryEngine.java     |   6 +-
 modules/runner/build.gradle                        |  20 +++
 ...Benchmark.java => ClientKvGetAllBenchmark.java} |  30 +++--
 ...k.java => ClientKvGetAllExplicitBenchmark.java} |  29 +++--
 .../internal/benchmark/ClientKvGetBenchmark.java   |   2 +-
 .../runner/app/client/ItThinClientSqlTest.java     |  29 +++++
 .../app/client/ItThinClientTransactionsTest.java   |  67 +++++++---
 .../ItThinClientTransactionsWithReplicasTest.java  |   6 +-
 .../InflightTransactionalOperationTracker.java     |  12 +-
 .../engine/QueryTransactionWrapperSelfTest.java    |  35 +++---
 .../ItAbstractInternalTableScanTest.java           |  14 +--
 .../ItInternalTableReadOnlyScanTest.java           |   2 +-
 .../ignite/distributed/ItTxStateLocalMapTest.java  |  34 ++---
 .../ignite/internal/table/ItColocationTest.java    |   1 +
 .../distributed/storage/InternalTableImpl.java     |   4 +-
 .../replication/PartitionReplicaListenerTest.java  |   6 +-
 .../ZonePartitionReplicaListenerTest.java          |   6 +-
 .../distributed/storage/InternalTableImplTest.java |   5 +-
 .../apache/ignite/internal/tx/ItTxCleanupTest.java |   7 ++
 .../tx/distributed/ItTransactionRecoveryTest.java  |   1 +
 .../org/apache/ignite/internal/tx/TxManager.java   |  12 +-
 .../internal/tx/impl/ReadOnlyTransactionImpl.java  |   4 +-
 .../internal/tx/impl/ReadWriteTransactionImpl.java |   1 +
 .../internal/tx/impl/TransactionInflights.java     | 139 ++++++++++++++++-----
 .../internal/tx/impl/TxCleanupRequestSender.java   |  42 ++++---
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  36 +++++-
 .../apache/ignite/internal/tx/TxManagerTest.java   |  10 +-
 .../tx/impl/ReadWriteTransactionImplTest.java      |   4 +-
 30 files changed, 403 insertions(+), 164 deletions(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 5b86b2e261d..6fe212bac8e 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -1094,7 +1094,7 @@ public class ClientInboundMessageHandler
                 && 
primaryReplicaMaxStartTime.compareAndSet(lastSentMaxStartTime, 
currentMaxStartTime);
 
         if (primaryReplicasUpdated && LOG.isInfoEnabled()) {
-            LOG.info("Partition primary replica changed, notifying client 
[connectionId=" + connectionId + ", remoteAddress="
+            LOG.info("Partition primary replicas have changed, notifying the 
client [connectionId=" + connectionId + ", remoteAddress="
                     + ctx.channel().remoteAddress() + ']');
         }
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 3673094029e..8e52d164a73 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -222,6 +222,7 @@ public class FakeTxManager implements TxManager {
             ReplicationGroupId commitPartition,
             boolean commitIntent,
             boolean timeoutExceeded,
+            boolean recovery,
             Map<ReplicationGroupId, PendingTxPartitionEnlistment> 
enlistedGroups,
             UUID txId
     ) {
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
index 694f8c02ddd..0ca53daf1d0 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
@@ -69,10 +69,8 @@ public class TxRecoveryEngine {
                         replicationGroupId,
                         false,
                         false,
-                        Map.of(
-                                replicationGroupId,
-                                
abandonedTxRecoveryEnlistmentFactory.apply(clusterNodeResolver.getById(senderId))
-                        ),
+                        true,
+                        Map.of(replicationGroupId, 
abandonedTxRecoveryEnlistmentFactory.apply(clusterNodeResolver.getById(senderId))),
                         txId
                 )
                 .whenComplete((v, ex) -> runCleanupOnNode(replicationGroupId, 
txId, senderId));
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 6cbe14ca4dc..b7edce5679a 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -301,6 +301,26 @@ tasks.register('runClientGetBenchmark', JavaExec) {
     enableAssertions = true
 }
 
+tasks.register('runClientGetAllBenchmark', JavaExec) {
+    mainClass = 'org.apache.ignite.internal.benchmark.ClientKvGetAllBenchmark'
+
+    jvmArgs += addOpens + ['-Dio.netty.tryReflectionSetAccessible=true', 
'-Xmx16g']
+
+    classpath = sourceSets.integrationTest.runtimeClasspath
+
+    enableAssertions = true
+}
+
+tasks.register('runClientGetAllExplicitBenchmark', JavaExec) {
+    mainClass = 
'org.apache.ignite.internal.benchmark.ClientKvGetAllExplicitBenchmark'
+
+    jvmArgs += addOpens + ['-Dio.netty.tryReflectionSetAccessible=true', 
'-Xmx16g']
+
+    classpath = sourceSets.integrationTest.runtimeClasspath
+
+    enableAssertions = true
+}
+
 jar {
     manifest {
         attributes(
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetAllBenchmark.java
similarity index 78%
copy from 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
copy to 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetAllBenchmark.java
index a2ab9b6653d..b85e26a6c46 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetAllBenchmark.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.benchmark;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.table.Tuple;
@@ -28,13 +30,16 @@ import org.openjdk.jmh.runner.RunnerException;
 /**
  * Put then Get benchmark.
  */
-public class ClientKvGetBenchmark extends ClientKvBenchmark {
+public class ClientKvGetAllBenchmark extends ClientKvBenchmark {
     @Param("1000")
     private int keysPerThread;
 
     @Param("1000")
     private int loadBatchSize;
 
+    @Param("1000")
+    private int batch;
+
     @Override
     public void setUp() {
         super.setUp();
@@ -85,25 +90,30 @@ public class ClientKvGetBenchmark extends ClientKvBenchmark 
{
     private final ThreadLocal<long[]> gen = ThreadLocal.withInitial(() -> new 
long[1]);
 
     /**
-     * Benchmark for KV upsert via embedded client.
+     * Benchmark for KV get via embedded client.
      */
     @Benchmark
     public void get() {
-        long[] cur = gen.get();
-        cur[0] = cur[0] + 1;
+        List<Tuple> keys = new ArrayList<>();
 
-        int id = (int) (base.get() + cur[0] % keysPerThread);
+        for (int i = 0; i < batch; i++) {
+            long[] cur = gen.get();
+            cur[0] = cur[0] + 1;
+
+            int id = (int) (base.get() + cur[0] % keysPerThread);
+            Tuple key = Tuple.create().set("ycsb_key", id);
+            keys.add(key);
+        }
 
-        Tuple key = Tuple.create().set("ycsb_key", id);
-        Tuple val = kvView.get(null, key);
-        assert val != null : Thread.currentThread().getName() + " " + key;
+        Map<Tuple, Tuple> map = kvView.getAll(null, keys);
+        assert map.size() == batch : Thread.currentThread().getName() + " " + 
keys;
     }
 
     /**
      * Benchmark's entry point. Can be started from command line:
-     * ./gradlew ":ignite-runner:runClientPutBenchmark" --args='jmh.batch=10 
jmh.threads=1'
+     * ./gradlew ":ignite-runner:runClientGetAllBenchmark" 
--args='jmh.batch=10 jmh.threads=1'
      */
     public static void main(String[] args) throws RunnerException {
-        runBenchmark(ClientKvGetBenchmark.class, args);
+        runBenchmark(ClientKvGetAllBenchmark.class, args);
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetAllExplicitBenchmark.java
similarity index 78%
copy from 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
copy to 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetAllExplicitBenchmark.java
index a2ab9b6653d..572703d4898 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetAllExplicitBenchmark.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.runner.RunnerException;
@@ -28,13 +29,16 @@ import org.openjdk.jmh.runner.RunnerException;
 /**
  * Put then Get benchmark.
  */
-public class ClientKvGetBenchmark extends ClientKvBenchmark {
+public class ClientKvGetAllExplicitBenchmark extends ClientKvBenchmark {
     @Param("1000")
     private int keysPerThread;
 
     @Param("1000")
     private int loadBatchSize;
 
+    @Param("5")
+    private int batch;
+
     @Override
     public void setUp() {
         super.setUp();
@@ -85,25 +89,30 @@ public class ClientKvGetBenchmark extends ClientKvBenchmark 
{
     private final ThreadLocal<long[]> gen = ThreadLocal.withInitial(() -> new 
long[1]);
 
     /**
-     * Benchmark for KV upsert via embedded client.
+     * Benchmark for KV get via embedded client.
      */
     @Benchmark
     public void get() {
-        long[] cur = gen.get();
-        cur[0] = cur[0] + 1;
+        Transaction tx = publicIgnite.transactions().begin();
+
+        for (int i = 0; i < batch; i++) {
+            long[] cur = gen.get();
+            cur[0] = cur[0] + 1;
 
-        int id = (int) (base.get() + cur[0] % keysPerThread);
+            int id = (int) (base.get() + cur[0] % keysPerThread);
+            Tuple key = Tuple.create().set("ycsb_key", id);
+            Tuple val = kvView.get(null, key);
+            assert val != null : Thread.currentThread().getName() + " " + key;
+        }
 
-        Tuple key = Tuple.create().set("ycsb_key", id);
-        Tuple val = kvView.get(null, key);
-        assert val != null : Thread.currentThread().getName() + " " + key;
+        tx.commit();
     }
 
     /**
      * Benchmark's entry point. Can be started from command line:
-     * ./gradlew ":ignite-runner:runClientPutBenchmark" --args='jmh.batch=10 
jmh.threads=1'
+     * ./gradlew ":ignite-runner:runClientGetAllExplicitBenchmark" 
--args='jmh.batch=10 jmh.threads=1'
      */
     public static void main(String[] args) throws RunnerException {
-        runBenchmark(ClientKvGetBenchmark.class, args);
+        runBenchmark(ClientKvGetAllExplicitBenchmark.class, args);
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
index a2ab9b6653d..e2ddfdb254f 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
@@ -101,7 +101,7 @@ public class ClientKvGetBenchmark extends ClientKvBenchmark 
{
 
     /**
      * Benchmark's entry point. Can be started from command line:
-     * ./gradlew ":ignite-runner:runClientPutBenchmark" --args='jmh.batch=10 
jmh.threads=1'
+     * ./gradlew ":ignite-runner:runClientGetBenchmark" --args='jmh.batch=10 
jmh.threads=1'
      */
     public static void main(String[] args) throws RunnerException {
         runBenchmark(ClientKvGetBenchmark.class, args);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
index ca14d6b4a75..41652b6ef47 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
@@ -37,8 +37,13 @@ import java.util.UUID;
 import java.util.concurrent.CompletionException;
 import java.util.stream.Collectors;
 import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.catalog.commands.CatalogUtils;
 import org.apache.ignite.internal.security.authentication.UserDetails;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.ColumnMetadata;
 import org.apache.ignite.sql.ColumnType;
@@ -57,6 +62,7 @@ import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionOptions;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -741,6 +747,29 @@ public class ItThinClientSqlTest extends 
ItAbstractThinClientTest {
         }
     }
 
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26567";)
+    public void testBroadcastQueryTxInflightStateCleanup() {
+        IgniteSql sql = client().sql();
+
+        sql.execute(null, "CREATE TABLE t1 (id INT PRIMARY KEY, val 
VARCHAR)").close();
+        sql.execute(null, String.format("CREATE INDEX IF NOT EXISTS idx1 ON t1 
(val)"));
+        sql.execute(null, "INSERT INTO t1 (id, val) VALUES (1, 
'test1')").close();
+
+        try (ResultSet<SqlRow> rs = sql.execute(null, "SELECT id FROM t1 WHERE 
val = ?", "test1")) {
+            assertTrue(rs.hasNext());
+            assertEquals(1, rs.next().intValue(0));
+            assertFalse(rs.hasNext());
+        }
+
+        for (int i = 0; i < nodes(); i++) {
+            IgniteImpl server = TestWrappers.unwrapIgniteImpl(server(i));
+            TxManager txManager = server.txManager();
+            TransactionInflights transactionInflights = 
IgniteTestUtils.getFieldValue(txManager, "transactionInflights");
+            assertFalse(transactionInflights.hasActiveInflights(), "Expecting 
no active inflights");
+        }
+    }
+
     private static class Pojo {
         public int num;
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index 1357420abc2..3043f8aae18 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -57,7 +57,6 @@ import org.apache.ignite.internal.client.TcpIgniteClient;
 import org.apache.ignite.internal.client.table.ClientTable;
 import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
 import org.apache.ignite.internal.client.tx.ClientTransaction;
-import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.table.partition.HashPartition;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tx.TxState;
@@ -505,7 +504,7 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
             int start,
             int count,
             Map<Partition, ClusterNode> map,
-            InternalClusterNode clusterNode,
+            ClusterNode clusterNode,
             Table table
     ) {
         String clusterNodeName = clusterNode.name();
@@ -595,7 +594,7 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
         IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
 
-        List<Tuple> tuples0 = generateKeysForNode(200, 2, map, 
server0.clusterService().topologyService().localMember(), table);
+        List<Tuple> tuples0 = generateKeysForNode(200, 2, map, 
server0.cluster().localNode(), table);
 
         Tuple key = tuples0.get(0);
         Tuple val = val(key.intValue(0) + "");
@@ -626,8 +625,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
         IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
 
-        List<Tuple> tuples0 = generateKeysForNode(300, 1, map, 
server0.clusterService().topologyService().localMember(), table);
-        List<Tuple> tuples1 = generateKeysForNode(310, 1, map, 
server1.clusterService().topologyService().localMember(), table);
+        List<Tuple> tuples0 = generateKeysForNode(300, 1, map, 
server0.cluster().localNode(), table);
+        List<Tuple> tuples1 = generateKeysForNode(310, 1, map, 
server1.cluster().localNode(), table);
 
         Map<Tuple, Tuple> data = new HashMap<>();
 
@@ -658,8 +657,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
         IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
 
-        List<Tuple> tuples0 = generateKeysForNode(400, 1, map, 
server0.clusterService().topologyService().localMember(), table);
-        List<Tuple> tuples1 = generateKeysForNode(410, 1, map, 
server1.clusterService().topologyService().localMember(), table);
+        List<Tuple> tuples0 = generateKeysForNode(400, 1, map, 
server0.cluster().localNode(), table);
+        List<Tuple> tuples1 = generateKeysForNode(410, 1, map, 
server1.cluster().localNode(), table);
 
         Map<Tuple, Tuple> data = new HashMap<>();
 
@@ -707,8 +706,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
         IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
 
-        List<Tuple> tuples0 = generateKeysForNode(500, 1, map, 
server0.clusterService().topologyService().localMember(), table);
-        List<Tuple> tuples1 = generateKeysForNode(510, 80, map, 
server1.clusterService().topologyService().localMember(), table);
+        List<Tuple> tuples0 = generateKeysForNode(500, 1, map, 
server0.cluster().localNode(), table);
+        List<Tuple> tuples1 = generateKeysForNode(510, 80, map, 
server1.cluster().localNode(), table);
 
         ClientLazyTransaction tx0 = (ClientLazyTransaction) 
client().transactions().begin();
 
@@ -834,8 +833,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
         IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
 
-        List<Tuple> tuples0 = generateKeysForNode(600, 50, map, 
server0.clusterService().topologyService().localMember(), table);
-        List<Tuple> tuples1 = generateKeysForNode(610, 50, map, 
server1.clusterService().topologyService().localMember(), table);
+        List<Tuple> tuples0 = generateKeysForNode(600, 50, map, 
server0.cluster().localNode(), table);
+        List<Tuple> tuples1 = generateKeysForNode(610, 50, map, 
server1.cluster().localNode(), table);
 
         Map<Tuple, Tuple> batch = new HashMap<>();
 
@@ -865,8 +864,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
         IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
 
-        List<Tuple> tuples0 = generateKeysForNode(700, 50, map, 
server0.clusterService().topologyService().localMember(), table);
-        List<Tuple> tuples1 = generateKeysForNode(710, 50, map, 
server1.clusterService().topologyService().localMember(), table);
+        List<Tuple> tuples0 = generateKeysForNode(700, 50, map, 
server0.cluster().localNode(), table);
+        List<Tuple> tuples1 = generateKeysForNode(710, 50, map, 
server1.cluster().localNode(), table);
 
         Map<Tuple, Tuple> batch = new HashMap<>();
 
@@ -931,6 +930,44 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         tx.commit();
     }
 
+    @Test
+    void testExplicitReadWriteTransaction() {
+        ClientTable table = (ClientTable) table();
+
+        KeyValueView<Tuple, Tuple> kvView = table.keyValueView();
+
+        // Load partition map to ensure all entries are directly mapped.
+        Map<Partition, ClusterNode> map = 
table.partitionManager().primaryReplicasAsync().join();
+
+        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+
+        List<Tuple> tuples0 = generateKeysForNode(600, 20, map, 
server0.cluster().localNode(), table);
+        List<Tuple> tuples1 = generateKeysForNode(600, 20, map, 
server1.cluster().localNode(), table);
+
+        Tuple k1 = tuples0.get(0);
+        Tuple v1 = val(tuples0.get(0).intValue(0) + "");
+
+        Tuple k2 = tuples1.get(1);
+        Tuple v2 = val(tuples1.get(1).intValue(0) + "");
+
+        Transaction tx0 = client().transactions().begin();
+        kvView.put(tx0, k1, v1);
+        kvView.put(tx0, k2, v2);
+        tx0.commit();
+
+        Map<Tuple, Tuple> crossPartitionBatch = new HashMap<>();
+        crossPartitionBatch.put(k1, v1);
+        crossPartitionBatch.put(k2, v2);
+
+        Map<Tuple, Tuple> res = kvView.getAll(null, 
crossPartitionBatch.keySet());
+
+        assertEquals(crossPartitionBatch, res);
+
+        assertEquals(v1, kvView.get(null, k1));
+        assertEquals(v2, kvView.get(null, k2));
+    }
+
     @Test
     void testExplicitReadOnlyTransaction() {
         ClientTable table = (ClientTable) table();
@@ -943,8 +980,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
         IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
 
-        List<Tuple> tuples0 = generateKeysForNode(600, 20, map, 
server0.clusterService().topologyService().localMember(), table);
-        List<Tuple> tuples1 = generateKeysForNode(600, 20, map, 
server1.clusterService().topologyService().localMember(), table);
+        List<Tuple> tuples0 = generateKeysForNode(600, 20, map, 
server0.cluster().localNode(), table);
+        List<Tuple> tuples1 = generateKeysForNode(600, 20, map, 
server1.cluster().localNode(), table);
 
         Tuple k1 = tuples0.get(0);
         Tuple v1 = val(tuples0.get(0).intValue(0) + "");
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
index fd9458925ba..f10eaf17548 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
@@ -51,9 +51,9 @@ public class ItThinClientTransactionsWithReplicasTest extends 
ItAbstractThinClie
         IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
         IgniteImpl server2 = TestWrappers.unwrapIgniteImpl(server(2));
 
-        List<Tuple> tuples0 = generateKeysForNode(100, 1, map, 
server0.clusterService().topologyService().localMember(), table);
-        List<Tuple> tuples1 = generateKeysForNode(100, 1, map, 
server1.clusterService().topologyService().localMember(), table);
-        List<Tuple> tuples2 = generateKeysForNode(100, 1, map, 
server2.clusterService().topologyService().localMember(), table);
+        List<Tuple> tuples0 = generateKeysForNode(100, 1, map, 
server0.cluster().localNode(), table);
+        List<Tuple> tuples1 = generateKeysForNode(100, 1, map, 
server1.cluster().localNode(), table);
+        List<Tuple> tuples2 = generateKeysForNode(100, 1, map, 
server2.cluster().localNode(), table);
 
         if (tuples0.isEmpty() || tuples1.isEmpty() || tuples2.isEmpty()) {
             return; // Skip the test if assignments are bad.
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/InflightTransactionalOperationTracker.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/InflightTransactionalOperationTracker.java
index 7f1c9bcca7c..7b2a8a761d4 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/InflightTransactionalOperationTracker.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/InflightTransactionalOperationTracker.java
@@ -38,7 +38,9 @@ class InflightTransactionalOperationTracker implements 
TransactionalOperationTra
     @Override
     public void registerOperationStart(InternalTransaction tx) {
         if (shouldBeTracked(tx)) {
-            if (!delegate.addInflight(tx.id(), tx.isReadOnly())) {
+            boolean result = tx.isReadOnly() ? 
delegate.addScanInflight(tx.id()) : delegate.track(tx.id());
+
+            if (!result) {
                 throw new TransactionException(TX_ALREADY_FINISHED_ERR, 
format("Transaction is already finished [tx={}]", tx));
             }
         }
@@ -47,11 +49,13 @@ class InflightTransactionalOperationTracker implements 
TransactionalOperationTra
     @Override
     public void registerOperationFinish(InternalTransaction tx) {
         if (shouldBeTracked(tx)) {
-            delegate.removeInflight(tx.id());
+            if (tx.isReadOnly()) {
+                delegate.removeInflight(tx.id());
+            }
         }
     }
 
     private static boolean shouldBeTracked(InternalTransaction tx) {
-        return tx.isReadOnly() && !tx.implicit();
-    } 
+        return !tx.implicit();
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
index 272cf855af1..219b1d2313f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
@@ -70,7 +70,8 @@ public class QueryTransactionWrapperSelfTest extends 
BaseIgniteAbstractTest {
     public void testImplicitTransactionAttributes() {
         prepareTransactionsMocks();
 
-        when(transactionInflights.addInflight(any(), 
anyBoolean())).thenReturn(true);
+        when(transactionInflights.track(any())).thenReturn(true);
+        when(transactionInflights.addScanInflight(any())).thenReturn(true);
 
         QueryTransactionContext transactionHandler = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, null,
                 new 
InflightTransactionalOperationTracker(transactionInflights));
@@ -143,6 +144,7 @@ public class QueryTransactionWrapperSelfTest extends 
BaseIgniteAbstractTest {
             return NoOpTransaction.readWrite("test", implicit);
         });
 
+        when(transactionInflights.track(any())).thenReturn(true);
         txCtx.handleControlStatement(txStartStmt);
 
         assertThrowsSqlException(
@@ -163,23 +165,24 @@ public class QueryTransactionWrapperSelfTest extends 
BaseIgniteAbstractTest {
         QueryTransactionContext implicitDmlTxCtx = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, null,
                 new 
InflightTransactionalOperationTracker(transactionInflights));
         implicitDmlTxCtx.getOrStartSqlManaged(false, false);
-        // Check that RW txns do not create tx inflights.
+        // Check that RW txns are tracked.
         log.info("inflights={}", inflights);
-        assertTrue(inflights.isEmpty());
+        assertEquals(1, inflights.size());
 
         QueryTransactionContext implicitQueryTxCtx = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, null,
                 new 
InflightTransactionalOperationTracker(transactionInflights));
         QueryTransactionWrapper implicitQueryTxWrapper = 
implicitQueryTxCtx.getOrStartSqlManaged(true, false);
         assertTrue(inflights.contains(implicitQueryTxWrapper.unwrap().id()));
-        implicitQueryTxWrapper.finalise();
-        assertTrue(inflights.isEmpty());
+        implicitQueryTxWrapper.finalise().join();
+        assertEquals(1, inflights.size());
 
         NoOpTransaction rwTx = NoOpTransaction.readWrite("test-rw", false);
         QueryTransactionContext explicitRwTxCtx = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, rwTx,
                 new 
InflightTransactionalOperationTracker(transactionInflights));
-        explicitRwTxCtx.getOrStartSqlManaged(true, false);
-        // Check that RW txns do not create tx inflights.
-        assertTrue(inflights.isEmpty());
+        QueryTransactionWrapper explicitRwTxWrapper = 
explicitRwTxCtx.getOrStartSqlManaged(true, false);
+        assertTrue(inflights.contains(explicitRwTxWrapper.unwrap().id()));
+        // Check that RW txns are tracked.
+        assertEquals(2, inflights.size());
 
         NoOpTransaction roTx = NoOpTransaction.readOnly("test-ro", false);
         QueryTransactionContext explicitRoTxCtx = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, roTx,
@@ -187,7 +190,7 @@ public class QueryTransactionWrapperSelfTest extends 
BaseIgniteAbstractTest {
         QueryTransactionWrapper explicitRoTxWrapper = 
explicitRoTxCtx.getOrStartSqlManaged(true, false);
         assertTrue(inflights.contains(explicitRoTxWrapper.unwrap().id()));
         explicitRoTxWrapper.finalise();
-        assertTrue(inflights.isEmpty());
+        assertEquals(2, inflights.size());
     }
 
     @Test
@@ -207,25 +210,25 @@ public class QueryTransactionWrapperSelfTest extends 
BaseIgniteAbstractTest {
         when(sqlStartRwTx.getMode()).thenAnswer(inv -> 
IgniteSqlStartTransactionMode.READ_WRITE);
 
         scriptRwTxCtx.handleControlStatement(sqlStartRwTx);
-        assertTrue(inflights.isEmpty());
+        assertFalse(inflights.isEmpty());
 
         ScriptTransactionContext scriptRoTxCtx = new 
ScriptTransactionContext(txCtx, operationTracker);
         IgniteSqlStartTransaction sqlStartRoTx = 
mock(IgniteSqlStartTransaction.class);
         when(sqlStartRoTx.getMode()).thenAnswer(inv -> 
IgniteSqlStartTransactionMode.READ_ONLY);
 
         scriptRoTxCtx.handleControlStatement(sqlStartRoTx);
-        assertEquals(1, inflights.size());
+        assertEquals(2, inflights.size());
 
         QueryTransactionWrapper wrapper = 
scriptRoTxCtx.getOrStartSqlManaged(true, false);
-        assertEquals(1, inflights.size());
+        assertEquals(2, inflights.size());
 
         // ScriptTransactionWrapperImpl.commitImplicit is noop.
         wrapper.finalise();
-        assertEquals(1, inflights.size());
+        assertEquals(2, inflights.size());
 
         IgniteSqlCommitTransaction sqlCommitTx = 
mock(IgniteSqlCommitTransaction.class);
         scriptRoTxCtx.handleControlStatement(sqlCommitTx);
-        assertTrue(inflights.isEmpty());
+        assertEquals(1, inflights.size());
     }
 
     private void prepareTransactionsMocks() {
@@ -239,8 +242,10 @@ public class QueryTransactionWrapperSelfTest extends 
BaseIgniteAbstractTest {
     }
 
     private void prepareTxInflightsMocks(Set<UUID> inflights) {
-        when(transactionInflights.addInflight(any(), 
anyBoolean())).thenAnswer(inv -> inflights.add(inv.getArgument(0)));
+        when(transactionInflights.addScanInflight(any())).thenAnswer(inv -> 
inflights.add(inv.getArgument(0)));
 
         doAnswer(inv -> 
inflights.remove(inv.getArgument(0))).when(transactionInflights).removeInflight(any());
+
+        when(transactionInflights.track(any())).thenAnswer(inv -> 
inflights.add(inv.getArgument(0)));
     }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 9c57e845874..af51cae7bde 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -21,6 +21,8 @@ import static 
org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -69,7 +71,6 @@ import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.jetbrains.annotations.Nullable;
@@ -277,7 +278,7 @@ public abstract class ItAbstractInternalTableScanTest 
extends IgniteAbstractTest
 
         assertEquals(NoSuchElementException.class, 
unwrapCause(gotException.get()).getClass());
 
-        validateTxAbortedState(tx);
+        validateTxFinished(tx);
     }
 
     /**
@@ -322,13 +323,12 @@ public abstract class ItAbstractInternalTableScanTest 
extends IgniteAbstractTest
 
         assertEquals(StorageException.class, 
unwrapCause(gotException.get()).getClass());
 
-        validateTxAbortedState(tx);
+        validateTxFinished(tx);
     }
 
-    protected void validateTxAbortedState(InternalTransaction tx) {
-        if (tx != null) {
-            assertEquals(TxState.ABORTED, tx.state());
-        }
+    protected void validateTxFinished(InternalTransaction tx) {
+        assertNotNull(tx);
+        assertNull(tx.state());
     }
 
     /**
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
index 68b864af23a..796a5166c06 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
@@ -57,7 +57,7 @@ public class ItInternalTableReadOnlyScanTest extends 
ItAbstractInternalTableScan
     }
 
     @Override
-    protected void validateTxAbortedState(InternalTransaction tx) {
+    protected void validateTxFinished(InternalTransaction tx) {
         // noop since we do not store state for readonly transactions.
     }
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
index 6f4655090f0..51a070c1251 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
@@ -124,22 +124,22 @@ public class ItTxStateLocalMapTest extends 
IgniteAbstractTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testUpsert(boolean commit) {
-        testTransaction(tx -> table.recordView().upsert(tx, makeValue(1, 1)), 
true, commit);
+        testTransaction(tx -> table.recordView().upsert(tx, makeValue(1, 1)), 
true, commit, false);
     }
 
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testGet(boolean commit) {
-        testTransaction(tx -> table.recordView().get(tx, makeKey(1)), false, 
commit);
+        testTransaction(tx -> table.recordView().get(tx, makeKey(1)), false, 
commit, true);
     }
 
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testUpdateAll(boolean commit) {
-        testTransaction(tx -> table.recordView().upsertAll(tx, 
List.of(makeValue(1, 1), makeValue(2, 2))), true, commit);
+        testTransaction(tx -> table.recordView().upsertAll(tx, 
List.of(makeValue(1, 1), makeValue(2, 2))), true, commit, false);
     }
 
-    private void testTransaction(Consumer<Transaction> touchOp, boolean 
checkAfterTouch, boolean commit) {
+    private void testTransaction(Consumer<Transaction> touchOp, boolean 
checkAfterTouch, boolean commit, boolean read) {
         InternalClusterNode coord = 
testCluster.cluster.get(0).topologyService().localMember();
         UUID coordinatorId = coord.id();
 
@@ -160,17 +160,21 @@ public class ItTxStateLocalMapTest extends 
IgniteAbstractTest {
             tx.rollback();
         }
 
-        checkLocalTxStateOnNodes(
-                tx.id(),
-                new TxStateMeta(
-                        commit ? COMMITTED : ABORTED,
-                        coordinatorId,
-                        tx.commitPartition(),
-                        commit ? 
testCluster.clockServices.get(coord.name()).now() : null,
-                        null,
-                        null
-                )
-        );
+        if (read) {
+            checkLocalTxStateOnNodes(tx.id(), null);
+        } else {
+            checkLocalTxStateOnNodes(
+                    tx.id(),
+                    new TxStateMeta(
+                            commit ? COMMITTED : ABORTED,
+                            coordinatorId,
+                            tx.commitPartition(),
+                            commit ? 
testCluster.clockServices.get(coord.name()).now() : null,
+                            null,
+                            null
+                    )
+            );
+        }
     }
 
     private void checkLocalTxStateOnNodes(UUID txId, TxStateMeta expected) {
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 6cea81e1e53..8d09a82ce9d 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -233,6 +233,7 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                     ReplicationGroupId commitPartition,
                     boolean commitIntent,
                     boolean timeoutExceeded,
+                    boolean recovery,
                     Map<ReplicationGroupId, PendingTxPartitionEnlistment> 
enlistedGroups,
                     UUID txId
             ) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 8f418d59651..bd2e079ec7d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -679,7 +679,7 @@ public class InternalTableImpl implements InternalTable {
 
             if (req.isWrite()) {
                 // Track only write requests from explicit transactions.
-                if (!tx.remote() && !transactionInflights.addInflight(tx.id(), 
false)) {
+                if (!tx.remote() && 
!transactionInflights.addInflight(tx.id())) {
                     int code = TX_ALREADY_FINISHED_ERR;
                     if (tx.isRolledBackWithTimeoutExceeded()) {
                         code = TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;
@@ -2136,7 +2136,7 @@ public class InternalTableImpl implements InternalTable {
         @Override
         public void onRequestBegin() {
             // Track read only requests which are able to create cursors.
-            if (!transactionInflights.addInflight(txId, true)) {
+            if (!transactionInflights.addScanInflight(txId)) {
                 throw new TransactionException(TX_ALREADY_FINISHED_ERR, format(
                         "Transaction is already finished () [txId={}, 
readOnly=true].",
                         txId
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 442b02a220c..fe0968f689d 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -763,7 +763,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove this 
test when zone colocation will be the only implementation.
     public void testTxStateReplicaRequestEmptyState() throws Exception {
         doAnswer(invocation -> {
-            UUID txId = invocation.getArgument(5);
+            UUID txId = invocation.getArgument(6);
 
             txManager.updateTxMeta(txId, old -> new TxStateMeta(
                     ABORTED,
@@ -775,7 +775,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
             ));
 
             return nullCompletedFuture();
-        }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), 
any(), any());
+        }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), 
anyBoolean(), any(), any());
 
         CompletableFuture<ReplicaResult> fut = 
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
                 .groupId(tablePartitionIdMessage(grpId))
@@ -2335,7 +2335,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         doAnswer(invocation -> 
nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
 
         doAnswer(invocation -> nullCompletedFuture())
-                .when(txManager).finish(any(), any(), anyBoolean(), 
anyBoolean(), any(), any());
+                .when(txManager).finish(any(), any(), anyBoolean(), 
anyBoolean(), anyBoolean(), any(), any());
         doAnswer(invocation -> nullCompletedFuture())
                 .when(txManager).cleanup(any(), anyString(), any());
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index bacda373b05..39715e0b950 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -961,7 +961,7 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
         doAnswer(invocation -> 
nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
 
         doAnswer(invocation -> nullCompletedFuture())
-                .when(txManager).finish(any(), any(), anyBoolean(), 
anyBoolean(), any(), any());
+                .when(txManager).finish(any(), any(), anyBoolean(), 
anyBoolean(), anyBoolean(), any(), any());
         doAnswer(invocation -> nullCompletedFuture())
                 .when(txManager).cleanup(any(), anyString(), any());
     }
@@ -1340,7 +1340,7 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
     @Test
     public void testTxStateReplicaRequestEmptyState() throws Exception {
         doAnswer(invocation -> {
-            UUID txId = invocation.getArgument(5);
+            UUID txId = invocation.getArgument(6);
 
             txManager.updateTxMeta(txId, old -> new TxStateMeta(
                     ABORTED,
@@ -1352,7 +1352,7 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
             ));
 
             return nullCompletedFuture();
-        }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), 
any(), any());
+        }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), 
anyBoolean(), any(), any());
 
         CompletableFuture<ReplicaResult> fut = 
zonePartitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
                 .groupId(tablePartitionIdMessage(grpId))
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 22974aaf3bd..180be47cce7 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -149,7 +149,8 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                     );
                 });
 
-        lenient().when(txManager.finish(any(), any(), anyBoolean(), 
anyBoolean(), any(), any())).thenReturn(nullCompletedFuture());
+        lenient().when(txManager.finish(any(), any(), anyBoolean(), 
anyBoolean(), anyBoolean(), any(), any()))
+                .thenReturn(nullCompletedFuture());
 
         lenient().when(replicaService.invoke(anyString(), 
any())).then(invocation -> {
             ReplicaRequest request = invocation.getArgument(1);
@@ -392,7 +393,7 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
     private Map<ReplicationGroupId, PendingTxPartitionEnlistment> 
extractEnlistmentsFromTxFinish() {
         ArgumentCaptor<Map<ReplicationGroupId, PendingTxPartitionEnlistment>> 
enlistmentsCaptor = ArgumentCaptor.captor();
 
-        verify(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), 
enlistmentsCaptor.capture(), any());
+        verify(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), 
anyBoolean(), enlistmentsCaptor.capture(), any());
 
         return enlistmentsCaptor.getValue();
     }
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxCleanupTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxCleanupTest.java
index b758fdbd0dd..d16b6fed98b 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxCleanupTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxCleanupTest.java
@@ -21,6 +21,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.concurrent.atomic.AtomicInteger;
@@ -75,6 +76,12 @@ class ItTxCleanupTest extends ClusterPerTestIntegrationTest {
             }
         });
 
+        if (readsOnly) {
+            // We shouldn't see write intent switching from RW-R a transaction.
+            assertFalse(waitForCondition(() -> 
writeIntentSwitchRequestCount.get() > 0, SECONDS.toMillis(1)));
+            return;
+        }
+
         // We should see one WI switch...
         assertTrue(waitForCondition(() -> writeIntentSwitchRequestCount.get() 
> 0, SECONDS.toMillis(10)));
 
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
index 4a4ce23cd2b..7bfeb9e7848 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
@@ -760,6 +760,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
                 commitPartition,
                 false,
                 false,
+                true,
                 Map.of(commitPartition, new 
PendingTxPartitionEnlistment(txCrdNode2.node().name(), 0L)),
                 rwTx1Id
         );
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 3c6b77a79ec..23f38e89748 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -176,6 +176,7 @@ public interface TxManager extends IgniteComponent {
      * @param commitPartition Partition to store a transaction state. {@code 
null} if nothing was enlisted into the transaction.
      * @param commitIntent {@code true} if a commit requested.
      * @param timeoutExceeded {@code true} if a timeout exceeded.
+     * @param recovery {@code true} if finished by recovery.
      * @param enlistedGroups Map of enlisted partitions.
      * @param txId Transaction id.
      */
@@ -184,6 +185,7 @@ public interface TxManager extends IgniteComponent {
             @Nullable ReplicationGroupId commitPartition,
             boolean commitIntent,
             boolean timeoutExceeded,
+            boolean recovery,
             Map<ReplicationGroupId, PendingTxPartitionEnlistment> 
enlistedGroups,
             UUID txId
     );
@@ -193,7 +195,7 @@ public interface TxManager extends IgniteComponent {
      *
      * <p>The nodes to send the request to are taken from the mapping 
`partition id -> partition primary`.
      *
-     * @param commitPartitionId Commit partition id.
+     * @param commitPartitionId Commit partition id. {@code Null} for unlock 
only path.
      * @param enlistedPartitions Map of enlisted partitions.
      * @param commit {@code true} if a commit requested.
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -201,7 +203,7 @@ public interface TxManager extends IgniteComponent {
      * @return Completable future of Void.
      */
     CompletableFuture<Void> cleanup(
-            ReplicationGroupId commitPartitionId,
+            @Nullable ReplicationGroupId commitPartitionId,
             Map<ReplicationGroupId, PartitionEnlistment> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
@@ -213,7 +215,7 @@ public interface TxManager extends IgniteComponent {
      *
      * <p>The nodes to sends the request to are calculated by the placement 
driver.
      *
-     * @param commitPartitionId Commit partition id.
+     * @param commitPartitionId Commit partition id. {@code Null} for unlock 
only path.
      * @param enlistedPartitions Enlisted partitions.
      * @param commit {@code true} if a commit requested.
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -221,7 +223,7 @@ public interface TxManager extends IgniteComponent {
      * @return Completable future of Void.
      */
     CompletableFuture<Void> cleanup(
-            ReplicationGroupId commitPartitionId,
+            @Nullable ReplicationGroupId commitPartitionId,
             Collection<EnlistedPartitionGroup> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
@@ -229,7 +231,7 @@ public interface TxManager extends IgniteComponent {
     );
 
     /**
-     * Sends cleanup request to the nodes than initiated recovery.
+     * Sends cleanup request to a node that had initiated the recovery.
      *
      * @param commitPartitionId Commit partition id.
      * @param node Target node.
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 44e2c014a78..5c85ffefbe8 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -152,8 +152,8 @@ public class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
 
         ((TxManagerImpl) txManager).onCompleteReadOnlyTransaction(
                 commitIntent,
-                new TxIdAndTimestamp(readTimestamp, id()),
-                timeoutExceeded);
+                new TxIdAndTimestamp(readTimestamp, id())
+        );
 
         this.timeoutExceeded = timeoutExceeded;
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 284da3f73f2..77e071bce49 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -254,6 +254,7 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
                             commitPart,
                             commit,
                             timeoutExceeded,
+                            false,
                             enlisted,
                             id()
                     );
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index 4dc2230724f..5e8f4e4511f 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -42,6 +42,7 @@ import 
org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
 import org.apache.ignite.internal.tx.TransactionResult;
 import org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Contains counters for in-flight requests of the transactions. Read-write 
transactions can't finish when some requests are in-flight.
@@ -65,18 +66,17 @@ public class TransactionInflights {
     }
 
     /**
-     * Registers the inflight update for a transaction.
+     * Register the update inflight for RW transaction.
      *
      * @param txId The transaction id.
-     * @param readOnly Whether the transaction is read-only.
      * @return {@code True} if the inflight was registered. The update must be 
failed on false.
      */
-    public boolean addInflight(UUID txId, boolean readOnly) {
+    public boolean addInflight(UUID txId) {
         boolean[] res = {true};
 
         txCtxMap.compute(txId, (uuid, ctx) -> {
             if (ctx == null) {
-                ctx = readOnly ? new ReadOnlyTxContext() : new 
ReadWriteTxContext(placementDriver, clockService);
+                ctx = new ReadWriteTxContext(placementDriver, clockService);
             }
 
             res[0] = ctx.addInflight();
@@ -87,6 +87,74 @@ public class TransactionInflights {
         return res[0];
     }
 
+    /**
+     * Register the scan inflight for RO transaction.
+     *
+     * @param txId The transaction id.
+     * @return {@code True} if the inflight was registered. The scan must be 
failed on false.
+     */
+    public boolean addScanInflight(UUID txId) {
+        boolean[] res = {true};
+
+        txCtxMap.compute(txId, (uuid, ctx) -> {
+            if (ctx == null) {
+                ctx = new ReadOnlyTxContext();
+            }
+
+            res[0] = ctx.addInflight();
+
+            return ctx;
+        });
+
+        return res[0];
+    }
+
+    /**
+     * Track the given RW transaction until finish.
+     * Currently RW tracking is used to enforce cleanup path for SQL RW 
transactions, which doesn't use RW inflights tracking yet.
+     *
+     * @param txId The transaction id.
+     * @return {@code True} if the was registered and is in active state.
+     */
+    public boolean track(UUID txId) {
+        boolean[] res = {true};
+
+        txCtxMap.compute(txId, (uuid, ctx) -> {
+            if (ctx == null) {
+                ctx = new ReadWriteTxContext(placementDriver, clockService);
+            }
+
+            res[0] = !ctx.isTxFinishing();
+
+            return ctx;
+        });
+
+        return res[0];
+    }
+
+    /**
+     * Track the given RO transaction until finish.
+     * Currently RO tracking is used to prevent unclosed cursors.
+     *
+     * @param txId The transaction id.
+     * @return {@code True} if the was registered and is in active state.
+     */
+    public boolean trackReadOnly(UUID txId) {
+        boolean[] res = {true};
+
+        txCtxMap.compute(txId, (uuid, ctx) -> {
+            if (ctx == null) {
+                ctx = new ReadOnlyTxContext();
+            }
+
+            res[0] = !ctx.isTxFinishing();
+
+            return ctx;
+        });
+
+        return res[0];
+    }
+
     /**
      * Unregisters the inflight for a transaction.
      *
@@ -106,6 +174,22 @@ public class TransactionInflights {
         }
     }
 
+    /**
+     * Get active inflights.
+     *
+     * @return {@code True} if has some inflights in progress.
+     */
+    @TestOnly
+    public boolean hasActiveInflights() {
+        for (TxContext value : txCtxMap.values()) {
+            if (!value.isTxFinishing()) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     Collection<UUID> finishedReadOnlyTransactions() {
         return txCtxMap.entrySet().stream()
                 .filter(e -> e.getValue() instanceof ReadOnlyTxContext && 
e.getValue().isReadyToFinish())
@@ -137,13 +221,15 @@ public class TransactionInflights {
         }
     }
 
-    void markReadOnlyTxFinished(UUID txId, boolean timeoutExceeded) {
+    void markReadOnlyTxFinished(UUID txId) {
         txCtxMap.compute(txId, (k, ctx) -> {
             if (ctx == null) {
-                ctx = new ReadOnlyTxContext(timeoutExceeded);
+                ctx = new ReadOnlyTxContext();
+            } else {
+                assert ctx instanceof ReadOnlyTxContext;
             }
 
-            ctx.finishTx(null, timeoutExceeded);
+            ctx.finishTx(null);
 
             return ctx;
         });
@@ -152,12 +238,12 @@ public class TransactionInflights {
     ReadWriteTxContext lockTxForNewUpdates(UUID txId, Map<ReplicationGroupId, 
PendingTxPartitionEnlistment> enlistedGroups) {
         return (ReadWriteTxContext) txCtxMap.compute(txId, (uuid, tuple0) -> {
             if (tuple0 == null) {
-                tuple0 = new ReadWriteTxContext(placementDriver, clockService, 
false); // No writes enlisted.
+                tuple0 = new ReadWriteTxContext(placementDriver, clockService, 
true); // No writes enlisted, can go with unlock only.
             }
 
             assert !tuple0.isTxFinishing() : "Transaction is already finished 
[id=" + uuid + "].";
 
-            tuple0.finishTx(enlistedGroups, false);
+            tuple0.finishTx(enlistedGroups);
 
             return tuple0;
         });
@@ -185,13 +271,11 @@ public class TransactionInflights {
 
         abstract void onInflightsRemoved();
 
-        abstract void finishTx(@Nullable Map<ReplicationGroupId, 
PendingTxPartitionEnlistment> enlistedGroups, boolean timeoutExceeded);
+        abstract void finishTx(@Nullable Map<ReplicationGroupId, 
PendingTxPartitionEnlistment> enlistedGroups);
 
         abstract boolean isTxFinishing();
 
         abstract boolean isReadyToFinish();
-
-        abstract boolean isTimeoutExceeded();
     }
 
     /**
@@ -203,23 +287,18 @@ public class TransactionInflights {
      */
     private static class ReadOnlyTxContext extends TxContext {
         private volatile boolean markedFinished;
-        private volatile boolean timeoutExceeded;
 
         ReadOnlyTxContext() {
             // No-op.
         }
 
-        ReadOnlyTxContext(boolean timeoutExceeded) {
-            this.timeoutExceeded = timeoutExceeded;
-        }
-
         @Override
         public void onInflightsRemoved() {
             // No-op.
         }
 
         @Override
-        public void finishTx(@Nullable Map<ReplicationGroupId, 
PendingTxPartitionEnlistment> enlistedGroups, boolean timeoutExceeded) {
+        public void finishTx(@Nullable Map<ReplicationGroupId, 
PendingTxPartitionEnlistment> enlistedGroups) {
             markedFinished = true;
         }
 
@@ -233,11 +312,6 @@ public class TransactionInflights {
             return markedFinished && inflights == 0;
         }
 
-        @Override
-        boolean isTimeoutExceeded() {
-            return timeoutExceeded;
-        }
-
         @Override
         public String toString() {
             return "ReadOnlyTxContext [inflights=" + inflights + ']';
@@ -247,19 +321,19 @@ public class TransactionInflights {
     static class ReadWriteTxContext extends TxContext {
         private final CompletableFuture<Void> waitRepFut = new 
CompletableFuture<>();
         private final PlacementDriver placementDriver;
+        private final boolean noWrites;
         private volatile CompletableFuture<Void> finishInProgressFuture = null;
         private volatile Map<ReplicationGroupId, PendingTxPartitionEnlistment> 
enlistedGroups;
         private final ClockService clockService;
-        private volatile boolean timeoutExceeded;
 
         private ReadWriteTxContext(PlacementDriver placementDriver, 
ClockService clockService) {
             this(placementDriver, clockService, false);
         }
 
-        private ReadWriteTxContext(PlacementDriver placementDriver, 
ClockService clockService, boolean timeoutExceeded) {
+        private ReadWriteTxContext(PlacementDriver placementDriver, 
ClockService clockService, boolean noWrites) {
             this.placementDriver = placementDriver;
             this.clockService = clockService;
-            this.timeoutExceeded = timeoutExceeded;
+            this.noWrites = noWrites;
         }
 
         CompletableFuture<Void> performFinish(boolean commit, 
Function<Boolean, CompletableFuture<Void>> finishAction) {
@@ -326,8 +400,7 @@ public class TransactionInflights {
                             });
                 }
 
-                return allOfToList(futures)
-                        .thenCompose(unused -> waitNoInflights());
+                return allOfToList(futures).thenCompose(unused -> 
waitNoInflights());
             } else {
                 return nullCompletedFuture();
             }
@@ -352,9 +425,8 @@ public class TransactionInflights {
         }
 
         @Override
-        public void finishTx(Map<ReplicationGroupId, 
PendingTxPartitionEnlistment> enlistedGroups, boolean timeoutExceeded) {
+        public void finishTx(Map<ReplicationGroupId, 
PendingTxPartitionEnlistment> enlistedGroups) {
             this.enlistedGroups = enlistedGroups;
-            this.timeoutExceeded = timeoutExceeded;
             finishInProgressFuture = new CompletableFuture<>();
         }
 
@@ -368,15 +440,14 @@ public class TransactionInflights {
             return waitRepFut.isDone();
         }
 
-        @Override
-        boolean isTimeoutExceeded() {
-            return timeoutExceeded;
+        boolean isNoWrites() {
+            return noWrites;
         }
 
         @Override
         public String toString() {
             return "ReadWriteTxContext [inflights=" + inflights + ", 
waitRepFut=" + waitRepFut
-                    + ", finishFut=" + finishInProgressFuture + ']';
+                    + ", noWrites=" + noWrites + ", finishFut=" + 
finishInProgressFuture + ']';
         }
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index e621115166d..52f059ccdaf 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -151,7 +151,7 @@ public class TxCleanupRequestSender {
     /**
      * Sends cleanup request to the primary nodes of each one of {@code 
partitions}.
      *
-     * @param commitPartitionId Commit partition id.
+     * @param commitPartitionId Commit partition id. {@code Null} for unlock 
only path.
      * @param enlistedPartitions Map of enlisted partitions.
      * @param commit {@code true} if a commit requested.
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -159,17 +159,19 @@ public class TxCleanupRequestSender {
      * @return Completable future of Void.
      */
     public CompletableFuture<Void> cleanup(
-            ReplicationGroupId commitPartitionId,
+            @Nullable ReplicationGroupId commitPartitionId,
             Map<ReplicationGroupId, PartitionEnlistment> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
     ) {
         // Start tracking the partitions we want to learn the replication 
confirmation from.
-        writeIntentsReplicated.put(
-                txId,
-                new CleanupContext(commitPartitionId, 
enlistedPartitions.keySet(), commit ? TxState.COMMITTED : TxState.ABORTED)
-        );
+        if (commitPartitionId != null) {
+            writeIntentsReplicated.put(
+                    txId,
+                    new CleanupContext(commitPartitionId, 
enlistedPartitions.keySet(), commit ? TxState.COMMITTED : TxState.ABORTED)
+            );
+        }
 
         Map<String, List<EnlistedPartitionGroup>> partitionsByPrimaryName = 
new HashMap<>();
         enlistedPartitions.forEach((partitionId, partition) -> {
@@ -186,7 +188,7 @@ public class TxCleanupRequestSender {
     /**
      * Gets primary nodes for each of the provided {@code partitions} and 
sends cleanup request to each one.
      *
-     * @param commitPartitionId Commit partition id.
+     * @param commitPartitionId Commit partition id. {@code Null} for unlock 
only path.
      * @param partitions Collection of enlisted partitions.
      * @param commit {@code true} if a commit requested.
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -194,7 +196,7 @@ public class TxCleanupRequestSender {
      * @return Completable future of Void.
      */
     public CompletableFuture<Void> cleanup(
-            ReplicationGroupId commitPartitionId,
+            @Nullable ReplicationGroupId commitPartitionId,
             Collection<EnlistedPartitionGroup> partitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
@@ -203,11 +205,14 @@ public class TxCleanupRequestSender {
         Map<ReplicationGroupId, EnlistedPartitionGroup> partitionIds = 
partitions.stream()
                 .collect(toMap(EnlistedPartitionGroup::groupId, identity()));
 
-        // Start tracking the partitions we want to learn the replication 
confirmation from.
-        writeIntentsReplicated.put(
-                txId,
-                new CleanupContext(commitPartitionId, new 
HashSet<>(partitionIds.keySet()), commit ? TxState.COMMITTED : TxState.ABORTED)
-        );
+        if (commitPartitionId != null) {
+            // Start tracking the partitions we want to learn the replication 
confirmation from.
+            writeIntentsReplicated.put(
+                    txId,
+                    new CleanupContext(commitPartitionId, new 
HashSet<>(partitionIds.keySet()),
+                            commit ? TxState.COMMITTED : TxState.ABORTED)
+            );
+        }
 
         return placementDriverHelper.findPrimaryReplicas(partitionIds.keySet())
                 .thenCompose(partitionData -> {
@@ -245,7 +250,7 @@ public class TxCleanupRequestSender {
     }
 
     private void cleanupPartitionsWithoutPrimary(
-            ReplicationGroupId commitPartitionId,
+            @Nullable ReplicationGroupId commitPartitionId,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
@@ -267,7 +272,7 @@ public class TxCleanupRequestSender {
     }
 
     private CompletableFuture<Void> cleanupPartitions(
-            ReplicationGroupId commitPartitionId,
+            @Nullable ReplicationGroupId commitPartitionId,
             Map<String, List<EnlistedPartitionGroup>> partitionsByNode,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
@@ -279,14 +284,15 @@ public class TxCleanupRequestSender {
             String node = entry.getKey();
             List<EnlistedPartitionGroup> nodePartitions = entry.getValue();
 
-            
cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit, 
commitTimestamp, txId, node, nodePartitions));
+            
cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit, 
commitTimestamp, txId, node,
+                    commitPartitionId == null ? null : nodePartitions));
         }
 
         return allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]));
     }
 
     private CompletableFuture<Void> sendCleanupMessageWithRetries(
-            ReplicationGroupId commitPartitionId,
+            @Nullable ReplicationGroupId commitPartitionId,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
@@ -305,7 +311,7 @@ public class TxCleanupRequestSender {
                             // or will run `switchWriteIntentsOnPartitions` 
for partitions with no primary.
                             // At the end of the day all write intents will be 
properly converted.
                             if (partitions == null) {
-                                // If we don't have any partition, which is 
the recovery case,
+                                // If we don't have any partition, which is 
the recovery or unlock only case,
                                 // just try again with the same node.
                                 return 
sendCleanupMessageWithRetries(commitPartitionId, commit, commitTimestamp, txId, 
node, partitions);
                             }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 890885baed2..85253160aa1 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -489,6 +489,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
         // no need to register them.
         // TODO: https://issues.apache.org/jira/browse/IGNITE-24229 - schedule 
expiration for multi-key implicit transactions?
         if (!implicit) {
+            // TODO IGNITE-26531 Unregister is not called on finish.
             transactionExpirationRegistry.register(transaction);
 
             if (isStopping) {
@@ -624,6 +625,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
             @Nullable ReplicationGroupId commitPartition,
             boolean commitIntent,
             boolean timeout,
+            boolean recovery,
             Map<ReplicationGroupId, PendingTxPartitionEnlistment> 
enlistedGroups,
             UUID txId
     ) {
@@ -695,7 +697,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
                         commit,
                         enlistedGroups,
                         txId,
-                        finishingStateMeta.txFinishFuture()
+                        finishingStateMeta.txFinishFuture(),
+                        txContext.isNoWrites() && !recovery
                 )
         ).whenComplete((unused, throwable) -> {
             if (localNodeId.equals(finishingStateMeta.txCoordinatorId())) {
@@ -731,7 +734,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
             boolean commit,
             Map<ReplicationGroupId, PendingTxPartitionEnlistment> 
enlistedGroups,
             UUID txId,
-            CompletableFuture<TransactionMeta> txFinishFuture
+            CompletableFuture<TransactionMeta> txFinishFuture,
+            boolean unlock
     ) {
         HybridTimestamp commitTimestamp = commitTimestamp(commit);
         // In case of commit it's required to check whether current primaries 
are still the same that were enlisted and whether
@@ -747,6 +751,27 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
                             Map<ReplicationGroupId, PartitionEnlistment> 
groups = enlistedGroups.entrySet().stream()
                                     .collect(toMap(Entry::getKey, 
Entry::getValue));
 
+                            if (unlock) {
+                                return txCleanupRequestSender.cleanup(null, 
groups, verifiedCommit, commitTimestamp, txId)
+                                        .thenAccept(ignored -> {
+                                            // Don't keep useless state.
+                                            
txStateVolatileStorage.updateMeta(txId, old -> null);
+
+                                            TxStateMeta meta = new TxStateMeta(
+                                                    verifiedCommit ? COMMITTED 
: ABORTED,
+                                                    localNodeId,
+                                                    null,
+                                                    commitTimestamp,
+                                                    null,
+                                                    null,
+                                                    System.currentTimeMillis(),
+                                                    null
+                                            );
+
+                                            txFinishFuture.complete(meta);
+                                        });
+                            }
+
                             return durableFinish(
                                     observableTimestampTracker,
                                     commitPartition,
@@ -1095,13 +1120,12 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
 
     @Override
     public CompletableFuture<Void> cleanup(
-            ReplicationGroupId commitPartitionId,
+            @Nullable ReplicationGroupId commitPartitionId,
             Map<ReplicationGroupId, PartitionEnlistment> enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
     ) {
-        assertReplicationGroupType(commitPartitionId);
         for (ReplicationGroupId replicationGroupId : 
enlistedPartitions.keySet()) {
             assertReplicationGroupType(replicationGroupId);
         }
@@ -1169,12 +1193,12 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
         return runAsync(runnable, writeIntentSwitchPool);
     }
 
-    void onCompleteReadOnlyTransaction(boolean commitIntent, TxIdAndTimestamp 
txIdAndTimestamp, boolean timeoutExceeded) {
+    void onCompleteReadOnlyTransaction(boolean commitIntent, TxIdAndTimestamp 
txIdAndTimestamp) {
         UUID txId = txIdAndTimestamp.getTxId();
 
         txMetrics.onReadOnlyTransactionFinished(txId, commitIntent);
 
-        transactionInflights.markReadOnlyTxFinished(txId, timeoutExceeded);
+        transactionInflights.markReadOnlyTxFinished(txId);
     }
 
     @Override
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index b6f6bd3a5d4..f7853879a69 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -164,6 +164,8 @@ public class TxManagerTest extends IgniteAbstractTest {
 
     private final TestLowWatermark lowWatermark = spy(new TestLowWatermark());
 
+    private TransactionInflights transactionInflights;
+
     @BeforeEach
     public void setup() {
         clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
@@ -178,7 +180,7 @@ public class TxManagerTest extends IgniteAbstractTest {
 
         RemotelyTriggeredResourceRegistry resourceRegistry = new 
RemotelyTriggeredResourceRegistry();
 
-        TransactionInflights transactionInflights = new 
TransactionInflights(placementDriver, clockService);
+        transactionInflights = new TransactionInflights(placementDriver, 
clockService);
 
         txManager = new TxManagerImpl(
                 txConfiguration,
@@ -350,6 +352,7 @@ public class TxManagerTest extends IgniteAbstractTest {
 
         tx.enlist(replicationGroupId, 10, REMOTE_NODE.name(), 1L);
         tx.assignCommitPartition(replicationGroupId);
+        transactionInflights.track(tx.id()); // Enable cleanup path.
 
         tx.commit();
 
@@ -371,6 +374,7 @@ public class TxManagerTest extends IgniteAbstractTest {
 
         tx.enlist(replicationGroupId, 10, REMOTE_NODE.name(), 1L);
         tx.assignCommitPartition(replicationGroupId);
+        transactionInflights.track(tx.id()); // Enable cleanup path.
 
         tx.rollback();
 
@@ -397,6 +401,7 @@ public class TxManagerTest extends IgniteAbstractTest {
 
         ReplicationGroupId replicationGroupId = targetReplicationGroupId(1, 0);
 
+        transactionInflights.track(tx.id());
         tx.enlist(replicationGroupId, 10, REMOTE_NODE.name(), 1L);
         tx.assignCommitPartition(replicationGroupId);
 
@@ -425,6 +430,7 @@ public class TxManagerTest extends IgniteAbstractTest {
 
         ReplicationGroupId replicationGroupId = targetReplicationGroupId(1, 0);
 
+        transactionInflights.track(tx.id());
         tx.enlist(replicationGroupId, 10, REMOTE_NODE.name(), 1L);
         tx.assignCommitPartition(replicationGroupId);
 
@@ -834,6 +840,8 @@ public class TxManagerTest extends IgniteAbstractTest {
         tx.enlist(replicationGroupId, 10, REMOTE_NODE.name(), 1L);
         tx.assignCommitPartition(replicationGroupId);
 
+        transactionInflights.track(tx.id());
+
         return tx;
     }
 
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
index 98ccb28dd07..b86ff8bb5a1 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
@@ -104,8 +104,8 @@ class ReadWriteTransactionImplTest extends 
BaseIgniteAbstractTest {
     private void startTxAndTryToEnlist(boolean commit) {
         HashSet<UUID> finishedTxs = new HashSet<>();
 
-        Mockito.when(txManager.finish(any(), any(), anyBoolean(), 
anyBoolean(), any(), any())).thenAnswer(invocation -> {
-            finishedTxs.add(invocation.getArgument(5));
+        Mockito.when(txManager.finish(any(), any(), anyBoolean(), 
anyBoolean(), anyBoolean(), any(), any())).thenAnswer(invocation -> {
+            finishedTxs.add(invocation.getArgument(6));
 
             return nullCompletedFuture();
         });

Reply via email to