This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 784e098a840 IGNITE-28460 Sql. Choose commit partition from list of
involved partitions (#7936)
784e098a840 is described below
commit 784e098a84065d7155f70923acdc6e26bc4b740e
Author: korlov42 <[email protected]>
AuthorDate: Tue Apr 7 17:02:38 2026 +0300
IGNITE-28460 Sql. Choose commit partition from list of involved partitions
(#7936)
---
.../sql/engine/exec/ExecutionServiceImpl.java | 79 +++++----------
.../sql/engine/exec/TransactionEnlistTest.java | 106 ++++++++++++++++++++-
.../engine/exec/rel/ScannableTableSelfTest.java | 12 ++-
.../sql/engine/framework/NoOpTransaction.java | 11 +--
4 files changed, 140 insertions(+), 68 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index b7bfe8fb861..23d413eab70 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -27,6 +27,7 @@ import static
org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import com.github.benmanes.caffeine.cache.Caffeine;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
@@ -49,7 +50,6 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -114,15 +114,10 @@ import
org.apache.ignite.internal.sql.engine.prepare.DdlPlan;
import org.apache.ignite.internal.sql.engine.prepare.ExplainPlan;
import org.apache.ignite.internal.sql.engine.prepare.ExplainablePlan;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
-import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
import org.apache.ignite.internal.sql.engine.prepare.KillPlan;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
-import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
-import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
-import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
-import org.apache.ignite.internal.sql.engine.rel.SourceAwareIgniteRel;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchemas;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
@@ -1292,65 +1287,39 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
}
private void enlistPartitions(MappedFragment mappedFragment,
InternalTransaction tx) {
- // no need to traverse the tree if fragment has no tables
- if (mappedFragment.fragment().tables().isEmpty()) {
- return;
- }
-
- new IgniteRelShuttle() {
- @Override
- public IgniteRel visit(IgniteIndexScan rel) {
- enlist(rel);
-
- return super.visit(rel);
- }
-
- @Override
- public IgniteRel visit(IgniteTableScan rel) {
- enlist(rel);
-
- return super.visit(rel);
- }
+ boolean shouldAssignCommitPartition = tx.commitPartition() == null;
+ for (Long2ObjectMap.Entry<IgniteTable> entry :
mappedFragment.fragment().tables().long2ObjectEntrySet()) {
+ long sourceId = entry.getLongKey();
+ IgniteTable table = entry.getValue();
- @Override
- public IgniteRel visit(IgniteTableModify rel) {
- enlist(rel);
+ ColocationGroup colocationGroup =
mappedFragment.groupsBySourceId().get(sourceId);
+ Int2ObjectMap<NodeWithConsistencyToken> assignments =
colocationGroup.assignments();
- return super.visit(rel);
+ if (assignments.isEmpty()) {
+ continue;
}
- private void enlist(int tableId, int zoneId,
Int2ObjectMap<NodeWithConsistencyToken> assignments) {
- if (assignments.isEmpty()) {
- return;
- }
-
- int partsCnt = assignments.size();
-
- tx.assignCommitPartition(new ZonePartitionId(zoneId,
ThreadLocalRandom.current().nextInt(partsCnt)));
-
- for (Int2ObjectMap.Entry<NodeWithConsistencyToken>
partWithToken : assignments.int2ObjectEntrySet()) {
- ZonePartitionId replicationGroupId = new
ZonePartitionId(zoneId, partWithToken.getIntKey());
+ int tableId = table.id();
+ int zoneId = table.zoneId();
- NodeWithConsistencyToken assignment =
partWithToken.getValue();
-
- tx.enlist(
- replicationGroupId,
- tableId,
- assignment.name(),
- assignment.enlistmentConsistencyToken()
- );
- }
+ if (shouldAssignCommitPartition) {
+ tx.assignCommitPartition(new ZonePartitionId(zoneId,
assignments.keySet().iterator().nextInt()));
+ shouldAssignCommitPartition = false;
}
- private void enlist(SourceAwareIgniteRel rel) {
- IgniteTable igniteTable =
rel.getTable().unwrap(IgniteTable.class);
+ for (Int2ObjectMap.Entry<NodeWithConsistencyToken>
partWithToken : assignments.int2ObjectEntrySet()) {
+ ZonePartitionId partitionId = new ZonePartitionId(zoneId,
partWithToken.getIntKey());
- ColocationGroup colocationGroup =
mappedFragment.groupsBySourceId().get(rel.sourceId());
- Int2ObjectMap<NodeWithConsistencyToken> assignments =
colocationGroup.assignments();
+ NodeWithConsistencyToken assignment =
partWithToken.getValue();
- enlist(igniteTable.id(), igniteTable.zoneId(),
assignments);
+ tx.enlist(
+ partitionId,
+ tableId,
+ assignment.name(),
+ assignment.enlistmentConsistencyToken()
+ );
}
- }.visit(mappedFragment.fragment().root());
+ }
}
private CompletableFuture<Void> close(CancellationReason reason) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
index ba8861c9366..88a4e5848ea 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
@@ -21,6 +21,8 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import java.util.List;
@@ -29,10 +31,12 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.SqlProperties;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite.internal.sql.engine.framework.DataProvider;
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
@@ -40,23 +44,31 @@ import
org.apache.ignite.internal.sql.engine.framework.TestCluster;
import org.apache.ignite.internal.sql.engine.framework.TestNode;
import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.util.InjectQueryCheckerFactory;
import org.apache.ignite.internal.sql.engine.util.QueryChecker;
import org.apache.ignite.internal.sql.engine.util.QueryCheckerExtension;
import org.apache.ignite.internal.sql.engine.util.QueryCheckerFactory;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.lang.CancellationToken;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
/** Transactions enlist count test. */
@ExtendWith(QueryCheckerExtension.class)
public class TransactionEnlistTest extends BaseIgniteAbstractTest {
+ private static final int PARTITIONS_COUNT = 3;
private static final String NODE_NAME1 = "N1";
@InjectQueryCheckerFactory
@@ -72,13 +84,45 @@ public class TransactionEnlistTest extends
BaseIgniteAbstractTest {
//noinspection ConcatenationWithEmptyString
CLUSTER.node("N1").initSchema(""
- + "CREATE ZONE test_zone (partitions 3) storage profiles
['Default'];"
- + "CREATE TABLE t1 (id INT PRIMARY KEY, val INT) ZONE
test_zone");
+ + "CREATE ZONE test_zone (partitions " + PARTITIONS_COUNT + ")
storage profiles ['Default'];"
+ + "CREATE TABLE t1 (id INT PRIMARY KEY, val INT) ZONE
test_zone;");
CLUSTER.setAssignmentsProvider("T1", (partitionCount, b) ->
IntStream.range(0, partitionCount)
.mapToObj(i -> List.of("N1"))
.collect(Collectors.toList()));
CLUSTER.setDataProvider("T1",
TestBuilders.tableScan(DataProvider.fromCollection(List.of())));
+ CLUSTER.setUpdatableTable("T1", new UpdatableTable() {
+ @Override
+ public TableDescriptor descriptor() {
+ return null;
+ }
+
+ @Override
+ public <RowT> CompletableFuture<?>
insertAll(ExecutionContext<RowT> ectx, List<RowT> rows, ColocationGroup
colocationGroup) {
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public <RowT> CompletableFuture<Void> insert(@Nullable
InternalTransaction explicitTx, ExecutionContext<RowT> ectx, RowT row) {
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public <RowT> CompletableFuture<?>
upsertAll(ExecutionContext<RowT> ectx, List<RowT> rows, ColocationGroup
colocationGroup) {
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public <RowT> CompletableFuture<Boolean> delete(@Nullable
InternalTransaction explicitTx, ExecutionContext<RowT> ectx,
+ RowT key) {
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public <RowT> CompletableFuture<?>
deleteAll(ExecutionContext<RowT> ectx, List<RowT> rows, ColocationGroup
colocationGroup) {
+ return nullCompletedFuture();
+ }
+ });
}
@AfterAll
@@ -104,6 +148,55 @@ public class TransactionEnlistTest extends
BaseIgniteAbstractTest {
Mockito.verify(spiedTx, times(2)).enlist(any(), anyInt(), any(),
anyLong());
}
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2, 3})
+ void testCommitPartitionChoice(int id) {
+ NoOpTransaction tx = NoOpTransaction.readWrite("t1", false);
+
+ NoOpTransaction spiedTx = Mockito.spy(tx);
+
+ assertQuery("UPDATE t1 /*+ no_index */ SET val = 42 WHERE id = ?",
spiedTx)
+ .withParam(id)
+ .check();
+
+ int expectedPartition = expectedPartition(id);
+ {
+ ArgumentMatcher<ZonePartitionId> partitionIdMatch =
zonePartitionId -> zonePartitionId.partitionId() == expectedPartition;
+ // We expect commit partitions to be assigned once for given
transaction.
+ Mockito.verify(spiedTx, times(1))
+ .assignCommitPartition(argThat(partitionIdMatch));
+ // Individual partition on the other hand will be enlisted for
every source.
+ // In this particular case -- first time for scan and second for
Modify node.
+ Mockito.verify(spiedTx, times(2))
+ .enlist(argThat(partitionIdMatch), anyInt(), any(),
anyLong());
+ }
+
+ {
+ // Due to partition pruning we don't expect any more enlistment.
+ // We should not try to assign other partition as commit partition
as well.
+ ArgumentMatcher<ZonePartitionId> partitionIdMismatch =
zonePartitionId -> zonePartitionId.partitionId() != expectedPartition;
+ Mockito.verify(spiedTx, never())
+ .assignCommitPartition(argThat(partitionIdMismatch));
+ Mockito.verify(spiedTx, never())
+ .enlist(argThat(partitionIdMismatch), anyInt(), any(),
anyLong());
+ }
+ }
+
+ @Test
+ void testNoCommitPartitionAssignment() {
+ NoOpTransaction tx = NoOpTransaction.readWrite("t1", false);
+ tx.assignCommitPartition(new ZonePartitionId(1, 1));
+
+ NoOpTransaction spiedTx = Mockito.spy(tx);
+
+ assertQuery("UPDATE t1 /*+ no_index */ SET val = 42 WHERE id = ?",
spiedTx)
+ .withParam(1)
+ .check();
+
+ // Transaction already has a commit partition, so no assignment is
expected during query processing.
+ Mockito.verify(spiedTx, never()).assignCommitPartition(any());
+ }
+
private static QueryChecker assertQuery(String qry, InternalTransaction
tx) {
TestNode testNode = CLUSTER.node(NODE_NAME1);
@@ -149,10 +242,9 @@ public class TransactionEnlistTest extends
BaseIgniteAbstractTest {
String qry,
Object... params
) {
- assert params == null || params.length == 0 : "params are not
supported";
assert !prepareOnly : "Expected that the query will only be
prepared, but not executed";
- AsyncSqlCursor<InternalSqlRow> sqlCursor =
node.executeQuery(transaction, qry);
+ AsyncSqlCursor<InternalSqlRow> sqlCursor =
node.executeQuery(transaction, qry, params);
return CompletableFuture.completedFuture(sqlCursor);
}
@@ -169,4 +261,10 @@ public class TransactionEnlistTest extends
BaseIgniteAbstractTest {
return nullCompletedFuture();
}
}
+
+ private static int expectedPartition(int key) {
+ var calculator = new PartitionCalculator(PARTITIONS_COUNT, new
NativeType[] {NativeTypes.INT32});
+ calculator.append(key);
+ return calculator.partition();
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
index 420d3aad04e..33815f21301 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
@@ -59,6 +59,7 @@ import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.binarytuple.BinaryTuple;
import org.apache.ignite.internal.binarytuple.BinaryTuplePrefix;
import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
@@ -80,6 +81,7 @@ import org.apache.ignite.internal.table.OperationContext;
import org.apache.ignite.internal.table.TxContext;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.type.StructNativeType;
+import org.apache.ignite.tx.Transaction;
import org.hamcrest.Matchers;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Named;
@@ -220,7 +222,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
for (Bound leftBound : Bound.values()) {
for (Bound rightBound : Bound.values()) {
params.add(Arguments.of(NoOpTransaction.readOnly("RO", false),
leftBound, rightBound));
- params.add(Arguments.of(NoOpTransaction.readWrite("RW",
false), leftBound, rightBound));
+ params.add(Arguments.of(rwWithCommitPartition(), leftBound,
rightBound));
}
}
@@ -483,10 +485,16 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
private static Stream<Arguments> transactions() {
return Stream.of(
Arguments.of(Named.of("Read-only transaction",
NoOpTransaction.readOnly("RO", false))),
- Arguments.of(Named.of("Read-write transaction",
NoOpTransaction.readWrite("RW", false)))
+ Arguments.of(Named.of("Read-write transaction",
rwWithCommitPartition()))
);
}
+ private static Transaction rwWithCommitPartition() {
+ NoOpTransaction tx = NoOpTransaction.readWrite("RW", false);
+ tx.assignCommitPartition(new ZonePartitionId(1, 1));
+ return tx;
+ }
+
private class Tester {
final ScannableTable scannableTable;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index 22af25100ab..aa7c27e4d97 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.InternalClusterNode;
@@ -42,12 +43,8 @@ import org.jetbrains.annotations.Nullable;
* Dummy transaction that should be used as mock transaction for execution
tests.
*/
public final class NoOpTransaction implements InternalTransaction {
- private static final int ZONE_ID = 1;
-
private static final int TABLE_ID = 2;
- private static final int PARTITION_ID = 2;
-
private final UUID id;
private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1)
@@ -57,7 +54,7 @@ public final class NoOpTransaction implements
InternalTransaction {
private final PendingTxPartitionEnlistment enlistment;
- private final ZonePartitionId groupId = new ZonePartitionId(ZONE_ID,
PARTITION_ID);
+ private final AtomicReference<ZonePartitionId> commitPartition = new
AtomicReference<>();
private final boolean implicit;
@@ -178,12 +175,12 @@ public final class NoOpTransaction implements
InternalTransaction {
@Override
public boolean assignCommitPartition(ZonePartitionId replicationGroupId) {
- return true;
+ return commitPartition.compareAndSet(null, replicationGroupId);
}
@Override
public ZonePartitionId commitPartition() {
- return groupId;
+ return commitPartition.get();
}
@Override