This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 8d6264af0e IGNITE-20443 Sql. Implement script processing logic (#2789) 8d6264af0e is described below commit 8d6264af0edb9752e2239f9b686abe5580056862 Author: Pavel Pereslegin <xxt...@gmail.com> AuthorDate: Thu Nov 16 13:27:08 2023 +0300 IGNITE-20443 Sql. Implement script processing logic (#2789) --- .../handler/requests/jdbc/JdbcQueryCursor.java | 19 ++ .../org/apache/ignite/client/fakes/FakeCursor.java | 10 + .../client/fakes/FakeIgniteQueryProcessor.java | 11 + .../sql/engine/ItSqlMultiStatementTest.java | 268 +++++++++++++++++++ .../ignite/internal/sql/engine/AsyncSqlCursor.java | 16 ++ .../internal/sql/engine/AsyncSqlCursorImpl.java | 43 +++ .../ignite/internal/sql/engine/QueryProcessor.java | 22 ++ .../internal/sql/engine/SqlQueryProcessor.java | 294 ++++++++++++++++++--- .../sql/engine/exec/ExecutionServiceImpl.java | 6 + .../internal/sql/engine/sql/ScriptParseResult.java | 44 +-- .../sql/engine/sql/IgniteSqlParserTest.java | 15 ++ .../internal/sql/engine/util/QueryCheckerTest.java | 11 + 12 files changed, 711 insertions(+), 48 deletions(-) diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java index c33f5fa3e3..232f2eb593 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java @@ -18,6 +18,7 @@ package org.apache.ignite.client.handler.requests.jdbc; import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.internal.sql.engine.AsyncSqlCursor; @@ -87,4 +88,22 @@ public class JdbcQueryCursor<T> implements AsyncSqlCursor<T> { public ResultSetMetadata metadata() { return cur.metadata(); } + + /** {@inheritDoc} */ + @Override + public boolean hasNextResult() { + // TODO https://issues.apache.org/jira/browse/IGNITE-20661 + return false; + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<AsyncSqlCursor<T>> nextResult() { + if (!hasNextResult()) { + throw new NoSuchElementException("Query has no more results"); + } + + // TODO https://issues.apache.org/jira/browse/IGNITE-20661 + return null; + } } diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java index d941fb3a9f..340c1459d3 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java @@ -69,4 +69,14 @@ public class FakeCursor implements AsyncSqlCursor<List<Object>> { public ResultSetMetadata metadata() { return null; } + + @Override + public boolean hasNextResult() { + return false; + } + + @Override + public CompletableFuture<AsyncSqlCursor<List<Object>>> nextResult() { + throw new UnsupportedOperationException(); + } } diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java index 0a996f57d2..9302f7f4ee 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java @@ -41,6 +41,17 @@ public class FakeIgniteQueryProcessor implements QueryProcessor { return CompletableFuture.completedFuture(new FakeCursor()); } + @Override + public CompletableFuture<AsyncSqlCursor<List<Object>>> queryScriptAsync( + SqlProperties properties, + IgniteTransactions transactions, + @Nullable InternalTransaction transaction, + String qry, + Object... params + ) { + throw new UnsupportedOperationException(); + } + @Override public void start() { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTest.java new file mode 100644 index 0000000000..08db857f25 --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTest.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine; + +import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; +import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; +import static org.apache.ignite.lang.ErrorGroups.Sql.RUNTIME_ERR; +import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.sql.BaseSqlIntegrationTest; +import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper; +import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.util.AsyncCursor.BatchedResult; +import org.apache.ignite.tx.Transaction; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +/** + * Tests to verify the execution of queries with multiple statements. + */ +@SuppressWarnings("ThrowableNotThrown") +public class ItSqlMultiStatementTest extends BaseSqlIntegrationTest { + @AfterEach + void dropTables() { + dropAllTables(); + } + + @AfterEach + void checkNoPendingTransactions() { + assertEquals(0, txManager().pending()); + } + + @Test + void basicMultiStatementQuery() { + String sql = "CREATE TABLE test (id INT PRIMARY KEY, val INT);" + + "INSERT INTO test VALUES (0, 0);" + + "EXPLAIN PLAN FOR SELECT * FROM test;" + + "SELECT * FROM test"; + + List<AsyncSqlCursor<List<Object>>> cursors = fetchAllCursors(runScript(sql)); + assertNotNull(cursors); + + Iterator<AsyncSqlCursor<List<Object>>> curItr = cursors.iterator(); + + validateSingleResult(curItr.next(), true); + validateSingleResult(curItr.next(), 1L); + assertNotNull(curItr.next()); // skip EXPLAIN. + validateSingleResult(curItr.next(), 0, 0); + + assertFalse(curItr.hasNext()); + + // Ensures that the script is executed completely, even if the cursor data has not been read. + fetchAllCursors(runScript("INSERT INTO test VALUES (1, 1);" + + "INSERT INTO test VALUES (2, 2);" + + "SELECT * FROM test;" + + "INSERT INTO test VALUES (3, 3);")); + + assertQuery("select * from test") + .returns(0, 0) + .returns(1, 1) + .returns(2, 2) + .returns(3, 3) + .check(); + } + + /** Checks single statement execution using multi-statement API. */ + @Test + void singleStatementQuery() { + AsyncSqlCursor<List<Object>> cursor = runScript("CREATE TABLE test (id INT PRIMARY KEY, val INT)"); + assertNotNull(cursor); + validateSingleResult(cursor, true); + assertFalse(cursor.hasNextResult()); + assertThrows(NoSuchElementException.class, cursor::nextResult, "Query has no more results"); + + AsyncSqlCursor<List<Object>> cursor2 = runScript("INSERT INTO test VALUES (0, 0)"); + assertNotNull(cursor2); + validateSingleResult(cursor2, 1L); + assertFalse(cursor2.hasNextResult()); + + assertQuery("SELECT * FROM test").returns(0, 0).check(); + } + + @Test + void queryWithDynamicParameters() { + String sql = "CREATE TABLE test (id INT PRIMARY KEY, val VARCHAR DEFAULT '3');" + + "INSERT INTO test VALUES(?, ?);" + + "INSERT INTO test VALUES(?, DEFAULT);" + + "INSERT INTO test VALUES(?, ?);"; + + fetchAllCursors(runScript(sql, null, 0, "1", 2, 4, "5")); + + assertQuery("SELECT * FROM test") + .returns(0, "1") + .returns(2, "3") + .returns(4, "5") + .check(); + } + + @Test + void queryWithIncorrectNumberOfDynamicParametersFailsWithValidationError() { + String sql = "CREATE TABLE test (id INT PRIMARY KEY, val INT);" + + "INSERT INTO test VALUES(?, ?);" + + "INSERT INTO test VALUES(?, ?);"; + + String expectedMessage = "Unexpected number of query parameters"; + + assertThrowsSqlException(STMT_VALIDATION_ERR, expectedMessage, () -> runScript(sql, null, 0)); + assertThrowsSqlException(STMT_VALIDATION_ERR, expectedMessage, () -> runScript(sql, null, 0, 1, 2, 3, 4, 5)); + } + + @Test + void transactionControlStatementDoesNotCreateCursor() { + assertThat(runScript("START TRANSACTION; COMMIT"), nullValue()); + + AsyncSqlCursor<List<Object>> cursor = runScript( + "START TRANSACTION;" + + "SELECT 1;" + + "COMMIT" + ); + + assertNotNull(cursor); + validateSingleResult(cursor, 1); + + assertFalse(cursor.hasNextResult()); + } + + @Test + void scriptStopsExecutionOnError() { + // Runtime error. + AsyncSqlCursor<List<Object>> cursor = runScript( + "CREATE TABLE test (id INT PRIMARY KEY);" + + "SELECT 2/0;" // Runtime error. + + "INSERT INTO test VALUES (0)" + ); + assertNotNull(cursor); + assertTrue(cursor.hasNextResult()); + + CompletableFuture<AsyncSqlCursor<List<Object>>> curFut0 = cursor.nextResult(); + assertThrowsSqlException(RUNTIME_ERR, "/ by zero", () -> await(curFut0)); + + // Validation error. + assertThrowsSqlException(STMT_VALIDATION_ERR, "operator must have compatible types", + () -> runScript("INSERT INTO test VALUES (?); INSERT INTO test VALUES (1)", null, "Incompatible param")); + + assertQuery("SELECT count(*) FROM test") + .returns(0L) + .check(); + + // Internal error. + cursor = runScript( + "INSERT INTO test VALUES(0);" + + "INSERT INTO test VALUES(1);" + + "SELECT (SELECT id FROM test);" // Internal error. + + "INSERT INTO test VALUES(2);" + ); + assertNotNull(cursor); + assertTrue(cursor.hasNextResult()); + + cursor = await(cursor.nextResult()); + assertNotNull(cursor); + assertTrue(cursor.hasNextResult()); + + CompletableFuture<AsyncSqlCursor<List<Object>>> cursFut = cursor.nextResult(); + assertThrowsSqlException(INTERNAL_ERR, "Subquery returned more than 1 value", () -> await(cursFut)); + + assertQuery("SELECT * FROM test") + .returns(0) + .returns(1) + .check(); + + // Internal error due to transaction exception. + Transaction tx = igniteTx().begin(); + sql(tx, "INSERT INTO test VALUES(2);"); + tx.commit(); + + assertThrowsSqlException( + TX_FAILED_READ_WRITE_OPERATION_ERR, + "Transaction is already finished", + () -> runScript( + "INSERT INTO test VALUES(3); INSERT INTO test VALUES(4);", + (InternalTransaction) tx + ) + ); + + assertQuery("SELECT * FROM test") + .returns(0) + .returns(1) + .returns(2) + .check(); + + // DDL inside external transaction. + assertThrowsSqlException(STMT_VALIDATION_ERR, "DDL doesn't support transactions.", + () -> runScript("CREATE TABLE test2 (id INT PRIMARY KEY)", (InternalTransaction) tx)); + } + + private @Nullable AsyncSqlCursor<List<Object>> runScript(String query) { + return runScript(query, null); + } + + private @Nullable AsyncSqlCursor<List<Object>> runScript( + String query, + @Nullable InternalTransaction tx, + Object ... params + ) { + QueryProcessor qryProc = queryProcessor(); + + return await(qryProc.queryScriptAsync(SqlPropertiesHelper.emptyProperties(), igniteTx(), tx, query, params)); + } + + private static @Nullable List<AsyncSqlCursor<List<Object>>> fetchAllCursors(@Nullable AsyncSqlCursor<List<Object>> cursor) { + if (cursor == null) { + return null; + } + + List<AsyncSqlCursor<List<Object>>> cursors = new ArrayList<>(); + + cursors.add(cursor); + + while (cursor.hasNextResult()) { + cursor = await(cursor.nextResult()); + + assertNotNull(cursor); + + cursors.add(cursor); + } + + return cursors; + } + + private static void validateSingleResult(AsyncSqlCursor<List<Object>> cursor, Object... expected) { + BatchedResult<List<Object>> res = await(cursor.requestNextAsync(1)); + assertNotNull(res); + assertEquals(List.of(List.of(expected)), res.items()); + assertFalse(res.hasMore()); + + cursor.closeAsync(); + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursor.java index af8608a1b0..b314040d6f 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursor.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.sql.engine; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.util.AsyncCursor; import org.apache.ignite.sql.ResultSetMetadata; @@ -35,4 +37,18 @@ public interface AsyncSqlCursor<T> extends AsyncCursor<T> { * Returns column metadata. */ ResultSetMetadata metadata(); + + /** + * Returns {@code true} if the current cursor is the result of a multi-statement query + * and this statement is not the last one, {@code false} otherwise. + */ + boolean hasNextResult(); + + /** + * Returns the future for the next statement of the query. + * + * @return Future that completes when the next statement completes. + * @throws NoSuchElementException if the query has no more statements to execute. + */ + CompletableFuture<AsyncSqlCursor<T>> nextResult(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java index 71a63be42a..421f780cc0 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java @@ -17,12 +17,14 @@ package org.apache.ignite.internal.sql.engine; +import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.apache.ignite.internal.lang.SqlExceptionMapperUtil; import org.apache.ignite.internal.util.AsyncCursor; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.sql.ResultSetMetadata; +import org.jetbrains.annotations.Nullable; /** * Sql query cursor. @@ -35,12 +37,14 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> { private final QueryTransactionWrapper txWrapper; private final AsyncCursor<T> dataCursor; private final Runnable onClose; + private final CompletableFuture<AsyncSqlCursor<T>> nextStatement; /** * Constructor. * * @param queryType Type of the query. * @param meta The meta of the result set. + * @param txWrapper Transaction wrapper. * @param dataCursor The result set. * @param onClose Callback to invoke when cursor is closed. */ @@ -50,12 +54,35 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> { QueryTransactionWrapper txWrapper, AsyncCursor<T> dataCursor, Runnable onClose + ) { + this(queryType, meta, txWrapper, dataCursor, onClose, null); + } + + /** + * Constructor. + * + * @param queryType Type of the query. + * @param meta The meta of the result set. + * @param txWrapper Transaction wrapper. + * @param dataCursor The result set. + * @param onClose Callback to invoke when cursor is closed. + * @param nextStatement Next statement future, non-null in the case of a + * multi-statement query and if current statement is not the last. + */ + AsyncSqlCursorImpl( + SqlQueryType queryType, + ResultSetMetadata meta, + QueryTransactionWrapper txWrapper, + AsyncCursor<T> dataCursor, + Runnable onClose, + @Nullable CompletableFuture<AsyncSqlCursor<T>> nextStatement ) { this.queryType = queryType; this.meta = meta; this.txWrapper = txWrapper; this.dataCursor = dataCursor; this.onClose = onClose; + this.nextStatement = nextStatement; } /** {@inheritDoc} */ @@ -89,6 +116,22 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> { }); } + /** {@inheritDoc} */ + @Override + public boolean hasNextResult() { + return nextStatement != null; + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<AsyncSqlCursor<T>> nextResult() { + if (nextStatement == null) { + throw new NoSuchElementException("Query has no more results"); + } + + return nextStatement; + } + /** {@inheritDoc} */ @Override public CompletableFuture<Void> closeAsync() { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java index f89717a892..e3e924b8b1 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java @@ -53,4 +53,26 @@ public interface QueryProcessor extends IgniteComponent { String qry, Object... params ); + + /** + * Execute the multi-statement query with given schema name and parameters. + * + * @param properties User query properties. See {@link QueryProperty} for available properties. + * @param transactions Transactions facade. + * @param transaction A transaction to use for query execution. If null, an implicit transaction + * will be started by provided transactions facade. + * @param qry Multi statement SQL query. + * @param params Query parameters. + * @return Sql cursor. + * + * @throws IgniteException in case of an error. + * @see QueryProperty + */ + CompletableFuture<AsyncSqlCursor<List<Object>>> queryScriptAsync( + SqlProperties properties, + IgniteTransactions transactions, + @Nullable InternalTransaction transaction, + String qry, + Object... params + ); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index cd914bc079..7830570d2c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -18,17 +18,22 @@ package org.apache.ignite.internal.sql.engine; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; +import static org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException; import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG; import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; +import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -351,6 +356,26 @@ public class SqlQueryProcessor implements QueryProcessor { } } + /** {@inheritDoc} */ + @Override + public CompletableFuture<AsyncSqlCursor<List<Object>>> queryScriptAsync( + SqlProperties properties, + IgniteTransactions transactions, + @Nullable InternalTransaction transaction, + String qry, + Object... params + ) { + if (!busyLock.enterBusy()) { + throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()); + } + + try { + return queryScript0(properties, transactions, transaction, qry, params); + } finally { + busyLock.leaveBusy(); + } + } + private <T extends LifecycleAware> T registerService(T service) { services.add(service); @@ -374,25 +399,11 @@ public class SqlQueryProcessor implements QueryProcessor { CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start.thenCompose(ignored -> { ParsedResult result = parserService.parse(sql); - validateParsedStatement(properties0, result, params, explicitTransaction); + validateParsedStatement(properties0, result, params); QueryTransactionWrapper txWrapper = wrapTxOrStartImplicit(result.queryType(), transactions, explicitTransaction); - return waitForActualSchema(schemaName, txWrapper.unwrap().startTimestamp()) - .thenCompose(schema -> { - BaseQueryContext ctx = BaseQueryContext.builder() - .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build()) - .queryId(UUID.randomUUID()) - .cancel(queryCancel) - .parameters(params) - .build(); - - return prepareSvc.prepareAsync(result, ctx).thenApply(plan -> executePlan(txWrapper, ctx, plan)); - }).whenComplete((res, ex) -> { - if (ex != null) { - txWrapper.rollback(); - } - }); + return executeParsedStatement(schemaName, result, txWrapper, queryCancel, params, false, null); }); // TODO IGNITE-20078 Improve (or remove) CancellationException handling. @@ -407,6 +418,69 @@ public class SqlQueryProcessor implements QueryProcessor { return stage; } + private CompletableFuture<AsyncSqlCursor<List<Object>>> queryScript0( + SqlProperties properties, + IgniteTransactions transactions, + @Nullable InternalTransaction explicitTransaction, + String sql, + Object... params + ) { + SqlProperties properties0 = SqlPropertiesHelper.chain(properties, DEFAULT_PROPERTIES); + String schemaName = properties0.get(QueryProperty.DEFAULT_SCHEMA); + + CompletableFuture<?> start = new CompletableFuture<>(); + + CompletableFuture<AsyncSqlCursor<List<Object>>> parseFut = start + .thenApply(ignored -> parserService.parseScript(sql)) + .thenCompose(parsedResults -> { + MultiStatementHandler handler = new MultiStatementHandler( + schemaName, transactions, explicitTransaction, parsedResults, params); + + return handler.processNext(); + }); + + start.completeAsync(() -> null, taskExecutor); + + return parseFut; + } + + private CompletableFuture<AsyncSqlCursor<List<Object>>> executeParsedStatement( + String schemaName, + ParsedResult parsedResult, + QueryTransactionWrapper txWrapper, + QueryCancel queryCancel, + Object[] params, + boolean waitForPrefetch, + @Nullable CompletableFuture<AsyncSqlCursor<List<Object>>> nextStatement + ) { + return waitForActualSchema(schemaName, txWrapper.unwrap().startTimestamp()) + .thenCompose(schema -> { + PrefetchCallback callback = waitForPrefetch ? new PrefetchCallback() : null; + + BaseQueryContext ctx = BaseQueryContext.builder() + .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build()) + .queryId(UUID.randomUUID()) + .cancel(queryCancel) + .prefetchCallback(callback) + .parameters(params) + .build(); + + CompletableFuture<AsyncSqlCursor<List<Object>>> fut = prepareSvc.prepareAsync(parsedResult, ctx) + .thenApply(plan -> executePlan(txWrapper, ctx, plan, nextStatement)); + + if (waitForPrefetch) { + fut = fut.thenCompose(cursor -> callback.prefetchFuture().thenApply(ignore -> cursor)); + } + + return fut; + }) + .whenComplete((res, ex) -> { + if (ex != null) { + txWrapper.rollback(); + } + }); + } + private CompletableFuture<SchemaPlus> waitForActualSchema(String schemaName, HybridTimestamp timestamp) { try { return schemaSyncService.waitForMetadataCompleteness(timestamp).thenApply(unused -> { @@ -426,7 +500,8 @@ public class SqlQueryProcessor implements QueryProcessor { private AsyncSqlCursor<List<Object>> executePlan( QueryTransactionWrapper txWrapper, BaseQueryContext ctx, - QueryPlan plan + QueryPlan plan, + @Nullable CompletableFuture<AsyncSqlCursor<List<Object>>> nextStatement ) { if (!busyLock.enterBusy()) { throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()); @@ -443,7 +518,8 @@ public class SqlQueryProcessor implements QueryProcessor { plan.metadata(), txWrapper, dataCursor, - () -> openedCursors.remove(queryId) + () -> openedCursors.remove(queryId), + nextStatement ); Object old = openedCursors.put(queryId, cursor); @@ -477,6 +553,14 @@ public class SqlQueryProcessor implements QueryProcessor { return new QueryTransactionWrapper(tx, true); } + if (SqlQueryType.DDL == queryType) { + throw new SqlException(STMT_VALIDATION_ERR, "DDL doesn't support transactions."); + } + + if (SqlQueryType.DML == queryType && outerTx.isReadOnly()) { + throw new SqlException(STMT_VALIDATION_ERR, "DML query cannot be started by using read only transactions."); + } + return new QueryTransactionWrapper(outerTx, false); } @@ -489,22 +573,11 @@ public class SqlQueryProcessor implements QueryProcessor { private static void validateParsedStatement( SqlProperties properties, ParsedResult parsedResult, - Object[] params, - @Nullable InternalTransaction outerTx + Object[] params ) { Set<SqlQueryType> allowedTypes = properties.get(QueryProperty.ALLOWED_QUERY_TYPES); SqlQueryType queryType = parsedResult.queryType(); - if (outerTx != null) { - if (SqlQueryType.DDL == queryType) { - throw new SqlException(STMT_VALIDATION_ERR, "DDL doesn't support transactions."); - } - - if (SqlQueryType.DML == queryType && outerTx.isReadOnly()) { - throw new SqlException(STMT_VALIDATION_ERR, "DML query cannot be started by using read only transactions."); - } - } - if (parsedResult.queryType() == SqlQueryType.TX_CONTROL) { String message = "Transaction control statement can not be executed as an independent statement"; @@ -517,10 +590,14 @@ public class SqlQueryProcessor implements QueryProcessor { throw new SqlException(STMT_VALIDATION_ERR, message); } - if (parsedResult.dynamicParamsCount() != params.length) { + validateDynamicParameters(parsedResult.dynamicParamsCount(), params); + } + + private static void validateDynamicParameters(int expectedParamsCount, Object[] params) throws SqlException { + if (expectedParamsCount != params.length) { String message = format( "Unexpected number of query parameters. Provided {} but there is only {} dynamic parameter(s).", - params.length, parsedResult.dynamicParamsCount() + params.length, expectedParamsCount ); throw new SqlException(STMT_VALIDATION_ERR, message); @@ -535,4 +612,157 @@ public class SqlQueryProcessor implements QueryProcessor { } } } + + private class MultiStatementHandler { + private final String schemaName; + private final IgniteTransactions transactions; + private final @Nullable InternalTransaction explicitTransaction; + private final Queue<ScriptStatementParameters> statements; + + MultiStatementHandler( + String schemaName, + IgniteTransactions transactions, + @Nullable InternalTransaction explicitTransaction, + List<ParsedResult> parsedResults, + Object[] params + ) { + this.schemaName = schemaName; + this.transactions = transactions; + this.explicitTransaction = explicitTransaction; + this.statements = prepareStatementsQueue(parsedResults, params); + } + + CompletableFuture<AsyncSqlCursor<List<Object>>> processNext() { + if (statements == null) { + // TODO https://issues.apache.org/jira/browse/IGNITE-20463 Each tx control statement must return an empty cursor. + return CompletableFuture.completedFuture(null); + } + + ScriptStatementParameters parameters = statements.poll(); + + assert parameters != null; + + ParsedResult parsedResult = parameters.parsedResult; + Object[] dynamicParams = parameters.dynamicParams; + CompletableFuture<AsyncSqlCursor<List<Object>>> cursorFuture = parameters.cursorFuture; + CompletableFuture<AsyncSqlCursor<List<Object>>> nextCursorFuture = parameters.nextStatementFuture; + + try { + if (cursorFuture.isDone()) { + return cursorFuture; + } + + QueryTransactionWrapper txWrapper = wrapTxOrStartImplicit(parsedResult.queryType(), transactions, explicitTransaction); + + QueryCancel cancel = new QueryCancel(); + + executeParsedStatement(schemaName, parsedResult, txWrapper, cancel, dynamicParams, true, nextCursorFuture) + .whenComplete((res, ex) -> { + if (ex != null) { + cursorFuture.completeExceptionally(ex); + cancelAll(ex); + + return; + } + + txWrapper.commitImplicit(); + + if (nextCursorFuture != null) { + taskExecutor.execute(this::processNext); + } + + cursorFuture.complete(res); + }); + } catch (Exception e) { + cursorFuture.completeExceptionally(e); + + cancelAll(e); + } + + return cursorFuture; + } + + /** + * Returns a queue. each element of which represents parameters required to execute a single statement of the script. + */ + private @Nullable Queue<ScriptStatementParameters> prepareStatementsQueue(List<ParsedResult> parsedResults, Object[] params) { + List<ParsedResult> parsedResults0 = parsedResults.stream() + // TODO https://issues.apache.org/jira/browse/IGNITE-20463 Integrate TX-related statements + .filter(res -> res.queryType() != SqlQueryType.TX_CONTROL).collect(Collectors.toList()); + + if (parsedResults0.isEmpty()) { + return null; + } + + int paramsCount = parsedResults0.stream().mapToInt(ParsedResult::dynamicParamsCount).sum(); + validateDynamicParameters(paramsCount, params); + + ScriptStatementParameters[] results = new ScriptStatementParameters[parsedResults0.size()]; + + // We fill parameters in reverse order, because each script statement + // requires a reference to the future of the next statement. + for (int i = parsedResults0.size(); i > 0; i--) { + ParsedResult result = parsedResults0.get(i - 1); + + Object[] params0 = Arrays.copyOfRange(params, paramsCount - result.dynamicParamsCount(), paramsCount); + paramsCount -= result.dynamicParamsCount(); + + results[i - 1] = new ScriptStatementParameters(result, params0, + i < parsedResults0.size() ? results[i].cursorFuture : null); + } + + return new ArrayBlockingQueue<>(results.length, false, List.of(results)); + } + + private void cancelAll(Throwable cause) { + for (ScriptStatementParameters parameters : statements) { + CompletableFuture<AsyncSqlCursor<List<Object>>> fut = parameters.cursorFuture; + + if (fut.isDone()) { + continue; + } + + fut.completeExceptionally(new SqlException( + EXECUTION_CANCELLED_ERR, + "The script execution was canceled due to an error in the previous statement.", + cause + )); + } + } + + private class ScriptStatementParameters { + private final CompletableFuture<AsyncSqlCursor<List<Object>>> cursorFuture = new CompletableFuture<>(); + private final CompletableFuture<AsyncSqlCursor<List<Object>>> nextStatementFuture; + private final ParsedResult parsedResult; + private final Object[] dynamicParams; + + private ScriptStatementParameters( + ParsedResult parsedResult, + Object[] dynamicParams, + @Nullable CompletableFuture<AsyncSqlCursor<List<Object>>> nextStatementFuture + ) { + this.parsedResult = parsedResult; + this.dynamicParams = dynamicParams; + this.nextStatementFuture = nextStatementFuture; + } + } + } + + /** Completes the provided future when the callback is called. */ + private static class PrefetchCallback implements QueryPrefetchCallback { + private final CompletableFuture<Void> prefetchFuture = new CompletableFuture<>(); + + @Override + public void onPrefetchComplete(@Nullable Throwable ex) { + if (ex == null) { + prefetchFuture.complete(null); + } else { + prefetchFuture.completeExceptionally(mapToPublicSqlException(ex)); + } + } + + CompletableFuture<Void> prefetchFuture() { + return prefetchFuture; + } + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index 8f8103c6d4..a7ea216d5d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -490,6 +490,12 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve root.exceptionally(t -> { this.close(true); + QueryPrefetchCallback callback = ctx.prefetchCallback(); + + if (callback != null) { + taskExecutor.execute(() -> callback.onPrefetchComplete(t)); + } + return null; }); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/ScriptParseResult.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/ScriptParseResult.java index 2d59946b14..3b2b546fcc 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/ScriptParseResult.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/ScriptParseResult.java @@ -22,9 +22,8 @@ import java.util.stream.Collectors; import javax.annotation.concurrent.NotThreadSafe; import org.apache.calcite.sql.SqlDynamicParam; import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.util.SqlBasicVisitor; +import org.apache.calcite.sql.util.SqlShuttle; import org.apache.ignite.internal.tostring.S; -import org.jetbrains.annotations.Nullable; /** * Result of parsing SQL string that multiple statements. @@ -41,9 +40,22 @@ public final class ScriptParseResult extends ParseResult { return new ScriptParseResult(List.of(new StatementParseResult(list.get(0), dynamicParamsCount)), dynamicParamsCount); } - SqlDynamicParamsCounter paramsCounter = dynamicParamsCount == 0 ? null : new SqlDynamicParamsCounter(); + SqlDynamicParamsAdjuster dynamicParamsAdjuster = new SqlDynamicParamsAdjuster(); + List<StatementParseResult> results = list.stream() - .map(node -> new StatementParseResult(node, paramsCounter == null ? 0 : paramsCounter.forNode(node))) + .map(node -> { + if (dynamicParamsCount == 0) { + return new StatementParseResult(node, 0); + } + + dynamicParamsAdjuster.reset(); + + SqlNode newTree = dynamicParamsAdjuster.visitNode(node); + + assert newTree != null; + + return new StatementParseResult(newTree, dynamicParamsAdjuster.paramsCount()); + }) .collect(Collectors.toList()); return new ScriptParseResult(results, dynamicParamsCount); @@ -69,25 +81,25 @@ public final class ScriptParseResult extends ParseResult { } /** - * Counts the number of {@link SqlDynamicParam} nodes in the tree. + * Adjusts the dynamic parameter indexes to match the single statement parameter indexes. */ @NotThreadSafe - static class SqlDynamicParamsCounter extends SqlBasicVisitor<Object> { - int count; + private static final class SqlDynamicParamsAdjuster extends SqlShuttle { + private int counter; @Override - public @Nullable Object visit(SqlDynamicParam param) { - count++; - - return null; + public SqlNode visit(SqlDynamicParam param) { + return new SqlDynamicParam(counter++, param.getParserPosition()); } - int forNode(SqlNode node) { - count = 0; - - this.visitNode(node); + /** Resets the dynamic parameters counter. */ + void reset() { + counter = 0; + } - return count; + /** Returns the number of processed dynamic parameters. */ + int paramsCount() { + return counter; } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlParserTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlParserTest.java index 8bc52a3c6a..cb5c48ad59 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlParserTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlParserTest.java @@ -93,6 +93,21 @@ public class IgniteSqlParserTest { () -> IgniteSqlParser.parse("", StatementParseResult.MODE)); } + @Test + public void testEmptyStatements() { + assertThrowsSqlException(Sql.STMT_PARSE_ERR, + "Failed to parse query: Encountered \";\" at line 1, column 1", + () -> IgniteSqlParser.parse(";", ScriptParseResult.MODE)); + + assertThrowsSqlException(Sql.STMT_PARSE_ERR, + "Failed to parse query: Encountered \";\" at line 2, column 1", + () -> IgniteSqlParser.parse("--- comment\n;", ScriptParseResult.MODE)); + + assertThrowsSqlException(Sql.STMT_PARSE_ERR, + "Failed to parse query: Encountered \"<EOF>\" at line 1, column 11", + () -> IgniteSqlParser.parse("--- comment", ScriptParseResult.MODE)); + } + @Test public void testCommentedQuery() { assertThrowsSqlException( diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java index 938e577abc..9fe1a6920c 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java @@ -293,6 +293,17 @@ public class QueryCheckerTest extends BaseIgniteAbstractTest { return CompletableFuture.completedFuture(sqlCursor); } + @Override + public CompletableFuture<AsyncSqlCursor<List<Object>>> queryScriptAsync( + SqlProperties properties, + IgniteTransactions transactions, + @Nullable InternalTransaction transaction, + String qry, + Object... params + ) { + throw new UnsupportedOperationException(); + } + @Override public void start() { // NO-OP