This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new cbc84ebce0 IGNITE-21815 Client handler: use QueryProcessor instead of 
IgniteSql (#3459)
cbc84ebce0 is described below

commit cbc84ebce03c7554d8b9e09699f4884033ccaac1
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Mar 21 17:09:36 2024 +0200

    IGNITE-21815 Client handler: use QueryProcessor instead of IgniteSql (#3459)
    
    Use internal `QueryProcessor` API in client handler instead of public 
`IgniteSql`:
    * Simplify `HybridTimestamp` propagation
    * Get rid of Session usage, reduce allocations and indirection
    * Unblock IGNITE-21669 Remove sessions from SQL API
---
 check-rules/spotbugs-excludes.xml                  |   5 +-
 .../apache/ignite/client/handler/TestServer.java   |   2 -
 .../ignite/client/handler/ClientHandlerModule.java |   9 -
 .../handler/ClientInboundMessageHandler.java       |  14 +-
 .../handler/requests/sql/ClientSqlCommon.java      |  50 +---
 .../requests/sql/ClientSqlExecuteBatchRequest.java |  86 +++---
 .../requests/sql/ClientSqlExecuteRequest.java      |  84 ++++--
 .../sql/ClientSqlExecuteScriptRequest.java         |  14 +-
 .../handler/requests/sql/ClientSqlProperties.java  |  68 +++++
 .../handler/requests/sql/ClientSqlResultSet.java   |  11 +-
 .../apache/ignite/client/ClientMetricsTest.java    |   4 +-
 .../org/apache/ignite/client/ClientSqlTest.java    |  21 +-
 .../ignite/client/TestClientHandlerModule.java     |   6 +-
 .../java/org/apache/ignite/client/TestServer.java  |   1 -
 .../ignite/client/fakes/FakeAsyncResultSet.java    | 251 ------------------
 .../org/apache/ignite/client/fakes/FakeCursor.java | 144 ++++++++--
 .../org/apache/ignite/client/fakes/FakeIgnite.java |   2 +-
 .../client/fakes/FakeIgniteQueryProcessor.java     |  30 ++-
 .../apache/ignite/client/fakes/FakeIgniteSql.java  |  52 ----
 .../ignite/client/fakes/FakeIgniteTables.java      |   4 +-
 .../apache/ignite/client/fakes/FakeSession.java    | 290 ---------------------
 .../ignite/client/fakes/FakeSessionBuilder.java    | 136 ----------
 .../org/apache/ignite/client/fakes/FakeSqlRow.java | 290 ---------------------
 .../runner/app/client/ItThinClientSqlTest.java     |  16 ++
 .../org/apache/ignite/internal/app/IgniteImpl.java |   1 -
 .../internal/sql/api/AsyncResultSetImpl.java       |  27 +-
 .../ignite/internal/sql/api/SessionImpl.java       | 247 ++++++++++++------
 27 files changed, 548 insertions(+), 1317 deletions(-)

diff --git a/check-rules/spotbugs-excludes.xml 
b/check-rules/spotbugs-excludes.xml
index c35cd0fc21..3d4554aca2 100644
--- a/check-rules/spotbugs-excludes.xml
+++ b/check-rules/spotbugs-excludes.xml
@@ -118,7 +118,10 @@
   <Match>
     <!-- this method has side effects -->
     <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
-    <Class name="org.apache.ignite.internal.client.table.AbstractClientView"/>
+    <Or>
+      <Class 
name="org.apache.ignite.internal.client.table.AbstractClientView"/>
+      <Class 
name="org.apache.ignite.client.handler.requests.sql.ClientSqlCommon"/>
+    </Or>
   </Match>
   <Match>
     <Bug pattern="MS_PKGPROTECT"/>
diff --git 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
index 286e2a5d66..c0347f7a8e 100644
--- 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
+++ 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
@@ -40,7 +40,6 @@ import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.TestLowWatermark;
 import 
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
-import org.apache.ignite.sql.IgniteSql;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.TestInfo;
 import org.mockito.Mockito;
@@ -121,7 +120,6 @@ public class TestServer {
                 mock(IgniteComputeInternal.class),
                 clusterService,
                 bootstrapFactory,
-                mock(IgniteSql.class),
                 () -> 
CompletableFuture.completedFuture(ClusterTag.clusterTag(msgFactory, "Test 
Server")),
                 mock(MetricManager.class),
                 metrics,
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index b5a7f88e58..18898f812b 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -62,7 +62,6 @@ import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.sql.IgniteSql;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -83,9 +82,6 @@ public class ClientHandlerModule implements IgniteComponent {
     /** Ignite transactions API. */
     private final IgniteTransactionsImpl igniteTransactions;
 
-    /** Ignite SQL API. */
-    private final IgniteSql sql;
-
     /** Cluster ID supplier. */
     private final Supplier<CompletableFuture<ClusterTag>> clusterTagSupplier;
 
@@ -140,7 +136,6 @@ public class ClientHandlerModule implements IgniteComponent 
{
      * @param igniteCompute Compute.
      * @param clusterService Cluster.
      * @param bootstrapFactory Bootstrap factory.
-     * @param sql SQL.
      * @param clusterTagSupplier ClusterTag supplier.
      * @param metricManager Metric manager.
      * @param authenticationManager Authentication manager.
@@ -155,7 +150,6 @@ public class ClientHandlerModule implements IgniteComponent 
{
             IgniteComputeInternal igniteCompute,
             ClusterService clusterService,
             NettyBootstrapFactory bootstrapFactory,
-            IgniteSql sql,
             Supplier<CompletableFuture<ClusterTag>> clusterTagSupplier,
             MetricManager metricManager,
             ClientHandlerMetricSource metrics,
@@ -172,7 +166,6 @@ public class ClientHandlerModule implements IgniteComponent 
{
         assert igniteCompute != null;
         assert clusterService != null;
         assert bootstrapFactory != null;
-        assert sql != null;
         assert clusterTagSupplier != null;
         assert metricManager != null;
         assert metrics != null;
@@ -190,7 +183,6 @@ public class ClientHandlerModule implements IgniteComponent 
{
         this.igniteCompute = igniteCompute;
         this.clusterService = clusterService;
         this.bootstrapFactory = bootstrapFactory;
-        this.sql = sql;
         this.clusterTagSupplier = clusterTagSupplier;
         this.metricManager = metricManager;
         this.metrics = metrics;
@@ -371,7 +363,6 @@ public class ClientHandlerModule implements IgniteComponent 
{
                 configuration,
                 igniteCompute,
                 clusterService,
-                sql,
                 clusterTag,
                 metrics,
                 authenticationManager,
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index a04e1f0c41..cbf3431fc4 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -136,7 +136,6 @@ import org.apache.ignite.lang.TraceableException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.security.AuthenticationType;
 import 
org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException;
-import org.apache.ignite.sql.IgniteSql;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -168,9 +167,6 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
     /** Cluster. */
     private final ClusterService clusterService;
 
-    /** SQL. */
-    private final IgniteSql sql;
-
     /** Query processor. */
     private final QueryProcessor queryProcessor;
 
@@ -216,7 +212,6 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
      * @param configuration Configuration.
      * @param compute Compute.
      * @param clusterService Cluster.
-     * @param sql SQL.
      * @param clusterTag Cluster tag.
      * @param metrics Metrics.
      * @param authenticationManager Authentication manager.
@@ -229,7 +224,6 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
             ClientConnectorView configuration,
             IgniteComputeInternal compute,
             ClusterService clusterService,
-            IgniteSql sql,
             CompletableFuture<ClusterTag> clusterTag,
             ClientHandlerMetricSource metrics,
             AuthenticationManager authenticationManager,
@@ -245,7 +239,6 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
         assert configuration != null;
         assert compute != null;
         assert clusterService != null;
-        assert sql != null;
         assert clusterTag != null;
         assert metrics != null;
         assert authenticationManager != null;
@@ -259,7 +252,6 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
         this.configuration = configuration;
         this.compute = compute;
         this.clusterService = clusterService;
-        this.sql = sql;
         this.queryProcessor = processor;
         this.clusterTag = clusterTag;
         this.metrics = metrics;
@@ -766,7 +758,7 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
                 return ClientClusterGetNodesRequest.process(out, 
clusterService);
 
             case ClientOp.SQL_EXEC:
-                return ClientSqlExecuteRequest.process(in, out, sql, 
resources, metrics, igniteTransactions);
+                return ClientSqlExecuteRequest.process(in, out, 
queryProcessor, resources, metrics, igniteTransactions);
 
             case ClientOp.SQL_CURSOR_NEXT_PAGE:
                 return ClientSqlCursorNextPageRequest.process(in, out, 
resources, igniteTransactions);
@@ -781,13 +773,13 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
                 return ClientJdbcFinishTxRequest.process(in, out, 
jdbcQueryEventHandler);
 
             case ClientOp.SQL_EXEC_SCRIPT:
-                return ClientSqlExecuteScriptRequest.process(in, sql, 
igniteTransactions);
+                return ClientSqlExecuteScriptRequest.process(in, 
queryProcessor, igniteTransactions);
 
             case ClientOp.SQL_QUERY_META:
                 return ClientSqlQueryMetadataRequest.process(in, out, 
queryProcessor, resources);
 
             case ClientOp.SQL_EXEC_BATCH:
-                return ClientSqlExecuteBatchRequest.process(in, out, sql, 
resources, metrics, igniteTransactions);
+                return ClientSqlExecuteBatchRequest.process(in, out, 
queryProcessor, resources, igniteTransactions);
 
             case ClientOp.STREAMER_BATCH_SEND:
                 return ClientStreamerBatchSendRequest.process(in, out, 
igniteTables);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
index a92622dcb0..e4df80e201 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
@@ -20,22 +20,13 @@ package org.apache.ignite.client.handler.requests.sql;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
-import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
-import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
-import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
-import org.apache.ignite.internal.sql.api.SessionBuilderImpl;
 import org.apache.ignite.sql.ColumnMetadata;
 import org.apache.ignite.sql.ColumnMetadata.ColumnOrigin;
-import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.ResultSetMetadata;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Session.SessionBuilder;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.async.AsyncResultSet;
-import org.apache.ignite.tx.IgniteTransactions;
 
 /**
  * Common SQL request handling logic.
@@ -61,6 +52,7 @@ class ClientSqlCommon {
         }
 
         if (!asyncResultSet.hasMorePages()) {
+            // Close in background.
             asyncResultSet.closeAsync();
         }
     }
@@ -153,51 +145,13 @@ class ClientSqlCommon {
         }
     }
 
-    static Session readSession(ClientMessageUnpacker in, IgniteSql sql, 
IgniteTransactions transactions) {
-        SessionBuilder sessionBuilder = sql.sessionBuilder();
-
-        if (transactions != null && sessionBuilder instanceof 
SessionBuilderImpl) {
-            ((SessionBuilderImpl) 
sessionBuilder).igniteTransactions(transactions);
-        }
-
-        if (!in.tryUnpackNil()) {
-            sessionBuilder.defaultSchema(in.unpackString());
-        }
-
-        if (!in.tryUnpackNil()) {
-            sessionBuilder.defaultPageSize(in.unpackInt());
-        }
-
-        if (!in.tryUnpackNil()) {
-            sessionBuilder.defaultQueryTimeout(in.unpackLong(), 
TimeUnit.MILLISECONDS);
-        }
-
-        if (!in.tryUnpackNil()) {
-            sessionBuilder.idleTimeout(in.unpackLong(), TimeUnit.MILLISECONDS);
-        }
-
-        readSessionProperties(in, sessionBuilder);
-
-        return sessionBuilder.build();
-    }
-
-    private static void readSessionProperties(ClientMessageUnpacker in, 
SessionBuilder sessionBuilder) {
-        var propCount = in.unpackInt();
-        var reader = new BinaryTupleReader(propCount * 4, 
in.readBinaryUnsafe());
-
-        for (int i = 0; i < propCount; i++) {
-            // noinspection DataFlowIssue
-            sessionBuilder.property(reader.stringValue(i * 4), 
ClientBinaryTupleUtils.readObject(reader, i * 4 + 1));
-        }
-    }
-
     /**
      * Pack columns metadata.
      *
      * @param out Message packer.
      * @param cols Columns.
      */
-    public static void packColumns(ClientMessagePacker out, 
List<ColumnMetadata> cols) {
+    static void packColumns(ClientMessagePacker out, List<ColumnMetadata> 
cols) {
         out.packInt(cols.size());
 
         // In many cases there are multiple columns from the same table.
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
index 5496b5f761..979fc81a38 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
@@ -17,28 +17,21 @@
 
 package org.apache.ignite.client.handler.requests.sql;
 
-import static 
org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.readSession;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.function.Function;
-import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.sql.api.SessionImpl;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.sql.BatchedArguments;
-import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.ResultSetMetadata;
-import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.SqlBatchException;
-import org.apache.ignite.sql.Statement;
-import org.apache.ignite.sql.Statement.StatementBuilder;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Client SQL execute batch request.
@@ -51,21 +44,19 @@ public class ClientSqlExecuteBatchRequest {
      * @param out Packer.
      * @param sql SQL API.
      * @param resources Resources.
-     * @param metrics Metrics.
      * @param transactions Transactional facade. Used to acquire last observed 
time to propagate to client in response.
      * @return Future representing result of operation.
      */
     public static CompletableFuture<Void> process(
             ClientMessageUnpacker in,
             ClientMessagePacker out,
-            IgniteSql sql,
+            QueryProcessor sql,
             ClientResourceRegistry resources,
-            ClientHandlerMetricSource metrics,
             IgniteTransactionsImpl transactions
     ) {
-        var tx = readTx(in, out, resources);
-        Session session = readSession(in, sql, transactions);
-        Statement statement = readStatement(in, sql);
+        InternalTransaction tx = readTx(in, out, resources);
+        ClientSqlProperties props = new ClientSqlProperties(in);
+        String statement = in.unpackString();
         BatchedArguments arguments = 
in.unpackObjectArrayFromBinaryTupleArray();
 
         if (arguments == null) {
@@ -73,38 +64,46 @@ public class ClientSqlExecuteBatchRequest {
             arguments = BatchedArguments.of(ArrayUtils.OBJECT_EMPTY_ARRAY);
         }
 
-        // TODO IGNITE-20232 Propagate observable timestamp to sql engine 
using internal API.
         HybridTimestamp clientTs = 
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
-
         transactions.updateObservableTimestamp(clientTs);
 
-        return session
-                .executeBatchAsync(tx, statement.query(), arguments)
+        return SessionImpl.executeBatchCore(
+                sql,
+                        transactions,
+                        tx,
+                        statement,
+                        arguments,
+                        props.toSqlProps(),
+                        () -> true,
+                        () -> {},
+                        cursor -> 0,
+                        cursorId -> {})
                 .handle((affectedRows, ex) -> {
                     out.meta(transactions.observableTimestamp());
+
                     if (ex != null) {
                         var cause = ExceptionUtils.unwrapCause(ex.getCause());
 
                         if (cause instanceof SqlBatchException) {
                             var exBatch = ((SqlBatchException) cause);
-                            return writeBatchResultAsync(out, resources, 
exBatch.updateCounters(),
-                                    exBatch.errorCode(), exBatch.getMessage(), 
session, metrics);
+
+                            writeBatchResult(out, exBatch.updateCounters(), 
exBatch.errorCode(), exBatch.getMessage());
+                            return null;
                         }
+
                         affectedRows = ArrayUtils.LONG_EMPTY_ARRAY;
                     }
 
-                    return writeBatchResultAsync(out, resources, affectedRows, 
session, metrics);
-                }).thenCompose(Function.identity());
+                    writeBatchResult(out, affectedRows);
+                    return null;
+                });
     }
 
-    private static CompletionStage<Void> writeBatchResultAsync(
+    private static void writeBatchResult(
             ClientMessagePacker out,
-            ClientResourceRegistry resources,
             long[] affectedRows,
             Short errorCode,
-            String errorMessage,
-            Session session,
-            ClientHandlerMetricSource metrics) {
+            String errorMessage) {
         out.packNil(); // resourceId
 
         out.packBoolean(false); // has row set
@@ -113,16 +112,11 @@ public class ClientSqlExecuteBatchRequest {
         out.packLongArray(affectedRows); // affected rows
         out.packShort(errorCode); // error code
         out.packString(errorMessage); // error message
-
-        return session.closeAsync();
     }
 
-    private static CompletionStage<Void> writeBatchResultAsync(
+    private static void writeBatchResult(
             ClientMessagePacker out,
-            ClientResourceRegistry resources,
-            long[] affectedRows,
-            Session session,
-            ClientHandlerMetricSource metrics) {
+            long[] affectedRows) {
         out.packNil(); // resourceId
 
         out.packBoolean(false); // has row set
@@ -131,25 +125,5 @@ public class ClientSqlExecuteBatchRequest {
         out.packLongArray(affectedRows); // affected rows
         out.packNil(); // error code
         out.packNil(); // error message
-
-        return session.closeAsync();
-    }
-
-    private static Statement readStatement(ClientMessageUnpacker in, IgniteSql 
sql) {
-        StatementBuilder statementBuilder = sql.statementBuilder();
-
-        statementBuilder.query(in.unpackString());
-
-        return statementBuilder.build();
-    }
-
-    private static void packMeta(ClientMessagePacker out, @Nullable 
ResultSetMetadata meta) {
-        // TODO IGNITE-17179 metadata caching - avoid sending same meta over 
and over.
-        if (meta == null || meta.columns() == null) {
-            out.packInt(0);
-            return;
-        }
-
-        ClientSqlCommon.packColumns(out, meta.columns());
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index 605e644c74..6cf39ac297 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -18,11 +18,12 @@
 package org.apache.ignite.client.handler.requests.sql;
 
 import static 
org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.packCurrentPage;
-import static 
org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.readSession;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
+import static 
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResource;
@@ -32,14 +33,21 @@ import 
org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.sql.api.AsyncResultSetImpl;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.QueryProperty;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.property.SqlProperties;
+import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.util.ArrayUtils;
-import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.sql.ResultSetMetadata;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Statement;
-import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -61,14 +69,14 @@ public class ClientSqlExecuteRequest {
     public static CompletableFuture<Void> process(
             ClientMessageUnpacker in,
             ClientMessagePacker out,
-            IgniteSql sql,
+            QueryProcessor sql,
             ClientResourceRegistry resources,
             ClientHandlerMetricSource metrics,
             IgniteTransactionsImpl transactions
     ) {
-        var tx = readTx(in, out, resources);
-        Session session = readSession(in, sql, transactions);
-        Statement statement = readStatement(in, sql);
+        InternalTransaction tx = readTx(in, out, resources);
+        ClientSqlProperties props = new ClientSqlProperties(in);
+        String statement = in.unpackString();
         Object[] arguments = in.unpackObjectArrayFromBinaryTuple();
 
         if (arguments == null) {
@@ -76,17 +84,14 @@ public class ClientSqlExecuteRequest {
             arguments = ArrayUtils.OBJECT_EMPTY_ARRAY;
         }
 
-        // TODO IGNITE-20232 Propagate observable timestamp to sql engine 
using internal API.
         HybridTimestamp clientTs = 
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
-
         transactions.updateObservableTimestamp(clientTs);
 
-        return session
-                .executeAsync(tx, statement, arguments)
+        return executeAsync(tx, sql, transactions, statement, 
props.pageSize(), props.toSqlProps(), arguments)
                 .thenCompose(asyncResultSet -> {
                     out.meta(transactions.observableTimestamp());
 
-                    return writeResultSetAsync(out, resources, asyncResultSet, 
session, metrics);
+                    return writeResultSetAsync(out, resources, asyncResultSet, 
metrics);
                 });
     }
 
@@ -94,7 +99,6 @@ public class ClientSqlExecuteRequest {
             ClientMessagePacker out,
             ClientResourceRegistry resources,
             AsyncResultSet asyncResultSet,
-            Session session,
             ClientHandlerMetricSource metrics) {
         boolean hasResource = asyncResultSet.hasRowSet() && 
asyncResultSet.hasMorePages();
 
@@ -102,7 +106,7 @@ public class ClientSqlExecuteRequest {
             try {
                 metrics.cursorsActiveIncrement();
 
-                var clientResultSet = new ClientSqlResultSet(asyncResultSet, 
session, metrics);
+                var clientResultSet = new ClientSqlResultSet(asyncResultSet, 
metrics);
 
                 ClientResource resource = new ClientResource(
                         clientResultSet,
@@ -133,20 +137,12 @@ public class ClientSqlExecuteRequest {
 
             return hasResource
                     ? nullCompletedFuture()
-                    : asyncResultSet.closeAsync().thenCompose(res -> 
session.closeAsync());
+                    : asyncResultSet.closeAsync();
         } else {
-            return asyncResultSet.closeAsync().thenCompose(res -> 
session.closeAsync());
+            return asyncResultSet.closeAsync();
         }
     }
 
-    private static Statement readStatement(ClientMessageUnpacker in, IgniteSql 
sql) {
-        StatementBuilder statementBuilder = sql.statementBuilder();
-
-        statementBuilder.query(in.unpackString());
-
-        return statementBuilder.build();
-    }
-
     private static void packMeta(ClientMessagePacker out, @Nullable 
ResultSetMetadata meta) {
         // TODO IGNITE-17179 metadata caching - avoid sending same meta over 
and over.
         if (meta == null || meta.columns() == null) {
@@ -156,4 +152,40 @@ public class ClientSqlExecuteRequest {
 
         ClientSqlCommon.packColumns(out, meta.columns());
     }
+
+    private static CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            QueryProcessor qryProc,
+            IgniteTransactions transactions,
+            String query,
+            int pageSize,
+            SqlProperties props,
+            @Nullable Object... arguments
+    ) {
+        try {
+            SqlProperties properties = 
SqlPropertiesHelper.builderFromProperties(props)
+                    .set(QueryProperty.ALLOWED_QUERY_TYPES, 
SqlQueryType.SINGLE_STMT_TYPES)
+                    .build();
+
+            CompletableFuture<AsyncResultSet<SqlRow>> fut = 
qryProc.querySingleAsync(
+                            properties, transactions, (InternalTransaction) 
transaction, query, arguments)
+                    .thenCompose(cur -> cur.requestNextAsync(pageSize)
+                            .thenApply(
+                                    batchRes -> new AsyncResultSetImpl<>(
+                                            cur,
+                                            batchRes,
+                                            pageSize
+                                    )
+                            )
+                    );
+
+            return fut.exceptionally((th) -> {
+                Throwable cause = ExceptionUtils.unwrapCause(th);
+
+                throw new CompletionException(mapToPublicSqlException(cause));
+            });
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(mapToPublicSqlException(e));
+        }
+    }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
index 641f87890e..c6c8d9f450 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteScriptRequest.java
@@ -17,15 +17,13 @@
 
 package org.apache.ignite.client.handler.requests.sql;
 
-import static 
org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.readSession;
-
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.sql.api.SessionImpl;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.util.ArrayUtils;
-import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.Session;
 
 /**
  * Client SQL execute script request.
@@ -40,10 +38,10 @@ public class ClientSqlExecuteScriptRequest {
      */
     public static CompletableFuture<Void> process(
             ClientMessageUnpacker in,
-            IgniteSql sql,
+            QueryProcessor sql,
             IgniteTransactionsImpl transactions
     ) {
-        Session session = readSession(in, sql, transactions);
+        ClientSqlProperties props = new ClientSqlProperties(in);
         String script = in.unpackString();
         Object[] arguments = in.unpackObjectArrayFromBinaryTuple();
 
@@ -52,11 +50,9 @@ public class ClientSqlExecuteScriptRequest {
             arguments = ArrayUtils.OBJECT_EMPTY_ARRAY;
         }
 
-        // TODO IGNITE-20232 Propagate observable timestamp to sql engine 
using internal API.
         HybridTimestamp clientTs = 
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
-
         transactions.updateObservableTimestamp(clientTs);
 
-        return session.executeScriptAsync(script, arguments);
+        return SessionImpl.executeScriptCore(sql, transactions, () -> true, () 
-> {}, script, arguments, props.toSqlProps());
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java
new file mode 100644
index 0000000000..7eefc2c904
--- /dev/null
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.sql;
+
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.sql.AbstractSession;
+import org.apache.ignite.internal.sql.engine.QueryProperty;
+import org.apache.ignite.internal.sql.engine.property.SqlProperties;
+import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
+
+class ClientSqlProperties {
+    private final String schema;
+
+    private final int pageSize;
+
+    private final long queryTimeout;
+
+    private final long idleTimeout;
+
+    ClientSqlProperties(ClientMessageUnpacker in) {
+        schema = in.tryUnpackNil() ? AbstractSession.DEFAULT_SCHEMA : 
in.unpackString();
+        pageSize = in.tryUnpackNil() ? AbstractSession.DEFAULT_PAGE_SIZE : 
in.unpackInt();
+        queryTimeout = in.tryUnpackNil() ? 0 : in.unpackLong();
+        idleTimeout = in.tryUnpackNil() ? 0 : in.unpackLong();
+
+        // Skip properties - not used by SQL engine.
+        in.unpackInt(); // Number of properties.
+        in.readBinaryUnsafe(); // Binary tuple with properties
+    }
+
+    public String schema() {
+        return schema;
+    }
+
+    public int pageSize() {
+        return pageSize;
+    }
+
+    public long queryTimeout() {
+        return queryTimeout;
+    }
+
+    public long idleTimeout() {
+        return idleTimeout;
+    }
+
+    SqlProperties toSqlProps() {
+        return  SqlPropertiesHelper.newBuilder()
+                .set(QueryProperty.QUERY_TIMEOUT, queryTimeout)
+                .set(QueryProperty.DEFAULT_SCHEMA, schema)
+                .build();
+    }
+}
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
index 802cce0723..0712927247 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
@@ -22,7 +22,6 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
-import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.async.AsyncResultSet;
 
@@ -33,9 +32,6 @@ class ClientSqlResultSet {
     /** Result set. */
     private final AsyncResultSet<SqlRow> resultSet;
 
-    /** Session. */
-    private final Session session;
-
     /** Metrics. */
     private final ClientHandlerMetricSource metrics;
 
@@ -46,16 +42,13 @@ class ClientSqlResultSet {
      * Constructor.
      *
      * @param resultSet Result set.
-     * @param session Session.
      * @param metrics Metrics.
      */
-    ClientSqlResultSet(AsyncResultSet<SqlRow> resultSet, Session session, 
ClientHandlerMetricSource metrics) {
+    ClientSqlResultSet(AsyncResultSet<SqlRow> resultSet, 
ClientHandlerMetricSource metrics) {
         assert resultSet != null;
-        assert session != null;
         assert metrics != null;
 
         this.resultSet = resultSet;
-        this.session = session;
         this.metrics = metrics;
     }
 
@@ -77,7 +70,7 @@ class ClientSqlResultSet {
         if (closed.compareAndSet(false, true)) {
             metrics.cursorsActiveDecrement();
 
-            return resultSet.closeAsync().thenCompose(res -> 
session.closeAsync()).toCompletableFuture();
+            return resultSet.closeAsync();
         }
 
         return nullCompletedFuture();
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
index 77f366c449..077b4f611d 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
@@ -32,8 +32,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import org.apache.ignite.client.IgniteClient.Builder;
 import org.apache.ignite.client.fakes.FakeIgnite;
+import org.apache.ignite.client.fakes.FakeIgniteQueryProcessor;
 import org.apache.ignite.client.fakes.FakeIgniteTables;
-import org.apache.ignite.client.fakes.FakeSession;
 import org.apache.ignite.internal.client.ClientMetricSource;
 import org.apache.ignite.internal.client.TcpIgniteClient;
 import org.apache.ignite.internal.metrics.AbstractMetricSource;
@@ -192,7 +192,7 @@ public class ClientMetricsTest extends 
BaseIgniteAbstractTest {
         assertThrowsSqlException(
                 Sql.STMT_VALIDATION_ERR,
                 "Query failed",
-                () -> client.sql().createSession().execute(null, 
FakeSession.FAILED_SQL));
+                () -> client.sql().createSession().execute(null, 
FakeIgniteQueryProcessor.FAILED_SQL));
 
         assertEquals(0, metrics().requestsActive());
         assertEquals(1, metrics().requestsFailed());
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
index d27a970b79..0836a00b15 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
@@ -21,6 +21,7 @@ import static 
org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -97,8 +98,10 @@ public class ClientSqlTest extends AbstractClientTableTest {
         assertEquals("SCHEMA1", props.get("schema"));
         assertEquals("123000", props.get("timeout"));
         assertEquals("234", props.get("pageSize"));
-        assertEquals("1", props.get("prop1"));
-        assertEquals("2", props.get("prop2"));
+
+        // Properties are ignored by the SQL engine for now.
+        assertNull(props.get("prop1"));
+        assertNull(props.get("prop2"));
     }
 
     @Test
@@ -128,9 +131,11 @@ public class ClientSqlTest extends AbstractClientTableTest 
{
         assertEquals("SCHEMA2", props.get("schema"));
         assertEquals("124000", props.get("timeout"));
         assertEquals("235", props.get("pageSize"));
-        assertEquals("1", props.get("prop1"));
-        assertEquals("22", props.get("prop2"));
-        assertEquals("3", props.get("prop3"));
+
+        // Properties are ignored by the SQL engine for now.
+        assertNull(props.get("prop1"));
+        assertNull(props.get("prop2"));
+        assertNull(props.get("prop3"));
     }
 
     @Test
@@ -227,8 +232,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
         SqlRow row = resultSet.next();
 
         assertEquals(
-                "foo, arguments: [], properties: [], defaultPageSize=null, 
defaultSchema=null, "
-                        + "defaultQueryTimeout=null, 
defaultSessionTimeout=null",
+                "foo, arguments: [], defaultSchema=PUBLIC, 
defaultQueryTimeout=0",
                 row.value(0));
     }
 
@@ -250,8 +254,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
         SqlRow row = resultSet.next();
 
         assertEquals(
-                "do bar baz, arguments: [arg1, null, 2, ], properties: 
[prop2=-5, prop1=val1, prop3=null, ], "
-                        + "defaultPageSize=123, defaultSchema=script-schema, 
defaultQueryTimeout=456, defaultSessionTimeout=789000",
+                "do bar baz, arguments: [arg1, null, 2, ], 
defaultSchema=script-schema, defaultQueryTimeout=456",
                 row.value(0));
     }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
 
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index e22a5c4c37..4e6dab3a72 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.client;
 
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.mockito.Mockito.mock;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
@@ -35,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.client.fakes.FakeIgniteQueryProcessor;
 import org.apache.ignite.client.fakes.FakeInternalTable;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientInboundMessageHandler;
@@ -51,7 +51,6 @@ import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.NettyBootstrapFactory;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import 
org.apache.ignite.internal.security.authentication.AuthenticationManager;
-import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.TestLowWatermark;
 import 
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
@@ -211,11 +210,10 @@ public class TestClientHandlerModule implements 
IgniteComponent {
                                 new ClientInboundMessageHandler(
                                         (IgniteTablesInternal) ignite.tables(),
                                         (IgniteTransactionsImpl) 
ignite.transactions(),
-                                        mock(QueryProcessor.class),
+                                        new FakeIgniteQueryProcessor(),
                                         configuration,
                                         compute,
                                         clusterService,
-                                        ignite.sql(),
                                         
CompletableFuture.completedFuture(clusterTag),
                                         metrics,
                                         authenticationManager,
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java 
b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index ad9d640c5e..dae638f486 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -238,7 +238,6 @@ public class TestServer implements AutoCloseable {
                         compute,
                         clusterService,
                         bootstrapFactory,
-                        ignite.sql(),
                         () -> CompletableFuture.completedFuture(tag),
                         mock(MetricManager.class),
                         metrics,
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java
deleted file mode 100644
index acb7d6d719..0000000000
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeAsyncResultSet.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.fakes;
-
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.time.Duration;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.Period;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.sql.ColumnMetadata;
-import org.apache.ignite.sql.ColumnType;
-import org.apache.ignite.sql.ResultSetMetadata;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.SqlRow;
-import org.apache.ignite.sql.Statement;
-import org.apache.ignite.sql.async.AsyncResultSet;
-import org.apache.ignite.tx.Transaction;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Fake result set.
- */
-@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
-public class FakeAsyncResultSet implements AsyncResultSet {
-    private final Session session;
-
-    private final Transaction transaction;
-
-    private final Statement statement;
-
-    private final Object[] arguments;
-
-    private final List<SqlRow> rows;
-
-    private final List<ColumnMetadata> columns;
-
-    private final boolean hasMorePages;
-
-    private final FakeIgniteSql sql;
-
-    /**
-     * Constructor.
-     *
-     * @param session Session.
-     * @param transaction Transaction.
-     * @param statement Statement.
-     * @param arguments Arguments.
-     */
-    public FakeAsyncResultSet(Session session, Transaction transaction, 
Statement statement, Object[] arguments, FakeIgniteSql sql) {
-        assert session != null;
-        assert statement != null;
-
-        this.session = session;
-        this.transaction = transaction;
-        this.statement = statement;
-        this.arguments = arguments;
-        this.sql = sql;
-
-        hasMorePages = session.property("hasMorePages") != null;
-
-        if ("SELECT PROPS".equals(statement.query())) {
-            rows = new ArrayList<>();
-
-            rows.add(getRow("schema", session.defaultSchema()));
-            rows.add(getRow("timeout", 
String.valueOf(session.defaultQueryTimeout(TimeUnit.MILLISECONDS))));
-            rows.add(getRow("pageSize", 
String.valueOf(session.defaultPageSize())));
-
-            var props = ((FakeSession) session).properties();
-
-            for (var e : props.entrySet()) {
-                rows.add(getRow(e.getKey(), e.getValue()));
-            }
-
-            columns = new ArrayList<>();
-
-            columns.add(new FakeColumnMetadata("name", ColumnType.STRING));
-            columns.add(new FakeColumnMetadata("val", ColumnType.STRING));
-        } else if ("SELECT META".equals(statement.query())) {
-            columns = new ArrayList<>();
-
-            columns.add(new FakeColumnMetadata("BOOL", ColumnType.BOOLEAN));
-            columns.add(new FakeColumnMetadata("INT8", ColumnType.INT8));
-            columns.add(new FakeColumnMetadata("INT16", ColumnType.INT16));
-            columns.add(new FakeColumnMetadata("INT32", ColumnType.INT32));
-            columns.add(new FakeColumnMetadata("INT64", ColumnType.INT64));
-            columns.add(new FakeColumnMetadata("FLOAT", ColumnType.FLOAT));
-            columns.add(new FakeColumnMetadata("DOUBLE", ColumnType.DOUBLE));
-            columns.add(new FakeColumnMetadata("DECIMAL", ColumnType.DECIMAL, 
1, 2,
-                    true, new ColumnOrigin("SCHEMA1", "TBL2", "BIG_DECIMAL")));
-            columns.add(new FakeColumnMetadata("DATE", ColumnType.DATE));
-            columns.add(new FakeColumnMetadata("TIME", ColumnType.TIME));
-            columns.add(new FakeColumnMetadata("DATETIME", 
ColumnType.DATETIME));
-            columns.add(new FakeColumnMetadata("TIMESTAMP", 
ColumnType.TIMESTAMP));
-            columns.add(new FakeColumnMetadata("UUID", ColumnType.UUID));
-            columns.add(new FakeColumnMetadata("BITMASK", ColumnType.BITMASK));
-            columns.add(new FakeColumnMetadata("BYTE_ARRAY", 
ColumnType.BYTE_ARRAY));
-            columns.add(new FakeColumnMetadata("PERIOD", ColumnType.PERIOD));
-            columns.add(new FakeColumnMetadata("DURATION", 
ColumnType.DURATION));
-            columns.add(new FakeColumnMetadata("NUMBER", ColumnType.NUMBER));
-
-            var row = getRow(
-                    true,
-                    Byte.MIN_VALUE,
-                    Short.MIN_VALUE,
-                    Integer.MIN_VALUE,
-                    Long.MIN_VALUE,
-                    1.3f,
-                    1.4d,
-                    BigDecimal.valueOf(145),
-                    LocalDate.of(2001, 2, 3),
-                    LocalTime.of(4, 5),
-                    LocalDateTime.of(2001, 3, 4, 5, 6),
-                    Instant.ofEpochSecond(987),
-                    new UUID(0, 0),
-                    BitSet.valueOf(new byte[0]),
-                    new byte[1],
-                    Period.of(10, 9, 8),
-                    Duration.ofDays(11),
-                    BigInteger.valueOf(42));
-
-            rows = List.of(row);
-        } else if ("SELECT LAST SCRIPT".equals(statement.query())) {
-            rows = List.of(getRow(sql.lastScript));
-            columns = List.of(new FakeColumnMetadata("script", 
ColumnType.STRING));
-        } else {
-            rows = List.of(getRow(1));
-            columns = List.of(new FakeColumnMetadata("col1", 
ColumnType.INT32));
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public @Nullable ResultSetMetadata metadata() {
-        return new ResultSetMetadata() {
-            @Override
-            public List<ColumnMetadata> columns() {
-                return columns;
-            }
-
-            @Override
-            public int indexOf(String columnName) {
-                return 0;
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public boolean hasRowSet() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public long affectedRows() {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public boolean wasApplied() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Iterable<SqlRow> currentPage() {
-        return rows;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public int currentPageSize() {
-        return rows.size();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<? extends AsyncResultSet> fetchNextPage() {
-        return CompletableFuture.completedFuture(new 
FakeAsyncResultSet(session, transaction, statement, arguments, sql));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public boolean hasMorePages() {
-        return hasMorePages;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<Void> closeAsync() {
-        return nullCompletedFuture();
-    }
-
-    private SqlRow getRow(Object... vals) {
-        return new FakeSqlRow(List.of(vals), metadata());
-    }
-
-    private static class ColumnOrigin implements ColumnMetadata.ColumnOrigin {
-        private final String schemaName;
-        private final String tableName;
-        private final String columnName;
-
-        public ColumnOrigin(String schemaName, String tableName, String 
columnName) {
-            this.schemaName = schemaName;
-            this.tableName = tableName;
-            this.columnName = columnName;
-        }
-
-        @Override
-        public String schemaName() {
-            return schemaName;
-        }
-
-        @Override
-        public String tableName() {
-            return tableName;
-        }
-
-        @Override
-        public String columnName() {
-            return columnName;
-        }
-    }
-}
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
index 77b882cd9b..159099f5c6 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
@@ -17,27 +17,99 @@
 
 package org.apache.ignite.client.fakes;
 
+import static 
org.apache.ignite.internal.sql.engine.QueryProperty.DEFAULT_SCHEMA;
+import static 
org.apache.ignite.internal.sql.engine.QueryProperty.QUERY_TIMEOUT;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
 import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
 import java.util.List;
-import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.property.SqlProperties;
 import org.apache.ignite.internal.sql.engine.util.ListToInternalSqlRowAdapter;
+import org.apache.ignite.sql.ColumnMetadata;
+import org.apache.ignite.sql.ColumnType;
 import org.apache.ignite.sql.ResultSetMetadata;
 
 /**
  * Fake {@link AsyncSqlCursor}.
  */
 public class FakeCursor implements AsyncSqlCursor<InternalSqlRow> {
-    private final Random random;
-
-    FakeCursor() {
-        random = new Random();
+    private final String qry;
+    private final List<ColumnMetadata> columns = new ArrayList<>();
+    private final List<InternalSqlRow> rows = new ArrayList<>();
+
+    FakeCursor(String qry, SqlProperties properties, Object[] params, 
FakeIgniteQueryProcessor proc) {
+        this.qry = qry;
+
+        if ("SELECT PROPS".equals(qry)) {
+            columns.add(new FakeColumnMetadata("name", ColumnType.STRING));
+            columns.add(new FakeColumnMetadata("val", ColumnType.STRING));
+
+            rows.add(getRow("schema", properties.get(DEFAULT_SCHEMA)));
+            rows.add(getRow("timeout", 
String.valueOf(properties.get(QUERY_TIMEOUT))));
+        } else if ("SELECT META".equals(qry)) {
+            columns.add(new FakeColumnMetadata("BOOL", ColumnType.BOOLEAN));
+            columns.add(new FakeColumnMetadata("INT8", ColumnType.INT8));
+            columns.add(new FakeColumnMetadata("INT16", ColumnType.INT16));
+            columns.add(new FakeColumnMetadata("INT32", ColumnType.INT32));
+            columns.add(new FakeColumnMetadata("INT64", ColumnType.INT64));
+            columns.add(new FakeColumnMetadata("FLOAT", ColumnType.FLOAT));
+            columns.add(new FakeColumnMetadata("DOUBLE", ColumnType.DOUBLE));
+            columns.add(new FakeColumnMetadata("DECIMAL", ColumnType.DECIMAL, 
1, 2,
+                    true, new ColumnOrigin("SCHEMA1", "TBL2", "BIG_DECIMAL")));
+            columns.add(new FakeColumnMetadata("DATE", ColumnType.DATE));
+            columns.add(new FakeColumnMetadata("TIME", ColumnType.TIME));
+            columns.add(new FakeColumnMetadata("DATETIME", 
ColumnType.DATETIME));
+            columns.add(new FakeColumnMetadata("TIMESTAMP", 
ColumnType.TIMESTAMP));
+            columns.add(new FakeColumnMetadata("UUID", ColumnType.UUID));
+            columns.add(new FakeColumnMetadata("BITMASK", ColumnType.BITMASK));
+            columns.add(new FakeColumnMetadata("BYTE_ARRAY", 
ColumnType.BYTE_ARRAY));
+            columns.add(new FakeColumnMetadata("PERIOD", ColumnType.PERIOD));
+            columns.add(new FakeColumnMetadata("DURATION", 
ColumnType.DURATION));
+            columns.add(new FakeColumnMetadata("NUMBER", ColumnType.NUMBER));
+
+            var row = getRow(
+                    true,
+                    Byte.MIN_VALUE,
+                    Short.MIN_VALUE,
+                    Integer.MIN_VALUE,
+                    Long.MIN_VALUE,
+                    1.3f,
+                    1.4d,
+                    BigDecimal.valueOf(145),
+                    LocalDate.of(2001, 2, 3),
+                    LocalTime.of(4, 5),
+                    LocalDateTime.of(2001, 3, 4, 5, 6),
+                    Instant.ofEpochSecond(987),
+                    new UUID(0, 0),
+                    BitSet.valueOf(new byte[0]),
+                    new byte[1],
+                    Period.of(10, 9, 8),
+                    Duration.ofDays(11),
+                    BigInteger.valueOf(42));
+
+            rows.add(row);
+        } else if ("SELECT LAST SCRIPT".equals(qry)) {
+            rows.add(getRow(proc.lastScript));
+            columns.add(new FakeColumnMetadata("script", ColumnType.STRING));
+        } else {
+            rows.add(getRow(1));
+            columns.add(new FakeColumnMetadata("col1", ColumnType.INT32));
+        }
     }
 
     @Override
@@ -47,18 +119,10 @@ public class FakeCursor implements 
AsyncSqlCursor<InternalSqlRow> {
 
     @Override
     public CompletableFuture<BatchedResult<InternalSqlRow>> 
requestNextAsync(int rows) {
-        var batch = new ArrayList<InternalSqlRow>();
-
-        for (int i = 0; i < rows; i++) {
-            List<Object> row = new ArrayList<>();
-            row.add(random.nextInt());
-            row.add(random.nextLong());
-            row.add(random.nextFloat());
-            row.add(random.nextDouble());
-            row.add(UUID.randomUUID().toString());
-            row.add(null);
-
-            batch.add(new ListToInternalSqlRowAdapter(row));
+        var batch = new ArrayList<>(this.rows);
+
+        if ("SELECT PROPS".equals(qry)) {
+            batch.add(getRow("pageSize", String.valueOf(rows)));
         }
 
         return CompletableFuture.completedFuture(new BatchedResult<>(batch, 
true));
@@ -71,7 +135,17 @@ public class FakeCursor implements 
AsyncSqlCursor<InternalSqlRow> {
 
     @Override
     public ResultSetMetadata metadata() {
-        return null;
+        return new ResultSetMetadata() {
+            @Override
+            public List<ColumnMetadata> columns() {
+                return columns;
+            }
+
+            @Override
+            public int indexOf(String columnName) {
+                return 0;
+            }
+        };
     }
 
     @Override
@@ -93,4 +167,38 @@ public class FakeCursor implements 
AsyncSqlCursor<InternalSqlRow> {
     public CompletableFuture<Void> onFirstPageReady() {
         throw new UnsupportedOperationException();
     }
+
+    private static InternalSqlRow getRow(Object... vals) {
+        var list = new ArrayList<>(vals.length);
+        Collections.addAll(list, vals);
+
+        return new ListToInternalSqlRowAdapter(list);
+    }
+
+    private static class ColumnOrigin implements ColumnMetadata.ColumnOrigin {
+        private final String schemaName;
+        private final String tableName;
+        private final String columnName;
+
+        private ColumnOrigin(String schemaName, String tableName, String 
columnName) {
+            this.schemaName = schemaName;
+            this.tableName = tableName;
+            this.columnName = columnName;
+        }
+
+        @Override
+        public String schemaName() {
+            return schemaName;
+        }
+
+        @Override
+        public String tableName() {
+            return tableName;
+        }
+
+        @Override
+        public String columnName() {
+            return columnName;
+        }
+    }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
index 0b9d0da870..4fcc3818ca 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -85,7 +85,7 @@ public class FakeIgnite implements Ignite {
     /** {@inheritDoc} */
     @Override
     public IgniteSql sql() {
-        return new FakeIgniteSql();
+        throw new UnsupportedOperationException("Not implemented yet");
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
index fbdfc5ffb2..3c7b99c093 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.client.fakes;
 
+import static 
org.apache.ignite.internal.sql.engine.QueryProperty.DEFAULT_SCHEMA;
+import static 
org.apache.ignite.internal.sql.engine.QueryProperty.QUERY_TIMEOUT;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
@@ -26,6 +29,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
 import org.apache.ignite.internal.sql.engine.property.SqlProperties;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.jetbrains.annotations.Nullable;
 
@@ -33,6 +37,10 @@ import org.jetbrains.annotations.Nullable;
  * Fake {@link QueryProcessor}.
  */
 public class FakeIgniteQueryProcessor implements QueryProcessor {
+    public static final String FAILED_SQL = "SELECT FAIL";
+
+    String lastScript;
+
     @Override
     public CompletableFuture<QueryMetadata> prepareSingleAsync(SqlProperties 
properties,
             @Nullable InternalTransaction transaction, String qry, Object... 
params) {
@@ -47,7 +55,11 @@ public class FakeIgniteQueryProcessor implements 
QueryProcessor {
             String qry,
             Object... params
     ) {
-        return CompletableFuture.completedFuture(new FakeCursor());
+        if (FAILED_SQL.equals(qry)) {
+            return CompletableFuture.failedFuture(new 
SqlException(STMT_VALIDATION_ERR, "Query failed"));
+        }
+
+        return CompletableFuture.completedFuture(new FakeCursor(qry, 
properties, params, this));
     }
 
     @Override
@@ -58,7 +70,21 @@ public class FakeIgniteQueryProcessor implements 
QueryProcessor {
             String qry,
             Object... params
     ) {
-        throw new UnsupportedOperationException();
+        var sb = new StringBuilder(qry);
+
+        sb.append(", arguments: [");
+
+        for (Object arg : params) {
+            sb.append(arg).append(", ");
+        }
+
+        sb.append(']').append(", ")
+                
.append("defaultSchema=").append(properties.get(DEFAULT_SCHEMA)).append(", ")
+                
.append("defaultQueryTimeout=").append(properties.get(QUERY_TIMEOUT));
+
+        lastScript = sb.toString();
+
+        return CompletableFuture.completedFuture(new FakeCursor(qry, 
properties, params, this));
     }
 
     @Override
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteSql.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteSql.java
deleted file mode 100644
index 7095c58fd5..0000000000
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteSql.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.fakes;
-
-import org.apache.ignite.internal.client.sql.ClientStatementBuilder;
-import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Session.SessionBuilder;
-import org.apache.ignite.sql.Statement;
-import org.apache.ignite.sql.Statement.StatementBuilder;
-
-/**
- * Fake SQL implementation.
- */
-public class FakeIgniteSql implements IgniteSql {
-    String lastScript;
-
-    @Override
-    public Session createSession() {
-        return sessionBuilder().build();
-    }
-
-    @Override
-    public SessionBuilder sessionBuilder() {
-        return new FakeSessionBuilder(this);
-    }
-
-    @Override
-    public Statement createStatement(String query) {
-        return statementBuilder().build();
-    }
-
-    @Override
-    public StatementBuilder statementBuilder() {
-        return new ClientStatementBuilder();
-    }
-}
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
index b8d58c7028..2179649d60 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.client.fakes;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -38,6 +39,7 @@ import 
org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.table.Table;
 import org.jetbrains.annotations.Nullable;
 
@@ -220,7 +222,7 @@ public class FakeIgniteTables implements 
IgniteTablesInternal {
                         return 
completedFuture(schemaReg.lastKnownSchemaVersion());
                     }
                 },
-                new FakeIgniteSql(),
+                mock(IgniteSql.class),
                 -1
         );
     }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
deleted file mode 100644
index 7a87b387a1..0000000000
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.fakes;
-
-import static org.apache.ignite.internal.client.ClientUtils.sync;
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
-
-import java.time.ZoneId;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Flow.Publisher;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.internal.sql.AbstractSession;
-import org.apache.ignite.sql.BatchedArguments;
-import org.apache.ignite.sql.SqlException;
-import org.apache.ignite.sql.SqlRow;
-import org.apache.ignite.sql.Statement;
-import org.apache.ignite.sql.async.AsyncResultSet;
-import org.apache.ignite.sql.reactive.ReactiveResultSet;
-import org.apache.ignite.table.mapper.Mapper;
-import org.apache.ignite.tx.Transaction;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Client SQL session.
- */
-public class FakeSession implements AbstractSession {
-    public static final String FAILED_SQL = "SELECT FAIL";
-
-    @Nullable
-    private final Integer defaultPageSize;
-
-    @Nullable
-    private final String defaultSchema;
-
-    @Nullable
-    private final Long defaultQueryTimeout;
-
-    @Nullable
-    private final Long defaultSessionTimeout;
-
-    @Nullable
-    private final Map<String, Object> properties;
-
-    private final FakeIgniteSql sql;
-
-    /**
-     * Constructor.
-     *
-     * @param defaultPageSize Default page size.
-     * @param defaultSchema Default schema.
-     * @param defaultQueryTimeout Default timeout.
-     * @param properties Properties.
-     * @param sql SQL.
-     */
-    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
-    public FakeSession(
-            @Nullable Integer defaultPageSize,
-            @Nullable String defaultSchema,
-            @Nullable Long defaultQueryTimeout,
-            @Nullable Long defaultSessionTimeout,
-            @Nullable Map<String, Object> properties,
-            FakeIgniteSql sql) {
-        this.defaultPageSize = defaultPageSize;
-        this.defaultSchema = defaultSchema;
-        this.defaultQueryTimeout = defaultQueryTimeout;
-        this.defaultSessionTimeout = defaultSessionTimeout;
-        this.properties = properties;
-        this.sql = sql;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
-            @Nullable Transaction transaction,
-            String query,
-            @Nullable Object... arguments) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
-            @Nullable Transaction transaction,
-            Statement statement,
-            @Nullable Object... arguments) {
-        Objects.requireNonNull(statement);
-
-        if (FAILED_SQL.equals(statement.query())) {
-            return CompletableFuture.failedFuture(new 
SqlException(STMT_VALIDATION_ERR, "Query failed"));
-        }
-
-        return CompletableFuture.completedFuture(new FakeAsyncResultSet(this, 
transaction, statement, arguments, sql));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(@Nullable 
Transaction transaction, @Nullable Mapper<T> mapper,
-            String query, @Nullable Object... arguments) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
-            @Nullable Transaction transaction,
-            @Nullable Mapper<T> mapper,
-            Statement statement,
-            @Nullable Object... arguments) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public ReactiveResultSet executeReactive(@Nullable Transaction 
transaction, String query, @Nullable Object... arguments) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public ReactiveResultSet executeReactive(@Nullable Transaction 
transaction, Statement statement, @Nullable Object... arguments) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public long[] executeBatch(@Nullable Transaction transaction, String 
dmlQuery, BatchedArguments batch) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public long[] executeBatch(@Nullable Transaction transaction, Statement 
dmlStatement, BatchedArguments batch) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, String query, BatchedArguments batch) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Publisher<Long> executeBatchReactive(@Nullable Transaction 
transaction, String query, BatchedArguments batch) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Publisher<Long> executeBatchReactive(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void executeScript(String query, @Nullable Object... arguments) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<Void> executeScriptAsync(String query, @Nullable 
Object... arguments) {
-        var sb = new StringBuilder(query);
-
-        if (arguments != null) {
-            sb.append(", arguments: [");
-
-            for (Object arg : arguments) {
-                sb.append(arg).append(", ");
-            }
-
-            sb.append(']');
-        }
-
-        if (properties != null) {
-            sb.append(", properties: [");
-
-            for (Map.Entry<String, Object> entry : properties.entrySet()) {
-                
sb.append(entry.getKey()).append('=').append(entry.getValue()).append(", ");
-            }
-
-            sb.append(']');
-        }
-
-        sb.append(", ").append("defaultPageSize=").append(defaultPageSize)
-                .append(", ").append("defaultSchema=").append(defaultSchema)
-                .append(", 
").append("defaultQueryTimeout=").append(defaultQueryTimeout)
-                .append(", 
").append("defaultSessionTimeout=").append(defaultSessionTimeout);
-
-        sql.lastScript = sb.toString();
-
-        return nullCompletedFuture();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public long defaultQueryTimeout(TimeUnit timeUnit) {
-        return defaultQueryTimeout;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public long idleTimeout(TimeUnit timeUnit) {
-        return defaultSessionTimeout;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public String defaultSchema() {
-        return defaultSchema;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public int defaultPageSize() {
-        return defaultPageSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public ZoneId timeZoneId() {
-        // TODO https://issues.apache.org/jira/browse/IGNITE-21568
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public @Nullable Object property(String name) {
-        return properties == null ? null : properties.get(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void close() {
-        sync(closeAsync());
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<Void> closeAsync() {
-        return nullCompletedFuture();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Publisher<Void> closeReactive() {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public boolean closed() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public SessionBuilder toBuilder() {
-        return null;
-    }
-
-    public Map<String, Object> properties() {
-        // noinspection AssignmentOrReturnOfFieldWithMutableType
-        return properties;
-    }
-}
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSessionBuilder.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSessionBuilder.java
deleted file mode 100644
index c704377a13..0000000000
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSessionBuilder.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.fakes;
-
-import java.time.ZoneId;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Session.SessionBuilder;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Client SQL session builder.
- */
-public class FakeSessionBuilder implements SessionBuilder {
-    private final Map<String, Object> properties = new HashMap<>();
-
-    private final FakeIgniteSql sql;
-
-    private String defaultSchema;
-
-    private Long defaultQueryTimeoutMs;
-
-    private Long defaultSessionTimeoutMs;
-
-    private Integer pageSize;
-
-    public FakeSessionBuilder(FakeIgniteSql sql) {
-        this.sql = sql;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public long defaultQueryTimeout(TimeUnit timeUnit) {
-        Objects.requireNonNull(timeUnit);
-
-        return timeUnit.convert(defaultQueryTimeoutMs == null ? 0 : 
defaultQueryTimeoutMs, TimeUnit.MILLISECONDS);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public SessionBuilder defaultQueryTimeout(long timeout, TimeUnit timeUnit) 
{
-        Objects.requireNonNull(timeUnit);
-
-        defaultQueryTimeoutMs = TimeUnit.MILLISECONDS.convert(timeout, 
timeUnit);
-
-        return this;
-    }
-
-    @Override
-    public long idleTimeout(TimeUnit timeUnit) {
-        Objects.requireNonNull(timeUnit);
-
-        return timeUnit.convert(defaultSessionTimeoutMs == null ? 0 : 
defaultSessionTimeoutMs, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public SessionBuilder idleTimeout(long timeout, TimeUnit timeUnit) {
-        Objects.requireNonNull(timeUnit);
-
-        defaultSessionTimeoutMs = TimeUnit.MILLISECONDS.convert(timeout, 
timeUnit);
-
-        return this;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public String defaultSchema() {
-        return defaultSchema;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public SessionBuilder defaultSchema(String schema) {
-        defaultSchema = schema;
-
-        return this;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public int defaultPageSize() {
-        return pageSize == null ? 0 : pageSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public SessionBuilder defaultPageSize(int pageSize) {
-        this.pageSize = pageSize;
-
-        return this;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public SessionBuilder timeZoneId(ZoneId timeZoneId) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public @Nullable Object property(String name) {
-        return properties.get(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public SessionBuilder property(String name, @Nullable Object value) {
-        properties.put(name, value);
-
-        return this;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Session build() {
-        return new FakeSession(pageSize, defaultSchema, defaultQueryTimeoutMs, 
defaultSessionTimeoutMs, new HashMap<>(properties), sql);
-    }
-}
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSqlRow.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSqlRow.java
deleted file mode 100644
index 6c6652f3ab..0000000000
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSqlRow.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.fakes;
-
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-import org.apache.ignite.sql.ResultSetMetadata;
-import org.apache.ignite.sql.SqlRow;
-import org.apache.ignite.table.Tuple;
-
-/**
- * Client SQL row.
- */
-public class FakeSqlRow implements SqlRow {
-    /** Row. */
-    private final List<Object> row;
-
-    /** Meta. */
-    private final ResultSetMetadata metadata;
-
-    /**
-     * Constructor.
-     *
-     * @param row Row.
-     * @param meta Meta.
-     */
-    public FakeSqlRow(List<Object> row, ResultSetMetadata meta) {
-        assert row != null;
-        assert meta != null;
-
-        // noinspection AssignmentOrReturnOfFieldWithMutableType
-        this.row = row;
-        this.metadata = meta;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public int columnCount() {
-        return row.size();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public String columnName(int columnIndex) {
-        return metadata.columns().get(columnIndex).name();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public int columnIndex(String columnName) {
-        return metadata.indexOf(columnName);
-    }
-
-    private int columnIndexChecked(String columnName) {
-        int idx = columnIndex(columnName);
-
-        if (idx == -1) {
-            throw new IllegalArgumentException("Column doesn't exist [name=" + 
columnName + ']');
-        }
-
-        return idx;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public <T> T valueOrDefault(String columnName, T defaultValue) {
-        T ret = (T) row.get(columnIndexChecked(columnName));
-
-        return ret != null ? ret : defaultValue;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Tuple set(String columnName, Object value) {
-        throw new UnsupportedOperationException("Operation not supported.");
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public <T> T value(String columnName) throws IllegalArgumentException {
-        return (T) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public <T> T value(int columnIndex) {
-        return (T) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public boolean booleanValue(String columnName) {
-        return (boolean) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public boolean booleanValue(int columnIndex) {
-        return (boolean) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public byte byteValue(String columnName) {
-        return (byte) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public byte byteValue(int columnIndex) {
-        return (byte) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public short shortValue(String columnName) {
-        return (short) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public short shortValue(int columnIndex) {
-        return (short) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public int intValue(String columnName) {
-        return (int) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public int intValue(int columnIndex) {
-        return (int) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public long longValue(String columnName) {
-        return (long) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public long longValue(int columnIndex) {
-        return (long) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public float floatValue(String columnName) {
-        return (float) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public float floatValue(int columnIndex) {
-        return (float) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public double doubleValue(String columnName) {
-        return (double) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public double doubleValue(int columnIndex) {
-        return (double) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public String stringValue(String columnName) {
-        return (String) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public String stringValue(int columnIndex) {
-        return (String) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public UUID uuidValue(String columnName) {
-        return (UUID) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public UUID uuidValue(int columnIndex) {
-        return (UUID) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public BitSet bitmaskValue(String columnName) {
-        return (BitSet) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public BitSet bitmaskValue(int columnIndex) {
-        return (BitSet) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public LocalDate dateValue(String columnName) {
-        return (LocalDate) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public LocalDate dateValue(int columnIndex) {
-        return (LocalDate) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public LocalTime timeValue(String columnName) {
-        return (LocalTime) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public LocalTime timeValue(int columnIndex) {
-        return (LocalTime) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public LocalDateTime datetimeValue(String columnName) {
-        return (LocalDateTime) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public LocalDateTime datetimeValue(int columnIndex) {
-        return (LocalDateTime) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Instant timestampValue(String columnName) {
-        return (Instant) row.get(columnIndexChecked(columnName));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Instant timestampValue(int columnIndex) {
-        return (Instant) row.get(columnIndex);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Iterator<Object> iterator() {
-        return row.iterator();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public ResultSetMetadata metadata() {
-        return metadata;
-    }
-}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
index 1815b8617e..9d70105580 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
@@ -42,6 +42,7 @@ import org.apache.ignite.sql.NoRowSetExpectedException;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.Statement;
 import org.apache.ignite.sql.async.AsyncResultSet;
@@ -507,6 +508,21 @@ public class ItThinClientSqlTest extends 
ItAbstractThinClientTest {
         assertEquals(ColumnType.NULL, meta.columns().get(14).type());
     }
 
+    @Test
+    public void testExecuteScriptFail() {
+        var script = "CREATE TABLE execute_script_fail (id INT PRIMARY KEY, 
step INTEGER); "
+                + "INSERT INTO execute_script_fail VALUES(1, 0); "
+                + "UPDATE execute_script_fail SET step = 1; "
+                + "UPDATE execute_script_fail SET step = 3 WHERE step > 1/0; "
+                + "UPDATE execute_script_fail SET step = 2; ";
+
+        SqlException e = assertThrows(
+                SqlException.class,
+                () -> client().sql().createSession().executeScript(script));
+
+        assertThat(e.getMessage(), Matchers.containsString("Division by 
zero"));
+    }
+
     private static class Pojo {
         public int num;
 
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 1627d80b6b..8ea348e858 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -833,7 +833,6 @@ public class IgniteImpl implements Ignite {
                 compute,
                 clusterSvc,
                 nettyBootstrapFactory,
-                sql,
                 () -> cmgMgr.clusterState().thenApply(s -> s.clusterTag()),
                 metricManager,
                 new ClientHandlerMetricSource(),
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index 13e2422cb9..026ca58f17 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -41,7 +41,7 @@ import org.jetbrains.annotations.Nullable;
  * Asynchronous result set implementation.
  */
 public class AsyncResultSetImpl<T> implements AsyncResultSet<T> {
-    private final IdleExpirationTracker expirationTracker;
+    private final @Nullable IdleExpirationTracker expirationTracker;
 
     private final AsyncSqlCursor<InternalSqlRow> cursor;
 
@@ -49,6 +49,21 @@ public class AsyncResultSetImpl<T> implements 
AsyncResultSet<T> {
 
     private final int pageSize;
 
+    /**
+     * Constructor.
+     *
+     * @param cursor Query cursor representing the result of execution.
+     * @param page Current page.
+     * @param pageSize Size of the page to fetch.
+     */
+    public AsyncResultSetImpl(
+            AsyncSqlCursor<InternalSqlRow> cursor,
+            BatchedResult<InternalSqlRow> page,
+            int pageSize
+    ) {
+        this(cursor, page, pageSize, null);
+    }
+
     /**
      * Constructor.
      *
@@ -62,7 +77,7 @@ public class AsyncResultSetImpl<T> implements 
AsyncResultSet<T> {
             AsyncSqlCursor<InternalSqlRow> cursor,
             BatchedResult<InternalSqlRow> page,
             int pageSize,
-            IdleExpirationTracker expirationTracker
+            @Nullable IdleExpirationTracker expirationTracker
     ) {
         this.cursor = cursor;
         this.curPage = page;
@@ -111,7 +126,9 @@ public class AsyncResultSetImpl<T> implements 
AsyncResultSet<T> {
     public Iterable<T> currentPage() {
         requireResultSet();
 
-        expirationTracker.touch();
+        if (expirationTracker != null) {
+            expirationTracker.touch();
+        }
 
         Iterator<InternalSqlRow> it0 = curPage.items().iterator();
         ResultSetMetadata meta0 = cursor.metadata();
@@ -133,7 +150,9 @@ public class AsyncResultSetImpl<T> implements 
AsyncResultSet<T> {
     public CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage() {
         requireResultSet();
 
-        expirationTracker.touch();
+        if (expirationTracker != null) {
+            expirationTracker.touch();
+        }
 
         return cursor.requestNextAsync(pageSize)
                 .thenApply(page -> {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index c7b2453aa9..2193854eda 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -41,7 +41,9 @@ import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.sql.AbstractSession;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
@@ -309,98 +311,138 @@ public class SessionImpl implements AbstractSession {
                     .set(QueryProperty.ALLOWED_QUERY_TYPES, 
EnumSet.of(SqlQueryType.DML))
                     .build();
 
-            var counters = new LongArrayList(batch.size());
-            CompletableFuture<?> tail = nullCompletedFuture();
-            ArrayList<CompletableFuture<?>> batchFuts = new 
ArrayList<>(batch.size());
-
-            for (int i = 0; i < batch.size(); ++i) {
-                Object[] args = batch.get(i).toArray();
+            return executeBatchCore(
+                    qryProc,
+                    transactions,
+                    (InternalTransaction) transaction,
+                    query,
+                    batch,
+                    properties,
+                    busyLock::enterBusy,
+                    busyLock::leaveBusy,
+                    this::registerCursor,
+                    openedCursors::remove);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(mapToPublicSqlException(e));
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
 
-                tail = tail.thenCompose(v -> {
-                    if (!busyLock.enterBusy()) {
-                        return 
CompletableFuture.failedFuture(sessionIsClosedException());
-                    }
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
 
-                    try {
-                        return qryProc.querySingleAsync(properties, 
transactions, (InternalTransaction) transaction, query, args)
-                                .thenCompose(cursor -> {
-                                    if (!busyLock.enterBusy()) {
-                                        cursor.closeAsync();
+    /**
+     * Execute batch of DML statements.
+     *
+     * @param qryProc Query processor.
+     * @param transactions Transactions facade.
+     * @param transaction Transaction.
+     * @param query Query.
+     * @param batch Batch of arguments.
+     * @param properties Properties.
+     * @param enterBusy Enter busy lock action.
+     * @param leaveBusy Leave busy lock action.
+     * @param registerCursor Register cursor action.
+     * @param removeCursor Remove cursor action.
+     * @return Operation Future completed with the number of rows affected by 
each query in the batch
+     *         (if the batch succeeds), future completed with the {@link 
SqlBatchException} (if the batch fails).
+     */
+    public static CompletableFuture<long[]> executeBatchCore(
+            QueryProcessor qryProc,
+            IgniteTransactions transactions,
+            @Nullable InternalTransaction transaction,
+            String query,
+            BatchedArguments batch,
+            SqlProperties properties,
+            Supplier<Boolean> enterBusy,
+            Runnable leaveBusy,
+            Function<AsyncSqlCursor<?>, Integer> registerCursor,
+            Consumer<Integer> removeCursor) {
+        var counters = new LongArrayList(batch.size());
+        CompletableFuture<?> tail = nullCompletedFuture();
+        ArrayList<CompletableFuture<?>> batchFuts = new 
ArrayList<>(batch.size());
+
+        for (int i = 0; i < batch.size(); ++i) {
+            Object[] args = batch.get(i).toArray();
+
+            tail = tail.thenCompose(v -> {
+                if (!enterBusy.get()) {
+                    return 
CompletableFuture.failedFuture(sessionIsClosedException());
+                }
 
-                                        return 
CompletableFuture.failedFuture(sessionIsClosedException());
-                                    }
+                try {
+                    return qryProc.querySingleAsync(properties, transactions, 
transaction, query, args)
+                            .thenCompose(cursor -> {
+                                if (!enterBusy.get()) {
+                                    cursor.closeAsync();
 
-                                    try {
-                                        int cursorId = registerCursor(cursor);
+                                    return 
CompletableFuture.failedFuture(sessionIsClosedException());
+                                }
 
-                                        return cursor.requestNextAsync(1)
-                                                .handle((page, th) -> {
-                                                    
openedCursors.remove(cursorId);
-                                                    cursor.closeAsync();
+                                try {
+                                    int cursorId = 
registerCursor.apply(cursor);
 
-                                                    if (th != null) {
-                                                        return 
CompletableFuture.failedFuture(th);
-                                                    }
+                                    return cursor.requestNextAsync(1)
+                                            .handle((page, th) -> {
+                                                removeCursor.accept(cursorId);
+                                                cursor.closeAsync();
 
-                                                    validateDmlResult(page);
+                                                if (th != null) {
+                                                    return 
CompletableFuture.failedFuture(th);
+                                                }
 
-                                                    counters.add((long) 
page.items().get(0).get(0));
+                                                validateDmlResult(page);
 
-                                                    return 
nullCompletedFuture();
-                                                
}).thenCompose(Function.identity());
-                                    } finally {
-                                        busyLock.leaveBusy();
-                                    }
-                                });
-                    } finally {
-                        busyLock.leaveBusy();
-                    }
-                });
+                                                counters.add((long) 
page.items().get(0).get(0));
 
-                batchFuts.add(tail);
-            }
+                                                return nullCompletedFuture();
+                                            
}).thenCompose(Function.identity());
+                                } finally {
+                                    leaveBusy.run();
+                                }
+                            });
+                } finally {
+                    leaveBusy.run();
+                }
+            });
 
-            CompletableFuture<long[]> resFut = tail
-                    .exceptionally((ex) -> {
-                        Throwable cause = ExceptionUtils.unwrapCause(ex);
+            batchFuts.add(tail);
+        }
 
-                        if (cause instanceof CancellationException) {
-                            throw (CancellationException) cause;
-                        }
+        CompletableFuture<long[]> resFut = tail
+                .exceptionally((ex) -> {
+                    Throwable cause = ExceptionUtils.unwrapCause(ex);
 
-                        Throwable t = mapToPublicSqlException(cause);
+                    if (cause instanceof CancellationException) {
+                        throw (CancellationException) cause;
+                    }
 
-                        if (t instanceof TraceableException) {
-                            throw new SqlBatchException(
-                                    ((TraceableException) t).traceId(),
-                                    ((TraceableException) t).code(),
-                                    
counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY),
-                                    t);
-                        }
+                    Throwable t = mapToPublicSqlException(cause);
 
-                        // JVM error.
-                        throw new CompletionException(cause);
-                    })
-                    .thenApply(v -> 
counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY));
+                    if (t instanceof TraceableException) {
+                        throw new SqlBatchException(
+                                ((TraceableException) t).traceId(),
+                                ((TraceableException) t).code(),
+                                counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY),
+                                t);
+                    }
 
-            resFut.whenComplete((cur, ex) -> {
-                if (ExceptionUtils.unwrapCause(ex) instanceof 
CancellationException) {
-                    batchFuts.forEach(f -> f.cancel(false));
-                }
-            });
+                    // JVM error.
+                    throw new CompletionException(cause);
+                })
+                .thenApply(v -> counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY));
 
-            return resFut;
-        } catch (Exception e) {
-            return CompletableFuture.failedFuture(mapToPublicSqlException(e));
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
+        resFut.whenComplete((cur, ex) -> {
+            if (ExceptionUtils.unwrapCause(ex) instanceof 
CancellationException) {
+                batchFuts.forEach(f -> f.cancel(false));
+            }
+        });
 
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return resFut;
     }
 
     /** {@inheritDoc} */
@@ -416,16 +458,46 @@ public class SessionImpl implements AbstractSession {
                 .set(QueryProperty.ALLOWED_QUERY_TYPES, SqlQueryType.ALL)
                 .build();
 
-        CompletableFuture<Void> resFut = new CompletableFuture<>();
         try {
-            CompletableFuture<AsyncSqlCursor<InternalSqlRow>> f =
-                    qryProc.queryScriptAsync(properties, transactions, null, 
query, arguments);
-
-            ScriptHandler handler = new ScriptHandler(resFut);
-            f.whenComplete(handler::processCursor);
+            return executeScriptCore(
+                    qryProc,
+                    transactions,
+                    busyLock::enterBusy,
+                    busyLock::leaveBusy,
+                    query,
+                    arguments,
+                    properties);
         } finally {
             busyLock.leaveBusy();
         }
+    }
+
+    /**
+     * Execute SQL script.
+     *
+     * @param qryProc Query processor.
+     * @param transactions Transactions facade.
+     * @param enterBusy Enter busy lock action.
+     * @param leaveBusy Leave busy lock action.
+     * @param query SQL script.
+     * @param arguments Arguments.
+     * @param properties Properties.
+     * @return Operation future.
+     */
+    public static CompletableFuture<Void> executeScriptCore(
+            QueryProcessor qryProc,
+            IgniteTransactions transactions,
+            Supplier<Boolean> enterBusy,
+            Runnable leaveBusy,
+            String query,
+            @Nullable Object[] arguments,
+            SqlProperties properties) {
+        CompletableFuture<AsyncSqlCursor<InternalSqlRow>> f =
+                qryProc.queryScriptAsync(properties, transactions, null, 
query, arguments);
+
+        CompletableFuture<Void> resFut = new CompletableFuture<>();
+        ScriptHandler handler = new ScriptHandler(resFut, enterBusy, 
leaveBusy);
+        f.whenComplete(handler::processCursor);
 
         return resFut.exceptionally((th) -> {
             Throwable cause = ExceptionUtils.unwrapCause(th);
@@ -606,12 +678,19 @@ public class SessionImpl implements AbstractSession {
         return List.copyOf(openedCursors.values());
     }
 
-    private class ScriptHandler {
+    private static class ScriptHandler {
         private final CompletableFuture<Void> resFut;
         private final List<Throwable> cursorCloseErrors = 
Collections.synchronizedList(new ArrayList<>());
+        private final Supplier<Boolean> enterBusy;
+        private final Runnable leaveBusy;
 
-        ScriptHandler(CompletableFuture<Void> resFut) {
+        ScriptHandler(
+                CompletableFuture<Void> resFut,
+                Supplier<Boolean> enterBusy,
+                Runnable leaveBusy) {
             this.resFut = resFut;
+            this.enterBusy = enterBusy;
+            this.leaveBusy = leaveBusy;
         }
 
         void processCursor(AsyncSqlCursor<InternalSqlRow> cursor, Throwable 
scriptError) {
@@ -628,7 +707,7 @@ public class SessionImpl implements AbstractSession {
                     cursorCloseErrors.add(cursorCloseError);
                 }
 
-                if (!busyLock.enterBusy()) {
+                if (!enterBusy.get()) {
                     onFail(sessionIsClosedException());
                     return;
                 }
@@ -639,7 +718,7 @@ public class SessionImpl implements AbstractSession {
                         return;
                     }
                 } finally {
-                    busyLock.leaveBusy();
+                    leaveBusy.run();
                 }
 
                 onComplete();

Reply via email to