This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 4fd6b984d5 IGNITE-19898 Sql. Added ability to use different transaction facades to start an implicit transaction (#2449) 4fd6b984d5 is described below commit 4fd6b984d5a46ddb2bd6def55365a6b349feb720 Author: Pavel Pereslegin <xxt...@gmail.com> AuthorDate: Fri Aug 25 16:55:48 2023 +0300 IGNITE-19898 Sql. Added ability to use different transaction facades to start an implicit transaction (#2449) --- .../internal/cli/CliIntegrationTestBase.java | 7 +- .../client/handler/JdbcQueryEventHandlerImpl.java | 2 + .../requests/sql/ClientSqlExecuteRequest.java | 4 +- .../handler/JdbcQueryEventHandlerImplTest.java | 8 +- .../requests/jdbc/JdbcQueryCursorSelfTest.java | 15 +- .../client/fakes/FakeIgniteQueryProcessor.java | 3 +- .../internal/ClusterPerTestIntegrationTest.java | 2 +- .../benchmark/AbstractOneNodeBenchmark.java | 2 +- .../runner/app/ItIgniteNodeRestartTest.java | 1 - .../internal/sql/api/ItSqlAsynchronousApiTest.java | 48 ++-- .../internal/sql/api/ItSqlSynchronousApiTest.java | 22 +- .../sql/engine/ClusterPerClassIntegrationTest.java | 7 +- .../engine/datatypes/tests/BaseDataTypeTest.java | 6 + .../org/apache/ignite/internal/app/IgniteImpl.java | 3 +- .../sql/engine/util/TestQueryProcessor.java | 13 +- .../ignite/internal/sql/api/IgniteSqlImpl.java | 9 +- .../internal/sql/api/SessionBuilderImpl.java | 8 +- .../ignite/internal/sql/api/SessionImpl.java | 12 +- .../internal/sql/engine/AsyncSqlCursorImpl.java | 21 +- .../ignite/internal/sql/engine/QueryProcessor.java | 6 +- .../sql/engine/QueryTransactionWrapper.java | 59 +++++ .../internal/sql/engine/SqlQueryProcessor.java | 252 +++++++++++---------- .../sql/engine/AsyncSqlCursorImplTest.java | 51 ++--- .../engine/QueryTransactionWrapperSelfTest.java | 131 +++++++++++ .../internal/sql/engine/StopCalciteModuleTest.java | 10 +- .../sql/engine/exec/MockedStructuresTest.java | 88 +++---- .../internal/sql/engine/util/QueryChecker.java | 9 +- .../ignite/internal/table/AbstractTableView.java | 2 +- 28 files changed, 522 insertions(+), 279 deletions(-) diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java index becb71c8cd..b06ea4bc9d 100644 --- a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java +++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java @@ -29,11 +29,13 @@ import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.sql.engine.AsyncCursor; import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult; import org.apache.ignite.internal.sql.engine.QueryContext; +import org.apache.ignite.internal.sql.engine.QueryProcessor; import org.apache.ignite.internal.sql.engine.SqlQueryType; import org.apache.ignite.internal.sql.engine.property.PropertiesHelper; import org.apache.ignite.internal.sql.engine.session.SessionId; import org.apache.ignite.internal.testframework.IntegrationTestBase; import org.apache.ignite.table.Table; +import org.apache.ignite.tx.IgniteTransactions; import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.TestInstance; @@ -88,7 +90,8 @@ public abstract class CliIntegrationTestBase extends IntegrationTestBase { } protected static List<List<Object>> sql(@Nullable Transaction tx, String sql, Object... args) { - var queryEngine = ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine(); + QueryProcessor queryEngine = ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine(); + IgniteTransactions transactions = CLUSTER_NODES.get(0).transactions(); SessionId sessionId = queryEngine.createSession(PropertiesHelper.emptyHolder()); @@ -96,7 +99,7 @@ public abstract class CliIntegrationTestBase extends IntegrationTestBase { var context = QueryContext.create(SqlQueryType.ALL, tx); return getAllFromCursor( - await(queryEngine.querySingleAsync(sessionId, context, sql, args)) + await(queryEngine.querySingleAsync(sessionId, context, transactions, sql, args)) ); } finally { queryEngine.closeSession(sessionId); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java index c04486f5ec..ea632c4f4d 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java @@ -165,6 +165,7 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler { CompletableFuture<AsyncSqlCursor<List<Object>>> result = connectionContext.doInSession(sessionId -> processor.querySingleAsync( sessionId, context, + igniteTransactions, req.sqlQuery(), req.arguments() == null ? OBJECT_EMPTY_ARRAY : req.arguments() )); @@ -271,6 +272,7 @@ public class JdbcQueryEventHandlerImpl implements JdbcQueryEventHandler { CompletableFuture<AsyncSqlCursor<List<Object>>> result = connCtx.doInSession(sessionId -> processor.querySingleAsync( sessionId, queryContext, + igniteTransactions, sql, arg == null ? OBJECT_EMPTY_ARRAY : arg )); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java index ad8675dd08..616c949a7d 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java @@ -79,7 +79,7 @@ public class ClientSqlExecuteRequest { arguments = ArrayUtils.OBJECT_EMPTY_ARRAY; } - // TODO IGNITE-19898 SQL implicit RO transaction should use observation timestamp. + // TODO IGNITE-20232 Propagate observable timestamp to sql engine using internal API. HybridTimestamp unused = HybridTimestamp.nullableHybridTimestamp(in.unpackLong()); return session @@ -87,7 +87,7 @@ public class ClientSqlExecuteRequest { .thenCompose(asyncResultSet -> { //noinspection StatementWithEmptyBody if (tx == null) { - // TODO IGNITE-19898 Return readTimestamp from implicit RO TX to the client + // TODO IGNITE-20232 Propagate observable timestamp to sql engine using internal API. // out.meta(asyncResultSet.tx().readTimestamp()); } diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java index 89823b4cb9..6477c9b8bc 100644 --- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java +++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java @@ -175,7 +175,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest { when(queryProcessor.createSession(any())).thenReturn(expectedSessionId); - when(queryProcessor.querySingleAsync(eq(expectedSessionId), any(), any())) + when(queryProcessor.querySingleAsync(eq(expectedSessionId), any(), any(), any())) .thenReturn(CompletableFuture.failedFuture(new RuntimeException("This is fine"))); long connectionId = acquireConnectionId(); @@ -185,7 +185,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest { ))); verify(queryProcessor).createSession(any()); - verify(queryProcessor).querySingleAsync(eq(expectedSessionId), any(), any(), any(Object[].class)); + verify(queryProcessor).querySingleAsync(eq(expectedSessionId), any(), any(), any(), any(Object[].class)); verifyNoMoreInteractions(queryProcessor); } @@ -240,7 +240,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest { @Test public void singleTxUsedForMultipleOperations() { - when(queryProcessor.querySingleAsync(any(), any(), any())) + when(queryProcessor.querySingleAsync(any(), any(), any(), any())) .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Expected"))); Transaction tx = mock(Transaction.class); @@ -273,7 +273,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest { verify(tx).commitAsync(); verifyNoMoreInteractions(igniteTransactions); - verify(queryProcessor, times(5)).querySingleAsync(any(), any(), any(), any(Object[].class)); + verify(queryProcessor, times(5)).querySingleAsync(any(), any(), any(), any(), any(Object[].class)); } @Test diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java index 3d1ae15c63..29bc7db0f4 100644 --- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java +++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java @@ -24,17 +24,26 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult; import org.apache.ignite.internal.sql.engine.AsyncSqlCursorImpl; +import org.apache.ignite.internal.sql.engine.QueryTransactionWrapper; import org.apache.ignite.internal.sql.engine.SqlQueryType; import org.apache.ignite.internal.sql.engine.exec.AsyncWrapper; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; /** * Test class for {@link JdbcQueryCursor}. */ -public class JdbcQueryCursorSelfTest { +@ExtendWith(MockitoExtension.class) +public class JdbcQueryCursorSelfTest extends BaseIgniteAbstractTest { + @Mock + private QueryTransactionWrapper txWrapper; + private static final List<Integer> ROWS = List.of(1, 2, 3); private static final int TOTAL_ROWS_COUNT = ROWS.size(); @@ -58,9 +67,9 @@ public class JdbcQueryCursorSelfTest { assertEquals(ROWS, results); } - private static List<Integer> fetchFullBatch(int maxRows, int fetchSize) { + private List<Integer> fetchFullBatch(int maxRows, int fetchSize) { JdbcQueryCursor<Integer> cursor = new JdbcQueryCursor<>(maxRows, - new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, null, null, + new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, null, txWrapper, new AsyncWrapper<>(CompletableFuture.completedFuture(ROWS.iterator()), Runnable::run))); List<Integer> results = new ArrayList<>(maxRows); diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java index 08106c9cc6..c8c73fa78b 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor; import org.apache.ignite.internal.sql.engine.property.PropertiesHolder; import org.apache.ignite.internal.sql.engine.session.SessionId; import org.apache.ignite.internal.sql.engine.session.SessionInfo; +import org.apache.ignite.tx.IgniteTransactions; /** * Fake {@link QueryProcessor}. @@ -49,7 +50,7 @@ public class FakeIgniteQueryProcessor implements QueryProcessor { @Override public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync( - SessionId sessionid, QueryContext context, String qry, + SessionId sessionid, QueryContext context, IgniteTransactions transactions, String qry, Object... params) { return CompletableFuture.completedFuture(new FakeCursor()); } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java index 2413724fc4..9d3c4a57fd 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java @@ -207,7 +207,7 @@ public abstract class ClusterPerTestIntegrationTest extends IgniteIntegrationTes QueryContext context = QueryContext.create(SqlQueryType.ALL); return getAllFromCursor( - qryProc.querySingleAsync(sessionId, context, sql, args).join() + qryProc.querySingleAsync(sessionId, context, node(0).transactions(), sql, args).join() ); } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java index 88f177aa8b..3de017059b 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java @@ -114,7 +114,7 @@ public class AbstractOneNodeBenchmark { var context = QueryContext.create(SqlQueryType.ALL); getAllFromCursor( - await(queryEngine.querySingleAsync(sessionId, context, sql)) + await(queryEngine.querySingleAsync(sessionId, context, clusterNode.transactions(), sql)) ); } finally { queryEngine.closeSession(sessionId); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index fbaf31240f..5a661e2634 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -400,7 +400,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { indexManager, schemaManager, dataStorageManager, - txManager, distributionZoneManager, () -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()), replicaSvc, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java index 48615d41ba..10ee8fb135 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java @@ -459,13 +459,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { @Test public void select() { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - for (int i = 0; i < ROW_COUNT; ++i) { - sql("INSERT INTO TEST VALUES (?, ?)", i, i); - } IgniteSql sql = igniteSql(); Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 4).build(); + for (int i = 0; i < ROW_COUNT; ++i) { + ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); + } + TestPageProcessor pageProc = new TestPageProcessor(4); await(ses.executeAsync(null, "SELECT ID FROM TEST").thenCompose(pageProc)); @@ -481,11 +482,12 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { @Test public void metadata() { sql("CREATE TABLE TEST(COL0 BIGINT PRIMARY KEY, COL1 VARCHAR NOT NULL)"); - sql("INSERT INTO TEST VALUES (?, ?)", 1L, "some string"); IgniteSql sql = igniteSql(); Session ses = sql.sessionBuilder().build(); + ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", 1L, "some string"); + AsyncResultSet<SqlRow> rs = await(ses.executeAsync(null, "SELECT COL1, COL0 FROM TEST")); // Validate columns metadata. @@ -561,13 +563,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { @Test public void pageSequence() { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - for (int i = 0; i < ROW_COUNT; ++i) { - sql("INSERT INTO TEST VALUES (?, ?)", i, i); - } IgniteSql sql = igniteSql(); Session ses = sql.sessionBuilder().defaultPageSize(1).build(); + for (int i = 0; i < ROW_COUNT; ++i) { + ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); + } + AsyncResultSet<SqlRow> ars0 = await(ses.executeAsync(null, "SELECT ID FROM TEST ORDER BY ID")); var p0 = ars0.currentPage(); AsyncResultSet<SqlRow> ars1 = await(ars0.fetchNextPage()); @@ -601,13 +604,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { @Test public void errors() { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)"); - for (int i = 0; i < ROW_COUNT; ++i) { - sql("INSERT INTO TEST VALUES (?, ?)", i, i); - } IgniteSql sql = igniteSql(); Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build(); + for (int i = 0; i < ROW_COUNT; ++i) { + ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); + } + // Parse error. checkError(SqlException.class, STMT_PARSE_ERR, "Failed to parse query", ses, "SELECT ID FROM"); @@ -685,13 +689,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { @Test public void closeSession() { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - for (int i = 0; i < ROW_COUNT; ++i) { - sql("INSERT INTO TEST VALUES (?, ?)", i, i); - } IgniteSql sql = igniteSql(); Session ses = sql.sessionBuilder().defaultPageSize(2).build(); + for (int i = 0; i < ROW_COUNT; ++i) { + ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); + } + AsyncResultSet ars0 = await(ses.executeAsync(null, "SELECT ID FROM TEST")); await(ses.closeAsync()); @@ -769,13 +774,14 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { @Test public void resultSetCloseShouldFinishImplicitTransaction() throws InterruptedException { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - for (int i = 0; i < ROW_COUNT; ++i) { - sql("INSERT INTO TEST VALUES (?, ?)", i, i); - } IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().defaultPageSize(2).build(); + + for (int i = 0; i < ROW_COUNT; ++i) { + ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); + } + CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, "SELECT * FROM TEST"); AsyncResultSet<SqlRow> ars = f.join(); @@ -790,14 +796,16 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { @Test public void resultSetFullReadShouldFinishImplicitTransaction() { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - for (int i = 0; i < ROW_COUNT; ++i) { - sql("INSERT INTO TEST VALUES (?, ?)", i, i); - } IgniteSql sql = igniteSql(); // Fetch all data in one read. Session ses = sql.sessionBuilder().defaultPageSize(100).build(); + + for (int i = 0; i < ROW_COUNT; ++i) { + ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); + } + CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, "SELECT * FROM TEST"); AsyncResultSet<SqlRow> ars = f.join(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java index dc1bf88eb7..518c8faf1a 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java @@ -249,13 +249,14 @@ public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest { @Test public void select() throws Exception { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - for (int i = 0; i < ROW_COUNT; ++i) { - sql("INSERT INTO TEST VALUES (?, ?)", i, i); - } IgniteSql sql = igniteSql(); Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 4).build(); + for (int i = 0; i < ROW_COUNT; ++i) { + ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); + } + ResultSet<SqlRow> rs = ses.execute(null, "SELECT ID FROM TEST"); Set<Integer> set = Streams.stream(rs).map(r -> r.intValue(0)).collect(Collectors.toSet()); @@ -272,13 +273,13 @@ public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest { @Test public void errors() throws InterruptedException { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)"); - for (int i = 0; i < ROW_COUNT; ++i) { - sql("INSERT INTO TEST VALUES (?, ?)", i, i); - } - IgniteSql sql = igniteSql(); Session ses = sql.sessionBuilder().defaultPageSize(2).build(); + for (int i = 0; i < ROW_COUNT; ++i) { + ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); + } + // Parse error. checkError(SqlException.class, STMT_PARSE_ERR, "Failed to parse query", ses, "SELECT ID FROM"); @@ -420,13 +421,14 @@ public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest { @Test public void resultSetCloseShouldFinishImplicitTransaction() { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - for (int i = 0; i < ROW_COUNT; ++i) { - sql("INSERT INTO TEST VALUES (?, ?)", i, i); - } IgniteSql sql = igniteSql(); Session ses = sql.sessionBuilder().defaultPageSize(2).build(); + for (int i = 0; i < ROW_COUNT; ++i) { + ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); + } + ResultSet<?> rs = ses.execute(null, "SELECT * FROM TEST"); assertEquals(1, txManager().pending()); rs.close(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java index 1f31358f2b..bf2fb5ac01 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java @@ -287,6 +287,11 @@ public abstract class ClusterPerClassIntegrationTest extends IgniteIntegrationTe protected QueryProcessor getEngine() { return ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine(); } + + @Override + protected IgniteTransactions transactions() { + return CLUSTER_NODES.get(0).transactions(); + } }; } @@ -446,7 +451,7 @@ public abstract class ClusterPerClassIntegrationTest extends IgniteIntegrationTe var context = QueryContext.create(SqlQueryType.ALL, tx); return getAllFromCursor( - await(queryEngine.querySingleAsync(sessionId, context, sql, args)) + await(queryEngine.querySingleAsync(sessionId, context, CLUSTER_NODES.get(0).transactions(), sql, args)) ); } finally { queryEngine.closeSession(sessionId); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java index 1a795c0325..bceb796de4 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.sql.engine.util.QueryChecker.QueryTemplate; import org.apache.ignite.internal.sql.engine.util.TestQueryProcessor; import org.apache.ignite.sql.ColumnMetadata; import org.apache.ignite.sql.ColumnType; +import org.apache.ignite.tx.IgniteTransactions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.provider.Arguments; @@ -125,6 +126,11 @@ public abstract class BaseDataTypeTest<T extends Comparable<T>> extends ClusterP return ((IgniteImpl) CLUSTER_NODES.get(0)).queryEngine(); } + @Override + protected IgniteTransactions transactions() { + return igniteTx(); + } + @Override protected void checkMetadata(AsyncSqlCursor<?> cursor) { Optional<ColumnMetadata> testKey = cursor.metadata().columns() diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index bbc51f28bd..9828499320 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -555,7 +555,6 @@ public class IgniteImpl implements Ignite { indexManager, schemaManager, dataStorageMgr, - txManager, distributionZoneManager, () -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()), replicaSvc, @@ -564,7 +563,7 @@ public class IgniteImpl implements Ignite { metricManager ); - sql = new IgniteSqlImpl(qryEngine); + sql = new IgniteSqlImpl(qryEngine, new IgniteTransactionsImpl(txManager)); var deploymentManagerImpl = new DeploymentManagerImpl( clusterSvc, diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java index a62a433cf1..b78178768a 100644 --- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java +++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor; import org.apache.ignite.internal.sql.engine.property.PropertiesHolder; import org.apache.ignite.internal.sql.engine.session.SessionId; import org.apache.ignite.internal.sql.engine.session.SessionInfo; +import org.apache.ignite.tx.IgniteTransactions; /** * {@link QueryProcessor} that handles test {@link NativeTypeWrapper native type wrappers} . @@ -71,11 +72,15 @@ public final class TestQueryProcessor implements QueryProcessor { /** {@inheritDoc} */ @Override - public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(SessionId sessionId, QueryContext context, String qry, - Object... params) { - + public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync( + SessionId sessionId, + QueryContext context, + IgniteTransactions transactions, + String qry, + Object... params + ) { Object[] unwrappedParams = Arrays.stream(params).map(NativeTypeWrapper::unwrap).toArray(); - return queryProcessor.querySingleAsync(sessionId, context, qry, unwrappedParams); + return queryProcessor.querySingleAsync(sessionId, context, transactions, qry, unwrappedParams); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java index 6f164001b2..8f674bfe17 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java @@ -24,6 +24,7 @@ import org.apache.ignite.sql.Session; import org.apache.ignite.sql.Session.SessionBuilder; import org.apache.ignite.sql.Statement; import org.apache.ignite.sql.Statement.StatementBuilder; +import org.apache.ignite.tx.IgniteTransactions; /** * Embedded implementation of the Ignite SQL query facade. @@ -31,13 +32,17 @@ import org.apache.ignite.sql.Statement.StatementBuilder; public class IgniteSqlImpl implements IgniteSql { private final QueryProcessor qryProc; + private final IgniteTransactions transactions; + /** * Constructor. * * @param qryProc Query processor. + * @param transactions Transactions facade. */ - public IgniteSqlImpl(QueryProcessor qryProc) { + public IgniteSqlImpl(QueryProcessor qryProc, IgniteTransactions transactions) { this.qryProc = qryProc; + this.transactions = transactions; } /** {@inheritDoc} */ @@ -49,7 +54,7 @@ public class IgniteSqlImpl implements IgniteSql { /** {@inheritDoc} */ @Override public SessionBuilder sessionBuilder() { - return new SessionBuilderImpl(qryProc, new HashMap<>()); + return new SessionBuilderImpl(qryProc, transactions, new HashMap<>()); } /** {@inheritDoc} */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java index 352db1c689..3ed4ca61b2 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.sql.engine.session.SessionId; import org.apache.ignite.internal.sql.engine.session.SessionProperty; import org.apache.ignite.sql.Session; import org.apache.ignite.sql.Session.SessionBuilder; +import org.apache.ignite.tx.IgniteTransactions; import org.jetbrains.annotations.Nullable; /** @@ -40,6 +41,8 @@ public class SessionBuilderImpl implements SessionBuilder { private final QueryProcessor qryProc; + private final IgniteTransactions transactions; + private long queryTimeout = DEFAULT_QUERY_TIMEOUT; private long sessionTimeout = DEFAULT_SESSION_TIMEOUT; @@ -54,10 +57,12 @@ public class SessionBuilderImpl implements SessionBuilder { * Session builder constructor. * * @param qryProc SQL query processor. + * @param transactions Transactions facade. * @param props Initial properties. */ - SessionBuilderImpl(QueryProcessor qryProc, Map<String, Object> props) { + SessionBuilderImpl(QueryProcessor qryProc, IgniteTransactions transactions, Map<String, Object> props) { this.qryProc = qryProc; + this.transactions = transactions; this.props = props; } @@ -145,6 +150,7 @@ public class SessionBuilderImpl implements SessionBuilder { return new SessionImpl( sessionId, qryProc, + transactions, pageSize, propsHolder ); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java index 638ad0495b..fca3f6ae50 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java @@ -59,6 +59,7 @@ import org.apache.ignite.sql.Statement; import org.apache.ignite.sql.async.AsyncResultSet; import org.apache.ignite.sql.reactive.ReactiveResultSet; import org.apache.ignite.table.mapper.Mapper; +import org.apache.ignite.tx.IgniteTransactions; import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; @@ -73,6 +74,8 @@ public class SessionImpl implements AbstractSession { private final QueryProcessor qryProc; + private final IgniteTransactions transactions; + private final SessionId sessionId; private final int pageSize; @@ -83,16 +86,19 @@ public class SessionImpl implements AbstractSession { * Constructor. * * @param qryProc Query processor. + * @param transactions Transactions facade. * @param pageSize Query fetch page size. * @param props Session's properties. */ SessionImpl( SessionId sessionId, QueryProcessor qryProc, + IgniteTransactions transactions, int pageSize, PropertiesHolder props ) { this.qryProc = qryProc; + this.transactions = transactions; this.sessionId = sessionId; this.pageSize = pageSize; this.props = props; @@ -155,7 +161,7 @@ public class SessionImpl implements AbstractSession { propertyMap.put(entry.getKey().name, entry.getValue()); } - return new SessionBuilderImpl(qryProc, propertyMap) + return new SessionBuilderImpl(qryProc, transactions, propertyMap) .defaultPageSize(pageSize); } @@ -174,7 +180,7 @@ public class SessionImpl implements AbstractSession { try { QueryContext ctx = QueryContext.create(SqlQueryType.ALL, transaction); - result = qryProc.querySingleAsync(sessionId, ctx, query, arguments) + result = qryProc.querySingleAsync(sessionId, ctx, transactions, query, arguments) .thenCompose(cur -> cur.requestNextAsync(pageSize) .thenApply( batchRes -> new AsyncResultSetImpl<>( @@ -251,7 +257,7 @@ public class SessionImpl implements AbstractSession { Object[] args = batch.get(i).toArray(); final var qryFut = tail - .thenCompose(v -> qryProc.querySingleAsync(sessionId, ctx, query, args)); + .thenCompose(v -> qryProc.querySingleAsync(sessionId, ctx, transactions, query, args)); tail = qryFut.thenCompose(cur -> cur.requestNextAsync(1)) .thenAccept(page -> { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java index 99251fa8cd..5d1baead5c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java @@ -19,11 +19,9 @@ package org.apache.ignite.internal.sql.engine; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.lang.IgniteExceptionMapperUtil; import org.apache.ignite.sql.ResultSetMetadata; -import org.jetbrains.annotations.Nullable; /** * Sql query cursor. @@ -33,7 +31,7 @@ import org.jetbrains.annotations.Nullable; public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> { private final SqlQueryType queryType; private final ResultSetMetadata meta; - private final @Nullable InternalTransaction implicitTx; + private final QueryTransactionWrapper txWrapper; private final AsyncCursor<T> dataCursor; /** @@ -46,12 +44,12 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> { public AsyncSqlCursorImpl( SqlQueryType queryType, ResultSetMetadata meta, - @Nullable InternalTransaction implicitTx, + QueryTransactionWrapper txWrapper, AsyncCursor<T> dataCursor ) { this.queryType = queryType; this.meta = meta; - this.implicitTx = implicitTx; + this.txWrapper = txWrapper; this.dataCursor = dataCursor; } @@ -72,16 +70,14 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> { public CompletableFuture<BatchedResult<T>> requestNextAsync(int rows) { return dataCursor.requestNextAsync(rows).handle((batch, t) -> { if (t != null) { - if (implicitTx != null) { - implicitTx.rollback(); - } + txWrapper.rollbackImplicit(); throw new CompletionException(wrapIfNecessary(t)); } - if (implicitTx != null && !batch.hasMore()) { + if (!batch.hasMore()) { // last batch, need to commit transaction - implicitTx.commit(); + txWrapper.commitImplicit(); } return batch; @@ -92,9 +88,8 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> { @Override public CompletableFuture<Void> closeAsync() { // Commit implicit transaction, if any. - if (implicitTx != null) { - implicitTx.commit(); - } + txWrapper.commitImplicit(); + return dataCursor.closeAsync(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java index 92bacfd50d..b52ef56d34 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.sql.engine.property.PropertiesHolder; import org.apache.ignite.internal.sql.engine.session.SessionId; import org.apache.ignite.internal.sql.engine.session.SessionInfo; import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.tx.IgniteTransactions; /** * QueryProcessor interface. @@ -61,8 +62,10 @@ public interface QueryProcessor extends IgniteComponent { * * <p>If the query string contains more than one statement the IgniteException will be thrown. * + * @param sessionId A session identifier. * @param context User query context. - * @param qry Single statement SQL query . + * @param transactions Transactions facade. + * @param qry Single statement SQL query. * @param params Query parameters. * @return Sql cursor. * @@ -71,6 +74,7 @@ public interface QueryProcessor extends IgniteComponent { CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync( SessionId sessionId, QueryContext context, + IgniteTransactions transactions, String qry, Object... params ); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java new file mode 100644 index 0000000000..e39be75076 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine; + +import org.apache.ignite.internal.tx.InternalTransaction; + +/** + * Wrapper for the transaction that encapsulates the management of an implicit transaction. + */ +public class QueryTransactionWrapper { + private final boolean implicit; + + private final InternalTransaction transaction; + + QueryTransactionWrapper(InternalTransaction transaction, boolean implicit) { + this.transaction = transaction; + this.implicit = implicit; + } + + /** + * Unwrap transaction. + */ + InternalTransaction unwrap() { + return transaction; + } + + /** + * Commits an implicit transaction, if one has been started. + */ + void commitImplicit() { + if (implicit) { + transaction.commit(); + } + } + + /** + * Rolls back an implicit transaction, if one has been started. + */ + void rollbackImplicit() { + if (implicit) { + transaction.rollback(); + } + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 647cc4e934..33445d9aac 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -31,7 +31,6 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.LongFunction; import java.util.function.Supplier; @@ -69,6 +68,7 @@ import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandlerWrapper; import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl; import org.apache.ignite.internal.sql.engine.prepare.PrepareService; import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl; +import org.apache.ignite.internal.sql.engine.prepare.QueryPlan; import org.apache.ignite.internal.sql.engine.property.PropertiesHelper; import org.apache.ignite.internal.sql.engine.property.PropertiesHolder; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; @@ -91,7 +91,6 @@ import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.event.TableEvent; import org.apache.ignite.internal.table.event.TableEventParameters; import org.apache.ignite.internal.tx.InternalTransaction; -import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.lang.IgniteInternalException; @@ -99,11 +98,13 @@ import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.lang.SchemaNotFoundException; import org.apache.ignite.network.ClusterService; import org.apache.ignite.sql.SqlException; +import org.apache.ignite.tx.IgniteTransactions; +import org.apache.ignite.tx.TransactionOptions; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; /** - * SqlQueryProcessor. + * SqlQueryProcessor. * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ public class SqlQueryProcessor implements QueryProcessor { @@ -172,9 +173,6 @@ public class SqlQueryProcessor implements QueryProcessor { private volatile SqlSchemaManager sqlSchemaManager; - /** Transaction manager. */ - private final TxManager txManager; - /** Distribution zones manager. */ private final DistributionZoneManager distributionZoneManager; @@ -198,7 +196,6 @@ public class SqlQueryProcessor implements QueryProcessor { IndexManager indexManager, SchemaManager schemaManager, DataStorageManager dataStorageManager, - TxManager txManager, DistributionZoneManager distributionZoneManager, Supplier<Map<String, Map<String, Class<?>>>> dataStorageFieldsSupplier, ReplicaService replicaService, @@ -212,7 +209,6 @@ public class SqlQueryProcessor implements QueryProcessor { this.indexManager = indexManager; this.schemaManager = schemaManager; this.dataStorageManager = dataStorageManager; - this.txManager = txManager; this.distributionZoneManager = distributionZoneManager; this.dataStorageFieldsSupplier = dataStorageFieldsSupplier; this.replicaService = replicaService; @@ -367,14 +363,18 @@ public class SqlQueryProcessor implements QueryProcessor { /** {@inheritDoc} */ @Override public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync( - SessionId sessionId, QueryContext context, String qry, Object... params + SessionId sessionId, + QueryContext context, + IgniteTransactions transactions, + String qry, + Object... params ) { if (!busyLock.enterBusy()) { throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()); } try { - return querySingle0(sessionId, context, qry, params); + return querySingle0(sessionId, context, transactions, qry, params); } finally { busyLock.leaveBusy(); } @@ -401,6 +401,7 @@ public class SqlQueryProcessor implements QueryProcessor { private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0( SessionId sessionId, QueryContext context, + IgniteTransactions transactions, String sql, Object... params ) { @@ -429,93 +430,120 @@ public class SqlQueryProcessor implements QueryProcessor { return CompletableFuture.failedFuture(new SessionNotFoundException(sessionId)); } - CompletableFuture<Void> start = new CompletableFuture<>(); + CompletableFuture<AsyncSqlCursor<List<Object>>> start = new CompletableFuture<>(); - AtomicReference<InternalTransaction> tx = new AtomicReference<>(); + CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start.thenCompose(ignored -> { + ParsedResult result = parserService.parse(sql); - CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start - .thenCompose(ignored -> { - ParsedResult result = parserService.parse(sql); + validateParsedStatement(context, result, params); - validateParsedStatement(context, outerTx, result, params); + QueryTransactionWrapper txWrapper = wrapTxOrStartImplicit(result.queryType(), transactions, outerTx); - boolean rwOp = dataModificationOp(result); + return waitForActualSchema(schemaName, txWrapper.unwrap().startTimestamp()) + .thenCompose(schema -> { + BaseQueryContext ctx = BaseQueryContext.builder() + .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build()) + .logger(LOG) + .cancel(queryCancel) + .parameters(params).build(); - boolean implicitTxRequired = outerTx == null; + return prepareSvc.prepareAsync(result, ctx).thenApply(plan -> executePlan(session, txWrapper, ctx, plan)); + }).whenComplete((res, ex) -> { + if (ex != null) { + txWrapper.rollbackImplicit(); + } + }); + }); - InternalTransaction currentTx = implicitTxRequired ? txManager.begin(!rwOp, null) : outerTx; + // TODO IGNITE-20078 Improve (or remove) CancellationException handling. + stage.whenComplete((cur, ex) -> { + if (ex instanceof CancellationException) { + queryCancel.cancel(); + } + }); - tx.set(currentTx); + start.completeAsync(() -> null, taskExecutor); - // TODO IGNITE-18733: wait for actual metadata for TX. - HybridTimestamp txTimestamp = currentTx.startTimestamp(); + return stage; + } - SchemaPlus schema = sqlSchemaManager.schema(schemaName, txTimestamp.longValue()); + private CompletableFuture<SchemaPlus> waitForActualSchema(String schemaName, HybridTimestamp timestamp) { + try { + // TODO IGNITE-18733: wait for actual metadata for TX. + SchemaPlus schema = sqlSchemaManager.schema(schemaName, timestamp.longValue()); - if (schema == null) { - return CompletableFuture.failedFuture(new SchemaNotFoundException(schemaName)); - } + if (schema == null) { + return CompletableFuture.failedFuture(new SchemaNotFoundException(schemaName)); + } - BaseQueryContext ctx = BaseQueryContext.builder() - .frameworkConfig( - Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) - .defaultSchema(schema) - .build() - ) - .logger(LOG) - .cancel(queryCancel) - .parameters(params) - .build(); - - return prepareSvc.prepareAsync(result, ctx) - .thenApply(plan -> { - var dataCursor = executionSrvc.executePlan(tx.get(), plan, ctx); - - SqlQueryType queryType = plan.type(); - assert queryType != null : "Expected a full plan but got a fragment: " + plan; - - numberOfOpenCursors.incrementAndGet(); - - return new AsyncSqlCursorImpl<>( - queryType, - plan.metadata(), - implicitTxRequired ? tx.get() : null, - new AsyncCursor<List<Object>>() { - @Override - public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) { - session.touch(); - - return dataCursor.requestNextAsync(rows); - } - - @Override - public CompletableFuture<Void> closeAsync() { - session.touch(); - numberOfOpenCursors.decrementAndGet(); - - return dataCursor.closeAsync(); - } - } - ); - }); - }); + return CompletableFuture.completedFuture(schema); + } catch (Throwable t) { + return CompletableFuture.failedFuture(t); + } + } - stage.whenComplete((cur, ex) -> { - if (ex instanceof CancellationException) { - queryCancel.cancel(); - } + private AsyncSqlCursor<List<Object>> executePlan( + Session session, + QueryTransactionWrapper txWrapper, + BaseQueryContext ctx, + QueryPlan plan + ) { + var dataCursor = executionSrvc.executePlan(txWrapper.unwrap(), plan, ctx); + + SqlQueryType queryType = plan.type(); + assert queryType != null : "Expected a full plan but got a fragment: " + plan; - if (ex != null && outerTx == null) { - InternalTransaction tx0 = tx.get(); - if (tx0 != null) { - tx0.rollback(); + numberOfOpenCursors.incrementAndGet(); + + return new AsyncSqlCursorImpl<>( + queryType, + plan.metadata(), + txWrapper, + new AsyncCursor<List<Object>>() { + @Override + public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) { + session.touch(); + + return dataCursor.requestNextAsync(rows); + } + + @Override + public CompletableFuture<Void> closeAsync() { + session.touch(); + numberOfOpenCursors.decrementAndGet(); + + return dataCursor.closeAsync(); + } } - } - }); + ); + } - start.completeAsync(() -> null, taskExecutor); + /** + * Creates a new transaction wrapper using an existing outer transaction or starting a new "implicit" transaction. + * + * @param queryType Query type. + * @param transactions Transactions facade. + * @param outerTx Outer transaction. + * @return Wrapper for an active transaction. + * @throws SqlException If an outer transaction was started for a {@link SqlQueryType#DDL DDL} query. + */ + static QueryTransactionWrapper wrapTxOrStartImplicit( + SqlQueryType queryType, + IgniteTransactions transactions, + @Nullable InternalTransaction outerTx + ) { + if (outerTx == null) { + InternalTransaction tx = (InternalTransaction) transactions.begin( + new TransactionOptions().readOnly(queryType != SqlQueryType.DML)); - return stage; + return new QueryTransactionWrapper(tx, true); + } + + if (SqlQueryType.DDL == queryType) { + throw new SqlException(STMT_VALIDATION_ERR, "DDL doesn't support transactions."); + } + + return new QueryTransactionWrapper(outerTx, false); } @TestOnly @@ -548,11 +576,11 @@ public class SqlQueryProcessor implements QueryProcessor { @Override public CompletableFuture<Boolean> notify(TableEventParameters parameters, @Nullable Throwable exception) { return schemaHolder.onTableCreated( - // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas - DEFAULT_SCHEMA_NAME, - parameters.tableId(), - parameters.causalityToken() - ) + // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas + DEFAULT_SCHEMA_NAME, + parameters.tableId(), + parameters.causalityToken() + ) .thenApply(v -> false); } } @@ -566,11 +594,11 @@ public class SqlQueryProcessor implements QueryProcessor { @Override public CompletableFuture<Boolean> notify(TableEventParameters parameters, @Nullable Throwable exception) { return schemaHolder.onTableUpdated( - // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas - DEFAULT_SCHEMA_NAME, - parameters.tableId(), - parameters.causalityToken() - ) + // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas + DEFAULT_SCHEMA_NAME, + parameters.tableId(), + parameters.causalityToken() + ) .thenApply(v -> false); } } @@ -584,11 +612,11 @@ public class SqlQueryProcessor implements QueryProcessor { @Override public CompletableFuture<Boolean> notify(TableEventParameters parameters, @Nullable Throwable exception) { return schemaHolder.onTableDropped( - // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas - DEFAULT_SCHEMA_NAME, - parameters.tableId(), - parameters.causalityToken() - ) + // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas + DEFAULT_SCHEMA_NAME, + parameters.tableId(), + parameters.causalityToken() + ) .thenApply(v -> false); } } @@ -602,12 +630,12 @@ public class SqlQueryProcessor implements QueryProcessor { @Override public CompletableFuture<Boolean> notify(IndexEventParameters parameters, @Nullable Throwable exception) { return schemaHolder.onIndexDropped( - // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas - DEFAULT_SCHEMA_NAME, - parameters.tableId(), - parameters.indexId(), - parameters.causalityToken() - ) + // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas + DEFAULT_SCHEMA_NAME, + parameters.tableId(), + parameters.indexId(), + parameters.causalityToken() + ) .thenApply(v -> false); } } @@ -621,24 +649,18 @@ public class SqlQueryProcessor implements QueryProcessor { @Override public CompletableFuture<Boolean> notify(IndexEventParameters parameters, @Nullable Throwable exception) { return schemaHolder.onIndexCreated( - parameters.tableId(), - parameters.indexId(), - parameters.indexDescriptor(), - parameters.causalityToken() - ) + parameters.tableId(), + parameters.indexId(), + parameters.indexDescriptor(), + parameters.causalityToken() + ) .thenApply(v -> false); } } - /** Returns {@code true} if this is data modification operation. */ - private static boolean dataModificationOp(ParsedResult parsedResult) { - return parsedResult.queryType() == SqlQueryType.DML; - } - /** Performs additional validation of a parsed statement. **/ private static void validateParsedStatement( QueryContext context, - @Nullable InternalTransaction outerTx, ParsedResult parsedResult, Object[] params ) { @@ -651,10 +673,6 @@ public class SqlQueryProcessor implements QueryProcessor { throw new QueryValidationException(message); } - if (SqlQueryType.DDL == queryType && outerTx != null) { - throw new SqlException(STMT_VALIDATION_ERR, "DDL doesn't support transactions."); - } - if (parsedResult.dynamicParamsCount() != params.length) { String message = format( "Unexpected number of query parameters. Provided {} but there is only {} dynamic parameter(s).", diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java index dec2b80a98..eb30c61bcb 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java @@ -21,7 +21,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Collections; import java.util.List; @@ -48,70 +47,66 @@ public class AsyncSqlCursorImplTest { private static final ResultSetMetadata RESULT_SET_METADATA = new ResultSetMetadataImpl(Collections.emptyList()); /** Cursor should trigger commit of implicit transaction (if any) only if data is fully read. */ - @ParameterizedTest + @ParameterizedTest(name = "{0}") @MethodSource("transactions") - public void testTriggerCommitAfterDataIsFullyRead(NoOpTransaction implicitTx) { + public void testTriggerCommitAfterDataIsFullyRead(boolean implicit, QueryTransactionWrapper txWrapper) { List<Integer> list = List.of(1, 2, 3); - AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx, + AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, txWrapper, new AsyncWrapper<>(CompletableFuture.completedFuture(list.iterator()), Runnable::run)); int requestRows = 2; BatchedResult<Integer> in1 = cursor.requestNextAsync(requestRows).join(); assertEquals(in1.items(), list.subList(0, requestRows)); - if (implicitTx != null) { - CompletableFuture<Void> f = implicitTx.commitFuture(); - assertFalse(f.isDone(), "Implicit transaction should have not been committed because there is more data."); - } + assertFalse(((NoOpTransaction) txWrapper.unwrap()).commitFuture().isDone(), + "Implicit transaction should have not been committed because there is more data."); BatchedResult<Integer> in2 = cursor.requestNextAsync(requestRows).join(); assertEquals(in2.items(), list.subList(requestRows, list.size())); - if (implicitTx != null) { - CompletableFuture<Void> f = implicitTx.commitFuture(); - assertTrue(f.isDone(), "Implicit transaction should been committed because there is no more data"); - } + CompletableFuture<Void> f = ((NoOpTransaction) txWrapper.unwrap()).commitFuture(); + assertEquals(implicit, f.isDone(), "Implicit transaction should been committed because there is no more data"); } /** Exception on read should trigger rollback of implicit transaction, if any. */ - @ParameterizedTest + @ParameterizedTest(name = "{0}") @MethodSource("transactions") - public void testExceptionRollbacksImplicitTx(NoOpTransaction implicitTx) { + public void testExceptionRollbacksImplicitTx(boolean implicit, QueryTransactionWrapper txWrapper) { IgniteException err = new IgniteException(Common.INTERNAL_ERR); - AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx, + AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, txWrapper, new AsyncWrapper<>(CompletableFuture.failedFuture(err), Runnable::run)); CompletionException t = assertThrows(CompletionException.class, () -> cursor.requestNextAsync(1).join()); - if (implicitTx != null) { - CompletableFuture<Void> f = implicitTx.rollbackFuture(); - assertTrue(f.isDone(), "Implicit transaction should have been rolled back: " + f); - } + CompletableFuture<Void> f = ((NoOpTransaction) txWrapper.unwrap()).rollbackFuture(); + assertEquals(implicit, f.isDone(), "Implicit transaction should have been rolled back: " + f); IgniteException igniteErr = assertInstanceOf(IgniteException.class, t.getCause()); assertEquals(err.codeAsString(), igniteErr.codeAsString()); } /** Cursor close should trigger commit of implicit transaction, if any. */ - @ParameterizedTest + @ParameterizedTest(name = "{0}") @MethodSource("transactions") - public void testCloseCommitsImplicitTx(NoOpTransaction implicitTx) { + public void testCloseCommitsImplicitTx(boolean implicit, QueryTransactionWrapper txWrapper) { AsyncCursor<Integer> data = new AsyncWrapper<>(List.of(1, 2, 3, 4).iterator()); - AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx, data); + AsyncSqlCursorImpl<Integer> cursor = new AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, txWrapper, data); cursor.closeAsync().join(); - if (implicitTx != null) { - CompletableFuture<Void> f = implicitTx.commitFuture(); - assertTrue(f.isDone(), "Implicit transaction should have been committed: " + f); - } + CompletableFuture<Void> f = ((NoOpTransaction) txWrapper.unwrap()).commitFuture(); + assertEquals(implicit, f.isDone(), "Implicit transaction should have been committed: " + f); } private static Stream<Arguments> transactions() { return Stream.of( - Arguments.of(Named.named("implicit-tx", NoOpTransaction.readOnly("TX"))), - Arguments.of(Named.named("no implicit-tx", null)) + Arguments.of(Named.named("implicit-tx", true), newTxWrapper(true)), + Arguments.of(Named.named("explicit-tx", false), newTxWrapper(false)) ); } + + private static QueryTransactionWrapper newTxWrapper(boolean implicit) { + return new QueryTransactionWrapper(NoOpTransaction.readOnly("TX"), implicit); + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java new file mode 100644 index 0000000000..f3c9d0e659 --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine; + +import static org.apache.ignite.internal.sql.engine.SqlQueryProcessor.wrapTxOrStartImplicit; +import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.util.EnumSet; +import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.lang.ErrorGroups; +import org.apache.ignite.tx.IgniteTransactions; +import org.apache.ignite.tx.TransactionOptions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Tests for class {@link QueryTransactionWrapper}. + */ +@ExtendWith(MockitoExtension.class) +public class QueryTransactionWrapperSelfTest extends BaseIgniteAbstractTest { + @Mock + private IgniteTransactions transactions; + + @Test + public void throwsExceptionForDdlWithExternalTransaction() { + //noinspection ThrowableNotThrown + assertThrowsSqlException(ErrorGroups.Sql.STMT_VALIDATION_ERR, + () -> wrapTxOrStartImplicit(SqlQueryType.DDL, transactions, new NoOpTransaction("test"))); + verifyNoInteractions(transactions); + } + + @Test + public void testImplicitTransactionAttributes() { + when(transactions.begin(any())).thenAnswer( + inv -> { + boolean readOnly = inv.getArgument(0, TransactionOptions.class).readOnly(); + + return readOnly ? NoOpTransaction.readOnly("test-ro") : NoOpTransaction.readWrite("test-rw"); + } + ); + + assertThat(wrapTxOrStartImplicit(SqlQueryType.DML, transactions, null).unwrap().isReadOnly(), equalTo(false)); + + for (SqlQueryType type : EnumSet.complementOf(EnumSet.of(SqlQueryType.DML))) { + assertThat(wrapTxOrStartImplicit(type, transactions, null).unwrap().isReadOnly(), equalTo(true)); + } + + verify(transactions, times(SqlQueryType.values().length)).begin(any()); + verifyNoMoreInteractions(transactions); + } + + @Test + public void commitAndRollbackNotAffectExternalTransaction() { + NoOpTransaction externalTx = new NoOpTransaction("test"); + + QueryTransactionWrapper wrapper = wrapTxOrStartImplicit(SqlQueryType.QUERY, transactions, externalTx); + wrapper.commitImplicit(); + assertFalse(externalTx.commitFuture().isDone()); + + wrapper = wrapTxOrStartImplicit(SqlQueryType.QUERY, transactions, externalTx); + wrapper.rollbackImplicit(); + assertFalse(externalTx.commitFuture().isDone()); + + verifyNoInteractions(transactions); + } + + @Test + public void testCommitImplicit() { + QueryTransactionWrapper wrapper = prepareImplicitTx(); + NoOpTransaction tx = (NoOpTransaction) wrapper.unwrap(); + + wrapper.commitImplicit(); + + assertThat(tx.commitFuture().isDone(), equalTo(true)); + assertThat(tx.rollbackFuture().isDone(), equalTo(false)); + } + + @Test + public void testRollbackImplicit() { + QueryTransactionWrapper wrapper = prepareImplicitTx(); + NoOpTransaction tx = (NoOpTransaction) wrapper.unwrap(); + + wrapper.rollbackImplicit(); + + assertThat(tx.rollbackFuture().isDone(), equalTo(true)); + assertThat(tx.commitFuture().isDone(), equalTo(false)); + } + + private QueryTransactionWrapper prepareImplicitTx() { + when(transactions.begin(any())).thenReturn(new NoOpTransaction("test")); + + QueryTransactionWrapper wrapper = wrapTxOrStartImplicit(SqlQueryType.QUERY, transactions, null); + + assertThat(wrapper.unwrap(), instanceOf(NoOpTransaction.class)); + NoOpTransaction tx = (NoOpTransaction) wrapper.unwrap(); + + assertFalse(tx.rollbackFuture().isDone()); + assertFalse(tx.commitFuture().isDone()); + + return wrapper; + } +} + diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java index 1fbf38976d..98aab74fb0 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java @@ -25,7 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; @@ -76,7 +75,6 @@ import org.apache.ignite.internal.table.event.TableEvent; import org.apache.ignite.internal.table.event.TableEventParameters; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.IgniteTestUtils; -import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.lang.IgniteException; @@ -87,6 +85,7 @@ import org.apache.ignite.network.ClusterNodeImpl; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.MessagingService; import org.apache.ignite.network.TopologyService; +import org.apache.ignite.tx.IgniteTransactions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; @@ -122,7 +121,7 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest { private MessagingService msgSrvc; @Mock - private TxManager txManager; + private IgniteTransactions transactions; @Mock private DistributionZoneManager distributionZoneManager; @@ -235,7 +234,6 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest { indexManager, schemaManager, dataStorageManager, - txManager, distributionZoneManager, Map::of, mock(ReplicaService.class), @@ -256,7 +254,7 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest { when(tbl.storage()).thenReturn(mock(MvTableStorage.class)); when(tbl.storage().getTableDescriptor()).thenReturn(new StorageTableDescriptor(tblId, 1, "none")); - when(txManager.begin(anyBoolean(), any())).thenReturn(new NoOpTransaction(localNode.name())); + when(transactions.begin(any())).thenReturn(new NoOpTransaction(localNode.name())); qryProc.start(); @@ -268,6 +266,7 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest { var cursors = qryProc.querySingleAsync( sessionId, context, + transactions, "SELECT * FROM TEST" ); @@ -293,6 +292,7 @@ public class StopCalciteModuleTest extends BaseIgniteAbstractTest { assertTrue(assertThrows(IgniteInternalException.class, () -> qryProc.querySingleAsync( sessionId, context, + transactions, "SELECT 1" )).getCause() instanceof NodeStoppingException); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java index d698e996d7..e675871303 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java @@ -105,13 +105,13 @@ import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.lang.TableAlreadyExistsException; import org.apache.ignite.lang.TableNotFoundException; -import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterNodeImpl; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.MessagingService; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.TopologyService; import org.apache.ignite.sql.SqlException; +import org.apache.ignite.tx.IgniteTransactions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -158,6 +158,10 @@ public class MockedStructuresTest extends IgniteAbstractTest { @Mock(lenient = true) private TxManager tm; + /** Ignite transactions. */ + @Mock(lenient = true) + private IgniteTransactions transactions; + /** Meta storage manager. */ @Mock MetaStorageManager msm; @@ -204,13 +208,6 @@ public class MockedStructuresTest extends IgniteAbstractTest { SqlQueryProcessor queryProc; - /** Test node. */ - private final ClusterNode node = new ClusterNodeImpl( - UUID.randomUUID().toString(), - NODE_NAME, - new NetworkAddress("127.0.0.1", 2245) - ); - @InjectConfiguration private RocksDbStorageEngineConfiguration rocksDbEngineConfig; @@ -326,7 +323,6 @@ public class MockedStructuresTest extends IgniteAbstractTest { idxManager, schemaManager, dataStorageManager, - tm, distributionZoneManager, () -> dataStorageModules.collectSchemasFields( List.of( @@ -374,17 +370,12 @@ public class MockedStructuresTest extends IgniteAbstractTest { */ @Test public void testCreateTable() { - SqlQueryProcessor finalQueryProc = queryProc; - - SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder()); - QueryContext context = QueryContext.create(SqlQueryType.ALL); - String curMethodName = getCurrentMethodName(); String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) " + "with primary_zone='%s'", curMethodName, ZONE_NAME); - readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql)); + readFirst(sql(newTblSql)); assertTrue(tblManager.tables().stream().anyMatch(t -> t.name() .equalsIgnoreCase(curMethodName))); @@ -392,23 +383,23 @@ public class MockedStructuresTest extends IgniteAbstractTest { String finalNewTblSql1 = newTblSql; assertThrows(TableAlreadyExistsException.class, - () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql1))); + () -> readFirst(sql(finalNewTblSql1))); String finalNewTblSql2 = String.format("CREATE TABLE \"PUBLIC\".%s (c1 int PRIMARY KEY, c2 varbinary(255)) " + "with primary_zone='%s'", curMethodName, ZONE_NAME); assertThrows(TableAlreadyExistsException.class, - () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql2))); + () -> readFirst(sql(finalNewTblSql2))); - assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, + assertThrows(SqlException.class, () -> readFirst(sql( "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with partitions__wrong=1,primary_zone='" + ZONE_NAME + "'"))); - assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, + assertThrows(SqlException.class, () -> readFirst(sql( "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with replicas__wrong=1,primary_zone='" + ZONE_NAME + "'"))); - assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, + assertThrows(SqlException.class, () -> readFirst(sql( "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with primary_zone__wrong='" + ZONE_NAME + "'"))); @@ -417,7 +408,7 @@ public class MockedStructuresTest extends IgniteAbstractTest { String finalNewTblSql3 = newTblSql; - assertDoesNotThrow(() -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql3))); + assertDoesNotThrow(() -> readFirst(sql(finalNewTblSql3))); } /** @@ -427,28 +418,25 @@ public class MockedStructuresTest extends IgniteAbstractTest { public void testCreateTableWithDistributionZone() { String tableName = getCurrentMethodName().toUpperCase(); - SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder()); - QueryContext context = QueryContext.create(SqlQueryType.ALL); - String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) ", tableName); - readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql)); + readFirst(sql(newTblSql)); assertEquals(getZoneId(DEFAULT_ZONE_NAME), tblsCfg.tables().get(tableName).zoneId().value()); - readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + tableName)); + readFirst(sql("DROP TABLE " + tableName)); int zoneId = dstZnsCfg.distributionZones().get(ZONE_NAME).zoneId().value(); newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) " + "with primary_zone='%s'", tableName, ZONE_NAME); - readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql)); + readFirst(sql(newTblSql)); assertEquals(zoneId, tblsCfg.tables().get(tableName).zoneId().value()); - readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + tableName)); + readFirst(sql("DROP TABLE " + tableName)); log.info("Creating a table with a non-existent distribution zone."); @@ -458,8 +446,7 @@ public class MockedStructuresTest extends IgniteAbstractTest { Throwable exception = assertThrows( Throwable.class, - () -> readFirst(queryProc.querySingleAsync(sessionId, context, - String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) " + () -> readFirst(sql(String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) " + "with primary_zone='%s'", tableName, nonExistZone))) ); @@ -473,29 +460,21 @@ public class MockedStructuresTest extends IgniteAbstractTest { public void testDropTable() { String curMethodName = getCurrentMethodName(); - SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder()); - QueryContext context = QueryContext.create(SqlQueryType.ALL); - String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName); - readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql)); - - readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + curMethodName)); + readFirst(sql(newTblSql)); - SqlQueryProcessor finalQueryProc = queryProc; + readFirst(sql("DROP TABLE " + curMethodName)); - assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, - "DROP TABLE " + curMethodName + "_not_exist"))); + assertThrows(TableNotFoundException.class, () -> readFirst(sql("DROP TABLE " + curMethodName + "_not_exist"))); - assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, - "DROP TABLE " + curMethodName))); + assertThrows(TableNotFoundException.class, () -> readFirst(sql("DROP TABLE " + curMethodName))); - assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, - "DROP TABLE PUBLIC." + curMethodName))); + assertThrows(TableNotFoundException.class, () -> readFirst(sql("DROP TABLE PUBLIC." + curMethodName))); - readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE IF EXISTS PUBLIC." + curMethodName + "_not_exist")); + readFirst(sql("DROP TABLE IF EXISTS PUBLIC." + curMethodName + "_not_exist")); - readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE IF EXISTS PUBLIC." + curMethodName)); + readFirst(sql("DROP TABLE IF EXISTS PUBLIC." + curMethodName)); assertTrue(tblManager.tables().stream().noneMatch(t -> t.name() .equalsIgnoreCase("PUBLIC." + curMethodName))); @@ -505,12 +484,7 @@ public class MockedStructuresTest extends IgniteAbstractTest { void createTableWithTableOptions() { String method = getCurrentMethodName(); - SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder()); - QueryContext context = QueryContext.create(SqlQueryType.ALL); - - assertDoesNotThrow(() -> readFirst(queryProc.querySingleAsync( - sessionId, - context, + assertDoesNotThrow(() -> readFirst(sql( String.format( "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with primary_zone='%s'", method + 4, @@ -520,9 +494,7 @@ public class MockedStructuresTest extends IgniteAbstractTest { IgniteException exception = assertThrows( IgniteException.class, - () -> readFirst(queryProc.querySingleAsync( - sessionId, - context, + () -> readFirst(sql( String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) WITH %s='%s'", method + 6, method, method) )) ); @@ -584,6 +556,7 @@ public class MockedStructuresTest extends IgniteAbstractTest { InternalTransaction tx = mock(InternalTransaction.class); when(tx.startTimestamp()).thenReturn(HybridTimestamp.MAX_VALUE); when(tm.begin(anyBoolean(), any())).thenReturn(tx); + when(transactions.begin(any())).thenReturn(tx); when(replicaManager.stopReplica(any())).thenReturn(completedFuture(true)); @@ -630,4 +603,11 @@ public class MockedStructuresTest extends IgniteAbstractTest { private int getZoneId(String zoneName) { return getZoneIdStrict(dstZnsCfg, zoneName); } + + private CompletableFuture<AsyncSqlCursor<List<Object>>> sql(String query) { + SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder()); + QueryContext context = QueryContext.create(SqlQueryType.ALL); + + return queryProc.querySingleAsync(sessionId, context, transactions, query); + } } diff --git a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java index d8974861d8..a676e3cdc2 100644 --- a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java +++ b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.internal.util.CollectionUtils; import org.apache.ignite.sql.ColumnMetadata; import org.apache.ignite.sql.ColumnType; +import org.apache.ignite.tx.IgniteTransactions; import org.apache.ignite.tx.Transaction; import org.hamcrest.CoreMatchers; import org.hamcrest.Matcher; @@ -474,7 +475,7 @@ public abstract class QueryChecker { if (!CollectionUtils.nullOrEmpty(planMatchers) || exactPlan != null) { CompletableFuture<AsyncSqlCursor<List<Object>>> explainCursors = qryProc.querySingleAsync(sessionId, - context, "EXPLAIN PLAN FOR " + qry, params); + context, transactions(), "EXPLAIN PLAN FOR " + qry, params); AsyncSqlCursor<List<Object>> explainCursor = await(explainCursors); List<List<Object>> explainRes = getAllFromCursor(explainCursor); @@ -491,7 +492,9 @@ public abstract class QueryChecker { } } // Check result. - CompletableFuture<AsyncSqlCursor<List<Object>>> cursors = qryProc.querySingleAsync(sessionId, context, qry, params); + CompletableFuture<AsyncSqlCursor<List<Object>>> cursors = + qryProc.querySingleAsync(sessionId, context, transactions(), qry, params); + AsyncSqlCursor<List<Object>> cur = await(cursors); checkMetadata(cur); @@ -553,6 +556,8 @@ public abstract class QueryChecker { protected abstract QueryProcessor getEngine(); + protected abstract IgniteTransactions transactions(); + protected void checkMetadata(AsyncSqlCursor<?> cursor) { } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java index 4005572fcc..056ec46767 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java @@ -76,7 +76,7 @@ abstract class AbstractTableView { return (IgniteException) th; } - //TODO: IGNITE-14500 Replace with public exception with an error code (or unwrap?). + //TODO: IGNITE-20181 KV/Binary view public API should only throw public exceptions for the end user. return new IgniteException(th); } }