This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 39bc0f37d5 IGNITE-18984 Sql. Migrate JDBC batched methods on new internal API (#1866)
39bc0f37d5 is described below

commit 39bc0f37d597ec2d58db27e3eeb095097625d70f
Author: ygerzhedovich <41903880+ygerzhedov...@users.noreply.github.com>
AuthorDate: Fri Mar 31 18:44:20 2023 +0300

    IGNITE-18984 Sql. Migrate JDBC batched methods on new internal API (#1866)
---
 .../internal/jdbc/proto/JdbcQueryEventHandler.java |   8 +-
 .../client/handler/JdbcQueryEventHandlerImpl.java  |  40 ++++---
 .../jdbc/ClientJdbcExecuteBatchRequest.java        |   4 +-
 .../jdbc/ClientJdbcPreparedStmntBatchRequest.java  |   4 +-
 .../client/fakes/FakeIgniteQueryProcessor.java     |  11 --
 .../apache/ignite/jdbc/ItJdbcBatchSelfTest.java    |  15 ++-
 .../internal/jdbc/JdbcClientQueryEventHandler.java |  17 ++-
 .../internal/jdbc/JdbcPreparedStatement.java       |   2 +-
 .../apache/ignite/internal/jdbc/JdbcStatement.java |   2 +-
 .../internal/ClusterPerTestIntegrationTest.java    |  11 +-
 .../ignite/internal/sql/engine/QueryProcessor.java |  25 -----
 .../internal/sql/engine/SqlQueryProcessor.java     | 124 +--------------------
 .../internal/sql/engine/StopCalciteModuleTest.java |  20 ++--
 .../sql/engine/exec/MockedStructuresTest.java      |  89 +++++++++------
 14 files changed, 145 insertions(+), 227 deletions(-)

diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryEventHandler.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryEventHandler.java
index 737c0a4983..a71c49a203 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryEventHandler.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryEventHandler.java
@@ -47,6 +47,7 @@ public interface JdbcQueryEventHandler {
     /**
      * {@link JdbcQueryExecuteRequest} command handler.
      *
+     * @param connectionId Identifier of the connection.
      * @param req Execute query request.
      * @return Result future.
      */
@@ -55,19 +56,20 @@ public interface JdbcQueryEventHandler {
     /**
      * {@link JdbcBatchExecuteRequest} command handler.
      *
+     * @param connectionId Identifier of the connection.
      * @param req Batch query request.
      * @return Result future.
      */
-    CompletableFuture<JdbcBatchExecuteResult> 
batchAsync(JdbcBatchExecuteRequest req);
+    CompletableFuture<JdbcBatchExecuteResult> batchAsync(long connectionId, 
JdbcBatchExecuteRequest req);
 
     /**
      * {@link JdbcBatchPreparedStmntRequest} command handler.
      *
+     * @param connectionId The identifier of the connection.
      * @param req Batch query request.
      * @return Result future.
      */
-    CompletableFuture<JdbcBatchExecuteResult> batchPrepStatementAsync(
-            JdbcBatchPreparedStmntRequest req);
+    CompletableFuture<JdbcBatchExecuteResult> batchPrepStatementAsync(long 
connectionId, JdbcBatchPreparedStmntRequest req);
 
     /**
      * {@link JdbcMetaTablesRequest} command handler.
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
index 82b04200d4..9836ffc154 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
@@ -60,6 +60,7 @@ import 
org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
 import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.lang.ErrorGroups.Client;
 import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -177,14 +178,14 @@ public class JdbcQueryEventHandlerImpl implements 
JdbcQueryEventHandler {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<JdbcBatchExecuteResult> 
batchAsync(JdbcBatchExecuteRequest req) {
+    public CompletableFuture<JdbcBatchExecuteResult> batchAsync(long 
connectionId, JdbcBatchExecuteRequest req) {
         List<String> queries = req.queries();
 
         var counters = new IntArrayList(req.queries().size());
         var tail = CompletableFuture.completedFuture(counters);
 
         for (String query : queries) {
-            tail = tail.thenCompose(list -> 
executeAndCollectUpdateCount(req.schemaName(), query, OBJECT_EMPTY_ARRAY)
+            tail = tail.thenCompose(list -> 
executeAndCollectUpdateCount(connectionId, query, OBJECT_EMPTY_ARRAY)
                     .thenApply(cnt -> {
                         list.add(cnt > Integer.MAX_VALUE ? 
Statement.SUCCESS_NO_INFO : cnt.intValue());
 
@@ -203,15 +204,14 @@ public class JdbcQueryEventHandlerImpl implements 
JdbcQueryEventHandler {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<JdbcBatchExecuteResult> batchPrepStatementAsync(
-            JdbcBatchPreparedStmntRequest req) {
+    public CompletableFuture<JdbcBatchExecuteResult> 
batchPrepStatementAsync(long connectionId, JdbcBatchPreparedStmntRequest req) {
         var argList = req.getArgs();
 
         var counters = new IntArrayList(req.getArgs().size());
         var tail = CompletableFuture.completedFuture(counters);
 
         for (Object[] args : argList) {
-            tail = tail.thenCompose(list -> 
executeAndCollectUpdateCount(req.schemaName(), req.getQuery(), args)
+            tail = tail.thenCompose(list -> 
executeAndCollectUpdateCount(connectionId, req.getQuery(), args)
                     .thenApply(cnt -> {
                         list.add(cnt > Integer.MAX_VALUE ? 
Statement.SUCCESS_NO_INFO : cnt.intValue());
 
@@ -228,16 +228,25 @@ public class JdbcQueryEventHandlerImpl implements 
JdbcQueryEventHandler {
         });
     }
 
-    private CompletableFuture<Long> executeAndCollectUpdateCount(String 
schema, String sql, Object[] arg) {
+    private CompletableFuture<Long> executeAndCollectUpdateCount(long 
connectionId, String sql, Object[] arg) {
         var context = 
createQueryContext(JdbcStatementType.UPDATE_STATEMENT_TYPE);
 
-        var cursors = processor.queryAsync(context, schema, sql, arg);
-
-        if (cursors.size() != 1) {
-            return CompletableFuture.failedFuture(new 
IgniteInternalException("Multi statement queries are not supported in 
batching"));
+        JdbcConnectionContext connectionContext;
+        try {
+            connectionContext = 
resources.get(connectionId).get(JdbcConnectionContext.class);
+        } catch (IgniteInternalCheckedException exception) {
+            return CompletableFuture.failedFuture(new 
IgniteInternalException(Client.CONNECTION_ERR));
         }
 
-        return cursors.get(0).thenCompose(cursor -> 
cursor.requestNextAsync(1).thenApply(batch -> (Long) 
batch.items().get(0).get(0)));
+        CompletableFuture<AsyncSqlCursor<List<Object>>> result = 
connectionContext.doInSession(sessionId -> processor.querySingleAsync(
+                sessionId,
+                context,
+                sql,
+                arg == null ? OBJECT_EMPTY_ARRAY : arg
+        ));
+
+        return result.thenCompose(cursor -> cursor.requestNextAsync(1))
+                .thenApply(batch -> (Long) batch.items().get(0).get(0));
     }
 
     private JdbcBatchExecuteResult handleBatchException(Throwable e, String 
query, int[] counters) {
@@ -290,7 +299,8 @@ public class JdbcQueryEventHandlerImpl implements 
JdbcQueryEventHandler {
 
         try (PrintWriter pw = new PrintWriter(sw)) {
             // We need to remap QueryValidationException into a jdbc error.
-            if (cause instanceof IgniteException && cause.getCause() 
instanceof QueryValidationException) {
+            if (cause instanceof QueryValidationException
+                    || (cause instanceof IgniteException && cause.getCause() 
instanceof QueryValidationException)) {
                 pw.print("Given statement type does not match that declared by 
JDBC driver.");
             } else {
                 pw.print(cause.getMessage());
@@ -394,7 +404,7 @@ public class JdbcQueryEventHandlerImpl implements 
JdbcQueryEventHandler {
                 potentiallyNotCreatedSessionId = recreateSession(null);
             }
 
-            final SessionId finalSessionId = potentiallyNotCreatedSessionId;
+            SessionId finalSessionId = potentiallyNotCreatedSessionId;
 
             return action.perform(finalSessionId)
                     .handle((BiFunction<T, Throwable, Pair<T, Throwable>>) 
Pair::new)
@@ -457,13 +467,13 @@ public class JdbcQueryEventHandlerImpl implements 
JdbcQueryEventHandler {
      * when the session is no longer needed.
      */
     @FunctionalInterface
-    private static interface SessionCleaner {
+    private interface SessionCleaner {
         void clean(SessionId sessionId);
     }
 
     /** Interface describing an action that should be performed within the 
session. */
     @FunctionalInterface
-    static interface SessionAwareAction<T> {
+    interface SessionAwareAction<T> {
         CompletableFuture<T> perform(SessionId sessionId);
     }
 }
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java
index e6d183ca00..311a04a872 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java
@@ -42,8 +42,10 @@ public class ClientJdbcExecuteBatchRequest {
     ) {
         var req = new JdbcBatchExecuteRequest();
 
+        long connectionId = in.unpackLong();
+
         req.readBinary(in);
 
-        return handler.batchAsync(req).thenAccept(res -> res.writeBinary(out));
+        return handler.batchAsync(connectionId, req).thenAccept(res -> 
res.writeBinary(out));
     }
 }
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java
index d0017c6586..ba29ea7a0e 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java
@@ -42,8 +42,10 @@ public class ClientJdbcPreparedStmntBatchRequest {
     ) {
         var req = new JdbcBatchPreparedStmntRequest();
 
+        long connectionId = in.unpackLong();
+
         req.readBinary(in);
 
-        return handler.batchPrepStatementAsync(req).thenAccept(res -> 
res.writeBinary(out));
+        return handler.batchPrepStatementAsync(connectionId, 
req).thenAccept(res -> res.writeBinary(out));
     }
 }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
index dc0ac6e481..08106c9cc6 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
@@ -47,17 +47,6 @@ public class FakeIgniteQueryProcessor implements 
QueryProcessor {
         return Collections.emptyList();
     }
 
-    @Override
-    public List<CompletableFuture<AsyncSqlCursor<List<Object>>>> 
queryAsync(String schemaName, String qry, Object... params) {
-        return List.of(CompletableFuture.completedFuture(new FakeCursor()));
-    }
-
-    @Override
-    public List<CompletableFuture<AsyncSqlCursor<List<Object>>>> 
queryAsync(QueryContext context, String schemaName,
-            String qry, Object... params) {
-        return List.of(CompletableFuture.completedFuture(new FakeCursor()));
-    }
-
     @Override
     public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(
             SessionId sessionid, QueryContext context, String qry,
diff --git a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
index cc581ac178..5634af9127 100644
--- a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
+++ b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
@@ -123,10 +123,21 @@ public class ItJdbcBatchSelfTest extends 
AbstractJdbcSelfTest {
         }
     }
 
+    @Test
+    public void testMultipleStatementForBatchIsNotAllowed() throws 
SQLException {
+        String insertStmt = "insert into Person (id, firstName, lastName, age) 
values";
+        String ins1 = insertStmt + valuesRow(1);
+        String ins2 = insertStmt + valuesRow(2);
+
+        stmt.addBatch(ins1 + ";" + ins2);
+
+        assertThrows(BatchUpdateException.class, () -> stmt.executeBatch(), 
"Multiple statements are not allowed.");
+    }
+
     @Test
     public void testBatchOnClosedStatement() throws SQLException {
-        final Statement stmt2 = conn.createStatement();
-        final PreparedStatement pstmt2 = conn.prepareStatement("");
+        Statement stmt2 = conn.createStatement();
+        PreparedStatement pstmt2 = conn.prepareStatement("");
 
         stmt2.close();
         pstmt2.close();
diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryEventHandler.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryEventHandler.java
index c69ed96559..083f53c84c 100644
--- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryEventHandler.java
+++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryEventHandler.java
@@ -82,8 +82,12 @@ public class JdbcClientQueryEventHandler implements 
JdbcQueryEventHandler {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<JdbcBatchExecuteResult> 
batchAsync(JdbcBatchExecuteRequest req) {
-        return client.sendRequestAsync(ClientOp.JDBC_EXEC_BATCH, w -> 
req.writeBinary(w.out()), r -> {
+    public CompletableFuture<JdbcBatchExecuteResult> batchAsync(long 
connectionId, JdbcBatchExecuteRequest req) {
+        return client.sendRequestAsync(ClientOp.JDBC_EXEC_BATCH, w -> {
+            w.out().packLong(connectionId);
+
+            req.writeBinary(w.out());
+        }, r -> {
             JdbcBatchExecuteResult res = new JdbcBatchExecuteResult();
 
             res.readBinary(r.in());
@@ -94,9 +98,12 @@ public class JdbcClientQueryEventHandler implements 
JdbcQueryEventHandler {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<JdbcBatchExecuteResult> batchPrepStatementAsync(
-            JdbcBatchPreparedStmntRequest req) {
-        return client.sendRequestAsync(ClientOp.JDBC_SQL_EXEC_PS_BATCH, w -> 
req.writeBinary(w.out()), r -> {
+    public CompletableFuture<JdbcBatchExecuteResult> 
batchPrepStatementAsync(long connectionId, JdbcBatchPreparedStmntRequest req) {
+        return client.sendRequestAsync(ClientOp.JDBC_SQL_EXEC_PS_BATCH, w -> {
+            w.out().packLong(connectionId);
+
+            req.writeBinary(w.out());
+        }, r -> {
             JdbcBatchExecuteResult res = new JdbcBatchExecuteResult();
 
             res.readBinary(r.in());
diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java
index ccf16601f3..246ec5f2a6 100644
--- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java
+++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java
@@ -115,7 +115,7 @@ public class JdbcPreparedStatement extends JdbcStatement 
implements PreparedStat
                 = new JdbcBatchPreparedStmntRequest(conn.getSchema(), sql, 
batchedArgs);
 
         try {
-            JdbcBatchExecuteResult res = 
conn.handler().batchPrepStatementAsync(req).join();
+            JdbcBatchExecuteResult res = 
conn.handler().batchPrepStatementAsync(conn.connectionId(), req).join();
 
             if (!res.hasResults()) {
                 throw new BatchUpdateException(res.err(),
diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
index 2cb6946390..0808ae91ec 100644
--- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
+++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
@@ -550,7 +550,7 @@ public class JdbcStatement implements Statement {
         JdbcBatchExecuteRequest req = new 
JdbcBatchExecuteRequest(conn.getSchema(), batch);
 
         try {
-            JdbcBatchExecuteResult res = conn.handler().batchAsync(req).join();
+            JdbcBatchExecuteResult res = 
conn.handler().batchAsync(conn.connectionId(), req).join();
 
             if (!res.hasResults()) {
                 throw new BatchUpdateException(res.err(),
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 4c8121d146..f2806ffaa4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -27,6 +27,11 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
+import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -194,8 +199,12 @@ public abstract class ClusterPerTestIntegrationTest 
extends IgniteIntegrationTes
     }
 
     protected final List<List<Object>> executeSql(String sql, Object... args) {
+        QueryProcessor qryProc = node(0).queryEngine();
+        SessionId sessionId = 
qryProc.createSession(PropertiesHelper.emptyHolder());
+        QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
         return getAllFromCursor(
-                node(0).queryEngine().queryAsync("PUBLIC", sql, 
args).get(0).join()
+                qryProc.querySingleAsync(sessionId, context, sql, args).join()
         );
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
index 5cfadb9783..92bacfd50d 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
@@ -56,31 +56,6 @@ public interface QueryProcessor extends IgniteComponent {
      */
     List<SessionInfo> liveSessions();
 
-    /**
-     * Execute the query with given schema name and parameters.
-     *
-     * @param schemaName Schema name.
-     * @param qry Sql query.
-     * @param params Query parameters.
-     * @return List of sql cursors.
-     *
-     * @throws IgniteException in case of an error.
-     */
-    List<CompletableFuture<AsyncSqlCursor<List<Object>>>> queryAsync(String 
schemaName, String qry, Object... params);
-
-    /**
-     * Execute the query with given schema name and parameters.
-     *
-     * @param context User query context.
-     * @param schemaName Schema name.
-     * @param qry Sql query.
-     * @param params Query parameters.
-     * @return List of sql cursors.
-     *
-     * @throws IgniteException in case of an error.
-     */
-    List<CompletableFuture<AsyncSqlCursor<List<Object>>>> 
queryAsync(QueryContext context, String schemaName, String qry, Object... 
params);
-
     /**
      * Execute the single statement query with given schema name and 
parameters.
      *
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 4f4aa60c03..4366c4f63a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -82,7 +82,6 @@ import 
org.apache.ignite.internal.sql.engine.session.SessionManager;
 import org.apache.ignite.internal.sql.engine.session.SessionProperty;
 import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser;
 import org.apache.ignite.internal.sql.engine.sql.ParseResult;
-import org.apache.ignite.internal.sql.engine.sql.ScriptParseResult;
 import org.apache.ignite.internal.sql.engine.sql.StatementParseResult;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -175,7 +174,7 @@ public class SqlQueryProcessor implements QueryProcessor {
     private final HybridClock clock;
 
     /** Distributed catalog manager. */
-    private CatalogManager catalogManager;
+    private final CatalogManager catalogManager;
 
     /** Constructor. */
     public SqlQueryProcessor(
@@ -334,29 +333,6 @@ public class SqlQueryProcessor implements QueryProcessor {
         IgniteUtils.closeAll(Stream.concat(closableComponents, 
closableListeners).collect(Collectors.toList()));
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public List<CompletableFuture<AsyncSqlCursor<List<Object>>>> 
queryAsync(String schemaName, String qry, Object... params) {
-        QueryContext context = QueryContext.create(SqlQueryType.ALL);
-
-        return queryAsync(context, schemaName, qry, params);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public List<CompletableFuture<AsyncSqlCursor<List<Object>>>> 
queryAsync(QueryContext context, String schemaName,
-            String qry, Object... params) {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteInternalException(OPERATION_INTERRUPTED_ERR, new 
NodeStoppingException());
-        }
-
-        try {
-            return query0(context, schemaName, qry, params);
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(
@@ -502,104 +478,6 @@ public class SqlQueryProcessor implements QueryProcessor {
         return stage;
     }
 
-    private List<CompletableFuture<AsyncSqlCursor<List<Object>>>> query0(
-            QueryContext context,
-            String schemaName,
-            String sql,
-            Object... params
-    ) {
-        SchemaPlus schema = sqlSchemaManager.schema(schemaName);
-
-        if (schema == null) {
-            throw new IgniteInternalException(SCHEMA_NOT_FOUND_ERR, 
format("Schema not found [schemaName={}]", schemaName));
-        }
-
-        CompletableFuture<Void> start = new CompletableFuture<>();
-
-        ScriptParseResult parseResult;
-        List<CompletableFuture<AsyncSqlCursor<List<Object>>>> res;
-
-        try {
-            parseResult = IgniteSqlParser.parse(sql, ScriptParseResult.MODE);
-            res = new ArrayList<>(parseResult.statements().size());
-        } catch (Throwable th) {
-            start.completeExceptionally(th);
-
-            parseResult = new ScriptParseResult(Collections.emptyList(), 0);
-            res = 
Collections.singletonList(CompletableFuture.completedFuture(failedCursor(th)));
-        }
-
-        for (SqlNode sqlNode : parseResult.statements()) {
-            try {
-                validateParsedStatement(context, parseResult, sqlNode, params);
-            } catch (Exception e) {
-                start.completeExceptionally(e);
-
-                res = 
Collections.singletonList(CompletableFuture.completedFuture(failedCursor(e)));
-                return res;
-            }
-
-            // Only rw transactions for now.
-            InternalTransaction implicitTx = txManager.begin(false);
-
-            final BaseQueryContext ctx = BaseQueryContext.builder()
-                    .cancel(new QueryCancel())
-                    .frameworkConfig(
-                            Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
-                                    .defaultSchema(schema)
-                                    .build()
-                    )
-                    .logger(LOG)
-                    .parameters(params)
-                    .plannerTimeout(PLANNER_TIMEOUT)
-                    .build();
-
-            // TODO https://issues.apache.org/jira/browse/IGNITE-17746 Fix 
query execution flow.
-            CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start
-                    .thenCompose(none -> prepareSvc.prepareAsync(sqlNode, ctx))
-                    .thenApply(plan -> {
-                        SqlQueryType queryType = plan.type();
-                        assert queryType != null : "Expected a full plan but 
got a fragment: " + plan;
-
-                        return new AsyncSqlCursorImpl<>(
-                                queryType,
-                                plan.metadata(),
-                                implicitTx,
-                                executionSrvc.executePlan(implicitTx, plan, 
ctx)
-                        );
-                    });
-
-            stage.whenComplete((cur, ex) -> {
-                if (ex instanceof CancellationException) {
-                    ctx.cancel().cancel();
-                }
-            });
-
-            res.add(stage);
-        }
-
-        start.completeAsync(() -> null, taskExecutor);
-
-        return res;
-    }
-
-    private static <T> AsyncSqlCursor<T> failedCursor(Throwable th) {
-        return new AsyncSqlCursorImpl<>(
-                null, null, null,
-                new AsyncCursor<>() {
-                    @Override
-                    public CompletableFuture<BatchedResult<T>> 
requestNextAsync(int rows) {
-                        return CompletableFuture.failedFuture(th);
-                    }
-
-                    @Override
-                    public CompletableFuture<Void> closeAsync() {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                }
-        );
-    }
-
     private abstract static class AbstractTableEventListener implements 
EventListener<TableEventParameters> {
         protected final SqlSchemaManagerImpl schemaHolder;
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index cf5811b285..c78501322f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -68,6 +68,8 @@ import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException;
 import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
 import 
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestHashIndex;
+import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
+import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.InternalTable;
@@ -261,19 +263,22 @@ public class StopCalciteModuleTest {
 
         await(testRevisionRegister.moveRevision.apply(0L));
 
-        var cursors = qryProc.queryAsync(
-                "PUBLIC",
+        SessionId sessionId = 
qryProc.createSession(PropertiesHelper.emptyHolder());
+        QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
+        var cursors = qryProc.querySingleAsync(
+                sessionId,
+                context,
                 "SELECT * FROM TEST"
         );
 
-        await(cursors.get(0).thenCompose(cursor -> 
cursor.requestNextAsync(1)));
+        await(cursors.thenCompose(cursor -> cursor.requestNextAsync(1)));
 
         assertTrue(isThereNodeThreads(NODE_NAME));
 
         qryProc.stop();
 
-        var request = cursors.get(0)
-                .thenCompose(cursor -> cursor.requestNextAsync(1));
+        var request = cursors.thenCompose(cursor -> 
cursor.requestNextAsync(1));
 
         // Check cursor closed.
         await(request.exceptionally(t -> {
@@ -286,8 +291,9 @@ public class StopCalciteModuleTest {
         assertTrue(request.isCompletedExceptionally());
 
         // Check execute query on stopped node.
-        assertTrue(assertThrows(IgniteInternalException.class, () -> 
qryProc.queryAsync(
-                "PUBLIC",
+        assertTrue(assertThrows(IgniteInternalException.class, () -> 
qryProc.querySingleAsync(
+                sessionId,
+                context,
                 "SELECT 1"
         )).getCause() instanceof NodeStoppingException);
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 06253e8b62..feac7aacb3 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -71,7 +71,11 @@ import 
org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.QueryContext;
 import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
+import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.DataStorageModules;
 import org.apache.ignite.internal.storage.impl.TestDataStorageModule;
@@ -300,32 +304,37 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     public void testCreateTable() {
         SqlQueryProcessor finalQueryProc = queryProc;
 
+        SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder());
+        QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
         String curMethodName = getCurrentMethodName();
 
         String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) "
                 + "with primary_zone='zone123'", curMethodName);
 
-        readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+        readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
 
         assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
                 .equalsIgnoreCase(curMethodName)));
 
         String finalNewTblSql1 = newTblSql;
 
-        assertThrows(TableAlreadyExistsException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql1)));
+        assertThrows(TableAlreadyExistsException.class,
+                () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql1)));
 
         String finalNewTblSql2 = String.format("CREATE TABLE \"PUBLIC\".%s (c1 int PRIMARY KEY, c2 varbinary(255)) "
                 + "with primary_zone='zone123'", curMethodName);
 
-        assertThrows(TableAlreadyExistsException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql2)));
+        assertThrows(TableAlreadyExistsException.class,
+                () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql2)));
 
-        assertThrows(SqlException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
                 "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with partitions__wrong=1,primary_zone='zone123'")));
 
-        assertThrows(SqlException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
                 "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with replicas__wrong=1,primary_zone='zone123'")));
 
-        assertThrows(SqlException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(SqlException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
                 "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with primary_zone__wrong='zone123'")));
 
         newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))",
@@ -333,7 +342,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
 
         String finalNewTblSql3 = newTblSql;
 
-        assertDoesNotThrow(() -> await(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql3).get(0)));
+        assertDoesNotThrow(() -> readFirst(finalQueryProc.querySingleAsync(sessionId, context, finalNewTblSql3)));
     }
 
     /**
@@ -343,16 +352,19 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     public void testCreateTableWithDistributionZone() {
         String tableName = getCurrentMethodName().toUpperCase();
 
+        SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder());
+        QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
         String zoneName = "zone123";
 
         String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) ",
                  tableName);
 
-        readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+        readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
 
         assertEquals(DistributionZoneManager.DEFAULT_ZONE_ID, tblsCfg.tables().get(tableName).zoneId().value());
 
-        readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE " + tableName));
+        readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + tableName));
 
         int zoneId = dstZnsCfg.distributionZones().get(zoneName).zoneId().value();
 
@@ -361,18 +373,18 @@ public class MockedStructuresTest extends IgniteAbstractTest {
         newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) "
                 + "with primary_zone='%s'", tableName, zoneName);
 
-        readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+        readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
 
         assertEquals(zoneId, tblsCfg.tables().get(tableName).zoneId().value());
 
-        readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE " + tableName));
+        readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + tableName));
 
 
         when(distributionZoneManager.getZoneId(zoneName)).thenThrow(DistributionZoneNotFoundException.class);
 
         Exception exception = assertThrows(
                 IgniteException.class,
-                () -> readFirst(queryProc.queryAsync("PUBLIC",
+                () -> readFirst(queryProc.querySingleAsync(sessionId, context,
                         String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) "
                                 + "with primary_zone='%s'", tableName, zoneName)))
         );
@@ -387,26 +399,29 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     public void testDropTable() {
         String curMethodName = getCurrentMethodName();
 
+        SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder());
+        QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
         String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName);
 
-        readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+        readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
 
-        readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE " + curMethodName));
+        readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE " + curMethodName));
 
         SqlQueryProcessor finalQueryProc = queryProc;
 
-        assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
                 "DROP TABLE " + curMethodName + "_not_exist")));
 
-        assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
                 "DROP TABLE " + curMethodName)));
 
-        assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(TableNotFoundException.class, () -> readFirst(finalQueryProc.querySingleAsync(sessionId, context,
                 "DROP TABLE PUBLIC." + curMethodName)));
 
-        readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS PUBLIC." + curMethodName + "_not_exist"));
+        readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE IF EXISTS PUBLIC." + curMethodName + "_not_exist"));
 
-        readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS PUBLIC." + curMethodName));
+        readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE IF EXISTS PUBLIC." + curMethodName));
 
         assertTrue(tblManager.tables().stream().noneMatch(t -> t.name()
                 .equalsIgnoreCase("PUBLIC." + curMethodName)));
@@ -416,8 +431,12 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     void createTableWithTableOptions() {
         String method = getCurrentMethodName();
 
-        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
-                "PUBLIC",
+        SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder());
+        QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
+        assertDoesNotThrow(() -> readFirst(queryProc.querySingleAsync(
+                sessionId,
+                context,
                 String.format(
                         "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with primary_zone='zone123'",
                         method + 4
@@ -426,8 +445,9 @@ public class MockedStructuresTest extends IgniteAbstractTest {
 
         IgniteException exception = assertThrows(
                 IgniteException.class,
-                () -> readFirst(queryProc.queryAsync(
-                        "PUBLIC",
+                () -> readFirst(queryProc.querySingleAsync(
+                        sessionId,
+                        context,
                         String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with %s='%s'", method + 6, method, method)
                 ))
         );
@@ -439,8 +459,12 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     void createTableWithDataStorageOptions() {
         String method = getCurrentMethodName();
 
-        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
-                "PUBLIC",
+        SessionId sessionId = queryProc.createSession(PropertiesHelper.emptyHolder());
+        QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
+        assertDoesNotThrow(() -> readFirst(queryProc.querySingleAsync(
+                sessionId,
+                context,
                 String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with dataRegion='default'", method + 0)
         )));
 
@@ -449,10 +473,13 @@ public class MockedStructuresTest extends IgniteAbstractTest {
                 equalTo(DEFAULT_DATA_REGION_NAME)
         );
 
-        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
-                "PUBLIC",
-                String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with DATAREGION='test_region'", method + 1)
-        )));
+        assertDoesNotThrow(() -> readFirst(
+                queryProc.querySingleAsync(
+                        sessionId,
+                        context,
+                        String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with DATAREGION='test_region'", method + 1)
+                )
+        ));
 
         assertThat(
                 ((RocksDbDataStorageView) tableView(method + 1).dataStorage()).dataRegion(),
@@ -549,8 +576,8 @@ public class MockedStructuresTest extends IgniteAbstractTest {
         return tableManager;
     }
 
-    private <T> BatchedResult<T> readFirst(List<CompletableFuture<AsyncSqlCursor<T>>> cursors) {
-        return await(await(cursors.get(0)).requestNextAsync(512));
+    private <T> BatchedResult<T> readFirst(CompletableFuture<AsyncSqlCursor<List<Object>>> cursors) {
+        return (BatchedResult<T>) await(await(cursors).requestNextAsync(512));
     }
 
     private @Nullable TableView tableView(String tableName) {


Reply via email to