This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d6264af0e IGNITE-20443 Sql. Implement script processing logic (#2789)
8d6264af0e is described below

commit 8d6264af0edb9752e2239f9b686abe5580056862
Author: Pavel Pereslegin <xxt...@gmail.com>
AuthorDate: Thu Nov 16 13:27:08 2023 +0300

    IGNITE-20443 Sql. Implement script processing logic (#2789)
---
 .../handler/requests/jdbc/JdbcQueryCursor.java     |  19 ++
 .../org/apache/ignite/client/fakes/FakeCursor.java |  10 +
 .../client/fakes/FakeIgniteQueryProcessor.java     |  11 +
 .../sql/engine/ItSqlMultiStatementTest.java        | 268 +++++++++++++++++++
 .../ignite/internal/sql/engine/AsyncSqlCursor.java |  16 ++
 .../internal/sql/engine/AsyncSqlCursorImpl.java    |  43 +++
 .../ignite/internal/sql/engine/QueryProcessor.java |  22 ++
 .../internal/sql/engine/SqlQueryProcessor.java     | 294 ++++++++++++++++++---
 .../sql/engine/exec/ExecutionServiceImpl.java      |   6 +
 .../internal/sql/engine/sql/ScriptParseResult.java |  44 +--
 .../sql/engine/sql/IgniteSqlParserTest.java        |  15 ++
 .../internal/sql/engine/util/QueryCheckerTest.java |  11 +
 12 files changed, 711 insertions(+), 48 deletions(-)

diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java
index c33f5fa3e3..232f2eb593 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.client.handler.requests.jdbc;
 
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
@@ -87,4 +88,22 @@ public class JdbcQueryCursor<T> implements AsyncSqlCursor<T> {
     public ResultSetMetadata metadata() {
         return cur.metadata();
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean hasNextResult() {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-20661
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<AsyncSqlCursor<T>> nextResult() {
+        if (!hasNextResult()) {
+            throw new NoSuchElementException("Query has no more results");
+        }
+
+        // TODO https://issues.apache.org/jira/browse/IGNITE-20661
+        return null;
+    }
 }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
index d941fb3a9f..340c1459d3 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
@@ -69,4 +69,14 @@ public class FakeCursor implements AsyncSqlCursor<List<Object>> {
     public ResultSetMetadata metadata() {
         return null;
     }
+
+    @Override
+    public boolean hasNextResult() {
+        return false;
+    }
+
+    @Override
+    public CompletableFuture<AsyncSqlCursor<List<Object>>> nextResult() {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
index 0a996f57d2..9302f7f4ee 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
@@ -41,6 +41,17 @@ public class FakeIgniteQueryProcessor implements QueryProcessor {
         return CompletableFuture.completedFuture(new FakeCursor());
     }
 
+    @Override
+    public CompletableFuture<AsyncSqlCursor<List<Object>>> queryScriptAsync(
+            SqlProperties properties,
+            IgniteTransactions transactions,
+            @Nullable InternalTransaction transaction,
+            String qry,
+            Object... params
+    ) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public void start() {
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTest.java
new file mode 100644
index 0000000000..08db857f25
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.RUNTIME_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests to verify the execution of queries with multiple statements.
+ */
+@SuppressWarnings("ThrowableNotThrown")
+public class ItSqlMultiStatementTest extends BaseSqlIntegrationTest {
+    @AfterEach
+    void dropTables() {
+        dropAllTables();
+    }
+
+    @AfterEach
+    void checkNoPendingTransactions() {
+        assertEquals(0, txManager().pending());
+    }
+
+    @Test
+    void basicMultiStatementQuery() {
+        String sql = "CREATE TABLE test (id INT PRIMARY KEY, val INT);"
+                + "INSERT INTO test VALUES (0, 0);"
+                + "EXPLAIN PLAN FOR SELECT * FROM test;"
+                + "SELECT * FROM test";
+
+        List<AsyncSqlCursor<List<Object>>> cursors = 
fetchAllCursors(runScript(sql));
+        assertNotNull(cursors);
+
+        Iterator<AsyncSqlCursor<List<Object>>> curItr = cursors.iterator();
+
+        validateSingleResult(curItr.next(), true);
+        validateSingleResult(curItr.next(), 1L);
+        assertNotNull(curItr.next()); // skip EXPLAIN.
+        validateSingleResult(curItr.next(), 0, 0);
+
+        assertFalse(curItr.hasNext());
+
+        // Ensures that the script is executed completely, even if the cursor data has not been read.
+        fetchAllCursors(runScript("INSERT INTO test VALUES (1, 1);"
+                + "INSERT INTO test VALUES (2, 2);"
+                + "SELECT * FROM test;"
+                + "INSERT INTO test VALUES (3, 3);"));
+
+        assertQuery("select * from test")
+                .returns(0, 0)
+                .returns(1, 1)
+                .returns(2, 2)
+                .returns(3, 3)
+                .check();
+    }
+
+    /** Checks single statement execution using multi-statement API. */
+    @Test
+    void singleStatementQuery() {
+        AsyncSqlCursor<List<Object>> cursor = runScript("CREATE TABLE test (id 
INT PRIMARY KEY, val INT)");
+        assertNotNull(cursor);
+        validateSingleResult(cursor, true);
+        assertFalse(cursor.hasNextResult());
+        assertThrows(NoSuchElementException.class, cursor::nextResult, "Query 
has no more results");
+
+        AsyncSqlCursor<List<Object>> cursor2 = runScript("INSERT INTO test 
VALUES (0, 0)");
+        assertNotNull(cursor2);
+        validateSingleResult(cursor2, 1L);
+        assertFalse(cursor2.hasNextResult());
+
+        assertQuery("SELECT * FROM test").returns(0, 0).check();
+    }
+
+    @Test
+    void queryWithDynamicParameters() {
+        String sql = "CREATE TABLE test (id INT PRIMARY KEY, val VARCHAR 
DEFAULT '3');"
+                + "INSERT INTO test VALUES(?, ?);"
+                + "INSERT INTO test VALUES(?, DEFAULT);"
+                + "INSERT INTO test VALUES(?, ?);";
+
+        fetchAllCursors(runScript(sql, null, 0, "1", 2, 4, "5"));
+
+        assertQuery("SELECT * FROM test")
+                .returns(0, "1")
+                .returns(2, "3")
+                .returns(4, "5")
+                .check();
+    }
+
+    @Test
+    void queryWithIncorrectNumberOfDynamicParametersFailsWithValidationError() 
{
+        String sql = "CREATE TABLE test (id INT PRIMARY KEY, val INT);"
+                + "INSERT INTO test VALUES(?, ?);"
+                + "INSERT INTO test VALUES(?, ?);";
+
+        String expectedMessage = "Unexpected number of query parameters";
+
+        assertThrowsSqlException(STMT_VALIDATION_ERR, expectedMessage, () -> 
runScript(sql, null, 0));
+        assertThrowsSqlException(STMT_VALIDATION_ERR, expectedMessage, () -> 
runScript(sql, null, 0, 1, 2, 3, 4, 5));
+    }
+
+    @Test
+    void transactionControlStatementDoesNotCreateCursor() {
+        assertThat(runScript("START TRANSACTION; COMMIT"), nullValue());
+
+        AsyncSqlCursor<List<Object>> cursor = runScript(
+                "START TRANSACTION;"
+                        + "SELECT 1;"
+                        + "COMMIT"
+        );
+
+        assertNotNull(cursor);
+        validateSingleResult(cursor, 1);
+
+        assertFalse(cursor.hasNextResult());
+    }
+
+    @Test
+    void scriptStopsExecutionOnError() {
+        // Runtime error.
+        AsyncSqlCursor<List<Object>> cursor = runScript(
+                "CREATE TABLE test (id INT PRIMARY KEY);"
+                + "SELECT 2/0;" // Runtime error.
+                + "INSERT INTO test VALUES (0)"
+        );
+        assertNotNull(cursor);
+        assertTrue(cursor.hasNextResult());
+
+        CompletableFuture<AsyncSqlCursor<List<Object>>> curFut0 = 
cursor.nextResult();
+        assertThrowsSqlException(RUNTIME_ERR, "/ by zero", () -> 
await(curFut0));
+
+        // Validation error.
+        assertThrowsSqlException(STMT_VALIDATION_ERR, "operator must have 
compatible types",
+                () -> runScript("INSERT INTO test VALUES (?); INSERT INTO test 
VALUES (1)", null, "Incompatible param"));
+
+        assertQuery("SELECT count(*) FROM test")
+                .returns(0L)
+                .check();
+
+        // Internal error.
+        cursor = runScript(
+                "INSERT INTO test VALUES(0);"
+                + "INSERT INTO test VALUES(1);"
+                + "SELECT (SELECT id FROM test);" // Internal error.
+                + "INSERT INTO test VALUES(2);"
+        );
+        assertNotNull(cursor);
+        assertTrue(cursor.hasNextResult());
+
+        cursor = await(cursor.nextResult());
+        assertNotNull(cursor);
+        assertTrue(cursor.hasNextResult());
+
+        CompletableFuture<AsyncSqlCursor<List<Object>>> cursFut = 
cursor.nextResult();
+        assertThrowsSqlException(INTERNAL_ERR, "Subquery returned more than 1 
value", () -> await(cursFut));
+
+        assertQuery("SELECT * FROM test")
+                .returns(0)
+                .returns(1)
+                .check();
+
+        // Internal error due to transaction exception.
+        Transaction tx = igniteTx().begin();
+        sql(tx, "INSERT INTO test VALUES(2);");
+        tx.commit();
+
+        assertThrowsSqlException(
+                TX_FAILED_READ_WRITE_OPERATION_ERR,
+                "Transaction is already finished",
+                () -> runScript(
+                        "INSERT INTO test VALUES(3); INSERT INTO test 
VALUES(4);",
+                        (InternalTransaction) tx
+                )
+        );
+
+        assertQuery("SELECT * FROM test")
+                .returns(0)
+                .returns(1)
+                .returns(2)
+                .check();
+
+        // DDL inside external transaction.
+        assertThrowsSqlException(STMT_VALIDATION_ERR, "DDL doesn't support 
transactions.",
+                () -> runScript("CREATE TABLE test2 (id INT PRIMARY KEY)", 
(InternalTransaction) tx));
+    }
+
+    private @Nullable AsyncSqlCursor<List<Object>> runScript(String query) {
+        return runScript(query, null);
+    }
+
+    private @Nullable AsyncSqlCursor<List<Object>> runScript(
+            String query,
+            @Nullable InternalTransaction tx,
+            Object ... params
+    ) {
+        QueryProcessor qryProc = queryProcessor();
+
+        return 
await(qryProc.queryScriptAsync(SqlPropertiesHelper.emptyProperties(), 
igniteTx(), tx, query, params));
+    }
+
+    private static @Nullable List<AsyncSqlCursor<List<Object>>> 
fetchAllCursors(@Nullable AsyncSqlCursor<List<Object>> cursor) {
+        if (cursor == null) {
+            return null;
+        }
+
+        List<AsyncSqlCursor<List<Object>>> cursors = new ArrayList<>();
+
+        cursors.add(cursor);
+
+        while (cursor.hasNextResult()) {
+            cursor = await(cursor.nextResult());
+
+            assertNotNull(cursor);
+
+            cursors.add(cursor);
+        }
+
+        return cursors;
+    }
+
+    private static void validateSingleResult(AsyncSqlCursor<List<Object>> 
cursor, Object... expected) {
+        BatchedResult<List<Object>> res = await(cursor.requestNextAsync(1));
+        assertNotNull(res);
+        assertEquals(List.of(List.of(expected)), res.items());
+        assertFalse(res.hasMore());
+
+        cursor.closeAsync();
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursor.java
index af8608a1b0..b314040d6f 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursor.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine;
 
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.sql.ResultSetMetadata;
 
@@ -35,4 +37,18 @@ public interface AsyncSqlCursor<T> extends AsyncCursor<T> {
      * Returns column metadata.
      */
     ResultSetMetadata metadata();
+
+    /**
+     * Returns {@code true} if the current cursor is the result of a multi-statement query
+     * and this statement is not the last one, {@code false} otherwise.
+     */
+    boolean hasNextResult();
+
+    /**
+     * Returns the future for the next statement of the query.
+     *
+     * @return Future that completes when the next statement completes.
+     * @throws NoSuchElementException if the query has no more statements to execute.
+     */
+    CompletableFuture<AsyncSqlCursor<T>> nextResult();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index 71a63be42a..421f780cc0 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -17,12 +17,14 @@
 
 package org.apache.ignite.internal.sql.engine;
 
+import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
 import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.sql.ResultSetMetadata;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Sql query cursor.
@@ -35,12 +37,14 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
     private final QueryTransactionWrapper txWrapper;
     private final AsyncCursor<T> dataCursor;
     private final Runnable onClose;
+    private final CompletableFuture<AsyncSqlCursor<T>> nextStatement;
 
     /**
      * Constructor.
      *
      * @param queryType Type of the query.
      * @param meta The meta of the result set.
+     * @param txWrapper Transaction wrapper.
      * @param dataCursor The result set.
      * @param onClose Callback to invoke when cursor is closed.
      */
@@ -50,12 +54,35 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
             QueryTransactionWrapper txWrapper,
             AsyncCursor<T> dataCursor,
             Runnable onClose
+    ) {
+        this(queryType, meta, txWrapper, dataCursor, onClose, null);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param queryType Type of the query.
+     * @param meta The meta of the result set.
+     * @param txWrapper Transaction wrapper.
+     * @param dataCursor The result set.
+     * @param onClose Callback to invoke when cursor is closed.
+     * @param nextStatement Next statement future, non-null in the case of a
+     *         multi-statement query and if current statement is not the last.
+     */
+    AsyncSqlCursorImpl(
+            SqlQueryType queryType,
+            ResultSetMetadata meta,
+            QueryTransactionWrapper txWrapper,
+            AsyncCursor<T> dataCursor,
+            Runnable onClose,
+            @Nullable CompletableFuture<AsyncSqlCursor<T>> nextStatement
     ) {
         this.queryType = queryType;
         this.meta = meta;
         this.txWrapper = txWrapper;
         this.dataCursor = dataCursor;
         this.onClose = onClose;
+        this.nextStatement = nextStatement;
     }
 
     /** {@inheritDoc} */
@@ -89,6 +116,22 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
         });
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public boolean hasNextResult() {
+        return nextStatement != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<AsyncSqlCursor<T>> nextResult() {
+        if (nextStatement == null) {
+            throw new NoSuchElementException("Query has no more results");
+        }
+
+        return nextStatement;
+    }
+
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> closeAsync() {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
index f89717a892..e3e924b8b1 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
@@ -53,4 +53,26 @@ public interface QueryProcessor extends IgniteComponent {
             String qry,
             Object... params
     );
+
+    /**
+     * Execute the multi-statement query with given schema name and parameters.
+     *
+     * @param properties User query properties. See {@link QueryProperty} for available properties.
+     * @param transactions Transactions facade.
+     * @param transaction A transaction to use for query execution. If null, an implicit transaction
+     *      will be started by provided transactions facade.
+     * @param qry Multi statement SQL query.
+     * @param params Query parameters.
+     * @return Sql cursor.
+     *
+     * @throws IgniteException in case of an error.
+     * @see QueryProperty
+     */
+    CompletableFuture<AsyncSqlCursor<List<Object>>> queryScriptAsync(
+            SqlProperties properties,
+            IgniteTransactions transactions,
+            @Nullable InternalTransaction transaction,
+            String qry,
+            Object... params
+    );
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index cd914bc079..7830570d2c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -18,17 +18,22 @@
 package org.apache.ignite.internal.sql.engine;
 
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
 import static 
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -351,6 +356,26 @@ public class SqlQueryProcessor implements QueryProcessor {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<AsyncSqlCursor<List<Object>>> queryScriptAsync(
+            SqlProperties properties,
+            IgniteTransactions transactions,
+            @Nullable InternalTransaction transaction,
+            String qry,
+            Object... params
+    ) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
+        }
+
+        try {
+            return queryScript0(properties, transactions, transaction, qry, 
params);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
     private <T extends LifecycleAware> T registerService(T service) {
         services.add(service);
 
@@ -374,25 +399,11 @@ public class SqlQueryProcessor implements QueryProcessor {
         CompletableFuture<AsyncSqlCursor<List<Object>>> stage = 
start.thenCompose(ignored -> {
             ParsedResult result = parserService.parse(sql);
 
-            validateParsedStatement(properties0, result, params, 
explicitTransaction);
+            validateParsedStatement(properties0, result, params);
 
             QueryTransactionWrapper txWrapper = 
wrapTxOrStartImplicit(result.queryType(), transactions, explicitTransaction);
 
-            return waitForActualSchema(schemaName, 
txWrapper.unwrap().startTimestamp())
-                    .thenCompose(schema -> {
-                        BaseQueryContext ctx = BaseQueryContext.builder()
-                                
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build())
-                                .queryId(UUID.randomUUID())
-                                .cancel(queryCancel)
-                                .parameters(params)
-                                .build();
-
-                        return prepareSvc.prepareAsync(result, 
ctx).thenApply(plan -> executePlan(txWrapper, ctx, plan));
-                    }).whenComplete((res, ex) -> {
-                        if (ex != null) {
-                            txWrapper.rollback();
-                        }
-                    });
+            return executeParsedStatement(schemaName, result, txWrapper, 
queryCancel, params, false, null);
         });
 
         // TODO IGNITE-20078 Improve (or remove) CancellationException handling.
@@ -407,6 +418,69 @@ public class SqlQueryProcessor implements QueryProcessor {
         return stage;
     }
 
+    private CompletableFuture<AsyncSqlCursor<List<Object>>> queryScript0(
+            SqlProperties properties,
+            IgniteTransactions transactions,
+            @Nullable InternalTransaction explicitTransaction,
+            String sql,
+            Object... params
+    ) {
+        SqlProperties properties0 = SqlPropertiesHelper.chain(properties, 
DEFAULT_PROPERTIES);
+        String schemaName = properties0.get(QueryProperty.DEFAULT_SCHEMA);
+
+        CompletableFuture<?> start = new CompletableFuture<>();
+
+        CompletableFuture<AsyncSqlCursor<List<Object>>> parseFut = start
+                .thenApply(ignored -> parserService.parseScript(sql))
+                .thenCompose(parsedResults -> {
+                    MultiStatementHandler handler = new MultiStatementHandler(
+                            schemaName, transactions, explicitTransaction, 
parsedResults, params);
+
+                    return handler.processNext();
+                });
+
+        start.completeAsync(() -> null, taskExecutor);
+
+        return parseFut;
+    }
+
+    private CompletableFuture<AsyncSqlCursor<List<Object>>> 
executeParsedStatement(
+            String schemaName,
+            ParsedResult parsedResult,
+            QueryTransactionWrapper txWrapper,
+            QueryCancel queryCancel,
+            Object[] params,
+            boolean waitForPrefetch,
+            @Nullable CompletableFuture<AsyncSqlCursor<List<Object>>> 
nextStatement
+    ) {
+        return waitForActualSchema(schemaName, 
txWrapper.unwrap().startTimestamp())
+                .thenCompose(schema -> {
+                    PrefetchCallback callback = waitForPrefetch ? new 
PrefetchCallback() : null;
+
+                    BaseQueryContext ctx = BaseQueryContext.builder()
+                            
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build())
+                            .queryId(UUID.randomUUID())
+                            .cancel(queryCancel)
+                            .prefetchCallback(callback)
+                            .parameters(params)
+                            .build();
+
+                    CompletableFuture<AsyncSqlCursor<List<Object>>> fut = 
prepareSvc.prepareAsync(parsedResult, ctx)
+                            .thenApply(plan -> executePlan(txWrapper, ctx, 
plan, nextStatement));
+
+                    if (waitForPrefetch) {
+                        fut = fut.thenCompose(cursor -> 
callback.prefetchFuture().thenApply(ignore -> cursor));
+                    }
+
+                    return fut;
+                })
+                .whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        txWrapper.rollback();
+                    }
+                });
+    }
+
     private CompletableFuture<SchemaPlus> waitForActualSchema(String 
schemaName, HybridTimestamp timestamp) {
         try {
             return 
schemaSyncService.waitForMetadataCompleteness(timestamp).thenApply(unused -> {
@@ -426,7 +500,8 @@ public class SqlQueryProcessor implements QueryProcessor {
     private AsyncSqlCursor<List<Object>> executePlan(
             QueryTransactionWrapper txWrapper,
             BaseQueryContext ctx,
-            QueryPlan plan
+            QueryPlan plan,
+            @Nullable CompletableFuture<AsyncSqlCursor<List<Object>>> 
nextStatement
     ) {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
@@ -443,7 +518,8 @@ public class SqlQueryProcessor implements QueryProcessor {
                     plan.metadata(),
                     txWrapper,
                     dataCursor,
-                    () -> openedCursors.remove(queryId)
+                    () -> openedCursors.remove(queryId),
+                    nextStatement
             );
 
             Object old = openedCursors.put(queryId, cursor);
@@ -477,6 +553,14 @@ public class SqlQueryProcessor implements QueryProcessor {
             return new QueryTransactionWrapper(tx, true);
         }
 
+        if (SqlQueryType.DDL == queryType) {
+            throw new SqlException(STMT_VALIDATION_ERR, "DDL doesn't support 
transactions.");
+        }
+
+        if (SqlQueryType.DML == queryType && outerTx.isReadOnly()) {
+            throw new SqlException(STMT_VALIDATION_ERR, "DML query cannot be 
started by using read only transactions.");
+        }
+
         return new QueryTransactionWrapper(outerTx, false);
     }
 
@@ -489,22 +573,11 @@ public class SqlQueryProcessor implements QueryProcessor {
     private static void validateParsedStatement(
             SqlProperties properties,
             ParsedResult parsedResult,
-            Object[] params,
-            @Nullable InternalTransaction outerTx
+            Object[] params
     ) {
         Set<SqlQueryType> allowedTypes = 
properties.get(QueryProperty.ALLOWED_QUERY_TYPES);
         SqlQueryType queryType = parsedResult.queryType();
 
-        if (outerTx != null) {
-            if (SqlQueryType.DDL == queryType) {
-                throw new SqlException(STMT_VALIDATION_ERR, "DDL doesn't 
support transactions.");
-            }
-
-            if (SqlQueryType.DML == queryType && outerTx.isReadOnly()) {
-                throw new SqlException(STMT_VALIDATION_ERR, "DML query cannot 
be started by using read only transactions.");
-            }
-        }
-
         if (parsedResult.queryType() == SqlQueryType.TX_CONTROL) {
             String message = "Transaction control statement can not be 
executed as an independent statement";
 
@@ -517,10 +590,14 @@ public class SqlQueryProcessor implements QueryProcessor {
             throw new SqlException(STMT_VALIDATION_ERR, message);
         }
 
-        if (parsedResult.dynamicParamsCount() != params.length) {
+        validateDynamicParameters(parsedResult.dynamicParamsCount(), params);
+    }
+
+    private static void validateDynamicParameters(int expectedParamsCount, 
Object[] params) throws SqlException {
+        if (expectedParamsCount != params.length) {
             String message = format(
                     "Unexpected number of query parameters. Provided {} but 
there is only {} dynamic parameter(s).",
-                    params.length, parsedResult.dynamicParamsCount()
+                    params.length, expectedParamsCount
             );
 
             throw new SqlException(STMT_VALIDATION_ERR, message);
@@ -535,4 +612,157 @@ public class SqlQueryProcessor implements QueryProcessor {
             }
         }
     }
+
+    private class MultiStatementHandler {
+        private final String schemaName;
+        private final IgniteTransactions transactions;
+        private final @Nullable InternalTransaction explicitTransaction;
+        private final Queue<ScriptStatementParameters> statements;
+
+        MultiStatementHandler(
+                String schemaName,
+                IgniteTransactions transactions,
+                @Nullable InternalTransaction explicitTransaction,
+                List<ParsedResult> parsedResults,
+                Object[] params
+        ) {
+            this.schemaName = schemaName;
+            this.transactions = transactions;
+            this.explicitTransaction = explicitTransaction;
+            this.statements = prepareStatementsQueue(parsedResults, params);
+        }
+
+        CompletableFuture<AsyncSqlCursor<List<Object>>> processNext() {
+            if (statements == null) {
+                // TODO https://issues.apache.org/jira/browse/IGNITE-20463 
Each tx control statement must return an empty cursor.
+                return CompletableFuture.completedFuture(null);
+            }
+
+            ScriptStatementParameters parameters = statements.poll();
+
+            assert parameters != null;
+
+            ParsedResult parsedResult = parameters.parsedResult;
+            Object[] dynamicParams = parameters.dynamicParams;
+            CompletableFuture<AsyncSqlCursor<List<Object>>> cursorFuture = 
parameters.cursorFuture;
+            CompletableFuture<AsyncSqlCursor<List<Object>>> nextCursorFuture = 
parameters.nextStatementFuture;
+
+            try {
+                if (cursorFuture.isDone()) {
+                    return cursorFuture;
+                }
+
+                QueryTransactionWrapper txWrapper = 
wrapTxOrStartImplicit(parsedResult.queryType(), transactions, 
explicitTransaction);
+
+                QueryCancel cancel = new QueryCancel();
+
+                executeParsedStatement(schemaName, parsedResult, txWrapper, 
cancel, dynamicParams, true, nextCursorFuture)
+                        .whenComplete((res, ex) -> {
+                            if (ex != null) {
+                                cursorFuture.completeExceptionally(ex);
+                                cancelAll(ex);
+
+                                return;
+                            }
+
+                            txWrapper.commitImplicit();
+
+                            if (nextCursorFuture != null) {
+                                taskExecutor.execute(this::processNext);
+                            }
+
+                            cursorFuture.complete(res);
+                        });
+            } catch (Exception e) {
+                cursorFuture.completeExceptionally(e);
+
+                cancelAll(e);
+            }
+
+            return cursorFuture;
+        }
+
+        /**
+         * Returns a queue, each element of which represents parameters 
required to execute a single statement of the script.
+         */
+        private @Nullable Queue<ScriptStatementParameters> 
prepareStatementsQueue(List<ParsedResult> parsedResults, Object[] params) {
+            List<ParsedResult> parsedResults0 = parsedResults.stream()
+                    // TODO https://issues.apache.org/jira/browse/IGNITE-20463 
Integrate TX-related statements
+                    .filter(res -> res.queryType() != 
SqlQueryType.TX_CONTROL).collect(Collectors.toList());
+
+            if (parsedResults0.isEmpty()) {
+                return null;
+            }
+
+            int paramsCount = 
parsedResults0.stream().mapToInt(ParsedResult::dynamicParamsCount).sum();
+            validateDynamicParameters(paramsCount, params);
+
+            ScriptStatementParameters[] results = new 
ScriptStatementParameters[parsedResults0.size()];
+
+            // We fill parameters in reverse order, because each script 
statement
+            // requires a reference to the future of the next statement.
+            for (int i = parsedResults0.size(); i > 0; i--) {
+                ParsedResult result = parsedResults0.get(i - 1);
+
+                Object[] params0 = Arrays.copyOfRange(params, paramsCount - 
result.dynamicParamsCount(), paramsCount);
+                paramsCount -= result.dynamicParamsCount();
+
+                results[i - 1] = new ScriptStatementParameters(result, params0,
+                        i < parsedResults0.size() ? results[i].cursorFuture : 
null);
+            }
+
+            return new ArrayBlockingQueue<>(results.length, false, 
List.of(results));
+        }
+
+        private void cancelAll(Throwable cause) {
+            for (ScriptStatementParameters parameters : statements) {
+                CompletableFuture<AsyncSqlCursor<List<Object>>> fut = 
parameters.cursorFuture;
+
+                if (fut.isDone()) {
+                    continue;
+                }
+
+                fut.completeExceptionally(new SqlException(
+                        EXECUTION_CANCELLED_ERR,
+                        "The script execution was canceled due to an error in 
the previous statement.",
+                        cause
+                ));
+            }
+        }
+
+        private class ScriptStatementParameters {
+            private final CompletableFuture<AsyncSqlCursor<List<Object>>> 
cursorFuture = new CompletableFuture<>();
+            private final CompletableFuture<AsyncSqlCursor<List<Object>>> 
nextStatementFuture;
+            private final ParsedResult parsedResult;
+            private final Object[] dynamicParams;
+
+            private ScriptStatementParameters(
+                    ParsedResult parsedResult,
+                    Object[] dynamicParams,
+                    @Nullable CompletableFuture<AsyncSqlCursor<List<Object>>> 
nextStatementFuture
+            ) {
+                this.parsedResult = parsedResult;
+                this.dynamicParams = dynamicParams;
+                this.nextStatementFuture = nextStatementFuture;
+            }
+        }
+    }
+
+    /** Completes its own prefetch future when the callback is called. */
+    private static class PrefetchCallback implements QueryPrefetchCallback {
+        private final CompletableFuture<Void> prefetchFuture = new 
CompletableFuture<>();
+
+        @Override
+        public void onPrefetchComplete(@Nullable Throwable ex) {
+            if (ex == null) {
+                prefetchFuture.complete(null);
+            } else {
+                
prefetchFuture.completeExceptionally(mapToPublicSqlException(ex));
+            }
+        }
+
+        CompletableFuture<Void> prefetchFuture() {
+            return prefetchFuture;
+        }
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 8f8103c6d4..a7ea216d5d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -490,6 +490,12 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 root.exceptionally(t -> {
                     this.close(true);
 
+                    QueryPrefetchCallback callback = ctx.prefetchCallback();
+
+                    if (callback != null) {
+                        taskExecutor.execute(() -> 
callback.onPrefetchComplete(t));
+                    }
+
                     return null;
                 });
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/ScriptParseResult.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/ScriptParseResult.java
index 2d59946b14..3b2b546fcc 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/ScriptParseResult.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/ScriptParseResult.java
@@ -22,9 +22,8 @@ import java.util.stream.Collectors;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.calcite.sql.SqlDynamicParam;
 import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.calcite.sql.util.SqlShuttle;
 import org.apache.ignite.internal.tostring.S;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Result of parsing SQL string that multiple statements.
@@ -41,9 +40,22 @@ public final class ScriptParseResult extends ParseResult {
                 return new ScriptParseResult(List.of(new 
StatementParseResult(list.get(0), dynamicParamsCount)), dynamicParamsCount);
             }
 
-            SqlDynamicParamsCounter paramsCounter = dynamicParamsCount == 0 ? 
null : new SqlDynamicParamsCounter();
+            SqlDynamicParamsAdjuster dynamicParamsAdjuster = new 
SqlDynamicParamsAdjuster();
+
             List<StatementParseResult> results = list.stream()
-                    .map(node -> new StatementParseResult(node, paramsCounter 
== null ? 0 : paramsCounter.forNode(node)))
+                    .map(node -> {
+                        if (dynamicParamsCount == 0) {
+                            return new StatementParseResult(node, 0);
+                        }
+
+                        dynamicParamsAdjuster.reset();
+
+                        SqlNode newTree = 
dynamicParamsAdjuster.visitNode(node);
+
+                        assert newTree != null;
+
+                        return new StatementParseResult(newTree, 
dynamicParamsAdjuster.paramsCount());
+                    })
                     .collect(Collectors.toList());
 
             return new ScriptParseResult(results, dynamicParamsCount);
@@ -69,25 +81,25 @@ public final class ScriptParseResult extends ParseResult {
     }
 
     /**
-     * Counts the number of {@link SqlDynamicParam} nodes in the tree.
+     * Adjusts the dynamic parameter indexes to match the single statement 
parameter indexes.
      */
     @NotThreadSafe
-    static class SqlDynamicParamsCounter extends SqlBasicVisitor<Object> {
-        int count;
+    private static final class SqlDynamicParamsAdjuster extends SqlShuttle {
+        private int counter;
 
         @Override
-        public @Nullable Object visit(SqlDynamicParam param) {
-            count++;
-
-            return null;
+        public SqlNode visit(SqlDynamicParam param) {
+            return new SqlDynamicParam(counter++, param.getParserPosition());
         }
 
-        int forNode(SqlNode node) {
-            count = 0;
-
-            this.visitNode(node);
+        /** Resets the dynamic parameters counter. */
+        void reset() {
+            counter = 0;
+        }
 
-            return count;
+        /** Returns the number of processed dynamic parameters. */
+        int paramsCount() {
+            return counter;
         }
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlParserTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlParserTest.java
index 8bc52a3c6a..cb5c48ad59 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlParserTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlParserTest.java
@@ -93,6 +93,21 @@ public class IgniteSqlParserTest {
                 () -> IgniteSqlParser.parse("", StatementParseResult.MODE));
     }
 
+    @Test
+    public void testEmptyStatements() {
+        assertThrowsSqlException(Sql.STMT_PARSE_ERR,
+                "Failed to parse query: Encountered \";\" at line 1, column 1",
+                () -> IgniteSqlParser.parse(";", ScriptParseResult.MODE));
+
+        assertThrowsSqlException(Sql.STMT_PARSE_ERR,
+                "Failed to parse query: Encountered \";\" at line 2, column 1",
+                () -> IgniteSqlParser.parse("--- comment\n;", 
ScriptParseResult.MODE));
+
+        assertThrowsSqlException(Sql.STMT_PARSE_ERR,
+                "Failed to parse query: Encountered \"<EOF>\" at line 1, 
column 11",
+                () -> IgniteSqlParser.parse("--- comment", 
ScriptParseResult.MODE));
+    }
+
     @Test
     public void testCommentedQuery() {
         assertThrowsSqlException(
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
index 938e577abc..9fe1a6920c 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
@@ -293,6 +293,17 @@ public class QueryCheckerTest extends 
BaseIgniteAbstractTest {
             return CompletableFuture.completedFuture(sqlCursor);
         }
 
+        @Override
+        public CompletableFuture<AsyncSqlCursor<List<Object>>> 
queryScriptAsync(
+                SqlProperties properties,
+                IgniteTransactions transactions,
+                @Nullable InternalTransaction transaction,
+                String qry,
+                Object... params
+        ) {
+            throw new UnsupportedOperationException();
+        }
+
         @Override
         public void start() {
             // NO-OP


Reply via email to