This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a983b2cff7 IGNITE-20388 Sql. Migrate ClusterPerClassIntegrationTest to 
run queries via public API (#2618)
a983b2cff7 is described below

commit a983b2cff78eb1d5448b3694dcfd1dab3d0c4218
Author: ygerzhedovich <41903880+ygerzhedov...@users.noreply.github.com>
AuthorDate: Thu Sep 28 18:24:57 2023 +0300

    IGNITE-20388 Sql. Migrate ClusterPerClassIntegrationTest to run queries via 
public API (#2618)
---
 modules/cli/build.gradle                           |  1 +
 .../internal/cli/CliIntegrationTestBase.java       | 51 +------------
 .../ignite/internal/sql/SyncResultSetAdapter.java  |  2 +-
 .../internal/ClusterPerTestIntegrationTest.java    | 17 +----
 .../internal/sql/api/ItSqlSynchronousApiTest.java  | 10 +--
 .../sql/engine/ClusterPerClassIntegrationTest.java | 19 +----
 .../internal/sql/engine/ItCreateTableDdlTest.java  | 11 +--
 .../internal/sql/engine/ItFunctionsTest.java       | 13 +---
 .../sql/engine/datatypes/DataTypeTestSpecs.java    | 13 +++-
 .../engine/datatypes/tests/BaseDataTypeTest.java   |  4 -
 .../datatypes/tests/BaseDmlDataTypeTest.java       | 12 +--
 .../datatypes/tests/BaseQueryDataTypeTest.java     | 57 +++++++-------
 .../engine/datatypes/tests/DataTypeTestSpec.java   | 11 ++-
 .../datatypes/varbinary/ItVarBinaryDmlTest.java    |  6 +-
 .../varbinary/ItVarBinaryExpressionTest.java       | 43 ++++++-----
 .../datatypes/varbinary/ItVarBinaryIndexTest.java  |  4 +-
 .../sql/engine/util/TestQueryProcessor.java        | 86 ----------------------
 .../internal/sql/engine/SqlQueryProcessor.java     | 12 ++-
 .../internal/sql/engine/util/SqlTestUtils.java     | 45 +++++++++++
 19 files changed, 161 insertions(+), 256 deletions(-)

diff --git a/modules/cli/build.gradle b/modules/cli/build.gradle
index 32c06fe66e..b0461a7f60 100644
--- a/modules/cli/build.gradle
+++ b/modules/cli/build.gradle
@@ -102,6 +102,7 @@ dependencies {
     integrationTestImplementation testFixtures(project(":ignite-core"))
     integrationTestImplementation testFixtures(project(":ignite-schema"))
     integrationTestImplementation testFixtures(project(":ignite-api"))
+    integrationTestImplementation testFixtures(project(":ignite-sql-engine"))
     integrationTestImplementation libs.jetbrains.annotations
     integrationTestImplementation libs.micronaut.picocli
     integrationTestImplementation libs.mock.server.netty
diff --git 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java
 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java
index ac8cb7e48f..8d822429cf 100644
--- 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java
+++ 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTestBase.java
@@ -17,25 +17,14 @@
 
 package org.apache.ignite.internal.cli;
 
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-
 import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
 import java.io.PrintWriter;
 import java.io.Writer;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.function.Consumer;
-import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.sql.engine.QueryContext;
-import org.apache.ignite.internal.sql.engine.QueryProcessor;
-import org.apache.ignite.internal.sql.engine.SqlQueryType;
-import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
-import org.apache.ignite.internal.sql.engine.session.SessionId;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
 import org.apache.ignite.internal.testframework.IntegrationTestBase;
-import org.apache.ignite.internal.util.AsyncCursor;
-import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
 import org.apache.ignite.table.Table;
-import org.apache.ignite.tx.IgniteTransactions;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.TestInstance;
@@ -90,41 +79,9 @@ public abstract class CliIntegrationTestBase extends 
IntegrationTestBase {
     }
 
     protected static List<List<Object>> sql(@Nullable Transaction tx, String 
sql, Object... args) {
-        QueryProcessor queryEngine = ((IgniteImpl) 
CLUSTER_NODES.get(0)).queryEngine();
-        IgniteTransactions transactions = CLUSTER_NODES.get(0).transactions();
-
-        SessionId sessionId = 
queryEngine.createSession(PropertiesHelper.emptyHolder());
-
-        try {
-            var context = QueryContext.create(SqlQueryType.ALL, tx);
-
-            return getAllFromCursor(
-                    await(queryEngine.querySingleAsync(sessionId, context, 
transactions, sql, args))
-            );
-        } finally {
-            queryEngine.closeSession(sessionId);
-        }
-    }
-
-    private static <T> List<T> getAllFromCursor(AsyncCursor<T> cur) {
-        List<T> res = new ArrayList<>();
-        int batchSize = 256;
-
-        var consumer = new Consumer<BatchedResult<T>>() {
-            @Override
-            public void accept(BatchedResult<T> br) {
-                res.addAll(br.items());
-
-                if (br.hasMore()) {
-                    cur.requestNextAsync(batchSize).thenAccept(this);
-                }
-            }
-        };
-
-        await(cur.requestNextAsync(batchSize).thenAccept(consumer));
-        await(cur.closeAsync());
+        Ignite ignite = CLUSTER_NODES.get(0);
 
-        return res;
+        return SqlTestUtils.sql(ignite, tx, sql, args);
     }
 
     protected static PrintWriter output(List<Character> buffer) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java
index faf9f4b19b..0854513ad3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java
@@ -89,7 +89,7 @@ class SyncResultSetAdapter<T> implements ResultSet<T> {
     @Override
     public boolean hasNext() {
         if (it == null) {
-            throw new NoRowSetExpectedException();
+            return false;
         }
 
         return it.hasNext();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 4e71cc6ed5..df0d863f02 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -17,20 +17,13 @@
 
 package org.apache.ignite.internal;
 
-import static 
org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
-
 import java.nio.file.Path;
 import java.util.List;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.sql.engine.QueryContext;
-import org.apache.ignite.internal.sql.engine.QueryProcessor;
-import org.apache.ignite.internal.sql.engine.SqlQueryType;
-import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
-import org.apache.ignite.internal.sql.engine.session.SessionId;
-import org.apache.ignite.internal.sql.engine.util.TestQueryProcessor;
+import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -192,12 +185,8 @@ public abstract class ClusterPerTestIntegrationTest 
extends IgniteIntegrationTes
     }
 
     protected final List<List<Object>> executeSql(String sql, Object... args) {
-        QueryProcessor qryProc = new TestQueryProcessor(node(0));
-        SessionId sessionId = 
qryProc.createSession(PropertiesHelper.emptyHolder());
-        QueryContext context = QueryContext.create(SqlQueryType.ALL);
+        IgniteImpl ignite = node(0);
 
-        return getAllFromCursor(
-                qryProc.querySingleAsync(sessionId, context, 
node(0).transactions(), sql, args).join()
-        );
+        return SqlTestUtils.sql(ignite, null, sql, args);
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index b563532a3c..e0cda115b0 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.sql.api;
 
 import static 
org.apache.ignite.internal.sql.api.ItSqlAsynchronousApiTest.assertThrowsPublicException;
 import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -54,7 +53,6 @@ import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.tx.Transaction;
-import org.hamcrest.MatcherAssert;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -399,19 +397,19 @@ public class ItSqlSynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i, 
res.get(i).get(0)));
 
         // Check invalid query type
-        SqlException ex = assertThrowsSqlException(
+        assertThrowsSqlException(
+                SqlBatchException.class,
                 Sql.STMT_VALIDATION_ERR,
                 "Invalid SQL statement type",
                 () -> ses.executeBatch(null, "SELECT * FROM TEST", args)
         );
-        MatcherAssert.assertThat(ex, instanceOf(SqlBatchException.class));
 
-        ex = assertThrowsSqlException(
+        assertThrowsSqlException(
+                SqlBatchException.class,
                 Sql.STMT_VALIDATION_ERR,
                 "Invalid SQL statement type",
                 () -> ses.executeBatch(null, "CREATE TABLE TEST1(ID INT 
PRIMARY KEY, VAL0 INT)", args)
         );
-        MatcherAssert.assertThat(ex, instanceOf(SqlBatchException.class));
     }
 
     @Test
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
index e196bf94ea..8ed419cb29 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.sql.engine;
 
 import static java.util.stream.Collectors.toList;
-import static 
org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
 import static org.apache.ignite.internal.table.TableTestUtils.getTable;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
@@ -54,13 +53,11 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
-import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.sql.engine.util.InjectQueryCheckerFactory;
 import org.apache.ignite.internal.sql.engine.util.QueryChecker;
 import org.apache.ignite.internal.sql.engine.util.QueryCheckerExtension;
 import org.apache.ignite.internal.sql.engine.util.QueryCheckerFactory;
-import org.apache.ignite.internal.sql.engine.util.TestQueryProcessor;
+import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
 import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
 import org.apache.ignite.internal.table.InternalTable;
@@ -414,19 +411,9 @@ public abstract class ClusterPerClassIntegrationTest 
extends IgniteIntegrationTe
     }
 
     protected static List<List<Object>> sql(@Nullable Transaction tx, String 
sql, Object... args) {
-        var queryEngine = new TestQueryProcessor(CLUSTER_NODES.get(0));
+        Ignite ignite = CLUSTER_NODES.get(0);
 
-        SessionId sessionId = 
queryEngine.createSession(PropertiesHelper.emptyHolder());
-
-        try {
-            var context = QueryContext.create(SqlQueryType.ALL, tx);
-
-            return getAllFromCursor(
-                    await(queryEngine.querySingleAsync(sessionId, context, 
CLUSTER_NODES.get(0).transactions(), sql, args))
-            );
-        } finally {
-            queryEngine.closeSession(sessionId);
-        }
+        return SqlTestUtils.sql(ignite, tx, sql, args);
     }
 
     protected static void checkMetadata(ColumnMetadata expectedMeta, 
ColumnMetadata actualMeta) {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
index d03d2255a4..6063707a6e 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.sql.engine;
 
 import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
 import static org.apache.ignite.internal.table.TableTestUtils.getTableStrict;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -27,7 +26,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 
 import java.util.List;
 import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.catalog.CatalogValidationException;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.lang.ErrorGroups.Sql;
@@ -91,11 +89,10 @@ public class ItCreateTableDdlTest extends 
ClusterPerClassIntegrationTest {
                 () -> sql("CREATE TABLE T0()")
         );
 
-        // TODO: IGNITE-20388 Fix it
-        assertThrows(
-                CatalogValidationException.class,
-                () -> sql("CREATE TABLE T0(ID0 INT PRIMARY KEY, ID1 INT, ID0 
INT)"),
-                "Column with name 'ID0' specified more than once"
+        assertThrowsSqlException(
+                Sql.STMT_VALIDATION_ERR,
+                "Column with name 'ID0' specified more than once",
+                () -> sql("CREATE TABLE T0(ID0 INT PRIMARY KEY, ID1 INT, ID0 
INT)")
         );
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
index 4924989ff2..04e74d22c5 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
@@ -130,21 +130,10 @@ public class ItFunctionsTest extends 
ClusterPerClassIntegrationTest {
 
         assertEquals(0, sql("SELECT * FROM table(system_range(null, 
1))").size());
 
-        IgniteException ex = assertThrowsSqlException(
+        assertThrowsSqlException(
                 Sql.RUNTIME_ERR,
                 "Increment can't be 0",
                 () -> sql("SELECT * FROM table(system_range(1, 1, 0))"));
-
-        assertTrue(
-                ex.getCause() instanceof IllegalArgumentException,
-                format(
-                        "Expected cause is {}, but was {}",
-                        IllegalArgumentException.class.getSimpleName(),
-                        ex.getCause() == null ? null : 
ex.getCause().getClass().getSimpleName()
-                )
-        );
-
-        assertEquals("Increment can't be 0", ex.getCause().getMessage());
     }
 
     @Test
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/DataTypeTestSpecs.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/DataTypeTestSpecs.java
index fa63b3e98b..2f5b1a2f74 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/DataTypeTestSpecs.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/DataTypeTestSpecs.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.sql.engine.datatypes;
 
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
-import static org.apache.ignite.internal.sql.engine.util.VarBinary.varBinary;
 
 import com.google.common.io.BaseEncoding;
 import java.nio.charset.StandardCharsets;
@@ -112,7 +111,17 @@ public final class DataTypeTestSpecs {
         /** {@inheritDoc} */
         @Override
         public VarBinary wrapIfNecessary(Object storageValue) {
-            return varBinary((byte[]) storageValue);
+            return VarBinary.varBinary((byte[]) storageValue);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public byte[] unwrapIfNecessary(VarBinary value) {
+            if (value == null) {
+                return null;
+            }
+
+            return value.get();
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java
index dbeeb35e4d..93563e2ae2 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDataTypeTest.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.NativeTypeWrapper;
 import org.apache.ignite.internal.sql.engine.util.QueryChecker;
 import org.apache.ignite.internal.sql.engine.util.QueryChecker.QueryTemplate;
-import org.apache.ignite.internal.sql.engine.util.TestQueryProcessor;
 import org.apache.ignite.sql.ColumnType;
 import org.apache.ignite.sql.ResultSetMetadata;
 import org.junit.jupiter.api.BeforeAll;
@@ -65,9 +64,6 @@ import org.junit.jupiter.params.provider.Arguments;
  * <p>In order to test non-comparable types (e.g. java arrays) values of those 
types must be passed as {@link NativeTypeWrapper}.
  * In that case {@code T} must be an implementation of a {@link 
NativeTypeWrapper} for that type.
  *
- * <p>Helper methods such as {@link #runSql(String, Object...)} and {@link 
#checkQuery(String)} support those values and unwrap them
- * when it is necessary. See {@link TestQueryProcessor}.
- *
  * @param <T> A storage type of a data type.
  */
 public abstract class BaseDataTypeTest<T extends Comparable<T>> extends 
ClusterPerClassIntegrationTest {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDmlDataTypeTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDmlDataTypeTest.java
index 4939cfb481..5bd10e8e0d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDmlDataTypeTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseDmlDataTypeTest.java
@@ -45,13 +45,13 @@ public abstract class BaseDmlDataTypeTest<T extends 
Comparable<T>> extends BaseD
     /** {@code DELETE} by key. */
     @Test
     public void testDelete() {
-        T value1 = values.get(0);
+        Object value = testTypeSpec.unwrapIfNecessary(values.get(0));
 
         runSql("INSERT INTO t VALUES (1, $0)");
         runSql("INSERT INTO t VALUES (2, $1)");
         runSql("INSERT INTO t VALUES (3, $2)");
 
-        runSql("DELETE FROM t WHERE test_key=?", value1);
+        runSql("DELETE FROM t WHERE test_key=?", value);
 
         checkQuery("SELECT id FROM t").returns(2).returns(3).check();
     }
@@ -75,15 +75,17 @@ public abstract class BaseDmlDataTypeTest<T extends 
Comparable<T>> extends BaseD
     /** {@code UPDATE} from a dynamic parameter. */
     @Test
     public void testUpdateFromDynamicParam() {
-        runSql("INSERT INTO t VALUES (1, ?)", dataSamples.min());
+        runSql("INSERT INTO t VALUES (1, ?)", 
testTypeSpec.unwrapIfNecessary(dataSamples.min()));
+
+        Object max = testTypeSpec.unwrapIfNecessary(dataSamples.max());
 
         checkQuery("UPDATE t SET test_key = ? WHERE id=1")
-                .withParams(dataSamples.max())
+                .withParams(max)
                 .returns(1L)
                 .check();
 
         checkQuery("SELECT test_key FROM t WHERE id=1")
-                .returns(dataSamples.max())
+                .returns(max)
                 .check();
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseQueryDataTypeTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseQueryDataTypeTest.java
index 60f8928b4a..be4146f43d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseQueryDataTypeTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/BaseQueryDataTypeTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.datatypes.tests;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 
 import java.util.stream.Stream;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -54,8 +55,8 @@ public abstract class BaseQueryDataTypeTest<T extends 
Comparable<T>> extends Bas
     @ParameterizedTest
     @MethodSource("eq")
     public void testEqCondition(TestTypeArguments<T> arguments) {
-        T value1 = values.get(0);
-        T value2 = values.get(1);
+        Object value1 = testTypeSpec.unwrapIfNecessary(values.get(0));
+        Object value2 = testTypeSpec.unwrapIfNecessary(values.get(1));
 
         runSql("INSERT INTO t VALUES(1, ?)", value1);
         runSql("INSERT INTO t VALUES(2, ?)", value2);
@@ -76,8 +77,8 @@ public abstract class BaseQueryDataTypeTest<T extends 
Comparable<T>> extends Bas
     /** Test for equality predicate with dynamic parameter. */
     @Test
     public void testEqConditionDynamicParam() {
-        T value1 = values.get(0);
-        T value2 = values.get(1);
+        Object value1 = unwrap(values.get(0));
+        Object value2 = unwrap(values.get(1));
 
         runSql("INSERT INTO t VALUES(1, ?)", value1);
         runSql("INSERT INTO t VALUES(2, ?)", value2);
@@ -183,9 +184,9 @@ public abstract class BaseQueryDataTypeTest<T extends 
Comparable<T>> extends Bas
     @ParameterizedTest
     @MethodSource("between")
     public void testBetweenCondition(TestTypeArguments<T> arguments) {
-        T min = orderedValues.first();
-        T mid = orderedValues.higher(min);
-        T max = orderedValues.last();
+        Object min = unwrap(orderedValues.first());
+        Object mid = unwrap(orderedValues.higher(orderedValues.first()));
+        Object max = unwrap(orderedValues.last());
 
         runSql("INSERT INTO t VALUES(1, ?)", min);
         runSql("INSERT INTO t VALUES(2, ?)", mid);
@@ -216,9 +217,9 @@ public abstract class BaseQueryDataTypeTest<T extends 
Comparable<T>> extends Bas
     @ParameterizedTest
     @MethodSource("distinctFrom")
     public void testIsNotDistinctFrom(TestTypeArguments<T> arguments) {
-        T value1 = values.get(0);
-        T value2 = values.get(1);
-        T value3 = values.get(2);
+        Object value1 = unwrap(values.get(0));
+        Object value2 = unwrap(values.get(1));
+        Object value3 = unwrap(values.get(2));
 
         runSql("INSERT INTO t VALUES(1, ?)", value1);
         runSql("INSERT INTO t VALUES(2, ?)", value2);
@@ -236,9 +237,9 @@ public abstract class BaseQueryDataTypeTest<T extends 
Comparable<T>> extends Bas
     @ParameterizedTest
     @MethodSource("distinctFrom")
     public void 
testIsNotDistinctFromWithDynamicParameters(TestTypeArguments<T> arguments) {
-        T value1 = values.get(0);
-        T value2 = values.get(1);
-        T value3 = values.get(2);
+        Object value1 = unwrap(values.get(0));
+        Object value2 = unwrap(values.get(1));
+        Object value3 = unwrap(values.get(2));
 
         runSql("INSERT INTO t VALUES(1, ?)", value1);
         runSql("INSERT INTO t VALUES(2, ?)", value2);
@@ -254,9 +255,9 @@ public abstract class BaseQueryDataTypeTest<T extends 
Comparable<T>> extends Bas
     @ParameterizedTest
     @MethodSource("distinctFrom")
     public void testIsDistinctFrom(TestTypeArguments<T> arguments) {
-        T value1 = values.get(0);
-        T value2 = values.get(1);
-        T value3 = values.get(2);
+        Object value1 = unwrap(values.get(0));
+        Object value2 = unwrap(values.get(1));
+        Object value3 = unwrap(values.get(2));
 
         runSql("INSERT INTO t VALUES(1, ?)", value1);
         runSql("INSERT INTO t VALUES(2, ?)", value2);
@@ -277,9 +278,9 @@ public abstract class BaseQueryDataTypeTest<T extends 
Comparable<T>> extends Bas
     /** Ascending ordering.*/
     @Test
     public void testAscOrdering() {
-        T min = orderedValues.first();
-        T mid = orderedValues.higher(min);
-        T max = orderedValues.last();
+        Object min = unwrap(orderedValues.first());
+        Object mid = unwrap(orderedValues.higher(orderedValues.first()));
+        Object max = unwrap(orderedValues.last());
 
         runSql("INSERT INTO t VALUES(1, ?)", min);
         runSql("INSERT INTO t VALUES(2, ?)", mid);
@@ -295,9 +296,9 @@ public abstract class BaseQueryDataTypeTest<T extends 
Comparable<T>> extends Bas
     /** Descending ordering. */
     @Test
     public void testDescOrdering() {
-        T min = orderedValues.first();
-        T mid = orderedValues.higher(min);
-        T max = orderedValues.last();
+        Object min = unwrap(orderedValues.first());
+        Object mid = unwrap(orderedValues.higher(orderedValues.first()));
+        Object max = unwrap(orderedValues.last());
 
         runSql("INSERT INTO t VALUES(1, ?)", min);
         runSql("INSERT INTO t VALUES(2, ?)", mid);
@@ -312,8 +313,8 @@ public abstract class BaseQueryDataTypeTest<T extends 
Comparable<T>> extends Bas
     public void testFilter(TestTypeArguments<T> arguments) {
         String query = format("SELECT id FROM t WHERE t.test_key > {}", 
arguments.valueExpr(0));
 
-        T value1 = orderedValues.first();
-        T value2 = orderedValues.last();
+        Object value1 = unwrap(orderedValues.first());
+        Object value2 = unwrap(orderedValues.last());
 
         runSql("INSERT INTO t VALUES(1, ?)", value1);
         runSql("INSERT INTO t VALUES(2, ?)", value2);
@@ -329,8 +330,8 @@ public abstract class BaseQueryDataTypeTest<T extends 
Comparable<T>> extends Bas
     public void testFilterWithDynamicParameters(TestTypeArguments<T> 
arguments) {
         String query = format("SELECT id FROM t WHERE t.test_key > ?");
 
-        T value1 = orderedValues.first();
-        T value2 = orderedValues.last();
+        Object value1 = unwrap(orderedValues.first());
+        Object value2 = unwrap(orderedValues.last());
 
         runSql("INSERT INTO t VALUES(1, ?)", value1);
         runSql("INSERT INTO t VALUES(2, ?)", value2);
@@ -344,4 +345,8 @@ public abstract class BaseQueryDataTypeTest<T extends 
Comparable<T>> extends Bas
     private Stream<TestTypeArguments<T>> filter() {
         return TestTypeArguments.unary(testTypeSpec, dataSamples, 
dataSamples.min());
     }
+
+    protected Object unwrap(@Nullable T value) {
+        return testTypeSpec.unwrapIfNecessary(value);
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/DataTypeTestSpec.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/DataTypeTestSpec.java
index 2dfe70a0d1..d9bce24291 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/DataTypeTestSpec.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/tests/DataTypeTestSpec.java
@@ -101,9 +101,18 @@ public abstract class DataTypeTestSpec<T extends 
Comparable<T>> {
     /**
      * Wraps original {@link NativeType} into a {@link NativeTypeWrapper 
comparable wrapper}.
      * If storage type of this data type is {@link Comparable} then this 
method must return {@code null}.
-     * */
+     */
     public abstract T wrapIfNecessary(Object storageValue);
 
+    /**
+     * Unwraps {@link NativeTypeWrapper comparable wrapper} into a {@link 
NativeType}.
+     *
+     * <p>If passed values are not wrapped, then this method should return 
original instance.
+     */
+    public Object unwrapIfNecessary(T value) {
+        return value;
+    }
+
     /** Creates {@link TestDataSamples test samples} for the given type. */
     public abstract TestDataSamples<T> createSamples(IgniteTypeFactory 
typeFactory);
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/varbinary/ItVarBinaryDmlTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/varbinary/ItVarBinaryDmlTest.java
index e8d58cd117..b419d02982 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/varbinary/ItVarBinaryDmlTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/varbinary/ItVarBinaryDmlTest.java
@@ -35,7 +35,7 @@ public class ItVarBinaryDmlTest extends 
BaseDmlDataTypeTest<VarBinary> {
         runSql("CREATE TABLE t_def (id INT PRIMARY KEY, test_key VARBINARY 
DEFAULT $0)");
         runSql("INSERT INTO t_def (id) VALUES (0)");
 
-        VarBinary value = values.get(0);
+        byte[] value = values.get(0).get();
         checkQuery("SELECT test_key FROM t_def WHERE id=0")
                 .returns(value)
                 .check();
@@ -44,7 +44,7 @@ public class ItVarBinaryDmlTest extends 
BaseDmlDataTypeTest<VarBinary> {
     /** {@code INSERT} an empty varbinary. */
     @Test
     public void testEmptyVarBinary() {
-        VarBinary value = VarBinary.fromBytes(new byte[0]);
+        byte[] value = new byte[0];
 
         runSql("INSERT INTO t VALUES (1, ?)", value);
 
@@ -61,7 +61,7 @@ public class ItVarBinaryDmlTest extends 
BaseDmlDataTypeTest<VarBinary> {
         runSql("INSERT INTO t VALUES (1, x'AABBCC')");
 
         checkQuery("SELECT test_key FROM t WHERE id = 1")
-                .returns(VarBinary.fromBytes(value))
+                .returns(value)
                 .check();
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/varbinary/ItVarBinaryExpressionTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/varbinary/ItVarBinaryExpressionTest.java
index d4ae9c70c7..bbc77d2b7d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/varbinary/ItVarBinaryExpressionTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/varbinary/ItVarBinaryExpressionTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.sql.engine.datatypes.varbinary;
 
-import static org.apache.ignite.internal.sql.engine.util.VarBinary.varBinary;
-
 import java.math.BigDecimal;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SqlValidatorException;
@@ -43,7 +41,7 @@ public class ItVarBinaryExpressionTest extends 
BaseExpressionDataTypeTest<VarBin
     @Test
     public void testBitStringLiteral() {
         checkQuery("SELECT x'010203'")
-                .returns(varBinary(new byte[]{1, 2, 3}))
+                .returns(new byte[]{1, 2, 3})
                 .check();
     }
 
@@ -59,17 +57,17 @@ public class ItVarBinaryExpressionTest extends 
BaseExpressionDataTypeTest<VarBin
     @Test
     public void testPositionExpressionWithDynamicParameter() {
         checkQuery("SELECT POSITION (? IN x'010203')")
-                .withParams(varBinary(new byte[]{2}))
+                .withParams(new byte[]{2})
                 .returns(2)
                 .check();
 
         checkQuery("SELECT POSITION (x'02' IN ?)")
-                .withParams(varBinary(new byte[]{1, 2, 3}))
+                .withParams(new byte[]{1, 2, 3})
                 .returns(2)
                 .check();
 
         checkQuery("SELECT POSITION (? IN ?)")
-                .withParams(varBinary(new byte[]{2}), varBinary(new byte[]{1, 
2, 3}))
+                .withParams(new byte[]{2}, new byte[]{1, 2, 3})
                 .returns(2)
                 .check();
     }
@@ -88,19 +86,19 @@ public class ItVarBinaryExpressionTest extends 
BaseExpressionDataTypeTest<VarBin
     @Test
     public void testLengthExpressionWithDynamicParameter() {
         checkQuery("SELECT OCTET_LENGTH(?)")
-                .withParams(varBinary(new byte[]{1, 2, 3}))
+                .withParams(new byte[]{1, 2, 3})
                 .returns(3).check();
 
         checkQuery("SELECT OCTET_LENGTH(?)")
-                .withParams(varBinary(new byte[0]))
+                .withParams(new byte[0])
                 .returns(0).check();
 
         checkQuery("SELECT LENGTH(?)")
-                .withParams(varBinary(new byte[]{1, 2, 3}))
+                .withParams(new byte[]{1, 2, 3})
                 .returns(3).check();
 
         checkQuery("SELECT LENGTH(?)")
-                .withParams(varBinary(new byte[0]))
+                .withParams(new byte[0])
                 .returns(0).check();
     }
 
@@ -118,15 +116,15 @@ public class ItVarBinaryExpressionTest extends 
BaseExpressionDataTypeTest<VarBin
     @Test
     public void testCastToDifferentLengths() {
         checkQuery("SELECT CAST(X'ffffff' AS VARBINARY(2))")
-                .returns((varBinary(new byte[]{(byte) 0xfff, (byte) 0xff})))
+                .returns(new byte[]{(byte) 0xfff, (byte) 0xff})
                 .check();
 
         checkQuery("SELECT CAST(X'ffffff' AS VARBINARY(100))")
-                .returns((varBinary(new byte[]{(byte) 0xfff, (byte) 0xff, 
(byte) 0xff})))
+                .returns(new byte[]{(byte) 0xfff, (byte) 0xff, (byte) 0xff})
                 .check();
 
         checkQuery("SELECT CAST(X'ffffff' AS VARBINARY)")
-                .returns((varBinary(new byte[]{(byte) 0xfff, (byte) 0xff, 
(byte) 0xff})))
+                .returns(new byte[]{(byte) 0xfff, (byte) 0xff, (byte) 0xff})
                 .check();
     }
 
@@ -140,17 +138,17 @@ public class ItVarBinaryExpressionTest extends 
BaseExpressionDataTypeTest<VarBin
 
         checkQuery("SELECT CAST(? AS VARBINARY(2))")
                 .withParam(param)
-                .returns(varBinary(result))
+                .returns(result)
                 .check();
 
         checkQuery("SELECT CAST(? AS VARBINARY(100))")
                 .withParam(param)
-                .returns(varBinary(param))
+                .returns(param)
                 .check();
 
         checkQuery("SELECT CAST(? AS VARBINARY)")
                 .withParam(param)
-                .returns(varBinary(param))
+                .returns(param)
                 .check();
     }
 
@@ -161,7 +159,7 @@ public class ItVarBinaryExpressionTest extends 
BaseExpressionDataTypeTest<VarBin
         runSql("INSERT INTO t VALUES (1, x'010203')");
 
         checkQuery("SELECT test_key || x'040506' FROM t")
-                .returns(varBinary(new byte[]{1, 2, 3, 4, 5, 6}))
+                .returns(new byte[]{1, 2, 3, 4, 5, 6})
                 .check();
     }
 
@@ -171,20 +169,21 @@ public class ItVarBinaryExpressionTest extends 
BaseExpressionDataTypeTest<VarBin
         runSql("INSERT INTO t VALUES (1, x'010203')");
 
         checkQuery("SELECT test_key || ? FROM t WHERE id = 1")
-                .withParam(varBinary(new byte[]{4, 5, 6}))
-                .returns(varBinary(new byte[]{1, 2, 3, 4, 5, 6}))
+                .withParam(new byte[]{4, 5, 6})
+                .returns(new byte[]{1, 2, 3, 4, 5, 6})
                 .check();
     }
 
     /** Concatenation of dynamic parameters. */
     @Test
     public void testConcatBetweenDynamicParameters() {
-        VarBinary v1 = varBinary(new byte[]{1, 2, 3});
-        VarBinary v2 = varBinary(new byte[]{4, 5, 6});
+        byte[] v1 = {1, 2, 3};
+        byte[] v2 = {4, 5, 6};
+        byte[] result = {1, 2, 3, 4, 5, 6};
 
         checkQuery("SELECT ? || ?")
                 .withParams(v1, v2)
-                .returns(varBinary(new byte[]{1, 2, 3, 4, 5, 6}))
+                .returns(result)
                 .check();
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/varbinary/ItVarBinaryIndexTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/varbinary/ItVarBinaryIndexTest.java
index b6cc251942..19c56db7a3 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/varbinary/ItVarBinaryIndexTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/datatypes/varbinary/ItVarBinaryIndexTest.java
@@ -59,8 +59,8 @@ public class ItVarBinaryIndexTest extends 
BaseIndexDataTypeTest<VarBinary> {
     @ParameterizedTest
     @MethodSource("indexChecks")
     public void testKeyLookUp2(String table, ValueMode mode) {
-        VarBinary value1 = values.get(0);
-        String value1str = mode.toSql(testTypeSpec, value1);
+        byte[] value1 = values.get(0).get();
+        String value1str = mode.toSql(testTypeSpec, 
VarBinary.varBinary(value1));
 
         // TODO Disable for VARBINARY, remove after 
https://issues.apache.org/jira/browse/IGNITE-19931 is fixed
         Assumptions.assumeFalse(mode == ValueMode.CAST);
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java
 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java
deleted file mode 100644
index b78178768a..0000000000
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/TestQueryProcessor.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.util;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
-import org.apache.ignite.internal.sql.engine.QueryContext;
-import org.apache.ignite.internal.sql.engine.QueryProcessor;
-import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
-import org.apache.ignite.internal.sql.engine.session.SessionId;
-import org.apache.ignite.internal.sql.engine.session.SessionInfo;
-import org.apache.ignite.tx.IgniteTransactions;
-
-/**
- * {@link QueryProcessor} that handles test {@link NativeTypeWrapper native 
type wrappers} .
- */
-public final class TestQueryProcessor implements QueryProcessor {
-
-    private final QueryProcessor queryProcessor;
-
-    public TestQueryProcessor(Ignite ignite) {
-        this.queryProcessor = ((IgniteImpl) ignite).queryEngine();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void start() {
-        queryProcessor.start();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void stop() throws Exception {
-        queryProcessor.stop();
-    }
-
-    @Override
-    public SessionId createSession(PropertiesHolder properties) {
-        return queryProcessor.createSession(properties);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<Void> closeSession(SessionId sessionId) {
-        return queryProcessor.closeSession(sessionId);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public List<SessionInfo> liveSessions() {
-        return queryProcessor.liveSessions();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(
-            SessionId sessionId,
-            QueryContext context,
-            IgniteTransactions transactions,
-            String qry,
-            Object... params
-    ) {
-        Object[] unwrappedParams = 
Arrays.stream(params).map(NativeTypeWrapper::unwrap).toArray();
-
-        return queryProcessor.querySingleAsync(sessionId, context, 
transactions, qry, unwrappedParams);
-    }
-}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 5e72e256aa..68413b652f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
@@ -459,6 +460,8 @@ public class SqlQueryProcessor implements QueryProcessor {
                 plan.metadata(),
                 txWrapper,
                 new AsyncCursor<>() {
+                    private AtomicBoolean finished = new AtomicBoolean(false);
+
                     @Override
                     public CompletableFuture<BatchedResult<List<Object>>> 
requestNextAsync(int rows) {
                         session.touch();
@@ -469,9 +472,14 @@ public class SqlQueryProcessor implements QueryProcessor {
                     @Override
                     public CompletableFuture<Void> closeAsync() {
                         session.touch();
-                        numberOfOpenCursors.decrementAndGet();
 
-                        return dataCursor.closeAsync();
+                        if (finished.compareAndSet(false, true)) {
+                            numberOfOpenCursors.decrementAndGet();
+
+                            return dataCursor.closeAsync();
+                        } else {
+                            return CompletableFuture.completedFuture(null);
+                        }
                     }
                 }
         );
diff --git 
a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
 
b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
index 9a5b901e96..b5d5740343 100644
--- 
a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
+++ 
b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
@@ -32,13 +32,21 @@ import java.time.LocalTime;
 import java.time.Period;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
+import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.sql.engine.type.UuidType;
 import org.apache.ignite.sql.ColumnType;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.SqlException;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.function.Executable;
 
 /**
@@ -197,4 +205,41 @@ public class SqlTestUtils {
                 throw new IllegalArgumentException("unsupported type " + type);
         }
     }
+
+    /**
+     * Run SQL on given Ignite instance with given transaction and parameters.
+     *
+     * @param ignite Ignite instance to run a query.
+     * @param tx Transaction to run a given query. Can be {@code null} to run 
within implicit transaction.
+     * @param sql Query to be run.
+     * @param args Dynamic parameters for a given query.
+     * @return List of lists, where outer list represents a rows, internal 
lists represents a columns.
+     */
+    public static List<List<Object>> sql(Ignite ignite, @Nullable Transaction 
tx, String sql, Object... args) {
+        try (
+                Session session = ignite.sql().createSession();
+                ResultSet<SqlRow> rs = session.execute(tx, sql, args)
+        ) {
+            return getAllResultSet(rs);
+        }
+    }
+
+    private static List<List<Object>> getAllResultSet(ResultSet<SqlRow> 
resultSet) {
+        List<List<Object>> res = new ArrayList<>();
+
+        while (resultSet.hasNext()) {
+            SqlRow sqlRow = resultSet.next();
+
+            ArrayList<Object> row = new ArrayList<>(sqlRow.columnCount());
+            for (int i = 0; i < sqlRow.columnCount(); i++) {
+                row.add(sqlRow.value(i));
+            }
+
+            res.add(row);
+        }
+
+        resultSet.close();
+
+        return res;
+    }
 }


Reply via email to