This is an automated email from the ASF dual-hosted git repository. tledkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 45619b1c3 IGNITE-16963 SQL API: Add batched DML queries support (#843) 45619b1c3 is described below commit 45619b1c3bfe1344a04f4c76842a187aba194677 Author: Taras Ledkov <tled...@gridgain.com> AuthorDate: Thu Jun 9 12:02:02 2022 +0300 IGNITE-16963 SQL API: Add batched DML queries support (#843) --- .../apache/ignite/example/sql/SqlApiExample.java | 2 +- .../apache/ignite/internal/sql/ResultSetImpl.java | 6 +- .../org/apache/ignite/sql/BatchedArguments.java | 9 ++ .../main/java/org/apache/ignite/sql/Session.java | 29 ++++-- .../{SqlException.java => SqlBatchException.java} | 35 ++++--- .../java/org/apache/ignite/sql/SqlException.java | 7 ++ .../ignite/internal/client/sql/ClientSession.java | 17 +--- .../apache/ignite/client/fakes/FakeSession.java | 12 +-- .../internal/testframework/IgniteTestUtils.java | 54 +++++++++++ .../internal/sql/api/ItSqlAsynchronousApiTest.java | 76 ++++++++++++++- .../internal/sql/api/ItSqlSynchronousApiTest.java | 75 +++++++++++++- .../internal/sql/api/AsyncResultSetImpl.java | 3 +- .../internal/sql/api/IgniteSqlException.java | 70 ------------- .../ignite/internal/sql/api/SessionImpl.java | 108 +++++++++++++++++---- .../ignite/internal/sql/engine/QueryContext.java | 2 +- .../internal/sql/engine/SqlQueryProcessor.java | 4 +- .../internal/sql/engine/IgniteSqlApiTest.java | 8 +- 17 files changed, 369 insertions(+), 148 deletions(-) diff --git a/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java b/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java index feeb79189..502de66e6 100644 --- a/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java +++ b/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java @@ -113,7 +113,7 @@ public class SqlApiExample { .add(2, 1, "Jane", "Roe", 2000.0d) .add(3, 1, "Mary", "Major", 1500.0d) .add(4, 1, "Richard", "Miles", 1450.0d))) - .asLongStream().sum(); + .sum(); System.out.println("\nAdded accounts: " + rowsAdded); diff --git a/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java b/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java index 28ae4cd4b..6c774ffe9 100644 --- a/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java +++ b/modules/api/src/main/java/org/apache/ignite/internal/sql/ResultSetImpl.java @@ -20,9 +20,9 @@ package org.apache.ignite.internal.sql; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.CompletionStage; -import org.apache.ignite.lang.IgniteException; import org.apache.ignite.sql.ResultSet; import org.apache.ignite.sql.ResultSetMetadata; +import org.apache.ignite.sql.SqlException; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.sql.async.AsyncResultSet; import org.jetbrains.annotations.Nullable; @@ -83,7 +83,7 @@ public class ResultSetImpl implements ResultSet { @Override public boolean hasNext() { if (it == null) { - throw new IgniteException("There are no results"); + throw new SqlException("There are no results"); } return it.hasNext(); @@ -93,7 +93,7 @@ public class ResultSetImpl implements ResultSet { @Override public SqlRow next() { if (it == null) { - throw new IgniteException("There are no results"); + throw new SqlException("There are no results"); } return it.next(); diff --git a/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java b/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java index 20949084a..e991c49db 100644 --- a/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java +++ b/modules/api/src/main/java/org/apache/ignite/sql/BatchedArguments.java @@ -29,6 +29,15 @@ import java.util.List; * TODO: add named arguments support. */ public class BatchedArguments extends ArrayList<List<Object>> implements List<List<Object>> { + /** + * Creates batched arguments. + * + * @return Batch query arguments. + */ + public static BatchedArguments create() { + return new BatchedArguments(); + } + /** * Creates batched arguments. * diff --git a/modules/api/src/main/java/org/apache/ignite/sql/Session.java b/modules/api/src/main/java/org/apache/ignite/sql/Session.java index fc3753839..fb4a612a6 100644 --- a/modules/api/src/main/java/org/apache/ignite/sql/Session.java +++ b/modules/api/src/main/java/org/apache/ignite/sql/Session.java @@ -131,8 +131,16 @@ public interface Session extends AutoCloseable { * @param dmlQuery DML query template. * @param batch Batch of query arguments. * @return Number of rows affected by each query in the batch. + * @throws SqlBatchException If the batch fails. */ - int[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch); + default long[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) { + // TODO: IGNITE-17135 fix exception handling. + try { + return executeBatchAsync(transaction, dmlQuery, batch).join(); + } catch (CompletionException e) { + throw new SqlException(e); + } + } /** * Executes batched SQL query. Only DML queries are supported. @@ -141,8 +149,9 @@ public interface Session extends AutoCloseable { * @param dmlStatement DML statement to execute. * @param batch Batch of query arguments. * @return Number of rows affected by each query in the batch. + * @throws SqlBatchException If the batch fails. */ - int[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch); + long[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch); /** * Executes batched SQL query in an asynchronous way. @@ -150,10 +159,10 @@ public interface Session extends AutoCloseable { * @param transaction Transaction to execute the statement within or {@code null}. * @param query SQL query template. * @param batch List of batch rows, where each row is a list of statement arguments. - * @return Operation future. - * @throws SqlException If failed. + * @return Operation future completed with number of rows affected by each query in the batch on batch success, + * if the batch fails the future completed with the {@link SqlBatchException}. */ - CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch); + CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch); /** * Executes batched SQL query in an asynchronous way. @@ -161,10 +170,10 @@ public interface Session extends AutoCloseable { * @param transaction Transaction to execute the statement within or {@code null}. * @param statement SQL statement to execute. * @param batch List of batch rows, where each row is a list of statement arguments. - * @return Operation future. - * @throws SqlException If failed. + * @return Operation future completed with number of rows affected by each query in the batch on batch success, + * if the batch fails the future completed with the {@link SqlBatchException}. */ - CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch); + CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch); /** * Executes batched SQL query in a reactive way. @@ -175,7 +184,7 @@ public interface Session extends AutoCloseable { * @return Publisher for the number of rows affected by the query. * @throws SqlException If failed. */ - Flow.Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch); + Flow.Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch); /** * Executes batched SQL query in a reactive way. @@ -186,7 +195,7 @@ public interface Session extends AutoCloseable { * @return Publisher for the number of rows affected by the query. * @throws SqlException If failed. */ - Flow.Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch); + Flow.Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch); /** * Executes multi-statement SQL query. diff --git a/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java b/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java similarity index 50% copy from modules/api/src/main/java/org/apache/ignite/sql/SqlException.java copy to modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java index 85c801481..3ebf0ef08 100644 --- a/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java +++ b/modules/api/src/main/java/org/apache/ignite/sql/SqlBatchException.java @@ -17,38 +17,37 @@ package org.apache.ignite.sql; -import org.apache.ignite.lang.IgniteException; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.util.ArrayUtils; /** - * SQL exception base class. + * The subclass of {@link SqlException} thrown when an error occurs during a batch update operation. In addition to the + * information provided by {@link SqlException}, a <code>SqlBatchException</code> provides the update + * counts for all commands that were executed successfully during the batch update, that is, + * all commands that were executed before the error occurred. The order of elements in an array of update counts + * corresponds to the order in which commands were added to the batch. + * */ -public class SqlException extends IgniteException { - /** - * Creates a new exception with the given error message. - * - * @param msg Error message. - */ - public SqlException(String msg) { - super(msg); - } +public class SqlBatchException extends SqlException { + private final long[] updCntrs; /** * Creates a new grid exception with the given throwable as a cause and source of error message. * + * @param updCntrs Array that describes the outcome of a batch execution. * @param cause Non-null throwable cause. */ - public SqlException(Throwable cause) { + public SqlBatchException(long[] updCntrs, Throwable cause) { super(cause); + + this.updCntrs = updCntrs != null ? updCntrs : ArrayUtils.LONG_EMPTY_ARRAY; } /** - * Creates a new exception with the given error message and optional nested exception. + * Returns the array that describes the outcome of a batch execution. * - * @param msg Error message. - * @param cause Optional nested exception (can be {@code null}). + * @return Array that describes the outcome of a batch execution. */ - public SqlException(String msg, @Nullable Throwable cause) { - super(msg, cause); + public long[] updateCounters() { + return updCntrs; } } diff --git a/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java b/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java index 85c801481..76e360b07 100644 --- a/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java +++ b/modules/api/src/main/java/org/apache/ignite/sql/SqlException.java @@ -24,6 +24,13 @@ import org.jetbrains.annotations.Nullable; * SQL exception base class. */ public class SqlException extends IgniteException { + /** + * Empty constructor for subclasses. + */ + protected SqlException() { + // No-op. + } + /** * Creates a new exception with the given error message. * diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java index 455b9dcb1..c0de66705 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java @@ -142,42 +142,35 @@ public class ClientSession implements Session { /** {@inheritDoc} */ @Override - public int[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) { + public long[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) { // TODO IGNITE-17059. throw new UnsupportedOperationException("Not implemented yet."); } /** {@inheritDoc} */ @Override - public int[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) { + public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) { // TODO IGNITE-17059. throw new UnsupportedOperationException("Not implemented yet."); } /** {@inheritDoc} */ @Override - public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) { + public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) { // TODO IGNITE-17059. throw new UnsupportedOperationException("Not implemented yet."); } /** {@inheritDoc} */ @Override - public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) { - // TODO IGNITE-17059. - throw new UnsupportedOperationException("Not implemented yet."); - } - - /** {@inheritDoc} */ - @Override - public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) { + public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) { // TODO IGNITE-17058. throw new UnsupportedOperationException("Not implemented yet."); } /** {@inheritDoc} */ @Override - public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) { + public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) { // TODO IGNITE-17058. throw new UnsupportedOperationException("Not implemented yet."); } diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java index bed8390a2..809f6a8d2 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java @@ -99,37 +99,37 @@ public class FakeSession implements Session { /** {@inheritDoc} */ @Override - public int[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) { + public long[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) { throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override - public int[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) { + public long[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) { throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override - public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) { + public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) { throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override - public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) { + public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) { throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override - public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) { + public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) { throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override - public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) { + public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) { throw new UnsupportedOperationException(); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java index 53f261f5b..46f9ecc83 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java @@ -330,6 +330,60 @@ public final class IgniteTestUtils { return false; } + /** + * Checks if passed in {@code 'Throwable'} has given class in {@code 'cause'} hierarchy + * <b>including</b> that throwable itself. + * + * <p>Note that this method follows includes {@link Throwable#getSuppressed()} into check. + * + * @param t Throwable to check. + * @param cls Cause classes to check. + * @return reference to the cause error if found, otherwise returns {@code null}. + */ + public static <T extends Throwable> T cause( + @NotNull Throwable t, + @NotNull Class<T> cls + ) { + return cause(t, cls, null); + } + + /** + * Checks if passed in {@code 'Throwable'} has given class in {@code 'cause'} hierarchy + * <b>including</b> that throwable itself. + * + * <p>Note that this method follows includes {@link Throwable#getSuppressed()} into check. + * + * @param t Throwable to check. + * @param cls Cause classes to check. + * @param msg Message text that should be in cause (if {@code null}, message won't be checked). + * @return reference to the cause error if found, otherwise returns {@code null}. + */ + public static <T extends Throwable> T cause( + @NotNull Throwable t, + @NotNull Class<T> cls, + @Nullable String msg + ) { + for (Throwable th = t; th != null; th = th.getCause()) { + if (cls.isAssignableFrom(th.getClass())) { + if (msg != null) { + if (th.getMessage() != null && th.getMessage().contains(msg)) { + return (T) th; + } else { + continue; + } + } + + return (T) th; + } + + if (th.getCause() == th) { + break; + } + } + + return null; + } + /** * Runs runnable task asyncronously. * diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java index af0edc6a4..6ddc88719 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal.sql.api; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.cause; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -28,6 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Set; @@ -36,6 +39,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.ignite.internal.sql.api.ColumnMetadataImpl.ColumnOriginImpl; @@ -50,10 +54,13 @@ import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.lang.IndexAlreadyExistsException; import org.apache.ignite.lang.TableAlreadyExistsException; import org.apache.ignite.lang.TableNotFoundException; +import org.apache.ignite.sql.BatchedArguments; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.ResultSetMetadata; import org.apache.ignite.sql.Session; +import org.apache.ignite.sql.SqlBatchException; import org.apache.ignite.sql.SqlColumnType; +import org.apache.ignite.sql.SqlException; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.sql.async.AsyncResultSet; import org.apache.ignite.table.Table; @@ -330,7 +337,7 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest { // Multiple statements error. { CompletableFuture<AsyncResultSet> f = ses.executeAsync(null, "SELECT 1; SELECT 2"); - assertThrowsWithCause(f::get, IgniteSqlException.class, "Multiple statements aren't allowed"); + assertThrowsWithCause(f::get, SqlException.class, "Multiple statements aren't allowed"); } // Planning error. @@ -372,13 +379,78 @@ public class ItSqlAsynchronousApiTest extends AbstractBasicIntegrationTest { assertThrowsWithCause( () -> ses.executeAsync(null, "SELECT ID FROM TEST").get(), - IgniteSqlException.class, + SqlException.class, "Session is closed" ); checkSession(ses); } + @Test + public void batch() { + sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); + + IgniteSql sql = CLUSTER_NODES.get(0).sql(); + Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build(); + + BatchedArguments args = BatchedArguments.of(0, 0); + + for (int i = 1; i < ROW_COUNT; ++i) { + args.add(i, i); + } + + long[] batchRes = ses.executeBatchAsync(null, "INSERT INTO TEST VALUES (?, ?)", args).join(); + + Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r)); + + // Check that data are inserted OK + List<List<Object>> res = sql("SELECT ID FROM TEST ORDER BY ID"); + IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i, res.get(i).get(0))); + + // Check invalid query type + assertThrowsWithCause( + () -> ses.executeBatchAsync(null, "SELECT * FROM TEST", args).get(), + SqlException.class, + "Invalid SQL statement type in the batch" + ); + + assertThrowsWithCause( + () -> ses.executeBatchAsync(null, "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL0 INT)", args).get(), + SqlException.class, + "Invalid SQL statement type in the batch" + ); + } + + @Test + public void batchIncomplete() { + int err = ROW_COUNT / 2; + + sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); + + IgniteSql sql = CLUSTER_NODES.get(0).sql(); + Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build(); + + BatchedArguments args = BatchedArguments.of(0, 0); + + for (int i = 1; i < ROW_COUNT; ++i) { + if (i == err) { + args.add(1, 1); + } else { + args.add(i, i); + } + } + + Throwable ex = assertThrowsWithCause( + () -> ses.executeBatchAsync(null, "INSERT INTO TEST VALUES (?, ?)", args).join(), + SqlBatchException.class + ); + assertTrue(hasCause(ex, IgniteInternalException.class, "Failed to INSERT some keys because they are already in cache")); + SqlBatchException batchEx = cause(ex, SqlBatchException.class, null); + + assertEquals(err, batchEx.updateCounters().length); + IntStream.range(0, batchEx.updateCounters().length).forEach(i -> assertEquals(1, batchEx.updateCounters()[i])); + } + private static void checkDdl(boolean expectedApplied, Session ses, String sql) throws ExecutionException, InterruptedException { CompletableFuture<AsyncResultSet> fut = ses.executeAsync( null, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java index 082dfc93c..2643ed179 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java @@ -18,14 +18,19 @@ package org.apache.ignite.internal.sql.api; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.cause; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.common.collect.Streams; +import java.util.Arrays; +import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest; import org.apache.ignite.lang.ColumnAlreadyExistsException; import org.apache.ignite.lang.ColumnNotFoundException; @@ -34,9 +39,12 @@ import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.lang.IndexAlreadyExistsException; import org.apache.ignite.lang.TableAlreadyExistsException; import org.apache.ignite.lang.TableNotFoundException; +import org.apache.ignite.sql.BatchedArguments; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.ResultSet; import org.apache.ignite.sql.Session; +import org.apache.ignite.sql.SqlBatchException; +import org.apache.ignite.sql.SqlException; import org.apache.ignite.table.Table; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Disabled; @@ -199,7 +207,7 @@ public class ItSqlSynchronousApiTest extends AbstractBasicIntegrationTest { // Multiple statements error. assertThrowsWithCause( () -> ses.execute(null, "SELECT 1; SELECT 2"), - IgniteSqlException.class, + SqlException.class, "Multiple statements aren't allowed" ); @@ -218,6 +226,71 @@ public class ItSqlSynchronousApiTest extends AbstractBasicIntegrationTest { ); } + @Test + public void batch() { + sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); + + IgniteSql sql = CLUSTER_NODES.get(0).sql(); + Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build(); + + BatchedArguments args = BatchedArguments.of(0, 0); + + for (int i = 1; i < ROW_COUNT; ++i) { + args.add(i, i); + } + + long[] batchRes = ses.executeBatch(null, "INSERT INTO TEST VALUES (?, ?)", args); + + Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r)); + + // Check that data are inserted OK + List<List<Object>> res = sql("SELECT ID FROM TEST ORDER BY ID"); + IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i, res.get(i).get(0))); + + // Check invalid query type + assertThrowsWithCause( + () -> ses.executeBatch(null, "SELECT * FROM TEST", args), + SqlException.class, + "Invalid SQL statement type in the batch" + ); + + assertThrowsWithCause( + () -> ses.executeBatch(null, "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL0 INT)", args), + SqlException.class, + "Invalid SQL statement type in the batch" + ); + } + + @Test + public void batchIncomplete() { + int err = ROW_COUNT / 2; + + sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); + + IgniteSql sql = CLUSTER_NODES.get(0).sql(); + Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build(); + + BatchedArguments args = BatchedArguments.of(0, 0); + + for (int i = 1; i < ROW_COUNT; ++i) { + if (i == err) { + args.add(1, 1); + } else { + args.add(i, i); + } + } + + Throwable ex = assertThrowsWithCause( + () -> ses.executeBatch(null, "INSERT INTO TEST VALUES (?, ?)", args), + SqlBatchException.class + ); + assertTrue(hasCause(ex, IgniteInternalException.class, "Failed to INSERT some keys because they are already in cache")); + SqlBatchException batchEx = cause(ex, SqlBatchException.class, null); + + assertEquals(err, batchEx.updateCounters().length); + IntStream.range(0, batchEx.updateCounters().length).forEach(i -> assertEquals(1, batchEx.updateCounters()[i])); + } + private static void checkDdl(boolean expectedApplied, Session ses, String sql) { ResultSet res = ses.execute( null, diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java index 5a0703f96..5a6629d47 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.sql.engine.SqlQueryType; import org.apache.ignite.internal.sql.engine.util.TransformingIterator; import org.apache.ignite.sql.NoRowSetExpectedException; import org.apache.ignite.sql.ResultSetMetadata; +import org.apache.ignite.sql.SqlException; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.sql.async.AsyncResultSet; import org.apache.ignite.table.Tuple; @@ -45,7 +46,7 @@ import org.jetbrains.annotations.Nullable; */ public class AsyncResultSetImpl implements AsyncResultSet { private static final CompletableFuture<? extends AsyncResultSet> HAS_NO_MORE_PAGE_FUTURE = - CompletableFuture.failedFuture(new IgniteSqlException("No more pages.")); + CompletableFuture.failedFuture(new SqlException("There are no more pages.")); private final AsyncSqlCursor<List<Object>> cur; diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java deleted file mode 100644 index 72c510f21..000000000 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlException.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.sql.api; - -import org.apache.ignite.lang.IgniteException; -import org.jetbrains.annotations.Nullable; - -/** - * Ignite SQL exception. - */ -public class IgniteSqlException extends IgniteException { - /** Serial version uid. */ - private static final long serialVersionUID = 0L; - - /** - * Creates an empty exception. - */ - public IgniteSqlException() { - // No-op. - } - - /** - * Creates a new exception with the given error message. - * - * @param msg Error message. - */ - public IgniteSqlException(String msg) { - super(msg); - } - - /** - * Creates a new grid exception with the given throwable as a cause and source of error message. - * - * @param cause Non-null throwable cause. - */ - public IgniteSqlException(Throwable cause) { - this(cause.getMessage(), cause); - } - - /** - * Creates a new exception with the given error message and optional nested exception. - * - * @param msg Error message. - * @param cause Optional nested exception (can be {@code null}). - */ - public IgniteSqlException(String msg, @Nullable Throwable cause) { - super(msg, cause); - } - - /** {@inheritDoc} */ - @Override - public String toString() { - return getClass() + ": " + getMessage(); - } -} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java index c3d94e60a..4c75caaf1 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java @@ -17,24 +17,35 @@ package org.apache.ignite.internal.sql.api; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.sql.engine.AsyncCursor; +import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult; import org.apache.ignite.internal.sql.engine.AsyncSqlCursor; import org.apache.ignite.internal.sql.engine.QueryContext; import org.apache.ignite.internal.sql.engine.QueryProcessor; import org.apache.ignite.internal.sql.engine.QueryTimeout; +import org.apache.ignite.internal.sql.engine.QueryValidator; +import org.apache.ignite.internal.sql.engine.prepare.QueryPlan.Type; +import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.sql.BatchedArguments; import org.apache.ignite.sql.Session; +import org.apache.ignite.sql.SqlBatchException; +import org.apache.ignite.sql.SqlException; import org.apache.ignite.sql.Statement; import org.apache.ignite.sql.async.AsyncResultSet; import org.apache.ignite.sql.reactive.ReactiveResultSet; @@ -56,7 +67,7 @@ public class SessionImpl implements Session { private final int pageSize; - private final Set<CompletableFuture<AsyncSqlCursor<List<Object>>>> futsToClose = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set<CompletableFuture<?>> futsToClose = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set<AsyncSqlCursor<List<Object>>> cursToClose = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -87,13 +98,7 @@ public class SessionImpl implements Session { /** {@inheritDoc} */ @Override - public int[] executeBatch(@Nullable Transaction transaction, String dmlQuery, BatchedArguments batch) { - throw new UnsupportedOperationException("Not implemented yet."); - } - - /** {@inheritDoc} */ - @Override - public int[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) { + public long[] executeBatch(@Nullable Transaction transaction, Statement dmlStatement, BatchedArguments batch) { throw new UnsupportedOperationException("Not implemented yet."); } @@ -131,11 +136,11 @@ public class SessionImpl implements Session { @Override public SessionBuilder toBuilder() { if (!busyLock.enterBusy()) { - throw new IgniteSqlException("Session is closed"); + throw new SqlException("Session is closed"); } try { - return new SessionBuilderImpl(qryProc, props) + return new SessionBuilderImpl(qryProc, new HashMap<>(props)) .defaultPageSize(pageSize) .defaultTimeout(timeout, TimeUnit.NANOSECONDS) .defaultSchema(schema); @@ -148,7 +153,7 @@ public class SessionImpl implements Session { @Override public CompletableFuture<AsyncResultSet> executeAsync(@Nullable Transaction transaction, String query, @Nullable Object... arguments) { if (!busyLock.enterBusy()) { - return CompletableFuture.failedFuture(new IgniteSqlException("Session is closed.")); + return CompletableFuture.failedFuture(new SqlException("Session is closed.")); } try { @@ -163,7 +168,7 @@ public class SessionImpl implements Session { .thenCompose(cur -> { if (!busyLock.enterBusy()) { return cur.closeAsync() - .thenCompose((v) -> CompletableFuture.failedFuture(new IgniteSqlException("Session is closed"))); + .thenCompose((v) -> CompletableFuture.failedFuture(new SqlException("Session is closed"))); } try { @@ -215,13 +220,70 @@ public class SessionImpl implements Session { /** {@inheritDoc} */ @Override - public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) { - throw new UnsupportedOperationException("Not implemented yet."); + public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, String query, BatchedArguments batch) { + if (!busyLock.enterBusy()) { + return CompletableFuture.failedFuture(new SqlException("Session is closed.")); + } + + try { + QueryContext ctx = QueryContext.of( + transaction, + new QueryTimeout(timeout, TimeUnit.NANOSECONDS), + (QueryValidator) plan -> { + if (plan.type() != Type.DML) { + throw new SqlException("Invalid SQL statement type in the batch [plan=" + plan + ']'); + } + } + ); + + final var counters = new LongArrayList(batch.size()); + CompletableFuture<Void> tail = CompletableFuture.completedFuture(null); + ArrayList<CompletableFuture<Void>> batchFuts = new ArrayList<>(batch.size()); + + for (int i = 0; i < batch.size(); ++i) { + Object[] args = batch.get(i).toArray(); + + final var qryFut = tail + .thenCompose(v -> qryProc.querySingleAsync(ctx, schema, query, args)); + + tail = qryFut.thenCompose(cur -> cur.requestNextAsync(1)) + .thenAccept(page -> { + validateDmlResult(page); + + counters.add((long) page.items().get(0).get(0)); + }) + .whenComplete((v, ex) -> { + if (ex instanceof CancellationException) { + qryFut.cancel(false); + } + }); + + batchFuts.add(tail); + } + + CompletableFuture<long[]> resFut = tail + .exceptionally((ex) -> { + throw new SqlBatchException(counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY), ex); + }) + .thenApply(v -> counters.toArray(ArrayUtils.LONG_EMPTY_ARRAY)); + + resFut.whenComplete((cur, ex) -> { + if (ex instanceof CancellationException) { + batchFuts.forEach(f -> f.cancel(false)); + } + }); + + return resFut; + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } finally { + busyLock.leaveBusy(); + } } /** {@inheritDoc} */ @Override - public CompletableFuture<int[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) { + public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) { throw new UnsupportedOperationException("Not implemented yet."); } @@ -245,13 +307,13 @@ public class SessionImpl implements Session { /** {@inheritDoc} */ @Override - public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) { + public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, String query, BatchedArguments batch) { throw new UnsupportedOperationException("Not implemented yet."); } /** {@inheritDoc} */ @Override - public Publisher<Integer> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) { + public Publisher<Long> executeBatchReactive(@Nullable Transaction transaction, Statement statement, BatchedArguments batch) { throw new UnsupportedOperationException("Not implemented yet."); } @@ -295,8 +357,20 @@ public class SessionImpl implements Session { public static <T> T await(CompletionStage<T> stage) { try { return stage.toCompletableFuture().get(); + } catch (ExecutionException e) { + throw new IgniteException(e.getCause()); } catch (Throwable e) { throw new IgniteException(e); } } + + private static void validateDmlResult(AsyncCursor.BatchedResult<List<Object>> page) { + if (page == null + || page.items() == null + || page.items().size() != 1 + || page.items().get(0).size() != 1 + || page.hasMore()) { + throw new SqlException("Invalid DML results: " + page); + } + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryContext.java index ee6c133d6..ab6e408aa 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryContext.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryContext.java @@ -35,7 +35,7 @@ public class QueryContext implements Context { * Constructor. * * @param params Context params. - * */ + */ private QueryContext(Object[] params) { this.params = params; } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 11394e6ef..2492082b8 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -37,7 +37,6 @@ import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.tools.Frameworks; import org.apache.calcite.util.Pair; import org.apache.ignite.internal.manager.EventListener; -import org.apache.ignite.internal.sql.api.IgniteSqlException; import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl; import org.apache.ignite.internal.sql.engine.exec.ExecutionService; @@ -63,6 +62,7 @@ import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.lang.IgniteLogger; import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.network.ClusterService; +import org.apache.ignite.sql.SqlException; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -274,7 +274,7 @@ public class SqlQueryProcessor implements QueryProcessor { ) .thenApply(nodes -> { if (nodes.size() > 1) { - throw new IgniteSqlException("Multiple statements aren't allowed."); + throw new SqlException("Multiple statements aren't allowed."); } return nodes.get(0); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java index 85f7d2d27..cdfbb80af 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java @@ -142,7 +142,7 @@ public class IgniteSqlApiTest { assertFalse(rs.hasRowSet()); // Execute batched DML query. - int[] res = sess.executeBatch(null, "INSERT INTO tbl VALUES (?, ?)", + long[] res = sess.executeBatch(null, "INSERT INTO tbl VALUES (?, ?)", BatchedArguments.of(2, "str2").add(3, "str3").add(4, "str4")); assertEquals(3, res.length); @@ -183,7 +183,7 @@ public class IgniteSqlApiTest { assertEquals(1, rs.affectedRows()); // Execute batched DML query. - int[] res = sess.executeBatch(tx, "INSERT INTO tbl VALUES (?, ?)", + long[] res = sess.executeBatch(tx, "INSERT INTO tbl VALUES (?, ?)", BatchedArguments.of(2, "str2").add(3, "str3").add(4, "str4")); assertTrue(Arrays.stream(res).allMatch(i -> i == 1)); @@ -470,8 +470,8 @@ public class IgniteSqlApiTest { args.forEach(a -> state(ans.getArgument(0)).put((Integer) a.get(0), (String) a.get(1))); - int[] res = new int[args.size()]; - Arrays.fill(res, 1); + long[] res = new long[args.size()]; + Arrays.fill(res, 1L); return res; });