This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cfd89174d IGNITE-17951 Sql. Enlist partitions in transaction before executing a query (#1501)
5cfd89174d is described below

commit 5cfd89174d9d6a23e05a3b920a5c2b07fb1cbd62
Author: Pavel Pereslegin <xxt...@gmail.com>
AuthorDate: Fri Feb 3 18:16:01 2023 +0300

    IGNITE-17951 Sql. Enlist partitions in transaction before executing a query (#1501)
---
 .../ignite/client/fakes/FakeInternalTable.java     |  35 +++++
 .../apache/ignite/internal/index/HashIndex.java    |  13 ++
 .../org/apache/ignite/internal/index/Index.java    |  21 +++
 .../apache/ignite/internal/index/SortedIndex.java  |  39 +++--
 .../ignite/internal/index/SortedIndexImpl.java     |  27 ++++
 .../ignite/internal/table/ItTableScanTest.java     |  64 ++++++--
 .../internal/sql/engine/SqlQueryProcessor.java     |  19 +--
 .../internal/sql/engine/exec/ExecutionContext.java |  16 +-
 .../sql/engine/exec/ExecutionServiceImpl.java      |  99 ++++++++++--
 .../sql/engine/exec/LogicalRelImplementor.java     |   4 +-
 .../sql/engine/exec/rel/IndexScanNode.java         |  76 +++++++---
 .../sql/engine/exec/rel/StorageScanNode.java       |   2 +-
 .../sql/engine/exec/rel/TableScanNode.java         |  37 +++--
 .../sql/engine/message/QueryStartRequest.java      |   7 +
 .../sql/engine/metadata/ColocationGroup.java       | 167 +++++++++++++++------
 .../engine/metadata/IgniteMdFragmentMapping.java   |   6 +-
 .../internal/sql/engine/metadata/NodeWithTerm.java |  63 ++++++++
 .../sql/engine/metadata/PartitionWithTerm.java     |  58 +++++++
 .../sql/engine/schema/IgniteTableImpl.java         |   4 +-
 .../sql/engine/trait/DistributionFunction.java     |   4 +-
 .../internal/sql/engine/util/BaseQueryContext.java |  21 +--
 .../sql/engine/util/LocalTxAttributesHolder.java   | 118 +++++++++++++++
 .../internal/sql/engine/StopCalciteModuleTest.java |   9 +-
 .../sql/engine/exec/ExecutionServiceImplTest.java  |  12 +-
 .../sql/engine/exec/RuntimeSortedIndexTest.java    |   3 +-
 .../sql/engine/exec/rel/AbstractExecutionTest.java |  12 +-
 .../exec/rel/IndexScanNodeExecutionTest.java       |  14 +-
 .../engine/exec/rel/MergeJoinExecutionTest.java    |   2 +-
 .../exec/rel/TableScanNodeExecutionTest.java       |  63 +++++---
 .../sql/engine/framework/TestBuilders.java         |   5 +-
 .../internal/sql/engine/framework/TestNode.java    |   2 +-
 .../sql/engine/planner/AbstractPlannerTest.java    |  21 +++
 .../internal/sql/engine/planner/PlannerTest.java   |  49 +++---
 .../ItAbstractInternalTableScanTest.java           |  47 +++++-
 .../ItInternalTableReadWriteScanTest.java          |  30 +++-
 .../ignite/internal/table/InternalTable.java       |  54 +++++++
 .../distributed/storage/InternalTableImpl.java     |  91 ++++++++++-
 .../ignite/internal/utils/PrimaryReplica.java}     |  48 ++++--
 .../table/impl/DummyInternalTableImpl.java         |   9 ++
 39 files changed, 1099 insertions(+), 272 deletions(-)
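
For readers skimming the patch: the change makes the SQL engine enlist every partition touched by a query into the RW transaction up front, and then address each scan to that partition's primary replica (leader plus term). A minimal sketch of the calling pattern, assuming an InternalTable named table, a partition number partId and a test-style setup; the variable names are illustrative, not part of the patch:

    // Begin an RW transaction and enlist the target partition, as the new
    // ItTableScanTest helper in this patch does.
    InternalTransaction tx = (InternalTransaction) ignite.transactions().begin();

    TablePartitionId groupId = new TablePartitionId(table.tableId(), partId);
    RaftGroupService raftSvc = table.partitionRaftGroupService(partId);
    long term = IgniteTestUtils.await(raftSvc.refreshAndGetLeaderWithTerm()).term();

    tx.assignCommitPartition(groupId);
    tx.enlist(groupId, new IgniteBiTuple<>(table.leaderAssignment(partId), term));

    // The enlisted leader and its term become the scan recipient.
    PrimaryReplica recipient = new PrimaryReplica(table.leaderAssignment(partId), term);
    Publisher<BinaryRow> rows = table.scan(partId, tx.id(), recipient, null, null, null, 0, null);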

diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 24551df462..f1ad1c501a 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.NotNull;
@@ -334,6 +335,21 @@ public class FakeInternalTable implements InternalTable {
         throw new IgniteInternalException(new OperationNotSupportedException());
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public Publisher<BinaryRow> scan(
+            int partId,
+            UUID txId,
+            PrimaryReplica recipient,
+            @Nullable UUID indexId,
+            @Nullable BinaryTuplePrefix lowerBound,
+            @Nullable BinaryTuplePrefix upperBound,
+            int flags,
+            @Nullable BitSet columnsToInclude
+    ) {
+        throw new IgniteInternalException(new OperationNotSupportedException());
+    }
+
     /** {@inheritDoc} */
     @Override
     public Publisher<BinaryRow> scan(
@@ -370,6 +386,19 @@ public class FakeInternalTable implements InternalTable {
         throw new IgniteInternalException(new OperationNotSupportedException());
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public Publisher<BinaryRow> lookup(
+            int partId,
+            UUID txId,
+            PrimaryReplica recipient,
+            UUID indexId,
+            BinaryTuple key,
+            @Nullable BitSet columnsToInclude
+    ) {
+        throw new IgniteInternalException(new OperationNotSupportedException());
+    }
+
     /** {@inheritDoc} */
     @Override
     public Publisher<BinaryRow> lookup(
@@ -389,6 +418,12 @@ public class FakeInternalTable implements InternalTable {
         throw new IgniteInternalException(new OperationNotSupportedException());
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public List<PrimaryReplica> primaryReplicas() {
+        throw new IgniteInternalException(new OperationNotSupportedException());
+    }
+
     /** {@inheritDoc} */
     @Override
     public ClusterNode leaderAssignment(int partition) {
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java b/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
index 069b1f7b1b..a4fd38bc72 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
@@ -81,6 +82,18 @@ public class HashIndex implements Index<IndexDescriptor> {
         return table.lookup(partId, tx, id, key, columns);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public Publisher<BinaryRow> lookup(
+            int partId,
+            UUID txId,
+            PrimaryReplica recipient,
+            BinaryTuple key,
+            @Nullable BitSet columns
+    ) {
+        return table.lookup(partId, txId, recipient, id, key, columns);
+    }
+
     /** {@inheritDoc} */
     @Override
     public Publisher<BinaryRow> lookup(
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java b/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
index 896ee3ab09..5e2fff76b5 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
@@ -53,9 +54,29 @@ public interface Index<DescriptorT extends IndexDescriptor> {
      * @param key Key to lookup.
      * @param columns Columns to include.
      * @return A cursor from resulting rows.
+     * @deprecated IGNITE-17952 Use {@link #lookup(int, UUID, PrimaryReplica, BinaryTuple, BitSet)} instead.
      */
+    @Deprecated
     Publisher<BinaryRow> lookup(int partId, @Nullable InternalTransaction tx, BinaryTuple key, @Nullable BitSet columns);
 
+    /**
+     * Returns cursor for the values corresponding to the given key.
+     *
+     * @param partId Partition id.
+     * @param txId Transaction id.
+     * @param recipient Primary replica that will handle given get request.
+     * @param key Key to lookup.
+     * @param columns Columns to include.
+     * @return A cursor from resulting rows.
+     */
+    Publisher<BinaryRow> lookup(
+            int partId,
+            UUID txId,
+            PrimaryReplica recipient,
+            BinaryTuple key,
+            @Nullable BitSet columns
+    );
+
     /**
      * Returns cursor for the values corresponding to the given key.
      *
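
A hedged usage sketch of the lookup overload introduced above; the index, tx, keyTuple and leaderWithTerm values are assumed to be supplied by the caller and are not defined in this patch:

    // Point lookup addressed to the primary replica that was enlisted for partition 0.
    PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(), leaderWithTerm.get2());

    Publisher<BinaryRow> rows = index.lookup(
            0,            // partition id
            tx.id(),      // id of the RW transaction the partition is enlisted in
            recipient,    // primary replica that will serve the request
            keyTuple,     // BinaryTuple built for the indexed columns
            null          // null BitSet means all columns
    );
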
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
index 0676ba557a..ed45802283 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
@@ -18,11 +18,13 @@
 package org.apache.ignite.internal.index;
 
 import java.util.BitSet;
+import java.util.UUID;
 import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
@@ -39,24 +41,28 @@ public interface SortedIndex extends Index<SortedIndexDescriptor> {
     byte INCLUDE_RIGHT = 0b10;
 
     /**
-     * Opens a range cursor for given bounds with left bound included in result and right excluded.
+     * Opens a range cursor for given bounds. Inclusion of the bounds is defined by {@code flags} mask.
      *
      * @param partId Partition.
-     * @param tx Transaction.
-     * @param left Left bound of range.
-     * @param right Right bound of range.
-     * @param columns Columns to include.
+     * @param txId Transaction id.
+     * @param recipient Primary replica that will handle the given scan request.
+     * @param leftBound Left bound of range.
+     * @param rightBound Right bound of range.
+     * @param flags A mask that defines whether to include bounds into the final result or not.
+     * @param columnsToInclude Columns to include.
      * @return A cursor from resulting rows.
+     * @see SortedIndex#INCLUDE_LEFT
+     * @see SortedIndex#INCLUDE_RIGHT
      */
-    default Publisher<BinaryRow> scan(
+    Publisher<BinaryRow> scan(
             int partId,
-            @Nullable InternalTransaction tx,
-            @Nullable BinaryTuplePrefix left,
-            @Nullable BinaryTuplePrefix right,
-            @Nullable BitSet columns
-    ) {
-        return scan(partId, tx, left, right, INCLUDE_LEFT, columns);
-    }
+            UUID txId,
+            PrimaryReplica recipient,
+            @Nullable BinaryTuplePrefix leftBound,
+            @Nullable BinaryTuplePrefix rightBound,
+            int flags,
+            @Nullable BitSet columnsToInclude
+    );
 
     /**
      * Opens a read-only range cursor for given bounds with left bound included in result and right excluded.
@@ -81,7 +87,7 @@ public interface SortedIndex extends Index<SortedIndexDescriptor> {
     }
 
     /**
-     * Opens a range cursor for given bounds. Inclusion of the bounds is defined by {@code includeBounds} mask.
+     * Opens a range cursor for given bounds. Inclusion of the bounds is defined by {@code flags} mask.
      *
      * @param partId Partition.
      * @param tx Transaction.
@@ -92,7 +98,9 @@ public interface SortedIndex extends Index<SortedIndexDescriptor> {
      * @return A cursor from resulting rows.
      * @see SortedIndex#INCLUDE_LEFT
      * @see SortedIndex#INCLUDE_RIGHT
+     * @deprecated IGNITE-17952 Use {@link #scan(int, UUID, PrimaryReplica, BinaryTuplePrefix, BinaryTuplePrefix, int, BitSet)} instead.
      */
+    @Deprecated
     Publisher<BinaryRow> scan(
             int partId,
             @Nullable InternalTransaction tx,
@@ -102,9 +110,8 @@ public interface SortedIndex extends Index<SortedIndexDescriptor> {
             @Nullable BitSet columnsToInclude
     );
 
-
     /**
-     * Opens a range cursor for given bounds. Inclusion of the bounds is defined by {@code includeBounds} mask.
+     * Opens a range cursor for given bounds. Inclusion of the bounds is defined by {@code flags} mask.
      *
      * @param partId Partition.
      * @param readTimestamp Read timestamp.
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
index 3ec520b895..c3d6f5c65b 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
@@ -82,6 +83,18 @@ public class SortedIndexImpl implements SortedIndex {
         return table.lookup(partId, tx, id, key, columns);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public Publisher<BinaryRow> lookup(
+            int partId,
+            UUID txId,
+            PrimaryReplica recipient,
+            BinaryTuple key,
+            @Nullable BitSet columns
+    ) {
+        return table.lookup(partId, txId, recipient, id, key, columns);
+    }
+
     /** {@inheritDoc} */
     @Override
     public Publisher<BinaryRow> lookup(
@@ -120,4 +133,18 @@ public class SortedIndexImpl implements SortedIndex {
     ) {
         return table.scan(partId, readTimestamp, recipientNode, id, leftBound, rightBound, flags, columnsToInclude);
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public Publisher<BinaryRow> scan(
+            int partId,
+            UUID txId,
+            PrimaryReplica recipient,
+            @Nullable BinaryTuplePrefix leftBound,
+            @Nullable BinaryTuplePrefix rightBound,
+            int flags,
+            @Nullable BitSet columnsToInclude
+    ) {
+        return table.scan(partId, txId, recipient, id, leftBound, rightBound, flags, columnsToInclude);
+    }
 }
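
For the sorted-index path, the new scan overload takes the same recipient plus the existing INCLUDE_LEFT/INCLUDE_RIGHT flags. A sketch under the assumption that sortedIndex, partId, tx, recipient, lower and upper already exist in the calling code:

    // Range scan with both bounds included; either bound may be null for an open range.
    int flags = SortedIndex.INCLUDE_LEFT | SortedIndex.INCLUDE_RIGHT;

    Publisher<BinaryRow> rows = sortedIndex.scan(
            partId,
            tx.id(),
            recipient,
            lower,       // BinaryTuplePrefix lower bound
            upper,       // BinaryTuplePrefix upper bound
            flags,
            null         // all columns
    );
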
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 70c0841c5e..9d9f3690c4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -43,6 +43,7 @@ import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
@@ -56,9 +57,13 @@ import org.apache.ignite.internal.schema.configuration.index.TableIndexConfigura
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.utils.PrimaryReplica;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.IgniteTransactions;
@@ -107,11 +112,12 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest {
 
     @Test
     public void testInsertWaitScanComplete() throws Exception {
+        int partId = 0;
         TableImpl table = getOrCreateTable();
         IgniteTransactions transactions = CLUSTER_NODES.get(0).transactions();
 
         InternalTransaction tx0 = (InternalTransaction) transactions.begin();
-        InternalTransaction tx1 = (InternalTransaction) transactions.begin();
+        InternalTransaction tx1 = startTxWithEnlistedPartition(partId);
 
         InternalTable internalTable = table.internalTable();
 
@@ -119,7 +125,11 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest {
 
         ArrayList<BinaryRow> scannedRows = new ArrayList<>();
 
-        Publisher<BinaryRow> publisher = internalTable.scan(0, tx1, sortedIndexId, null, null, 0, null);
+        IgniteBiTuple<ClusterNode, Long> leaderWithTerm = tx1.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
+
+        PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(), leaderWithTerm.get2());
+
+        Publisher<BinaryRow> publisher = internalTable.scan(partId, tx1.id(), recipient, sortedIndexId, null, null, 0, null);
 
         CompletableFuture<Void> scanned = new CompletableFuture<>();
 
@@ -459,9 +469,15 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest {
 
         ArrayList<BinaryRow> scannedRows = new ArrayList<>();
 
-        InternalTransaction tx = (InternalTransaction) CLUSTER_NODES.get(0).transactions().begin();
+        int partId = 0;
+
+        InternalTransaction tx = startTxWithEnlistedPartition(partId);
+
+        IgniteBiTuple<ClusterNode, Long> leaderWithTerm = tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
 
-        Publisher<BinaryRow> publisher = internalTable.scan(0, tx, sortedIndexId, null, null, 0, null);
+        PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(), leaderWithTerm.get2());
+
+        Publisher<BinaryRow> publisher = internalTable.scan(partId, tx.id(), recipient, sortedIndexId, null, null, 0, null);
 
         CompletableFuture<Void> scanned = new CompletableFuture<>();
 
@@ -487,7 +503,7 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest {
 
         assertEquals(ROW_IDS.size() + 1, scannedRows.size());
 
-        var publisher1 = internalTable.scan(0, tx, sortedIndexId, null, null, 0, null);
+        Publisher<BinaryRow> publisher1 = internalTable.scan(0, tx.id(), recipient, sortedIndexId, null, null, 0, null);
 
         assertEquals(scanAllRows(publisher1).size(), scannedRows.size());
 
@@ -515,11 +531,16 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest {
 
         UUID soredIndexId = getSortedIndexId();
 
-        InternalTransaction tx = (InternalTransaction) CLUSTER_NODES.get(0).transactions().begin();
+        int partId = 0;
+
+        InternalTransaction tx = startTxWithEnlistedPartition(partId);
+        IgniteBiTuple<ClusterNode, Long> leaderWithTerm = tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
+        PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(), leaderWithTerm.get2());
 
         Publisher<BinaryRow> publisher = internalTable.scan(
-                0,
-                tx,
+                partId,
+                tx.id(),
+                recipient,
                 soredIndexId,
                 lowBound,
                 upperBound,
@@ -540,8 +561,9 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest {
                 kvView.put(null, Tuple.create().set("key", 9), Tuple.create().set("valInt", 9).set("valStr", "New_9")));
 
         Publisher<BinaryRow> publisher1 = internalTable.scan(
-                0,
-                tx,
+                partId,
+                tx.id(),
+                recipient,
                 soredIndexId,
                 lowBound,
                 upperBound,
@@ -763,4 +785,26 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest {
 
         return new Row(SCHEMA, new ByteBufferRow(rowBuilder.toBytes()));
     }
+
+    /**
+     * Starts an RW transaction and enlists the specified partition in it.
+     *
+     * @param partId Partition ID.
+     * @return Transaction.
+     */
+    private InternalTransaction startTxWithEnlistedPartition(int partId) {
+        Ignite ignite = CLUSTER_NODES.get(0);
+
+        InternalTransaction tx = (InternalTransaction) ignite.transactions().begin();
+
+        InternalTable table = ((TableImpl) ignite.tables().table(TABLE_NAME)).internalTable();
+        TablePartitionId tblPartId = new TablePartitionId(table.tableId(), partId);
+        RaftGroupService raftSvc = table.partitionRaftGroupService(partId);
+        long term = IgniteTestUtils.await(raftSvc.refreshAndGetLeaderWithTerm()).term();
+
+        tx.assignCommitPartition(tblPartId);
+        tx.enlist(tblPartId, new IgniteBiTuple<>(table.leaderAssignment(partId), term));
+
+        return tx;
+    }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 2e383ec123..ad699136f6 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -45,7 +45,6 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.index.event.IndexEvent;
 import org.apache.ignite.internal.index.event.IndexEventParameters;
@@ -74,6 +73,7 @@ import org.apache.ignite.internal.sql.engine.session.SessionInfo;
 import org.apache.ignite.internal.sql.engine.session.SessionManager;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.event.TableEvent;
@@ -399,22 +399,19 @@ public class SqlQueryProcessor implements QueryProcessor {
                     return nodes.get(0);
                 })
                 .thenCompose(sqlNode -> {
-                    final boolean rwOp = dataModificationOp(sqlNode);
-                    final HybridTimestamp txTime = outerTx != null ? outerTx.readTimestamp() : rwOp ? null : clock.now();
+                    boolean rwOp = dataModificationOp(sqlNode);
+                    boolean useDistributedTraits = outerTx != null ? outerTx.isReadOnly() : !rwOp;
 
                     BaseQueryContext ctx = BaseQueryContext.builder()
                              .frameworkConfig(
                                      Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
                                             .defaultSchema(schema)
-                                            .traitDefs(rwOp || (outerTx != null && !outerTx.isReadOnly()) ? Commons.LOCAL_TRAITS_SET :
-                                                    Commons.DISTRIBUTED_TRAITS_SET)
+                                            .traitDefs(useDistributedTraits ? Commons.DISTRIBUTED_TRAITS_SET : Commons.LOCAL_TRAITS_SET)
                                             .build()
                             )
                             .logger(LOG)
                             .cancel(queryCancel)
                             .parameters(params)
-                            .transaction(outerTx)
-                            .transactionTime(txTime)
                             .plannerTimeout(PLANNER_TIMEOUT)
                             .build();
 
@@ -425,17 +422,17 @@ public class SqlQueryProcessor implements QueryProcessor {
 
                                 boolean implicitTxRequired = outerTx == null && rwOp;
 
-                                InternalTransaction implicitTx = implicitTxRequired ? txManager.begin() : null;
+                                InternalTransaction tx = implicitTxRequired ? txManager.begin()
+                                        : outerTx != null ? outerTx : new LocalTxAttributesHolder(null, clock.now());
 
-                                BaseQueryContext enrichedContext =
-                                        implicitTxRequired ? ctx.toBuilder().transaction(implicitTx).build() : ctx;
+                                BaseQueryContext enrichedContext = ctx.toBuilder().transaction(tx).build();
 
                                 var dataCursor = executionSrvc.executePlan(plan, enrichedContext);
 
                                 return new AsyncSqlCursorImpl<>(
                                         SqlQueryType.mapPlanTypeToSqlType(plan.type()),
                                         plan.metadata(),
-                                        implicitTx,
+                                        implicitTxRequired ? tx : null,
                                         new AsyncCursor<List<Object>>() {
                                             @Override
                                             public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
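
The transaction wiring in the hunk above reduces to a three-way choice; a condensed restatement of the ternary for readability (not additional code in the patch):

    InternalTransaction tx;
    if (outerTx == null && rwOp) {
        tx = txManager.begin();                              // implicit RW transaction
    } else if (outerTx != null) {
        tx = outerTx;                                        // explicit user transaction
    } else {
        tx = new LocalTxAttributesHolder(null, clock.now()); // read-only: carries only a read timestamp
    }
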
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 42fc639162..18eb033296 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -32,7 +32,6 @@ import java.util.function.Consumer;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.QueryProvider;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.schema.BinaryConverter;
@@ -84,9 +83,6 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
 
     private final AtomicBoolean cancelFlag = new AtomicBoolean();
 
-    /** Transaction. */
-    private InternalTransaction tx;
-
     /**
      * Need to store timestamp, since SQL standard says that functions such as CURRENT_TIMESTAMP return the same value throughout the
      * query.
@@ -104,7 +100,6 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
      * @param fragmentDesc Partitions information.
      * @param handler Row handler.
      * @param params Parameters.
-     * @param tx Transaction.
      */
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
     public ExecutionContext(
@@ -115,8 +110,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
             String originatingNodeName,
             FragmentDescription fragmentDesc,
             RowHandler<RowT> handler,
-            Map<String, Object> params,
-            InternalTransaction tx
+            Map<String, Object> params
     ) {
         super(qctx);
 
@@ -128,7 +122,6 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
         this.params = params;
         this.localNode = localNode;
         this.originatingNodeName = originatingNodeName;
-        this.tx = tx;
 
         expressionFactory = new ExpressionFactoryImpl<>(
                 this,
@@ -338,12 +331,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data
 
     /** Transaction for current context. */
     public InternalTransaction transaction() {
-        return tx;
-    }
-
-    /** Read only transaction time. */
-    public HybridTimestamp transactionTime() {
-        return qctx.transactionTime();
+        return qctx.transaction();
     }
 
     /**
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 406447c379..bf80d7409d 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -37,6 +37,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.calcite.tools.Frameworks;
@@ -56,24 +57,35 @@ import org.apache.ignite.internal.sql.engine.message.QueryStartRequest;
 import org.apache.ignite.internal.sql.engine.message.QueryStartResponse;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
+import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
 import org.apache.ignite.internal.sql.engine.metadata.MappingService;
 import org.apache.ignite.internal.sql.engine.metadata.MappingServiceImpl;
+import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm;
 import org.apache.ignite.internal.sql.engine.metadata.RemoteException;
 import org.apache.ignite.internal.sql.engine.prepare.DdlPlan;
 import org.apache.ignite.internal.sql.engine.prepare.ExplainPlan;
 import org.apache.ignite.internal.sql.engine.prepare.Fragment;
 import org.apache.ignite.internal.sql.engine.prepare.FragmentPlan;
+import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
 import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
 import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.rel.SourceAwareIgniteRel;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
+import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
@@ -99,6 +111,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
 
     private final MessageService msgSrvc;
 
+    private final TopologyService topSrvc;
+
     private final ClusterNode localNode;
 
     private final SqlSchemaManager sqlSchemaManager;
@@ -142,8 +156,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
             ExchangeService exchangeSrvc
     ) {
         return new ExecutionServiceImpl<>(
-                topSrvc.localMember(),
                 msgSrvc,
+                topSrvc,
                 new MappingServiceImpl(topSrvc),
                 sqlSchemaManager,
                 ddlCommandHandler,
@@ -160,11 +174,21 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
     }
 
     /**
-     * Constructor. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Constructor.
+     *
+     * @param msgSrvc Message service.
+     * @param topSrvc Topology service.
+     * @param mappingSrvc Nodes mapping calculation service.
+     * @param sqlSchemaManager Schema manager.
+     * @param ddlCmdHnd Handler of the DDL commands.
+     * @param taskExecutor Task executor.
+     * @param handler Row handler.
+     * @param exchangeSrvc Exchange service.
+     * @param implementorFactory Relational node implementor factory.
      */
     public ExecutionServiceImpl(
-            ClusterNode localNode,
             MessageService msgSrvc,
+            TopologyService topSrvc,
             MappingService mappingSrvc,
             SqlSchemaManager sqlSchemaManager,
             DdlCommandHandler ddlCmdHnd,
@@ -173,10 +197,11 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
             ExchangeService exchangeSrvc,
             ImplementorFactory<RowT> implementorFactory
     ) {
-        this.localNode = localNode;
+        this.localNode = topSrvc.localMember();
         this.handler = handler;
         this.msgSrvc = msgSrvc;
         this.mappingSrvc = mappingSrvc;
+        this.topSrvc = topSrvc;
         this.sqlSchemaManager = sqlSchemaManager;
         this.taskExecutor = taskExecutor;
         this.exchangeSrvc = exchangeSrvc;
@@ -197,11 +222,9 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
             BaseQueryContext ctx,
             MultiStepPlan plan
     ) {
-        DistributedQueryManager queryManager;
-
-        InternalTransaction tx = ctx.transaction();
+        DistributedQueryManager queryManager = new DistributedQueryManager(ctx, ctx.transaction());
 
-        DistributedQueryManager old = queryManagerMap.put(ctx.queryId(), queryManager = new DistributedQueryManager(ctx, tx));
+        DistributedQueryManager old = queryManagerMap.put(ctx.queryId(), queryManager);
 
         assert old == null;
 
@@ -210,7 +233,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
         return queryManager.execute(plan);
     }
 
-    private BaseQueryContext createQueryContext(UUID queryId, @Nullable String schema, Object[] params, HybridTimestamp txTime) {
+    private BaseQueryContext createQueryContext(UUID queryId, @Nullable String schema, Object[] params, HybridTimestamp txTime, UUID txId) {
         return BaseQueryContext.builder()
                 .queryId(queryId)
                 .parameters(params)
@@ -220,7 +243,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
                                 .build()
                 )
                 .logger(LOG)
-                .transactionTime(txTime)
+                .transaction(new LocalTxAttributesHolder(txId, txTime))
                 .build();
     }
 
@@ -303,7 +326,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
         assert nodeName != null && msg != null;
 
         DistributedQueryManager queryManager = queryManagerMap.computeIfAbsent(msg.queryId(), key -> {
-            BaseQueryContext ctx = createQueryContext(key, msg.schema(), msg.parameters(), msg.txTime());
+            BaseQueryContext ctx = createQueryContext(key, msg.schema(), msg.parameters(), msg.txTime(), msg.txId());
 
             return new DistributedQueryManager(ctx);
         });
@@ -433,7 +456,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
                     .root(fragment.serialized())
                     .fragmentDescription(desc)
                     .parameters(ctx.parameters())
-                    .txTime(ctx.transactionTime())
+                    .txTime(ctx.transaction().readTimestamp())
+                    .txId(ctx.transaction().id())
                     .build();
 
             var fut = new CompletableFuture<Void>();
@@ -538,8 +562,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
                     initiatorNodeName,
                     desc,
                     handler,
-                    Commons.parametersMap(ctx.parameters()),
-                    tx
+                    Commons.parametersMap(ctx.parameters())
             );
         }
 
@@ -583,6 +606,10 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
 
                     // start remote execution
                     for (Fragment fragment : fragments) {
+                        if (tx != null && !tx.isReadOnly()) {
+                            enlistPartitions(fragment, tx);
+                        }
+
                         if (fragment.rootFragment()) {
                             assert rootFragmentId == null;
 
@@ -630,6 +657,50 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
             };
         }
 
+        private void enlistPartitions(Fragment fragment, InternalTransaction tx) {
+            // TODO IGNITE-17952 Next condition should be removed.
+            if (!TraitUtils.distributionEnabled(fragment.root())) {
+                return;
+            }
+
+            new IgniteRelShuttle() {
+                @Override
+                public IgniteRel visit(IgniteIndexScan rel) {
+                    enlist(rel);
+
+                    return super.visit(rel);
+                }
+
+                @Override
+                public IgniteRel visit(IgniteTableScan rel) {
+                    enlist(rel);
+
+                    return super.visit(rel);
+                }
+
+                private void enlist(SourceAwareIgniteRel rel) {
+                    InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class);
+                    ColocationGroup grp = fragment.mapping().findGroup(rel.sourceId());
+
+                    if (grp.assignments().isEmpty()) {
+                        return;
+                    }
+
+                    int partsCnt = grp.assignments().size();
+
+                    tx.assignCommitPartition(new TablePartitionId(tbl.id(), ThreadLocalRandom.current().nextInt(partsCnt)));
+
+                    for (int p = 0; p < partsCnt; p++) {
+                        List<NodeWithTerm> assign = grp.assignments().get(p);
+                        NodeWithTerm leaderWithTerm = assign.get(0);
+
+                        tx.enlist(new TablePartitionId(tbl.id(), p),
+                                new IgniteBiTuple<>(topSrvc.getByConsistentId(leaderWithTerm.name()), leaderWithTerm.term()));
+                    }
+                }
+            }.visit(fragment.root());
+        }
+
         private CompletableFuture<Void> close(boolean cancel) {
             if (!cancelled.compareAndSet(false, true)) {
                 return cancelFut.thenApply(Function.identity());
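
The enlistPartitions() pass shown above can be read as: walk the fragment tree, and for every table or index scan pick a random partition of that table as the commit partition, then enlist every partition's current leader together with its term. A condensed restatement for one scanned table (grp is the colocation group of the scan):

    int partsCnt = grp.assignments().size();

    tx.assignCommitPartition(new TablePartitionId(tbl.id(), ThreadLocalRandom.current().nextInt(partsCnt)));

    for (int p = 0; p < partsCnt; p++) {
        NodeWithTerm leaderWithTerm = grp.assignments().get(p).get(0);

        tx.enlist(new TablePartitionId(tbl.id(), p),
                new IgniteBiTuple<>(topSrvc.getByConsistentId(leaderWithTerm.name()), leaderWithTerm.term()));
    }
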
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 5cb79e1cce..63ae7944e7 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -367,7 +367,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
                 ctx.rowHandler().factory(ctx.getTypeFactory(), rowType),
                 idx,
                 tbl,
-                group.partitions(ctx.localNode().name()),
+                group.partitionsWithTerms(ctx.localNode().name()),
                 comp,
                 ranges,
                 filters,
@@ -404,7 +404,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
                 ctx,
                 ctx.rowHandler().factory(ctx.getTypeFactory(), rowType),
                 tbl,
-                group.partitions(ctx.localNode().name()),
+                group.partitionsWithTerms(ctx.localNode().name()),
                 filters,
                 prj,
                 requiredColumns == null ? null : requiredColumns.toBitSet()
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
index ae53e7f8e3..80a1f90a38 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
-import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
-
-import java.util.Arrays;
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.concurrent.Flow.Publisher;
@@ -36,12 +34,16 @@ import org.apache.ignite.internal.sql.engine.exec.RowConverter;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
 import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
+import org.apache.ignite.internal.sql.engine.metadata.PartitionWithTerm;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
 import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
 import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.SubscriptionUtils;
 import org.apache.ignite.internal.util.TransformingIterator;
+import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.jetbrains.annotations.Contract;
 import org.jetbrains.annotations.Nullable;
 
@@ -57,7 +59,8 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> {
 
     private final RowHandler.RowFactory<RowT> factory;
 
-    private final int[] parts;
+    /** List of pairs containing the partition number to scan with the corresponding primary replica term. */
+    private final Collection<PartitionWithTerm> partsWithTerms;
 
     /** Participating columns. */
     private final @Nullable BitSet requiredColumns;
@@ -72,7 +75,7 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> {
      * @param ctx Execution context.
      * @param rowFactory Row factory.
      * @param schemaTable The table this node should scan.
-     * @param parts Partition numbers to scan.
+     * @param partsWithTerms List of pairs containing the partition number to scan with the corresponding primary replica term.
      * @param comp Rows comparator.
      * @param rangeConditions Range conditions.
      * @param filters Optional filter to filter out rows.
@@ -84,7 +87,7 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> {
             RowHandler.RowFactory<RowT> rowFactory,
             IgniteIndex schemaIndex,
             InternalIgniteTable schemaTable,
-            int[] parts,
+            Collection<PartitionWithTerm> partsWithTerms,
             @Nullable Comparator<RowT> comp,
             @Nullable RangeIterable<RowT> rangeConditions,
             @Nullable Predicate<RowT> filters,
@@ -93,11 +96,11 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> {
     ) {
         super(ctx, rowFactory, schemaTable, filters, rowTransformer, requiredColumns);
 
-        assert !nullOrEmpty(parts);
+        assert partsWithTerms != null && !partsWithTerms.isEmpty();
         assert rangeConditions == null || rangeConditions.size() > 0;
 
         this.schemaIndex = schemaIndex;
-        this.parts = parts;
+        this.partsWithTerms = partsWithTerms;
         this.requiredColumns = requiredColumns;
         this.rangeConditions = rangeConditions;
         this.comp = comp;
@@ -110,16 +113,17 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> {
     @Override
     protected Publisher<RowT> scan() {
         if (rangeConditions != null) {
-            return SubscriptionUtils.concat(new TransformingIterator<>(rangeConditions.iterator(), cond -> indexPublisher(parts, cond)));
+            return SubscriptionUtils.concat(
+                    new TransformingIterator<>(rangeConditions.iterator(), cond -> indexPublisher(partsWithTerms, cond)));
         } else {
-            return indexPublisher(parts, null);
+            return indexPublisher(partsWithTerms, null);
         }
     }
 
-    private Publisher<RowT> indexPublisher(int[] parts, @Nullable RangeCondition<RowT> cond) {
+    private Publisher<RowT> indexPublisher(Collection<PartitionWithTerm> partsWithTerms, @Nullable RangeCondition<RowT> cond) {
         Iterator<Publisher<? extends RowT>> it = new TransformingIterator<>(
-                Arrays.stream(parts).iterator(),
-                part -> partitionPublisher(part, cond)
+                partsWithTerms.iterator(),
+                partWithTerm -> partitionPublisher(partWithTerm, cond)
         );
 
         if (comp != null) {
@@ -129,9 +133,9 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> {
         }
     }
 
-    private Publisher<RowT> partitionPublisher(int part, @Nullable RangeCondition<RowT> cond) {
+    private Publisher<RowT> partitionPublisher(PartitionWithTerm partWithTerm, @Nullable RangeCondition<RowT> cond) {
         Publisher<BinaryRow> pub;
-        boolean roTx = context().transactionTime() != null;
+        InternalTransaction tx = context().transaction();
 
         if (schemaIndex.type() == Type.SORTED) {
             int flags = 0;
@@ -148,20 +152,32 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> {
                 flags |= (cond.upperInclude()) ? SortedIndex.INCLUDE_RIGHT : 0;
             }
 
-            if (roTx) {
+            if (tx.isReadOnly()) {
                 pub = ((SortedIndex) schemaIndex.index()).scan(
-                        part,
-                        context().transactionTime(),
+                        partWithTerm.partId(),
+                        tx.readTimestamp(),
                         context().localNode(),
                         lower,
                         upper,
                         flags,
                         requiredColumns
                 );
+            } else if (!(tx instanceof LocalTxAttributesHolder)) {
+                // TODO IGNITE-17952 This block should be removed.
+                // Workaround to make RW scan work from tx coordinator.
+                pub = ((SortedIndex) schemaIndex.index()).scan(
+                        partWithTerm.partId(),
+                        tx,
+                        lower,
+                        upper,
+                        flags,
+                        requiredColumns
+                );
             } else {
                 pub = ((SortedIndex) schemaIndex.index()).scan(
-                        part,
-                        context().transaction(),
+                        partWithTerm.partId(),
+                        tx.id(),
+                        new PrimaryReplica(context().localNode(), partWithTerm.term()),
                         lower,
                         upper,
                         flags,
@@ -174,18 +190,28 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> {
 
             BinaryTuple key = toBinaryTuple(cond.lower());
 
-            if (roTx) {
+            if (tx.isReadOnly()) {
                 pub = schemaIndex.index().lookup(
-                        part,
-                        context().transactionTime(),
+                        partWithTerm.partId(),
+                        tx.readTimestamp(),
                         context().localNode(),
                         key,
                         requiredColumns
                 );
+            } else if (!(tx instanceof LocalTxAttributesHolder)) {
+                // TODO IGNITE-17952 This block should be removed.
+                // Workaround to make RW lookup work from tx coordinator.
+                pub = schemaIndex.index().lookup(
+                        partWithTerm.partId(),
+                        tx,
+                        key,
+                        requiredColumns
+                );
             } else {
                 pub = schemaIndex.index().lookup(
-                        part,
-                        context().transaction(),
+                        partWithTerm.partId(),
+                        tx.id(),
+                        new PrimaryReplica(context().localNode(), partWithTerm.term()),
                         key,
                         requiredColumns
                 );
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
index 881e3a4f0f..4e2382107a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
@@ -80,7 +80,7 @@ public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> {
     ) {
         super(ctx);
 
-        assert context().transaction() != null || context().transactionTime() != null : "Transaction not initialized.";
+        assert ctx.transaction() != null : "Transaction not initialized.";
 
         tableRowConverter = row -> schemaTable.toRow(context(), row, rowFactory, requiredColumns);
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
index 57227042ec..5c332762e4 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
-import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
-
-import java.util.Arrays;
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
@@ -28,10 +26,14 @@ import java.util.function.Predicate;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.metadata.PartitionWithTerm;
 import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.SubscriptionUtils;
 import org.apache.ignite.internal.util.TransformingIterator;
+import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -42,7 +44,8 @@ public class TableScanNode<RowT> extends StorageScanNode<RowT> {
     /** Table that provides access to underlying data. */
     private final InternalTable physTable;
 
-    private final int[] parts;
+    /** List of pairs containing the partition number to scan with the corresponding primary replica term. */
+    private final Collection<PartitionWithTerm> partsWithTerms;
 
     /**
      * Constructor.
@@ -50,7 +53,7 @@ public class TableScanNode<RowT> extends StorageScanNode<RowT> {
      * @param ctx Execution context.
      * @param rowFactory Row factory.
      * @param schemaTable The table this node should scan.
-     * @param parts Partition numbers to scan.
+     * @param partsWithTerms List of pairs containing the partition number to scan with the corresponding primary replica term.
      * @param filters Optional filter to filter out rows.
      * @param rowTransformer Optional projection function.
      * @param requiredColumns Optional set of column of interest.
@@ -59,31 +62,37 @@ public class TableScanNode<RowT> extends StorageScanNode<RowT> {
             ExecutionContext<RowT> ctx,
             RowHandler.RowFactory<RowT> rowFactory,
             InternalIgniteTable schemaTable,
-            int[] parts,
+            Collection<PartitionWithTerm> partsWithTerms,
             @Nullable Predicate<RowT> filters,
             @Nullable Function<RowT, RowT> rowTransformer,
             @Nullable BitSet requiredColumns
     ) {
         super(ctx, rowFactory, schemaTable, filters, rowTransformer, requiredColumns);
 
-        assert !nullOrEmpty(parts);
+        assert partsWithTerms != null && !partsWithTerms.isEmpty();
 
         this.physTable = schemaTable.table();
-        this.parts = parts;
+        this.partsWithTerms = partsWithTerms;
     }
 
     /** {@inheritDoc} */
     @Override
     protected Publisher<RowT> scan() {
-        boolean roTx = context().transactionTime() != null;
-
+        InternalTransaction tx = context().transaction();
         Iterator<Publisher<? extends RowT>> it = new TransformingIterator<>(
-                Arrays.stream(parts).iterator(), part -> {
+                partsWithTerms.iterator(), partWithTerm -> {
             Publisher<BinaryRow> pub;
-            if (roTx) {
-                pub = physTable.scan(part, context().transactionTime(), context().localNode());
+
+            if (tx.isReadOnly()) {
+                pub = physTable.scan(partWithTerm.partId(), tx.readTimestamp(), context().localNode());
+            } else if (!(tx instanceof LocalTxAttributesHolder)) {
+                // TODO IGNITE-17952 This block should be removed.
+                // Workaround to make RW scan work from tx coordinator.
+                pub = physTable.scan(partWithTerm.partId(), tx);
             } else {
-                pub = physTable.scan(part, context().transaction());
+                PrimaryReplica recipient = new PrimaryReplica(context().localNode(), partWithTerm.term());
+
+                pub = physTable.scan(partWithTerm.partId(), tx.id(), recipient, null, null, null, 0, null);
             }
 
             return convertPublisher(pub);
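
The scan dispatch above (and the analogous branches in IndexScanNode) distinguishes three cases; a condensed view, with the middle branch being the temporary coordinator-side workaround slated for removal under IGNITE-17952:

    if (tx.isReadOnly()) {
        // RO: snapshot scan at the transaction read timestamp.
        pub = physTable.scan(partWithTerm.partId(), tx.readTimestamp(), context().localNode());
    } else if (!(tx instanceof LocalTxAttributesHolder)) {
        // RW on the coordinator: fall back to the transaction-based scan.
        pub = physTable.scan(partWithTerm.partId(), tx);
    } else {
        // RW on a remote node: address the enlisted primary replica directly.
        PrimaryReplica recipient = new PrimaryReplica(context().localNode(), partWithTerm.term());
        pub = physTable.scan(partWithTerm.partId(), tx.id(), recipient, null, null, null, 0, null);
    }
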
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
index 73d85c97a7..d18edca3f6 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.message;
 
+import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
 import org.apache.ignite.network.annotations.Marshallable;
@@ -55,4 +56,10 @@ public interface QueryStartRequest extends ExecutionContextAwareMessage {
      */
     @Marshallable
     @Nullable HybridTimestamp txTime();
+
+    /**
+     * Transaction id.
+     */
+    @Marshallable
+    UUID txId();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java
index 4b153cffab..fc33780504 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.sql.engine.metadata;
 
-import static org.apache.ignite.internal.util.ArrayUtils.asList;
 import static org.apache.ignite.internal.util.CollectionUtils.first;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.internal.util.IgniteUtils.firstNotNull;
@@ -27,11 +26,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.util.Commons;
-import org.apache.ignite.internal.util.IgniteIntList;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -39,14 +40,11 @@ import org.jetbrains.annotations.NotNull;
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
 public class ColocationGroup implements Serializable {
-    private static final int SYNTHETIC_PARTITIONS_COUNT = 512;
-    // TODO: 
IgniteSystemProperties.getInteger("IGNITE_CALCITE_SYNTHETIC_PARTITIONS_COUNT", 
512);
-
     private final List<Long> sourceIds;
 
     private final List<String> nodeNames;
 
-    private final List<List<String>> assignments;
+    private final List<List<NodeWithTerm>> assignments;
 
     /**
      * ForNodes.
@@ -60,7 +58,7 @@ public class ColocationGroup implements Serializable {
      * ForAssignments.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    public static ColocationGroup forAssignments(List<List<String>> 
assignments) {
+    public static ColocationGroup forAssignments(List<List<NodeWithTerm>> 
assignments) {
         return new ColocationGroup(null, null, assignments);
     }
 
@@ -76,7 +74,7 @@ public class ColocationGroup implements Serializable {
      * Constructor.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    private ColocationGroup(List<Long> sourceIds, List<String> nodeNames, 
List<List<String>> assignments) {
+    private ColocationGroup(List<Long> sourceIds, List<String> nodeNames, 
List<List<NodeWithTerm>> assignments) {
         this.sourceIds = sourceIds;
         this.nodeNames = nodeNames;
         this.assignments = assignments;
@@ -100,7 +98,7 @@ public class ColocationGroup implements Serializable {
      * Get list of partitions (index) and nodes (items) having an appropriate 
partition in OWNING state, calculated for
      * distributed tables, involved in query execution.
      */
-    public List<List<String>> assignments() {
+    public List<List<NodeWithTerm>> assignments() {
         return assignments == null ? Collections.emptyList() : assignments;
     }
 
@@ -151,18 +149,20 @@ public class ColocationGroup implements Serializable {
                     + "Replicated query parts are not co-located on all 
nodes");
         }
 
-        List<List<String>> assignments;
+        List<List<NodeWithTerm>> assignments;
+        Set<String> nodeNamesSet = nodeNames == null ? null : new HashSet<>(nodeNames);
+        Predicate<String> nodeNamesFilter = nodeNames == null ? v -> true : nodeNamesSet::contains;
+
         if (this.assignments == null || other.assignments == null) {
             assignments = firstNotNull(this.assignments, other.assignments);
 
-            if (assignments != null && nodeNames != null) {
-                Set<String> filter = new HashSet<>(nodeNames);
-                List<List<String>> assignments0 = new 
ArrayList<>(assignments.size());
+            if (assignments != null && nodeNamesSet != null) {
+                List<List<NodeWithTerm>> assignments0 = new ArrayList<>(assignments.size());
 
                 for (int i = 0; i < assignments.size(); i++) {
-                    List<String> assignment = Commons.intersect(filter, assignments.get(i));
+                    List<NodeWithTerm> assignment = filterByNodeNames(assignments.get(i), nodeNamesFilter);
 
-                    if (assignment.isEmpty()) { // TODO check with partition filters
+                    if (assignment.isEmpty()) {
                         throw new ColocationMappingException("Failed to map fragment to location. "
                                 + "Partition mapping is empty [part=" + i + "]");
                     }
@@ -174,17 +174,17 @@ public class ColocationGroup implements Serializable {
             }
         } else {
             assert this.assignments.size() == other.assignments.size();
+
             assignments = new ArrayList<>(this.assignments.size());
-            Set<String> filter = nodeNames == null ? null : new HashSet<>(nodeNames);
-            for (int i = 0; i < this.assignments.size(); i++) {
-                List<String> assignment = Commons.intersect(this.assignments.get(i), other.assignments.get(i));
 
-                if (filter != null) {
-                    assignment.retainAll(filter);
-                }
+            for (int p = 0; p < this.assignments.size(); p++) {
+                List<NodeWithTerm> assignment0 = this.assignments.get(p);
+                List<NodeWithTerm> assignment1 = other.assignments.get(p);
+
+                List<NodeWithTerm> assignment = intersect(assignment0, assignment1, nodeNamesFilter, p);
 
-                if (assignment.isEmpty()) { // TODO check with partition 
filters
-                    throw new ColocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + i + "]");
+                if (assignment.isEmpty()) {
+                    throw new ColocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + p + "]");
                 }
 
                 assignments.add(assignment);
@@ -194,22 +194,92 @@ public class ColocationGroup implements Serializable {
         return new ColocationGroup(sourceIds, nodeNames, assignments);
     }
 
+    private List<NodeWithTerm> intersect(
+            List<NodeWithTerm> assignment0, 
+            List<NodeWithTerm> assignment1,
+            Predicate<String> filter, 
+            int p
+    ) throws ColocationMappingException {
+        if (assignment0.size() == 1 && assignment1.size() == 1) {
+            NodeWithTerm first = assignment0.get(0);
+            NodeWithTerm second = assignment1.get(0);
+
+            if (filter.test(first.name()) && Objects.equals(first.name(), 
second.name())) {
+                validateTerm(first, second, p);
+
+                return assignment0;
+            }
+
+            return Collections.emptyList();
+        }
+
+        if (assignment0.size() > assignment1.size()) {
+            List<NodeWithTerm> tmp = assignment0;
+            assignment0 = assignment1;
+            assignment1 = tmp;
+        }
+
+        List<NodeWithTerm> assignment = new ArrayList<>();
+
+        // Filter and hash a smaller list.
+        Map<String, NodeWithTerm> nameToAssignmentMapping = assignment0.stream()
+                .filter(v -> filter.test(v.name()))
+                .collect(Collectors.toMap(NodeWithTerm::name, nodeWithTerm -> nodeWithTerm));
+
+        // Iterate over a larger list.
+        for (NodeWithTerm first : assignment1) {
+            NodeWithTerm second = nameToAssignmentMapping.get(first.name());
+
+            if (second == null) {
+                continue;
+            }
+
+            validateTerm(first, second, p);
+
+            assignment.add(first);
+        }
+
+        return assignment;
+    }
+
+    private void validateTerm(NodeWithTerm first, NodeWithTerm second, int partId) throws ColocationMappingException {
+        if (first.term() != second.term()) {
+            throw new ColocationMappingException("Primary replica term has been changed during mapping ["
+                    + "node=" + first.name()
+                    + ", expectedTerm=" + first.term()
+                    + ", actualTerm=" + second.term()
+                    + ", part=" + partId
+                    + ']');
+        }
+    }
+
+    private List<NodeWithTerm> filterByNodeNames(List<NodeWithTerm> assignment, Predicate<String> filter) {
+        List<NodeWithTerm> res = new ArrayList<>(assignment.size());
+
+        for (NodeWithTerm nodeWithTerm : assignment) {
+            if (!filter.test(nodeWithTerm.name())) {
+                continue;
+            }
+
+            res.add(nodeWithTerm);
+        }
+
+        return res;
+    }
+
     /**
-     * Constructor.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Creates a new colocation group using only primary assignments.
+     *
+     * @return Colocation group with primary assignments.
      */
     public ColocationGroup complete() {
-        if (assignments == null && nodeNames == null) {
-            return this;
-        }
-
         if (assignments != null) {
-            List<List<String>> assignments = new ArrayList<>(this.assignments.size());
+            List<List<NodeWithTerm>> assignments = new ArrayList<>(this.assignments.size());
             Set<String> nodes = new HashSet<>();
-            for (List<String> assignment : this.assignments) {
-                String first = first(assignment);
+            for (List<NodeWithTerm> assignment : this.assignments) {
+                NodeWithTerm first = first(assignment);
                 if (first != null) {
-                    nodes.add(first);
+                    nodes.add(first.name());
                 }
                 assignments.add(first != null ? Collections.singletonList(first) : Collections.emptyList());
             }
@@ -217,7 +287,7 @@ public class ColocationGroup implements Serializable {
             return new ColocationGroup(sourceIds, new ArrayList<>(nodes), 
assignments);
         }
 
-        return forNodes0(nodeNames);
+        return mapToNodes(nodeNames);
     }
 
     /**
@@ -230,29 +300,30 @@ public class ColocationGroup implements Serializable {
 
     @NotNull
     private ColocationGroup forNodes0(List<String> nodeNames) {
-        List<List<String>> assignments = new 
ArrayList<>(SYNTHETIC_PARTITIONS_COUNT);
-        for (int i = 0; i < SYNTHETIC_PARTITIONS_COUNT; i++) {
-            assignments.add(asList(nodeNames.get(i % nodeNames.size())));
-        }
         return new ColocationGroup(sourceIds, nodeNames, assignments);
     }
 
     /**
-     * Returns List of partitions to scan on the given node.
+     * Returns list of pairs containing the partition number to scan on the given node with the corresponding primary replica term.
      *
-     * @param nodeNames Cluster node consistent ID.
-     * @return List of partitions to scan on the given node.
+     * @param nodeName Cluster node consistent ID.
+     * @return List of pairs containing the partition number to scan on the given node with the corresponding primary replica term.
      */
-    public int[] partitions(String nodeNames) {
-        IgniteIntList parts = new IgniteIntList(assignments.size());
+    public List<PartitionWithTerm> partitionsWithTerms(String nodeName) {
+        List<PartitionWithTerm> partsWithTerms = new ArrayList<>();
+
+        for (int p = 0; p < assignments.size(); p++) {
+            List<NodeWithTerm> assignment = assignments.get(p);
+
+            NodeWithTerm nodeWithTerm = first(assignment);
+
+            assert nodeWithTerm != null : "part=" + p;
 
-        for (int i = 0; i < assignments.size(); i++) {
-            List<String> assignment = assignments.get(i);
-            if (Objects.equals(nodeNames, first(assignment))) {
-                parts.add(i);
+            if (Objects.equals(nodeName, nodeWithTerm.name())) {
+                partsWithTerms.add(new PartitionWithTerm(p, nodeWithTerm.term()));
             }
         }
 
-        return parts.array();
+        return partsWithTerms;
     }
 }
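
The invariant the new mapping code enforces is that a node survives the intersection only if both colocation groups observed it with the same primary replica term, and partitionsWithTerms then collects, for a given node, every partition whose chosen (first) owner is that node. The following self-contained sketch uses simplified local types rather than the Ignite NodeWithTerm/PartitionWithTerm classes; unlike the production code it omits the node-name filter and the "hash the smaller list" optimization:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Objects;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    final class ColocationSketch {
        record NodeWithTerm(String name, long term) {}
        record PartWithTerm(int partId, long term) {}

        // Keep only nodes present in both assignments; fail if a node is seen with different terms.
        static List<NodeWithTerm> intersect(List<NodeWithTerm> a, List<NodeWithTerm> b, int partId) {
            Map<String, NodeWithTerm> byName = a.stream()
                    .collect(Collectors.toMap(NodeWithTerm::name, Function.identity()));

            List<NodeWithTerm> result = new ArrayList<>();
            for (NodeWithTerm candidate : b) {
                NodeWithTerm other = byName.get(candidate.name());
                if (other == null) {
                    continue; // not co-located for this partition
                }
                if (other.term() != candidate.term()) {
                    throw new IllegalStateException("Primary replica term changed during mapping"
                            + " [node=" + candidate.name() + ", part=" + partId + ']');
                }
                result.add(candidate);
            }
            return result;
        }

        // For each partition, report (partition, term) when the chosen (first) owner is the given node.
        static List<PartWithTerm> partitionsWithTerms(List<List<NodeWithTerm>> assignments, String nodeName) {
            List<PartWithTerm> parts = new ArrayList<>();
            for (int p = 0; p < assignments.size(); p++) {
                NodeWithTerm primary = assignments.get(p).get(0); // assignments are assumed non-empty here
                if (Objects.equals(nodeName, primary.name())) {
                    parts.add(new PartWithTerm(p, primary.term()));
                }
            }
            return parts;
        }
    }
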
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
index 33d9bf189d..c08820b7f9 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
@@ -227,6 +227,7 @@ public class IgniteMdFragmentMapping implements 
MetadataHandler<FragmentMappingM
     private static FragmentMapping getFragmentMapping(long sourceId, 
ProjectableFilterableTableScan rel, MappingQueryContext ctx) {
         ColocationGroup group = 
rel.getTable().unwrap(InternalIgniteTable.class).colocationGroup(ctx);
 
+        // TODO IGNITE-17952 The following block should be removed.
         // This condition is kind of a workaround to make transactional scans work.
         //
         // For now, the scan should be invoked on the node that coordinates the transaction.
@@ -234,10 +235,9 @@ public class IgniteMdFragmentMapping implements 
MetadataHandler<FragmentMappingM
         // will need to replace actual distribution with fake one where every partition
         // is owned by a local node.
         if (!TraitUtils.distributionEnabled(rel)) {
-            List<List<String>> fakeAssignments = new ArrayList<>(group.assignments().size());
-
+            List<List<NodeWithTerm>> fakeAssignments = new ArrayList<>(group.assignments().size());
             for (int i = 0; i < group.assignments().size(); i++) {
-                fakeAssignments.add(List.of(ctx.locNodeName()));
+                fakeAssignments.add(List.of(new NodeWithTerm(ctx.locNodeName(), -1L)));
             }
 
             group = ColocationGroup.forAssignments(fakeAssignments);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/NodeWithTerm.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/NodeWithTerm.java
new file mode 100644
index 0000000000..61e15efa5e
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/NodeWithTerm.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.metadata;
+
+import java.io.Serializable;
+
+/**
+ * Tuple representing primary replica node name with current term.
+ */
+public class NodeWithTerm implements Serializable {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Primary replica node name. */
+    private final String name;
+
+    /** Primary replica term. */
+    private final long term;
+
+    /**
+     * Constructor.
+     *
+     * @param name Primary replica node name.
+     * @param term Primary replica term.
+     */
+    public NodeWithTerm(String name, Long term) {
+        this.name = name;
+        this.term = term;
+    }
+
+    /**
+     * Gets primary replica node name.
+     *
+     * @return Primary replica node name.
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Gets primary replica term.
+     *
+     * @return Primary replica term.
+     */
+    public long term() {
+        return term;
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/PartitionWithTerm.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/PartitionWithTerm.java
new file mode 100644
index 0000000000..3da423a687
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/PartitionWithTerm.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.metadata;
+
+/**
+ * Tuple representing the number of the partition with its current primary replica term.
+ */
+public class PartitionWithTerm {
+    /** Partition number. */
+    private final int partId;
+
+    /** Primary replica term. */
+    private final long term;
+
+    /**
+     * Constructor.
+     *
+     * @param partId Partition number.
+     * @param term Primary replica term.
+     */
+    public PartitionWithTerm(int partId, Long term) {
+        this.partId = partId;
+        this.term = term;
+    }
+
+    /**
+     * Gets partition number.
+     *
+     * @return Partition number.
+     */
+    public int partId() {
+        return partId;
+    }
+
+    /**
+     * Gets primary replica term.
+     *
+     * @return Primary replica term.
+     */
+    public long term() {
+        return term;
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
index 8993cf29bc..d27c178db1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
@@ -55,6 +55,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.exp.RexImpTable;
 import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm;
 import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
 import 
org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalIndexScan;
 import 
org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
@@ -481,7 +482,8 @@ public class IgniteTableImpl extends AbstractTable 
implements InternalIgniteTabl
     }
 
     private ColocationGroup partitionedGroup() {
-        List<List<String>> assignments = table.assignments().stream()
+        List<List<NodeWithTerm>> assignments = table.primaryReplicas().stream()
+                .map(primaryReplica -> new NodeWithTerm(primaryReplica.node().name(), primaryReplica.term()))
                 .map(Collections::singletonList)
                 .collect(Collectors.toList());
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java
index d9ebc797fb..a34258caad 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java
@@ -28,6 +28,8 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
 
@@ -223,7 +225,7 @@ public abstract class DistributionFunction {
         public <RowT> Destination<RowT> destination(HashFunctionFactory<RowT> 
hashFuncFactory, ColocationGroup m, ImmutableIntList k) {
             assert m != null && !nullOrEmpty(m.assignments()) && !k.isEmpty();
 
-            List<List<String>> assignments = m.assignments();
+            List<List<String>> assignments = Commons.transform(m.assignments(), v -> Commons.transform(v, NodeWithTerm::name));
 
             if (IgniteUtils.assertionsEnabled()) {
                 for (List<String> assignment : assignments) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
index 2327fa28e2..2cd28e9225 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
@@ -49,7 +49,6 @@ import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
@@ -162,8 +161,6 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
 
     private final InternalTransaction tx;
 
-    private final HybridTimestamp txTs;
-
     private CalciteCatalogReader catalogReader;
 
     private long plannerTimeout;
@@ -178,7 +175,6 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
             Object[] parameters,
             IgniteLogger log,
             InternalTransaction tx,
-            HybridTimestamp txTs,
             long plannerTimeout
     ) {
         super(Contexts.chain(cfg.getContext()));
@@ -191,7 +187,6 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         this.cancel = cancel;
         this.parameters = parameters;
         this.tx = tx;
-        this.txTs = txTs;
         this.plannerTimeout = plannerTimeout;
 
         RelDataTypeSystem typeSys = 
CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class, 
cfg.getTypeSystem());
@@ -245,10 +240,6 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         return tx;
     }
 
-    public HybridTimestamp transactionTime() {
-        return txTs;
-    }
-
     public long plannerTimeout() {
         return plannerTimeout;
     }
@@ -290,8 +281,7 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
                 .logger(log)
                 .cancel(cancel)
                 .parameters(parameters)
-                .transaction(tx)
-                .transactionTime(txTs);
+                .transaction(tx);
     }
 
     /**
@@ -323,8 +313,6 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
 
         private InternalTransaction tx;
 
-        private HybridTimestamp txTs;
-
         private long plannerTimeout;
 
         public Builder frameworkConfig(FrameworkConfig frameworkCfg) {
@@ -357,18 +345,13 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
             return this;
         }
 
-        public Builder transactionTime(HybridTimestamp txTs) {
-            this.txTs = txTs;
-            return this;
-        }
-
         public Builder plannerTimeout(long plannerTimeout) {
             this.plannerTimeout = plannerTimeout;
             return this;
         }
 
         public BaseQueryContext build() {
-            return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log, tx, txTs, plannerTimeout);
+            return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log, tx, plannerTimeout);
         }
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/LocalTxAttributesHolder.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/LocalTxAttributesHolder.java
new file mode 100644
index 0000000000..232bfb72cf
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/LocalTxAttributesHolder.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.util;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Local holder of {@link InternalTransaction#id() id} and {@link InternalTransaction#readTimestamp() readTimestamp} transaction attributes.
+ */
+// TODO IGNITE-17952 This class must not implement the transaction interface and may be passed to the remote nodes.
+public class LocalTxAttributesHolder implements InternalTransaction {
+    /** Transaction id. */
+    private final UUID id;
+
+    /** Read timestamp. */
+    private final @Nullable HybridTimestamp readTimestamp;
+
+    /**
+     * Constructor.
+     *
+     * @param id Transaction id.
+     * @param readTimestamp Read timestamp.
+     */
+    public LocalTxAttributesHolder(UUID id, @Nullable HybridTimestamp readTimestamp) {
+        this.readTimestamp = readTimestamp;
+        this.id = id;
+    }
+
+    @Override
+    public boolean isReadOnly() {
+        return readTimestamp != null;
+    }
+
+    @Override
+    public @Nullable HybridTimestamp readTimestamp() {
+        return readTimestamp;
+    }
+
+    @Override
+    public @NotNull UUID id() {
+        return id;
+    }
+
+    @Override
+    public void commit() throws TransactionException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> commitAsync() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void rollback() throws TransactionException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> rollbackAsync() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(ReplicationGroupId replicationGroupId) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TxState state() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean assignCommitPartition(ReplicationGroupId replicationGroupId) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ReplicationGroupId commitPartition() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public IgniteBiTuple<ClusterNode, Long> enlist(ReplicationGroupId replicationGroupId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void enlistResultFuture(CompletableFuture<?> resultFuture) {
+        throw new UnsupportedOperationException();
+    }
+}
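
On the receiving side, the holder is only meant to carry the two attributes a remote fragment needs. A hedged sketch of the expected wiring (the actual ExecutionServiceImpl change is outside this hunk, so the method below is an assumption, not code from the patch) relies on txTime() being non-null exactly for read-only transactions:

    // Assumed wiring: rebuild the transaction attributes on a remote node from the start request.
    static InternalTransaction txAttributes(QueryStartRequest request) {
        // txTime() is non-null only for read-only transactions, so isReadOnly() follows from it.
        return new LocalTxAttributesHolder(request.txId(), request.txTime());
    }
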
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index b23d82405c..9b1f98e566 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -36,6 +36,7 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -71,6 +72,7 @@ import 
org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
+import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.NodeStoppingException;
@@ -134,6 +136,8 @@ public class StopCalciteModuleTest {
 
     private final TestRevisionRegister testRevisionRegister = new 
TestRevisionRegister();
 
+    private final ClusterNode localNode = new ClusterNode("mock-node-id", 
NODE_NAME, null);
+
     /**
      * Before.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -142,9 +146,7 @@ public class StopCalciteModuleTest {
     public void before(TestInfo testInfo) {
         when(clusterSrvc.messagingService()).thenReturn(msgSrvc);
         when(clusterSrvc.topologyService()).thenReturn(topologySrvc);
-
-        ClusterNode node = new ClusterNode("mock-node-id", NODE_NAME, null);
-        when(topologySrvc.localMember()).thenReturn(node);
+        when(topologySrvc.localMember()).thenReturn(localNode);
 
         SchemaDescriptor schemaDesc = new SchemaDescriptor(
                 1,
@@ -221,6 +223,7 @@ public class StopCalciteModuleTest {
         );
 
         when(tbl.tableId()).thenReturn(UUID.randomUUID());
+        when(tbl.primaryReplicas()).thenReturn(List.of(new 
PrimaryReplica(localNode, -1L)));
 
         when(txManager.begin()).thenReturn(mock(InternalTransaction.class));
         when(tbl.storage()).thenReturn(mock(MvTableStorage.class));
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 71b5ca1af5..c29818745b 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -83,6 +83,7 @@ import 
org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory;
 import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
+import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder;
 import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -90,6 +91,7 @@ import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.TopologyService;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -383,12 +385,17 @@ public class ExecutionServiceImplTest {
 
         var schemaManagerMock = mock(SqlSchemaManager.class);
 
+        var clusterNode = new ClusterNode(UUID.randomUUID().toString(), 
nodeName, NetworkAddress.from("127.0.0.1:1111"));
+
+        var topologyService = mock(TopologyService.class);
+
+        when(topologyService.localMember()).thenReturn(clusterNode);
+
         when(schemaManagerMock.tableById(any(), anyInt())).thenReturn(table);
 
-        var clusterNode = new ClusterNode(UUID.randomUUID().toString(), 
nodeName, NetworkAddress.from("127.0.0.1:1111"));
         var executionService = new ExecutionServiceImpl<>(
-                clusterNode,
                 messageService,
+                topologyService,
                 (single, filter) -> single ? 
List.of(nodeNames.get(ThreadLocalRandom.current().nextInt(nodeNames.size()))) : 
nodeNames,
                 schemaManagerMock,
                 mock(DdlCommandHandler.class),
@@ -413,6 +420,7 @@ public class ExecutionServiceImplTest {
                                 .defaultSchema(wrap(schema))
                                 .build()
                 )
+                .transaction(new LocalTxAttributesHolder(UUID.randomUUID(), 
null))
                 .logger(LOG)
                 .build();
     }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index af6d1daf2b..125760b94c 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -117,8 +117,7 @@ public class RuntimeSortedIndexTest extends 
IgniteAbstractTest {
                         "fake-test-node",
                         null,
                         ArrayRowHandler.INSTANCE,
-                        Map.of(),
-                        null
+                        Map.of()
                 ),
                 RelCollations.of(ImmutableIntList.copyOf(idxCols)),
                 (o1, o2) -> {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 7e81eb4ede..dfa70fe72a 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
 import java.util.ArrayDeque;
 import java.util.Deque;
@@ -43,12 +40,12 @@ import 
org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
+import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.thread.LogUncaughtExceptionHandler;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
-import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.Pair;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
@@ -103,11 +100,9 @@ public class AbstractExecutionTest extends 
IgniteAbstractTest {
 
         FragmentDescription fragmentDesc = new FragmentDescription(0, null, 
null, Long2ObjectMaps.emptyMap());
 
-        InternalTransaction tx = mock(InternalTransaction.class);
-        
when(tx.rollbackAsync()).thenReturn(CompletableFuture.completedFuture(null));
-
         return new ExecutionContext<>(
                 BaseQueryContext.builder()
+                        .transaction(new 
LocalTxAttributesHolder(UUID.randomUUID(), null))
                         .logger(log)
                         .build(),
                 taskExecutor,
@@ -116,8 +111,7 @@ public class AbstractExecutionTest extends 
IgniteAbstractTest {
                 "fake-test-node",
                 fragmentDesc,
                 ArrayRowHandler.INSTANCE,
-                Map.of(),
-                tx
+                Map.of()
         );
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
index 91526fc9ef..8e0850a577 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
@@ -33,6 +33,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.ThreadLocalRandom;
@@ -59,6 +60,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
 import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
 import org.apache.ignite.internal.sql.engine.exec.exp.RexImpTable;
+import org.apache.ignite.internal.sql.engine.metadata.PartitionWithTerm;
 import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
@@ -337,13 +339,13 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest {
         //CHECKSTYLE:OFF:Indentation
         Mockito.doAnswer(invocation -> {
                     if (key != null) {
-                        validateBound(indexDescriptor, schemaDescriptor, 
invocation.getArgument(2));
+                        validateBound(indexDescriptor, schemaDescriptor, 
invocation.getArgument(3));
                     }
 
                     return dummyPublisher(partitionData(tableData, 
schemaDescriptor, invocation.getArgument(0)));
                 })
                 .when(hashIndexMock)
-                .lookup(Mockito.anyInt(), any(), any(), any());
+                .lookup(Mockito.anyInt(), (UUID) any(), any(), any(), any());
         //CHECKSTYLE:ON:Indentation
 
         IgniteIndex indexMock = mock(IgniteIndex.class);
@@ -382,15 +384,15 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest {
         //CHECKSTYLE:OFF:Indentation
         Mockito.doAnswer(invocation -> {
                     if (lowerBound != null) {
-                        validateBoundPrefix(indexDescriptor, schemaDescriptor, 
invocation.getArgument(2));
+                        validateBoundPrefix(indexDescriptor, schemaDescriptor, 
invocation.getArgument(3));
                     }
                     if (upperBound != null) {
-                        validateBoundPrefix(indexDescriptor, schemaDescriptor, 
invocation.getArgument(3));
+                        validateBoundPrefix(indexDescriptor, schemaDescriptor, 
invocation.getArgument(4));
                     }
 
                     return dummyPublisher(partitionData(tableData, 
schemaDescriptor, invocation.getArgument(0)));
                 }).when(sortedIndexMock)
-                .scan(Mockito.anyInt(), any(), any(), any(), Mockito.anyInt(), 
any());
+                .scan(Mockito.anyInt(), (UUID) any(), any(), any(), any(), 
Mockito.anyInt(), any());
         //CHECKSTYLE:ON:Indentation
 
         IgniteIndex indexMock = mock(IgniteIndex.class);
@@ -437,7 +439,7 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest {
                 ectx.rowHandler().factory(ectx.getTypeFactory(), rowType),
                 index,
                 new TestTable(rowType, schemaDescriptor),
-                new int[]{0, 2},
+                List.of(new PartitionWithTerm(0, -1L), new 
PartitionWithTerm(2, -1L)),
                 index.type() == Type.SORTED ? comp : null,
                 rangeIterable,
                 null,
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
index ee4680ed19..d5c781c7d6 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
@@ -488,7 +488,7 @@ public class MergeJoinExecutionTest extends 
AbstractExecutionTest {
 
         ExecutionContext<Object[]> ectx =
                 new 
ExecutionContext<>(BaseQueryContext.builder().logger(log).build(), null, null, 
null,
-                        null, null, ArrayRowHandler.INSTANCE, null, null);
+                        null, null, ArrayRowHandler.INSTANCE, null);
 
         ExpressionFactoryImpl<Object[]> expFactory = new 
ExpressionFactoryImpl<>(ectx, typeFactory, SqlConformanceEnum.DEFAULT);
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index d927e2e61a..6069a9edd5 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -21,23 +21,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.mock;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
-import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.Collection;
+import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.metadata.PartitionWithTerm;
 import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest;
 import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
@@ -48,12 +50,11 @@ import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.network.ClusterNode;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 
@@ -72,7 +73,9 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest {
 
         int inBufSize = Commons.IN_BUFFER_SIZE;
 
-        int[] parts = {0, 1, 2};
+        List<PartitionWithTerm> partsWithTerms = Stream.of(0, 1, 2)
+                .map(p -> new PartitionWithTerm(p, -1L))
+                .collect(Collectors.toList());
 
         int probingCnt = 50;
 
@@ -93,7 +96,7 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest {
 
             dataAmount = size;
 
-            TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx, 
rowFactory, tbl, parts, null, null, null);
+            TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx, 
rowFactory, tbl, partsWithTerms, null, null, null);
 
             RootNode<Object[]> root = new RootNode<>(ctx);
 
@@ -106,7 +109,7 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest {
                 ++cnt;
             }
 
-            assertEquals(sizes[i++] * parts.length, cnt);
+            assertEquals(sizes[i++] * partsWithTerms.size(), cnt);
         }
     }
 
@@ -161,29 +164,39 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest {
         }
 
         @Override
-        protected CompletableFuture<Collection<BinaryRow>> enlistCursorInTx(
-                @NotNull InternalTransaction tx,
+        public Publisher<BinaryRow> scan(
                 int partId,
-                long scanId,
-                int batchSize,
+                UUID txId,
+                PrimaryReplica recipient,
                 @Nullable UUID indexId,
-                @Nullable BinaryTuple exactKey,
                 @Nullable BinaryTuplePrefix lowerBound,
                 @Nullable BinaryTuplePrefix upperBound,
                 int flags,
                 @Nullable BitSet columnsToInclude
         ) {
-            int fillAmount = Math.min(dataAmount - processedPerPart[partId], 
batchSize);
-
-            Collection<BinaryRow> out = new ArrayList<>(fillAmount);
-
-            for (int i = 0; i < fillAmount; ++i) {
-                out.add(bbRow);
-            }
-
-            processedPerPart[partId] += fillAmount;
-
-            return CompletableFuture.completedFuture(out);
+            return s -> {
+                s.onSubscribe(new Subscription() {
+                    @Override
+                    public void request(long n) {
+                        int fillAmount = Math.min(dataAmount - 
processedPerPart[partId], (int) n);
+
+                        processedPerPart[partId] += fillAmount;
+
+                        for (int i = 0; i < fillAmount; ++i) {
+                            s.onNext(bbRow);
+                        }
+
+                        if (processedPerPart[partId] == dataAmount) {
+                            s.onComplete();
+                        }
+                    }
+
+                    @Override
+                    public void cancel() {
+                        // No-op.
+                    }
+                });
+            };
         }
     }
 }
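
The fake table above now answers scans with a demand-driven java.util.concurrent.Flow publisher instead of batched futures: rows are emitted only in response to request(n), and onComplete() is signalled once the partition is exhausted. A minimal, Ignite-independent subscriber illustrating that contract (the batch size of 512 is an arbitrary choice for this sketch):

    import java.util.concurrent.Flow;

    // Pulls rows in fixed-size batches; the publisher may emit fewer rows than requested before completing.
    final class CountingSubscriber<T> implements Flow.Subscriber<T> {
        private Flow.Subscription subscription;
        private long received;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(512); // initial demand
        }

        @Override
        public void onNext(T item) {
            received++;
            if (received % 512 == 0) {
                subscription.request(512); // keep the demand flowing batch by batch
            }
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("received " + received + " rows");
        }
    }
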
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 7885aa5b5e..be091a1c24 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.sql.engine.framework;
 
 import static org.apache.ignite.lang.IgniteStringFormatter.format;
-import static org.mockito.Mockito.mock;
 
 import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
 import java.util.ArrayList;
@@ -45,7 +44,6 @@ import 
org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
-import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.network.ClusterNode;
 
 /**
@@ -208,8 +206,7 @@ public class TestBuilders {
                     node.name(),
                     description,
                     ArrayRowHandler.INSTANCE,
-                    Map.of(),
-                    mock(InternalTransaction.class)
+                    Map.of()
             );
         }
     }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index 8968f50fde..94277e04a4 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -109,8 +109,8 @@ public class TestNode implements LifecycleAware {
         ));
 
         executionService = registerService(new ExecutionServiceImpl<>(
-                topologyService.localMember(),
                 messageService,
+                topologyService,
                 new MappingServiceImpl(topologyService),
                 schemaManager,
                 mock(DdlCommandHandler.class),
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index 6022b24811..4bfad4f49b 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -130,6 +130,7 @@ import 
org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
@@ -1229,6 +1230,13 @@ public abstract class AbstractPlannerTest extends 
IgniteAbstractTest {
             throw new AssertionError("Should not be called");
         }
 
+        /** {@inheritDoc} */
+        @Override
+        public Publisher<BinaryRow> lookup(int partId, UUID txId, 
PrimaryReplica recipient, BinaryTuple key,
+                @Nullable BitSet columns) {
+            throw new AssertionError("Should not be called");
+        }
+
         /** {@inheritDoc} */
         @Override
         public Publisher<BinaryRow> lookup(int partId, HybridTimestamp 
timestamp, ClusterNode recipient, BinaryTuple key, BitSet columns) {
@@ -1247,6 +1255,12 @@ public abstract class AbstractPlannerTest extends 
IgniteAbstractTest {
                 @Nullable BinaryTuplePrefix leftBound, @Nullable 
BinaryTuplePrefix rightBound, int flags, BitSet columnsToInclude) {
             throw new AssertionError("Should not be called");
         }
+
+        @Override
+        public Publisher<BinaryRow> scan(int partId, UUID txId, PrimaryReplica 
recipient, @Nullable BinaryTuplePrefix leftBound,
+                @Nullable BinaryTuplePrefix rightBound, int flags, @Nullable 
BitSet columnsToInclude) {
+            throw new AssertionError("Should not be called");
+        }
     }
 
     static class TestHashIndex implements Index<IndexDescriptor> {
@@ -1296,6 +1310,13 @@ public abstract class AbstractPlannerTest extends 
IgniteAbstractTest {
             throw new AssertionError("Should not be called");
         }
 
+        /** {@inheritDoc} */
+        @Override
+        public Publisher<BinaryRow> lookup(int partId, UUID txId, 
PrimaryReplica recipient, BinaryTuple key,
+                @Nullable BitSet columns) {
+            throw new AssertionError("Should not be called");
+        }
+
         /** {@inheritDoc} */
         @Override
         public Publisher<BinaryRow> lookup(int partId, HybridTimestamp 
timestamp, ClusterNode recipient, BinaryTuple key, BitSet columns) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
index b4f081d56f..b008e65d44 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
@@ -45,6 +45,7 @@ import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Util;
 import org.apache.ignite.internal.index.ColumnCollation;
 import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm;
 import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.sql.engine.prepare.IgnitePlanner;
 import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
@@ -82,6 +83,8 @@ import org.junit.jupiter.api.condition.OS;
 public class PlannerTest extends AbstractPlannerTest {
     private static List<String> NODES;
 
+    private static List<NodeWithTerm> NODES_WITH_TERM;
+
     /**
      * Init.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -89,9 +92,13 @@ public class PlannerTest extends AbstractPlannerTest {
     @BeforeAll
     public static void init() {
         NODES = new ArrayList<>(4);
+        NODES_WITH_TERM = new ArrayList<>(4);
 
         for (int i = 0; i < 4; i++) {
-            NODES.add(UUID.randomUUID().toString());
+            String nodeName = UUID.randomUUID().toString();
+
+            NODES.add(nodeName);
+            NODES_WITH_TERM.add(new NodeWithTerm(nodeName, 0L));
         }
     }
 
@@ -113,11 +120,11 @@ public class PlannerTest extends AbstractPlannerTest {
             @Override
             public ColocationGroup colocationGroup(MappingQueryContext ctx) {
                 return ColocationGroup.forAssignments(Arrays.asList(
-                        select(NODES, 0, 1),
-                        select(NODES, 1, 2),
-                        select(NODES, 2, 0),
-                        select(NODES, 0, 1),
-                        select(NODES, 1, 2)
+                        select(NODES_WITH_TERM, 0, 1),
+                        select(NODES_WITH_TERM, 1, 2),
+                        select(NODES_WITH_TERM, 2, 0),
+                        select(NODES_WITH_TERM, 0, 1),
+                        select(NODES_WITH_TERM, 1, 2)
                 ));
             }
 
@@ -134,11 +141,11 @@ public class PlannerTest extends AbstractPlannerTest {
                     .build(), "PROJECT") {
             @Override public ColocationGroup colocationGroup(MappingQueryContext ctx) {
                 return ColocationGroup.forAssignments(Arrays.asList(
-                    select(NODES, 0, 1),
-                    select(NODES, 1, 2),
-                    select(NODES, 2, 0),
-                    select(NODES, 0, 1),
-                    select(NODES, 1, 2)));
+                    select(NODES_WITH_TERM, 0, 1),
+                    select(NODES_WITH_TERM, 1, 2),
+                    select(NODES_WITH_TERM, 2, 0),
+                    select(NODES_WITH_TERM, 0, 1),
+                    select(NODES_WITH_TERM, 1, 2)));
             }
 
             @Override public IgniteDistribution distribution() {
@@ -295,10 +302,10 @@ public class PlannerTest extends AbstractPlannerTest {
             @Override
             public ColocationGroup colocationGroup(MappingQueryContext ctx) {
                 return ColocationGroup.forAssignments(Arrays.asList(
-                        select(NODES, 1, 2),
-                        select(NODES, 2, 3),
-                        select(NODES, 3, 0),
-                        select(NODES, 0, 1)
+                        select(NODES_WITH_TERM, 1, 2),
+                        select(NODES_WITH_TERM, 2, 3),
+                        select(NODES_WITH_TERM, 3, 0),
+                        select(NODES_WITH_TERM, 0, 1)
                 ));
             }
 
@@ -376,9 +383,9 @@ public class PlannerTest extends AbstractPlannerTest {
             @Override
             public ColocationGroup colocationGroup(MappingQueryContext ctx) {
                 return ColocationGroup.forAssignments(Arrays.asList(
-                        select(NODES, 0),
-                        select(NODES, 1),
-                        select(NODES, 2)
+                        select(NODES_WITH_TERM, 0),
+                        select(NODES_WITH_TERM, 1),
+                        select(NODES_WITH_TERM, 2)
                 ));
             }
 
@@ -521,9 +528,9 @@ public class PlannerTest extends AbstractPlannerTest {
             @Override
             public ColocationGroup colocationGroup(MappingQueryContext ctx) {
                 return ColocationGroup.forAssignments(Arrays.asList(
-                        select(NODES, 1),
-                        select(NODES, 2),
-                        select(NODES, 3)
+                        select(NODES_WITH_TERM, 1),
+                        select(NODES_WITH_TERM, 2),
+                        select(NODES_WITH_TERM, 3)
                 ));
             }
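
For context, the planner tests now describe partition assignments through NodeWithTerm entries rather than bare node names. A minimal sketch of the shape that ColocationGroup.forAssignments consumes after this change (node names and terms below are made up for illustration, not taken from the patch):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
    import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm;

    // One inner list per partition; each entry carries the node name and the raft term
    // the assignment was observed at (0L here, matching the test fixture above).
    List<List<NodeWithTerm>> assignments = Arrays.asList(
            Arrays.asList(new NodeWithTerm("node-0", 0L), new NodeWithTerm("node-1", 0L)),
            Arrays.asList(new NodeWithTerm("node-1", 0L), new NodeWithTerm("node-2", 0L)));

    ColocationGroup group = ColocationGroup.forAssignments(assignments);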
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 2a0b44b60c..b67ecd4c26 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.distributed;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -37,6 +38,7 @@ import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -61,6 +63,7 @@ import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
@@ -87,7 +90,7 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
     private MvPartitionStorage mockStorage;
 
     /** Internal table to test. */
-    protected InternalTable internalTbl;
+    DummyInternalTableImpl internalTbl;
 
     private final HybridClock clock = new HybridClockImpl();
 
@@ -196,7 +199,9 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
             return cursor;
         });
 
-        scan(0, null).subscribe(new Subscriber<>() {
+        InternalTransaction tx = startTx();
+
+        scan(0, tx).subscribe(new Subscriber<>() {
 
             @Override
             public void onSubscribe(Subscription subscription) {
@@ -212,6 +217,9 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
             public void onError(Throwable throwable) {
                 gotException.set(throwable);
                 subscriberFinishedLatch.countDown();
+
+                // Rollback the transaction manually, because only the ID of the explicit transaction is passed to the internal table.
+                tx.rollback();
             }
 
             @Override
@@ -220,9 +228,13 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
             }
         });
 
-        subscriberFinishedLatch.await();
+        assertTrue(subscriberFinishedLatch.await(10, TimeUnit.SECONDS), "count=" + subscriberFinishedLatch.getCount());
 
         assertEquals(gotException.get().getCause().getClass(), NoSuchElementException.class);
+
+        if (tx != null) {
+            assertEquals(TxState.ABORTED, tx.state());
+        }
     }
 
     /**
@@ -237,7 +249,9 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
 
         when(mockStorage.scan(any(HybridTimestamp.class))).thenThrow(new StorageException("Some storage exception"));
 
-        scan(0, null).subscribe(new Subscriber<>() {
+        InternalTransaction tx = startTx();
+
+        scan(0, tx).subscribe(new Subscriber<>() {
 
             @Override
             public void onSubscribe(Subscription subscription) {
@@ -253,6 +267,9 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
             public void onError(Throwable throwable) {
                 gotException.set(throwable);
                 gotExceptionLatch.countDown();
+
+                // Rollback the transaction manually, because only the ID of the explicit transaction is passed to the internal table.
+                tx.rollback();
             }
 
             @Override
@@ -264,8 +281,11 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
         gotExceptionLatch.await();
 
         assertEquals(gotException.get().getCause().getClass(), StorageException.class);
-    }
 
+        if (tx != null) {
+            assertEquals(TxState.ABORTED, tx.state());
+        }
+    }
 
     /**
     * Checks that {@link IllegalArgumentException} is thrown in case of invalid partition.
@@ -398,7 +418,9 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
         // The latch that allows to await Subscriber.onError() before asserting test invariants.
         CountDownLatch subscriberAllDataAwaitLatch = new CountDownLatch(1);
 
-        scan(0, null).subscribe(new Subscriber<>() {
+        InternalTransaction tx = startTx();
+
+        scan(0, tx).subscribe(new Subscriber<>() {
             private Subscription subscription;
 
             @Override
@@ -438,6 +460,19 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
         for (int i = 0; i < expItems.size(); i++) {
             assertArrayEquals(expItems.get(i), gotItems.get(i));
         }
+
+        if (tx != null) {
+            tx.commit();
+        }
+    }
+
+    /**
+     * Start transaction if needed.
+     *
+     * @return Started transaction or {@code null}.
+     */
+    protected InternalTransaction startTx() {
+        return null;
     }
 
     /**
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
index 3f6f15cf13..3f1fd642cc 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
@@ -18,9 +18,15 @@
 package org.apache.ignite.distributed;
 
 import java.util.concurrent.Flow.Publisher;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.utils.PrimaryReplica;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.network.ClusterNode;
 
 /**
  * Tests for {@link InternalTable#scan(int, org.apache.ignite.internal.tx.InternalTransaction)}.
@@ -29,6 +35,28 @@ public class ItInternalTableReadWriteScanTest extends ItAbstractInternalTableSca
     /** {@inheritDoc} */
     @Override
     protected Publisher<BinaryRow> scan(int part, InternalTransaction tx) {
-        return internalTbl.scan(part, tx);
+        if (tx == null) {
+            return internalTbl.scan(part, null);
+        }
+
+        IgniteBiTuple<ClusterNode, Long> leaderWithTerm = tx.enlistedNodeAndTerm(new TablePartitionId(internalTbl.tableId(), part));
+        PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(), leaderWithTerm.get2());
+
+        return internalTbl.scan(part, tx.id(), recipient, null, null, null, 0, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected InternalTransaction startTx() {
+        InternalTransaction tx = internalTbl.txManager().begin();
+
+        TablePartitionId tblPartId = new TablePartitionId(internalTbl.tableId(), ((TablePartitionId) internalTbl.groupId()).partitionId());
+        RaftGroupService raftSvc = internalTbl.partitionRaftGroupService(tblPartId.partitionId());
+        long term = IgniteTestUtils.await(raftSvc.refreshAndGetLeaderWithTerm()).term();
+
+        tx.assignCommitPartition(tblPartId);
+        tx.enlist(tblPartId, new IgniteBiTuple<>(internalTbl.leaderAssignment(tblPartId.partitionId()), term));
+
+        return tx;
     }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index cb248a0cf7..8c4a67e7f0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.LockException;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.NotNull;
@@ -296,6 +297,29 @@ public interface InternalTable extends ManuallyCloseable {
             @Nullable BitSet columnsToInclude
     );
 
+    /**
+     * Scans given partition index, providing {@link Publisher} that reactively notifies about partition rows.
+     *
+     * @param partId The partition.
+     * @param txId Transaction id.
+     * @param recipient Primary replica that will handle given scan request.
+     * @param indexId Index id.
+     * @param lowerBound Lower search bound.
+     * @param upperBound Upper search bound.
+     * @param flags Control flags. See {@link org.apache.ignite.internal.storage.index.SortedIndexStorage} constants.
+     * @param columnsToInclude Row projection.
+     * @return {@link Publisher} that reactively notifies about partition rows.
+     */
+    Publisher<BinaryRow> scan(
+            int partId,
+            UUID txId,
+            PrimaryReplica recipient,
+            @Nullable UUID indexId,
+            @Nullable BinaryTuplePrefix lowerBound,
+            @Nullable BinaryTuplePrefix upperBound,
+            int flags,
+            @Nullable BitSet columnsToInclude
+    );
+
     /**
     * Scans given partition index, providing {@link Publisher} that reactively notifies about partition rows.
      *
@@ -348,7 +372,9 @@ public interface InternalTable extends ManuallyCloseable {
      * @param key Key to search.
      * @param columnsToInclude Row projection.
      * @return {@link Publisher} that reactively notifies about partition rows.
+     * @deprecated IGNITE-17952 Use {@link #lookup(int, UUID, PrimaryReplica, UUID, BinaryTuple, BitSet)} instead.
      */
+    @Deprecated
     Publisher<BinaryRow> lookup(
             int partId,
             @Nullable InternalTransaction tx,
@@ -357,6 +383,27 @@ public interface InternalTable extends ManuallyCloseable {
             @Nullable BitSet columnsToInclude
     );
 
+    /**
+     * Looks up rows corresponding to the given key in the given partition index, providing {@link Publisher}
+     * that reactively notifies about partition rows.
+     *
+     * @param partId The partition.
+     * @param txId Transaction id.
+     * @param recipient Primary replica that will handle given lookup request.
+     * @param indexId Index id.
+     * @param key Key to search.
+     * @param columnsToInclude Row projection.
+     * @return {@link Publisher} that reactively notifies about partition rows.
+     */
+    Publisher<BinaryRow> lookup(
+            int partId,
+            UUID txId,
+            PrimaryReplica recipient,
+            UUID indexId,
+            BinaryTuple key,
+            @Nullable BitSet columnsToInclude
+    );
+
     /**
      * Gets a count of partitions of the table.
      *
@@ -374,6 +421,13 @@ public interface InternalTable extends ManuallyCloseable {
      */
     List<String> assignments();
 
+    /**
+     * Gets a list of current primary replicas for each partition.
+     *
+     * @return List of current primary replicas for each partition.
+     */
+    List<PrimaryReplica> primaryReplicas();
+
     /**
     * Returns cluster node that is the leader of the corresponding partition group or throws an exception if
      * it cannot be found.
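
The new scan overload above replaces the implicit InternalTransaction argument with an explicit transaction id plus the PrimaryReplica the partition was enlisted with, so the SQL engine can enlist partitions up front and pass only the attributes it needs. A rough caller-side sketch, assuming table, txId and recipient are supplied by surrounding code that has already done the enlistment (those names are illustrative, not from this patch):

    // Partition scan: a null indexId means the partition itself is scanned,
    // null bounds plus zero flags mean "no range restriction", null columns means all columns.
    Publisher<BinaryRow> rows = table.scan(0, txId, recipient, null, null, null, 0, null);

    rows.subscribe(new Flow.Subscriber<BinaryRow>() {
        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // request everything for simplicity
        }

        @Override public void onNext(BinaryRow row) {
            // consume the row
        }

        @Override public void onError(Throwable throwable) {
            // the caller owns the transaction and has to roll it back, as the tests above do
        }

        @Override public void onComplete() {
            // commit or keep using the transaction
        }
    });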
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 554498ac9d..9dc4315734 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.LockException;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteFiveFunction;
@@ -350,7 +351,7 @@ public class InternalTableImpl implements InternalTable {
      * @param columnsToInclude Row projection.
      * @return Batch of retrieved rows.
      */
-    protected CompletableFuture<Collection<BinaryRow>> enlistCursorInTx(
+    private CompletableFuture<Collection<BinaryRow>> enlistCursorInTx(
             @NotNull InternalTransaction tx,
             int partId,
             long scanId,
@@ -853,6 +854,19 @@ public class InternalTableImpl implements InternalTable {
         return scan(partId, tx, indexId, key, null, null, 0, columnsToInclude);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public Publisher<BinaryRow> lookup(
+            int partId,
+            UUID txId,
+            PrimaryReplica recipient,
+            UUID indexId,
+            BinaryTuple key,
+            @Nullable BitSet columnsToInclude
+    ) {
+        return scan(partId, txId, recipient, indexId, key, null, null, 0, columnsToInclude);
+    }
+
     /** {@inheritDoc} */
     @Override
     public Publisher<BinaryRow> scan(
@@ -967,6 +981,57 @@ public class InternalTableImpl implements InternalTable {
         );
     }
 
+
+    /** {@inheritDoc} */
+    @Override
+    public Publisher<BinaryRow> scan(
+            int partId,
+            UUID txId,
+            PrimaryReplica recipient,
+            @Nullable UUID indexId,
+            @Nullable BinaryTuplePrefix lowerBound,
+            @Nullable BinaryTuplePrefix upperBound,
+            int flags,
+            @Nullable BitSet columnsToInclude
+    ) {
+        return scan(partId, txId, recipient, indexId, null, lowerBound, upperBound, flags, columnsToInclude);
+    }
+
+    private Publisher<BinaryRow> scan(
+            int partId,
+            UUID txId,
+            PrimaryReplica recipient,
+            @Nullable UUID indexId,
+            @Nullable BinaryTuple exactKey,
+            @Nullable BinaryTuplePrefix lowerBound,
+            @Nullable BinaryTuplePrefix upperBound,
+            int flags,
+            @Nullable BitSet columnsToInclude
+    ) {
+        return new PartitionScanPublisher(
+                (scanId, batchSize) -> {
+                    ReplicationGroupId partGroupId = partitionMap.get(partId).groupId();
+
+                    ReadWriteScanRetrieveBatchReplicaRequest request = tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
+                            .groupId(partGroupId)
+                            .transactionId(txId)
+                            .scanId(scanId)
+                            .indexToUse(indexId)
+                            .exactKey(exactKey)
+                            .lowerBound(lowerBound)
+                            .upperBound(upperBound)
+                            .flags(flags)
+                            .columnsToInclude(columnsToInclude)
+                            .batchSize(batchSize)
+                            .term(recipient.term())
+                            .build();
+
+                    return replicaSvc.invoke(recipient.node(), request);
+                },
+                // TODO: IGNITE-17666 Close the cursor when the tx finishes.
+                Function.identity());
+    }
+
     /**
      * Validates partition index.
      *
@@ -1015,6 +1080,30 @@ public class InternalTableImpl implements InternalTable {
                 .collect(Collectors.toList());
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public List<PrimaryReplica> primaryReplicas() {
+        List<Entry<RaftGroupService>> entries = new ArrayList<>(partitionMap.int2ObjectEntrySet());
+        List<CompletableFuture<LeaderWithTerm>> futs = new ArrayList<>();
+
+        entries.sort(Comparator.comparingInt(Entry::getIntKey));
+
+        for (Entry<RaftGroupService> e : entries) {
+            futs.add(e.getValue().refreshAndGetLeaderWithTerm());
+        }
+
+        List<PrimaryReplica> primaryReplicas = new ArrayList<>(entries.size());
+
+        for (CompletableFuture<LeaderWithTerm> fut : futs) {
+            LeaderWithTerm leaderWithTerm = fut.join();
+            ClusterNode primaryNode = clusterNodeResolver.apply(leaderWithTerm.leader().consistentId());
+
+            primaryReplicas.add(new PrimaryReplica(primaryNode, leaderWithTerm.term()));
+        }
+
+        return primaryReplicas;
+    }
+
     @Override
     public ClusterNode leaderAssignment(int partition) {
         awaitLeaderInitialization();
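
primaryReplicas() resolves the current raft leader and term for every partition, in partition order, which is what the SQL engine uses to enlist all partitions before a query starts executing. A small sketch of how the result might be consumed (variable names are illustrative):

    // Index i in the returned list corresponds to partition i.
    List<PrimaryReplica> replicas = table.primaryReplicas();

    for (int partId = 0; partId < replicas.size(); partId++) {
        PrimaryReplica replica = replicas.get(partId);

        // replica.node() is the current leader of the partition's raft group;
        // replica.term() is the term the subsequent scan/lookup request is validated against.
    }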
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java b/modules/table/src/main/java/org/apache/ignite/internal/utils/PrimaryReplica.java
similarity index 50%
copy from modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
copy to modules/table/src/main/java/org/apache/ignite/internal/utils/PrimaryReplica.java
index 3f6f15cf13..e0114ea1bb 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/utils/PrimaryReplica.java
@@ -15,20 +15,46 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.distributed;
+package org.apache.ignite.internal.utils;
 
-import java.util.concurrent.Flow.Publisher;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.table.InternalTable;
-import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.network.ClusterNode;
 
 /**
- * Tests for {@link InternalTable#scan(int, org.apache.ignite.internal.tx.InternalTransaction)}.
+ * Tuple representing primary replica node with current term.
  */
-public class ItInternalTableReadWriteScanTest extends ItAbstractInternalTableScanTest {
-    /** {@inheritDoc} */
-    @Override
-    protected Publisher<BinaryRow> scan(int part, InternalTransaction tx) {
-        return internalTbl.scan(part, tx);
+public class PrimaryReplica {
+    /** Primary replica node. */
+    private final ClusterNode node;
+
+    /** Replica term. */
+    private final long term;
+
+    /**
+     * Constructor.
+     *
+     * @param node Primary replica node.
+     * @param term Replica term.
+     */
+    public PrimaryReplica(ClusterNode node, long term) {
+        this.node = node;
+        this.term = term;
+    }
+
+    /**
+     * Gets primary replica node.
+     *
+     * @return Primary replica node.
+     */
+    public ClusterNode node() {
+        return node;
+    }
+
+    /**
+     * Gets replica term.
+     *
+     * @return Replica term.
+     */
+    public long term() {
+        return term;
     }
 }
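
A PrimaryReplica is typically built from whatever the partition's raft client reports as the current leader and term, as both the integration test and primaryReplicas() above do. A compact sketch, under the assumption that a RaftGroupService for the partition and a consistent-id-to-node resolver (nodeResolver below) are available:

    // Refresh the leader so the term is up to date, then wrap it for the scan/lookup call.
    LeaderWithTerm leaderWithTerm = raftGroupService.refreshAndGetLeaderWithTerm().join();
    ClusterNode leaderNode = nodeResolver.apply(leaderWithTerm.leader().consistentId());

    PrimaryReplica recipient = new PrimaryReplica(leaderNode, leaderWithTerm.term());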
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 3346feb172..f8e4846776 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -317,6 +317,15 @@ public class DummyInternalTableImpl extends InternalTableImpl {
         return groupId;
     }
 
+    /**
+     * Gets the transaction manager that is bound to the table.
+     *
+     * @return Transaction manager.
+     */
+    public TxManager txManager() {
+        return txManager;
+    }
+
     /** {@inheritDoc} */
     @Override
     public @NotNull String name() {
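
Taken together, the test-fixture additions let an integration test drive the read-write scan path by hand: begin a transaction through the exposed txManager(), enlist the partition with its leader and term, scan against the matching PrimaryReplica, then commit or roll back. A condensed sketch of that flow, mirroring ItInternalTableReadWriteScanTest above (single partition 0; error handling and row consumption omitted):

    InternalTransaction tx = internalTbl.txManager().begin();

    TablePartitionId partId = new TablePartitionId(internalTbl.tableId(), 0);
    long term = IgniteTestUtils.await(
            internalTbl.partitionRaftGroupService(0).refreshAndGetLeaderWithTerm()).term();

    tx.assignCommitPartition(partId);
    tx.enlist(partId, new IgniteBiTuple<>(internalTbl.leaderAssignment(0), term));

    PrimaryReplica recipient = new PrimaryReplica(internalTbl.leaderAssignment(0), term);
    Publisher<BinaryRow> rows = internalTbl.scan(0, tx.id(), recipient, null, null, null, 0, null);

    // ... subscribe to the publisher and drain it, then finish the transaction.
    tx.commit();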
