This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c44bbbb229 IGNITE-20387: Remap most exceptions to SqlExceptions for 
SQL API (#2613)
c44bbbb229 is described below

commit c44bbbb229d44737fdb99fa29c3a8b4d5e4cc248
Author: ygerzhedovich <41903880+ygerzhedov...@users.noreply.github.com>
AuthorDate: Mon Oct 16 16:50:50 2023 +0300

    IGNITE-20387: Remap most exceptions to SqlExceptions for SQL API (#2613)
---
 .../internal/lang/IgniteExceptionMapperUtil.java   |  33 +-
 .../ignite/internal/sql/SyncResultSetAdapter.java  |   4 +-
 .../dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs     |  14 +-
 .../dotnet/Apache.Ignite/Internal/Sql/Sql.cs       |   2 +-
 .../internal/runner/app/ItTablesApiTest.java       |  31 +-
 ...nchronousApiTest.java => ItSqlApiBaseTest.java} | 587 +++++++-------
 .../internal/sql/api/ItSqlAsynchronousApiTest.java | 851 ++-------------------
 .../sql/api/ItSqlClientAsynchronousApiTest.java    |   6 +
 .../sql/api/ItSqlClientSynchronousApiTest.java     |  24 +-
 .../internal/sql/api/ItSqlSynchronousApiTest.java  | 574 ++------------
 .../internal/sql/engine/ItCreateTableDdlTest.java  |   6 +-
 .../internal/lang/SqlExceptionMapperUtil.java      |  64 ++
 .../ignite/internal/sql/api/SessionImpl.java       |  12 +-
 .../internal/sql/engine/AsyncSqlCursorImpl.java    |   4 +-
 .../sql/engine/prepare/PrepareServiceImpl.java     |   4 +-
 .../internal/lang/SqlExceptionMapperUtilTest.java  |  78 ++
 .../internal/sql/engine/util/SqlTestUtils.java     |   8 +
 17 files changed, 623 insertions(+), 1679 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
index e5b43059aa..fbae12e574 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
@@ -52,8 +52,8 @@ public class IgniteExceptionMapperUtil {
      *
      * @param mapper Exception mapper from internal exception to a public one.
      * @param registeredMappings Already registered mappings.
-     * @throws IgniteException If a mapper for the given {@code clazz} already 
registered,
-     *      or {@code clazz} represents Java standard exception like {@link 
NullPointerException}, {@link IllegalArgumentException}.
+     * @throws IgniteException If a mapper for the given {@code clazz} already 
registered, or {@code clazz} represents Java standard
+     *         exception like {@link NullPointerException}, {@link 
IllegalArgumentException}.
      */
     static void registerMapping(
             IgniteExceptionMapper<?, ?> mapper,
@@ -87,23 +87,30 @@ public class IgniteExceptionMapperUtil {
             if (origin instanceof AssertionError) {
                 return new IgniteException(INTERNAL_ERR, origin);
             }
-
             return origin;
         }
 
-        IgniteExceptionMapper<? extends Exception, ? extends Exception> m = 
EXCEPTION_CONVERTERS.get(origin.getClass());
+        Throwable res;
+
+        // Try to find an appropriate mapper, moving from the original class to 
superclasses step by step.
+        Class exceptionClass = origin.getClass();
+        IgniteExceptionMapper<? extends Exception, ? extends Exception> m;
+        while ((m = EXCEPTION_CONVERTERS.get(exceptionClass)) == null && 
exceptionClass != Throwable.class) {
+            exceptionClass = exceptionClass.getSuperclass();
+        }
+
         if (m != null) {
-            Exception mapped = map(m, origin);
+            res = map(m, origin);
 
-            assert mapped instanceof IgniteException || mapped instanceof 
IgniteCheckedException :
-                    "Unexpected mapping of internal exception to a public one 
[origin=" + origin + ", mapped=" + mapped + ']';
+            assert res instanceof IgniteException || res instanceof 
IgniteCheckedException :
+                    "Unexpected mapping of internal exception to a public one 
[origin=" + origin + ", mapped=" + res + ']';
 
-            return mapped;
+        } else {
+            res = origin;
         }
 
-        if (origin instanceof IgniteException || origin instanceof 
IgniteCheckedException) {
-
-            return origin;
+        if (res instanceof IgniteException || res instanceof 
IgniteCheckedException) {
+            return res;
         }
 
         // There are no exception mappings for the given exception. This case 
should be considered as internal error.
@@ -111,8 +118,8 @@ public class IgniteExceptionMapperUtil {
     }
 
     /**
-     * Returns a new CompletableFuture that, when the given {@code origin} 
future completes exceptionally,
-     * maps the origin's exception to a public Ignite exception if it is 
needed.
+     * Returns a new CompletableFuture that, when the given {@code origin} 
future completes exceptionally, maps the origin's exception to a
+     * public Ignite exception if it is needed.
      *
      * @param origin The future to use to create a new stage.
      * @param <T> Type os result.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java
index 0854513ad3..3acd814f7b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java
@@ -32,7 +32,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Synchronous wrapper over {@link org.apache.ignite.sql.async.AsyncResultSet}.
  */
-class SyncResultSetAdapter<T> implements ResultSet<T> {
+public class SyncResultSetAdapter<T> implements ResultSet<T> {
     /** Wrapped async result set. */
     private final AsyncResultSet<T> ars;
 
@@ -44,7 +44,7 @@ class SyncResultSetAdapter<T> implements ResultSet<T> {
      *
      * @param ars Asynchronous result set.
      */
-    SyncResultSetAdapter(AsyncResultSet<T> ars) {
+    public SyncResultSetAdapter(AsyncResultSet<T> ars) {
         assert ars != null;
 
         this.ars = ars;
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
index 8b5ff71468..7fa4842d28 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
@@ -334,18 +334,14 @@ namespace Apache.Ignite.Tests.Sql
         public void TestInvalidSqlThrowsException()
         {
             var ex = Assert.ThrowsAsync<SqlException>(async () => await 
Client.Sql.ExecuteAsync(null, "select x from bad"));
-            StringAssert.Contains("Invalid query, check inner exceptions for 
details: select x from bad", ex!.Message);
 
-            var innerEx = ex.InnerException;
-            Assert.IsInstanceOf<SqlException>(innerEx);
-            StringAssert.Contains("From line 1, column 15 to line 1, column 
17: Object 'BAD' not found", innerEx!.Message);
+            StringAssert.Contains("From line 1, column 15 to line 1, column 
17: Object 'BAD' not found", ex!.Message);
         }
 
         [Test]
         public void TestCreateTableExistsThrowsException()
         {
-            // TODO: IGNITE-20388 Fix it
-            var ex = Assert.ThrowsAsync<IgniteException>(
+            var ex = Assert.ThrowsAsync<SqlException>(
                 async () => await Client.Sql.ExecuteAsync(null, "CREATE TABLE 
TEST(ID INT PRIMARY KEY)"));
 
             StringAssert.Contains("Table with name 'PUBLIC.TEST' already 
exists", ex!.Message);
@@ -354,8 +350,7 @@ namespace Apache.Ignite.Tests.Sql
         [Test]
         public void TestAlterTableNotFoundThrowsException()
         {
-            // TODO: IGNITE-20388 Fix it
-            var ex = Assert.ThrowsAsync<IgniteException>(
+            var ex = Assert.ThrowsAsync<SqlException>(
                 async () => await Client.Sql.ExecuteAsync(null, "ALTER TABLE 
NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"));
 
             StringAssert.Contains("Table with name 'PUBLIC.NOT_EXISTS_TABLE' 
not found", ex!.Message);
@@ -364,11 +359,10 @@ namespace Apache.Ignite.Tests.Sql
         [Test]
         public void TestAlterTableColumnExistsThrowsException()
         {
-            // TODO: IGNITE-20388 Fix it
             var ex = Assert.ThrowsAsync<SqlException>(
                 async () => await Client.Sql.ExecuteAsync(null, "ALTER TABLE 
TEST ADD COLUMN ID INT"));
 
-            StringAssert.Contains("Invalid query, check inner exceptions for 
details: ALTER TABLE TEST ADD COLUMN ID INT", ex!.Message);
+            StringAssert.Contains("Failed to validate query. Column with name 
'ID' already exists", ex!.Message);
         }
 
         [Test]
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
index ddc3735734..cac8f1529b 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
@@ -144,7 +144,7 @@ namespace Apache.Ignite.Internal.Sql
                 // ResultSet will dispose the pooled buffer.
                 return new ResultSet<T>(socket, buf, rowReaderFactory);
             }
-            catch (SqlException e) when (e.Code == 
ErrorGroups.Sql.StmtValidation || e.Code == ErrorGroups.Sql.StmtParse)
+            catch (SqlException e) when (e.Code == ErrorGroups.Sql.StmtParse)
             {
                 throw new SqlException(
                     e.TraceId,
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index 7a4c8ff7c3..8b72c52abf 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.runner.app;
 
 import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
 import static 
org.apache.ignite.internal.test.WatchListenerInhibitor.metastorageEventsInhibitor;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
@@ -42,7 +43,6 @@ import org.apache.ignite.InitParameters;
 import org.apache.ignite.internal.catalog.CatalogValidationException;
 import org.apache.ignite.internal.catalog.IndexExistsValidationException;
 import org.apache.ignite.internal.catalog.TableExistsValidationException;
-import org.apache.ignite.internal.catalog.TableNotFoundValidationException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.TableImpl;
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.test.WatchListenerInhibitor;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.apache.ignite.sql.Session;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
@@ -140,8 +141,10 @@ public class ItTablesApiTest extends IgniteAbstractTest {
 
         Table tbl = createTable(ignite0, TABLE_NAME);
 
-        // TODO: IGNITE-20388 Fix it
-        assertThrowsWithCause(() -> createTable(ignite0, TABLE_NAME), 
TableExistsValidationException.class);
+        assertThrowsSqlException(
+                Sql.STMT_VALIDATION_ERR,
+                "Table with name 'PUBLIC.TBL1' already exists",
+                () -> createTable(ignite0, TABLE_NAME));
 
         assertEquals(tbl, createTableIfNotExists(ignite0, TABLE_NAME));
     }
@@ -152,7 +155,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
      * @throws Exception If failed.
      */
     @Test
-    public void testTableAlreadyCreatedFromLaggedNode() throws Exception {
+    public void testTableAlreadyCreatedFromLaggedNode() {
         clusterNodes.forEach(ign -> 
assertNull(ign.tables().table(TABLE_NAME)));
 
         Ignite ignite0 = clusterNodes.get(0);
@@ -170,8 +173,10 @@ public class ItTablesApiTest extends IgniteAbstractTest {
 
         for (Ignite ignite : clusterNodes) {
             if (ignite != ignite1) {
-                // TODO: IGNITE-20388 Fix it
-                assertThrowsWithCause(() -> createTable(ignite, TABLE_NAME), 
TableExistsValidationException.class);
+                assertThrowsSqlException(
+                        Sql.STMT_VALIDATION_ERR,
+                        "Table with name 'PUBLIC.TBL1' already exists",
+                        () -> createTable(ignite, TABLE_NAME));
 
                 assertNotNull(createTableIfNotExists(ignite, TABLE_NAME));
             }
@@ -182,7 +187,6 @@ public class ItTablesApiTest extends IgniteAbstractTest {
 
         ignite1Inhibitor.stopInhibit();
 
-        // TODO: IGNITE-20388 Fix it
         assertThat(createTblFut, 
willThrowWithCauseOrSuppressed(TableExistsValidationException.class));
         assertThat(createTblIfNotExistsFut, willCompleteSuccessfully());
     }
@@ -328,8 +332,10 @@ public class ItTablesApiTest extends IgniteAbstractTest {
 
         for (Ignite ignite : clusterNodes) {
             if (ignite != ignite1) {
-                // TODO: IGNITE-20388 Fix it
-                assertThrowsWithCause(() -> addColumn(ignite, TABLE_NAME), 
CatalogValidationException.class);
+                assertThrowsSqlException(
+                        Sql.STMT_VALIDATION_ERR,
+                        "Failed to validate query. Column with name 'VALINT3' 
already exists",
+                        () -> addColumn(ignite, TABLE_NAME));
             }
         }
 
@@ -337,7 +343,6 @@ public class ItTablesApiTest extends IgniteAbstractTest {
 
         ignite1Inhibitor.stopInhibit();
 
-        // TODO: IGNITE-20388 Fix it
         assertThat(addColFut, 
willThrowWithCauseOrSuppressed(CatalogValidationException.class));
     }
 
@@ -400,8 +405,10 @@ public class ItTablesApiTest extends IgniteAbstractTest {
 
             assertNull(((IgniteTablesInternal) ignite.tables()).table(tblId));
 
-            // TODO: IGNITE-20388 Fix it
-            assertThrowsWithCause(() -> dropTable(ignite, TABLE_NAME), 
TableNotFoundValidationException.class);
+            assertThrowsSqlException(
+                    Sql.STMT_VALIDATION_ERR,
+                    "Table with name 'PUBLIC.TBL1' not found",
+                    () -> dropTable(ignite, TABLE_NAME));
 
             dropTableIfExists(ignite, TABLE_NAME);
         }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
similarity index 67%
copy from 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
copy to 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
index 069db0aab4..5f94a23702 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
@@ -19,41 +19,28 @@ package org.apache.ignite.internal.sql.api;
 
 import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
 import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsTableScan;
+import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.asStream;
 import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
 import org.apache.ignite.internal.catalog.commands.CatalogUtils;
-import org.apache.ignite.internal.client.sql.ClientSql;
 import org.apache.ignite.internal.sql.api.ColumnMetadataImpl.ColumnOriginImpl;
 import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest;
 import org.apache.ignite.internal.sql.engine.QueryCancelledException;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
 import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.lang.ColumnAlreadyExistsException;
 import org.apache.ignite.lang.ColumnNotFoundException;
 import org.apache.ignite.lang.ErrorGroups;
@@ -71,27 +58,30 @@ import org.apache.ignite.sql.ColumnType;
 import org.apache.ignite.sql.CursorClosedException;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.NoRowSetExpectedException;
+import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.SqlBatchException;
 import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.SqlRow;
-import org.apache.ignite.sql.async.AsyncResultSet;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionOptions;
 import org.hamcrest.Matcher;
-import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
- * Tests for asynchronous SQL API.
+ * Tests for SQL API.
+ * Tests will be run through the synchronous API, the asynchronous API and client entry 
points.
+ * By default, any SQL API test should be added to the base class and use the 
specially provided methods to interact
+ * with the API in an API-type-independent manner. Any API-specific test 
should be placed in the appropriate subclass.
  */
-@SuppressWarnings("ThrowableNotThrown")
-public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
-    private static final int ROW_COUNT = 16;
+public abstract class ItSqlApiBaseTest extends ClusterPerClassIntegrationTest {
+    protected static final int ROW_COUNT = 16;
 
     @AfterEach
     public void dropTables() {
@@ -238,31 +228,11 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         );
     }
 
-    @Test
-    public void dml() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = igniteSql();
-        Session ses = sql.createSession();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
-
-        checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
-    }
-
     /** Check all transactions are processed correctly even with case of sql 
Exception raised. */
     @Test
     public void implicitTransactionsStates() {
         IgniteSql sql = igniteSql();
 
-        if (sql instanceof ClientSql) {
-            return;
-        }
-
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
 
         Session ses = sql.createSession();
@@ -270,59 +240,22 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         TxManager txManager = txManager();
 
         for (int i = 0; i < ROW_COUNT; ++i) {
-            CompletableFuture<AsyncResultSet<SqlRow>> fut = 
ses.executeAsync(null, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)", i, i);
-
-            AsyncResultSet asyncRes = null;
-
-            try {
-                asyncRes = await(fut);
-            } catch (Throwable ignore) {
-                // No op.
-            }
-
-            if (asyncRes != null) {
-                await(asyncRes.closeAsync());
-            }
+            assertThrowsSqlException(
+                    Sql.STMT_VALIDATION_ERR,
+                    "Table with name 'PUBLIC.TEST' already exists",
+                    () -> execute(ses, "CREATE TABLE TEST(ID INT PRIMARY KEY, 
VAL0 INT)")
+            );
         }
 
         // No new transactions through ddl.
         assertEquals(0, txManager.pending());
     }
 
-    /** Check correctness of explicit transaction rollback. */
-    @Test
-    public void checkExplicitTxRollback() {
-        IgniteSql sql = igniteSql();
-
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        Session ses = sql.createSession();
-
-        // Outer tx with further commit.
-        Transaction outerTx = igniteTx().begin();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", outerTx, i, i);
-        }
-
-        await(outerTx.rollbackAsync());
-
-        AsyncResultSet rs = await(ses.executeAsync(null, "SELECT VAL0 FROM 
TEST ORDER BY VAL0"));
-
-        assertEquals(0, StreamSupport.stream(rs.currentPage().spliterator(), 
false).count());
-
-        await(rs.closeAsync());
-    }
-
     /** Check correctness of implicit and explicit transactions. */
     @Test
     public void checkTransactionsWithDml() {
         IgniteSql sql = igniteSql();
 
-        if (sql instanceof ClientSql) {
-            return;
-        }
-
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
 
         Session ses = sql.createSession();
@@ -339,36 +272,36 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         Transaction outerTx = igniteTx().begin();
 
         for (int i = ROW_COUNT; i < 2 * ROW_COUNT; ++i) {
-            checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", outerTx, i, i);
+            checkDml(1, outerTx, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
         }
 
-        outerTx.commit();
+        commit(outerTx);
 
         // Outdated tx.
         Transaction outerTx0 = outerTx;
-        //ToDo: IGNITE-20387 , here should be used assertThrowsSqlException 
method with code and message `"Transaction is already finished"
-        IgniteException e = assertThrows(IgniteException.class,
-                () -> checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", 
outerTx0, ROW_COUNT, Integer.MAX_VALUE));
-        assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, 
e.code());
+        assertThrowsSqlException(
+                Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR,
+                "Transaction is already finished",
+                () -> checkDml(1, outerTx0, ses, "INSERT INTO TEST VALUES (?, 
?)", ROW_COUNT, Integer.MAX_VALUE));
 
         assertThrowsSqlException(
                 Sql.CONSTRAINT_VIOLATION_ERR,
                 "PK unique constraint is violated",
                 () -> checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", 
ROW_COUNT, Integer.MAX_VALUE));
 
-        AsyncResultSet rs = await(ses.executeAsync(null, "SELECT VAL0 FROM 
TEST ORDER BY VAL0"));
+        ResultSet<SqlRow> rs = executeForRead(ses, "SELECT VAL0 FROM TEST 
ORDER BY VAL0");
 
-        assertEquals(2 * ROW_COUNT, 
StreamSupport.stream(rs.currentPage().spliterator(), false).count());
+        assertEquals(2 * ROW_COUNT, asStream(rs).count());
 
-        rs.closeAsync();
+        rs.close();
 
         outerTx = igniteTx().begin();
 
-        rs = await(ses.executeAsync(outerTx, "SELECT VAL0 FROM TEST ORDER BY 
VAL0"));
+        rs = executeForRead(ses, outerTx, "SELECT VAL0 FROM TEST ORDER BY 
VAL0");
 
-        assertEquals(2 * ROW_COUNT, 
StreamSupport.stream(rs.currentPage().spliterator(), false).count());
+        assertEquals(2 * ROW_COUNT, asStream(rs).count());
 
-        rs.closeAsync();
+        rs.close();
 
         outerTx.commit();
 
@@ -381,6 +314,32 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         assertEquals(0, txManagerInternal.pending());
     }
 
+    /** Check correctness of explicit transaction rollback. */
+    @Test
+    public void checkExplicitTxRollback() {
+        IgniteSql sql = igniteSql();
+
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+        Session ses = sql.createSession();
+
+        // Outer tx with further rollback.
+        Transaction outerTx = igniteTx().begin();
+
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            checkDml(1, outerTx, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
+        rollback(outerTx);
+
+        ResultSet<SqlRow> rs = executeForRead(ses, "SELECT VAL0 FROM TEST 
ORDER BY VAL0");
+
+        asStream(rs);
+        assertEquals(0, asStream(rs).count());
+
+        rs.close();
+    }
+
     /** Check correctness of rw and ro transactions for table scan. */
     @Test
     public void checkMixedTransactionsForTable() {
@@ -391,6 +350,7 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         checkMixedTransactions(planMatcher);
     }
 
+
     /** Check correctness of rw and ro transactions for index scan. */
     @Test
     public void checkMixedTransactionsForIndex() throws Exception {
@@ -405,10 +365,6 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
     private void checkMixedTransactions(Matcher<String> planMatcher) {
         IgniteSql sql = igniteSql();
 
-        if (sql instanceof ClientSql) {
-            return;
-        }
-
         Session ses = sql.createSession();
 
         for (int i = 0; i < ROW_COUNT; ++i) {
@@ -432,11 +388,11 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
 
         assertQuery(outerTx, query).matches(planMatcher).check();
 
-        AsyncResultSet rs = await(ses.executeAsync(outerTx, query));
+        ResultSet<SqlRow> rs = executeForRead(ses, outerTx, query);
 
-        assertEquals(ROW_COUNT, 
StreamSupport.stream(rs.currentPage().spliterator(), false).count());
+        assertEquals(ROW_COUNT, asStream(rs).count());
 
-        rs.closeAsync();
+        rs.close();
 
         if (outerTx != null) {
             if (commit) {
@@ -447,29 +403,6 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         }
     }
 
-    @Test
-    public void select() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = igniteSql();
-        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 
4).build();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        TestPageProcessor pageProc = new TestPageProcessor(4);
-        await(ses.executeAsync(null, "SELECT ID FROM 
TEST").thenCompose(pageProc));
-
-        Set<Integer> rs = pageProc.result().stream().map(r -> 
r.intValue(0)).collect(Collectors.toSet());
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            assertTrue(rs.remove(i), "Results invalid: " + pageProc.result());
-        }
-
-        assertTrue(rs.isEmpty());
-    }
-
     @Test
     public void metadata() {
         sql("CREATE TABLE TEST(COL0 BIGINT PRIMARY KEY, COL1 VARCHAR NOT 
NULL)");
@@ -477,9 +410,9 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         IgniteSql sql = igniteSql();
         Session ses = sql.sessionBuilder().build();
 
-        ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", 1L, "some string");
+        execute(ses, "INSERT INTO TEST VALUES (?, ?)", 1L, "some string");
 
-        AsyncResultSet<SqlRow> rs = await(ses.executeAsync(null, "SELECT COL1, 
COL0 FROM TEST"));
+        ResultSet<SqlRow> rs = executeForRead(ses, "SELECT COL1, COL0 FROM 
TEST");
 
         // Validate columns metadata.
         ResultSetMetadata meta = rs.metadata();
@@ -508,11 +441,10 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
 
         // Validate result columns types.
         assertTrue(rs.hasRowSet());
-        assertEquals(1, rs.currentPageSize());
 
-        SqlRow row = rs.currentPage().iterator().next();
+        SqlRow row = rs.next();
 
-        await(rs.closeAsync());
+        rs.close();
 
         assertInstanceOf(meta.columns().get(0).valueClass(), row.value(0));
         assertInstanceOf(meta.columns().get(1).valueClass(), row.value(1));
@@ -523,9 +455,9 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         IgniteSql sql = igniteSql();
         Session ses = sql.sessionBuilder().build();
 
-        AsyncResultSet<SqlRow> ars = await(ses.executeAsync(null, "SELECT 1 as 
COL_A, 2 as COL_B"));
+        ResultSet<SqlRow> rs = executeForRead(ses, "SELECT 1 as COL_A, 2 as 
COL_B");
 
-        SqlRow r = CollectionUtils.first(ars.currentPage());
+        SqlRow r = rs.next();
 
         assertEquals(2, r.columnCount());
         assertEquals(0, r.columnIndex("COL_A"));
@@ -548,59 +480,42 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         assertThrowsWithCause(() -> r.intValue(-2), 
IndexOutOfBoundsException.class);
         assertThrowsWithCause(() -> r.intValue(10), 
IndexOutOfBoundsException.class);
 
-        await(ars.closeAsync());
+        rs.close();
     }
 
     @Test
-    public void pageSequence() {
+    public void closeSession() {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
 
         IgniteSql sql = igniteSql();
-        Session ses = sql.sessionBuilder().defaultPageSize(1).build();
+        Session ses = sql.sessionBuilder().defaultPageSize(2).build();
 
         for (int i = 0; i < ROW_COUNT; ++i) {
-            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+            execute(ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
         }
 
-        AsyncResultSet<SqlRow> ars0 = await(ses.executeAsync(null, "SELECT ID 
FROM TEST ORDER BY ID"));
-        var p0 = ars0.currentPage();
-        AsyncResultSet<SqlRow> ars1 = await(ars0.fetchNextPage());
-        var p1 = ars1.currentPage();
-        AsyncResultSet<SqlRow> ars2 = 
await(ars1.fetchNextPage().toCompletableFuture());
-        var p2 = ars2.currentPage();
-        AsyncResultSet<SqlRow> ars3 = await(ars1.fetchNextPage());
-        var p3 = ars3.currentPage();
-        AsyncResultSet<SqlRow> ars4 = await(ars0.fetchNextPage());
-        var p4 = ars4.currentPage();
-
-        assertSame(ars0, ars1);
-        assertSame(ars0, ars2);
-        assertSame(ars0, ars3);
-        assertSame(ars0, ars4);
+        ResultSet rs = executeForRead(ses, "SELECT ID FROM TEST");
 
-        List<SqlRow> res = Stream.of(p0, p1, p2, p3, p4)
-                .flatMap(p -> StreamSupport.stream(p.spliterator(), false))
-                .collect(Collectors.toList());
+        ses.close();
 
-        TestPageProcessor pageProc = new TestPageProcessor(ROW_COUNT - 
res.size());
-        await(ars4.fetchNextPage().thenCompose(pageProc));
-
-        res.addAll(pageProc.result());
+        SqlException sqlEx = assertThrowsSqlException(
+                Sql.EXECUTION_CANCELLED_ERR,
+                "The query was cancelled while executing",
+                () -> rs.forEachRemaining(System.out::println));
 
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            assertEquals(i, res.get(i).intValue(0));
-        }
+        assertTrue(IgniteTestUtils.hasCause(sqlEx, 
QueryCancelledException.class, null));
+        assertThrowsSqlException(Sql.SESSION_CLOSED_ERR, "Session is closed", 
() -> execute(ses, "SELECT ID FROM TEST"));
     }
 
     @Test
-    public void errors() {
+    public void errors() throws InterruptedException {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)");
 
         IgniteSql sql = igniteSql();
-        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 
2).build();
+        Session ses = sql.sessionBuilder().defaultPageSize(2).build();
 
         for (int i = 0; i < ROW_COUNT; ++i) {
-            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+            execute(ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
         }
 
         // Parse error.
@@ -628,87 +543,71 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
 
         // No result set error.
         {
-            AsyncResultSet ars = await(ses.executeAsync(null, "CREATE TABLE 
TEST3 (ID INT PRIMARY KEY)"));
+            ResultSet rs = executeForRead(ses, "CREATE TABLE TEST3 (ID INT 
PRIMARY KEY)");
             assertThrowsSqlException(
                     NoRowSetExpectedException.class,
                     Sql.QUERY_NO_RESULT_SET_ERR, "Query has no result set",
-                    () -> await(ars.fetchNextPage()));
+                    () -> rs.next());
         }
 
         // Cursor closed error.
         {
-            AsyncResultSet ars = await(ses.executeAsync(null, "SELECT * FROM 
TEST"));
-            await(ars.closeAsync());
+            ResultSet rs = executeForRead(ses, "SELECT * FROM TEST");
+            Thread.sleep(300); // ResultSetImpl fetches the next page in the 
background; wait for it to complete to avoid flakiness.
+            rs.close();
 
             assertThrowsSqlException(
                     CursorClosedException.class,
                     Sql.CURSOR_CLOSED_ERR,
                     "Cursor is closed",
-                    () -> await(ars.fetchNextPage()));
+                    () -> rs.forEachRemaining(Object::hashCode));
         }
     }
 
-    /**
-     * DDL is non-transactional.
-     */
     @Test
-    public void ddlInTransaction() {
-        Session ses = igniteSql().createSession();
+    public void dml() {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
 
-        {
-            Transaction tx = igniteTx().begin();
-            try {
-                assertThrowsSqlException(
-                        Sql.STMT_VALIDATION_ERR,
-                        "DDL doesn't support transactions.",
-                        () -> await(ses.executeAsync(tx, "CREATE TABLE 
TEST2(ID INT PRIMARY KEY, VAL0 INT)"))
-                );
-            } finally {
-                tx.rollback();
-            }
-        }
-        {
-            Transaction tx = igniteTx().begin();
-            AsyncResultSet<SqlRow> res = await(ses.executeAsync(tx, "INSERT 
INTO TEST VALUES (?, ?)", -1, -1));
-            assertEquals(1, res.affectedRows());
+        IgniteSql sql = igniteSql();
+        Session ses = sql.createSession();
 
-            assertThrowsSqlException(
-                    Sql.STMT_VALIDATION_ERR,
-                    "DDL doesn't support transactions.",
-                    () -> await(ses.executeAsync(tx, "CREATE TABLE TEST2(ID 
INT PRIMARY KEY, VAL0 INT)"))
-            );
-            tx.commit();
+        TxManager txManager = txManager();
 
-            assertTrue(await(ses.executeAsync(null, "SELECT ID FROM TEST WHERE 
ID = -1")).currentPage().iterator().hasNext());
+        int txPrevCnt = txManager.finished();
+
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
         }
+
+        assertEquals(ROW_COUNT, txManager.finished() - txPrevCnt);
+        // No transactions must remain pending after the DML statements.
+        assertEquals(0, txManager.pending());
+
+        checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
+
+        checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
     }
 
     @Test
-    public void closeSession() {
+    public void select() {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
 
         IgniteSql sql = igniteSql();
-        Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 
4).build();
 
         for (int i = 0; i < ROW_COUNT; ++i) {
             ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
         }
 
-        AsyncResultSet ars0 = await(ses.executeAsync(null, "SELECT ID FROM 
TEST"));
+        ResultProcessor resultProcessor = execute(4, ses, "SELECT ID FROM 
TEST");
 
-        await(ses.closeAsync());
+        Set<Integer> rs = resultProcessor.result().stream().map(r -> 
r.intValue(0)).collect(Collectors.toSet());
 
-        // Fetched page is available after cancel.
-        ars0.currentPage();
-
-        SqlException sqlEx = assertThrowsSqlException(
-                Sql.EXECUTION_CANCELLED_ERR,
-                "The query was cancelled while executing",
-                () -> await(ars0.fetchNextPage()));
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            assertTrue(rs.remove(i), "Results invalid: " + 
resultProcessor.result());
+        }
 
-        assertTrue(IgniteTestUtils.hasCause(sqlEx, 
QueryCancelledException.class, null));
-        assertThrowsSqlException(Sql.SESSION_CLOSED_ERR, "Session is closed", 
() -> await(ses.executeAsync(null, "SELECT ID FROM TEST")));
+        assertTrue(rs.isEmpty());
     }
 
     @Test
@@ -724,7 +623,7 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
             args.add(i, i);
         }
 
-        long[] batchRes = await(ses.executeBatchAsync(null, "INSERT INTO TEST 
VALUES (?, ?)", args));
+        long[] batchRes = executeBatch(ses, "INSERT INTO TEST VALUES (?, ?)", 
args);
 
         Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
 
@@ -737,13 +636,13 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
                 SqlBatchException.class,
                 Sql.STMT_VALIDATION_ERR,
                 "Invalid SQL statement type",
-                () -> await(ses.executeBatchAsync(null, "SELECT * FROM TEST", 
args)));
+                () -> executeBatch(ses, "SELECT * FROM TEST", args));
 
         assertThrowsSqlException(
                 SqlBatchException.class,
                 Sql.STMT_VALIDATION_ERR,
                 "Invalid SQL statement type",
-                () -> await(ses.executeBatchAsync(null, "CREATE TABLE TEST1(ID 
INT PRIMARY KEY, VAL0 INT)", args)));
+                () -> executeBatch(ses, "CREATE TABLE TEST1(ID INT PRIMARY 
KEY, VAL0 INT)", args));
     }
 
     @Test
@@ -769,33 +668,99 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
                 SqlBatchException.class,
                 Sql.CONSTRAINT_VIOLATION_ERR,
                 "PK unique constraint is violated",
-                () -> await(ses.executeBatchAsync(null, "INSERT INTO TEST 
VALUES (?, ?)", args))
+                () -> executeBatch(ses, "INSERT INTO TEST VALUES (?, ?)", args)
         );
 
         assertEquals(err, ex.updateCounters().length);
         IntStream.range(0, ex.updateCounters().length).forEach(i -> 
assertEquals(1, ex.updateCounters()[i]));
     }
 
-    @Test
-    public void resultSetCloseShouldFinishImplicitTransaction() throws 
InterruptedException {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+    @ParameterizedTest
+    @ValueSource(strings = {
+            "INSERT INTO tst VALUES (2, ?)",
+            "SELECT * FROM tst WHERE id = ? "
+    })
+    public void runtimeErrorInDmlCausesTransactionToFail(String query) {
+        sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)");
 
+        sql("INSERT INTO tst VALUES (?,?)", 1, 1);
+
+        try (Session ses = igniteSql().createSession()) {
+            Transaction tx = igniteTx().begin();
+            String dmlQuery = "UPDATE tst SET val = val/(val - ?) + 1";
+
+            assertThrowsSqlException(
+                    Sql.RUNTIME_ERR,
+                    "/ by zero",
+                    () -> execute(tx, ses, dmlQuery, 1).affectedRows());
+
+            IgniteException err = assertThrows(IgniteException.class, () -> {
+                ResultSet<SqlRow> rs = executeForRead(ses, tx, query, 2);
+                if (rs.hasRowSet()) {
+                    assertTrue(rs.hasNext());
+                } else {
+                    assertTrue(rs.wasApplied());
+                }
+            });
+
+            assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, 
err.code(), err.toString());
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {
+            "INSERT INTO tst VALUES (2, ?)",
+            "SELECT * FROM tst WHERE id = ? "
+    })
+    public void runtimeErrorInQueryCausesTransactionToFail(String query) {
+        sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)");
+
+        sql("INSERT INTO tst VALUES (?,?)", 1, 1);
+
+        try (Session ses = igniteSql().createSession()) {
+            Transaction tx = igniteTx().begin();
+
+            assertThrowsSqlException(
+                    Sql.RUNTIME_ERR,
+                    "/ by zero",
+                    () -> execute(tx, ses, "SELECT val/? FROM tst WHERE id=?", 
0, 1));
+
+            IgniteException err = assertThrows(IgniteException.class, () -> {
+                ResultSet<SqlRow> rs = executeForRead(ses, tx, query, 2);
+                if (rs.hasRowSet()) {
+                    assertTrue(rs.hasNext());
+                } else {
+                    assertTrue(rs.wasApplied());
+                }
+            });
+
+            assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, 
err.code(), err.toString());
+        }
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20534")
+    @Test
+    public void testLockIsNotReleasedAfterTxRollback() {
         IgniteSql sql = igniteSql();
-        Session ses = sql.sessionBuilder().defaultPageSize(2).build();
 
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+        try (Session ses = sql.createSession()) {
+            checkDdl(true, ses, "CREATE TABLE IF NOT EXISTS tst(id INTEGER 
PRIMARY KEY, val INTEGER)");
         }
 
-        CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, 
"SELECT * FROM TEST");
+        try (Session session = sql.createSession()) {
+            Transaction tx = igniteTx().begin();
+
+            assertThrows(RuntimeException.class, () -> execute(tx, session, 
"SELECT 1/0"));
+            tx.rollback();
+            session.execute(tx, "INSERT INTO tst VALUES (1, 1)");
+        }
 
-        AsyncResultSet<SqlRow> ars = f.join();
-        // There should be a pending transaction since not all data was read.
-        boolean txStarted = waitForCondition(() -> txManager().pending() == 1, 
5000);
-        assertTrue(txStarted, "No pending transactions");
+        try (Session session = sql.createSession()) {
+            Transaction tx = igniteTx().begin(new 
TransactionOptions().readOnly(false));
 
-        ars.closeAsync().join();
-        assertEquals(0, txManager().pending(), "Expected no pending 
transactions");
+            execute(tx, session, "INSERT INTO tst VALUES (1, 1)");
+            tx.commit();
+        }
     }
 
     @Test
@@ -803,127 +768,141 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
 
         IgniteSql sql = igniteSql();
-
         // Fetch all data in one read.
         Session ses = sql.sessionBuilder().defaultPageSize(100).build();
-
         for (int i = 0; i < ROW_COUNT; ++i) {
-            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+            execute(ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
         }
 
-        CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, 
"SELECT * FROM TEST");
-
-        AsyncResultSet<SqlRow> ars = f.join();
-        assertFalse(ars.hasMorePages());
+        execute(1, ses, "SELECT * FROM TEST");
 
         assertEquals(0, txManager().pending(), "Expected no pending 
transactions");
     }
 
-    private static void checkDdl(boolean expectedApplied, Session ses, String 
sql, Transaction tx) {
-        CompletableFuture<AsyncResultSet<SqlRow>> fut = ses.executeAsync(
-                tx,
-                sql
-        );
+    /**
+     * DDL is non-transactional.
+     */
+    @Test
+    public void ddlInTransaction() {
+        Session ses = igniteSql().createSession();
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
 
-        AsyncResultSet<SqlRow> asyncRes = await(fut);
+        {
+            Transaction tx = igniteTx().begin();
+            try {
+                assertThrowsSqlException(
+                        Sql.STMT_VALIDATION_ERR,
+                        "DDL doesn't support transactions.",
+                        () -> execute(tx, ses, "CREATE TABLE TEST2(ID INT 
PRIMARY KEY, VAL0 INT)")
+                );
+            } finally {
+                tx.rollback();
+            }
+        }
+        {
+            Transaction tx = igniteTx().begin();
+            ResultProcessor result = execute(tx, ses, "INSERT INTO TEST VALUES 
(?, ?)", -1, -1);
+            assertEquals(1, result.affectedRows());
 
-        assertEquals(expectedApplied, asyncRes.wasApplied());
-        assertFalse(asyncRes.hasMorePages());
-        assertFalse(asyncRes.hasRowSet());
-        assertEquals(-1, asyncRes.affectedRows());
+            assertThrowsSqlException(
+                    Sql.STMT_VALIDATION_ERR,
+                    "DDL doesn't support transactions.",
+                    () -> ses.execute(tx, "CREATE TABLE TEST2(ID INT PRIMARY 
KEY, VAL0 INT)")
+            );
+            tx.commit();
 
-        assertNull(asyncRes.metadata());
+            assertEquals(1, execute(ses, "SELECT ID FROM TEST WHERE ID = 
-1").result().size());
+        }
 
-        await(asyncRes.closeAsync());
+        assertEquals(0, txManager().pending());
     }
 
-    private static void checkDdl(boolean expectedApplied, Session ses, String 
sql) {
-        checkDdl(expectedApplied, ses, sql, null);
+    @Test
+    public void resultSetCloseShouldFinishImplicitTransaction() {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+        IgniteSql sql = igniteSql();
+        Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+
+        for (int i = 0; i < ROW_COUNT; ++i) {
+            execute(ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
+        }
+
+        ResultSet<?> rs = executeForRead(ses, "SELECT * FROM TEST");
+        assertEquals(1, txManager().pending());
+        rs.close();
+        assertEquals(0, txManager().pending(), "Expected no pending 
transactions");
+    }
+
+    protected ResultSet<SqlRow> executeForRead(Session ses, String query) {
+        return executeForRead(ses, null, query);
     }
 
-    private static <T extends IgniteException> T checkError(Class<T> expCls, 
Integer code, String msg, Session ses, String sql,
+    protected abstract ResultSet<SqlRow> executeForRead(Session ses, 
Transaction tx, String query, Object... args);
+
+    protected <T extends IgniteException> T checkError(Class<T> expCls, 
Integer code, String msg, Session ses, String sql,
             Object... args) {
-        return assertThrowsPublicException(() -> await(ses.executeAsync(null, 
sql, args)), expCls, code, msg);
+        T ex = assertThrows(expCls, () -> execute(ses, sql, args));
+
+        if (code != null) {
+            assertEquals(new IgniteException(code).codeAsString(), 
ex.codeAsString());
+        }
+
+        if (msg != null) {
+            assertThat(ex.getMessage(), containsString(msg));
+        }
+
+        return ex;
     }
 
-    private static SqlException checkSqlError(
+    protected SqlException checkSqlError(
             int code,
             String msg,
             Session ses,
             String sql,
             Object... args
     ) {
-        return assertThrowsSqlException(code, msg, () -> 
await(ses.executeAsync(null, sql, args)));
+        return assertThrowsSqlException(code, msg, () -> execute(ses, sql, 
args));
     }
 
-    protected static void checkDml(int expectedAffectedRows, Session ses, 
String sql, Transaction tx, Object... args) {
-        AsyncResultSet asyncRes = await(ses.executeAsync(tx, sql, args));
+    protected abstract long[] executeBatch(Session ses, String sql, 
BatchedArguments args);
 
-        assertFalse(asyncRes.wasApplied());
-        assertFalse(asyncRes.hasMorePages());
-        assertFalse(asyncRes.hasRowSet());
-        assertEquals(expectedAffectedRows, asyncRes.affectedRows());
+    protected abstract ResultProcessor execute(Integer expectedPages, 
Transaction tx, Session ses, String sql, Object... args);
 
-        assertNull(asyncRes.metadata());
-
-        await(asyncRes.closeAsync());
+    protected ResultProcessor execute(int expectedPages, Session ses, String 
sql, Object... args) {
+        return execute(expectedPages, null, ses, sql, args);
     }
 
-    private static void checkDml(int expectedAffectedRows, Session ses, String 
sql, Object... args) {
-        checkDml(expectedAffectedRows, ses, sql, null, args);
+    protected ResultProcessor execute(Transaction tx, Session ses, String sql, 
Object... args) {
+        return execute(null, tx, ses, sql, args);
     }
 
-    static <T extends IgniteException> T assertThrowsPublicException(
-            RunnableX executable,
-            Class<T> expCls,
-            @Nullable Integer code,
-            @Nullable String msgPart
-    ) {
-        T ex = assertThrows(expCls, executable::run);
-
-        if (code != null) {
-            assertEquals(new IgniteException(code).codeAsString(), 
ex.codeAsString());
-        }
-
-        if (msgPart != null) {
-            assertThat(ex.getMessage(), containsString(msgPart));
-        }
-
-        return ex;
+    protected ResultProcessor execute(Session ses, String sql, Object... args) 
{
+        return execute(null, null, ses, sql, args);
     }
 
-    static class TestPageProcessor implements
-            Function<AsyncResultSet<SqlRow>, 
CompletionStage<AsyncResultSet<SqlRow>>> {
-        private int expectedPages;
+    protected abstract void rollback(Transaction outerTx);
 
-        private final List<SqlRow> res = new ArrayList<>();
+    protected abstract void commit(Transaction outerTx);
 
-        TestPageProcessor(int expectedPages) {
-            this.expectedPages = expectedPages;
-        }
+    protected void checkDml(int expectedAffectedRows, Transaction tx, Session 
ses, String sql, Object... args) {
 
-        @Override
-        public CompletionStage<AsyncResultSet<SqlRow>> 
apply(AsyncResultSet<SqlRow> rs) {
-            expectedPages--;
+    }
 
-            assertTrue(rs.hasRowSet());
-            assertFalse(rs.wasApplied());
-            assertEquals(-1L, rs.affectedRows());
-            assertEquals(expectedPages > 0, rs.hasMorePages(),
-                    "hasMorePages(): [expected=" + (expectedPages > 0) + ", 
actual=" + rs.hasMorePages() + ']');
+    protected void checkDml(int expectedAffectedRows, Session ses, String sql, 
Object... args) {
+        checkDml(expectedAffectedRows, null, ses, sql, args);
+    }
 
-            rs.currentPage().forEach(res::add);
+    protected void checkDdl(boolean expectedApplied, Session ses, String sql) {
+        checkDdl(expectedApplied, ses, sql, null);
+    }
 
-            if (rs.hasMorePages()) {
-                return rs.fetchNextPage().thenCompose(this);
-            }
+    protected abstract void checkDdl(boolean expectedApplied, Session ses, 
String sql, Transaction tx);
 
-            return rs.closeAsync().thenApply(v -> rs);
-        }
+    /** Represents the result of running an SQL query, hiding implementation 
specifics of the different test variants. */
+    protected interface ResultProcessor {
+        List<SqlRow> result();
 
-        public List<SqlRow> result() {
-            //noinspection AssignmentOrReturnOfFieldWithMutableType
-            return res;
-        }
+        long affectedRows();
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 069db0aab4..77d45cc8f6 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -17,540 +17,36 @@
 
 package org.apache.ignite.internal.sql.api;
 
-import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
-import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsTableScan;
-import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
-import org.apache.ignite.internal.catalog.commands.CatalogUtils;
-import org.apache.ignite.internal.client.sql.ClientSql;
-import org.apache.ignite.internal.sql.api.ColumnMetadataImpl.ColumnOriginImpl;
-import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest;
-import org.apache.ignite.internal.sql.engine.QueryCancelledException;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
-import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.util.CollectionUtils;
-import org.apache.ignite.lang.ColumnAlreadyExistsException;
-import org.apache.ignite.lang.ColumnNotFoundException;
-import org.apache.ignite.lang.ErrorGroups;
-import org.apache.ignite.lang.ErrorGroups.Index;
-import org.apache.ignite.lang.ErrorGroups.Sql;
-import org.apache.ignite.lang.ErrorGroups.Transactions;
-import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IndexAlreadyExistsException;
-import org.apache.ignite.lang.IndexNotFoundException;
-import org.apache.ignite.lang.TableAlreadyExistsException;
-import org.apache.ignite.lang.TableNotFoundException;
+import org.apache.ignite.internal.sql.SyncResultSetAdapter;
 import org.apache.ignite.sql.BatchedArguments;
-import org.apache.ignite.sql.ColumnMetadata;
-import org.apache.ignite.sql.ColumnType;
-import org.apache.ignite.sql.CursorClosedException;
 import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.NoRowSetExpectedException;
-import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.SqlBatchException;
-import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.async.AsyncResultSet;
-import org.apache.ignite.table.Table;
 import org.apache.ignite.tx.Transaction;
-import org.apache.ignite.tx.TransactionOptions;
-import org.hamcrest.Matcher;
-import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
  * Tests for asynchronous SQL API.
  */
 @SuppressWarnings("ThrowableNotThrown")
-public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
-    private static final int ROW_COUNT = 16;
-
-    @AfterEach
-    public void dropTables() {
-        for (Table t : CLUSTER_NODES.get(0).tables().tables()) {
-            sql("DROP TABLE " + t.name());
-        }
-    }
-
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20096")
-    public void ddl() throws Exception {
-        IgniteSql sql = igniteSql();
-        Session ses = sql.createSession();
-
-        // CREATE TABLE
-        checkDdl(true, ses, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-        checkError(
-                TableAlreadyExistsException.class,
-                ErrorGroups.Table.TABLE_ALREADY_EXISTS_ERR,
-                "Table already exists [name=\"PUBLIC\".\"TEST\"]",
-                ses,
-                "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"
-        );
-        checkSqlError(
-                ErrorGroups.Table.TABLE_DEFINITION_ERR,
-                "Can't create table with duplicate columns: ID, VAL, VAL",
-                ses,
-                "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL INT, VAL INT)"
-        );
-        checkDdl(false, ses, "CREATE TABLE IF NOT EXISTS TEST(ID INT PRIMARY 
KEY, VAL VARCHAR)");
-
-        // ADD COLUMN
-        checkDdl(true, ses, "ALTER TABLE TEST ADD COLUMN VAL1 VARCHAR");
-        checkError(
-                TableNotFoundException.class,
-                ErrorGroups.Table.TABLE_NOT_FOUND_ERR,
-                "The table does not exist 
[name=\"PUBLIC\".\"NOT_EXISTS_TABLE\"]",
-                ses,
-                "ALTER TABLE NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"
-        );
-        checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE ADD 
COLUMN VAL1 VARCHAR");
-        checkError(
-                ColumnAlreadyExistsException.class,
-                ErrorGroups.Table.COLUMN_ALREADY_EXISTS_ERR,
-                "Column already exists [name=\"VAL1\"]",
-                ses,
-                "ALTER TABLE TEST ADD COLUMN VAL1 INT"
-        );
-
-        // CREATE INDEX
-        checkDdl(true, ses, "CREATE INDEX TEST_IDX ON TEST(VAL0)");
-        checkError(
-                IndexAlreadyExistsException.class,
-                Index.INDEX_ALREADY_EXISTS_ERR,
-                "Index already exists [name=\"PUBLIC\".\"TEST_IDX\"]",
-                ses,
-                "CREATE INDEX TEST_IDX ON TEST(VAL1)"
-        );
-        checkDdl(false, ses, "CREATE INDEX IF NOT EXISTS TEST_IDX ON 
TEST(VAL1)");
-
-        // TODO: IGNITE-19150 We are waiting for schema synchronization to 
avoid races to create and destroy indexes
-        waitForIndexBuild("TEST", "TEST_IDX");
-
-        checkDdl(true, ses, "DROP INDEX TESt_iDX");
-        checkDdl(true, ses, "CREATE INDEX TEST_IDX1 ON TEST(VAL0)");
-        checkDdl(true, ses, "CREATE INDEX TEST_IDX2 ON TEST(VAL0)");
-        checkDdl(true, ses, "CREATE INDEX TEST_IDX3 ON TEST(ID, VAL0, VAL1)");
-        checkSqlError(
-                Index.INVALID_INDEX_DEFINITION_ERR,
-                "Can't create index on duplicate columns: VAL0, VAL0",
-                ses,
-                "CREATE INDEX TEST_IDX4 ON TEST(VAL0, VAL0)"
-        );
-
-        checkSqlError(
-                Sql.STMT_VALIDATION_ERR,
-                "Can`t delete column(s). Column VAL1 is used by indexes 
[TEST_IDX3].",
-                ses,
-                "ALTER TABLE TEST DROP COLUMN val1"
-        );
-
-        SqlException ex = checkSqlError(
-                Sql.STMT_VALIDATION_ERR,
-                "Can`t delete column(s).",
-                ses,
-                "ALTER TABLE TEST DROP COLUMN (val0, val1)"
-        );
-
-        String msg = ex.getMessage();
-        String explainMsg = "Unexpected error message: " + msg;
-
-        assertTrue(msg.contains("Column VAL0 is used by indexes ["), 
explainMsg);
-        assertTrue(msg.contains("TEST_IDX1") && msg.contains("TEST_IDX2") && 
msg.contains("TEST_IDX3"), explainMsg);
-        assertTrue(msg.contains("Column VAL1 is used by indexes [TEST_IDX3]"), 
explainMsg);
-
-        checkSqlError(
-                Sql.STMT_VALIDATION_ERR,
-                "Can`t delete column, belongs to primary key: [name=ID]",
-                ses,
-                "ALTER TABLE TEST DROP COLUMN id"
-        );
-
-        // TODO: IGNITE-19150 We are waiting for schema synchronization to 
avoid races to create and destroy indexes
-        waitForIndexBuild("TEST", "TEST_IDX3");
-        checkDdl(true, ses, "DROP INDEX TESt_iDX3");
-
-        // DROP COLUMNS
-        checkDdl(true, ses, "ALTER TABLE TEST DROP COLUMN VAL1");
-        checkError(
-                TableNotFoundException.class,
-                ErrorGroups.Table.TABLE_NOT_FOUND_ERR,
-                "The table does not exist 
[name=\"PUBLIC\".\"NOT_EXISTS_TABLE\"]",
-                ses,
-                "ALTER TABLE NOT_EXISTS_TABLE DROP COLUMN VAL1"
-        );
-        checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE DROP 
COLUMN VAL1");
-        checkError(
-                ColumnNotFoundException.class,
-                ErrorGroups.Table.COLUMN_NOT_FOUND_ERR,
-                "Column does not exist [tableName=\"PUBLIC\".\"TEST\", 
columnName=\"VAL1\"]",
-                ses,
-                "ALTER TABLE TEST DROP COLUMN VAL1"
-        );
-
-        // DROP TABLE
-        checkDdl(false, ses, "DROP TABLE IF EXISTS NOT_EXISTS_TABLE");
-
-        checkDdl(true, ses, "DROP TABLE TEST");
-        checkError(
-                TableNotFoundException.class,
-                ErrorGroups.Table.TABLE_NOT_FOUND_ERR,
-                "The table does not exist [name=\"PUBLIC\".\"TEST\"]",
-                ses,
-                "DROP TABLE TEST"
-        );
-
-        checkDdl(false, ses, "DROP INDEX IF EXISTS TEST_IDX");
-
-        checkError(
-                IndexNotFoundException.class,
-                Index.INDEX_NOT_FOUND_ERR,
-                "Index does not exist [name=\"PUBLIC\".\"TEST_IDX\"]", ses,
-                "DROP INDEX TEST_IDX"
-        );
-    }
-
-    @Test
-    public void dml() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = igniteSql();
-        Session ses = sql.createSession();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
-
-        checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
-    }
-
-    /** Check all transactions are processed correctly even with case of sql 
Exception raised. */
-    @Test
-    public void implicitTransactionsStates() {
-        IgniteSql sql = igniteSql();
-
-        if (sql instanceof ClientSql) {
-            return;
-        }
-
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        Session ses = sql.createSession();
-
-        TxManager txManager = txManager();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            CompletableFuture<AsyncResultSet<SqlRow>> fut = 
ses.executeAsync(null, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)", i, i);
-
-            AsyncResultSet asyncRes = null;
-
-            try {
-                asyncRes = await(fut);
-            } catch (Throwable ignore) {
-                // No op.
-            }
-
-            if (asyncRes != null) {
-                await(asyncRes.closeAsync());
-            }
-        }
-
-        // No new transactions through ddl.
-        assertEquals(0, txManager.pending());
-    }
-
-    /** Check correctness of explicit transaction rollback. */
-    @Test
-    public void checkExplicitTxRollback() {
-        IgniteSql sql = igniteSql();
-
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        Session ses = sql.createSession();
-
-        // Outer tx with further commit.
-        Transaction outerTx = igniteTx().begin();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", outerTx, i, i);
-        }
-
-        await(outerTx.rollbackAsync());
-
-        AsyncResultSet rs = await(ses.executeAsync(null, "SELECT VAL0 FROM 
TEST ORDER BY VAL0"));
-
-        assertEquals(0, StreamSupport.stream(rs.currentPage().spliterator(), 
false).count());
-
-        await(rs.closeAsync());
-    }
-
-    /** Check correctness of implicit and explicit transactions. */
-    @Test
-    public void checkTransactionsWithDml() {
-        IgniteSql sql = igniteSql();
-
-        if (sql instanceof ClientSql) {
-            return;
-        }
-
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        Session ses = sql.createSession();
-
-        TxManager txManagerInternal = txManager();
-
-        int txPrevCnt = txManagerInternal.finished();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        // Outer tx with further commit.
-        Transaction outerTx = igniteTx().begin();
-
-        for (int i = ROW_COUNT; i < 2 * ROW_COUNT; ++i) {
-            checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", outerTx, i, i);
-        }
-
-        outerTx.commit();
-
-        // Outdated tx.
-        Transaction outerTx0 = outerTx;
-        //ToDo: IGNITE-20387 , here should be used assertThrowsSqlException 
method with code and message `"Transaction is already finished"
-        IgniteException e = assertThrows(IgniteException.class,
-                () -> checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", 
outerTx0, ROW_COUNT, Integer.MAX_VALUE));
-        assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, 
e.code());
-
-        assertThrowsSqlException(
-                Sql.CONSTRAINT_VIOLATION_ERR,
-                "PK unique constraint is violated",
-                () -> checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", 
ROW_COUNT, Integer.MAX_VALUE));
-
-        AsyncResultSet rs = await(ses.executeAsync(null, "SELECT VAL0 FROM 
TEST ORDER BY VAL0"));
-
-        assertEquals(2 * ROW_COUNT, 
StreamSupport.stream(rs.currentPage().spliterator(), false).count());
-
-        rs.closeAsync();
-
-        outerTx = igniteTx().begin();
-
-        rs = await(ses.executeAsync(outerTx, "SELECT VAL0 FROM TEST ORDER BY 
VAL0"));
-
-        assertEquals(2 * ROW_COUNT, 
StreamSupport.stream(rs.currentPage().spliterator(), false).count());
-
-        rs.closeAsync();
-
-        outerTx.commit();
-
-        checkDml(2 * ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
-
-        checkDml(2 * ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
-
-        assertEquals(ROW_COUNT + 1 + 1 + 1 + 1 + 1 + 1, 
txManagerInternal.finished() - txPrevCnt);
-
-        assertEquals(0, txManagerInternal.pending());
-    }
-
-    /** Check correctness of rw and ro transactions for table scan. */
-    @Test
-    public void checkMixedTransactionsForTable() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        Matcher<String> planMatcher = containsTableScan("PUBLIC", "TEST");
-
-        checkMixedTransactions(planMatcher);
-    }
-
-    /** Check correctness of rw and ro transactions for index scan. */
-    @Test
-    public void checkMixedTransactionsForIndex() throws Exception {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-        sql("CREATE INDEX TEST_IDX ON TEST(VAL0)");
-
-        Matcher<String> planMatcher = containsIndexScan("PUBLIC", "TEST", 
"TEST_IDX");
-
-        checkMixedTransactions(planMatcher);
-    }
-
-    private void checkMixedTransactions(Matcher<String> planMatcher) {
-        IgniteSql sql = igniteSql();
-
-        if (sql instanceof ClientSql) {
-            return;
-        }
-
-        Session ses = sql.createSession();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        List<Boolean> booleanList = List.of(Boolean.TRUE, Boolean.FALSE);
-        for (boolean roTx : booleanList) {
-            for (boolean commit : booleanList) {
-                for (boolean explicit : booleanList) {
-                    checkTx(ses, roTx, commit, explicit, planMatcher);
-                }
-            }
-        }
-    }
-
-    private void checkTx(Session ses, boolean readOnly, boolean commit, 
boolean explicit, Matcher<String> planMatcher) {
-        Transaction outerTx = explicit ? (readOnly ? igniteTx().begin(new 
TransactionOptions().readOnly(true)) : igniteTx().begin()) : null;
-
-        String query = "SELECT VAL0 FROM TEST ORDER BY VAL0";
-
-        assertQuery(outerTx, query).matches(planMatcher).check();
-
-        AsyncResultSet rs = await(ses.executeAsync(outerTx, query));
-
-        assertEquals(ROW_COUNT, 
StreamSupport.stream(rs.currentPage().spliterator(), false).count());
-
-        rs.closeAsync();
-
-        if (outerTx != null) {
-            if (commit) {
-                outerTx.commit();
-            } else {
-                outerTx.rollback();
-            }
-        }
-    }
-
-    @Test
-    public void select() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = igniteSql();
-        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 
4).build();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        TestPageProcessor pageProc = new TestPageProcessor(4);
-        await(ses.executeAsync(null, "SELECT ID FROM 
TEST").thenCompose(pageProc));
-
-        Set<Integer> rs = pageProc.result().stream().map(r -> 
r.intValue(0)).collect(Collectors.toSet());
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            assertTrue(rs.remove(i), "Results invalid: " + pageProc.result());
-        }
-
-        assertTrue(rs.isEmpty());
-    }
-
-    @Test
-    public void metadata() {
-        sql("CREATE TABLE TEST(COL0 BIGINT PRIMARY KEY, COL1 VARCHAR NOT 
NULL)");
-
-        IgniteSql sql = igniteSql();
-        Session ses = sql.sessionBuilder().build();
-
-        ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", 1L, "some string");
-
-        AsyncResultSet<SqlRow> rs = await(ses.executeAsync(null, "SELECT COL1, 
COL0 FROM TEST"));
-
-        // Validate columns metadata.
-        ResultSetMetadata meta = rs.metadata();
-
-        assertNotNull(meta);
-        assertEquals(-1, meta.indexOf("COL"));
-        assertEquals(0, meta.indexOf("COL1"));
-        assertEquals(1, meta.indexOf("COL0"));
-
-        checkMetadata(new ColumnMetadataImpl(
-                        "COL1",
-                        ColumnType.STRING,
-                        CatalogUtils.DEFAULT_VARLEN_LENGTH,
-                        ColumnMetadata.UNDEFINED_SCALE,
-                        false,
-                        new ColumnOriginImpl("PUBLIC", "TEST", "COL1")),
-                meta.columns().get(0));
-        checkMetadata(new ColumnMetadataImpl(
-                        "COL0",
-                        ColumnType.INT64,
-                        19,
-                        0,
-                        false,
-                        new ColumnOriginImpl("PUBLIC", "TEST", "COL0")),
-                meta.columns().get(1));
-
-        // Validate result columns types.
-        assertTrue(rs.hasRowSet());
-        assertEquals(1, rs.currentPageSize());
-
-        SqlRow row = rs.currentPage().iterator().next();
-
-        await(rs.closeAsync());
-
-        assertInstanceOf(meta.columns().get(0).valueClass(), row.value(0));
-        assertInstanceOf(meta.columns().get(1).valueClass(), row.value(1));
-    }
-
-    @Test
-    public void sqlRow() {
-        IgniteSql sql = igniteSql();
-        Session ses = sql.sessionBuilder().build();
-
-        AsyncResultSet<SqlRow> ars = await(ses.executeAsync(null, "SELECT 1 as 
COL_A, 2 as COL_B"));
-
-        SqlRow r = CollectionUtils.first(ars.currentPage());
-
-        assertEquals(2, r.columnCount());
-        assertEquals(0, r.columnIndex("COL_A"));
-        assertEquals(0, r.columnIndex("col_a"));
-        assertEquals(1, r.columnIndex("COL_B"));
-        assertEquals(-1, r.columnIndex("notExistColumn"));
-
-        assertEquals(1, r.intValue("COL_A"));
-        assertEquals(1, r.intValue("COL_a"));
-        assertEquals(2, r.intValue("COL_B"));
-
-        assertThrowsWithCause(
-                () -> r.intValue("notExistColumn"),
-                IllegalArgumentException.class,
-                "Column doesn't exist [name=notExistColumn]"
-        );
-
-        assertEquals(1, r.intValue(0));
-        assertEquals(2, r.intValue(1));
-        assertThrowsWithCause(() -> r.intValue(-2), 
IndexOutOfBoundsException.class);
-        assertThrowsWithCause(() -> r.intValue(10), 
IndexOutOfBoundsException.class);
-
-        await(ars.closeAsync());
-    }
-
+public class ItSqlAsynchronousApiTest extends ItSqlApiBaseTest {
     @Test
     public void pageSequence() {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
@@ -582,7 +78,7 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
                 .flatMap(p -> StreamSupport.stream(p.spliterator(), false))
                 .collect(Collectors.toList());
 
-        TestPageProcessor pageProc = new TestPageProcessor(ROW_COUNT - 
res.size());
+        AsyncResultProcessor pageProc = new AsyncResultProcessor(ROW_COUNT - res.size());
         await(ars4.fetchNextPage().thenCompose(pageProc));
 
         res.addAll(pageProc.result());
@@ -592,234 +88,50 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
         }
     }
 
-    @Test
-    public void errors() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)");
-
-        IgniteSql sql = igniteSql();
-        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 
2).build();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        // Parse error.
-        checkSqlError(Sql.STMT_PARSE_ERR, "Failed to parse query", ses, 
"SELECT ID FROM");
-
-        // Validation errors.
-        checkSqlError(Sql.STMT_VALIDATION_ERR, "Column 'VAL0' does not allow 
NULLs", ses,
-                "INSERT INTO TEST VALUES (2, NULL)");
-
-        checkSqlError(Sql.STMT_VALIDATION_ERR, "Object 'NOT_EXISTING_TABLE' 
not found", ses,
-                "SELECT * FROM NOT_EXISTING_TABLE");
-
-        checkSqlError(Sql.STMT_VALIDATION_ERR, "Column 'NOT_EXISTING_COLUMN' 
not found", ses,
-                "SELECT NOT_EXISTING_COLUMN FROM TEST");
-
-        checkSqlError(Sql.STMT_VALIDATION_ERR, "Multiple statements are not 
allowed", ses, "SELECT 1; SELECT 2");
-
-        checkSqlError(Sql.STMT_VALIDATION_ERR, "Table without PRIMARY KEY is 
not supported", ses,
-                "CREATE TABLE TEST2 (VAL INT)");
-
-        // Execute error.
-        checkSqlError(Sql.RUNTIME_ERR, "/ by zero", ses, "SELECT 1 / ?", 0);
-        checkSqlError(Sql.RUNTIME_ERR, "/ by zero", ses, "UPDATE TEST SET val0 
= val0/(val0 - ?) + " + ROW_COUNT, 0);
-        checkSqlError(Sql.RUNTIME_ERR, "negative substring length not 
allowed", ses, "SELECT SUBSTRING('foo', 1, -3)");
-
-        // No result set error.
-        {
-            AsyncResultSet ars = await(ses.executeAsync(null, "CREATE TABLE 
TEST3 (ID INT PRIMARY KEY)"));
-            assertThrowsSqlException(
-                    NoRowSetExpectedException.class,
-                    Sql.QUERY_NO_RESULT_SET_ERR, "Query has no result set",
-                    () -> await(ars.fetchNextPage()));
-        }
-
-        // Cursor closed error.
-        {
-            AsyncResultSet ars = await(ses.executeAsync(null, "SELECT * FROM 
TEST"));
-            await(ars.closeAsync());
-
-            assertThrowsSqlException(
-                    CursorClosedException.class,
-                    Sql.CURSOR_CLOSED_ERR,
-                    "Cursor is closed",
-                    () -> await(ars.fetchNextPage()));
-        }
+    @Override
+    protected ResultSet<SqlRow> executeForRead(Session ses, Transaction tx, String query, Object... args) {
+        return new SyncResultSetAdapter(await(ses.executeAsync(tx, query, args)));
     }
 
-    /**
-     * DDL is non-transactional.
-     */
-    @Test
-    public void ddlInTransaction() {
-        Session ses = igniteSql().createSession();
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        {
-            Transaction tx = igniteTx().begin();
-            try {
-                assertThrowsSqlException(
-                        Sql.STMT_VALIDATION_ERR,
-                        "DDL doesn't support transactions.",
-                        () -> await(ses.executeAsync(tx, "CREATE TABLE 
TEST2(ID INT PRIMARY KEY, VAL0 INT)"))
-                );
-            } finally {
-                tx.rollback();
-            }
-        }
-        {
-            Transaction tx = igniteTx().begin();
-            AsyncResultSet<SqlRow> res = await(ses.executeAsync(tx, "INSERT 
INTO TEST VALUES (?, ?)", -1, -1));
-            assertEquals(1, res.affectedRows());
-
-            assertThrowsSqlException(
-                    Sql.STMT_VALIDATION_ERR,
-                    "DDL doesn't support transactions.",
-                    () -> await(ses.executeAsync(tx, "CREATE TABLE TEST2(ID 
INT PRIMARY KEY, VAL0 INT)"))
-            );
-            tx.commit();
-
-            assertTrue(await(ses.executeAsync(null, "SELECT ID FROM TEST WHERE 
ID = -1")).currentPage().iterator().hasNext());
-        }
+    @Override
+    protected long[] executeBatch(Session ses, String sql, BatchedArguments args) {
+        return await(ses.executeBatchAsync(null, sql, args));
     }
 
-    @Test
-    public void closeSession() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = igniteSql();
-        Session ses = sql.sessionBuilder().defaultPageSize(2).build();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        AsyncResultSet ars0 = await(ses.executeAsync(null, "SELECT ID FROM 
TEST"));
-
-        await(ses.closeAsync());
-
-        // Fetched page is available after cancel.
-        ars0.currentPage();
-
-        SqlException sqlEx = assertThrowsSqlException(
-                Sql.EXECUTION_CANCELLED_ERR,
-                "The query was cancelled while executing",
-                () -> await(ars0.fetchNextPage()));
-
-        assertTrue(IgniteTestUtils.hasCause(sqlEx, 
QueryCancelledException.class, null));
-        assertThrowsSqlException(Sql.SESSION_CLOSED_ERR, "Session is closed", 
() -> await(ses.executeAsync(null, "SELECT ID FROM TEST")));
-    }
-
-    @Test
-    public void batch() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = CLUSTER_NODES.get(0).sql();
-        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 
2).build();
-
-        BatchedArguments args = BatchedArguments.of(0, 0);
-
-        for (int i = 1; i < ROW_COUNT; ++i) {
-            args.add(i, i);
-        }
-
-        long[] batchRes = await(ses.executeBatchAsync(null, "INSERT INTO TEST 
VALUES (?, ?)", args));
-
-        Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
-
-        // Check that data are inserted OK
-        List<List<Object>> res = sql("SELECT ID FROM TEST ORDER BY ID");
-        IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i, 
res.get(i).get(0)));
+    @Override
+    protected ResultProcessor execute(Integer expectedPages, Transaction tx, Session ses, String sql, Object... args) {
+        AsyncResultProcessor asyncProcessor = new AsyncResultProcessor(expectedPages);
+        await(ses.executeAsync(tx, sql, args).thenCompose(asyncProcessor));
 
-        // Check invalid query type
-        assertThrowsSqlException(
-                SqlBatchException.class,
-                Sql.STMT_VALIDATION_ERR,
-                "Invalid SQL statement type",
-                () -> await(ses.executeBatchAsync(null, "SELECT * FROM TEST", 
args)));
-
-        assertThrowsSqlException(
-                SqlBatchException.class,
-                Sql.STMT_VALIDATION_ERR,
-                "Invalid SQL statement type",
-                () -> await(ses.executeBatchAsync(null, "CREATE TABLE TEST1(ID 
INT PRIMARY KEY, VAL0 INT)", args)));
+        return asyncProcessor;
     }
 
-    @Test
-    public void batchIncomplete() {
-        int err = ROW_COUNT / 2;
-
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = CLUSTER_NODES.get(0).sql();
-        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 
2).build();
-
-        BatchedArguments args = BatchedArguments.of(0, 0);
-
-        for (int i = 1; i < ROW_COUNT; ++i) {
-            if (i == err) {
-                args.add(1, 1);
-            } else {
-                args.add(i, i);
-            }
-        }
-
-        SqlBatchException ex = assertThrowsSqlException(
-                SqlBatchException.class,
-                Sql.CONSTRAINT_VIOLATION_ERR,
-                "PK unique constraint is violated",
-                () -> await(ses.executeBatchAsync(null, "INSERT INTO TEST 
VALUES (?, ?)", args))
-        );
-
-        assertEquals(err, ex.updateCounters().length);
-        IntStream.range(0, ex.updateCounters().length).forEach(i -> 
assertEquals(1, ex.updateCounters()[i]));
+    @Override
+    protected void rollback(Transaction tx) {
+        await(tx.rollbackAsync());
     }
 
-    @Test
-    public void resultSetCloseShouldFinishImplicitTransaction() throws 
InterruptedException {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = igniteSql();
-        Session ses = sql.sessionBuilder().defaultPageSize(2).build();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, 
"SELECT * FROM TEST");
-
-        AsyncResultSet<SqlRow> ars = f.join();
-        // There should be a pending transaction since not all data was read.
-        boolean txStarted = waitForCondition(() -> txManager().pending() == 1, 
5000);
-        assertTrue(txStarted, "No pending transactions");
-
-        ars.closeAsync().join();
-        assertEquals(0, txManager().pending(), "Expected no pending 
transactions");
+    @Override
+    protected void commit(Transaction tx) {
+        await(tx.commitAsync());
     }
 
-    @Test
-    public void resultSetFullReadShouldFinishImplicitTransaction() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = igniteSql();
-
-        // Fetch all data in one read.
-        Session ses = sql.sessionBuilder().defaultPageSize(100).build();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
+    @Override
+    protected void checkDml(int expectedAffectedRows, Transaction tx, Session ses, String sql, Object... args) {
+        AsyncResultSet asyncRes = await(ses.executeAsync(tx, sql, args));
 
-        CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, 
"SELECT * FROM TEST");
+        assertFalse(asyncRes.wasApplied());
+        assertFalse(asyncRes.hasMorePages());
+        assertFalse(asyncRes.hasRowSet());
+        assertEquals(expectedAffectedRows, asyncRes.affectedRows());
 
-        AsyncResultSet<SqlRow> ars = f.join();
-        assertFalse(ars.hasMorePages());
+        assertNull(asyncRes.metadata());
 
-        assertEquals(0, txManager().pending(), "Expected no pending 
transactions");
+        await(asyncRes.closeAsync());
     }
 
-    private static void checkDdl(boolean expectedApplied, Session ses, String 
sql, Transaction tx) {
+    @Override
+    protected void checkDdl(boolean expectedApplied, Session ses, String sql, Transaction tx) {
         CompletableFuture<AsyncResultSet<SqlRow>> fut = ses.executeAsync(
                 tx,
                 sql
@@ -837,93 +149,56 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
         await(asyncRes.closeAsync());
     }
 
-    private static void checkDdl(boolean expectedApplied, Session ses, String 
sql) {
-        checkDdl(expectedApplied, ses, sql, null);
-    }
-
-    private static <T extends IgniteException> T checkError(Class<T> expCls, 
Integer code, String msg, Session ses, String sql,
-            Object... args) {
-        return assertThrowsPublicException(() -> await(ses.executeAsync(null, 
sql, args)), expCls, code, msg);
-    }
-
-    private static SqlException checkSqlError(
-            int code,
-            String msg,
-            Session ses,
-            String sql,
-            Object... args
-    ) {
-        return assertThrowsSqlException(code, msg, () -> 
await(ses.executeAsync(null, sql, args)));
-    }
-
-    protected static void checkDml(int expectedAffectedRows, Session ses, 
String sql, Transaction tx, Object... args) {
-        AsyncResultSet asyncRes = await(ses.executeAsync(tx, sql, args));
-
-        assertFalse(asyncRes.wasApplied());
-        assertFalse(asyncRes.hasMorePages());
-        assertFalse(asyncRes.hasRowSet());
-        assertEquals(expectedAffectedRows, asyncRes.affectedRows());
-
-        assertNull(asyncRes.metadata());
-
-        await(asyncRes.closeAsync());
-    }
-
-    private static void checkDml(int expectedAffectedRows, Session ses, String 
sql, Object... args) {
-        checkDml(expectedAffectedRows, ses, sql, null, args);
-    }
-
-    static <T extends IgniteException> T assertThrowsPublicException(
-            RunnableX executable,
-            Class<T> expCls,
-            @Nullable Integer code,
-            @Nullable String msgPart
-    ) {
-        T ex = assertThrows(expCls, executable::run);
-
-        if (code != null) {
-            assertEquals(new IgniteException(code).codeAsString(), 
ex.codeAsString());
-        }
-
-        if (msgPart != null) {
-            assertThat(ex.getMessage(), containsString(msgPart));
-        }
-
-        return ex;
-    }
-
-    static class TestPageProcessor implements
+    static class AsyncResultProcessor implements ResultProcessor,
             Function<AsyncResultSet<SqlRow>, CompletionStage<AsyncResultSet<SqlRow>>> {
-        private int expectedPages;
+        private Integer expectedPages;
 
+        private long affectedRows;
         private final List<SqlRow> res = new ArrayList<>();
 
-        TestPageProcessor(int expectedPages) {
+        AsyncResultProcessor(Integer expectedPages) {
             this.expectedPages = expectedPages;
         }
 
         @Override
         public CompletionStage<AsyncResultSet<SqlRow>> apply(AsyncResultSet<SqlRow> rs) {
-            expectedPages--;
-
-            assertTrue(rs.hasRowSet());
             assertFalse(rs.wasApplied());
-            assertEquals(-1L, rs.affectedRows());
-            assertEquals(expectedPages > 0, rs.hasMorePages(),
-                    "hasMorePages(): [expected=" + (expectedPages > 0) + ", 
actual=" + rs.hasMorePages() + ']');
+            //SELECT
+            if (rs.hasRowSet()) {
+                assertEquals(-1L, rs.affectedRows());
+
+                if (expectedPages != null) {
+                    expectedPages--;
+                    assertEquals(expectedPages > 0, rs.hasMorePages(),
+                            "hasMorePages(): [expected=" + (expectedPages > 0) + ", actual=" + rs.hasMorePages() + ']');
+                }
 
-            rs.currentPage().forEach(res::add);
+                rs.currentPage().forEach(res::add);
 
-            if (rs.hasMorePages()) {
-                return rs.fetchNextPage().thenCompose(this);
+                if (rs.hasMorePages()) {
+                    return rs.fetchNextPage().thenCompose(this);
+                }
+            } else { //DML/DDL
+                affectedRows = rs.affectedRows();
+                assertNotEquals(-1L, affectedRows());
             }
 
             return rs.closeAsync().thenApply(v -> rs);
         }
 
+        @Override
         public List<SqlRow> result() {
             //noinspection AssignmentOrReturnOfFieldWithMutableType
+            if (expectedPages != null) {
+                assertEquals(0, expectedPages, "Expected to be read more pages");
+            }
+
             return res;
         }
+
+        @Override
+        public long affectedRows() {
+            return affectedRows;
+        }
     }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
index 356ce99c5d..425a76f56f 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
@@ -57,4 +57,10 @@ public class ItSqlClientAsynchronousApiTest extends ItSqlAsynchronousApiTest {
     public void closeSession() {
         super.closeSession();
     }
+
+    @Override
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20598")
+    public void checkTransactionsWithDml() {
+        super.checkTransactionsWithDml();
+    }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
index 671f0d3a1d..5bb9e4d9ae 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
@@ -21,11 +21,10 @@ import static org.apache.ignite.internal.runner.app.client.ItAbstractThinClientT
 
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.Session;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Disabled;
 
 /**
  * Tests for synchronous client SQL API.
@@ -56,19 +55,14 @@ public class ItSqlClientSynchronousApiTest extends ItSqlSynchronousApiTest {
     }
 
     @Override
-    @Test
-    public void dml() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = igniteSql();
-        Session ses = sql.createSession();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
+    @Disabled("IGNITE-17134")
+    public void closeSession() {
+        super.closeSession();
+    }
 
-        checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
+    @Override
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20598")
+    public void checkTransactionsWithDml() {
+        super.checkTransactionsWithDml();
     }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index d616f75134..acc946bb53 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -17,539 +17,81 @@
 
 package org.apache.ignite.internal.sql.api;
 
-import static 
org.apache.ignite.internal.sql.api.ItSqlAsynchronousApiTest.assertThrowsPublicException;
-import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.util.Arrays;
-import java.util.HashSet;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
-import java.util.stream.IntStream;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest;
-import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.lang.ColumnAlreadyExistsException;
-import org.apache.ignite.lang.ColumnNotFoundException;
-import org.apache.ignite.lang.ErrorGroups;
-import org.apache.ignite.lang.ErrorGroups.Index;
-import org.apache.ignite.lang.ErrorGroups.Sql;
-import org.apache.ignite.lang.ErrorGroups.Transactions;
-import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IndexAlreadyExistsException;
-import org.apache.ignite.lang.IndexNotFoundException;
-import org.apache.ignite.lang.TableAlreadyExistsException;
-import org.apache.ignite.lang.TableNotFoundException;
 import org.apache.ignite.sql.BatchedArguments;
-import org.apache.ignite.sql.CursorClosedException;
-import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.NoRowSetExpectedException;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.SqlBatchException;
-import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.SqlRow;
-import org.apache.ignite.table.Table;
 import org.apache.ignite.tx.Transaction;
-import org.apache.ignite.tx.TransactionOptions;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Tests for synchronous SQL API.
  */
 @SuppressWarnings("ThrowableNotThrown")
-public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest {
-    private static final int ROW_COUNT = 16;
-
-    @AfterEach
-    public void dropTables() {
-        for (Table t : CLUSTER_NODES.get(0).tables().tables()) {
-            sql("DROP TABLE " + t.name());
-        }
+public class ItSqlSynchronousApiTest extends ItSqlApiBaseTest {
+    @Override
+    protected ResultSet<SqlRow> executeForRead(Session ses, Transaction tx, 
String query, Object... args) {
+        return ses.execute(tx, query, args);
     }
 
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20096")
-    public void ddl() throws Exception {
-        IgniteSql sql = igniteSql();
-        Session ses = sql.createSession();
-
-        // CREATE TABLE
-        checkDdl(true, ses, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-        checkError(
-                TableAlreadyExistsException.class,
-                ErrorGroups.Table.TABLE_ALREADY_EXISTS_ERR,
-                "Table already exists [name=\"PUBLIC\".\"TEST\"]",
-                ses,
-                "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"
-        );
-        checkSqlError(
-                ErrorGroups.Table.TABLE_DEFINITION_ERR,
-                "Can't create table with duplicate columns: ID, VAL, VAL",
-                ses,
-                "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL INT, VAL INT)"
-        );
-        checkDdl(false, ses, "CREATE TABLE IF NOT EXISTS TEST(ID INT PRIMARY 
KEY, VAL VARCHAR)");
-
-        // ADD COLUMN
-        checkDdl(true, ses, "ALTER TABLE TEST ADD COLUMN VAL1 VARCHAR");
-        checkError(
-                TableNotFoundException.class,
-                ErrorGroups.Table.TABLE_NOT_FOUND_ERR,
-                "The table does not exist 
[name=\"PUBLIC\".\"NOT_EXISTS_TABLE\"]",
-                ses,
-                "ALTER TABLE NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"
-        );
-        checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE ADD 
COLUMN VAL1 VARCHAR");
-        checkError(
-                ColumnAlreadyExistsException.class,
-                ErrorGroups.Table.COLUMN_ALREADY_EXISTS_ERR,
-                "Column already exists [name=\"VAL1\"]",
-                ses,
-                "ALTER TABLE TEST ADD COLUMN VAL1 INT"
-        );
-
-        // CREATE INDEX
-        checkDdl(true, ses, "CREATE INDEX TEST_IDX ON TEST(VAL0)");
-        checkError(
-                IndexAlreadyExistsException.class,
-                Index.INDEX_ALREADY_EXISTS_ERR,
-                "Index already exists [name=\"PUBLIC\".\"TEST_IDX\"]",
-                ses,
-                "CREATE INDEX TEST_IDX ON TEST(VAL1)"
-        );
-        checkDdl(false, ses, "CREATE INDEX IF NOT EXISTS TEST_IDX ON 
TEST(VAL1)");
-
-        // TODO: IGNITE-19150 We are waiting for schema synchronization to 
avoid races to create and destroy indexes
-        waitForIndexBuild("TEST", "TEST_IDX");
-
-        checkDdl(true, ses, "DROP INDEX TESt_iDX");
-        checkDdl(true, ses, "CREATE INDEX TEST_IDX1 ON TEST(VAL0)");
-        checkDdl(true, ses, "CREATE INDEX TEST_IDX2 ON TEST(VAL0)");
-        checkDdl(true, ses, "CREATE INDEX TEST_IDX3 ON TEST(ID, VAL0, VAL1)");
-        checkSqlError(
-                Index.INVALID_INDEX_DEFINITION_ERR,
-                "Can't create index on duplicate columns: VAL0, VAL0",
-                ses,
-                "CREATE INDEX TEST_IDX4 ON TEST(VAL0, VAL0)"
-        );
-
-        checkSqlError(
-                Sql.STMT_VALIDATION_ERR,
-                "Can`t delete column(s). Column VAL1 is used by indexes 
[TEST_IDX3].",
-                ses,
-                "ALTER TABLE TEST DROP COLUMN val1"
-        );
-
-        SqlException ex = checkSqlError(
-                Sql.STMT_VALIDATION_ERR,
-                "Can`t delete column(s).",
-                ses,
-                "ALTER TABLE TEST DROP COLUMN (val0, val1)"
-        );
-
-        String msg = ex.getMessage();
-        String explainMsg = "Unexpected error message: " + msg;
-
-        assertTrue(msg.contains("Column VAL0 is used by indexes ["), 
explainMsg);
-        assertTrue(msg.contains("TEST_IDX1") && msg.contains("TEST_IDX2") && 
msg.contains("TEST_IDX3"), explainMsg);
-        assertTrue(msg.contains("Column VAL1 is used by indexes [TEST_IDX3]"), 
explainMsg);
-
-        checkSqlError(
-                Sql.STMT_VALIDATION_ERR,
-                "Can`t delete column, belongs to primary key: [name=ID]",
-                ses,
-                "ALTER TABLE TEST DROP COLUMN id"
-        );
-
-        // TODO: IGNITE-19150 We are waiting for schema synchronization to 
avoid races to create and destroy indexes
-        waitForIndexBuild("TEST", "TEST_IDX3");
-        checkDdl(true, ses, "DROP INDEX TESt_iDX3");
-
-        // DROP COLUMNS
-        checkDdl(true, ses, "ALTER TABLE TEST DROP COLUMN VAL1");
-        checkError(
-                TableNotFoundException.class,
-                ErrorGroups.Table.TABLE_NOT_FOUND_ERR,
-                "The table does not exist 
[name=\"PUBLIC\".\"NOT_EXISTS_TABLE\"]",
-                ses,
-                "ALTER TABLE NOT_EXISTS_TABLE DROP COLUMN VAL1"
-        );
-        checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE DROP 
COLUMN VAL1");
-        checkError(
-                ColumnNotFoundException.class,
-                ErrorGroups.Table.COLUMN_NOT_FOUND_ERR,
-                "Column does not exist [tableName=\"PUBLIC\".\"TEST\", 
columnName=\"VAL1\"]",
-                ses,
-                "ALTER TABLE TEST DROP COLUMN VAL1"
-        );
-
-        // DROP TABLE
-        checkDdl(false, ses, "DROP TABLE IF EXISTS NOT_EXISTS_TABLE");
-
-        checkDdl(true, ses, "DROP TABLE TEST");
-        checkError(
-                TableNotFoundException.class,
-                ErrorGroups.Table.TABLE_NOT_FOUND_ERR,
-                "The table does not exist [name=\"PUBLIC\".\"TEST\"]",
-                ses,
-                "DROP TABLE TEST"
-        );
-
-        checkDdl(false, ses, "DROP INDEX IF EXISTS TEST_IDX");
-
-        checkError(
-                IndexNotFoundException.class,
-                Index.INDEX_NOT_FOUND_ERR,
-                "Index does not exist [name=\"PUBLIC\".\"TEST_IDX\"]", ses,
-                "DROP INDEX TEST_IDX"
-        );
-    }
-
-    @Test
-    public void dml() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = igniteSql();
-        Session ses = sql.createSession();
-
-        TxManager txManagerInternal = txManager();
-
-        int txPrevCnt = txManagerInternal.finished();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        assertEquals(ROW_COUNT, txManagerInternal.finished() - txPrevCnt);
-
-        assertEquals(0, txManagerInternal.pending());
-
-        checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
-
-        checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
-    }
-
-    @SuppressWarnings("UnstableApiUsage")
-    @Test
-    public void select() throws Exception {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = igniteSql();
-        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 
4).build();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        ResultSet<SqlRow> rs = ses.execute(null, "SELECT ID FROM TEST");
-
-        Set<Integer> set = new HashSet<>();
-
-        rs.forEachRemaining(r -> set.add(r.intValue(0)));
-
-        rs.close();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            assertTrue(set.remove(i), "Results invalid: " + rs);
-        }
-
-        assertTrue(set.isEmpty());
-    }
-
-    @Test
-    public void errors() throws InterruptedException {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)");
-        IgniteSql sql = igniteSql();
-        Session ses = sql.sessionBuilder().defaultPageSize(2).build();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        // Parse error.
-        checkSqlError(Sql.STMT_PARSE_ERR, "Failed to parse query", ses, 
"SELECT ID FROM");
-
-        // Validation errors.
-        checkSqlError(Sql.STMT_VALIDATION_ERR, "Column 'VAL0' does not allow 
NULLs", ses,
-                "INSERT INTO TEST VALUES (2, NULL)");
-
-        checkSqlError(Sql.STMT_VALIDATION_ERR, "Object 'NOT_EXISTING_TABLE' 
not found", ses,
-                "SELECT * FROM NOT_EXISTING_TABLE");
-
-        checkSqlError(Sql.STMT_VALIDATION_ERR, "Column 'NOT_EXISTING_COLUMN' 
not found", ses,
-                "SELECT NOT_EXISTING_COLUMN FROM TEST");
-
-        checkSqlError(Sql.STMT_VALIDATION_ERR, "Multiple statements are not 
allowed", ses, "SELECT 1; SELECT 2");
-
-        checkSqlError(Sql.STMT_VALIDATION_ERR, "Table without PRIMARY KEY is 
not supported", ses,
-                "CREATE TABLE TEST2 (VAL INT)");
-
-        // Execute error.
-        checkSqlError(Sql.RUNTIME_ERR, "/ by zero", ses, "SELECT 1 / ?", 0);
-        checkSqlError(Sql.RUNTIME_ERR, "/ by zero", ses, "UPDATE TEST SET val0 
= val0/(val0 - ?) + " + ROW_COUNT, 0);
-        checkSqlError(Sql.RUNTIME_ERR, "negative substring length not 
allowed", ses, "SELECT SUBSTRING('foo', 1, -3)");
-
-        // No result set error.
-        {
-            ResultSet rs = ses.execute(null, "CREATE TABLE TEST3 (ID INT 
PRIMARY KEY)");
-            assertThrowsSqlException(NoRowSetExpectedException.class, 
Sql.QUERY_NO_RESULT_SET_ERR, "Query has no result set", rs::next);
-        }
-
-        // Cursor closed error.
-        {
-            ResultSet rs = ses.execute(null, "SELECT * FROM TEST");
-            Thread.sleep(300); // ResultSetImpl fetches next page in 
background, wait to it to complete to avoid flakiness.
-            rs.close();
-            assertThrowsSqlException(CursorClosedException.class, 
Sql.CURSOR_CLOSED_ERR, "Cursor is closed",
-                    () -> rs.forEachRemaining(Object::hashCode));
-        }
-    }
-
-    /**
-     * DDL is non-transactional.
-     */
-    @Test
-    public void ddlInTransaction() {
-        Session ses = igniteSql().createSession();
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        {
-            Transaction tx = igniteTx().begin();
-            try {
-                assertThrowsSqlException(
-                        Sql.STMT_VALIDATION_ERR,
-                        "DDL doesn't support transactions.",
-                        () -> ses.execute(tx, "CREATE TABLE TEST2(ID INT 
PRIMARY KEY, VAL0 INT)")
-                );
-            } finally {
-                tx.rollback();
-            }
-        }
-        {
-            Transaction tx = igniteTx().begin();
-            ResultSet<SqlRow> res = ses.execute(tx, "INSERT INTO TEST VALUES 
(?, ?)", -1, -1);
-            assertEquals(1, res.affectedRows());
-
-            assertThrowsSqlException(
-                    Sql.STMT_VALIDATION_ERR,
-                    "DDL doesn't support transactions.",
-                    () -> ses.execute(tx, "CREATE TABLE TEST2(ID INT PRIMARY 
KEY, VAL0 INT)")
-            );
-            tx.commit();
-
-            assertTrue(ses.execute(null, "SELECT ID FROM TEST WHERE ID = 
-1").hasNext());
-        }
-
-        assertEquals(0, ((IgniteImpl) 
CLUSTER_NODES.get(0)).txManager().pending());
+    @Override
+    protected long[] executeBatch(Session ses, String sql, BatchedArguments 
args) {
+        return ses.executeBatch(null, sql, args);
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {
-            "INSERT INTO tst VALUES (2, ?)",
-            "SELECT * FROM tst WHERE id = ? "
-    })
-    public void runtimeErrorInDmlCausesTransactionToFail(String query) {
-        sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)");
-
-        sql("INSERT INTO tst VALUES (?,?)", 1, 1);
 
-        try (Session ses = igniteSql().createSession()) {
-            Transaction tx = igniteTx().begin();
-            String dmlQuery = "UPDATE tst SET val = val/(val - ?) + 1";
+    @Override
+    protected ResultProcessor execute(Integer expectedPages, Transaction tx, 
Session ses, String sql, Object... args) {
+        SyncPageProcessor syncProcessor = new SyncPageProcessor();
 
-            assertThrowsSqlException(
-                    Sql.RUNTIME_ERR,
-                    "/ by zero",
-                    () -> ses.execute(tx, dmlQuery, 1).affectedRows());
+        ResultSet<SqlRow> rs = ses.execute(tx, sql, args);
+        syncProcessor.process(rs);
 
-            IgniteException err = assertThrows(IgniteException.class, () -> {
-                ResultSet<SqlRow> rs = ses.execute(tx, query, 2);
-                if (rs.hasRowSet()) {
-                    assertTrue(rs.hasNext());
-                } else {
-                    assertTrue(rs.wasApplied());
-                }
-            });
-
-            assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, 
err.code(), err.toString());
-        }
+        return syncProcessor;
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {
-            "INSERT INTO tst VALUES (2, ?)",
-            "SELECT * FROM tst WHERE id = ? "
-    })
-    public void runtimeErrorInQueryCausesTransactionToFail(String query) {
-        sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)");
-
-        sql("INSERT INTO tst VALUES (?,?)", 1, 1);
-
-        try (Session ses = igniteSql().createSession()) {
-            Transaction tx = igniteTx().begin();
-
-            assertThrowsSqlException(
-                    Sql.RUNTIME_ERR,
-                    "/ by zero",
-                    () -> ses.execute(tx, "SELECT val/? FROM tst WHERE id=?", 
0, 1).next());
+    protected ResultProcessor execute(int expectedPages, Transaction tx, 
Session ses, String sql, Object... args) {
+        SyncPageProcessor syncProcessor = new SyncPageProcessor();
 
-            IgniteException err = assertThrows(IgniteException.class, () -> {
-                ResultSet<SqlRow> rs = ses.execute(tx, query, 2);
-                if (rs.hasRowSet()) {
-                    assertTrue(rs.hasNext());
-                } else {
-                    assertTrue(rs.wasApplied());
-                }
-            });
+        ResultSet<SqlRow> rs = ses.execute(tx, sql, args);
+        syncProcessor.process(rs);
 
-            assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, 
err.code(), err.toString());
-        }
+        return syncProcessor;
     }
 
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20534")
-    public void testLockIsNotReleasedAfterTxRollback() {
-        Ignite ignite = CLUSTER_NODES.get(0);
-        IgniteSql sql = ignite.sql();
-
-        try (Session ses = ignite.sql().createSession()) {
-            ses.execute(null, "CREATE TABLE IF NOT EXISTS tst(id INTEGER 
PRIMARY KEY, val INTEGER)").affectedRows();
-        }
-
-        try (Session session = sql.createSession()) {
-            Transaction tx = ignite.transactions().begin();
-
-            assertThrows(RuntimeException.class, () -> session.execute(tx, 
"SELECT 1/0"));
-
-            tx.rollback();
-
-            session.execute(tx, "INSERT INTO tst VALUES (1, 1)");
-        }
-
-        try (Session session = sql.createSession()) {
-            Transaction tx = ignite.transactions().begin(new 
TransactionOptions().readOnly(false));
-
-            session.execute(tx, "INSERT INTO tst VALUES (1, 1)");
-
-            tx.commit();
-        }
+    @Override
+    protected void rollback(Transaction tx) {
+        tx.rollback();
     }
 
-    @Test
-    public void batch() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = CLUSTER_NODES.get(0).sql();
-        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 
2).build();
-
-        BatchedArguments args = BatchedArguments.of(0, 0);
-
-        for (int i = 1; i < ROW_COUNT; ++i) {
-            args.add(i, i);
-        }
-
-        long[] batchRes = ses.executeBatch(null, "INSERT INTO TEST VALUES (?, 
?)", args);
-
-        Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
-
-        // Check that data are inserted OK
-        List<List<Object>> res = sql("SELECT ID FROM TEST ORDER BY ID");
-        IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i, 
res.get(i).get(0)));
-
-        // Check invalid query type
-        assertThrowsSqlException(
-                SqlBatchException.class,
-                Sql.STMT_VALIDATION_ERR,
-                "Invalid SQL statement type",
-                () -> ses.executeBatch(null, "SELECT * FROM TEST", args)
-        );
-
-        assertThrowsSqlException(
-                SqlBatchException.class,
-                Sql.STMT_VALIDATION_ERR,
-                "Invalid SQL statement type",
-                () -> ses.executeBatch(null, "CREATE TABLE TEST1(ID INT 
PRIMARY KEY, VAL0 INT)", args)
-        );
+    @Override
+    protected void commit(Transaction tx) {
+        tx.commit();
     }
 
-    @Test
-    public void batchIncomplete() {
-        int err = ROW_COUNT / 2;
-
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = CLUSTER_NODES.get(0).sql();
-        Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 
2).build();
-
-        BatchedArguments args = BatchedArguments.of(0, 0);
-
-        for (int i = 1; i < ROW_COUNT; ++i) {
-            if (i == err) {
-                args.add(1, 1);
-            } else {
-                args.add(i, i);
-            }
-        }
-
-        SqlBatchException batchEx = assertThrows(
-                SqlBatchException.class,
-                () -> ses.executeBatch(null, "INSERT INTO TEST VALUES (?, ?)", 
args)
+    @Override
+    protected void checkDml(int expectedAffectedRows, Transaction tx, Session 
ses, String sql, Object... args) {
+        ResultSet res = ses.execute(
+                tx,
+                sql,
+                args
         );
 
-        assertEquals(Sql.CONSTRAINT_VIOLATION_ERR, batchEx.code());
-        assertEquals(err, batchEx.updateCounters().length);
-        IntStream.range(0, batchEx.updateCounters().length).forEach(i -> 
assertEquals(1, batchEx.updateCounters()[i]));
-    }
-
-    @Test
-    public void resultSetCloseShouldFinishImplicitTransaction() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        IgniteSql sql = igniteSql();
-        Session ses = sql.sessionBuilder().defaultPageSize(2).build();
-
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        ResultSet<?> rs = ses.execute(null, "SELECT * FROM TEST");
-        assertEquals(1, txManager().pending());
-        rs.close();
-        assertEquals(0, txManager().pending(), "Expected no pending 
transactions");
-    }
-
-    @Test
-    public void resultSetFullReadShouldFinishImplicitTransaction() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-        for (int i = 0; i < ROW_COUNT; ++i) {
-            sql("INSERT INTO TEST VALUES (?, ?)", i, i);
-        }
-
-        IgniteSql sql = igniteSql();
-
-        // Fetch all data in one read.
-        Session ses = sql.sessionBuilder().defaultPageSize(100).build();
-        ResultSet<SqlRow> rs = ses.execute(null, "SELECT * FROM TEST");
-
-        while (rs.hasNext()) {
-            rs.next();
-        }
+        assertFalse(res.wasApplied());
+        assertFalse(res.hasRowSet());
+        assertEquals(expectedAffectedRows, res.affectedRows());
 
-        assertEquals(0, txManager().pending(), "Expected no pending 
transactions");
+        res.close();
     }
 
-    private static void checkDdl(boolean expectedApplied, Session ses, String 
sql) {
+    @Override
+    protected void checkDdl(boolean expectedApplied, Session ses, String sql, 
Transaction tx) {
         ResultSet res = ses.execute(
-                null,
+                tx,
                 sql
         );
 
@@ -560,32 +102,24 @@ public class ItSqlSynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         res.close();
     }
 
-    private static <T extends IgniteException> T checkError(Class<T> expCls, 
Integer code, String msg, Session ses, String sql,
-            Object... args) {
-        return assertThrowsPublicException(() -> ses.execute(null, sql, args), 
expCls, code, msg);
-    }
+    static class SyncPageProcessor implements ResultProcessor {
+        private final List<SqlRow> res = new ArrayList<>();
+        private long affectedRows;
 
-    private static SqlException checkSqlError(
-            int code,
-            String msg,
-            Session ses,
-            String sql,
-            Object... args
-    ) {
-        return assertThrowsSqlException(code, msg, () -> ses.execute(null, 
sql, args));
-    }
-
-    static void checkDml(int expectedAffectedRows, Session ses, String sql, 
Object... args) {
-        ResultSet res = ses.execute(
-                null,
-                sql,
-                args
-        );
+        @Override
+        public List<SqlRow> result() {
+            //noinspection AssignmentOrReturnOfFieldWithMutableType
+            return res;
+        }
 
-        assertFalse(res.wasApplied());
-        assertFalse(res.hasRowSet());
-        assertEquals(expectedAffectedRows, res.affectedRows());
+        @Override
+        public long affectedRows() {
+            return affectedRows;
+        }
 
-        res.close();
+        public void process(ResultSet<SqlRow> resultSet) {
+            affectedRows = resultSet.affectedRows();
+            resultSet.forEachRemaining(res::add);
+        }
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
index ac9e206455..f46465ea17 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -119,17 +118,16 @@ public class ItCreateTableDdlTest extends 
ClusterPerClassIntegrationTest {
      * Check invalid colocation columns configuration: - not PK columns; - 
duplicates colocation columns.
      */
     @Test
-    @Disabled("IGNITE-20149")
     public void invalidColocationColumns() {
         assertThrowsSqlException(
                 Sql.STMT_VALIDATION_ERR,
-                "Colocation columns must be subset of primary key",
+                "Failed to validate query. Colocation column 'VAL' is not part 
of PK",
                 () -> sql("CREATE TABLE T0(ID0 INT, ID1 INT, VAL INT, PRIMARY 
KEY (ID1, ID0)) COLOCATE (ID0, VAL)")
         );
 
         assertThrowsSqlException(
                 Sql.STMT_VALIDATION_ERR,
-                "Colocation columns contains duplicates: [duplicates=[ID1]]]",
+                "Failed to validate query. Colocation column 'ID1' specified 
more that once",
                 () -> sql("CREATE TABLE T0(ID0 INT, ID1 INT, VAL INT, PRIMARY 
KEY (ID1, ID0)) COLOCATE (ID1, ID0, ID1)")
         );
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java
new file mode 100644
index 0000000000..a36119ccca
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.lang;
+
+import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.mapToPublicException;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+
+import org.apache.ignite.lang.ErrorGroups.Common;
+import org.apache.ignite.lang.TraceableException;
+import org.apache.ignite.sql.SqlException;
+
+/**
+ * Utility class that provides the ability to map Ignite internal exceptions to the 
public {@link SqlException}.
+ */
+public class SqlExceptionMapperUtil {
+
+    /**
+     * This method provides a mapping from an internal exception to a public SQL 
exception.
+     *
+     * <p>The rules of mapping are the following:</p>
+     * <ul>
+     *     <li>any instance of {@link Error} is returned as is, except {@link 
AssertionError}
+     *     that will always be mapped to {@link SqlException} with the {@link 
Common#INTERNAL_ERR} error code.</li>
+     *     <li>any instance of {@link TraceableException} is wrapped into 
{@link SqlException}
+     *         with the original {@link TraceableException#traceId() traceId} 
and {@link TraceableException#code() code}.</li>
+     *     <li>if there are no mappers that can map the given error to a 
public exception,
+     *     then {@link SqlException} with the {@link Common#INTERNAL_ERR} 
error code is returned.</li>
+     * </ul>
+     *
+     * @param origin Exception to be mapped.
+     * @return Public {@link SqlException}, or an {@link Error} if the mapped result is one.
+     */
+    public static Throwable mapToPublicSqlException(Throwable origin) {
+        Throwable e = mapToPublicException(origin);
+        if (e instanceof Error) {
+            return e;
+        }
+        if (e instanceof SqlException) {
+            return e;
+        }
+
+        if (e instanceof TraceableException) {
+            TraceableException traceable = (TraceableException) e;
+            return new SqlException(traceable.traceId(), traceable.code(), 
e.getMessage(), e);
+        }
+
+        return new SqlException(INTERNAL_ERR, origin);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index 6810ec8683..f947e6a7af 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.sql.api;
 
-import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.mapToPublicException;
+import static 
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
 import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_CLOSED_ERR;
@@ -192,7 +192,7 @@ public class SessionImpl implements AbstractSession {
                             )
             );
         } catch (Exception e) {
-            return CompletableFuture.failedFuture(mapToPublicException(e));
+            return CompletableFuture.failedFuture(mapToPublicSqlException(e));
         } finally {
             busyLock.leaveBusy();
         }
@@ -205,7 +205,7 @@ public class SessionImpl implements AbstractSession {
                 closeInternal();
             }
 
-            throw new CompletionException(mapToPublicException(cause));
+            throw new CompletionException(mapToPublicSqlException(cause));
         });
     }
 
@@ -282,7 +282,7 @@ public class SessionImpl implements AbstractSession {
                             throw (CancellationException) cause;
                         }
 
-                        Throwable t = mapToPublicException(cause);
+                        Throwable t = mapToPublicSqlException(cause);
 
                         if (t instanceof TraceableException) {
                             throw new SqlBatchException(
@@ -305,7 +305,7 @@ public class SessionImpl implements AbstractSession {
 
             return resFut;
         } catch (Exception e) {
-            return CompletableFuture.failedFuture(mapToPublicException(e));
+            return CompletableFuture.failedFuture(mapToPublicSqlException(e));
         } finally {
             busyLock.leaveBusy();
         }
@@ -367,7 +367,7 @@ public class SessionImpl implements AbstractSession {
 
             return qryProc.closeSession(sessionId);
         } catch (Exception e) {
-            return CompletableFuture.failedFuture(mapToPublicException(e));
+            return CompletableFuture.failedFuture(mapToPublicSqlException(e));
         }
     }
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index eaffd75867..b81c88f74c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.sql.engine;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
+import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
 import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.sql.ResultSetMetadata;
@@ -98,6 +98,6 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
     private static Throwable wrapIfNecessary(Throwable t) {
         Throwable err = ExceptionUtils.unwrapCause(t);
 
-        return IgniteExceptionMapperUtil.mapToPublicException(err);
+        return SqlExceptionMapperUtil.mapToPublicSqlException(err);
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index f82b8807d5..6bbd0a352c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -40,7 +40,7 @@ import org.apache.calcite.sql.SqlExplain;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
-import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
+import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metrics.MetricManager;
@@ -192,7 +192,7 @@ public class PrepareServiceImpl implements PrepareService {
                                 "Planning of a query aborted due to planner 
timeout threshold is reached");
                     }
 
-                    throw new 
CompletionException(IgniteExceptionMapperUtil.mapToPublicException(th));
+                    throw new 
CompletionException(SqlExceptionMapperUtil.mapToPublicSqlException(th));
                 }
         );
     }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtilTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtilTest.java
new file mode 100644
index 0000000000..807909ecdc
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtilTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.lang;
+
+import static 
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+import org.apache.ignite.sql.NoRowSetExpectedException;
+import org.apache.ignite.sql.SqlException;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link SqlExceptionMapperUtil#mapToPublicSqlException}: checks what the mapper returns
+ * for an internal exception and for an exception taken from the public SQL API package.
+ */
+class SqlExceptionMapperUtilTest {
+    /**
+     * Tests the mapping of an internal exception: a {@link CustomNoMappingException} created with
+     * {@code EXECUTION_CANCELLED_ERR} must be mapped to a {@link SqlException} that keeps the trace
+     * identifier of the source exception and reports {@code INTERNAL_ERR} as its error code.
+     */
+    @Test
+    public void testSqlInternalExceptionDefaultMapping() {
+        CustomNoMappingException internalSqlErr = new 
CustomNoMappingException(EXECUTION_CANCELLED_ERR);
+        Throwable mappedErr = mapToPublicSqlException(internalSqlErr);
+
+        assertThat(mappedErr, instanceOf(SqlException.class));
+
+        SqlException mappedSqlErr = (SqlException) mappedErr;
+
+        assertThat("Mapped exception should have the same trace identifier.", 
mappedSqlErr.traceId(), is(internalSqlErr.traceId()));
+        assertThat("Mapped exception shouldn't have the same error code.", 
mappedSqlErr.code(), is(INTERNAL_ERR));
+    }
+
+    /**
+     * Tests that a {@link NoRowSetExpectedException} is not remapped: the mapper must return the
+     * very same instance it was given.
+     */
+    @Test
+    public void testSqlInternalExceptionDefaultMappingForSqlException() {
+        NoRowSetExpectedException sqlErr = new NoRowSetExpectedException();
+
+        Throwable mappedErr = mapToPublicSqlException(sqlErr);
+
+        assertSame(sqlErr, mappedErr);
+    }
+
+    /**
+     * Test exception extending {@link IgniteInternalException}; serves as the internal exception
+     * in {@link #testSqlInternalExceptionDefaultMapping()}.
+     */
+    public static class CustomNoMappingException extends 
IgniteInternalException {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Creates a new instance of CustomNoMappingException with the given code and a fixed test message.
+         *
+         * @param code Error code passed to the {@link IgniteInternalException} constructor.
+         */
+        public CustomNoMappingException(int code) {
+            super(code, "Test internal exception [err=no mapping]");
+        }
+    }
+}
diff --git a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
index acb0e4f73d..7c7f1e71a6 100644
--- a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
+++ b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
@@ -34,9 +34,12 @@ import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.sql.engine.type.UuidType;
@@ -91,6 +94,11 @@ public class SqlTestUtils {
         return ex;
     }
 
+    public static <T> Stream<T> asStream(Iterator<T> sourceIterator) { // Exposes the iterator's elements as a Stream.
+        Iterable<T> iterable = () -> sourceIterator; // Every iterator() call hands back the same sourceIterator instance.
+        return StreamSupport.stream(iterable.spliterator(), false); // 'false' requests a sequential (non-parallel) stream.
+    }
+
     /**
      * Convert {@link ColumnType} to string representation of SQL type.
      *

Reply via email to