This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 784e098a840 IGNITE-28460 Sql. Choose commit partition from list of 
involved partitions (#7936)
784e098a840 is described below

commit 784e098a84065d7155f70923acdc6e26bc4b740e
Author: korlov42 <[email protected]>
AuthorDate: Tue Apr 7 17:02:38 2026 +0300

    IGNITE-28460 Sql. Choose commit partition from list of involved partitions 
(#7936)
---
 .../sql/engine/exec/ExecutionServiceImpl.java      |  79 +++++----------
 .../sql/engine/exec/TransactionEnlistTest.java     | 106 ++++++++++++++++++++-
 .../engine/exec/rel/ScannableTableSelfTest.java    |  12 ++-
 .../sql/engine/framework/NoOpTransaction.java      |  11 +--
 4 files changed, 140 insertions(+), 68 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index b7bfe8fb861..23d413eab70 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -27,6 +27,7 @@ import static 
org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
@@ -49,7 +50,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -114,15 +114,10 @@ import 
org.apache.ignite.internal.sql.engine.prepare.DdlPlan;
 import org.apache.ignite.internal.sql.engine.prepare.ExplainPlan;
 import org.apache.ignite.internal.sql.engine.prepare.ExplainablePlan;
 import org.apache.ignite.internal.sql.engine.prepare.Fragment;
-import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
 import org.apache.ignite.internal.sql.engine.prepare.KillPlan;
 import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
-import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
-import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
-import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
-import org.apache.ignite.internal.sql.engine.rel.SourceAwareIgniteRel;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchemas;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
@@ -1292,65 +1287,39 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, LogicalTopo
         }
 
         private void enlistPartitions(MappedFragment mappedFragment, 
InternalTransaction tx) {
-            // no need to traverse the tree if fragment has no tables
-            if (mappedFragment.fragment().tables().isEmpty()) {
-                return;
-            }
-
-            new IgniteRelShuttle() {
-                @Override
-                public IgniteRel visit(IgniteIndexScan rel) {
-                    enlist(rel);
-
-                    return super.visit(rel);
-                }
-
-                @Override
-                public IgniteRel visit(IgniteTableScan rel) {
-                    enlist(rel);
-
-                    return super.visit(rel);
-                }
+            boolean shouldAssignCommitPartition = tx.commitPartition() == null;
+            for (Long2ObjectMap.Entry<IgniteTable> entry : 
mappedFragment.fragment().tables().long2ObjectEntrySet()) {
+                long sourceId = entry.getLongKey();
+                IgniteTable table = entry.getValue();
 
-                @Override
-                public IgniteRel visit(IgniteTableModify rel) {
-                    enlist(rel);
+                ColocationGroup colocationGroup = 
mappedFragment.groupsBySourceId().get(sourceId);
+                Int2ObjectMap<NodeWithConsistencyToken> assignments = 
colocationGroup.assignments();
 
-                    return super.visit(rel);
+                if (assignments.isEmpty()) {
+                    continue;
                 }
 
-                private void enlist(int tableId, int zoneId, 
Int2ObjectMap<NodeWithConsistencyToken> assignments) {
-                    if (assignments.isEmpty()) {
-                        return;
-                    }
-
-                    int partsCnt = assignments.size();
-
-                    tx.assignCommitPartition(new ZonePartitionId(zoneId, 
ThreadLocalRandom.current().nextInt(partsCnt)));
-
-                    for (Int2ObjectMap.Entry<NodeWithConsistencyToken> 
partWithToken : assignments.int2ObjectEntrySet()) {
-                        ZonePartitionId replicationGroupId = new 
ZonePartitionId(zoneId, partWithToken.getIntKey());
+                int tableId = table.id();
+                int zoneId = table.zoneId();
 
-                        NodeWithConsistencyToken assignment = 
partWithToken.getValue();
-
-                        tx.enlist(
-                                replicationGroupId,
-                                tableId,
-                                assignment.name(),
-                                assignment.enlistmentConsistencyToken()
-                        );
-                    }
+                if (shouldAssignCommitPartition) {
+                    tx.assignCommitPartition(new ZonePartitionId(zoneId, 
assignments.keySet().iterator().nextInt()));
+                    shouldAssignCommitPartition = false;
                 }
 
-                private void enlist(SourceAwareIgniteRel rel) {
-                    IgniteTable igniteTable = 
rel.getTable().unwrap(IgniteTable.class);
+                for (Int2ObjectMap.Entry<NodeWithConsistencyToken> 
partWithToken : assignments.int2ObjectEntrySet()) {
+                    ZonePartitionId partitionId = new ZonePartitionId(zoneId, 
partWithToken.getIntKey());
 
-                    ColocationGroup colocationGroup = 
mappedFragment.groupsBySourceId().get(rel.sourceId());
-                    Int2ObjectMap<NodeWithConsistencyToken> assignments = 
colocationGroup.assignments();
+                    NodeWithConsistencyToken assignment = 
partWithToken.getValue();
 
-                    enlist(igniteTable.id(), igniteTable.zoneId(), 
assignments);
+                    tx.enlist(
+                            partitionId,
+                            tableId,
+                            assignment.name(),
+                            assignment.enlistmentConsistencyToken()
+                    );
                 }
-            }.visit(mappedFragment.fragment().root());
+            }
         }
 
         private CompletableFuture<Void> close(CancellationReason reason) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
index ba8861c9366..88a4e5848ea 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
@@ -21,6 +21,8 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 
 import java.util.List;
@@ -29,10 +31,12 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.sql.engine.SqlProperties;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.framework.DataProvider;
 import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
@@ -40,23 +44,31 @@ import 
org.apache.ignite.internal.sql.engine.framework.TestCluster;
 import org.apache.ignite.internal.sql.engine.framework.TestNode;
 import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.util.InjectQueryCheckerFactory;
 import org.apache.ignite.internal.sql.engine.util.QueryChecker;
 import org.apache.ignite.internal.sql.engine.util.QueryCheckerExtension;
 import org.apache.ignite.internal.sql.engine.util.QueryCheckerFactory;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.lang.CancellationToken;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 
 /** Transactions enlist count test. */
 @ExtendWith(QueryCheckerExtension.class)
 public class TransactionEnlistTest extends BaseIgniteAbstractTest {
+    private static final int PARTITIONS_COUNT = 3;
     private static final String NODE_NAME1 = "N1";
 
     @InjectQueryCheckerFactory
@@ -72,13 +84,45 @@ public class TransactionEnlistTest extends 
BaseIgniteAbstractTest {
 
         //noinspection ConcatenationWithEmptyString
         CLUSTER.node("N1").initSchema(""
-                + "CREATE ZONE test_zone (partitions 3) storage profiles 
['Default'];"
-                + "CREATE TABLE t1 (id INT PRIMARY KEY, val INT) ZONE 
test_zone");
+                + "CREATE ZONE test_zone (partitions " + PARTITIONS_COUNT + ") 
storage profiles ['Default'];"
+                + "CREATE TABLE t1 (id INT PRIMARY KEY, val INT) ZONE 
test_zone;");
 
         CLUSTER.setAssignmentsProvider("T1", (partitionCount, b) -> 
IntStream.range(0, partitionCount)
                 .mapToObj(i -> List.of("N1"))
                 .collect(Collectors.toList()));
         CLUSTER.setDataProvider("T1", 
TestBuilders.tableScan(DataProvider.fromCollection(List.of())));
+        CLUSTER.setUpdatableTable("T1", new UpdatableTable() {
+            @Override
+            public TableDescriptor descriptor() {
+                return null;
+            }
+
+            @Override
+            public <RowT> CompletableFuture<?> 
insertAll(ExecutionContext<RowT> ectx, List<RowT> rows, ColocationGroup 
colocationGroup) {
+                return nullCompletedFuture();
+            }
+
+            @Override
+            public <RowT> CompletableFuture<Void> insert(@Nullable 
InternalTransaction explicitTx, ExecutionContext<RowT> ectx, RowT row) {
+                return nullCompletedFuture();
+            }
+
+            @Override
+            public <RowT> CompletableFuture<?> 
upsertAll(ExecutionContext<RowT> ectx, List<RowT> rows, ColocationGroup 
colocationGroup) {
+                return nullCompletedFuture();
+            }
+
+            @Override
+            public <RowT> CompletableFuture<Boolean> delete(@Nullable 
InternalTransaction explicitTx, ExecutionContext<RowT> ectx,
+                    RowT key) {
+                return nullCompletedFuture();
+            }
+
+            @Override
+            public <RowT> CompletableFuture<?> 
deleteAll(ExecutionContext<RowT> ectx, List<RowT> rows, ColocationGroup 
colocationGroup) {
+                return nullCompletedFuture();
+            }
+        });
     }
 
     @AfterAll
@@ -104,6 +148,55 @@ public class TransactionEnlistTest extends 
BaseIgniteAbstractTest {
         Mockito.verify(spiedTx, times(2)).enlist(any(), anyInt(), any(), 
anyLong());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2, 3})
+    void testCommitPartitionChoice(int id) {
+        NoOpTransaction tx = NoOpTransaction.readWrite("t1", false);
+
+        NoOpTransaction spiedTx = Mockito.spy(tx);
+
+        assertQuery("UPDATE t1 /*+ no_index */ SET val = 42 WHERE id = ?", 
spiedTx)
+                .withParam(id)
+                .check();
+
+        int expectedPartition = expectedPartition(id);
+        {
+            ArgumentMatcher<ZonePartitionId> partitionIdMatch = 
zonePartitionId -> zonePartitionId.partitionId() == expectedPartition;
+            // We expect commit partitions to be assigned once for given 
transaction.
+            Mockito.verify(spiedTx, times(1))
+                    .assignCommitPartition(argThat(partitionIdMatch));
+            // Individual partition on the other hand will be enlisted for 
every source.
+            // In this particular case -- first time for scan and second for 
Modify node.
+            Mockito.verify(spiedTx, times(2))
+                    .enlist(argThat(partitionIdMatch), anyInt(), any(), 
anyLong());
+        }
+
+        {
+            // Due to partition pruning we don't expect any more enlistment.
+            // We should not try to assign other partition as commit partition 
as well.
+            ArgumentMatcher<ZonePartitionId> partitionIdMismatch = 
zonePartitionId -> zonePartitionId.partitionId() != expectedPartition;
+            Mockito.verify(spiedTx, never())
+                    .assignCommitPartition(argThat(partitionIdMismatch));
+            Mockito.verify(spiedTx, never())
+                    .enlist(argThat(partitionIdMismatch), anyInt(), any(), 
anyLong());
+        }
+    }
+
+    @Test
+    void testNoCommitPartitionAssignment() {
+        NoOpTransaction tx = NoOpTransaction.readWrite("t1", false);
+        tx.assignCommitPartition(new ZonePartitionId(1, 1));
+
+        NoOpTransaction spiedTx = Mockito.spy(tx);
+
+        assertQuery("UPDATE t1 /*+ no_index */ SET val = 42 WHERE id = ?", 
spiedTx)
+                .withParam(1)
+                .check();
+
+        // Transaction already has a commit partition, so no assignment is 
expected during query processing.
+        Mockito.verify(spiedTx, never()).assignCommitPartition(any());
+    }
+
     private static QueryChecker assertQuery(String qry, InternalTransaction 
tx) {
         TestNode testNode = CLUSTER.node(NODE_NAME1);
 
@@ -149,10 +242,9 @@ public class TransactionEnlistTest extends 
BaseIgniteAbstractTest {
                 String qry,
                 Object... params
         ) {
-            assert params == null || params.length == 0 : "params are not 
supported";
             assert !prepareOnly : "Expected that the query will only be 
prepared, but not executed";
 
-            AsyncSqlCursor<InternalSqlRow> sqlCursor = 
node.executeQuery(transaction, qry);
+            AsyncSqlCursor<InternalSqlRow> sqlCursor = 
node.executeQuery(transaction, qry, params);
 
             return CompletableFuture.completedFuture(sqlCursor);
         }
@@ -169,4 +261,10 @@ public class TransactionEnlistTest extends 
BaseIgniteAbstractTest {
             return nullCompletedFuture();
         }
     }
+
+    private static int expectedPartition(int key) {
+        var calculator = new PartitionCalculator(PARTITIONS_COUNT, new 
NativeType[] {NativeTypes.INT32});
+        calculator.append(key);
+        return calculator.partition();
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
index 420d3aad04e..33815f21301 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
@@ -59,6 +59,7 @@ import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.binarytuple.BinaryTuple;
 import org.apache.ignite.internal.binarytuple.BinaryTuplePrefix;
 import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
@@ -80,6 +81,7 @@ import org.apache.ignite.internal.table.OperationContext;
 import org.apache.ignite.internal.table.TxContext;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.type.StructNativeType;
+import org.apache.ignite.tx.Transaction;
 import org.hamcrest.Matchers;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Named;
@@ -220,7 +222,7 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
         for (Bound leftBound : Bound.values()) {
             for (Bound rightBound : Bound.values()) {
                 params.add(Arguments.of(NoOpTransaction.readOnly("RO", false), 
leftBound, rightBound));
-                params.add(Arguments.of(NoOpTransaction.readWrite("RW", 
false), leftBound, rightBound));
+                params.add(Arguments.of(rwWithCommitPartition(), leftBound, 
rightBound));
             }
         }
 
@@ -483,10 +485,16 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
     private static Stream<Arguments> transactions() {
         return Stream.of(
                 Arguments.of(Named.of("Read-only transaction", 
NoOpTransaction.readOnly("RO", false))),
-                Arguments.of(Named.of("Read-write transaction", 
NoOpTransaction.readWrite("RW", false)))
+                Arguments.of(Named.of("Read-write transaction", 
rwWithCommitPartition()))
         );
     }
 
+    private static Transaction rwWithCommitPartition() {
+        NoOpTransaction tx = NoOpTransaction.readWrite("RW", false);
+        tx.assignCommitPartition(new ZonePartitionId(1, 1));
+        return tx;
+    }
+
     private class Tester {
 
         final ScannableTable scannableTable;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index 22af25100ab..aa7c27e4d97 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.InternalClusterNode;
@@ -42,12 +43,8 @@ import org.jetbrains.annotations.Nullable;
  * Dummy transaction that should be used as mock transaction for execution 
tests.
  */
 public final class NoOpTransaction implements InternalTransaction {
-    private static final int ZONE_ID = 1;
-
     private static final int TABLE_ID = 2;
 
-    private static final int PARTITION_ID = 2;
-
     private final UUID id;
 
     private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1)
@@ -57,7 +54,7 @@ public final class NoOpTransaction implements 
InternalTransaction {
 
     private final PendingTxPartitionEnlistment enlistment;
 
-    private final ZonePartitionId groupId = new ZonePartitionId(ZONE_ID, 
PARTITION_ID);
+    private final AtomicReference<ZonePartitionId> commitPartition = new 
AtomicReference<>();
 
     private final boolean implicit;
 
@@ -178,12 +175,12 @@ public final class NoOpTransaction implements 
InternalTransaction {
 
     @Override
     public boolean assignCommitPartition(ZonePartitionId replicationGroupId) {
-        return true;
+        return commitPartition.compareAndSet(null, replicationGroupId);
     }
 
     @Override
     public ZonePartitionId commitPartition() {
-        return groupId;
+        return commitPartition.get();
     }
 
     @Override

Reply via email to