This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new aaf48298b IGNITE-16964 SQL API: Implement async SQL API (#817)
aaf48298b is described below
commit aaf48298b6b1b8f9cdc7bb7e39ebba104ec8406c
Author: Taras Ledkov <[email protected]>
AuthorDate: Mon May 30 11:19:41 2022 +0300
IGNITE-16964 SQL API: Implement async SQL API (#817)
---
.../main/java/org/apache/ignite/sql/ResultSet.java | 2 +-
.../apache/ignite/sql/async/AsyncResultSet.java | 11 +-
.../ignite/sql/reactive/ReactiveResultSet.java | 2 +-
.../client/fakes/FakeIgniteQueryProcessor.java | 6 +
.../ignite/internal/util/CollectionUtils.java | 29 +-
.../internal/testframework/IgniteTestUtils.java | 83 ++++
.../internal/sql/api/ItSqlAsynchronousApiTest.java | 466 +++++++++++++++++++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 8 +-
.../internal/sql/api/AsyncResultSetImpl.java | 410 ++++++++++++++++++
.../internal/sql/api/IgniteSqlException.java | 70 ++++
.../ignite/internal/sql/api/IgniteSqlImpl.java | 66 +++
.../internal/sql/api/SessionBuilderImpl.java | 122 ++++++
.../ignite/internal/sql/api/SessionImpl.java | 295 +++++++++++++
.../ignite/internal/sql/engine/QueryProcessor.java | 17 +-
.../ignite/internal/sql/engine/QueryTimeout.java | 48 +++
.../internal/sql/engine/SqlQueryProcessor.java | 82 +++-
.../sql/engine/exec/ExecutionServiceImpl.java | 6 +-
.../sql/engine/exec/ddl/DdlCommandHandler.java | 81 +++-
.../sql/engine/schema/SqlSchemaManagerImpl.java | 6 +-
.../internal/sql/engine/IgniteSqlApiTest.java | 20 +-
.../sql/engine/exec/ExecutionServiceImplTest.java | 8 +-
21 files changed, 1788 insertions(+), 50 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
b/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
index 3c24af954..f8f389f68 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
@@ -54,7 +54,7 @@ public interface ResultSet extends Iterable<SqlRow>,
AutoCloseable {
*
* @return Number of rows affected by the query, or {@code 0} if statement
return nothing, or {@code -1} if inapplicable.
*/
- int affectedRows();
+ long affectedRows();
/**
* Returns whether the query that produce this result was a conditional
query, or not. E.g. for the query "Create table if not exists"
diff --git
a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
index 7ab63d8be..a2d8166be 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
@@ -58,7 +58,7 @@ public interface AsyncResultSet {
* @return Number of rows affected by the query, or {@code 0} if statement
return nothing, or {@code -1} if inapplicable.
* @see ResultSet#affectedRows()
*/
- int affectedRows();
+ long affectedRows();
/**
* Returns whether the query that produce this result was a conditional
query, or not. E.g. for the query "Create table if not exists"
@@ -66,7 +66,7 @@ public interface AsyncResultSet {
* table was already existed.
*
* <p>Note: when returns {@code false}, then either {@link
#affectedRows()} return number of affected rows or {@link #hasRowSet()}
- * returns {@code true}.
+ * returns {@code true} or conditional DDL query is not applied.
*
* @return {@code True} if conditional query applied, {@code false}
otherwise.
* @see ResultSet#wasApplied()
@@ -95,4 +95,11 @@ public interface AsyncResultSet {
* @return {@code True} if there are more pages with results, {@code
false} otherwise.
*/
boolean hasMorePages();
+
+ /**
+ * Invalidates query result, stops the query and cleanups query resources.
+ *
+ * @return Operation future.
+ */
+ CompletionStage<Void> closeAsync();
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/sql/reactive/ReactiveResultSet.java
b/modules/api/src/main/java/org/apache/ignite/sql/reactive/ReactiveResultSet.java
index 04d96799e..525a5422b 100644
---
a/modules/api/src/main/java/org/apache/ignite/sql/reactive/ReactiveResultSet.java
+++
b/modules/api/src/main/java/org/apache/ignite/sql/reactive/ReactiveResultSet.java
@@ -58,7 +58,7 @@ public interface ReactiveResultSet extends
Flow.Publisher<SqlRow> {
* @return Publisher for number of rows.
* @see ResultSet#affectedRows()
*/
- Flow.Publisher<Integer> affectedRows();
+ Flow.Publisher<Long> affectedRows();
/**
* Returns publisher for a flag which determines whether the query that
produce this result was a conditional query, or not.
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
index beb263b9e..85a47ec2b 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
@@ -38,6 +38,12 @@ public class FakeIgniteQueryProcessor implements
QueryProcessor {
return List.of(CompletableFuture.completedFuture(new FakeCursor()));
}
+ @Override
+ public CompletableFuture<AsyncSqlCursor<List<Object>>>
querySingleAsync(QueryContext context, String schemaName, String qry,
+ Object... params) {
+ return CompletableFuture.completedFuture(new FakeCursor());
+ }
+
@Override
public void start() {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index 264f1cbe0..a20b82393 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -81,10 +81,10 @@ public final class CollectionUtils {
* Gets first element from given list or returns {@code null} if list is
empty.
*
* @param list List to retrieve the first element.
- * @param <T> Type of the elements of the list.
+ * @param <T> Type of the elements of the list.
* @return The first element of the given list or {@code null} in case the
list is empty.
*/
- public static <T> T first(List<? extends T> list) {
+ public static <T> @Nullable T first(List<? extends T> list) {
if (nullOrEmpty(list)) {
return null;
}
@@ -96,10 +96,10 @@ public final class CollectionUtils {
* Gets first element from given collection or returns {@code null} if
collection is empty.
*
* @param col Collection to retrieve the first element.
- * @param <T> Type of the elements of the collection.
+ * @param <T> Type of the elements of the collection.
* @return The first element of the given collection or {@code null} in
case the collection is empty.
*/
- public static <T> T first(Collection<? extends T> col) {
+ public static <T> @Nullable T first(Collection<? extends T> col) {
if (nullOrEmpty(col)) {
return null;
}
@@ -107,6 +107,27 @@ public final class CollectionUtils {
return col.iterator().next();
}
+ /**
+ * Returns first element from the given iterable or returns {@code null}
if the list is empty.
+ *
+ * @param iterable Iterable to retrieve the first element.
+ * @param <T> Type of the elements of the list.
+ * @return The first element of the given iterable or {@code null} in case
the iterable is null or empty.
+ */
+ public static <T> @Nullable T first(Iterable<? extends T> iterable) {
+ if (iterable == null) {
+ return null;
+ }
+
+ Iterator<? extends T> it = iterable.iterator();
+
+ if (!it.hasNext()) {
+ return null;
+ }
+
+ return it.next();
+ }
+
/**
* Union set and items.
*
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 711a30d98..53f261f5b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
@@ -164,6 +165,88 @@ public final class IgniteTestUtils {
}
}
+ /**
+ * Get object field value via reflection.
+ *
+ * @param obj Object or class to get field value from.
+ * @param fieldNames Field names to get value for:
obj->field1->field2->...->fieldN.
+ * @param <T> Expected field class.
+ * @return Field value.
+ * @throws IgniteInternalException In case of error.
+ */
+ public static <T> T getFieldValue(Object obj, String... fieldNames) {
+ assert obj != null;
+ assert fieldNames != null;
+ assert fieldNames.length >= 1;
+
+ try {
+ for (String fieldName : fieldNames) {
+ Class<?> cls = obj instanceof Class ? (Class) obj :
obj.getClass();
+
+ try {
+ obj = findField(cls, obj, fieldName);
+ } catch (NoSuchFieldException e) {
+ // Resolve inner class, if not an inner field.
+ Class<?> innerCls = getInnerClass(cls, fieldName);
+
+ if (innerCls == null) {
+ throw new IgniteInternalException("Failed to get
object field [obj=" + obj
+ + ", fieldNames=" +
Arrays.toString(fieldNames) + ']', e);
+ }
+
+ obj = innerCls;
+ }
+ }
+
+ return (T) obj;
+ } catch (IllegalAccessException e) {
+ throw new IgniteInternalException("Failed to get object field
[obj=" + obj
+ + ", fieldNames=" + Arrays.toString(fieldNames) + ']', e);
+ }
+ }
+
+ /**
+ * Get object field value via reflection.
+ *
+ * @param cls Class for searching.
+ * @param obj Target object.
+ * @param fieldName Field name for search.
+ * @return Field from object if it was found.
+ */
+ private static Object findField(
+ Class<?> cls,
+ Object obj,
+ String fieldName
+ ) throws NoSuchFieldException, IllegalAccessException {
+ // Resolve inner field.
+ Field field = cls.getDeclaredField(fieldName);
+
+ boolean accessible = field.isAccessible();
+
+ if (!accessible) {
+ field.setAccessible(true);
+ }
+
+ return field.get(obj);
+ }
+
+ /**
+ * Get inner class by its name from the enclosing class.
+ *
+ * @param parentCls Parent class to resolve inner class for.
+ * @param innerClsName Name of the inner class.
+ * @return Inner class.
+ */
+ @Nullable public static <T> Class<T> getInnerClass(Class<?> parentCls,
String innerClsName) {
+ for (Class<?> cls : parentCls.getDeclaredClasses()) {
+ if (innerClsName.equals(cls.getSimpleName())) {
+ return (Class<T>) cls;
+ }
+ }
+
+ return null;
+ }
+
/**
* Checks whether runnable throws exception, which is itself of a
specified class, or has a cause of the specified class.
*
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
new file mode 100644
index 000000000..3d48a8b66
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
+import org.apache.ignite.internal.sql.engine.ClosedCursorException;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.lang.ColumnAlreadyExistsException;
+import org.apache.ignite.lang.ColumnNotFoundException;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IndexAlreadyExistsException;
+import org.apache.ignite.lang.TableAlreadyExistsException;
+import org.apache.ignite.lang.TableNotFoundException;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Tests for asynchronous SQL API.
+ */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-15655")
+public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest {
+ private static final int ROW_COUNT = 16;
+
+ /**
+ * Clear tables after each test.
+ *
+ * @param testInfo Test information oject.
+ * @throws Exception If failed.
+ */
+ @AfterEach
+ @Override
+ public void tearDown(TestInfo testInfo) throws Exception {
+ for (Table t : CLUSTER_NODES.get(0).tables().tables()) {
+ sql("DROP TABLE " + t.name());
+ }
+
+ super.tearDownBase(testInfo);
+ }
+
+ @Test
+ public void ddl() throws ExecutionException, InterruptedException {
+ IgniteSql sql = CLUSTER_NODES.get(0).sql();
+ Session ses = sql.createSession();
+
+ // CREATE TABLE
+ checkDdl(true, ses, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+ checkError(
+ TableAlreadyExistsException.class,
+ "Table already exists [name=PUBLIC.TEST]",
+ ses,
+ "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"
+ );
+ checkDdl(false, ses, "CREATE TABLE IF NOT EXISTS TEST(ID INT PRIMARY
KEY, VAL VARCHAR)");
+
+ // ADD COLUMN
+ checkDdl(true, ses, "ALTER TABLE TEST ADD COLUMN IF NOT EXISTS VAL1
VARCHAR");
+ checkError(
+ TableNotFoundException.class,
+ "Table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]",
+ ses,
+ "ALTER TABLE NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"
+ );
+ checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE ADD
COLUMN VAL1 VARCHAR");
+ checkError(
+ ColumnAlreadyExistsException.class,
+ "Column already exists [name=VAL1]",
+ ses,
+ "ALTER TABLE TEST ADD COLUMN VAL1 INT"
+ );
+ checkDdl(false, ses, "ALTER TABLE TEST ADD COLUMN IF NOT EXISTS VAL1
INT");
+
+ // CREATE INDEX
+ checkDdl(true, ses, "CREATE INDEX TEST_IDX ON TEST(VAL0)");
+ checkError(
+ IndexAlreadyExistsException.class,
+ "Index already exists [name=TEST_IDX]",
+ ses,
+ "CREATE INDEX TEST_IDX ON TEST(VAL1)"
+ );
+ checkDdl(false, ses, "CREATE INDEX IF NOT EXISTS TEST_IDX ON
TEST(VAL1)");
+
+ // DROP COLUMNS
+ checkDdl(true, ses, "ALTER TABLE TEST DROP COLUMN VAL1");
+ checkError(
+ TableNotFoundException.class,
+ "Table does not exist [name=PUBLIC.NOT_EXISTS_TABLE]",
+ ses,
+ "ALTER TABLE NOT_EXISTS_TABLE DROP COLUMN VAL1"
+ );
+ checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE DROP
COLUMN VAL1");
+ checkError(
+ ColumnNotFoundException.class,
+ "Column 'VAL1' does not exist in table '\"PUBLIC\".\"TEST\"'",
+ ses,
+ "ALTER TABLE TEST DROP COLUMN VAL1"
+ );
+ checkDdl(false, ses, "ALTER TABLE TEST DROP COLUMN IF EXISTS VAL1");
+
+ // DROP TABLE
+ checkDdl(false, ses, "DROP TABLE IF EXISTS NOT_EXISTS_TABLE");
+ checkDdl(true, ses, "DROP TABLE TEST");
+ checkError(
+ TableNotFoundException.class,
+ "Table does not exist [name=PUBLIC.TEST]",
+ ses,
+ "DROP TABLE TEST"
+ );
+
+ checkSession(ses);
+ }
+
+ @Test
+ public void dml() throws ExecutionException, InterruptedException {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+ IgniteSql sql = CLUSTER_NODES.get(0).sql();
+ Session ses = sql.createSession();
+
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
+ checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
+
+ checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
+
+ checkSession(ses);
+ }
+
+ @Test
+ public void select() throws ExecutionException, InterruptedException {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
+ IgniteSql sql = CLUSTER_NODES.get(0).sql();
+ Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT /
4).build();
+
+ TestPageProcessor pageProc = new TestPageProcessor(4);
+ ses.executeAsync(null, "SELECT ID FROM
TEST").thenCompose(pageProc).get();
+
+ Set<Integer> rs = pageProc.result().stream().map(r ->
r.intValue(0)).collect(Collectors.toSet());
+
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ assertTrue(rs.remove(i), "Results invalid: " + pageProc.result());
+ }
+
+ assertTrue(rs.isEmpty());
+
+ checkSession(ses);
+ }
+
+ @Test
+ public void sqlRow() throws ExecutionException, InterruptedException {
+ IgniteSql sql = CLUSTER_NODES.get(0).sql();
+ Session ses = sql.sessionBuilder().build();
+
+ AsyncResultSet ars = ses.executeAsync(null, "SELECT 1 as COL_A, 2 as
COL_B").get();
+
+ SqlRow r = CollectionUtils.first(ars.currentPage());
+
+ assertEquals(2, r.columnCount());
+ assertEquals(0, r.columnIndex("COL_A"));
+ assertEquals(1, r.columnIndex("COL_B"));
+ assertEquals(-1, r.columnIndex("notExistColumn"));
+
+ assertEquals(1, r.intValue("COL_A"));
+ assertEquals(2, r.intValue("COL_B"));
+
+ assertThrowsWithCause(
+ () -> r.intValue("notExistColumn"),
+ IllegalArgumentException.class,
+ "Column doesn't exist [name=notExistColumn]"
+ );
+
+ assertEquals(1, r.intValue(0));
+ assertEquals(2, r.intValue(1));
+ assertThrowsWithCause(() -> r.intValue(-2),
IndexOutOfBoundsException.class);
+ assertThrowsWithCause(() -> r.intValue(10),
IndexOutOfBoundsException.class);
+
+ await(ars.closeAsync());
+
+ checkSession(ses);
+ }
+
+ @Test
+ public void pageSequence() throws ExecutionException, InterruptedException
{
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
+ IgniteSql sql = CLUSTER_NODES.get(0).sql();
+ Session ses = sql.sessionBuilder().defaultPageSize(1).build();
+
+ AsyncResultSet ars0 = ses.executeAsync(null, "SELECT ID FROM TEST
ORDER BY ID").get();
+ AsyncResultSet ars1 = ars0.fetchNextPage().toCompletableFuture().get();
+ AsyncResultSet ars2 = ars1.fetchNextPage().toCompletableFuture().get();
+ AsyncResultSet ars3 = ars1.fetchNextPage().toCompletableFuture().get();
+ AsyncResultSet ars4 = ars0.fetchNextPage().toCompletableFuture().get();
+
+ assertSame(ars1, ars4);
+ assertSame(ars2, ars3);
+
+ List<SqlRow> res = Stream.of(ars0, ars1, ars2)
+ .map(AsyncResultSet::currentPage)
+ .flatMap(p -> StreamSupport.stream(p.spliterator(), false))
+ .collect(Collectors.toList());
+
+ TestPageProcessor pageProc = new TestPageProcessor(ROW_COUNT -
res.size());
+ ars3.fetchNextPage().thenCompose(pageProc).toCompletableFuture().get();
+
+ res.addAll(pageProc.result());
+
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ assertEquals(i, res.get(i).intValue(0));
+ }
+ }
+
+ @Test
+ public void fetchNextPageParallel() throws Exception {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
+ final List<SqlRow> res = new ArrayList<>();
+
+ IgniteSql sql = CLUSTER_NODES.get(0).sql();
+ Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT /
2).build();
+
+ AsyncResultSet ars0 = ses.executeAsync(null, "SELECT ID FROM
TEST").get();
+ StreamSupport.stream(ars0.currentPage().spliterator(),
false).forEach(res::add);
+
+ AtomicInteger cnt = new AtomicInteger();
+ ConcurrentHashMap<Integer, AsyncResultSet> results = new
ConcurrentHashMap<>();
+
+ IgniteTestUtils.runMultiThreaded(
+ () -> {
+ AsyncResultSet ars =
ars0.fetchNextPage().toCompletableFuture().get();
+
+ results.put(cnt.getAndIncrement(), ars);
+
+ assertFalse(ars.hasMorePages());
+
+ return null;
+ },
+ 10,
+ "test-fetch");
+
+ AsyncResultSet ars1 = CollectionUtils.first(results.values());
+ StreamSupport.stream(ars1.currentPage().spliterator(),
false).forEach(res::add);
+
+ // Check that all next page are same.
+ results.values().forEach(ars -> assertSame(ars1, ars));
+
+ assertThrowsWithCause(
+ () -> ars1.fetchNextPage().toCompletableFuture().get(),
+ IgniteSqlException.class,
+ "There are no more pages"
+ );
+
+ await(ars0.closeAsync());
+
+ // Check results
+ Set<Integer> rs = res.stream().map(r ->
r.intValue(0)).collect(Collectors.toSet());
+
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ assertTrue(rs.remove(i), "Results invalid: " + res);
+ }
+
+ assertTrue(rs.isEmpty());
+
+ checkSession(ses);
+ }
+
+ @Test
+ public void errors() {
+ IgniteSql sql = CLUSTER_NODES.get(0).sql();
+ Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT /
2).build();
+
+ // Parse error.
+ {
+ CompletableFuture<AsyncResultSet> f = ses.executeAsync(null,
"SELECT ID FROM");
+ assertThrowsWithCause(() -> f.get(),
IgniteInternalException.class, "Failed to parse query");
+ }
+
+ // Multiple statements error.
+ {
+ CompletableFuture<AsyncResultSet> f = ses.executeAsync(null,
"SELECT 1; SELECT 2");
+ assertThrowsWithCause(() -> f.get(), IgniteSqlException.class,
"Multiple statements aren't allowed");
+ }
+
+ // Planning error.
+ {
+ CompletableFuture<AsyncResultSet> f = ses.executeAsync(null,
"CREATE TABLE TEST (VAL INT)");
+ assertThrowsWithCause(() -> f.get(), IgniteException.class, "Table
without PRIMARY KEY is not supported");
+ }
+
+ // Execute error.
+ {
+ CompletableFuture<AsyncResultSet> f = ses.executeAsync(null,
"SELECT 1 / ?", 0);
+ assertThrowsWithCause(() -> f.get(), ArithmeticException.class, "/
by zero");
+ }
+
+ checkSession(ses);
+ }
+
+ @Test
+ public void closeSession() throws ExecutionException, InterruptedException
{
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
+ IgniteSql sql = CLUSTER_NODES.get(0).sql();
+ Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+
+ AsyncResultSet ars0 = ses.executeAsync(null, "SELECT ID FROM
TEST").get();
+
+ ses.closeAsync().get();
+
+ // Fetched page is available after cancel.
+ ars0.currentPage();
+
+ assertThrowsWithCause(
+ () -> ars0.fetchNextPage().toCompletableFuture().get(),
+ ClosedCursorException.class
+ );
+
+ assertThrowsWithCause(
+ () -> ses.executeAsync(null, "SELECT ID FROM TEST").get(),
+ IgniteSqlException.class,
+ "Session is closed"
+ );
+
+ checkSession(ses);
+ }
+
+ private void checkDdl(boolean expectedApplied, Session ses, String sql)
throws ExecutionException, InterruptedException {
+ CompletableFuture<AsyncResultSet> fut = ses.executeAsync(
+ null,
+ sql
+ );
+
+ AsyncResultSet asyncRes = fut.get();
+
+ assertEquals(expectedApplied, asyncRes.wasApplied());
+ assertFalse(asyncRes.hasMorePages());
+ assertFalse(asyncRes.hasRowSet());
+ assertEquals(-1, asyncRes.affectedRows());
+
+ asyncRes.closeAsync().toCompletableFuture().get();
+ }
+
+ private void checkError(Class<? extends Throwable> expectedException,
String msg, Session ses, String sql, Object... args) {
+ CompletableFuture<AsyncResultSet> fut = ses.executeAsync(
+ null,
+ sql,
+ args
+ );
+
+ assertThrowsWithCause(fut::get, expectedException, msg);
+ }
+
+ private void checkDml(int expectedAffectedRows, Session ses, String sql,
Object... args)
+ throws ExecutionException, InterruptedException {
+ CompletableFuture<AsyncResultSet> fut = ses.executeAsync(
+ null,
+ sql,
+ args
+ );
+
+ AsyncResultSet asyncRes = fut.get();
+
+ assertFalse(asyncRes.wasApplied());
+ assertFalse(asyncRes.hasMorePages());
+ assertFalse(asyncRes.hasRowSet());
+ assertEquals(expectedAffectedRows, asyncRes.affectedRows());
+
+ asyncRes.closeAsync().toCompletableFuture().get();
+ }
+
+ private void checkSession(Session s) {
+ assertTrue(((Set<?>) IgniteTestUtils.getFieldValue(s,
"futsToClose")).isEmpty());
+ assertTrue(((Set<?>) IgniteTestUtils.getFieldValue(s,
"cursToClose")).isEmpty());
+ }
+
+ static class TestPageProcessor implements
+ Function<AsyncResultSet, CompletionStage<AsyncResultSet>> {
+ private int expectedPages;
+
+ private final List<SqlRow> res = new ArrayList<>();
+
+ TestPageProcessor(int expectedPages) {
+ this.expectedPages = expectedPages;
+ }
+
+ @Override
+ public CompletionStage<AsyncResultSet> apply(AsyncResultSet rs) {
+ expectedPages--;
+
+ assertTrue(rs.hasRowSet());
+ assertFalse(rs.wasApplied());
+ assertEquals(-1L, rs.affectedRows());
+ assertEquals(expectedPages > 0, rs.hasMorePages(),
+ "hasMorePages(): [expected=" + (expectedPages > 0) + ",
actual=" + rs.hasMorePages() + ']');
+
+ rs.currentPage().forEach(res::add);
+
+ if (rs.hasMorePages()) {
+ return rs.fetchNextPage().thenCompose(this);
+ }
+
+ return rs.closeAsync().thenApply(v -> rs);
+ }
+
+ public List<SqlRow> result() {
+ return res;
+ }
+ }
+}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 4becd794c..af669b885 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -61,6 +61,7 @@ import
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory;
import org.apache.ignite.internal.rest.RestComponent;
import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.sql.api.IgniteSqlImpl;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import
org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesSerializationRegistryInitializer;
@@ -135,6 +136,9 @@ public class IgniteImpl implements Ignite {
/** Sql query engine. */
private final SqlQueryProcessor qryEngine;
+ /** Sql API facade. */
+ private final IgniteSql sql;
+
/** Configuration manager that handles node (local) configuration. */
private final ConfigurationManager nodeCfgMgr;
@@ -315,6 +319,8 @@ public class IgniteImpl implements Ignite {
() ->
dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions())
);
+ sql = new IgniteSqlImpl(qryEngine);
+
compute = new IgniteComputeImpl(clusterSvc.topologyService(),
distributedTblMgr, computeComponent);
clientHandlerModule = new ClientHandlerModule(
@@ -507,7 +513,7 @@ public class IgniteImpl implements Ignite {
/** {@inheritDoc} */
@Override
public IgniteSql sql() {
- throw new UnsupportedOperationException("Not implemented yet.");
+ return sql;
}
/** {@inheritDoc} */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
new file mode 100644
index 000000000..2c363696f
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.ResultFieldMetadata;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.util.TransformingIterator;
+import org.apache.ignite.sql.NoRowSetExpectedException;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Asynchronous result set implementation.
+ */
+public class AsyncResultSetImpl implements AsyncResultSet {
+ private static final CompletableFuture<? extends AsyncResultSet>
HAS_NO_MORE_PAGE_FUTURE =
+ CompletableFuture.failedFuture(new IgniteSqlException("There are
no more pages."));
+
+ private final AsyncSqlCursor<List<Object>> cur;
+
+ private final BatchedResult<List<Object>> batchPage;
+
+ private final int pageSize;
+
+ private final Runnable closeRun;
+
+ private final Object mux = new Object();
+
+ private volatile CompletionStage<? extends AsyncResultSet> next;
+
+ /**
+ * Constructor.
+ *
+ * @param cur Asynchronous query cursor.
+ */
+ public AsyncResultSetImpl(AsyncSqlCursor<List<Object>> cur,
BatchedResult<List<Object>> page, int pageSize, Runnable closeRun) {
+ this.cur = cur;
+ this.batchPage = page;
+ this.pageSize = pageSize;
+ this.closeRun = closeRun;
+
+ assert cur.queryType() == SqlQueryType.QUERY
+ || ((cur.queryType() == SqlQueryType.DML || cur.queryType() ==
SqlQueryType.DDL)
+ && batchPage.items().size() == 1
+ && batchPage.items().get(0).size() == 1
+ && !batchPage.hasMore()) : "Invalid query result: [type=" +
cur.queryType() + "res=" + batchPage + ']';
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable ResultSetMetadata metadata() {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean hasRowSet() {
+ return cur.queryType() == SqlQueryType.QUERY || cur.queryType() ==
SqlQueryType.EXPLAIN;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long affectedRows() {
+ if (cur.queryType() != SqlQueryType.DML) {
+ return -1;
+ }
+
+ assert batchPage.items().get(0).get(0) instanceof Long : "Invalid DML
result: " + batchPage;
+
+ return (long) batchPage.items().get(0).get(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean wasApplied() {
+ if (cur.queryType() != SqlQueryType.DDL) {
+ return false;
+ }
+
+ assert batchPage.items().get(0).get(0) instanceof Boolean : "Invalid
DDL result: " + batchPage;
+
+ return (boolean) batchPage.items().get(0).get(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Iterable<SqlRow> currentPage() {
+ if (!hasRowSet()) {
+ throw new NoRowSetExpectedException("Query hasn't result set:
[type=" + cur.queryType() + ']');
+ }
+
+ return () -> new TransformingIterator<>(batchPage.items().iterator(),
SqlRowImpl::new);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletionStage<? extends AsyncResultSet> fetchNextPage() {
+ if (next == null) {
+ synchronized (mux) {
+ if (next == null) {
+ if (!hasMorePages()) {
+ next = HAS_NO_MORE_PAGE_FUTURE;
+ } else {
+ next = cur.requestNextAsync(pageSize)
+ .thenApply(batchRes -> new
AsyncResultSetImpl(cur, batchRes, pageSize, closeRun));
+ }
+ }
+ }
+ }
+
+ return next;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean hasMorePages() {
+ return batchPage.hasMore();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletionStage<Void> closeAsync() {
+ return cur.closeAsync().thenRun(closeRun);
+ }
+
+ private class SqlRowImpl implements SqlRow {
+ private final List<Object> row;
+
+ private final Map<String, Integer> fields;
+
+ org.apache.ignite.internal.sql.engine.ResultSetMetadata meta =
cur.metadata();
+
+ SqlRowImpl(List<Object> row) {
+ this.row = row;
+ fields = meta.fields().stream()
+ .collect(Collectors.toMap(ResultFieldMetadata::name,
ResultFieldMetadata::order));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int columnCount() {
+ return meta.fields().size();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String columnName(int columnIndex) {
+ return meta.fields().get(columnIndex).name();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int columnIndex(@NotNull String columnName) {
+ return fields.getOrDefault(columnName, -1);
+ }
+
+ private int columnIndexChecked(@NotNull String columnName) {
+ int idx = columnIndex(columnName);
+
+ if (idx == -1) {
+ throw new IllegalArgumentException("Column doesn't exist
[name=" + columnName + ']');
+ }
+
+ return idx;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T valueOrDefault(@NotNull String columnName, T
defaultValue) {
+ T ret = (T) row.get(columnIndexChecked(columnName));
+
+ return ret != null ? ret : defaultValue;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Tuple set(@NotNull String columnName, Object value) {
+ throw new UnsupportedOperationException("Operation not
supported.");
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T value(@NotNull String columnName) throws
IllegalArgumentException {
+ return (T) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T value(int columnIndex) {
+ return (T) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public BinaryObject binaryObjectValue(@NotNull String columnName) {
+ return (BinaryObject) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public BinaryObject binaryObjectValue(int columnIndex) {
+ return (BinaryObject) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public byte byteValue(@NotNull String columnName) {
+ return (byte) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public byte byteValue(int columnIndex) {
+ return (byte) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public short shortValue(@NotNull String columnName) {
+ return (short) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public short shortValue(int columnIndex) {
+ return (short) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int intValue(@NotNull String columnName) {
+ return (int) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int intValue(int columnIndex) {
+ return (int) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long longValue(@NotNull String columnName) {
+ return (long) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long longValue(int columnIndex) {
+ return (long) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public float floatValue(@NotNull String columnName) {
+ return (float) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public float floatValue(int columnIndex) {
+ return (float) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public double doubleValue(@NotNull String columnName) {
+ return (double) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public double doubleValue(int columnIndex) {
+ return (double) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String stringValue(@NotNull String columnName) {
+ return (String) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String stringValue(int columnIndex) {
+ return (String) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UUID uuidValue(@NotNull String columnName) {
+ return (UUID) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UUID uuidValue(int columnIndex) {
+ return (UUID) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public BitSet bitmaskValue(@NotNull String columnName) {
+ return (BitSet) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public BitSet bitmaskValue(int columnIndex) {
+ return (BitSet) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public LocalDate dateValue(String columnName) {
+ return (LocalDate) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public LocalDate dateValue(int columnIndex) {
+ return (LocalDate) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public LocalTime timeValue(String columnName) {
+ return (LocalTime) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public LocalTime timeValue(int columnIndex) {
+ return (LocalTime) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public LocalDateTime datetimeValue(String columnName) {
+ return (LocalDateTime) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public LocalDateTime datetimeValue(int columnIndex) {
+ return (LocalDateTime) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Instant timestampValue(String columnName) {
+ return (Instant) row.get(columnIndexChecked(columnName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Instant timestampValue(int columnIndex) {
+ return (Instant) row.get(columnIndex);
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override
+ public Iterator<Object> iterator() {
+ return row.iterator();
+ }
+
+ @Override
+ public ResultSetMetadata metadata() {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java
new file mode 100644
index 000000000..72c510f21
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Ignite SQL exception.
+ */
+public class IgniteSqlException extends IgniteException {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Creates an empty exception.
+ */
+ public IgniteSqlException() {
+ // No-op.
+ }
+
+ /**
+ * Creates a new exception with the given error message.
+ *
+ * @param msg Error message.
+ */
+ public IgniteSqlException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates a new grid exception with the given throwable as a cause and
source of error message.
+ *
+ * @param cause Non-null throwable cause.
+ */
+ public IgniteSqlException(Throwable cause) {
+ this(cause.getMessage(), cause);
+ }
+
+ /**
+ * Creates a new exception with the given error message and optional
nested exception.
+ *
+ * @param msg Error message.
+ * @param cause Optional nested exception (can be {@code null}).
+ */
+ public IgniteSqlException(String msg, @Nullable Throwable cause) {
+ super(msg, cause);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return getClass() + ": " + getMessage();
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
new file mode 100644
index 000000000..bfa20d851
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.util.HashMap;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.Session.SessionBuilder;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+
+/**
+ * Embedded implementation of the Ignite SQL query facade.
+ */
+public class IgniteSqlImpl implements IgniteSql {
+ private final QueryProcessor qryProc;
+
+ /**
+ * Constructor.
+ *
+ * @param qryProc Query processor.
+ */
+ public IgniteSqlImpl(QueryProcessor qryProc) {
+ this.qryProc = qryProc;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Session createSession() {
+ return sessionBuilder().build();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SessionBuilder sessionBuilder() {
+ return new SessionBuilderImpl(qryProc, new HashMap<>());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Statement createStatement(String query) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public StatementBuilder statementBuilder() {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
new file mode 100644
index 000000000..224859b77
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.Session.SessionBuilder;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Session builder implementation.
+ */
+public class SessionBuilderImpl implements SessionBuilder {
+
+ public static final long DEFAULT_TIMEOUT = 0;
+
+ private final QueryProcessor qryProc;
+
+ private long timeout = DEFAULT_TIMEOUT;
+
+ private String schema = Session.DEFAULT_SCHEMA;
+
+ private int pageSize = Session.DEFAULT_PAGE_SIZE;
+
+ private final Map<String, Object> props;
+
+ /**
+ * Session builder constructor.
+ *
+ * @param qryProc SQL query processor.
+ * @param props Initial properties.
+ */
+ SessionBuilderImpl(QueryProcessor qryProc, Map<String, Object> props) {
+ this.qryProc = qryProc;
+ this.props = props;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long defaultTimeout(TimeUnit timeUnit) {
+ return timeUnit.convert(timeout, TimeUnit.NANOSECONDS);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SessionBuilder defaultTimeout(long timeout, TimeUnit timeUnit) {
+ this.timeout = timeUnit.toNanos(timeout);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String defaultSchema() {
+ return schema;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SessionBuilder defaultSchema(String schema) {
+ this.schema = schema;
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int defaultPageSize() {
+ return pageSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SessionBuilder defaultPageSize(int pageSize) {
+ this.pageSize = pageSize;
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable Object property(String name) {
+ return props.get(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SessionBuilder property(String name, @Nullable Object value) {
+ props.put(name, value);
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Session build() {
+ return new SessionImpl(
+ qryProc,
+ schema,
+ timeout,
+ pageSize,
+ props
+ );
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
new file mode 100644
index 000000000..8ee0e4ea4
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.sql.engine.AsyncCursor;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.QueryTimeout;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Embedded implementation of the SQL session.
+ */
+public class SessionImpl implements Session {
+ /** Busy lock for close synchronisation. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final QueryProcessor qryProc;
+
+ private final long timeout;
+
+ private final String schema;
+
+ private final int pageSize;
+
+ private final Set<CompletableFuture<AsyncSqlCursor<List<Object>>>>
futsToClose = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ private final Set<AsyncSqlCursor<List<Object>>> cursToClose =
Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+ private final Map<String, Object> props;
+
+ /**
+ * Constructor.
+ *
+ * @param qryProc Query processor.
+ * @param schema Query default schema.
+ * @param timeout Query default timeout.
+ * @param pageSize Query fetch page size.
+ * @param props Session's properties.
+ */
+ SessionImpl(
+ QueryProcessor qryProc,
+ String schema,
+ long timeout,
+ int pageSize,
+ Map<String, Object> props
+ ) {
+ this.qryProc = qryProc;
+ this.schema = schema;
+ this.timeout = timeout;
+ this.pageSize = pageSize;
+ this.props = props;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ResultSet execute(@Nullable Transaction transaction, String query,
@Nullable Object... arguments) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ResultSet execute(@Nullable Transaction transaction, Statement
statement, @Nullable Object... arguments) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int[] executeBatch(@Nullable Transaction transaction, String
dmlQuery, BatchedArguments batch) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int[] executeBatch(@Nullable Transaction transaction, Statement
dmlStatement, BatchedArguments batch) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void executeScript(String query, @Nullable Object... arguments) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long defaultTimeout(TimeUnit timeUnit) {
+ return timeUnit.convert(timeout, TimeUnit.NANOSECONDS);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String defaultSchema() {
+ return schema;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int defaultPageSize() {
+ return pageSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable Object property(String name) {
+ return props.get(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SessionBuilder toBuilder() {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteSqlException("Session is closed");
+ }
+
+ try {
+ return new SessionBuilderImpl(qryProc, props)
+ .defaultPageSize(pageSize)
+ .defaultTimeout(timeout, TimeUnit.NANOSECONDS)
+ .defaultSchema(schema);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<AsyncResultSet> executeAsync(@Nullable
Transaction transaction, String query, @Nullable Object... arguments) {
+ if (!busyLock.enterBusy()) {
+ return CompletableFuture.failedFuture(new
IgniteSqlException("Session is closed."));
+ }
+
+ try {
+ QueryContext ctx = QueryContext.of(transaction, new
QueryTimeout(timeout, TimeUnit.NANOSECONDS));
+
+ final CompletableFuture<AsyncSqlCursor<List<Object>>> f =
qryProc.querySingleAsync(ctx, schema, query, arguments);
+
+ futsToClose.add(f);
+
+ return f.whenComplete(
+ (cur, ex0) -> futsToClose.remove(f))
+ .thenCompose(cur -> {
+ if (!busyLock.enterBusy()) {
+ return cur.closeAsync()
+ .thenCompose((v) ->
CompletableFuture.failedFuture(new IgniteSqlException("Session is closed")));
+ }
+
+ try {
+ cursToClose.add(cur);
+
+ return cur.requestNextAsync(pageSize)
+ .<AsyncResultSet>thenApply(
+ batchRes -> new AsyncResultSetImpl(
+ cur,
+ batchRes,
+ pageSize,
+ () ->
cursToClose.remove(cur)
+ )
+ )
+ .whenComplete((ars, ex1) -> {
+ if (ex1 != null) {
+ cursToClose.remove(cur);
+
+ cur.closeAsync();
+ }
+ });
+ } catch (Throwable e) {
+ cursToClose.remove(cur);
+
+ return cur.closeAsync()
+ .thenCompose((v) ->
CompletableFuture.failedFuture(e));
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+ );
+ } catch (Exception e) {
+ return CompletableFuture.failedFuture(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<AsyncResultSet> executeAsync(
+ @Nullable Transaction transaction,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction
transaction, String query, BatchedArguments batch) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction
transaction, Statement statement, BatchedArguments batch) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> executeScriptAsync(String query, @Nullable
Object... arguments) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ReactiveResultSet executeReactive(@Nullable Transaction
transaction, String query, @Nullable Object... arguments) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ @Override
+ public ReactiveResultSet executeReactive(@Nullable Transaction
transaction, Statement statement, @Nullable Object... arguments) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Publisher<Integer> executeBatchReactive(@Nullable Transaction
transaction, String query, BatchedArguments batch) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Publisher<Integer> executeBatchReactive(@Nullable Transaction
transaction, Statement statement, BatchedArguments batch) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close() {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return CompletableFuture
+ .runAsync(busyLock::block)
+ .thenCompose(
+ v0 -> {
+ futsToClose.forEach(f -> f.cancel(false));
+
+ return CompletableFuture.allOf(
+
cursToClose.stream().map(AsyncCursor::closeAsync).toArray(CompletableFuture[]::new)
+ )
+ .whenComplete((v, e) ->
cursToClose.clear());
+ }
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Publisher<Void> closeReactive() {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
index 74ec29027..14ccd191d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
@@ -48,6 +48,21 @@ public interface QueryProcessor extends IgniteComponent {
* @return List of sql cursors.
*
* @throws IgniteException in case of an error.
- * */
+ */
List<CompletableFuture<AsyncSqlCursor<List<Object>>>>
queryAsync(QueryContext context, String schemaName, String qry, Object...
params);
+
+ /**
+ * Execute the single statement query with given schema name and
parameters.
+ *
+ * <p>If the query string contains more than one statement the
IgniteException will be thrown.
+ *
+ * @param context User query context.
+ * @param schemaName Schema name.
+ * @param qry Single statement SQL query .
+ * @param params Query parameters.
+ * @return Sql cursor.
+ *
+ * @throws IgniteException in case of an error.
+ */
+ CompletableFuture<AsyncSqlCursor<List<Object>>>
querySingleAsync(QueryContext context, String schemaName, String qry, Object...
params);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTimeout.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTimeout.java
new file mode 100644
index 000000000..132a89bb1
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTimeout.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Query timeout.
+ */
+public class QueryTimeout {
+ /** Timeout. */
+ private final long timeoutNanos;
+
+ /**
+ * Constructor.
+ *
+ * @param timeout Query timeout value.
+ * @param timeUnit Timeunit.
+ */
+ public QueryTimeout(long timeout, TimeUnit timeUnit) {
+ this.timeoutNanos = timeUnit.toNanos(timeout);
+ }
+
+ /**
+ * Return query timeout.
+ *
+ * @param timeUnit Timeunit to convert timeout to.
+ * @return Default query timeout in the given timeunit.
+ */
+ public long timeout(TimeUnit timeUnit) {
+ return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS);
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index be5c4aa32..11394e6ef 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -36,6 +37,7 @@ import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.manager.EventListener;
+import org.apache.ignite.internal.sql.api.IgniteSqlException;
import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
import org.apache.ignite.internal.sql.engine.exec.ExecutionService;
@@ -228,6 +230,78 @@ public class SqlQueryProcessor implements QueryProcessor {
}
}
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<AsyncSqlCursor<List<Object>>>
querySingleAsync(QueryContext context, String schemaName, String qry,
+ Object... params) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(new NodeStoppingException());
+ }
+
+ try {
+ return querySingle0(context, schemaName, qry, params);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0(
+ QueryContext context,
+ String schemaName,
+ String sql,
+ Object... params) {
+ SchemaPlus schema = schemaManager.schema(schemaName);
+
+ if (schema == null) {
+ return CompletableFuture.failedFuture(new
IgniteInternalException(format("Schema not found [schemaName={}]",
schemaName)));
+ }
+
+ final BaseQueryContext ctx = BaseQueryContext.builder()
+ .cancel(new QueryCancel())
+ .frameworkConfig(
+ Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .build()
+ )
+ .logger(LOG)
+ .parameters(params)
+ .build();
+
+ CompletableFuture<Void> start = new CompletableFuture<>();
+
+ CompletableFuture<AsyncSqlCursor<List<Object>>> stage =
start.thenApply(
+ (v) -> Commons.parse(sql,
FRAMEWORK_CONFIG.getParserConfig())
+ )
+ .thenApply(nodes -> {
+ if (nodes.size() > 1) {
+ throw new IgniteSqlException("Multiple statements
aren't allowed.");
+ }
+
+ return nodes.get(0);
+ })
+ .thenCompose(sqlNode -> prepareSvc.prepareAsync(sqlNode, ctx))
+ .thenApply(plan -> {
+ context.maybeUnwrap(QueryValidator.class)
+ .ifPresent(queryValidator ->
queryValidator.validatePlan(plan));
+
+ return new AsyncSqlCursorImpl<>(
+ SqlQueryType.mapPlanTypeToSqlType(plan.type()),
+ plan.metadata(),
+ executionSrvc.executePlan(plan, ctx)
+ );
+ });
+
+ stage.whenComplete((cur, ex) -> {
+ if (ex instanceof CancellationException) {
+ ctx.cancel().cancel();
+ }
+ });
+
+ start.completeAsync(() -> null, taskExecutor);
+
+ return stage;
+ }
+
private List<CompletableFuture<AsyncSqlCursor<List<Object>>>> query0(
QueryContext context,
String schemaName,
@@ -247,7 +321,7 @@ public class SqlQueryProcessor implements QueryProcessor {
CompletableFuture<Void> start = new CompletableFuture<>();
for (SqlNode sqlNode : nodes) {
- BaseQueryContext ctx = BaseQueryContext.builder()
+ final BaseQueryContext ctx = BaseQueryContext.builder()
.cancel(new QueryCancel())
.frameworkConfig(
Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
@@ -270,6 +344,12 @@ public class SqlQueryProcessor implements QueryProcessor {
);
});
+ stage.whenComplete((cur, ex) -> {
+ if (ex instanceof CancellationException) {
+ ctx.cancel().cancel();
+ }
+ });
+
res.add(stage);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index bb379faaa..d919a0281 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -239,13 +239,13 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private AsyncCursor<List<Object>> executeDdl(DdlPlan plan) {
try {
- ddlCmdHnd.handle(plan.command());
+ boolean ret = ddlCmdHnd.handle(plan.command());
+
+ return new
AsyncWrapper<>(Collections.singletonList(Collections.<Object>singletonList(ret)).iterator());
} catch (IgniteInternalCheckedException e) {
throw new IgniteInternalException("Failed to execute DDL statement
[stmt=" /*+ qry.sql()*/
+ ", err=" + e.getMessage() + ']', e);
}
-
- return new AsyncWrapper<>(Collections.emptyIterator());
}
private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index de86acc1c..64af405e7 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.NamedListView;
@@ -81,21 +82,21 @@ public class DdlCommandHandler {
}
/** Handles ddl commands. */
- public void handle(DdlCommand cmd) throws IgniteInternalCheckedException {
+ public boolean handle(DdlCommand cmd) throws
IgniteInternalCheckedException {
validateCommand(cmd);
if (cmd instanceof CreateTableCommand) {
- handleCreateTable((CreateTableCommand) cmd);
+ return handleCreateTable((CreateTableCommand) cmd);
} else if (cmd instanceof DropTableCommand) {
- handleDropTable((DropTableCommand) cmd);
+ return handleDropTable((DropTableCommand) cmd);
} else if (cmd instanceof AlterTableAddCommand) {
- handleAlterAddColumn((AlterTableAddCommand) cmd);
+ return handleAlterAddColumn((AlterTableAddCommand) cmd);
} else if (cmd instanceof AlterTableDropCommand) {
- handleAlterDropColumn((AlterTableDropCommand) cmd);
+ return handleAlterDropColumn((AlterTableDropCommand) cmd);
} else if (cmd instanceof CreateIndexCommand) {
- handleCreateIndex((CreateIndexCommand) cmd);
+ return handleCreateIndex((CreateIndexCommand) cmd);
} else if (cmd instanceof DropIndexCommand) {
- handleDropIndex((DropIndexCommand) cmd);
+ return handleDropIndex((DropIndexCommand) cmd);
} else {
throw new IgniteInternalCheckedException("Unsupported DDL
operation ["
+ "cmdName=" + (cmd == null ? null :
cmd.getClass().getSimpleName()) + "; "
@@ -115,7 +116,7 @@ public class DdlCommandHandler {
}
/** Handles create table command. */
- private void handleCreateTable(CreateTableCommand cmd) {
+ private boolean handleCreateTable(CreateTableCommand cmd) {
final PrimaryKeyDefinitionBuilder pkeyDef =
SchemaBuilders.primaryKey();
pkeyDef.withColumns(IgniteObjectName.quoteNames(cmd.primaryKeyColumns()));
@@ -162,32 +163,40 @@ public class DdlCommandHandler {
try {
tableManager.createTable(fullName, tblChanger);
+
+ return true;
} catch (TableAlreadyExistsException ex) {
if (!cmd.ifTableExists()) {
throw ex;
+ } else {
+ return false;
}
}
}
/** Handles drop table command. */
- private void handleDropTable(DropTableCommand cmd) {
+ private boolean handleDropTable(DropTableCommand cmd) {
String fullName = TableDefinitionImpl.canonicalName(
IgniteObjectName.quote(cmd.schemaName()),
IgniteObjectName.quote(cmd.tableName())
);
try {
tableManager.dropTable(fullName);
+
+ return true;
} catch (TableNotFoundException ex) {
if (!cmd.ifTableExists()) {
throw ex;
+ } else {
+ return false;
}
}
}
/** Handles add column command. */
- private void handleAlterAddColumn(AlterTableAddCommand cmd) {
+ private boolean handleAlterAddColumn(AlterTableAddCommand cmd) {
if (nullOrEmpty(cmd.columns())) {
- return;
+ return false;
}
String fullName = TableDefinitionImpl.canonicalName(
@@ -196,18 +205,20 @@ public class DdlCommandHandler {
);
try {
- addColumnInternal(fullName, cmd.columns(),
cmd.ifColumnNotExists());
+ return addColumnInternal(fullName, cmd.columns(),
cmd.ifColumnNotExists());
} catch (TableNotFoundException ex) {
if (!cmd.ifTableExists()) {
throw ex;
+ } else {
+ return false;
}
}
}
/** Handles drop column command. */
- private void handleAlterDropColumn(AlterTableDropCommand cmd) {
+ private boolean handleAlterDropColumn(AlterTableDropCommand cmd) {
if (nullOrEmpty(cmd.columns())) {
- return;
+ return false;
}
String fullName = TableDefinitionImpl.canonicalName(
@@ -216,16 +227,18 @@ public class DdlCommandHandler {
);
try {
- dropColumnInternal(fullName, cmd.columns(), cmd.ifColumnExists());
+ return dropColumnInternal(fullName, cmd.columns(),
cmd.ifColumnExists());
} catch (TableNotFoundException ex) {
if (!cmd.ifTableExists()) {
throw ex;
+ } else {
+ return false;
}
}
}
/** Handles create index command. */
- private void handleCreateIndex(CreateIndexCommand cmd) {
+ private boolean handleCreateIndex(CreateIndexCommand cmd) {
// Only sorted idx for now.
SortedIndexDefinitionBuilder idx =
SchemaBuilders.sortedIndex(cmd.indexName());
@@ -244,21 +257,29 @@ public class DdlCommandHandler {
IgniteObjectName.quote(cmd.tableName())
);
+ AtomicBoolean ret = new AtomicBoolean();
+
tableManager.alterTable(fullName, chng -> chng.changeIndices(idxes -> {
if (idxes.get(cmd.indexName()) != null) {
if (!cmd.ifIndexNotExists()) {
throw new IndexAlreadyExistsException(cmd.indexName());
} else {
+ ret.set(false);
+
return;
}
}
idxes.create(cmd.indexName(), tableIndexChange ->
convert(idx.build(), tableIndexChange));
+
+ ret.set(true);
}));
+
+ return ret.get();
}
/** Handles drop index command. */
- private void handleDropIndex(DropIndexCommand cmd) {
+ private boolean handleDropIndex(DropIndexCommand cmd) {
throw new UnsupportedOperationException("DROP INDEX command not
supported for now.");
}
@@ -268,8 +289,11 @@ public class DdlCommandHandler {
* @param fullName Table with schema name.
* @param colsDef Columns defenitions.
* @param colNotExist Flag indicates exceptionally behavior in case of
already existing column.
+ *
+ * @return {@code true} if the full columns set is applied successfully.
Otherwise, returns {@code false}.
*/
- private void addColumnInternal(String fullName, List<ColumnDefinition>
colsDef, boolean colNotExist) {
+ private boolean addColumnInternal(String fullName, List<ColumnDefinition>
colsDef, boolean colNotExist) {
+ AtomicBoolean ret = new AtomicBoolean(true);
tableManager.alterTable(
fullName,
chng -> chng.changeColumns(cols -> {
@@ -285,7 +309,15 @@ public class DdlCommandHandler {
colsDef0 = colsDef;
} else {
- colsDef0 = colsDef.stream().filter(k ->
!colNamesToOrders.containsKey(k.name())).collect(Collectors.toList());
+ colsDef0 = colsDef.stream().filter(k -> {
+ if (colNamesToOrders.containsKey(k.name())) {
+ ret.set(false);
+
+ return false;
+ } else {
+ return true;
+ }
+ }).collect(Collectors.toList());
}
final IgniteTypeFactory typeFactory =
Commons.typeFactory();
@@ -301,6 +333,8 @@ public class DdlCommandHandler {
cols.create(col.name(), colChg ->
convert(col0.build(), colChg));
}
}));
+
+ return ret.get();
}
/**
@@ -309,8 +343,11 @@ public class DdlCommandHandler {
* @param fullName Table with schema name.
* @param colNames Columns definitions.
* @param colExist Flag indicates exceptionally behavior in case of
already existing column.
+ * @return {@code true} if the full columns set is applied successfully.
Otherwise, returns {@code false}.
*/
- private void dropColumnInternal(String fullName, Set<String> colNames,
boolean colExist) {
+ private boolean dropColumnInternal(String fullName, Set<String> colNames,
boolean colExist) {
+ AtomicBoolean ret = new AtomicBoolean(true);
+
tableManager.alterTable(
fullName,
chng -> chng.changeColumns(cols -> {
@@ -324,6 +361,8 @@ public class DdlCommandHandler {
for (String colName : colNames) {
if (!colNamesToOrders.containsKey(colName)) {
+ ret.set(false);
+
if (!colExist) {
throw new ColumnNotFoundException(colName,
fullName);
}
@@ -339,6 +378,8 @@ public class DdlCommandHandler {
colNames0.forEach(k ->
cols.delete(colNamesToOrders.get(k)));
}));
+
+ return ret.get();
}
/** Map column name to order. */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 9d0d8bb31..5a6a74132 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -48,6 +48,8 @@ import org.jetbrains.annotations.Nullable;
* Holds actual schema and mutates it on schema change, requested by Ignite.
*/
public class SqlSchemaManagerImpl implements SqlSchemaManager {
+ private static final String DEFAULT_SCHEMA_NAME = "PUBLIC";
+
private final VersionedValue<Map<String, IgniteSchema>> schemasVv;
private final VersionedValue<Map<UUID, IgniteTable>> tablesVv;
@@ -72,7 +74,7 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager
{
calciteSchemaVv = new VersionedValue<>(registry, () -> {
SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
- newCalciteSchema.add("PUBLIC", new IgniteSchema("PUBLIC"));
+ newCalciteSchema.add(DEFAULT_SCHEMA_NAME, new
IgniteSchema(DEFAULT_SCHEMA_NAME));
return newCalciteSchema;
});
@@ -84,7 +86,7 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager
{
public SchemaPlus schema(@Nullable String schema) {
SchemaPlus schemaPlus = calciteSchemaVv.latest();
- return schema != null ? schemaPlus.getSubSchema(schema) : schemaPlus;
+ return schema != null ? schemaPlus.getSubSchema(schema) :
schemaPlus.getSubSchema(DEFAULT_SCHEMA_NAME);
}
/** {@inheritDoc} */
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
index 19be69538..0a6b08463 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
@@ -132,12 +132,12 @@ public class IgniteSqlApiTest {
assertTrue(rs.wasApplied());
assertFalse(rs.hasRowSet());
- assertEquals(-1, rs.affectedRows());
+ assertEquals(-1L, rs.affectedRows());
// Execute DML.
rs = sess.execute(null, "INSERT INTO tbl VALUES (?, ?)", 1, "str1");
- assertEquals(1, rs.affectedRows());
+ assertEquals(1L, rs.affectedRows());
assertFalse(rs.wasApplied());
assertFalse(rs.hasRowSet());
@@ -153,7 +153,7 @@ public class IgniteSqlApiTest {
assertTrue(rs.hasRowSet());
assertFalse(rs.wasApplied());
- assertEquals(-1, rs.affectedRows());
+ assertEquals(-1L, rs.affectedRows());
assertTrue(rs.iterator().hasNext());
for (SqlRow r : rs) {
@@ -163,7 +163,7 @@ public class IgniteSqlApiTest {
// Execute DML.
rs = sess.execute(null, "DELETE FROM tbl");
- assertEquals(4, rs.affectedRows());
+ assertEquals(4L, rs.affectedRows());
assertFalse(rs.wasApplied());
assertFalse(rs.hasRowSet());
}
@@ -205,7 +205,7 @@ public class IgniteSqlApiTest {
// Execute DML outside tx in same session after tx commit.
rs = sess.execute(null, "DELETE FROM tbl");
- assertEquals(4, rs.affectedRows());
+ assertEquals(4L, rs.affectedRows());
});
Mockito.verify(transaction).commit();
@@ -451,7 +451,7 @@ public class IgniteSqlApiTest {
Mockito.when(res.iterator()).thenThrow(AssertionError.class);
Mockito.when(res.wasApplied()).thenReturn(false);
Mockito.when(res.hasRowSet()).thenReturn(false);
- Mockito.when(res.affectedRows()).thenReturn(1);
+ Mockito.when(res.affectedRows()).thenReturn(1L);
return res;
});
@@ -474,7 +474,7 @@ public class IgniteSqlApiTest {
ResultSet res = Mockito.mock(ResultSet.class);
Mockito.when(res.wasApplied()).thenReturn(false);
Mockito.when(res.hasRowSet()).thenReturn(true);
- Mockito.when(res.affectedRows()).thenReturn(-1);
+ Mockito.when(res.affectedRows()).thenReturn(-1L);
Transaction txArg = ans.getArgument(0);
Integer filterArg = ans.getArgument(2);
@@ -491,7 +491,7 @@ public class IgniteSqlApiTest {
oldState.forEach((k, v) -> state.put(k, null));
- return oldState.size();
+ return (long) oldState.size();
})
.getMock());
@@ -500,7 +500,7 @@ public class IgniteSqlApiTest {
ResultSet res = Mockito.mock(ResultSet.class);
Mockito.when(res.wasApplied()).thenReturn(false);
Mockito.when(res.hasRowSet()).thenReturn(true);
- Mockito.when(res.affectedRows()).thenReturn(-1);
+ Mockito.when(res.affectedRows()).thenReturn(-1L);
Mockito.when(res.iterator())
.thenReturn(List.of(
createRow(2, "str2").build(),
@@ -515,7 +515,7 @@ public class IgniteSqlApiTest {
Mockito.when(res.iterator()).thenThrow(AssertionError.class);
Mockito.when(res.wasApplied()).thenReturn(true);
Mockito.when(res.hasRowSet()).thenReturn(false);
- Mockito.when(res.affectedRows()).thenReturn(-1);
+ Mockito.when(res.affectedRows()).thenReturn(-1L);
return res;
});
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 493532df7..ca23ad08f 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.exec;
import static
org.apache.ignite.internal.sql.engine.util.BaseQueryContext.CLUSTER;
import static
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.sneakyThrow;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.lang.IgniteStringFormatter.format;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -78,6 +77,7 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.NetworkMessage;
import org.jetbrains.annotations.Nullable;
@@ -216,7 +216,7 @@ public class ExecutionServiceImplTest {
.build()
);
} catch (IgniteInternalCheckedException e) {
- sneakyThrow(e);
+ throw new IgniteInternalException(e);
}
} else {
original.onMessage(nodeId, msg);
@@ -528,7 +528,7 @@ public class ExecutionServiceImplTest {
try {
task.run();
} catch (Throwable ex) {
- sneakyThrow(ex);
+ throw new
IgniteInternalException(ex);
}
}
}
@@ -605,4 +605,4 @@ public class ExecutionServiceImplTest {
}
};
}
-}
\ No newline at end of file
+}