This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 5cfd89174d IGNITE-17951 Sql. Enlist partitions in transaction before executing a query (#1501) 5cfd89174d is described below commit 5cfd89174d9d6a23e05a3b920a5c2b07fb1cbd62 Author: Pavel Pereslegin <xxt...@gmail.com> AuthorDate: Fri Feb 3 18:16:01 2023 +0300 IGNITE-17951 Sql. Enlist partitions in transaction before executing a query (#1501) --- .../ignite/client/fakes/FakeInternalTable.java | 35 +++++ .../apache/ignite/internal/index/HashIndex.java | 13 ++ .../org/apache/ignite/internal/index/Index.java | 21 +++ .../apache/ignite/internal/index/SortedIndex.java | 39 +++-- .../ignite/internal/index/SortedIndexImpl.java | 27 ++++ .../ignite/internal/table/ItTableScanTest.java | 64 ++++++-- .../internal/sql/engine/SqlQueryProcessor.java | 19 +-- .../internal/sql/engine/exec/ExecutionContext.java | 16 +- .../sql/engine/exec/ExecutionServiceImpl.java | 99 ++++++++++-- .../sql/engine/exec/LogicalRelImplementor.java | 4 +- .../sql/engine/exec/rel/IndexScanNode.java | 76 +++++++--- .../sql/engine/exec/rel/StorageScanNode.java | 2 +- .../sql/engine/exec/rel/TableScanNode.java | 37 +++-- .../sql/engine/message/QueryStartRequest.java | 7 + .../sql/engine/metadata/ColocationGroup.java | 167 +++++++++++++++------ .../engine/metadata/IgniteMdFragmentMapping.java | 6 +- .../internal/sql/engine/metadata/NodeWithTerm.java | 63 ++++++++ .../sql/engine/metadata/PartitionWithTerm.java | 58 +++++++ .../sql/engine/schema/IgniteTableImpl.java | 4 +- .../sql/engine/trait/DistributionFunction.java | 4 +- .../internal/sql/engine/util/BaseQueryContext.java | 21 +-- .../sql/engine/util/LocalTxAttributesHolder.java | 118 +++++++++++++++ .../internal/sql/engine/StopCalciteModuleTest.java | 9 +- .../sql/engine/exec/ExecutionServiceImplTest.java | 12 +- .../sql/engine/exec/RuntimeSortedIndexTest.java | 3 +- .../sql/engine/exec/rel/AbstractExecutionTest.java | 12 +- .../exec/rel/IndexScanNodeExecutionTest.java | 14 +- .../engine/exec/rel/MergeJoinExecutionTest.java | 2 +- .../exec/rel/TableScanNodeExecutionTest.java | 63 +++++--- .../sql/engine/framework/TestBuilders.java | 5 +- .../internal/sql/engine/framework/TestNode.java | 2 +- .../sql/engine/planner/AbstractPlannerTest.java | 21 +++ .../internal/sql/engine/planner/PlannerTest.java | 49 +++--- .../ItAbstractInternalTableScanTest.java | 47 +++++- .../ItInternalTableReadWriteScanTest.java | 30 +++- .../ignite/internal/table/InternalTable.java | 54 +++++++ .../distributed/storage/InternalTableImpl.java | 91 ++++++++++- .../ignite/internal/utils/PrimaryReplica.java} | 48 ++++-- .../table/impl/DummyInternalTableImpl.java | 9 ++ 39 files changed, 1099 insertions(+), 272 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java index 24551df462..f1ad1c501a 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; +import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.NotNull; @@ -334,6 +335,21 @@ public class FakeInternalTable implements InternalTable { throw new IgniteInternalException(new OperationNotSupportedException()); } + /** {@inheritDoc} */ + @Override + public Publisher<BinaryRow> scan( + int partId, + UUID txId, + PrimaryReplica recipient, + @Nullable UUID indexId, + @Nullable BinaryTuplePrefix lowerBound, + @Nullable BinaryTuplePrefix upperBound, + int flags, + @Nullable BitSet columnsToInclude + ) { + throw new IgniteInternalException(new OperationNotSupportedException()); + } + /** {@inheritDoc} */ @Override public Publisher<BinaryRow> scan( @@ -370,6 +386,19 @@ public class FakeInternalTable implements InternalTable { throw new IgniteInternalException(new OperationNotSupportedException()); } + /** {@inheritDoc} */ + @Override + public Publisher<BinaryRow> lookup( + int partId, + UUID txId, + PrimaryReplica recipient, + UUID indexId, + BinaryTuple key, + @Nullable BitSet columnsToInclude + ) { + throw new IgniteInternalException(new OperationNotSupportedException()); + } + /** {@inheritDoc} */ @Override public Publisher<BinaryRow> lookup( @@ -389,6 +418,12 @@ public class FakeInternalTable implements InternalTable { throw new IgniteInternalException(new OperationNotSupportedException()); } + /** {@inheritDoc} */ + @Override + public List<PrimaryReplica> primaryReplicas() { + throw new IgniteInternalException(new OperationNotSupportedException()); + } + /** {@inheritDoc} */ @Override public ClusterNode leaderAssignment(int partition) { diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java b/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java index 069b1f7b1b..a4fd38bc72 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.Nullable; @@ -81,6 +82,18 @@ public class HashIndex implements Index<IndexDescriptor> { return table.lookup(partId, tx, id, key, columns); } + /** {@inheritDoc} */ + @Override + public Publisher<BinaryRow> lookup( + int partId, + UUID txId, + PrimaryReplica recipient, + BinaryTuple key, + @Nullable BitSet columns + ) { + return table.lookup(partId, txId, recipient, id, key, columns); + } + /** {@inheritDoc} */ @Override public Publisher<BinaryRow> lookup( diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java b/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java index 896ee3ab09..5e2fff76b5 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.Nullable; @@ -53,9 +54,29 @@ public interface Index<DescriptorT extends IndexDescriptor> { * @param key Key to lookup. * @param columns Columns to include. * @return A cursor from resulting rows. + * @deprecated IGNITE-17952 Use {@link #lookup(int, UUID, PrimaryReplica, BinaryTuple, BitSet)} instead. */ + @Deprecated Publisher<BinaryRow> lookup(int partId, @Nullable InternalTransaction tx, BinaryTuple key, @Nullable BitSet columns); + /** + * Returns cursor for the values corresponding to the given key. + * + * @param partId Partition id. + * @param txId Transaction id. + * @param recipient Primary replica that will handle given get request. + * @param key Key to lookup. + * @param columns Columns to include. + * @return A cursor from resulting rows. + */ + Publisher<BinaryRow> lookup( + int partId, + UUID txId, + PrimaryReplica recipient, + BinaryTuple key, + @Nullable BitSet columns + ); + /** * Returns cursor for the values corresponding to the given key. * diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java index 0676ba557a..ed45802283 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java @@ -18,11 +18,13 @@ package org.apache.ignite.internal.index; import java.util.BitSet; +import java.util.UUID; import java.util.concurrent.Flow.Publisher; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryTuplePrefix; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.Nullable; @@ -39,24 +41,28 @@ public interface SortedIndex extends Index<SortedIndexDescriptor> { byte INCLUDE_RIGHT = 0b10; /** - * Opens a range cursor for given bounds with left bound included in result and right excluded. + * Opens a range cursor for given bounds. Inclusion of the bounds is defined by {@code flags} mask. * * @param partId Partition. - * @param tx Transaction. - * @param left Left bound of range. - * @param right Right bound of range. - * @param columns Columns to include. + * @param txId Transaction id. + * @param recipient Primary replica that will handle given get request. + * @param leftBound Left bound of range. + * @param rightBound Right bound of range. + * @param flags A mask that defines whether to include bounds into the final result or not. + * @param columnsToInclude Columns to include. * @return A cursor from resulting rows. + * @see SortedIndex#INCLUDE_LEFT + * @see SortedIndex#INCLUDE_RIGHT */ - default Publisher<BinaryRow> scan( + Publisher<BinaryRow> scan( int partId, - @Nullable InternalTransaction tx, - @Nullable BinaryTuplePrefix left, - @Nullable BinaryTuplePrefix right, - @Nullable BitSet columns - ) { - return scan(partId, tx, left, right, INCLUDE_LEFT, columns); - } + UUID txId, + PrimaryReplica recipient, + @Nullable BinaryTuplePrefix leftBound, + @Nullable BinaryTuplePrefix rightBound, + int flags, + @Nullable BitSet columnsToInclude + ); /** * Opens a read-only range cursor for given bounds with left bound included in result and right excluded. @@ -81,7 +87,7 @@ public interface SortedIndex extends Index<SortedIndexDescriptor> { } /** - * Opens a range cursor for given bounds. Inclusion of the bounds is defined by {@code includeBounds} mask. + * Opens a range cursor for given bounds. Inclusion of the bounds is defined by {@code flags} mask. * * @param partId Partition. * @param tx Transaction. @@ -92,7 +98,9 @@ public interface SortedIndex extends Index<SortedIndexDescriptor> { * @return A cursor from resulting rows. * @see SortedIndex#INCLUDE_LEFT * @see SortedIndex#INCLUDE_RIGHT + * @deprecated IGNITE-17952 Use {@link #scan(int, UUID, PrimaryReplica, BinaryTuplePrefix, BinaryTuplePrefix, int, BitSet)} instead. */ + @Deprecated Publisher<BinaryRow> scan( int partId, @Nullable InternalTransaction tx, @@ -102,9 +110,8 @@ public interface SortedIndex extends Index<SortedIndexDescriptor> { @Nullable BitSet columnsToInclude ); - /** - * Opens a range cursor for given bounds. Inclusion of the bounds is defined by {@code includeBounds} mask. + * Opens a range cursor for given bounds. Inclusion of the bounds is defined by {@code flags} mask. * * @param partId Partition. * @param readTimestamp Read timestamp. diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java index 3ec520b895..c3d6f5c65b 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.schema.BinaryTuplePrefix; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.Nullable; @@ -82,6 +83,18 @@ public class SortedIndexImpl implements SortedIndex { return table.lookup(partId, tx, id, key, columns); } + /** {@inheritDoc} */ + @Override + public Publisher<BinaryRow> lookup( + int partId, + UUID txId, + PrimaryReplica recipient, + BinaryTuple key, + @Nullable BitSet columns + ) { + return table.lookup(partId, txId, recipient, id, key, columns); + } + /** {@inheritDoc} */ @Override public Publisher<BinaryRow> lookup( @@ -120,4 +133,18 @@ public class SortedIndexImpl implements SortedIndex { ) { return table.scan(partId, readTimestamp, recipientNode, id, leftBound, rightBound, flags, columnsToInclude); } + + /** {@inheritDoc} */ + @Override + public Publisher<BinaryRow> scan( + int partId, + UUID txId, + PrimaryReplica recipient, + @Nullable BinaryTuplePrefix leftBound, + @Nullable BinaryTuplePrefix rightBound, + int flags, + @Nullable BitSet columnsToInclude + ) { + return table.scan(partId, txId, recipient, id, leftBound, rightBound, flags, columnsToInclude); + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java index 70c0841c5e..9d9f3690c4 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java @@ -43,6 +43,7 @@ import java.util.stream.IntStream; import org.apache.ignite.Ignite; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; +import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.BinaryTuplePrefix; @@ -56,9 +57,13 @@ import org.apache.ignite.internal.schema.configuration.index.TableIndexConfigura import org.apache.ignite.internal.schema.row.Row; import org.apache.ignite.internal.schema.row.RowAssembler; import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest; +import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.utils.PrimaryReplica; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteStringFormatter; +import org.apache.ignite.network.ClusterNode; import org.apache.ignite.table.KeyValueView; import org.apache.ignite.table.Tuple; import org.apache.ignite.tx.IgniteTransactions; @@ -107,11 +112,12 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest { @Test public void testInsertWaitScanComplete() throws Exception { + int partId = 0; TableImpl table = getOrCreateTable(); IgniteTransactions transactions = CLUSTER_NODES.get(0).transactions(); InternalTransaction tx0 = (InternalTransaction) transactions.begin(); - InternalTransaction tx1 = (InternalTransaction) transactions.begin(); + InternalTransaction tx1 = startTxWithEnlistedPartition(partId); InternalTable internalTable = table.internalTable(); @@ -119,7 +125,11 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest { ArrayList<BinaryRow> scannedRows = new ArrayList<>(); - Publisher<BinaryRow> publisher = internalTable.scan(0, tx1, sortedIndexId, null, null, 0, null); + IgniteBiTuple<ClusterNode, Long> leaderWithTerm = tx1.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId)); + + PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(), leaderWithTerm.get2()); + + Publisher<BinaryRow> publisher = internalTable.scan(partId, tx1.id(), recipient, sortedIndexId, null, null, 0, null); CompletableFuture<Void> scanned = new CompletableFuture<>(); @@ -459,9 +469,15 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest { ArrayList<BinaryRow> scannedRows = new ArrayList<>(); - InternalTransaction tx = (InternalTransaction) CLUSTER_NODES.get(0).transactions().begin(); + int partId = 0; + + InternalTransaction tx = startTxWithEnlistedPartition(partId); + + IgniteBiTuple<ClusterNode, Long> leaderWithTerm = tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId)); - Publisher<BinaryRow> publisher = internalTable.scan(0, tx, sortedIndexId, null, null, 0, null); + PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(), leaderWithTerm.get2()); + + Publisher<BinaryRow> publisher = internalTable.scan(partId, tx.id(), recipient, sortedIndexId, null, null, 0, null); CompletableFuture<Void> scanned = new CompletableFuture<>(); @@ -487,7 +503,7 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest { assertEquals(ROW_IDS.size() + 1, scannedRows.size()); - var publisher1 = internalTable.scan(0, tx, sortedIndexId, null, null, 0, null); + Publisher<BinaryRow> publisher1 = internalTable.scan(0, tx.id(), recipient, sortedIndexId, null, null, 0, null); assertEquals(scanAllRows(publisher1).size(), scannedRows.size()); @@ -515,11 +531,16 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest { UUID soredIndexId = getSortedIndexId(); - InternalTransaction tx = (InternalTransaction) CLUSTER_NODES.get(0).transactions().begin(); + int partId = 0; + + InternalTransaction tx = startTxWithEnlistedPartition(partId); + IgniteBiTuple<ClusterNode, Long> leaderWithTerm = tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId)); + PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(), leaderWithTerm.get2()); Publisher<BinaryRow> publisher = internalTable.scan( - 0, - tx, + partId, + tx.id(), + recipient, soredIndexId, lowBound, upperBound, @@ -540,8 +561,9 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest { kvView.put(null, Tuple.create().set("key", 9), Tuple.create().set("valInt", 9).set("valStr", "New_9"))); Publisher<BinaryRow> publisher1 = internalTable.scan( - 0, - tx, + partId, + tx.id(), + recipient, soredIndexId, lowBound, upperBound, @@ -763,4 +785,26 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest { return new Row(SCHEMA, new ByteBufferRow(rowBuilder.toBytes())); } + + /** + * Starts an RW transaction and enlists the specified partition in it. + * + * @param partId Partition ID. + * @return Transaction. + */ + private InternalTransaction startTxWithEnlistedPartition(int partId) { + Ignite ignite = CLUSTER_NODES.get(0); + + InternalTransaction tx = (InternalTransaction) ignite.transactions().begin(); + + InternalTable table = ((TableImpl) ignite.tables().table(TABLE_NAME)).internalTable(); + TablePartitionId tblPartId = new TablePartitionId(table.tableId(), partId); + RaftGroupService raftSvc = table.partitionRaftGroupService(partId); + long term = IgniteTestUtils.await(raftSvc.refreshAndGetLeaderWithTerm()).term(); + + tx.assignCommitPartition(tblPartId); + tx.enlist(tblPartId, new IgniteBiTuple<>(table.leaderAssignment(partId), term)); + + return tx; + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 2e383ec123..ad699136f6 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -45,7 +45,6 @@ import org.apache.calcite.tools.Frameworks; import org.apache.calcite.util.Pair; import org.apache.ignite.internal.distributionzones.DistributionZoneManager; import org.apache.ignite.internal.hlc.HybridClock; -import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.index.IndexManager; import org.apache.ignite.internal.index.event.IndexEvent; import org.apache.ignite.internal.index.event.IndexEventParameters; @@ -74,6 +73,7 @@ import org.apache.ignite.internal.sql.engine.session.SessionInfo; import org.apache.ignite.internal.sql.engine.session.SessionManager; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; import org.apache.ignite.internal.sql.engine.util.Commons; +import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.event.TableEvent; @@ -399,22 +399,19 @@ public class SqlQueryProcessor implements QueryProcessor { return nodes.get(0); }) .thenCompose(sqlNode -> { - final boolean rwOp = dataModificationOp(sqlNode); - final HybridTimestamp txTime = outerTx != null ? outerTx.readTimestamp() : rwOp ? null : clock.now(); + boolean rwOp = dataModificationOp(sqlNode); + boolean useDistributedTraits = outerTx != null ? outerTx.isReadOnly() : !rwOp; BaseQueryContext ctx = BaseQueryContext.builder() .frameworkConfig( Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) .defaultSchema(schema) - .traitDefs(rwOp || (outerTx != null && !outerTx.isReadOnly()) ? Commons.LOCAL_TRAITS_SET : - Commons.DISTRIBUTED_TRAITS_SET) + .traitDefs(useDistributedTraits ? Commons.DISTRIBUTED_TRAITS_SET : Commons.LOCAL_TRAITS_SET) .build() ) .logger(LOG) .cancel(queryCancel) .parameters(params) - .transaction(outerTx) - .transactionTime(txTime) .plannerTimeout(PLANNER_TIMEOUT) .build(); @@ -425,17 +422,17 @@ public class SqlQueryProcessor implements QueryProcessor { boolean implicitTxRequired = outerTx == null && rwOp; - InternalTransaction implicitTx = implicitTxRequired ? txManager.begin() : null; + InternalTransaction tx = implicitTxRequired ? txManager.begin() + : outerTx != null ? outerTx : new LocalTxAttributesHolder(null, clock.now()); - BaseQueryContext enrichedContext = - implicitTxRequired ? ctx.toBuilder().transaction(implicitTx).build() : ctx; + BaseQueryContext enrichedContext = ctx.toBuilder().transaction(tx).build(); var dataCursor = executionSrvc.executePlan(plan, enrichedContext); return new AsyncSqlCursorImpl<>( SqlQueryType.mapPlanTypeToSqlType(plan.type()), plan.metadata(), - implicitTx, + implicitTxRequired ? tx : null, new AsyncCursor<List<Object>>() { @Override public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java index 42fc639162..18eb033296 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java @@ -32,7 +32,6 @@ import java.util.function.Consumer; import org.apache.calcite.DataContext; import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.schema.SchemaPlus; -import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.schema.BinaryConverter; @@ -84,9 +83,6 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data private final AtomicBoolean cancelFlag = new AtomicBoolean(); - /** Transaction. */ - private InternalTransaction tx; - /** * Need to store timestamp, since SQL standard says that functions such as CURRENT_TIMESTAMP return the same value throughout the * query. @@ -104,7 +100,6 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data * @param fragmentDesc Partitions information. * @param handler Row handler. * @param params Parameters. - * @param tx Transaction. */ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") public ExecutionContext( @@ -115,8 +110,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data String originatingNodeName, FragmentDescription fragmentDesc, RowHandler<RowT> handler, - Map<String, Object> params, - InternalTransaction tx + Map<String, Object> params ) { super(qctx); @@ -128,7 +122,6 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data this.params = params; this.localNode = localNode; this.originatingNodeName = originatingNodeName; - this.tx = tx; expressionFactory = new ExpressionFactoryImpl<>( this, @@ -338,12 +331,7 @@ public class ExecutionContext<RowT> extends AbstractQueryContext implements Data /** Transaction for current context. */ public InternalTransaction transaction() { - return tx; - } - - /** Read only transaction time. */ - public HybridTimestamp transactionTime() { - return qctx.transactionTime(); + return qctx.transaction(); } /** diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index 406447c379..bf80d7409d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -37,6 +37,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.apache.calcite.tools.Frameworks; @@ -56,24 +57,35 @@ import org.apache.ignite.internal.sql.engine.message.QueryStartRequest; import org.apache.ignite.internal.sql.engine.message.QueryStartResponse; import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory; +import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup; import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription; import org.apache.ignite.internal.sql.engine.metadata.MappingService; import org.apache.ignite.internal.sql.engine.metadata.MappingServiceImpl; +import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm; import org.apache.ignite.internal.sql.engine.metadata.RemoteException; import org.apache.ignite.internal.sql.engine.prepare.DdlPlan; import org.apache.ignite.internal.sql.engine.prepare.ExplainPlan; import org.apache.ignite.internal.sql.engine.prepare.Fragment; import org.apache.ignite.internal.sql.engine.prepare.FragmentPlan; +import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle; import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext; import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan; import org.apache.ignite.internal.sql.engine.prepare.QueryPlan; +import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan; import org.apache.ignite.internal.sql.engine.rel.IgniteRel; +import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan; +import org.apache.ignite.internal.sql.engine.rel.SourceAwareIgniteRel; +import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; +import org.apache.ignite.internal.sql.engine.trait.TraitUtils; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl; +import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder; import org.apache.ignite.internal.sql.engine.util.TypeUtils; +import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteInternalCheckedException; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.network.ClusterNode; @@ -99,6 +111,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve private final MessageService msgSrvc; + private final TopologyService topSrvc; + private final ClusterNode localNode; private final SqlSchemaManager sqlSchemaManager; @@ -142,8 +156,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve ExchangeService exchangeSrvc ) { return new ExecutionServiceImpl<>( - topSrvc.localMember(), msgSrvc, + topSrvc, new MappingServiceImpl(topSrvc), sqlSchemaManager, ddlCommandHandler, @@ -160,11 +174,21 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve } /** - * Constructor. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 + * Constructor. + * + * @param msgSrvc Message service. + * @param topSrvc Topology service. + * @param mappingSrvc Nodes mapping calculation service. + * @param sqlSchemaManager Schema manager. + * @param ddlCmdHnd Handler of the DDL commands. + * @param taskExecutor Task executor. + * @param handler Row handler. + * @param exchangeSrvc Exchange service. + * @param implementorFactory Relational node implementor factory. */ public ExecutionServiceImpl( - ClusterNode localNode, MessageService msgSrvc, + TopologyService topSrvc, MappingService mappingSrvc, SqlSchemaManager sqlSchemaManager, DdlCommandHandler ddlCmdHnd, @@ -173,10 +197,11 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve ExchangeService exchangeSrvc, ImplementorFactory<RowT> implementorFactory ) { - this.localNode = localNode; + this.localNode = topSrvc.localMember(); this.handler = handler; this.msgSrvc = msgSrvc; this.mappingSrvc = mappingSrvc; + this.topSrvc = topSrvc; this.sqlSchemaManager = sqlSchemaManager; this.taskExecutor = taskExecutor; this.exchangeSrvc = exchangeSrvc; @@ -197,11 +222,9 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve BaseQueryContext ctx, MultiStepPlan plan ) { - DistributedQueryManager queryManager; - - InternalTransaction tx = ctx.transaction(); + DistributedQueryManager queryManager = new DistributedQueryManager(ctx, ctx.transaction()); - DistributedQueryManager old = queryManagerMap.put(ctx.queryId(), queryManager = new DistributedQueryManager(ctx, tx)); + DistributedQueryManager old = queryManagerMap.put(ctx.queryId(), queryManager); assert old == null; @@ -210,7 +233,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve return queryManager.execute(plan); } - private BaseQueryContext createQueryContext(UUID queryId, @Nullable String schema, Object[] params, HybridTimestamp txTime) { + private BaseQueryContext createQueryContext(UUID queryId, @Nullable String schema, Object[] params, HybridTimestamp txTime, UUID txId) { return BaseQueryContext.builder() .queryId(queryId) .parameters(params) @@ -220,7 +243,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve .build() ) .logger(LOG) - .transactionTime(txTime) + .transaction(new LocalTxAttributesHolder(txId, txTime)) .build(); } @@ -303,7 +326,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve assert nodeName != null && msg != null; DistributedQueryManager queryManager = queryManagerMap.computeIfAbsent(msg.queryId(), key -> { - BaseQueryContext ctx = createQueryContext(key, msg.schema(), msg.parameters(), msg.txTime()); + BaseQueryContext ctx = createQueryContext(key, msg.schema(), msg.parameters(), msg.txTime(), msg.txId()); return new DistributedQueryManager(ctx); }); @@ -433,7 +456,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve .root(fragment.serialized()) .fragmentDescription(desc) .parameters(ctx.parameters()) - .txTime(ctx.transactionTime()) + .txTime(ctx.transaction().readTimestamp()) + .txId(ctx.transaction().id()) .build(); var fut = new CompletableFuture<Void>(); @@ -538,8 +562,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve initiatorNodeName, desc, handler, - Commons.parametersMap(ctx.parameters()), - tx + Commons.parametersMap(ctx.parameters()) ); } @@ -583,6 +606,10 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve // start remote execution for (Fragment fragment : fragments) { + if (tx != null && !tx.isReadOnly()) { + enlistPartitions(fragment, tx); + } + if (fragment.rootFragment()) { assert rootFragmentId == null; @@ -630,6 +657,50 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve }; } + private void enlistPartitions(Fragment fragment, InternalTransaction tx) { + // TODO IGNITE-17952 Next condition should be removed. + if (!TraitUtils.distributionEnabled(fragment.root())) { + return; + } + + new IgniteRelShuttle() { + @Override + public IgniteRel visit(IgniteIndexScan rel) { + enlist(rel); + + return super.visit(rel); + } + + @Override + public IgniteRel visit(IgniteTableScan rel) { + enlist(rel); + + return super.visit(rel); + } + + private void enlist(SourceAwareIgniteRel rel) { + InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class); + ColocationGroup grp = fragment.mapping().findGroup(rel.sourceId()); + + if (grp.assignments().isEmpty()) { + return; + } + + int partsCnt = grp.assignments().size(); + + tx.assignCommitPartition(new TablePartitionId(tbl.id(), ThreadLocalRandom.current().nextInt(partsCnt))); + + for (int p = 0; p < partsCnt; p++) { + List<NodeWithTerm> assign = grp.assignments().get(p); + NodeWithTerm leaderWithTerm = assign.get(0); + + tx.enlist(new TablePartitionId(tbl.id(), p), + new IgniteBiTuple<>(topSrvc.getByConsistentId(leaderWithTerm.name()), leaderWithTerm.term())); + } + } + }.visit(fragment.root()); + } + private CompletableFuture<Void> close(boolean cancel) { if (!cancelled.compareAndSet(false, true)) { return cancelFut.thenApply(Function.identity()); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java index 5cb79e1cce..63ae7944e7 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java @@ -367,7 +367,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>> ctx.rowHandler().factory(ctx.getTypeFactory(), rowType), idx, tbl, - group.partitions(ctx.localNode().name()), + group.partitionsWithTerms(ctx.localNode().name()), comp, ranges, filters, @@ -404,7 +404,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>> ctx, ctx.rowHandler().factory(ctx.getTypeFactory(), rowType), tbl, - group.partitions(ctx.localNode().name()), + group.partitionsWithTerms(ctx.localNode().name()), filters, prj, requiredColumns == null ? null : requiredColumns.toBitSet() diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java index ae53e7f8e3..80a1f90a38 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java @@ -17,10 +17,8 @@ package org.apache.ignite.internal.sql.engine.exec.rel; -import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty; - -import java.util.Arrays; import java.util.BitSet; +import java.util.Collection; import java.util.Comparator; import java.util.Iterator; import java.util.concurrent.Flow.Publisher; @@ -36,12 +34,16 @@ import org.apache.ignite.internal.sql.engine.exec.RowConverter; import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition; import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable; +import org.apache.ignite.internal.sql.engine.metadata.PartitionWithTerm; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type; import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable; import org.apache.ignite.internal.sql.engine.util.Commons; +import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder; +import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.SubscriptionUtils; import org.apache.ignite.internal.util.TransformingIterator; +import org.apache.ignite.internal.utils.PrimaryReplica; import org.jetbrains.annotations.Contract; import org.jetbrains.annotations.Nullable; @@ -57,7 +59,8 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> { private final RowHandler.RowFactory<RowT> factory; - private final int[] parts; + /** List of pairs containing the partition number to scan with the corresponding primary replica term. */ + private final Collection<PartitionWithTerm> partsWithTerms; /** Participating columns. */ private final @Nullable BitSet requiredColumns; @@ -72,7 +75,7 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> { * @param ctx Execution context. * @param rowFactory Row factory. * @param schemaTable The table this node should scan. - * @param parts Partition numbers to scan. + * @param partsWithTerms List of pairs containing the partition number to scan with the corresponding primary replica term. * @param comp Rows comparator. * @param rangeConditions Range conditions. * @param filters Optional filter to filter out rows. @@ -84,7 +87,7 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> { RowHandler.RowFactory<RowT> rowFactory, IgniteIndex schemaIndex, InternalIgniteTable schemaTable, - int[] parts, + Collection<PartitionWithTerm> partsWithTerms, @Nullable Comparator<RowT> comp, @Nullable RangeIterable<RowT> rangeConditions, @Nullable Predicate<RowT> filters, @@ -93,11 +96,11 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> { ) { super(ctx, rowFactory, schemaTable, filters, rowTransformer, requiredColumns); - assert !nullOrEmpty(parts); + assert partsWithTerms != null && !partsWithTerms.isEmpty(); assert rangeConditions == null || rangeConditions.size() > 0; this.schemaIndex = schemaIndex; - this.parts = parts; + this.partsWithTerms = partsWithTerms; this.requiredColumns = requiredColumns; this.rangeConditions = rangeConditions; this.comp = comp; @@ -110,16 +113,17 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> { @Override protected Publisher<RowT> scan() { if (rangeConditions != null) { - return SubscriptionUtils.concat(new TransformingIterator<>(rangeConditions.iterator(), cond -> indexPublisher(parts, cond))); + return SubscriptionUtils.concat( + new TransformingIterator<>(rangeConditions.iterator(), cond -> indexPublisher(partsWithTerms, cond))); } else { - return indexPublisher(parts, null); + return indexPublisher(partsWithTerms, null); } } - private Publisher<RowT> indexPublisher(int[] parts, @Nullable RangeCondition<RowT> cond) { + private Publisher<RowT> indexPublisher(Collection<PartitionWithTerm> partsWithTerms, @Nullable RangeCondition<RowT> cond) { Iterator<Publisher<? extends RowT>> it = new TransformingIterator<>( - Arrays.stream(parts).iterator(), - part -> partitionPublisher(part, cond) + partsWithTerms.iterator(), + partWithTerm -> partitionPublisher(partWithTerm, cond) ); if (comp != null) { @@ -129,9 +133,9 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> { } } - private Publisher<RowT> partitionPublisher(int part, @Nullable RangeCondition<RowT> cond) { + private Publisher<RowT> partitionPublisher(PartitionWithTerm partWithTerm, @Nullable RangeCondition<RowT> cond) { Publisher<BinaryRow> pub; - boolean roTx = context().transactionTime() != null; + InternalTransaction tx = context().transaction(); if (schemaIndex.type() == Type.SORTED) { int flags = 0; @@ -148,20 +152,32 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> { flags |= (cond.upperInclude()) ? SortedIndex.INCLUDE_RIGHT : 0; } - if (roTx) { + if (tx.isReadOnly()) { pub = ((SortedIndex) schemaIndex.index()).scan( - part, - context().transactionTime(), + partWithTerm.partId(), + tx.readTimestamp(), context().localNode(), lower, upper, flags, requiredColumns ); + } else if (!(tx instanceof LocalTxAttributesHolder)) { + // TODO IGNITE-17952 This block should be removed. + // Workaround to make RW scan work from tx coordinator. + pub = ((SortedIndex) schemaIndex.index()).scan( + partWithTerm.partId(), + tx, + lower, + upper, + flags, + requiredColumns + ); } else { pub = ((SortedIndex) schemaIndex.index()).scan( - part, - context().transaction(), + partWithTerm.partId(), + tx.id(), + new PrimaryReplica(context().localNode(), partWithTerm.term()), lower, upper, flags, @@ -174,18 +190,28 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> { BinaryTuple key = toBinaryTuple(cond.lower()); - if (roTx) { + if (tx.isReadOnly()) { pub = schemaIndex.index().lookup( - part, - context().transactionTime(), + partWithTerm.partId(), + tx.readTimestamp(), context().localNode(), key, requiredColumns ); + } else if (!(tx instanceof LocalTxAttributesHolder)) { + // TODO IGNITE-17952 This block should be removed. + // Workaround to make RW lookup work from tx coordinator. + pub = schemaIndex.index().lookup( + partWithTerm.partId(), + tx, + key, + requiredColumns + ); } else { pub = schemaIndex.index().lookup( - part, - context().transaction(), + partWithTerm.partId(), + tx.id(), + new PrimaryReplica(context().localNode(), partWithTerm.term()), key, requiredColumns ); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java index 881e3a4f0f..4e2382107a 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java @@ -80,7 +80,7 @@ public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> { ) { super(ctx); - assert context().transaction() != null || context().transactionTime() != null : "Transaction not initialized."; + assert ctx.transaction() != null : "Transaction not initialized."; tableRowConverter = row -> schemaTable.toRow(context(), row, rowFactory, requiredColumns); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java index 57227042ec..5c332762e4 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java @@ -17,10 +17,8 @@ package org.apache.ignite.internal.sql.engine.exec.rel; -import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty; - -import java.util.Arrays; import java.util.BitSet; +import java.util.Collection; import java.util.Iterator; import java.util.concurrent.Flow.Publisher; import java.util.function.Function; @@ -28,10 +26,14 @@ import java.util.function.Predicate; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.RowHandler; +import org.apache.ignite.internal.sql.engine.metadata.PartitionWithTerm; import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable; +import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder; import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.SubscriptionUtils; import org.apache.ignite.internal.util.TransformingIterator; +import org.apache.ignite.internal.utils.PrimaryReplica; import org.jetbrains.annotations.Nullable; /** @@ -42,7 +44,8 @@ public class TableScanNode<RowT> extends StorageScanNode<RowT> { /** Table that provides access to underlying data. */ private final InternalTable physTable; - private final int[] parts; + /** List of pairs containing the partition number to scan with the corresponding primary replica term. */ + private final Collection<PartitionWithTerm> partsWithTerms; /** * Constructor. @@ -50,7 +53,7 @@ public class TableScanNode<RowT> extends StorageScanNode<RowT> { * @param ctx Execution context. * @param rowFactory Row factory. * @param schemaTable The table this node should scan. - * @param parts Partition numbers to scan. + * @param partsWithTerms List of pairs containing the partition number to scan with the corresponding primary replica term. * @param filters Optional filter to filter out rows. * @param rowTransformer Optional projection function. * @param requiredColumns Optional set of column of interest. @@ -59,31 +62,37 @@ public class TableScanNode<RowT> extends StorageScanNode<RowT> { ExecutionContext<RowT> ctx, RowHandler.RowFactory<RowT> rowFactory, InternalIgniteTable schemaTable, - int[] parts, + Collection<PartitionWithTerm> partsWithTerms, @Nullable Predicate<RowT> filters, @Nullable Function<RowT, RowT> rowTransformer, @Nullable BitSet requiredColumns ) { super(ctx, rowFactory, schemaTable, filters, rowTransformer, requiredColumns); - assert !nullOrEmpty(parts); + assert partsWithTerms != null && !partsWithTerms.isEmpty(); this.physTable = schemaTable.table(); - this.parts = parts; + this.partsWithTerms = partsWithTerms; } /** {@inheritDoc} */ @Override protected Publisher<RowT> scan() { - boolean roTx = context().transactionTime() != null; - + InternalTransaction tx = context().transaction(); Iterator<Publisher<? extends RowT>> it = new TransformingIterator<>( - Arrays.stream(parts).iterator(), part -> { + partsWithTerms.iterator(), partWithTerm -> { Publisher<BinaryRow> pub; - if (roTx) { - pub = physTable.scan(part, context().transactionTime(), context().localNode()); + + if (tx.isReadOnly()) { + pub = physTable.scan(partWithTerm.partId(), tx.readTimestamp(), context().localNode()); + } else if (!(tx instanceof LocalTxAttributesHolder)) { + // TODO IGNITE-17952 This block should be removed. + // Workaround to make RW scan work from tx coordinator. + pub = physTable.scan(partWithTerm.partId(), tx); } else { - pub = physTable.scan(part, context().transaction()); + PrimaryReplica recipient = new PrimaryReplica(context().localNode(), partWithTerm.term()); + + pub = physTable.scan(partWithTerm.partId(), tx.id(), recipient, null, null, null, 0, null); } return convertPublisher(pub); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java index 73d85c97a7..d18edca3f6 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.sql.engine.message; +import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription; import org.apache.ignite.network.annotations.Marshallable; @@ -55,4 +56,10 @@ public interface QueryStartRequest extends ExecutionContextAwareMessage { */ @Marshallable @Nullable HybridTimestamp txTime(); + + /** + * Transaction id. + */ + @Marshallable + UUID txId(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java index 4b153cffab..fc33780504 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.sql.engine.metadata; -import static org.apache.ignite.internal.util.ArrayUtils.asList; import static org.apache.ignite.internal.util.CollectionUtils.first; import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; import static org.apache.ignite.internal.util.IgniteUtils.firstNotNull; @@ -27,11 +26,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.ignite.internal.sql.engine.rel.IgniteRel; import org.apache.ignite.internal.sql.engine.util.Commons; -import org.apache.ignite.internal.util.IgniteIntList; import org.jetbrains.annotations.NotNull; /** @@ -39,14 +40,11 @@ import org.jetbrains.annotations.NotNull; * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ public class ColocationGroup implements Serializable { - private static final int SYNTHETIC_PARTITIONS_COUNT = 512; - // TODO: IgniteSystemProperties.getInteger("IGNITE_CALCITE_SYNTHETIC_PARTITIONS_COUNT", 512); - private final List<Long> sourceIds; private final List<String> nodeNames; - private final List<List<String>> assignments; + private final List<List<NodeWithTerm>> assignments; /** * ForNodes. @@ -60,7 +58,7 @@ public class ColocationGroup implements Serializable { * ForAssignments. * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ - public static ColocationGroup forAssignments(List<List<String>> assignments) { + public static ColocationGroup forAssignments(List<List<NodeWithTerm>> assignments) { return new ColocationGroup(null, null, assignments); } @@ -76,7 +74,7 @@ public class ColocationGroup implements Serializable { * Constructor. * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ - private ColocationGroup(List<Long> sourceIds, List<String> nodeNames, List<List<String>> assignments) { + private ColocationGroup(List<Long> sourceIds, List<String> nodeNames, List<List<NodeWithTerm>> assignments) { this.sourceIds = sourceIds; this.nodeNames = nodeNames; this.assignments = assignments; @@ -100,7 +98,7 @@ public class ColocationGroup implements Serializable { * Get list of partitions (index) and nodes (items) having an appropriate partition in OWNING state, calculated for * distributed tables, involved in query execution. */ - public List<List<String>> assignments() { + public List<List<NodeWithTerm>> assignments() { return assignments == null ? Collections.emptyList() : assignments; } @@ -151,18 +149,20 @@ public class ColocationGroup implements Serializable { + "Replicated query parts are not co-located on all nodes"); } - List<List<String>> assignments; + List<List<NodeWithTerm>> assignments; + Set<String> nodeNamesSet = nodeNames == null ? null : new HashSet<>(nodeNames); + Predicate<String> nodeNamesFilter = nodeNames == null ? v -> true : nodeNamesSet::contains; + if (this.assignments == null || other.assignments == null) { assignments = firstNotNull(this.assignments, other.assignments); - if (assignments != null && nodeNames != null) { - Set<String> filter = new HashSet<>(nodeNames); - List<List<String>> assignments0 = new ArrayList<>(assignments.size()); + if (assignments != null && nodeNamesSet != null) { + List<List<NodeWithTerm>> assignments0 = new ArrayList<>(assignments.size()); for (int i = 0; i < assignments.size(); i++) { - List<String> assignment = Commons.intersect(filter, assignments.get(i)); + List<NodeWithTerm> assignment = filterByNodeNames(assignments.get(i), nodeNamesFilter); - if (assignment.isEmpty()) { // TODO check with partition filters + if (assignment.isEmpty()) { throw new ColocationMappingException("Failed to map fragment to location. " + "Partition mapping is empty [part=" + i + "]"); } @@ -174,17 +174,17 @@ public class ColocationGroup implements Serializable { } } else { assert this.assignments.size() == other.assignments.size(); + assignments = new ArrayList<>(this.assignments.size()); - Set<String> filter = nodeNames == null ? null : new HashSet<>(nodeNames); - for (int i = 0; i < this.assignments.size(); i++) { - List<String> assignment = Commons.intersect(this.assignments.get(i), other.assignments.get(i)); - if (filter != null) { - assignment.retainAll(filter); - } + for (int p = 0; p < this.assignments.size(); p++) { + List<NodeWithTerm> assignment0 = this.assignments.get(p); + List<NodeWithTerm> assignment1 = other.assignments.get(p); + + List<NodeWithTerm> assignment = intersect(assignment0, assignment1, nodeNamesFilter, p); - if (assignment.isEmpty()) { // TODO check with partition filters - throw new ColocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + i + "]"); + if (assignment.isEmpty()) { + throw new ColocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + p + "]"); } assignments.add(assignment); @@ -194,22 +194,92 @@ public class ColocationGroup implements Serializable { return new ColocationGroup(sourceIds, nodeNames, assignments); } + private List<NodeWithTerm> intersect( + List<NodeWithTerm> assignment0, + List<NodeWithTerm> assignment1, + Predicate<String> filter, + int p + ) throws ColocationMappingException { + if (assignment0.size() == 1 && assignment1.size() == 1) { + NodeWithTerm first = assignment0.get(0); + NodeWithTerm second = assignment1.get(0); + + if (filter.test(first.name()) && Objects.equals(first.name(), second.name())) { + validateTerm(first, second, p); + + return assignment0; + } + + return Collections.emptyList(); + } + + if (assignment0.size() > assignment1.size()) { + List<NodeWithTerm> tmp = assignment0; + assignment0 = assignment1; + assignment1 = tmp; + } + + List<NodeWithTerm> assignment = new ArrayList<>(); + + // Filter and hash a smaller list. + Map<String, NodeWithTerm> nameToAssignmentMapping = assignment0.stream() + .filter(v -> filter.test(v.name())) + .collect(Collectors.toMap(NodeWithTerm::name, nodeWithTerm -> nodeWithTerm)); + + // Iterate over a larger list. + for (NodeWithTerm first : assignment1) { + NodeWithTerm second = nameToAssignmentMapping.get(first.name()); + + if (second == null) { + continue; + } + + validateTerm(first, second, p); + + assignment.add(first); + } + + return assignment; + } + + private void validateTerm(NodeWithTerm first, NodeWithTerm second, int partId) throws ColocationMappingException { + if (first.term() != second.term()) { + throw new ColocationMappingException("Primary replica term has been changed during mapping [" + + "node=" + first.name() + + ", expectedTerm=" + first.term() + + ", actualTerm=" + second.term() + + ", part=" + partId + + ']'); + } + } + + private List<NodeWithTerm> filterByNodeNames(List<NodeWithTerm> assignment, Predicate<String> filter) { + List<NodeWithTerm> res = new ArrayList<>(assignment.size()); + + for (NodeWithTerm nodeWithTerm : assignment) { + if (!filter.test(nodeWithTerm.name())) { + continue; + } + + res.add(nodeWithTerm); + } + + return res; + } + /** - * Constructor. - * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 + * Creates a new colocation group using only primary assignments. + * + * @return Colocation group with primary assignments. */ public ColocationGroup complete() { - if (assignments == null && nodeNames == null) { - return this; - } - if (assignments != null) { - List<List<String>> assignments = new ArrayList<>(this.assignments.size()); + List<List<NodeWithTerm>> assignments = new ArrayList<>(this.assignments.size()); Set<String> nodes = new HashSet<>(); - for (List<String> assignment : this.assignments) { - String first = first(assignment); + for (List<NodeWithTerm> assignment : this.assignments) { + NodeWithTerm first = first(assignment); if (first != null) { - nodes.add(first); + nodes.add(first.name()); } assignments.add(first != null ? Collections.singletonList(first) : Collections.emptyList()); } @@ -217,7 +287,7 @@ public class ColocationGroup implements Serializable { return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments); } - return forNodes0(nodeNames); + return mapToNodes(nodeNames); } /** @@ -230,29 +300,30 @@ public class ColocationGroup implements Serializable { @NotNull private ColocationGroup forNodes0(List<String> nodeNames) { - List<List<String>> assignments = new ArrayList<>(SYNTHETIC_PARTITIONS_COUNT); - for (int i = 0; i < SYNTHETIC_PARTITIONS_COUNT; i++) { - assignments.add(asList(nodeNames.get(i % nodeNames.size()))); - } return new ColocationGroup(sourceIds, nodeNames, assignments); } /** - * Returns List of partitions to scan on the given node. + * Returns list of pairs containing the partition number to scan on the given node with the corresponding primary replica term. * - * @param nodeNames Cluster node consistent ID. - * @return List of partitions to scan on the given node. + * @param nodeName Cluster node consistent ID. + * @return List of pairs containing the partition number to scan on the given node with the corresponding primary replica term. */ - public int[] partitions(String nodeNames) { - IgniteIntList parts = new IgniteIntList(assignments.size()); + public List<PartitionWithTerm> partitionsWithTerms(String nodeName) { + List<PartitionWithTerm> partsWithTerms = new ArrayList<>(); + + for (int p = 0; p < assignments.size(); p++) { + List<NodeWithTerm> assignment = assignments.get(p); + + NodeWithTerm nodeWithTerm = first(assignment); + + assert nodeWithTerm != null : "part=" + p; - for (int i = 0; i < assignments.size(); i++) { - List<String> assignment = assignments.get(i); - if (Objects.equals(nodeNames, first(assignment))) { - parts.add(i); + if (Objects.equals(nodeName, nodeWithTerm.name())) { + partsWithTerms.add(new PartitionWithTerm(p, nodeWithTerm.term())); } } - return parts.array(); + return partsWithTerms; } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java index 33d9bf189d..c08820b7f9 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java @@ -227,6 +227,7 @@ public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingM private static FragmentMapping getFragmentMapping(long sourceId, ProjectableFilterableTableScan rel, MappingQueryContext ctx) { ColocationGroup group = rel.getTable().unwrap(InternalIgniteTable.class).colocationGroup(ctx); + // TODO IGNITE-17952 The following block should be removed. // This condition is kinda workaround to make transactional scan works. // // For now, scan should be invoked on the node that coordinates the transaction. @@ -234,10 +235,9 @@ public class IgniteMdFragmentMapping implements MetadataHandler<FragmentMappingM // will need to replace actual distribution with fake one where every partition // is owned by a local node. if (!TraitUtils.distributionEnabled(rel)) { - List<List<String>> fakeAssignments = new ArrayList<>(group.assignments().size()); - + List<List<NodeWithTerm>> fakeAssignments = new ArrayList<>(group.assignments().size()); for (int i = 0; i < group.assignments().size(); i++) { - fakeAssignments.add(List.of(ctx.locNodeName())); + fakeAssignments.add(List.of(new NodeWithTerm(ctx.locNodeName(), -1L))); } group = ColocationGroup.forAssignments(fakeAssignments); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/NodeWithTerm.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/NodeWithTerm.java new file mode 100644 index 0000000000..61e15efa5e --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/NodeWithTerm.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.metadata; + +import java.io.Serializable; + +/** + * Tuple representing primary replica node name with current term. + */ +public class NodeWithTerm implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Primary replica node name. */ + private final String name; + + /** Primary replica term. */ + private final long term; + + /** + * Constructor. + * + * @param name Primary replica node name. + * @param term Primary replica term. + */ + public NodeWithTerm(String name, Long term) { + this.name = name; + this.term = term; + } + + /** + * Gets primary replica node name. + * + * @return Primary replica node name. + */ + public String name() { + return name; + } + + /** + * Gets primary replica term. + * + * @return Primary replica term. + */ + public long term() { + return term; + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/PartitionWithTerm.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/PartitionWithTerm.java new file mode 100644 index 0000000000..3da423a687 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/PartitionWithTerm.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.metadata; + +/** + * Tuple representing the number of the partition with its current primary replica term. + */ +public class PartitionWithTerm { + /** Partition number. */ + private final int partId; + + /** Primary replica term. */ + private final long term; + + /** + * Constructor. + * + * @param partId partition number + * @param term Primary replica term. + */ + public PartitionWithTerm(int partId, Long term) { + this.partId = partId; + this.term = term; + } + + /** + * Gets partition number. + * + * @return Partition number. + */ + public int partId() { + return partId; + } + + /** + * Gets primary replica term. + * + * @return Primary replica term. + */ + public long term() { + return term; + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java index 8993cf29bc..d27c178db1 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.exec.exp.RexImpTable; import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup; +import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm; import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext; import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalIndexScan; import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan; @@ -481,7 +482,8 @@ public class IgniteTableImpl extends AbstractTable implements InternalIgniteTabl } private ColocationGroup partitionedGroup() { - List<List<String>> assignments = table.assignments().stream() + List<List<NodeWithTerm>> assignments = table.primaryReplicas().stream() + .map(primaryReplica -> new NodeWithTerm(primaryReplica.node().name(), primaryReplica.term())) .map(Collections::singletonList) .collect(Collectors.toList()); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java index d9ebc797fb..a34258caad 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java @@ -28,6 +28,8 @@ import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelNode; import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup; +import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm; +import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory; import org.apache.ignite.internal.util.IgniteUtils; @@ -223,7 +225,7 @@ public abstract class DistributionFunction { public <RowT> Destination<RowT> destination(HashFunctionFactory<RowT> hashFuncFactory, ColocationGroup m, ImmutableIntList k) { assert m != null && !nullOrEmpty(m.assignments()) && !k.isEmpty(); - List<List<String>> assignments = m.assignments(); + List<List<String>> assignments = Commons.transform(m.assignments(), v -> Commons.transform(v, NodeWithTerm::name)); if (IgniteUtils.assertionsEnabled()) { for (List<String> assignment : assignments) { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java index 2327fa28e2..2cd28e9225 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java @@ -49,7 +49,6 @@ import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; -import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.sql.engine.QueryCancel; @@ -162,8 +161,6 @@ public final class BaseQueryContext extends AbstractQueryContext { private final InternalTransaction tx; - private final HybridTimestamp txTs; - private CalciteCatalogReader catalogReader; private long plannerTimeout; @@ -178,7 +175,6 @@ public final class BaseQueryContext extends AbstractQueryContext { Object[] parameters, IgniteLogger log, InternalTransaction tx, - HybridTimestamp txTs, long plannerTimeout ) { super(Contexts.chain(cfg.getContext())); @@ -191,7 +187,6 @@ public final class BaseQueryContext extends AbstractQueryContext { this.cancel = cancel; this.parameters = parameters; this.tx = tx; - this.txTs = txTs; this.plannerTimeout = plannerTimeout; RelDataTypeSystem typeSys = CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class, cfg.getTypeSystem()); @@ -245,10 +240,6 @@ public final class BaseQueryContext extends AbstractQueryContext { return tx; } - public HybridTimestamp transactionTime() { - return txTs; - } - public long plannerTimeout() { return plannerTimeout; } @@ -290,8 +281,7 @@ public final class BaseQueryContext extends AbstractQueryContext { .logger(log) .cancel(cancel) .parameters(parameters) - .transaction(tx) - .transactionTime(txTs); + .transaction(tx); } /** @@ -323,8 +313,6 @@ public final class BaseQueryContext extends AbstractQueryContext { private InternalTransaction tx; - private HybridTimestamp txTs; - private long plannerTimeout; public Builder frameworkConfig(FrameworkConfig frameworkCfg) { @@ -357,18 +345,13 @@ public final class BaseQueryContext extends AbstractQueryContext { return this; } - public Builder transactionTime(HybridTimestamp txTs) { - this.txTs = txTs; - return this; - } - public Builder plannerTimeout(long plannerTimeout) { this.plannerTimeout = plannerTimeout; return this; } public BaseQueryContext build() { - return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log, tx, txTs, plannerTimeout); + return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log, tx, plannerTimeout); } } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/LocalTxAttributesHolder.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/LocalTxAttributesHolder.java new file mode 100644 index 0000000000..232bfb72cf --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/LocalTxAttributesHolder.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.util; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.TxState; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.tx.TransactionException; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Local holder of {@link InternalTransaction#id() id} and {@link InternalTransaction#readTimestamp() readTimestamp} transaction attributes. + */ +// TODO IGNITE-17952 This class must not implement the transaction interface and may be passed to the remote nodes. +public class LocalTxAttributesHolder implements InternalTransaction { + /** Transaction id. */ + private final UUID id; + + /** Read timestamp. */ + private final @Nullable HybridTimestamp readTimestamp; + + /** + * Constructor. + * + * @param id Transaction id. + * @param readTimestamp Read timestamp. + */ + public LocalTxAttributesHolder(UUID id, @Nullable HybridTimestamp readTimestamp) { + this.readTimestamp = readTimestamp; + this.id = id; + } + + @Override + public boolean isReadOnly() { + return readTimestamp != null; + } + + @Override + public @Nullable HybridTimestamp readTimestamp() { + return readTimestamp; + } + + @Override + public @NotNull UUID id() { + return id; + } + + @Override + public void commit() throws TransactionException { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Void> commitAsync() { + throw new UnsupportedOperationException(); + } + + @Override + public void rollback() throws TransactionException { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Void> rollbackAsync() { + throw new UnsupportedOperationException(); + } + + @Override + public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(ReplicationGroupId replicationGroupId) { + throw new UnsupportedOperationException(); + } + + @Override + public TxState state() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean assignCommitPartition(ReplicationGroupId replicationGroupId) { + throw new UnsupportedOperationException(); + } + + @Override + public ReplicationGroupId commitPartition() { + throw new UnsupportedOperationException(); + } + + @Override + public IgniteBiTuple<ClusterNode, Long> enlist(ReplicationGroupId replicationGroupId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) { + throw new UnsupportedOperationException(); + } + + @Override + public void enlistResultFuture(CompletableFuture<?> resultFuture) { + throw new UnsupportedOperationException(); + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java index b23d82405c..9b1f98e566 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java @@ -36,6 +36,7 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -71,6 +72,7 @@ import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.impl.HeapLockManager; +import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.lang.NodeStoppingException; @@ -134,6 +136,8 @@ public class StopCalciteModuleTest { private final TestRevisionRegister testRevisionRegister = new TestRevisionRegister(); + private final ClusterNode localNode = new ClusterNode("mock-node-id", NODE_NAME, null); + /** * Before. * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 @@ -142,9 +146,7 @@ public class StopCalciteModuleTest { public void before(TestInfo testInfo) { when(clusterSrvc.messagingService()).thenReturn(msgSrvc); when(clusterSrvc.topologyService()).thenReturn(topologySrvc); - - ClusterNode node = new ClusterNode("mock-node-id", NODE_NAME, null); - when(topologySrvc.localMember()).thenReturn(node); + when(topologySrvc.localMember()).thenReturn(localNode); SchemaDescriptor schemaDesc = new SchemaDescriptor( 1, @@ -221,6 +223,7 @@ public class StopCalciteModuleTest { ); when(tbl.tableId()).thenReturn(UUID.randomUUID()); + when(tbl.primaryReplicas()).thenReturn(List.of(new PrimaryReplica(localNode, -1L))); when(txManager.begin()).thenReturn(mock(InternalTransaction.class)); when(tbl.storage()).thenReturn(mock(MvTableStorage.class)); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java index 71b5ca1af5..c29818745b 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java @@ -83,6 +83,7 @@ import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory; import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl; +import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder; import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX; import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.lang.IgniteInternalCheckedException; @@ -90,6 +91,7 @@ import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.TopologyService; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -383,12 +385,17 @@ public class ExecutionServiceImplTest { var schemaManagerMock = mock(SqlSchemaManager.class); + var clusterNode = new ClusterNode(UUID.randomUUID().toString(), nodeName, NetworkAddress.from("127.0.0.1:1111")); + + var topologyService = mock(TopologyService.class); + + when(topologyService.localMember()).thenReturn(clusterNode); + when(schemaManagerMock.tableById(any(), anyInt())).thenReturn(table); - var clusterNode = new ClusterNode(UUID.randomUUID().toString(), nodeName, NetworkAddress.from("127.0.0.1:1111")); var executionService = new ExecutionServiceImpl<>( - clusterNode, messageService, + topologyService, (single, filter) -> single ? List.of(nodeNames.get(ThreadLocalRandom.current().nextInt(nodeNames.size()))) : nodeNames, schemaManagerMock, mock(DdlCommandHandler.class), @@ -413,6 +420,7 @@ public class ExecutionServiceImplTest { .defaultSchema(wrap(schema)) .build() ) + .transaction(new LocalTxAttributesHolder(UUID.randomUUID(), null)) .logger(LOG) .build(); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java index af6d1daf2b..125760b94c 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java @@ -117,8 +117,7 @@ public class RuntimeSortedIndexTest extends IgniteAbstractTest { "fake-test-node", null, ArrayRowHandler.INSTANCE, - Map.of(), - null + Map.of() ), RelCollations.of(ImmutableIntList.copyOf(idxCols)), (o1, o2) -> { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java index 7e81eb4ede..dfa70fe72a 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java @@ -17,9 +17,6 @@ package org.apache.ignite.internal.sql.engine.exec.rel; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import it.unimi.dsi.fastutil.longs.Long2ObjectMaps; import java.util.ArrayDeque; import java.util.Deque; @@ -43,12 +40,12 @@ import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl; import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; +import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.thread.LogUncaughtExceptionHandler; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.thread.StripedThreadPoolExecutor; -import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.Pair; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.NetworkAddress; @@ -103,11 +100,9 @@ public class AbstractExecutionTest extends IgniteAbstractTest { FragmentDescription fragmentDesc = new FragmentDescription(0, null, null, Long2ObjectMaps.emptyMap()); - InternalTransaction tx = mock(InternalTransaction.class); - when(tx.rollbackAsync()).thenReturn(CompletableFuture.completedFuture(null)); - return new ExecutionContext<>( BaseQueryContext.builder() + .transaction(new LocalTxAttributesHolder(UUID.randomUUID(), null)) .logger(log) .build(), taskExecutor, @@ -116,8 +111,7 @@ public class AbstractExecutionTest extends IgniteAbstractTest { "fake-test-node", fragmentDesc, ArrayRowHandler.INSTANCE, - Map.of(), - tx + Map.of() ); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java index 91526fc9ef..8e0850a577 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java @@ -33,6 +33,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.ThreadLocalRandom; @@ -59,6 +60,7 @@ import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory; import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition; import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable; import org.apache.ignite.internal.sql.engine.exec.exp.RexImpTable; +import org.apache.ignite.internal.sql.engine.metadata.PartitionWithTerm; import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type; @@ -337,13 +339,13 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest { //CHECKSTYLE:OFF:Indentation Mockito.doAnswer(invocation -> { if (key != null) { - validateBound(indexDescriptor, schemaDescriptor, invocation.getArgument(2)); + validateBound(indexDescriptor, schemaDescriptor, invocation.getArgument(3)); } return dummyPublisher(partitionData(tableData, schemaDescriptor, invocation.getArgument(0))); }) .when(hashIndexMock) - .lookup(Mockito.anyInt(), any(), any(), any()); + .lookup(Mockito.anyInt(), (UUID) any(), any(), any(), any()); //CHECKSTYLE:ON:Indentation IgniteIndex indexMock = mock(IgniteIndex.class); @@ -382,15 +384,15 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest { //CHECKSTYLE:OFF:Indentation Mockito.doAnswer(invocation -> { if (lowerBound != null) { - validateBoundPrefix(indexDescriptor, schemaDescriptor, invocation.getArgument(2)); + validateBoundPrefix(indexDescriptor, schemaDescriptor, invocation.getArgument(3)); } if (upperBound != null) { - validateBoundPrefix(indexDescriptor, schemaDescriptor, invocation.getArgument(3)); + validateBoundPrefix(indexDescriptor, schemaDescriptor, invocation.getArgument(4)); } return dummyPublisher(partitionData(tableData, schemaDescriptor, invocation.getArgument(0))); }).when(sortedIndexMock) - .scan(Mockito.anyInt(), any(), any(), any(), Mockito.anyInt(), any()); + .scan(Mockito.anyInt(), (UUID) any(), any(), any(), any(), Mockito.anyInt(), any()); //CHECKSTYLE:ON:Indentation IgniteIndex indexMock = mock(IgniteIndex.class); @@ -437,7 +439,7 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest { ectx.rowHandler().factory(ectx.getTypeFactory(), rowType), index, new TestTable(rowType, schemaDescriptor), - new int[]{0, 2}, + List.of(new PartitionWithTerm(0, -1L), new PartitionWithTerm(2, -1L)), index.type() == Type.SORTED ? comp : null, rangeIterable, null, diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java index ee4680ed19..d5c781c7d6 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java @@ -488,7 +488,7 @@ public class MergeJoinExecutionTest extends AbstractExecutionTest { ExecutionContext<Object[]> ectx = new ExecutionContext<>(BaseQueryContext.builder().logger(log).build(), null, null, null, - null, null, ArrayRowHandler.INSTANCE, null, null); + null, null, ArrayRowHandler.INSTANCE, null); ExpressionFactoryImpl<Object[]> expFactory = new ExpressionFactoryImpl<>(ectx, typeFactory, SqlConformanceEnum.DEFAULT); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java index d927e2e61a..6069a9edd5 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java @@ -21,23 +21,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; -import java.util.ArrayList; import java.util.BitSet; -import java.util.Collection; +import java.util.List; import java.util.UUID; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscription; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.BinaryTuplePrefix; import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory; +import org.apache.ignite.internal.sql.engine.metadata.PartitionWithTerm; import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest; import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable; import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; @@ -48,12 +50,11 @@ import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; -import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.internal.tx.impl.TxManagerImpl; import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; +import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.network.ClusterNode; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.Test; @@ -72,7 +73,9 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest { int inBufSize = Commons.IN_BUFFER_SIZE; - int[] parts = {0, 1, 2}; + List<PartitionWithTerm> partsWithTerms = Stream.of(0, 1, 2) + .map(p -> new PartitionWithTerm(p, -1L)) + .collect(Collectors.toList()); int probingCnt = 50; @@ -93,7 +96,7 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest { dataAmount = size; - TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx, rowFactory, tbl, parts, null, null, null); + TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx, rowFactory, tbl, partsWithTerms, null, null, null); RootNode<Object[]> root = new RootNode<>(ctx); @@ -106,7 +109,7 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest { ++cnt; } - assertEquals(sizes[i++] * parts.length, cnt); + assertEquals(sizes[i++] * partsWithTerms.size(), cnt); } } @@ -161,29 +164,39 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest { } @Override - protected CompletableFuture<Collection<BinaryRow>> enlistCursorInTx( - @NotNull InternalTransaction tx, + public Publisher<BinaryRow> scan( int partId, - long scanId, - int batchSize, + UUID txId, + PrimaryReplica recipient, @Nullable UUID indexId, - @Nullable BinaryTuple exactKey, @Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags, @Nullable BitSet columnsToInclude ) { - int fillAmount = Math.min(dataAmount - processedPerPart[partId], batchSize); - - Collection<BinaryRow> out = new ArrayList<>(fillAmount); - - for (int i = 0; i < fillAmount; ++i) { - out.add(bbRow); - } - - processedPerPart[partId] += fillAmount; - - return CompletableFuture.completedFuture(out); + return s -> { + s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + int fillAmount = Math.min(dataAmount - processedPerPart[partId], (int) n); + + processedPerPart[partId] += fillAmount; + + for (int i = 0; i < fillAmount; ++i) { + s.onNext(bbRow); + } + + if (processedPerPart[partId] == dataAmount) { + s.onComplete(); + } + } + + @Override + public void cancel() { + // No-op. + } + }); + }; } } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java index 7885aa5b5e..be091a1c24 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.sql.engine.framework; import static org.apache.ignite.lang.IgniteStringFormatter.format; -import static org.mockito.Mockito.mock; import it.unimi.dsi.fastutil.longs.Long2ObjectMaps; import java.util.ArrayList; @@ -45,7 +44,6 @@ import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl; import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; -import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.network.ClusterNode; /** @@ -208,8 +206,7 @@ public class TestBuilders { node.name(), description, ArrayRowHandler.INSTANCE, - Map.of(), - mock(InternalTransaction.class) + Map.of() ); } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java index 8968f50fde..94277e04a4 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java @@ -109,8 +109,8 @@ public class TestNode implements LifecycleAware { )); executionService = registerService(new ExecutionServiceImpl<>( - topologyService.localMember(), messageService, + topologyService, new MappingServiceImpl(topologyService), schemaManager, mock(DdlCommandHandler.class), diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java index 6022b24811..4bfad4f49b 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java @@ -130,6 +130,7 @@ import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.ArrayUtils; +import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.Nullable; @@ -1229,6 +1230,13 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest { throw new AssertionError("Should not be called"); } + /** {@inheritDoc} */ + @Override + public Publisher<BinaryRow> lookup(int partId, UUID txId, PrimaryReplica recipient, BinaryTuple key, + @Nullable BitSet columns) { + throw new AssertionError("Should not be called"); + } + /** {@inheritDoc} */ @Override public Publisher<BinaryRow> lookup(int partId, HybridTimestamp timestamp, ClusterNode recipient, BinaryTuple key, BitSet columns) { @@ -1247,6 +1255,12 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest { @Nullable BinaryTuplePrefix leftBound, @Nullable BinaryTuplePrefix rightBound, int flags, BitSet columnsToInclude) { throw new AssertionError("Should not be called"); } + + @Override + public Publisher<BinaryRow> scan(int partId, UUID txId, PrimaryReplica recipient, @Nullable BinaryTuplePrefix leftBound, + @Nullable BinaryTuplePrefix rightBound, int flags, @Nullable BitSet columnsToInclude) { + throw new AssertionError("Should not be called"); + } } static class TestHashIndex implements Index<IndexDescriptor> { @@ -1296,6 +1310,13 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest { throw new AssertionError("Should not be called"); } + /** {@inheritDoc} */ + @Override + public Publisher<BinaryRow> lookup(int partId, UUID txId, PrimaryReplica recipient, BinaryTuple key, + @Nullable BitSet columns) { + throw new AssertionError("Should not be called"); + } + /** {@inheritDoc} */ @Override public Publisher<BinaryRow> lookup(int partId, HybridTimestamp timestamp, ClusterNode recipient, BinaryTuple key, BitSet columns) { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java index b4f081d56f..b008e65d44 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java @@ -45,6 +45,7 @@ import org.apache.calcite.util.ImmutableIntList; import org.apache.calcite.util.Util; import org.apache.ignite.internal.index.ColumnCollation; import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup; +import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm; import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory; import org.apache.ignite.internal.sql.engine.prepare.IgnitePlanner; import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext; @@ -82,6 +83,8 @@ import org.junit.jupiter.api.condition.OS; public class PlannerTest extends AbstractPlannerTest { private static List<String> NODES; + private static List<NodeWithTerm> NODES_WITH_TERM; + /** * Init. * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 @@ -89,9 +92,13 @@ public class PlannerTest extends AbstractPlannerTest { @BeforeAll public static void init() { NODES = new ArrayList<>(4); + NODES_WITH_TERM = new ArrayList<>(4); for (int i = 0; i < 4; i++) { - NODES.add(UUID.randomUUID().toString()); + String nodeName = UUID.randomUUID().toString(); + + NODES.add(nodeName); + NODES_WITH_TERM.add(new NodeWithTerm(nodeName, 0L)); } } @@ -113,11 +120,11 @@ public class PlannerTest extends AbstractPlannerTest { @Override public ColocationGroup colocationGroup(MappingQueryContext ctx) { return ColocationGroup.forAssignments(Arrays.asList( - select(NODES, 0, 1), - select(NODES, 1, 2), - select(NODES, 2, 0), - select(NODES, 0, 1), - select(NODES, 1, 2) + select(NODES_WITH_TERM, 0, 1), + select(NODES_WITH_TERM, 1, 2), + select(NODES_WITH_TERM, 2, 0), + select(NODES_WITH_TERM, 0, 1), + select(NODES_WITH_TERM, 1, 2) )); } @@ -134,11 +141,11 @@ public class PlannerTest extends AbstractPlannerTest { .build(), "PROJECT") { @Override public ColocationGroup colocationGroup(MappingQueryContext ctx) { return ColocationGroup.forAssignments(Arrays.asList( - select(NODES, 0, 1), - select(NODES, 1, 2), - select(NODES, 2, 0), - select(NODES, 0, 1), - select(NODES, 1, 2))); + select(NODES_WITH_TERM, 0, 1), + select(NODES_WITH_TERM, 1, 2), + select(NODES_WITH_TERM, 2, 0), + select(NODES_WITH_TERM, 0, 1), + select(NODES_WITH_TERM, 1, 2))); } @Override public IgniteDistribution distribution() { @@ -295,10 +302,10 @@ public class PlannerTest extends AbstractPlannerTest { @Override public ColocationGroup colocationGroup(MappingQueryContext ctx) { return ColocationGroup.forAssignments(Arrays.asList( - select(NODES, 1, 2), - select(NODES, 2, 3), - select(NODES, 3, 0), - select(NODES, 0, 1) + select(NODES_WITH_TERM, 1, 2), + select(NODES_WITH_TERM, 2, 3), + select(NODES_WITH_TERM, 3, 0), + select(NODES_WITH_TERM, 0, 1) )); } @@ -376,9 +383,9 @@ public class PlannerTest extends AbstractPlannerTest { @Override public ColocationGroup colocationGroup(MappingQueryContext ctx) { return ColocationGroup.forAssignments(Arrays.asList( - select(NODES, 0), - select(NODES, 1), - select(NODES, 2) + select(NODES_WITH_TERM, 0), + select(NODES_WITH_TERM, 1), + select(NODES_WITH_TERM, 2) )); } @@ -521,9 +528,9 @@ public class PlannerTest extends AbstractPlannerTest { @Override public ColocationGroup colocationGroup(MappingQueryContext ctx) { return ColocationGroup.forAssignments(Arrays.asList( - select(NODES, 1), - select(NODES, 2), - select(NODES, 3) + select(NODES_WITH_TERM, 1), + select(NODES_WITH_TERM, 2), + select(NODES_WITH_TERM, 3) )); } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java index 2a0b44b60c..b67ecd4c26 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.distributed; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -37,6 +38,7 @@ import java.util.concurrent.Flow; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -61,6 +63,7 @@ import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.TxState; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; @@ -87,7 +90,7 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest private MvPartitionStorage mockStorage; /** Internal table to test. */ - protected InternalTable internalTbl; + DummyInternalTableImpl internalTbl; private final HybridClock clock = new HybridClockImpl(); @@ -196,7 +199,9 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest return cursor; }); - scan(0, null).subscribe(new Subscriber<>() { + InternalTransaction tx = startTx(); + + scan(0, tx).subscribe(new Subscriber<>() { @Override public void onSubscribe(Subscription subscription) { @@ -212,6 +217,9 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest public void onError(Throwable throwable) { gotException.set(throwable); subscriberFinishedLatch.countDown(); + + // Rollback the transaction manually, because only ID of the explicit transaction is passed to the internal table. + tx.rollback(); } @Override @@ -220,9 +228,13 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest } }); - subscriberFinishedLatch.await(); + assertTrue(subscriberFinishedLatch.await(10, TimeUnit.SECONDS), "count=" + subscriberFinishedLatch.getCount()); assertEquals(gotException.get().getCause().getClass(), NoSuchElementException.class); + + if (tx != null) { + assertEquals(TxState.ABORTED, tx.state()); + } } /** @@ -237,7 +249,9 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest when(mockStorage.scan(any(HybridTimestamp.class))).thenThrow(new StorageException("Some storage exception")); - scan(0, null).subscribe(new Subscriber<>() { + InternalTransaction tx = startTx(); + + scan(0, tx).subscribe(new Subscriber<>() { @Override public void onSubscribe(Subscription subscription) { @@ -253,6 +267,9 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest public void onError(Throwable throwable) { gotException.set(throwable); gotExceptionLatch.countDown(); + + // Rollback the transaction manually, because only ID of the explicit transaction is passed to the internal table. + tx.rollback(); } @Override @@ -264,8 +281,11 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest gotExceptionLatch.await(); assertEquals(gotException.get().getCause().getClass(), StorageException.class); - } + if (tx != null) { + assertEquals(TxState.ABORTED, tx.state()); + } + } /** * Checks that {@link IllegalArgumentException} is thrown in case of invalid partition. @@ -398,7 +418,9 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest // The latch that allows to await Subscriber.onError() before asserting test invariants. CountDownLatch subscriberAllDataAwaitLatch = new CountDownLatch(1); - scan(0, null).subscribe(new Subscriber<>() { + InternalTransaction tx = startTx(); + + scan(0, tx).subscribe(new Subscriber<>() { private Subscription subscription; @Override @@ -438,6 +460,19 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest for (int i = 0; i < expItems.size(); i++) { assertArrayEquals(expItems.get(i), gotItems.get(i)); } + + if (tx != null) { + tx.commit(); + } + } + + /** + * Start transaction if needed. + * + * @return Started transaction or {@code null}. + */ + protected InternalTransaction startTx() { + return null; } /** diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java index 3f6f15cf13..3f1fd642cc 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java @@ -18,9 +18,15 @@ package org.apache.ignite.distributed; import java.util.concurrent.Flow.Publisher; +import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; +import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.utils.PrimaryReplica; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.network.ClusterNode; /** * Tests for {@link InternalTable#scan(int, org.apache.ignite.internal.tx.InternalTransaction)}. @@ -29,6 +35,28 @@ public class ItInternalTableReadWriteScanTest extends ItAbstractInternalTableSca /** {@inheritDoc} */ @Override protected Publisher<BinaryRow> scan(int part, InternalTransaction tx) { - return internalTbl.scan(part, tx); + if (tx == null) { + return internalTbl.scan(part, null); + } + + IgniteBiTuple<ClusterNode, Long> leaderWithTerm = tx.enlistedNodeAndTerm(new TablePartitionId(internalTbl.tableId(), part)); + PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(), leaderWithTerm.get2()); + + return internalTbl.scan(part, tx.id(), recipient, null, null, null, 0, null); + } + + /** {@inheritDoc} */ + @Override + protected InternalTransaction startTx() { + InternalTransaction tx = internalTbl.txManager().begin(); + + TablePartitionId tblPartId = new TablePartitionId(internalTbl.tableId(), ((TablePartitionId) internalTbl.groupId()).partitionId()); + RaftGroupService raftSvc = internalTbl.partitionRaftGroupService(tblPartId.partitionId()); + long term = IgniteTestUtils.await(raftSvc.refreshAndGetLeaderWithTerm()).term(); + + tx.assignCommitPartition(tblPartId); + tx.enlist(tblPartId, new IgniteBiTuple<>(internalTbl.leaderAssignment(tblPartId.partitionId()), term)); + + return tx; } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java index cb248a0cf7..8c4a67e7f0 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.LockException; import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; +import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.tx.TransactionException; import org.jetbrains.annotations.NotNull; @@ -296,6 +297,29 @@ public interface InternalTable extends ManuallyCloseable { @Nullable BitSet columnsToInclude ); + /** + * Scans given partition index, providing {@link Publisher} that reactively notifies about partition rows. + * + * @param partId The partition. + * @param txId Transaction id. + * @param recipient Primary replica that will handle given get request. + * @param lowerBound Lower search bound. + * @param upperBound Upper search bound. + * @param flags Control flags. See {@link org.apache.ignite.internal.storage.index.SortedIndexStorage} constants. + * @param columnsToInclude Row projection. + * @return {@link Publisher} that reactively notifies about partition rows. + */ + Publisher<BinaryRow> scan( + int partId, + UUID txId, + PrimaryReplica recipient, + @Nullable UUID indexId, + @Nullable BinaryTuplePrefix lowerBound, + @Nullable BinaryTuplePrefix upperBound, + int flags, + @Nullable BitSet columnsToInclude + ); + /** * Scans given partition index, providing {@link Publisher} that reactively notifies about partition rows. * @@ -348,7 +372,9 @@ public interface InternalTable extends ManuallyCloseable { * @param key Key to search. * @param columnsToInclude Row projection. * @return {@link Publisher} that reactively notifies about partition rows. + * @deprecated IGNITE-17952 Use {@link #lookup(int, UUID, PrimaryReplica, UUID, BinaryTuple, BitSet)} instead. */ + @Deprecated Publisher<BinaryRow> lookup( int partId, @Nullable InternalTransaction tx, @@ -357,6 +383,27 @@ public interface InternalTable extends ManuallyCloseable { @Nullable BitSet columnsToInclude ); + /** + * Lookup rows corresponding to the given key given partition index, providing {@link Publisher} + * that reactively notifies about partition rows. + * + * @param partId The partition. + * @param txId Transaction id. + * @param recipient Primary replica that will handle given get request. + * @param indexId Index id. + * @param key Key to search. + * @param columnsToInclude Row projection. + * @return {@link Publisher} that reactively notifies about partition rows. + */ + Publisher<BinaryRow> lookup( + int partId, + UUID txId, + PrimaryReplica recipient, + UUID indexId, + BinaryTuple key, + @Nullable BitSet columnsToInclude + ); + /** * Gets a count of partitions of the table. * @@ -374,6 +421,13 @@ public interface InternalTable extends ManuallyCloseable { */ List<String> assignments(); + /** + * Gets a list of current primary replicas for each partition. + * + * @return List of current primary replicas for each partition. + */ + List<PrimaryReplica> primaryReplicas(); + /** * Returns cluster node that is the leader of the corresponding partition group or throws an exception if * it cannot be found. diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 554498ac9d..9dc4315734 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -77,6 +77,7 @@ import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.LockException; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; +import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.IgniteFiveFunction; @@ -350,7 +351,7 @@ public class InternalTableImpl implements InternalTable { * @param columnsToInclude Row projection. * @return Batch of retrieved rows. */ - protected CompletableFuture<Collection<BinaryRow>> enlistCursorInTx( + private CompletableFuture<Collection<BinaryRow>> enlistCursorInTx( @NotNull InternalTransaction tx, int partId, long scanId, @@ -853,6 +854,19 @@ public class InternalTableImpl implements InternalTable { return scan(partId, tx, indexId, key, null, null, 0, columnsToInclude); } + /** {@inheritDoc} */ + @Override + public Publisher<BinaryRow> lookup( + int partId, + UUID txId, + PrimaryReplica recipient, + UUID indexId, + BinaryTuple key, + @Nullable BitSet columnsToInclude + ) { + return scan(partId, txId, recipient, indexId, key, null, null, 0, columnsToInclude); + } + /** {@inheritDoc} */ @Override public Publisher<BinaryRow> scan( @@ -967,6 +981,57 @@ public class InternalTableImpl implements InternalTable { ); } + + /** {@inheritDoc} */ + @Override + public Publisher<BinaryRow> scan( + int partId, + UUID txId, + PrimaryReplica recipient, + @Nullable UUID indexId, + @Nullable BinaryTuplePrefix lowerBound, + @Nullable BinaryTuplePrefix upperBound, + int flags, + @Nullable BitSet columnsToInclude + ) { + return scan(partId, txId, recipient, indexId, null, lowerBound, upperBound, flags, columnsToInclude); + } + + private Publisher<BinaryRow> scan( + int partId, + UUID txId, + PrimaryReplica recipient, + @Nullable UUID indexId, + @Nullable BinaryTuple exactKey, + @Nullable BinaryTuplePrefix lowerBound, + @Nullable BinaryTuplePrefix upperBound, + int flags, + @Nullable BitSet columnsToInclude + ) { + return new PartitionScanPublisher( + (scanId, batchSize) -> { + ReplicationGroupId partGroupId = partitionMap.get(partId).groupId(); + + ReadWriteScanRetrieveBatchReplicaRequest request = tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest() + .groupId(partGroupId) + .transactionId(txId) + .scanId(scanId) + .indexToUse(indexId) + .exactKey(exactKey) + .lowerBound(lowerBound) + .upperBound(upperBound) + .flags(flags) + .columnsToInclude(columnsToInclude) + .batchSize(batchSize) + .term(recipient.term()) + .build(); + + return replicaSvc.invoke(recipient.node(), request); + }, + // TODO: IGNITE-17666 Close cursor tx finish. + Function.identity()); + } + /** * Validates partition index. * @@ -1015,6 +1080,30 @@ public class InternalTableImpl implements InternalTable { .collect(Collectors.toList()); } + /** {@inheritDoc} */ + @Override + public List<PrimaryReplica> primaryReplicas() { + List<Entry<RaftGroupService>> entries = new ArrayList<>(partitionMap.int2ObjectEntrySet()); + List<CompletableFuture<LeaderWithTerm>> futs = new ArrayList<>(); + + entries.sort(Comparator.comparingInt(Entry::getIntKey)); + + for (Entry<RaftGroupService> e : entries) { + futs.add(e.getValue().refreshAndGetLeaderWithTerm()); + } + + List<PrimaryReplica> primaryReplicas = new ArrayList<>(entries.size()); + + for (CompletableFuture<LeaderWithTerm> fut : futs) { + LeaderWithTerm leaderWithTerm = fut.join(); + ClusterNode primaryNode = clusterNodeResolver.apply(leaderWithTerm.leader().consistentId()); + + primaryReplicas.add(new PrimaryReplica(primaryNode, leaderWithTerm.term())); + } + + return primaryReplicas; + } + @Override public ClusterNode leaderAssignment(int partition) { awaitLeaderInitialization(); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java b/modules/table/src/main/java/org/apache/ignite/internal/utils/PrimaryReplica.java similarity index 50% copy from modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java copy to modules/table/src/main/java/org/apache/ignite/internal/utils/PrimaryReplica.java index 3f6f15cf13..e0114ea1bb 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/utils/PrimaryReplica.java @@ -15,20 +15,46 @@ * limitations under the License. */ -package org.apache.ignite.distributed; +package org.apache.ignite.internal.utils; -import java.util.concurrent.Flow.Publisher; -import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.table.InternalTable; -import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.network.ClusterNode; /** - * Tests for {@link InternalTable#scan(int, org.apache.ignite.internal.tx.InternalTransaction)}. + * Tuple representing primary replica node with current term. */ -public class ItInternalTableReadWriteScanTest extends ItAbstractInternalTableScanTest { - /** {@inheritDoc} */ - @Override - protected Publisher<BinaryRow> scan(int part, InternalTransaction tx) { - return internalTbl.scan(part, tx); +public class PrimaryReplica { + /** Primary replica node. */ + private final ClusterNode node; + + /** Replica term. */ + private final long term; + + /** + * Constructor. + * + * @param node Primary replica node. + * @param term Replica term. + */ + public PrimaryReplica(ClusterNode node, long term) { + this.node = node; + this.term = term; + } + + /** + * Gets primary replica node. + * + * @return Primary replica node. + */ + public ClusterNode node() { + return node; + } + + /** + * Gets replica term. + * + * @return Replica term. + */ + public long term() { + return term; } } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index 3346feb172..f8e4846776 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -317,6 +317,15 @@ public class DummyInternalTableImpl extends InternalTableImpl { return groupId; } + /** + * Gets the transaction manager that is bound to the table. + * + * @return Transaction manager. + */ + public TxManager txManager() { + return txManager; + } + /** {@inheritDoc} */ @Override public @NotNull String name() {