This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new aaf48298b IGNITE-16964 SQL API: Implement async SQL API (#817)
aaf48298b is described below

commit aaf48298b6b1b8f9cdc7bb7e39ebba104ec8406c
Author: Taras Ledkov <[email protected]>
AuthorDate: Mon May 30 11:19:41 2022 +0300

    IGNITE-16964 SQL API: Implement async SQL API (#817)
---
 .../main/java/org/apache/ignite/sql/ResultSet.java |   2 +-
 .../apache/ignite/sql/async/AsyncResultSet.java    |  11 +-
 .../ignite/sql/reactive/ReactiveResultSet.java     |   2 +-
 .../client/fakes/FakeIgniteQueryProcessor.java     |   6 +
 .../ignite/internal/util/CollectionUtils.java      |  29 +-
 .../internal/testframework/IgniteTestUtils.java    |  83 ++++
 .../internal/sql/api/ItSqlAsynchronousApiTest.java | 466 +++++++++++++++++++++
 .../org/apache/ignite/internal/app/IgniteImpl.java |   8 +-
 .../internal/sql/api/AsyncResultSetImpl.java       | 410 ++++++++++++++++++
 .../internal/sql/api/IgniteSqlException.java       |  70 ++++
 .../ignite/internal/sql/api/IgniteSqlImpl.java     |  66 +++
 .../internal/sql/api/SessionBuilderImpl.java       | 122 ++++++
 .../ignite/internal/sql/api/SessionImpl.java       | 295 +++++++++++++
 .../ignite/internal/sql/engine/QueryProcessor.java |  17 +-
 .../ignite/internal/sql/engine/QueryTimeout.java   |  48 +++
 .../internal/sql/engine/SqlQueryProcessor.java     |  82 +++-
 .../sql/engine/exec/ExecutionServiceImpl.java      |   6 +-
 .../sql/engine/exec/ddl/DdlCommandHandler.java     |  81 +++-
 .../sql/engine/schema/SqlSchemaManagerImpl.java    |   6 +-
 .../internal/sql/engine/IgniteSqlApiTest.java      |  20 +-
 .../sql/engine/exec/ExecutionServiceImplTest.java  |   8 +-
 21 files changed, 1788 insertions(+), 50 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java 
b/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
index 3c24af954..f8f389f68 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
@@ -54,7 +54,7 @@ public interface ResultSet extends Iterable<SqlRow>, 
AutoCloseable {
      *
      * @return Number of rows affected by the query, or {@code 0} if statement 
return nothing, or {@code -1} if inapplicable.
      */
-    int affectedRows();
+    long affectedRows();
 
     /**
      * Returns whether the query that produce this result was a conditional 
query, or not. E.g. for the query "Create table if not exists"
diff --git 
a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java 
b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
index 7ab63d8be..a2d8166be 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
@@ -58,7 +58,7 @@ public interface AsyncResultSet {
      * @return Number of rows affected by the query, or {@code 0} if statement 
return nothing, or {@code -1} if inapplicable.
      * @see ResultSet#affectedRows()
      */
-    int affectedRows();
+    long affectedRows();
 
     /**
      * Returns whether the query that produce this result was a conditional 
query, or not. E.g. for the query "Create table if not exists"
@@ -66,7 +66,7 @@ public interface AsyncResultSet {
      * table was already existed.
      *
      * <p>Note: when returns {@code false}, then either {@link 
#affectedRows()} return number of affected rows or {@link #hasRowSet()}
-     * returns {@code true}.
+     * returns {@code true} or conditional DDL query is not applied.
      *
      * @return {@code True} if conditional query applied, {@code false} 
otherwise.
      * @see ResultSet#wasApplied()
@@ -95,4 +95,11 @@ public interface AsyncResultSet {
      * @return {@code True} if there are more pages with results, {@code 
false} otherwise.
      */
     boolean hasMorePages();
+
+    /**
+     * Invalidates query result, stops the query and cleanups query resources.
+     *
+     * @return Operation future.
+     */
+    CompletionStage<Void> closeAsync();
 }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/sql/reactive/ReactiveResultSet.java
 
b/modules/api/src/main/java/org/apache/ignite/sql/reactive/ReactiveResultSet.java
index 04d96799e..525a5422b 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/sql/reactive/ReactiveResultSet.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/sql/reactive/ReactiveResultSet.java
@@ -58,7 +58,7 @@ public interface ReactiveResultSet extends 
Flow.Publisher<SqlRow> {
      * @return Publisher for number of rows.
      * @see ResultSet#affectedRows()
      */
-    Flow.Publisher<Integer> affectedRows();
+    Flow.Publisher<Long> affectedRows();
 
     /**
      * Returns publisher for a flag which determines whether the query that 
produce this result was a conditional query, or not.
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
index beb263b9e..85a47ec2b 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
@@ -38,6 +38,12 @@ public class FakeIgniteQueryProcessor implements 
QueryProcessor {
         return List.of(CompletableFuture.completedFuture(new FakeCursor()));
     }
 
+    @Override
+    public CompletableFuture<AsyncSqlCursor<List<Object>>> 
querySingleAsync(QueryContext context, String schemaName, String qry,
+            Object... params) {
+        return CompletableFuture.completedFuture(new FakeCursor());
+    }
+
     @Override
     public void start() {
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index 264f1cbe0..a20b82393 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -81,10 +81,10 @@ public final class CollectionUtils {
      * Gets first element from given list or returns {@code null} if list is 
empty.
      *
      * @param list List to retrieve the first element.
-     * @param <T>  Type of the elements of the list.
+     * @param <T> Type of the elements of the list.
      * @return The first element of the given list or {@code null} in case the 
list is empty.
      */
-    public static <T> T first(List<? extends T> list) {
+    public static <T> @Nullable T first(List<? extends T> list) {
         if (nullOrEmpty(list)) {
             return null;
         }
@@ -96,10 +96,10 @@ public final class CollectionUtils {
      * Gets first element from given collection or returns {@code null} if 
collection is empty.
      *
      * @param col Collection to retrieve the first element.
-     * @param <T>  Type of the elements of the collection.
+     * @param <T> Type of the elements of the collection.
      * @return The first element of the given collection or {@code null} in 
case the collection is empty.
      */
-    public static <T> T first(Collection<? extends T> col) {
+    public static <T> @Nullable T first(Collection<? extends T> col) {
         if (nullOrEmpty(col)) {
             return null;
         }
@@ -107,6 +107,27 @@ public final class CollectionUtils {
         return col.iterator().next();
     }
 
+    /**
+     * Returns first element from the given iterable or returns {@code null} 
if the list is empty.
+     *
+     * @param iterable Iterable to retrieve the first element.
+     * @param <T> Type of the elements of the list.
+     * @return The first element of the given iterable or {@code null} in case 
the iterable is null or empty.
+     */
+    public static <T> @Nullable T first(Iterable<? extends T> iterable) {
+        if (iterable == null) {
+            return null;
+        }
+
+        Iterator<? extends T> it = iterable.iterator();
+
+        if (!it.hasNext()) {
+            return null;
+        }
+
+        return it.next();
+    }
+
     /**
      * Union set and items.
      *
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 711a30d98..53f261f5b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
@@ -164,6 +165,88 @@ public final class IgniteTestUtils {
         }
     }
 
+    /**
+     * Get object field value via reflection.
+     *
+     * @param obj Object or class to get field value from.
+     * @param fieldNames Field names to get value for: 
obj->field1->field2->...->fieldN.
+     * @param <T> Expected field class.
+     * @return Field value.
+     * @throws IgniteInternalException In case of error.
+     */
+    public static <T> T getFieldValue(Object obj, String... fieldNames) {
+        assert obj != null;
+        assert fieldNames != null;
+        assert fieldNames.length >= 1;
+
+        try {
+            for (String fieldName : fieldNames) {
+                Class<?> cls = obj instanceof Class ? (Class) obj : 
obj.getClass();
+
+                try {
+                    obj = findField(cls, obj, fieldName);
+                } catch (NoSuchFieldException e) {
+                    // Resolve inner class, if not an inner field.
+                    Class<?> innerCls = getInnerClass(cls, fieldName);
+
+                    if (innerCls == null) {
+                        throw new IgniteInternalException("Failed to get 
object field [obj=" + obj
+                                + ", fieldNames=" + 
Arrays.toString(fieldNames) + ']', e);
+                    }
+
+                    obj = innerCls;
+                }
+            }
+
+            return (T) obj;
+        } catch (IllegalAccessException e) {
+            throw new IgniteInternalException("Failed to get object field 
[obj=" + obj
+                    + ", fieldNames=" + Arrays.toString(fieldNames) + ']', e);
+        }
+    }
+
+    /**
+     * Get object field value via reflection.
+     *
+     * @param cls Class for searching.
+     * @param obj Target object.
+     * @param fieldName Field name for search.
+     * @return Field from object if it was found.
+     */
+    private static Object findField(
+            Class<?> cls,
+            Object obj,
+            String fieldName
+    ) throws NoSuchFieldException, IllegalAccessException {
+        // Resolve inner field.
+        Field field = cls.getDeclaredField(fieldName);
+
+        boolean accessible = field.isAccessible();
+
+        if (!accessible) {
+            field.setAccessible(true);
+        }
+
+        return field.get(obj);
+    }
+
+    /**
+     * Get inner class by its name from the enclosing class.
+     *
+     * @param parentCls Parent class to resolve inner class for.
+     * @param innerClsName Name of the inner class.
+     * @return Inner class.
+     */
+    @Nullable public static <T> Class<T> getInnerClass(Class<?> parentCls, 
String innerClsName) {
+        for (Class<?> cls : parentCls.getDeclaredClasses()) {
+            if (innerClsName.equals(cls.getSimpleName())) {
+                return (Class<T>) cls;
+            }
+        }
+
+        return null;
+    }
+
     /**
      * Checks whether runnable throws exception, which is itself of a 
specified class, or has a cause of the specified class.
      *
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
new file mode 100644
index 000000000..3d48a8b66
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
+import org.apache.ignite.internal.sql.engine.ClosedCursorException;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.lang.ColumnAlreadyExistsException;
+import org.apache.ignite.lang.ColumnNotFoundException;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IndexAlreadyExistsException;
+import org.apache.ignite.lang.TableAlreadyExistsException;
+import org.apache.ignite.lang.TableNotFoundException;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Tests for asynchronous SQL API.
+ */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-15655";)
+public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
+    private static final int ROW_COUNT = 16;
+
+    /**
+     * Clear tables after each test.
+     *
+     * @param testInfo Test information oject.
+     * @throws Exception If failed.
+     */
+    @AfterEach
+    @Override
+    public void tearDown(TestInfo testInfo) throws Exception {
+        for (Table t : CLUSTER_NODES.get(0).tables().tables()) {
+            sql("DROP TABLE " + t.name());
+        }
+
+        super.tearDownBase(testInfo);
+    }
+
+    @Test
+    public void ddl() throws ExecutionException, InterruptedException {
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.createSession();
+
+        // CREATE TABLE
+        checkDdl(true, ses, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+        checkError(
+                TableAlreadyExistsException.class,
+                "Table already exists [name=PUBLIC.TEST]",
+                ses,
+                "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"
+        );
+        checkDdl(false, ses, "CREATE TABLE IF NOT EXISTS TEST(ID INT PRIMARY 
KEY, VAL VARCHAR)");
+
+        // ADD COLUMN
+        checkDdl(true, ses, "ALTER TABLE TEST ADD COLUMN IF NOT EXISTS VAL1 
VARCHAR");
+        checkError(
+                TableNotFoundException.class,
+                "Table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]",
+                ses,
+                "ALTER TABLE NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"
+        );
+        checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE ADD 
COLUMN VAL1 VARCHAR");
+        checkError(
+                ColumnAlreadyExistsException.class,
+                "Column already exists [name=VAL1]",
+                ses,
+                "ALTER TABLE TEST ADD COLUMN VAL1 INT"
+        );
+        checkDdl(false, ses, "ALTER TABLE TEST ADD COLUMN IF NOT EXISTS VAL1 
INT");
+
+        // CREATE INDEX
+        checkDdl(true, ses, "CREATE INDEX TEST_IDX ON TEST(VAL0)");
+        checkError(
+                IndexAlreadyExistsException.class,
+                "Index already exists [name=TEST_IDX]",
+                ses,
+                "CREATE INDEX TEST_IDX ON TEST(VAL1)"
+        );
+        checkDdl(false, ses, "CREATE INDEX IF NOT EXISTS TEST_IDX ON 
TEST(VAL1)");
+
+        // DROP COLUMNS
+        checkDdl(true, ses, "ALTER TABLE TEST DROP COLUMN VAL1");
+        checkError(
+                TableNotFoundException.class,
+                "Table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]",
+                ses,
+                "ALTER TABLE NOT_EXISTS_TABLE DROP COLUMN VAL1"
+        );
+        checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE DROP 
COLUMN VAL1");
+        checkError(
+                ColumnNotFoundException.class,
+                "Column 'VAL1' does not exist in table '\"PUBLIC\".\"TEST\"'",
+                ses,
+                "ALTER TABLE TEST DROP COLUMN VAL1"
+        );
+        checkDdl(false, ses, "ALTER TABLE TEST DROP COLUMN IF EXISTS VAL1");
+
+        // DROP TABLE
+        checkDdl(false, ses, "DROP TABLE IF EXISTS NOT_EXISTS_TABLE");
+        checkDdl(true, ses, "DROP TABLE TEST");
+        checkError(
+                TableNotFoundException.class,
+                "Table does not exist [name=PUBLIC.TEST]",
+                ses,
+                "DROP TABLE TEST"
+        );
+
+        checkSession(ses);
+    }
+
+    @Test
+    public void dml() throws ExecutionException, InterruptedException {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.createSession();
+
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
+        checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
+
+        checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
+
+        checkSession(ses);
+    }
+
+    @Test
+    public void select() throws ExecutionException, InterruptedException {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 
4).build();
+
+        TestPageProcessor pageProc = new TestPageProcessor(4);
+        ses.executeAsync(null, "SELECT ID FROM 
TEST").thenCompose(pageProc).get();
+
+        Set<Integer> rs = pageProc.result().stream().map(r -> 
r.intValue(0)).collect(Collectors.toSet());
+
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            assertTrue(rs.remove(i), "Results invalid: " + pageProc.result());
+        }
+
+        assertTrue(rs.isEmpty());
+
+        checkSession(ses);
+    }
+
+    @Test
+    public void sqlRow() throws ExecutionException, InterruptedException {
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.sessionBuilder().build();
+
+        AsyncResultSet ars = ses.executeAsync(null, "SELECT 1 as COL_A, 2 as 
COL_B").get();
+
+        SqlRow r = CollectionUtils.first(ars.currentPage());
+
+        assertEquals(2, r.columnCount());
+        assertEquals(0, r.columnIndex("COL_A"));
+        assertEquals(1, r.columnIndex("COL_B"));
+        assertEquals(-1, r.columnIndex("notExistColumn"));
+
+        assertEquals(1, r.intValue("COL_A"));
+        assertEquals(2, r.intValue("COL_B"));
+
+        assertThrowsWithCause(
+                () -> r.intValue("notExistColumn"),
+                IllegalArgumentException.class,
+                "Column doesn't exist [name=notExistColumn]"
+        );
+
+        assertEquals(1, r.intValue(0));
+        assertEquals(2, r.intValue(1));
+        assertThrowsWithCause(() -> r.intValue(-2), 
IndexOutOfBoundsException.class);
+        assertThrowsWithCause(() -> r.intValue(10), 
IndexOutOfBoundsException.class);
+
+        await(ars.closeAsync());
+
+        checkSession(ses);
+    }
+
+    @Test
+    public void pageSequence() throws ExecutionException, InterruptedException 
{
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.sessionBuilder().defaultPageSize(1).build();
+
+        AsyncResultSet ars0 = ses.executeAsync(null, "SELECT ID FROM TEST 
ORDER BY ID").get();
+        AsyncResultSet ars1 = ars0.fetchNextPage().toCompletableFuture().get();
+        AsyncResultSet ars2 = ars1.fetchNextPage().toCompletableFuture().get();
+        AsyncResultSet ars3 = ars1.fetchNextPage().toCompletableFuture().get();
+        AsyncResultSet ars4 = ars0.fetchNextPage().toCompletableFuture().get();
+
+        assertSame(ars1, ars4);
+        assertSame(ars2, ars3);
+
+        List<SqlRow> res = Stream.of(ars0, ars1, ars2)
+                .map(AsyncResultSet::currentPage)
+                .flatMap(p -> StreamSupport.stream(p.spliterator(), false))
+                .collect(Collectors.toList());
+
+        TestPageProcessor pageProc = new TestPageProcessor(ROW_COUNT - 
res.size());
+        ars3.fetchNextPage().thenCompose(pageProc).toCompletableFuture().get();
+
+        res.addAll(pageProc.result());
+
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            assertEquals(i, res.get(i).intValue(0));
+        }
+    }
+
+    @Test
+    public void fetchNextPageParallel() throws Exception {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
+        final List<SqlRow> res = new ArrayList<>();
+
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 
2).build();
+
+        AsyncResultSet ars0 = ses.executeAsync(null, "SELECT ID FROM 
TEST").get();
+        StreamSupport.stream(ars0.currentPage().spliterator(), 
false).forEach(res::add);
+
+        AtomicInteger cnt = new AtomicInteger();
+        ConcurrentHashMap<Integer, AsyncResultSet> results = new 
ConcurrentHashMap<>();
+
+        IgniteTestUtils.runMultiThreaded(
+                () -> {
+                    AsyncResultSet ars = 
ars0.fetchNextPage().toCompletableFuture().get();
+
+                    results.put(cnt.getAndIncrement(), ars);
+
+                    assertFalse(ars.hasMorePages());
+
+                    return null;
+                },
+                10,
+                "test-fetch");
+
+        AsyncResultSet ars1 = CollectionUtils.first(results.values());
+        StreamSupport.stream(ars1.currentPage().spliterator(), 
false).forEach(res::add);
+
+        // Check that all next page are same.
+        results.values().forEach(ars -> assertSame(ars1, ars));
+
+        assertThrowsWithCause(
+                () -> ars1.fetchNextPage().toCompletableFuture().get(),
+                IgniteSqlException.class,
+                "There are no more pages"
+        );
+
+        await(ars0.closeAsync());
+
+        // Check results
+        Set<Integer> rs = res.stream().map(r -> 
r.intValue(0)).collect(Collectors.toSet());
+
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            assertTrue(rs.remove(i), "Results invalid: " + res);
+        }
+
+        assertTrue(rs.isEmpty());
+
+        checkSession(ses);
+    }
+
+    @Test
+    public void errors() {
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 
2).build();
+
+        // Parse error.
+        {
+            CompletableFuture<AsyncResultSet> f = ses.executeAsync(null, 
"SELECT ID FROM");
+            assertThrowsWithCause(() -> f.get(), 
IgniteInternalException.class, "Failed to parse query");
+        }
+
+        // Multiple statements error.
+        {
+            CompletableFuture<AsyncResultSet> f = ses.executeAsync(null, 
"SELECT 1; SELECT 2");
+            assertThrowsWithCause(() -> f.get(), IgniteSqlException.class, 
"Multiple statements aren't allowed");
+        }
+
+        // Planning error.
+        {
+            CompletableFuture<AsyncResultSet> f = ses.executeAsync(null, 
"CREATE TABLE TEST (VAL INT)");
+            assertThrowsWithCause(() -> f.get(), IgniteException.class, "Table 
without PRIMARY KEY is not supported");
+        }
+
+        // Execute error.
+        {
+            CompletableFuture<AsyncResultSet> f = ses.executeAsync(null, 
"SELECT 1 / ?", 0);
+            assertThrowsWithCause(() -> f.get(), ArithmeticException.class, "/ 
by zero");
+        }
+
+        checkSession(ses);
+    }
+
+    @Test
+    public void closeSession() throws ExecutionException, InterruptedException 
{
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
+        IgniteSql sql = CLUSTER_NODES.get(0).sql();
+        Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+
+        AsyncResultSet ars0 = ses.executeAsync(null, "SELECT ID FROM 
TEST").get();
+
+        ses.closeAsync().get();
+
+        // Fetched page  is available after cancel.
+        ars0.currentPage();
+
+        assertThrowsWithCause(
+                () -> ars0.fetchNextPage().toCompletableFuture().get(),
+                ClosedCursorException.class
+        );
+
+        assertThrowsWithCause(
+                () -> ses.executeAsync(null, "SELECT ID FROM TEST").get(),
+                IgniteSqlException.class,
+                "Session is closed"
+        );
+
+        checkSession(ses);
+    }
+
+    private void checkDdl(boolean expectedApplied, Session ses, String sql) 
throws ExecutionException, InterruptedException {
+        CompletableFuture<AsyncResultSet> fut = ses.executeAsync(
+                null,
+                sql
+        );
+
+        AsyncResultSet asyncRes = fut.get();
+
+        assertEquals(expectedApplied, asyncRes.wasApplied());
+        assertFalse(asyncRes.hasMorePages());
+        assertFalse(asyncRes.hasRowSet());
+        assertEquals(-1, asyncRes.affectedRows());
+
+        asyncRes.closeAsync().toCompletableFuture().get();
+    }
+
+    private void checkError(Class<? extends Throwable> expectedException, 
String msg, Session ses, String sql, Object... args) {
+        CompletableFuture<AsyncResultSet> fut = ses.executeAsync(
+                null,
+                sql,
+                args
+        );
+
+        assertThrowsWithCause(fut::get, expectedException, msg);
+    }
+
+    private void checkDml(int expectedAffectedRows, Session ses, String sql, 
Object... args)
+            throws ExecutionException, InterruptedException {
+        CompletableFuture<AsyncResultSet> fut = ses.executeAsync(
+                null,
+                sql,
+                args
+        );
+
+        AsyncResultSet asyncRes = fut.get();
+
+        assertFalse(asyncRes.wasApplied());
+        assertFalse(asyncRes.hasMorePages());
+        assertFalse(asyncRes.hasRowSet());
+        assertEquals(expectedAffectedRows, asyncRes.affectedRows());
+
+        asyncRes.closeAsync().toCompletableFuture().get();
+    }
+
+    private void checkSession(Session s) {
+        assertTrue(((Set<?>) IgniteTestUtils.getFieldValue(s, 
"futsToClose")).isEmpty());
+        assertTrue(((Set<?>) IgniteTestUtils.getFieldValue(s, 
"cursToClose")).isEmpty());
+    }
+
+    static class TestPageProcessor implements
+            Function<AsyncResultSet, CompletionStage<AsyncResultSet>> {
+        private int expectedPages;
+
+        private final List<SqlRow> res = new ArrayList<>();
+
+        TestPageProcessor(int expectedPages) {
+            this.expectedPages = expectedPages;
+        }
+
+        @Override
+        public CompletionStage<AsyncResultSet> apply(AsyncResultSet rs) {
+            expectedPages--;
+
+            assertTrue(rs.hasRowSet());
+            assertFalse(rs.wasApplied());
+            assertEquals(-1L, rs.affectedRows());
+            assertEquals(expectedPages > 0, rs.hasMorePages(),
+                    "hasMorePages(): [expected=" + (expectedPages > 0) + ", 
actual=" + rs.hasMorePages() + ']');
+
+            rs.currentPage().forEach(res::add);
+
+            if (rs.hasMorePages()) {
+                return rs.fetchNextPage().thenCompose(this);
+            }
+
+            return rs.closeAsync().thenApply(v -> rs);
+        }
+
+        public List<SqlRow> result() {
+            return res;
+        }
+    }
+}
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 4becd794c..af669b885 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -61,6 +61,7 @@ import 
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
 import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory;
 import org.apache.ignite.internal.rest.RestComponent;
 import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.sql.api.IgniteSqlImpl;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
 import 
org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesSerializationRegistryInitializer;
@@ -135,6 +136,9 @@ public class IgniteImpl implements Ignite {
     /** Sql query engine. */
     private final SqlQueryProcessor qryEngine;
 
+    /** Sql API facade. */
+    private final IgniteSql sql;
+
     /** Configuration manager that handles node (local) configuration. */
     private final ConfigurationManager nodeCfgMgr;
 
@@ -315,6 +319,8 @@ public class IgniteImpl implements Ignite {
                 () -> 
dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions())
         );
 
+        sql = new IgniteSqlImpl(qryEngine);
+
         compute = new IgniteComputeImpl(clusterSvc.topologyService(), 
distributedTblMgr, computeComponent);
 
         clientHandlerModule = new ClientHandlerModule(
@@ -507,7 +513,7 @@ public class IgniteImpl implements Ignite {
     /** {@inheritDoc} */
     @Override
     public IgniteSql sql() {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        return sql;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
new file mode 100644
index 000000000..2c363696f
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.ResultFieldMetadata;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.util.TransformingIterator;
+import org.apache.ignite.sql.NoRowSetExpectedException;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Asynchronous result set implementation.
+ */
+public class AsyncResultSetImpl implements AsyncResultSet {
+    private static final CompletableFuture<? extends AsyncResultSet> 
HAS_NO_MORE_PAGE_FUTURE =
+            CompletableFuture.failedFuture(new IgniteSqlException("There are 
no more pages."));
+
+    private final AsyncSqlCursor<List<Object>> cur;
+
+    private final BatchedResult<List<Object>> batchPage;
+
+    private final int pageSize;
+
+    private final Runnable closeRun;
+
+    private final Object mux = new Object();
+
+    private volatile CompletionStage<? extends AsyncResultSet> next;
+
+    /**
+     * Constructor.
+     *
+     * @param cur Asynchronous query cursor.
+     */
+    public AsyncResultSetImpl(AsyncSqlCursor<List<Object>> cur, 
BatchedResult<List<Object>> page, int pageSize, Runnable closeRun) {
+        this.cur = cur;
+        this.batchPage = page;
+        this.pageSize = pageSize;
+        this.closeRun = closeRun;
+
+        assert cur.queryType() == SqlQueryType.QUERY
+                || ((cur.queryType() == SqlQueryType.DML || cur.queryType() == 
SqlQueryType.DDL)
+                && batchPage.items().size() == 1
+                && batchPage.items().get(0).size() == 1
+                && !batchPage.hasMore()) : "Invalid query result: [type=" + 
cur.queryType() + "res=" + batchPage + ']';
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable ResultSetMetadata metadata() {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean hasRowSet() {
+        return cur.queryType() == SqlQueryType.QUERY || cur.queryType() == 
SqlQueryType.EXPLAIN;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long affectedRows() {
+        if (cur.queryType() != SqlQueryType.DML) {
+            return -1;
+        }
+
+        assert batchPage.items().get(0).get(0) instanceof Long : "Invalid DML 
result: " + batchPage;
+
+        return (long) batchPage.items().get(0).get(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean wasApplied() {
+        if (cur.queryType() != SqlQueryType.DDL) {
+            return false;
+        }
+
+        assert batchPage.items().get(0).get(0) instanceof Boolean : "Invalid 
DDL result: " + batchPage;
+
+        return (boolean) batchPage.items().get(0).get(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Iterable<SqlRow> currentPage() {
+        if (!hasRowSet()) {
+            throw new NoRowSetExpectedException("Query hasn't result set: 
[type=" + cur.queryType() + ']');
+        }
+
+        return () -> new TransformingIterator<>(batchPage.items().iterator(), 
SqlRowImpl::new);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletionStage<? extends AsyncResultSet> fetchNextPage() {
+        if (next == null) {
+            synchronized (mux) {
+                if (next == null) {
+                    if (!hasMorePages()) {
+                        next = HAS_NO_MORE_PAGE_FUTURE;
+                    } else {
+                        next = cur.requestNextAsync(pageSize)
+                                .thenApply(batchRes -> new 
AsyncResultSetImpl(cur, batchRes, pageSize, closeRun));
+                    }
+                }
+            }
+        }
+
+        return next;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean hasMorePages() {
+        return batchPage.hasMore();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletionStage<Void> closeAsync() {
+        return cur.closeAsync().thenRun(closeRun);
+    }
+
+    private class SqlRowImpl implements SqlRow {
+        private final List<Object> row;
+
+        private final Map<String, Integer> fields;
+
+        org.apache.ignite.internal.sql.engine.ResultSetMetadata meta = 
cur.metadata();
+
+        SqlRowImpl(List<Object> row) {
+            this.row = row;
+            fields = meta.fields().stream()
+                    .collect(Collectors.toMap(ResultFieldMetadata::name, 
ResultFieldMetadata::order));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public int columnCount() {
+            return meta.fields().size();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public String columnName(int columnIndex) {
+            return meta.fields().get(columnIndex).name();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public int columnIndex(@NotNull String columnName) {
+            return fields.getOrDefault(columnName, -1);
+        }
+
+        private int columnIndexChecked(@NotNull String columnName) {
+            int idx = columnIndex(columnName);
+
+            if (idx == -1) {
+                throw new IllegalArgumentException("Column doesn't exist 
[name=" + columnName + ']');
+            }
+
+            return idx;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override
+        public <T> T valueOrDefault(@NotNull String columnName, T 
defaultValue) {
+            T ret = (T) row.get(columnIndexChecked(columnName));
+
+            return ret != null ? ret : defaultValue;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Tuple set(@NotNull String columnName, Object value) {
+            throw new UnsupportedOperationException("Operation not 
supported.");
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override
+        public <T> T value(@NotNull String columnName) throws 
IllegalArgumentException {
+            return (T) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override
+        public <T> T value(int columnIndex) {
+            return (T) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public BinaryObject binaryObjectValue(@NotNull String columnName) {
+            return (BinaryObject) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public BinaryObject binaryObjectValue(int columnIndex) {
+            return (BinaryObject) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public byte byteValue(@NotNull String columnName) {
+            return (byte) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public byte byteValue(int columnIndex) {
+            return (byte) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public short shortValue(@NotNull String columnName) {
+            return (short) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public short shortValue(int columnIndex) {
+            return (short) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public int intValue(@NotNull String columnName) {
+            return (int) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public int intValue(int columnIndex) {
+            return (int) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public long longValue(@NotNull String columnName) {
+            return (long) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public long longValue(int columnIndex) {
+            return (long) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public float floatValue(@NotNull String columnName) {
+            return (float) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public float floatValue(int columnIndex) {
+            return (float) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public double doubleValue(@NotNull String columnName) {
+            return (double) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public double doubleValue(int columnIndex) {
+            return (double) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public String stringValue(@NotNull String columnName) {
+            return (String) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public String stringValue(int columnIndex) {
+            return (String) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public UUID uuidValue(@NotNull String columnName) {
+            return (UUID) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public UUID uuidValue(int columnIndex) {
+            return (UUID) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public BitSet bitmaskValue(@NotNull String columnName) {
+            return (BitSet) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public BitSet bitmaskValue(int columnIndex) {
+            return (BitSet) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public LocalDate dateValue(String columnName) {
+            return (LocalDate) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public LocalDate dateValue(int columnIndex) {
+            return (LocalDate) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public LocalTime timeValue(String columnName) {
+            return (LocalTime) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public LocalTime timeValue(int columnIndex) {
+            return (LocalTime) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public LocalDateTime datetimeValue(String columnName) {
+            return (LocalDateTime) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public LocalDateTime datetimeValue(int columnIndex) {
+            return (LocalDateTime) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Instant timestampValue(String columnName) {
+            return (Instant) row.get(columnIndexChecked(columnName));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Instant timestampValue(int columnIndex) {
+            return (Instant) row.get(columnIndex);
+        }
+
+        /** {@inheritDoc} */
+        @NotNull
+        @Override
+        public Iterator<Object> iterator() {
+            return row.iterator();
+        }
+
+        @Override
+        public ResultSetMetadata metadata() {
+            throw new UnsupportedOperationException("Not implemented yet.");
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java
new file mode 100644
index 000000000..72c510f21
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Ignite SQL exception.
+ */
+public class IgniteSqlException extends IgniteException {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates an empty exception.
+     */
+    public IgniteSqlException() {
+        // No-op.
+    }
+
+    /**
+     * Creates a new exception with the given error message.
+     *
+     * @param msg Error message.
+     */
+    public IgniteSqlException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates a new grid exception with the given throwable as a cause and 
source of error message.
+     *
+     * @param cause Non-null throwable cause.
+     */
+    public IgniteSqlException(Throwable cause) {
+        this(cause.getMessage(), cause);
+    }
+
+    /**
+     * Creates a new exception with the given error message and optional 
nested exception.
+     *
+     * @param msg   Error message.
+     * @param cause Optional nested exception (can be {@code null}).
+     */
+    public IgniteSqlException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return getClass() + ": " + getMessage();
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
new file mode 100644
index 000000000..bfa20d851
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.util.HashMap;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.Session.SessionBuilder;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+
+/**
+ * Embedded implementation of the Ignite SQL query facade.
+ */
+public class IgniteSqlImpl implements IgniteSql {
+    private final QueryProcessor qryProc;
+
+    /**
+     * Constructor.
+     *
+     * @param qryProc Query processor.
+     */
+    public IgniteSqlImpl(QueryProcessor qryProc) {
+        this.qryProc = qryProc;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Session createSession() {
+        return sessionBuilder().build();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SessionBuilder sessionBuilder() {
+        return new SessionBuilderImpl(qryProc, new HashMap<>());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Statement createStatement(String query) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public StatementBuilder statementBuilder() {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
new file mode 100644
index 000000000..224859b77
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.Session.SessionBuilder;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Session builder implementation.
+ */
+public class SessionBuilderImpl implements SessionBuilder {
+
+    public static final long DEFAULT_TIMEOUT = 0;
+
+    private final QueryProcessor qryProc;
+
+    private long timeout = DEFAULT_TIMEOUT;
+
+    private String schema = Session.DEFAULT_SCHEMA;
+
+    private int pageSize = Session.DEFAULT_PAGE_SIZE;
+
+    private final Map<String, Object> props;
+
+    /**
+     * Session builder constructor.
+     *
+     * @param qryProc SQL query processor.
+     * @param props Initial properties.
+     */
+    SessionBuilderImpl(QueryProcessor qryProc, Map<String, Object> props) {
+        this.qryProc = qryProc;
+        this.props = props;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long defaultTimeout(TimeUnit timeUnit) {
+        return timeUnit.convert(timeout, TimeUnit.NANOSECONDS);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SessionBuilder defaultTimeout(long timeout, TimeUnit timeUnit) {
+        this.timeout = timeUnit.toNanos(timeout);
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String defaultSchema() {
+        return schema;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SessionBuilder defaultSchema(String schema) {
+        this.schema = schema;
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int defaultPageSize() {
+        return pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SessionBuilder defaultPageSize(int pageSize) {
+        this.pageSize = pageSize;
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable Object property(String name) {
+        return props.get(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SessionBuilder property(String name, @Nullable Object value) {
+        props.put(name, value);
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Session build() {
+        return new SessionImpl(
+                qryProc,
+                schema,
+                timeout,
+                pageSize,
+                props
+        );
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
new file mode 100644
index 000000000..8ee0e4ea4
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.sql.engine.AsyncCursor;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.QueryTimeout;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Embedded implementation of the SQL session.
+ */
+public class SessionImpl implements Session {
+    /** Busy lock for close synchronisation. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final QueryProcessor qryProc;
+
+    private final long timeout;
+
+    private final String schema;
+
+    private final int pageSize;
+
+    private final Set<CompletableFuture<AsyncSqlCursor<List<Object>>>> 
futsToClose = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private final Set<AsyncSqlCursor<List<Object>>> cursToClose = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    private final Map<String, Object> props;
+
+    /**
+     * Constructor.
+     *
+     * @param qryProc Query processor.
+     * @param schema  Query default schema.
+     * @param timeout Query default timeout.
+     * @param pageSize Query fetch page size.
+     * @param props Session's properties.
+     */
+    SessionImpl(
+            QueryProcessor qryProc,
+            String schema,
+            long timeout,
+            int pageSize,
+            Map<String, Object> props
+    ) {
+        this.qryProc = qryProc;
+        this.schema = schema;
+        this.timeout = timeout;
+        this.pageSize = pageSize;
+        this.props = props;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ResultSet execute(@Nullable Transaction transaction, String query, 
@Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ResultSet execute(@Nullable Transaction transaction, Statement 
statement, @Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int[] executeBatch(@Nullable Transaction transaction, String 
dmlQuery, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int[] executeBatch(@Nullable Transaction transaction, Statement 
dmlStatement, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void executeScript(String query, @Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long defaultTimeout(TimeUnit timeUnit) {
+        return timeUnit.convert(timeout, TimeUnit.NANOSECONDS);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String defaultSchema() {
+        return schema;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int defaultPageSize() {
+        return pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable Object property(String name) {
+        return props.get(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SessionBuilder toBuilder() {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteSqlException("Session is closed");
+        }
+
+        try {
+            return new SessionBuilderImpl(qryProc, props)
+                    .defaultPageSize(pageSize)
+                    .defaultTimeout(timeout, TimeUnit.NANOSECONDS)
+                    .defaultSchema(schema);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<AsyncResultSet> executeAsync(@Nullable 
Transaction transaction, String query, @Nullable Object... arguments) {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new 
IgniteSqlException("Session is closed."));
+        }
+
+        try {
+            QueryContext ctx = QueryContext.of(transaction, new 
QueryTimeout(timeout, TimeUnit.NANOSECONDS));
+
+            final CompletableFuture<AsyncSqlCursor<List<Object>>> f = 
qryProc.querySingleAsync(ctx, schema, query, arguments);
+
+            futsToClose.add(f);
+
+            return f.whenComplete(
+                    (cur, ex0) -> futsToClose.remove(f))
+                    .thenCompose(cur -> {
+                        if (!busyLock.enterBusy()) {
+                            return cur.closeAsync()
+                                    .thenCompose((v) -> 
CompletableFuture.failedFuture(new IgniteSqlException("Session is closed")));
+                        }
+
+                        try {
+                            cursToClose.add(cur);
+
+                            return cur.requestNextAsync(pageSize)
+                                    .<AsyncResultSet>thenApply(
+                                            batchRes -> new AsyncResultSetImpl(
+                                                    cur,
+                                                    batchRes,
+                                                    pageSize,
+                                                    () -> 
cursToClose.remove(cur)
+                                            )
+                                    )
+                                    .whenComplete((ars, ex1) -> {
+                                        if (ex1 != null) {
+                                            cursToClose.remove(cur);
+
+                                            cur.closeAsync();
+                                        }
+                                    });
+                        } catch (Throwable e) {
+                            cursToClose.remove(cur);
+
+                            return cur.closeAsync()
+                                    .thenCompose((v) -> 
CompletableFuture.failedFuture(e));
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+            );
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<AsyncResultSet> executeAsync(
+            @Nullable Transaction transaction,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction 
transaction, String query, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> executeScriptAsync(String query, @Nullable 
Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ReactiveResultSet executeReactive(@Nullable Transaction 
transaction, String query, @Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    @Override
+    public ReactiveResultSet executeReactive(@Nullable Transaction 
transaction, Statement statement, @Nullable Object... arguments) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Publisher<Integer> executeBatchReactive(@Nullable Transaction 
transaction, String query, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Publisher<Integer> executeBatchReactive(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close() {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return CompletableFuture
+                .runAsync(busyLock::block)
+                .thenCompose(
+                        v0 -> {
+                            futsToClose.forEach(f -> f.cancel(false));
+
+                            return CompletableFuture.allOf(
+                                            
cursToClose.stream().map(AsyncCursor::closeAsync).toArray(CompletableFuture[]::new)
+                                    )
+                                    .whenComplete((v, e) -> 
cursToClose.clear());
+                        }
+                );
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Publisher<Void> closeReactive() {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
index 74ec29027..14ccd191d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
@@ -48,6 +48,21 @@ public interface QueryProcessor extends IgniteComponent {
      * @return List of sql cursors.
      *
      * @throws IgniteException in case of an error.
-     * */
+     */
     List<CompletableFuture<AsyncSqlCursor<List<Object>>>> 
queryAsync(QueryContext context, String schemaName, String qry, Object... 
params);
+
+    /**
+     * Execute the single statement query with given schema name and 
parameters.
+     *
+     * <p>If the query string contains more than one statement the 
IgniteException will be thrown.
+     *
+     * @param context User query context.
+     * @param schemaName Schema name.
+     * @param qry Single statement SQL query .
+     * @param params Query parameters.
+     * @return Sql cursor.
+     *
+     * @throws IgniteException in case of an error.
+     */
+    CompletableFuture<AsyncSqlCursor<List<Object>>> 
querySingleAsync(QueryContext context, String schemaName, String qry, Object... 
params);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTimeout.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTimeout.java
new file mode 100644
index 000000000..132a89bb1
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTimeout.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Query timeout.
+ */
+public class QueryTimeout {
+    /** Timeout. */
+    private final long timeoutNanos;
+
+    /**
+     * Constructor.
+     *
+     * @param timeout Query timeout value.
+     * @param timeUnit Timeunit.
+     */
+    public QueryTimeout(long timeout, TimeUnit timeUnit) {
+        this.timeoutNanos = timeUnit.toNanos(timeout);
+    }
+
+    /**
+     * Return query timeout.
+     *
+     * @param timeUnit Timeunit to convert timeout to.
+     * @return Default query timeout in the given timeunit.
+     */
+    public long timeout(TimeUnit timeUnit) {
+        return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index be5c4aa32..11394e6ef 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -36,6 +37,7 @@ import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.manager.EventListener;
+import org.apache.ignite.internal.sql.api.IgniteSqlException;
 import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionService;
@@ -228,6 +230,78 @@ public class SqlQueryProcessor implements QueryProcessor {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<AsyncSqlCursor<List<Object>>> 
querySingleAsync(QueryContext context, String schemaName, String qry,
+            Object... params) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(new NodeStoppingException());
+        }
+
+        try {
+            return querySingle0(context, schemaName, qry, params);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0(
+            QueryContext context,
+            String schemaName,
+            String sql,
+            Object... params) {
+        SchemaPlus schema = schemaManager.schema(schemaName);
+
+        if (schema == null) {
+            return CompletableFuture.failedFuture(new 
IgniteInternalException(format("Schema not found [schemaName={}]", 
schemaName)));
+        }
+
+        final BaseQueryContext ctx = BaseQueryContext.builder()
+                .cancel(new QueryCancel())
+                .frameworkConfig(
+                        Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+                                .defaultSchema(schema)
+                                .build()
+                )
+                .logger(LOG)
+                .parameters(params)
+                .build();
+
+        CompletableFuture<Void> start = new CompletableFuture<>();
+
+        CompletableFuture<AsyncSqlCursor<List<Object>>> stage = 
start.thenApply(
+                        (v) -> Commons.parse(sql, 
FRAMEWORK_CONFIG.getParserConfig())
+                )
+                .thenApply(nodes -> {
+                    if (nodes.size() > 1) {
+                        throw new IgniteSqlException("Multiple statements 
aren't allowed.");
+                    }
+
+                    return nodes.get(0);
+                })
+                .thenCompose(sqlNode -> prepareSvc.prepareAsync(sqlNode, ctx))
+                .thenApply(plan -> {
+                    context.maybeUnwrap(QueryValidator.class)
+                            .ifPresent(queryValidator -> 
queryValidator.validatePlan(plan));
+
+                    return new AsyncSqlCursorImpl<>(
+                            SqlQueryType.mapPlanTypeToSqlType(plan.type()),
+                            plan.metadata(),
+                            executionSrvc.executePlan(plan, ctx)
+                    );
+                });
+
+        stage.whenComplete((cur, ex) -> {
+            if (ex instanceof CancellationException) {
+                ctx.cancel().cancel();
+            }
+        });
+
+        start.completeAsync(() -> null, taskExecutor);
+
+        return stage;
+    }
+
     private List<CompletableFuture<AsyncSqlCursor<List<Object>>>> query0(
             QueryContext context,
             String schemaName,
@@ -247,7 +321,7 @@ public class SqlQueryProcessor implements QueryProcessor {
         CompletableFuture<Void> start = new CompletableFuture<>();
 
         for (SqlNode sqlNode : nodes) {
-            BaseQueryContext ctx = BaseQueryContext.builder()
+            final BaseQueryContext ctx = BaseQueryContext.builder()
                     .cancel(new QueryCancel())
                     .frameworkConfig(
                             Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
@@ -270,6 +344,12 @@ public class SqlQueryProcessor implements QueryProcessor {
                         );
                     });
 
+            stage.whenComplete((cur, ex) -> {
+                if (ex instanceof CancellationException) {
+                    ctx.cancel().cancel();
+                }
+            });
+
             res.add(stage);
         }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index bb379faaa..d919a0281 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -239,13 +239,13 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
     private AsyncCursor<List<Object>> executeDdl(DdlPlan plan) {
         try {
-            ddlCmdHnd.handle(plan.command());
+            boolean ret = ddlCmdHnd.handle(plan.command());
+
+            return new 
AsyncWrapper<>(Collections.singletonList(Collections.<Object>singletonList(ret)).iterator());
         } catch (IgniteInternalCheckedException e) {
             throw new IgniteInternalException("Failed to execute DDL statement 
[stmt=" /*+ qry.sql()*/
                     + ", err=" + e.getMessage() + ']', e);
         }
-
-        return new AsyncWrapper<>(Collections.emptyIterator());
     }
 
     private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index de86acc1c..64af405e7 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.ignite.configuration.NamedListView;
@@ -81,21 +82,21 @@ public class DdlCommandHandler {
     }
 
     /** Handles ddl commands. */
-    public void handle(DdlCommand cmd) throws IgniteInternalCheckedException {
+    public boolean handle(DdlCommand cmd) throws 
IgniteInternalCheckedException {
         validateCommand(cmd);
 
         if (cmd instanceof CreateTableCommand) {
-            handleCreateTable((CreateTableCommand) cmd);
+            return handleCreateTable((CreateTableCommand) cmd);
         } else if (cmd instanceof DropTableCommand) {
-            handleDropTable((DropTableCommand) cmd);
+            return handleDropTable((DropTableCommand) cmd);
         } else if (cmd instanceof AlterTableAddCommand) {
-            handleAlterAddColumn((AlterTableAddCommand) cmd);
+            return handleAlterAddColumn((AlterTableAddCommand) cmd);
         } else if (cmd instanceof AlterTableDropCommand) {
-            handleAlterDropColumn((AlterTableDropCommand) cmd);
+            return handleAlterDropColumn((AlterTableDropCommand) cmd);
         } else if (cmd instanceof CreateIndexCommand) {
-            handleCreateIndex((CreateIndexCommand) cmd);
+            return handleCreateIndex((CreateIndexCommand) cmd);
         } else if (cmd instanceof DropIndexCommand) {
-            handleDropIndex((DropIndexCommand) cmd);
+            return handleDropIndex((DropIndexCommand) cmd);
         } else {
             throw new IgniteInternalCheckedException("Unsupported DDL 
operation ["
                     + "cmdName=" + (cmd == null ? null : 
cmd.getClass().getSimpleName()) + "; "
@@ -115,7 +116,7 @@ public class DdlCommandHandler {
     }
 
     /** Handles create table command. */
-    private void handleCreateTable(CreateTableCommand cmd) {
+    private boolean handleCreateTable(CreateTableCommand cmd) {
         final PrimaryKeyDefinitionBuilder pkeyDef = 
SchemaBuilders.primaryKey();
 
         
pkeyDef.withColumns(IgniteObjectName.quoteNames(cmd.primaryKeyColumns()));
@@ -162,32 +163,40 @@ public class DdlCommandHandler {
 
         try {
             tableManager.createTable(fullName, tblChanger);
+
+            return true;
         } catch (TableAlreadyExistsException ex) {
             if (!cmd.ifTableExists()) {
                 throw ex;
+            } else {
+                return false;
             }
         }
     }
 
     /** Handles drop table command. */
-    private void handleDropTable(DropTableCommand cmd) {
+    private boolean handleDropTable(DropTableCommand cmd) {
         String fullName = TableDefinitionImpl.canonicalName(
                 IgniteObjectName.quote(cmd.schemaName()),
                 IgniteObjectName.quote(cmd.tableName())
         );
         try {
             tableManager.dropTable(fullName);
+
+            return true;
         } catch (TableNotFoundException ex) {
             if (!cmd.ifTableExists()) {
                 throw ex;
+            } else {
+                return false;
             }
         }
     }
 
     /** Handles add column command. */
-    private void handleAlterAddColumn(AlterTableAddCommand cmd) {
+    private boolean handleAlterAddColumn(AlterTableAddCommand cmd) {
         if (nullOrEmpty(cmd.columns())) {
-            return;
+            return false;
         }
 
         String fullName = TableDefinitionImpl.canonicalName(
@@ -196,18 +205,20 @@ public class DdlCommandHandler {
         );
 
         try {
-            addColumnInternal(fullName, cmd.columns(), 
cmd.ifColumnNotExists());
+            return addColumnInternal(fullName, cmd.columns(), 
cmd.ifColumnNotExists());
         } catch (TableNotFoundException ex) {
             if (!cmd.ifTableExists()) {
                 throw ex;
+            } else {
+                return false;
             }
         }
     }
 
     /** Handles drop column command. */
-    private void handleAlterDropColumn(AlterTableDropCommand cmd) {
+    private boolean handleAlterDropColumn(AlterTableDropCommand cmd) {
         if (nullOrEmpty(cmd.columns())) {
-            return;
+            return false;
         }
 
         String fullName = TableDefinitionImpl.canonicalName(
@@ -216,16 +227,18 @@ public class DdlCommandHandler {
         );
 
         try {
-            dropColumnInternal(fullName, cmd.columns(), cmd.ifColumnExists());
+            return dropColumnInternal(fullName, cmd.columns(), 
cmd.ifColumnExists());
         } catch (TableNotFoundException ex) {
             if (!cmd.ifTableExists()) {
                 throw ex;
+            } else {
+                return false;
             }
         }
     }
 
     /** Handles create index command. */
-    private void handleCreateIndex(CreateIndexCommand cmd) {
+    private boolean handleCreateIndex(CreateIndexCommand cmd) {
         // Only sorted idx for now.
         SortedIndexDefinitionBuilder idx = 
SchemaBuilders.sortedIndex(cmd.indexName());
 
@@ -244,21 +257,29 @@ public class DdlCommandHandler {
                 IgniteObjectName.quote(cmd.tableName())
         );
 
+        AtomicBoolean ret = new AtomicBoolean();
+
         tableManager.alterTable(fullName, chng -> chng.changeIndices(idxes -> {
             if (idxes.get(cmd.indexName()) != null) {
                 if (!cmd.ifIndexNotExists()) {
                     throw new IndexAlreadyExistsException(cmd.indexName());
                 } else {
+                    ret.set(false);
+
                     return;
                 }
             }
 
             idxes.create(cmd.indexName(), tableIndexChange -> 
convert(idx.build(), tableIndexChange));
+
+            ret.set(true);
         }));
+
+        return ret.get();
     }
 
     /** Handles drop index command. */
-    private void handleDropIndex(DropIndexCommand cmd) {
+    private boolean handleDropIndex(DropIndexCommand cmd) {
         throw new UnsupportedOperationException("DROP INDEX command not 
supported for now.");
     }
 
@@ -268,8 +289,11 @@ public class DdlCommandHandler {
      * @param fullName Table with schema name.
      * @param colsDef  Columns defenitions.
      * @param colNotExist Flag indicates exceptionally behavior in case of 
already existing column.
+     *
+     * @return {@code true} if the full columns set is applied successfully. 
Otherwise, returns {@code false}.
      */
-    private void addColumnInternal(String fullName, List<ColumnDefinition> 
colsDef, boolean colNotExist) {
+    private boolean addColumnInternal(String fullName, List<ColumnDefinition> 
colsDef, boolean colNotExist) {
+        AtomicBoolean ret = new AtomicBoolean(true);
         tableManager.alterTable(
                 fullName,
                 chng -> chng.changeColumns(cols -> {
@@ -285,7 +309,15 @@ public class DdlCommandHandler {
 
                         colsDef0 = colsDef;
                     } else {
-                        colsDef0 = colsDef.stream().filter(k -> 
!colNamesToOrders.containsKey(k.name())).collect(Collectors.toList());
+                        colsDef0 = colsDef.stream().filter(k -> {
+                            if (colNamesToOrders.containsKey(k.name())) {
+                                ret.set(false);
+
+                                return false;
+                            } else {
+                                return true;
+                            }
+                        }).collect(Collectors.toList());
                     }
 
                     final IgniteTypeFactory typeFactory = 
Commons.typeFactory();
@@ -301,6 +333,8 @@ public class DdlCommandHandler {
                         cols.create(col.name(), colChg -> 
convert(col0.build(), colChg));
                     }
                 }));
+
+        return ret.get();
     }
 
     /**
@@ -309,8 +343,11 @@ public class DdlCommandHandler {
      * @param fullName Table with schema name.
      * @param colNames Columns definitions.
      * @param colExist Flag indicates exceptionally behavior in case of 
already existing column.
+     * @return {@code true} if the full columns set is applied successfully. 
Otherwise, returns {@code false}.
      */
-    private void dropColumnInternal(String fullName, Set<String> colNames, 
boolean colExist) {
+    private boolean dropColumnInternal(String fullName, Set<String> colNames, 
boolean colExist) {
+        AtomicBoolean ret = new AtomicBoolean(true);
+
         tableManager.alterTable(
                 fullName,
                 chng -> chng.changeColumns(cols -> {
@@ -324,6 +361,8 @@ public class DdlCommandHandler {
 
                     for (String colName : colNames) {
                         if (!colNamesToOrders.containsKey(colName)) {
+                            ret.set(false);
+
                             if (!colExist) {
                                 throw new ColumnNotFoundException(colName, 
fullName);
                             }
@@ -339,6 +378,8 @@ public class DdlCommandHandler {
 
                     colNames0.forEach(k -> 
cols.delete(colNamesToOrders.get(k)));
                 }));
+
+        return ret.get();
     }
 
     /** Map column name to order. */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 9d0d8bb31..5a6a74132 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -48,6 +48,8 @@ import org.jetbrains.annotations.Nullable;
  * Holds actual schema and mutates it on schema change, requested by Ignite.
  */
 public class SqlSchemaManagerImpl implements SqlSchemaManager {
+    private static final String DEFAULT_SCHEMA_NAME = "PUBLIC";
+
     private final VersionedValue<Map<String, IgniteSchema>> schemasVv;
 
     private final VersionedValue<Map<UUID, IgniteTable>> tablesVv;
@@ -72,7 +74,7 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager 
{
 
         calciteSchemaVv = new VersionedValue<>(registry, () -> {
             SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
-            newCalciteSchema.add("PUBLIC", new IgniteSchema("PUBLIC"));
+            newCalciteSchema.add(DEFAULT_SCHEMA_NAME, new 
IgniteSchema(DEFAULT_SCHEMA_NAME));
             return newCalciteSchema;
         });
 
@@ -84,7 +86,7 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager 
{
     public SchemaPlus schema(@Nullable String schema) {
         SchemaPlus schemaPlus = calciteSchemaVv.latest();
 
-        return schema != null ? schemaPlus.getSubSchema(schema) : schemaPlus;
+        return schema != null ? schemaPlus.getSubSchema(schema) : 
schemaPlus.getSubSchema(DEFAULT_SCHEMA_NAME);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
index 19be69538..0a6b08463 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
@@ -132,12 +132,12 @@ public class IgniteSqlApiTest {
 
         assertTrue(rs.wasApplied());
         assertFalse(rs.hasRowSet());
-        assertEquals(-1, rs.affectedRows());
+        assertEquals(-1L, rs.affectedRows());
 
         // Execute DML.
         rs = sess.execute(null, "INSERT INTO tbl VALUES (?, ?)", 1, "str1");
 
-        assertEquals(1, rs.affectedRows());
+        assertEquals(1L, rs.affectedRows());
         assertFalse(rs.wasApplied());
         assertFalse(rs.hasRowSet());
 
@@ -153,7 +153,7 @@ public class IgniteSqlApiTest {
 
         assertTrue(rs.hasRowSet());
         assertFalse(rs.wasApplied());
-        assertEquals(-1, rs.affectedRows());
+        assertEquals(-1L, rs.affectedRows());
 
         assertTrue(rs.iterator().hasNext());
         for (SqlRow r : rs) {
@@ -163,7 +163,7 @@ public class IgniteSqlApiTest {
         // Execute DML.
         rs = sess.execute(null, "DELETE FROM tbl");
 
-        assertEquals(4, rs.affectedRows());
+        assertEquals(4L, rs.affectedRows());
         assertFalse(rs.wasApplied());
         assertFalse(rs.hasRowSet());
     }
@@ -205,7 +205,7 @@ public class IgniteSqlApiTest {
             // Execute DML outside tx in same session after tx commit.
             rs = sess.execute(null, "DELETE FROM tbl");
 
-            assertEquals(4, rs.affectedRows());
+            assertEquals(4L, rs.affectedRows());
         });
 
         Mockito.verify(transaction).commit();
@@ -451,7 +451,7 @@ public class IgniteSqlApiTest {
                     
Mockito.when(res.iterator()).thenThrow(AssertionError.class);
                     Mockito.when(res.wasApplied()).thenReturn(false);
                     Mockito.when(res.hasRowSet()).thenReturn(false);
-                    Mockito.when(res.affectedRows()).thenReturn(1);
+                    Mockito.when(res.affectedRows()).thenReturn(1L);
 
                     return res;
                 });
@@ -474,7 +474,7 @@ public class IgniteSqlApiTest {
                     ResultSet res = Mockito.mock(ResultSet.class);
                     Mockito.when(res.wasApplied()).thenReturn(false);
                     Mockito.when(res.hasRowSet()).thenReturn(true);
-                    Mockito.when(res.affectedRows()).thenReturn(-1);
+                    Mockito.when(res.affectedRows()).thenReturn(-1L);
 
                     Transaction txArg = ans.getArgument(0);
                     Integer filterArg = ans.getArgument(2);
@@ -491,7 +491,7 @@ public class IgniteSqlApiTest {
 
                             oldState.forEach((k, v) -> state.put(k, null));
 
-                            return oldState.size();
+                            return (long) oldState.size();
                         })
                         .getMock());
 
@@ -500,7 +500,7 @@ public class IgniteSqlApiTest {
                     ResultSet res = Mockito.mock(ResultSet.class);
                     Mockito.when(res.wasApplied()).thenReturn(false);
                     Mockito.when(res.hasRowSet()).thenReturn(true);
-                    Mockito.when(res.affectedRows()).thenReturn(-1);
+                    Mockito.when(res.affectedRows()).thenReturn(-1L);
                     Mockito.when(res.iterator())
                             .thenReturn(List.of(
                                     createRow(2, "str2").build(),
@@ -515,7 +515,7 @@ public class IgniteSqlApiTest {
                     
Mockito.when(res.iterator()).thenThrow(AssertionError.class);
                     Mockito.when(res.wasApplied()).thenReturn(true);
                     Mockito.when(res.hasRowSet()).thenReturn(false);
-                    Mockito.when(res.affectedRows()).thenReturn(-1);
+                    Mockito.when(res.affectedRows()).thenReturn(-1L);
                     return res;
                 });
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 493532df7..ca23ad08f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.exec;
 import static 
org.apache.ignite.internal.sql.engine.util.BaseQueryContext.CLUSTER;
 import static 
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.sneakyThrow;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.lang.IgniteStringFormatter.format;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -78,6 +77,7 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
@@ -216,7 +216,7 @@ public class ExecutionServiceImplTest {
                             .build()
                     );
                 } catch (IgniteInternalCheckedException e) {
-                    sneakyThrow(e);
+                    throw new IgniteInternalException(e);
                 }
             } else {
                 original.onMessage(nodeId, msg);
@@ -528,7 +528,7 @@ public class ExecutionServiceImplTest {
                                         try {
                                             task.run();
                                         } catch (Throwable ex) {
-                                            sneakyThrow(ex);
+                                            throw new 
IgniteInternalException(ex);
                                         }
                                     }
                                 }
@@ -605,4 +605,4 @@ public class ExecutionServiceImplTest {
             }
         };
     }
-}
\ No newline at end of file
+}

Reply via email to