This is an automated email from the ASF dual-hosted git repository. korlov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new c44bbbb229 IGNITE-20387: Remap most exceptions to SqlExceptions for SQL API (#2613) c44bbbb229 is described below commit c44bbbb229d44737fdb99fa29c3a8b4d5e4cc248 Author: ygerzhedovich <41903880+ygerzhedov...@users.noreply.github.com> AuthorDate: Mon Oct 16 16:50:50 2023 +0300 IGNITE-20387: Remap most exceptions to SqlExceptions for SQL API (#2613) --- .../internal/lang/IgniteExceptionMapperUtil.java | 33 +- .../ignite/internal/sql/SyncResultSetAdapter.java | 4 +- .../dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs | 14 +- .../dotnet/Apache.Ignite/Internal/Sql/Sql.cs | 2 +- .../internal/runner/app/ItTablesApiTest.java | 31 +- ...nchronousApiTest.java => ItSqlApiBaseTest.java} | 587 +++++++------- .../internal/sql/api/ItSqlAsynchronousApiTest.java | 851 ++------------------- .../sql/api/ItSqlClientAsynchronousApiTest.java | 6 + .../sql/api/ItSqlClientSynchronousApiTest.java | 24 +- .../internal/sql/api/ItSqlSynchronousApiTest.java | 574 ++------------ .../internal/sql/engine/ItCreateTableDdlTest.java | 6 +- .../internal/lang/SqlExceptionMapperUtil.java | 64 ++ .../ignite/internal/sql/api/SessionImpl.java | 12 +- .../internal/sql/engine/AsyncSqlCursorImpl.java | 4 +- .../sql/engine/prepare/PrepareServiceImpl.java | 4 +- .../internal/lang/SqlExceptionMapperUtilTest.java | 78 ++ .../internal/sql/engine/util/SqlTestUtils.java | 8 + 17 files changed, 623 insertions(+), 1679 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java index e5b43059aa..fbae12e574 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java @@ -52,8 +52,8 @@ public class IgniteExceptionMapperUtil { * * @param mapper Exception mapper from internal exception to a public one. * @param registeredMappings Already registered mappings. - * @throws IgniteException If a mapper for the given {@code clazz} already registered, - * or {@code clazz} represents Java standard exception like {@link NullPointerException}, {@link IllegalArgumentException}. + * @throws IgniteException If a mapper for the given {@code clazz} already registered, or {@code clazz} represents Java standard + * exception like {@link NullPointerException}, {@link IllegalArgumentException}. */ static void registerMapping( IgniteExceptionMapper<?, ?> mapper, @@ -87,23 +87,30 @@ public class IgniteExceptionMapperUtil { if (origin instanceof AssertionError) { return new IgniteException(INTERNAL_ERR, origin); } - return origin; } - IgniteExceptionMapper<? extends Exception, ? extends Exception> m = EXCEPTION_CONVERTERS.get(origin.getClass()); + Throwable res; + + // Try to find appropriate mapper, moving from original class to supper-classes step by step. + Class exceptionClass = origin.getClass(); + IgniteExceptionMapper<? extends Exception, ? extends Exception> m; + while ((m = EXCEPTION_CONVERTERS.get(exceptionClass)) == null && exceptionClass != Throwable.class) { + exceptionClass = exceptionClass.getSuperclass(); + } + if (m != null) { - Exception mapped = map(m, origin); + res = map(m, origin); - assert mapped instanceof IgniteException || mapped instanceof IgniteCheckedException : - "Unexpected mapping of internal exception to a public one [origin=" + origin + ", mapped=" + mapped + ']'; + assert res instanceof IgniteException || res instanceof IgniteCheckedException : + "Unexpected mapping of internal exception to a public one [origin=" + origin + ", mapped=" + res + ']'; - return mapped; + } else { + res = origin; } - if (origin instanceof IgniteException || origin instanceof IgniteCheckedException) { - - return origin; + if (res instanceof IgniteException || res instanceof IgniteCheckedException) { + return res; } // There are no exception mappings for the given exception. This case should be considered as internal error. @@ -111,8 +118,8 @@ public class IgniteExceptionMapperUtil { } /** - * Returns a new CompletableFuture that, when the given {@code origin} future completes exceptionally, - * maps the origin's exception to a public Ignite exception if it is needed. + * Returns a new CompletableFuture that, when the given {@code origin} future completes exceptionally, maps the origin's exception to a + * public Ignite exception if it is needed. * * @param origin The future to use to create a new stage. * @param <T> Type os result. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java index 0854513ad3..3acd814f7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java @@ -32,7 +32,7 @@ import org.jetbrains.annotations.Nullable; /** * Synchronous wrapper over {@link org.apache.ignite.sql.async.AsyncResultSet}. */ -class SyncResultSetAdapter<T> implements ResultSet<T> { +public class SyncResultSetAdapter<T> implements ResultSet<T> { /** Wrapped async result set. */ private final AsyncResultSet<T> ars; @@ -44,7 +44,7 @@ class SyncResultSetAdapter<T> implements ResultSet<T> { * * @param ars Asynchronous result set. */ - SyncResultSetAdapter(AsyncResultSet<T> ars) { + public SyncResultSetAdapter(AsyncResultSet<T> ars) { assert ars != null; this.ars = ars; diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs index 8b5ff71468..7fa4842d28 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs @@ -334,18 +334,14 @@ namespace Apache.Ignite.Tests.Sql public void TestInvalidSqlThrowsException() { var ex = Assert.ThrowsAsync<SqlException>(async () => await Client.Sql.ExecuteAsync(null, "select x from bad")); - StringAssert.Contains("Invalid query, check inner exceptions for details: select x from bad", ex!.Message); - var innerEx = ex.InnerException; - Assert.IsInstanceOf<SqlException>(innerEx); - StringAssert.Contains("From line 1, column 15 to line 1, column 17: Object 'BAD' not found", innerEx!.Message); + StringAssert.Contains("From line 1, column 15 to line 1, column 17: Object 'BAD' not found", ex!.Message); } [Test] public void TestCreateTableExistsThrowsException() { - // TODO: IGNITE-20388 Fix it - var ex = Assert.ThrowsAsync<IgniteException>( + var ex = Assert.ThrowsAsync<SqlException>( async () => await Client.Sql.ExecuteAsync(null, "CREATE TABLE TEST(ID INT PRIMARY KEY)")); StringAssert.Contains("Table with name 'PUBLIC.TEST' already exists", ex!.Message); @@ -354,8 +350,7 @@ namespace Apache.Ignite.Tests.Sql [Test] public void TestAlterTableNotFoundThrowsException() { - // TODO: IGNITE-20388 Fix it - var ex = Assert.ThrowsAsync<IgniteException>( + var ex = Assert.ThrowsAsync<SqlException>( async () => await Client.Sql.ExecuteAsync(null, "ALTER TABLE NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR")); StringAssert.Contains("Table with name 'PUBLIC.NOT_EXISTS_TABLE' not found", ex!.Message); @@ -364,11 +359,10 @@ namespace Apache.Ignite.Tests.Sql [Test] public void TestAlterTableColumnExistsThrowsException() { - // TODO: IGNITE-20388 Fix it var ex = Assert.ThrowsAsync<SqlException>( async () => await Client.Sql.ExecuteAsync(null, "ALTER TABLE TEST ADD COLUMN ID INT")); - StringAssert.Contains("Invalid query, check inner exceptions for details: ALTER TABLE TEST ADD COLUMN ID INT", ex!.Message); + StringAssert.Contains("Failed to validate query. Column with name 'ID' already exists", ex!.Message); } [Test] diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs index ddc3735734..cac8f1529b 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs @@ -144,7 +144,7 @@ namespace Apache.Ignite.Internal.Sql // ResultSet will dispose the pooled buffer. return new ResultSet<T>(socket, buf, rowReaderFactory); } - catch (SqlException e) when (e.Code == ErrorGroups.Sql.StmtValidation || e.Code == ErrorGroups.Sql.StmtParse) + catch (SqlException e) when (e.Code == ErrorGroups.Sql.StmtParse) { throw new SqlException( e.TraceId, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java index 7a4c8ff7c3..8b72c52abf 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.runner.app; import static java.util.concurrent.CompletableFuture.runAsync; import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException; import static org.apache.ignite.internal.test.WatchListenerInhibitor.metastorageEventsInhibitor; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; @@ -42,7 +43,6 @@ import org.apache.ignite.InitParameters; import org.apache.ignite.internal.catalog.CatalogValidationException; import org.apache.ignite.internal.catalog.IndexExistsValidationException; import org.apache.ignite.internal.catalog.TableExistsValidationException; -import org.apache.ignite.internal.catalog.TableNotFoundValidationException; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.table.IgniteTablesInternal; import org.apache.ignite.internal.table.TableImpl; @@ -50,6 +50,7 @@ import org.apache.ignite.internal.test.WatchListenerInhibitor; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.testframework.TestIgnitionManager; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.lang.ErrorGroups.Sql; import org.apache.ignite.sql.Session; import org.apache.ignite.table.Table; import org.apache.ignite.table.Tuple; @@ -140,8 +141,10 @@ public class ItTablesApiTest extends IgniteAbstractTest { Table tbl = createTable(ignite0, TABLE_NAME); - // TODO: IGNITE-20388 Fix it - assertThrowsWithCause(() -> createTable(ignite0, TABLE_NAME), TableExistsValidationException.class); + assertThrowsSqlException( + Sql.STMT_VALIDATION_ERR, + "Table with name 'PUBLIC.TBL1' already exists", + () -> createTable(ignite0, TABLE_NAME)); assertEquals(tbl, createTableIfNotExists(ignite0, TABLE_NAME)); } @@ -152,7 +155,7 @@ public class ItTablesApiTest extends IgniteAbstractTest { * @throws Exception If failed. */ @Test - public void testTableAlreadyCreatedFromLaggedNode() throws Exception { + public void testTableAlreadyCreatedFromLaggedNode() { clusterNodes.forEach(ign -> assertNull(ign.tables().table(TABLE_NAME))); Ignite ignite0 = clusterNodes.get(0); @@ -170,8 +173,10 @@ public class ItTablesApiTest extends IgniteAbstractTest { for (Ignite ignite : clusterNodes) { if (ignite != ignite1) { - // TODO: IGNITE-20388 Fix it - assertThrowsWithCause(() -> createTable(ignite, TABLE_NAME), TableExistsValidationException.class); + assertThrowsSqlException( + Sql.STMT_VALIDATION_ERR, + "Table with name 'PUBLIC.TBL1' already exists", + () -> createTable(ignite, TABLE_NAME)); assertNotNull(createTableIfNotExists(ignite, TABLE_NAME)); } @@ -182,7 +187,6 @@ public class ItTablesApiTest extends IgniteAbstractTest { ignite1Inhibitor.stopInhibit(); - // TODO: IGNITE-20388 Fix it assertThat(createTblFut, willThrowWithCauseOrSuppressed(TableExistsValidationException.class)); assertThat(createTblIfNotExistsFut, willCompleteSuccessfully()); } @@ -328,8 +332,10 @@ public class ItTablesApiTest extends IgniteAbstractTest { for (Ignite ignite : clusterNodes) { if (ignite != ignite1) { - // TODO: IGNITE-20388 Fix it - assertThrowsWithCause(() -> addColumn(ignite, TABLE_NAME), CatalogValidationException.class); + assertThrowsSqlException( + Sql.STMT_VALIDATION_ERR, + "Failed to validate query. Column with name 'VALINT3' already exists", + () -> addColumn(ignite, TABLE_NAME)); } } @@ -337,7 +343,6 @@ public class ItTablesApiTest extends IgniteAbstractTest { ignite1Inhibitor.stopInhibit(); - // TODO: IGNITE-20388 Fix it assertThat(addColFut, willThrowWithCauseOrSuppressed(CatalogValidationException.class)); } @@ -400,8 +405,10 @@ public class ItTablesApiTest extends IgniteAbstractTest { assertNull(((IgniteTablesInternal) ignite.tables()).table(tblId)); - // TODO: IGNITE-20388 Fix it - assertThrowsWithCause(() -> dropTable(ignite, TABLE_NAME), TableNotFoundValidationException.class); + assertThrowsSqlException( + Sql.STMT_VALIDATION_ERR, + "Table with name 'PUBLIC.TBL1' not found", + () -> dropTable(ignite, TABLE_NAME)); dropTableIfExists(ignite, TABLE_NAME); } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java similarity index 67% copy from modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java copy to modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java index 069db0aab4..5f94a23702 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java @@ -19,41 +19,28 @@ package org.apache.ignite.internal.sql.api; import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan; import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsTableScan; +import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.asStream; import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; import org.apache.ignite.internal.catalog.commands.CatalogUtils; -import org.apache.ignite.internal.client.sql.ClientSql; import org.apache.ignite.internal.sql.api.ColumnMetadataImpl.ColumnOriginImpl; import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest; import org.apache.ignite.internal.sql.engine.QueryCancelledException; import org.apache.ignite.internal.testframework.IgniteTestUtils; -import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX; import org.apache.ignite.internal.tx.TxManager; -import org.apache.ignite.internal.util.CollectionUtils; import org.apache.ignite.lang.ColumnAlreadyExistsException; import org.apache.ignite.lang.ColumnNotFoundException; import org.apache.ignite.lang.ErrorGroups; @@ -71,27 +58,30 @@ import org.apache.ignite.sql.ColumnType; import org.apache.ignite.sql.CursorClosedException; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.NoRowSetExpectedException; +import org.apache.ignite.sql.ResultSet; import org.apache.ignite.sql.ResultSetMetadata; import org.apache.ignite.sql.Session; import org.apache.ignite.sql.SqlBatchException; import org.apache.ignite.sql.SqlException; import org.apache.ignite.sql.SqlRow; -import org.apache.ignite.sql.async.AsyncResultSet; import org.apache.ignite.table.Table; import org.apache.ignite.tx.Transaction; import org.apache.ignite.tx.TransactionOptions; import org.hamcrest.Matcher; -import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; /** - * Tests for asynchronous SQL API. + * Tests for SQL API. + * Tests will be run through synchronous, asynchronous API and client entry points. + * By default, any SQL API test should be added to the base class and use special provided methods to interact + * with the API in a API-type-independent manner. For any API-specific test, should be used the appropriate subclass. */ -@SuppressWarnings("ThrowableNotThrown") -public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { - private static final int ROW_COUNT = 16; +public abstract class ItSqlApiBaseTest extends ClusterPerClassIntegrationTest { + protected static final int ROW_COUNT = 16; @AfterEach public void dropTables() { @@ -238,31 +228,11 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { ); } - @Test - public void dml() { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = igniteSql(); - Session ses = sql.createSession(); - - for (int i = 0; i < ROW_COUNT; ++i) { - checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i); - } - - checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1); - - checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0"); - } - /** Check all transactions are processed correctly even with case of sql Exception raised. */ @Test public void implicitTransactionsStates() { IgniteSql sql = igniteSql(); - if (sql instanceof ClientSql) { - return; - } - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); Session ses = sql.createSession(); @@ -270,59 +240,22 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { TxManager txManager = txManager(); for (int i = 0; i < ROW_COUNT; ++i) { - CompletableFuture<AsyncResultSet<SqlRow>> fut = ses.executeAsync(null, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)", i, i); - - AsyncResultSet asyncRes = null; - - try { - asyncRes = await(fut); - } catch (Throwable ignore) { - // No op. - } - - if (asyncRes != null) { - await(asyncRes.closeAsync()); - } + assertThrowsSqlException( + Sql.STMT_VALIDATION_ERR, + "Table with name 'PUBLIC.TEST' already exists", + () -> execute(ses, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)") + ); } // No new transactions through ddl. assertEquals(0, txManager.pending()); } - /** Check correctness of explicit transaction rollback. */ - @Test - public void checkExplicitTxRollback() { - IgniteSql sql = igniteSql(); - - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - Session ses = sql.createSession(); - - // Outer tx with further commit. - Transaction outerTx = igniteTx().begin(); - - for (int i = 0; i < ROW_COUNT; ++i) { - checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", outerTx, i, i); - } - - await(outerTx.rollbackAsync()); - - AsyncResultSet rs = await(ses.executeAsync(null, "SELECT VAL0 FROM TEST ORDER BY VAL0")); - - assertEquals(0, StreamSupport.stream(rs.currentPage().spliterator(), false).count()); - - await(rs.closeAsync()); - } - /** Check correctness of implicit and explicit transactions. */ @Test public void checkTransactionsWithDml() { IgniteSql sql = igniteSql(); - if (sql instanceof ClientSql) { - return; - } - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); Session ses = sql.createSession(); @@ -339,36 +272,36 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { Transaction outerTx = igniteTx().begin(); for (int i = ROW_COUNT; i < 2 * ROW_COUNT; ++i) { - checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", outerTx, i, i); + checkDml(1, outerTx, ses, "INSERT INTO TEST VALUES (?, ?)", i, i); } - outerTx.commit(); + commit(outerTx); // Outdated tx. Transaction outerTx0 = outerTx; - //ToDo: IGNITE-20387 , here should be used assertThrowsSqlException method with code and message `"Transaction is already finished" - IgniteException e = assertThrows(IgniteException.class, - () -> checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", outerTx0, ROW_COUNT, Integer.MAX_VALUE)); - assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, e.code()); + assertThrowsSqlException( + Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, + "Transaction is already finished", + () -> checkDml(1, outerTx0, ses, "INSERT INTO TEST VALUES (?, ?)", ROW_COUNT, Integer.MAX_VALUE)); assertThrowsSqlException( Sql.CONSTRAINT_VIOLATION_ERR, "PK unique constraint is violated", () -> checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", ROW_COUNT, Integer.MAX_VALUE)); - AsyncResultSet rs = await(ses.executeAsync(null, "SELECT VAL0 FROM TEST ORDER BY VAL0")); + ResultSet<SqlRow> rs = executeForRead(ses, "SELECT VAL0 FROM TEST ORDER BY VAL0"); - assertEquals(2 * ROW_COUNT, StreamSupport.stream(rs.currentPage().spliterator(), false).count()); + assertEquals(2 * ROW_COUNT, asStream(rs).count()); - rs.closeAsync(); + rs.close(); outerTx = igniteTx().begin(); - rs = await(ses.executeAsync(outerTx, "SELECT VAL0 FROM TEST ORDER BY VAL0")); + rs = executeForRead(ses, outerTx, "SELECT VAL0 FROM TEST ORDER BY VAL0"); - assertEquals(2 * ROW_COUNT, StreamSupport.stream(rs.currentPage().spliterator(), false).count()); + assertEquals(2 * ROW_COUNT, asStream(rs).count()); - rs.closeAsync(); + rs.close(); outerTx.commit(); @@ -381,6 +314,32 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { assertEquals(0, txManagerInternal.pending()); } + /** Check correctness of explicit transaction rollback. */ + @Test + public void checkExplicitTxRollback() { + IgniteSql sql = igniteSql(); + + sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); + + Session ses = sql.createSession(); + + // Outer tx with further commit. + Transaction outerTx = igniteTx().begin(); + + for (int i = 0; i < ROW_COUNT; ++i) { + checkDml(1, outerTx, ses, "INSERT INTO TEST VALUES (?, ?)", i, i); + } + + rollback(outerTx); + + ResultSet<SqlRow> rs = executeForRead(ses, "SELECT VAL0 FROM TEST ORDER BY VAL0"); + + asStream(rs); + assertEquals(0, asStream(rs).count()); + + rs.close(); + } + /** Check correctness of rw and ro transactions for table scan. */ @Test public void checkMixedTransactionsForTable() { @@ -391,6 +350,7 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { checkMixedTransactions(planMatcher); } + /** Check correctness of rw and ro transactions for index scan. */ @Test public void checkMixedTransactionsForIndex() throws Exception { @@ -405,10 +365,6 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { private void checkMixedTransactions(Matcher<String> planMatcher) { IgniteSql sql = igniteSql(); - if (sql instanceof ClientSql) { - return; - } - Session ses = sql.createSession(); for (int i = 0; i < ROW_COUNT; ++i) { @@ -432,11 +388,11 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { assertQuery(outerTx, query).matches(planMatcher).check(); - AsyncResultSet rs = await(ses.executeAsync(outerTx, query)); + ResultSet<SqlRow> rs = executeForRead(ses, outerTx, query); - assertEquals(ROW_COUNT, StreamSupport.stream(rs.currentPage().spliterator(), false).count()); + assertEquals(ROW_COUNT, asStream(rs).count()); - rs.closeAsync(); + rs.close(); if (outerTx != null) { if (commit) { @@ -447,29 +403,6 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { } } - @Test - public void select() { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 4).build(); - - for (int i = 0; i < ROW_COUNT; ++i) { - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); - } - - TestPageProcessor pageProc = new TestPageProcessor(4); - await(ses.executeAsync(null, "SELECT ID FROM TEST").thenCompose(pageProc)); - - Set<Integer> rs = pageProc.result().stream().map(r -> r.intValue(0)).collect(Collectors.toSet()); - - for (int i = 0; i < ROW_COUNT; ++i) { - assertTrue(rs.remove(i), "Results invalid: " + pageProc.result()); - } - - assertTrue(rs.isEmpty()); - } - @Test public void metadata() { sql("CREATE TABLE TEST(COL0 BIGINT PRIMARY KEY, COL1 VARCHAR NOT NULL)"); @@ -477,9 +410,9 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { IgniteSql sql = igniteSql(); Session ses = sql.sessionBuilder().build(); - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", 1L, "some string"); + execute(ses, "INSERT INTO TEST VALUES (?, ?)", 1L, "some string"); - AsyncResultSet<SqlRow> rs = await(ses.executeAsync(null, "SELECT COL1, COL0 FROM TEST")); + ResultSet<SqlRow> rs = executeForRead(ses, "SELECT COL1, COL0 FROM TEST"); // Validate columns metadata. ResultSetMetadata meta = rs.metadata(); @@ -508,11 +441,10 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { // Validate result columns types. assertTrue(rs.hasRowSet()); - assertEquals(1, rs.currentPageSize()); - SqlRow row = rs.currentPage().iterator().next(); + SqlRow row = rs.next(); - await(rs.closeAsync()); + rs.close(); assertInstanceOf(meta.columns().get(0).valueClass(), row.value(0)); assertInstanceOf(meta.columns().get(1).valueClass(), row.value(1)); @@ -523,9 +455,9 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { IgniteSql sql = igniteSql(); Session ses = sql.sessionBuilder().build(); - AsyncResultSet<SqlRow> ars = await(ses.executeAsync(null, "SELECT 1 as COL_A, 2 as COL_B")); + ResultSet<SqlRow> rs = executeForRead(ses, "SELECT 1 as COL_A, 2 as COL_B"); - SqlRow r = CollectionUtils.first(ars.currentPage()); + SqlRow r = rs.next(); assertEquals(2, r.columnCount()); assertEquals(0, r.columnIndex("COL_A")); @@ -548,59 +480,42 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { assertThrowsWithCause(() -> r.intValue(-2), IndexOutOfBoundsException.class); assertThrowsWithCause(() -> r.intValue(10), IndexOutOfBoundsException.class); - await(ars.closeAsync()); + rs.close(); } @Test - public void pageSequence() { + public void closeSession() { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().defaultPageSize(1).build(); + Session ses = sql.sessionBuilder().defaultPageSize(2).build(); for (int i = 0; i < ROW_COUNT; ++i) { - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); + execute(ses, "INSERT INTO TEST VALUES (?, ?)", i, i); } - AsyncResultSet<SqlRow> ars0 = await(ses.executeAsync(null, "SELECT ID FROM TEST ORDER BY ID")); - var p0 = ars0.currentPage(); - AsyncResultSet<SqlRow> ars1 = await(ars0.fetchNextPage()); - var p1 = ars1.currentPage(); - AsyncResultSet<SqlRow> ars2 = await(ars1.fetchNextPage().toCompletableFuture()); - var p2 = ars2.currentPage(); - AsyncResultSet<SqlRow> ars3 = await(ars1.fetchNextPage()); - var p3 = ars3.currentPage(); - AsyncResultSet<SqlRow> ars4 = await(ars0.fetchNextPage()); - var p4 = ars4.currentPage(); - - assertSame(ars0, ars1); - assertSame(ars0, ars2); - assertSame(ars0, ars3); - assertSame(ars0, ars4); + ResultSet rs = executeForRead(ses, "SELECT ID FROM TEST"); - List<SqlRow> res = Stream.of(p0, p1, p2, p3, p4) - .flatMap(p -> StreamSupport.stream(p.spliterator(), false)) - .collect(Collectors.toList()); + ses.close(); - TestPageProcessor pageProc = new TestPageProcessor(ROW_COUNT - res.size()); - await(ars4.fetchNextPage().thenCompose(pageProc)); - - res.addAll(pageProc.result()); + SqlException sqlEx = assertThrowsSqlException( + Sql.EXECUTION_CANCELLED_ERR, + "The query was cancelled while executing", + () -> rs.forEachRemaining(System.out::println)); - for (int i = 0; i < ROW_COUNT; ++i) { - assertEquals(i, res.get(i).intValue(0)); - } + assertTrue(IgniteTestUtils.hasCause(sqlEx, QueryCancelledException.class, null)); + assertThrowsSqlException(Sql.SESSION_CLOSED_ERR, "Session is closed", () -> execute(ses, "SELECT ID FROM TEST")); } @Test - public void errors() { + public void errors() throws InterruptedException { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)"); IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build(); + Session ses = sql.sessionBuilder().defaultPageSize(2).build(); for (int i = 0; i < ROW_COUNT; ++i) { - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); + execute(ses, "INSERT INTO TEST VALUES (?, ?)", i, i); } // Parse error. @@ -628,87 +543,71 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { // No result set error. { - AsyncResultSet ars = await(ses.executeAsync(null, "CREATE TABLE TEST3 (ID INT PRIMARY KEY)")); + ResultSet rs = executeForRead(ses, "CREATE TABLE TEST3 (ID INT PRIMARY KEY)"); assertThrowsSqlException( NoRowSetExpectedException.class, Sql.QUERY_NO_RESULT_SET_ERR, "Query has no result set", - () -> await(ars.fetchNextPage())); + () -> rs.next()); } // Cursor closed error. { - AsyncResultSet ars = await(ses.executeAsync(null, "SELECT * FROM TEST")); - await(ars.closeAsync()); + ResultSet rs = executeForRead(ses, "SELECT * FROM TEST"); + Thread.sleep(300); // ResultSetImpl fetches next page in background, wait to it to complete to avoid flakiness. + rs.close(); assertThrowsSqlException( CursorClosedException.class, Sql.CURSOR_CLOSED_ERR, "Cursor is closed", - () -> await(ars.fetchNextPage())); + () -> rs.forEachRemaining(Object::hashCode)); } } - /** - * DDL is non-transactional. - */ @Test - public void ddlInTransaction() { - Session ses = igniteSql().createSession(); + public void dml() { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - { - Transaction tx = igniteTx().begin(); - try { - assertThrowsSqlException( - Sql.STMT_VALIDATION_ERR, - "DDL doesn't support transactions.", - () -> await(ses.executeAsync(tx, "CREATE TABLE TEST2(ID INT PRIMARY KEY, VAL0 INT)")) - ); - } finally { - tx.rollback(); - } - } - { - Transaction tx = igniteTx().begin(); - AsyncResultSet<SqlRow> res = await(ses.executeAsync(tx, "INSERT INTO TEST VALUES (?, ?)", -1, -1)); - assertEquals(1, res.affectedRows()); + IgniteSql sql = igniteSql(); + Session ses = sql.createSession(); - assertThrowsSqlException( - Sql.STMT_VALIDATION_ERR, - "DDL doesn't support transactions.", - () -> await(ses.executeAsync(tx, "CREATE TABLE TEST2(ID INT PRIMARY KEY, VAL0 INT)")) - ); - tx.commit(); + TxManager txManager = txManager(); - assertTrue(await(ses.executeAsync(null, "SELECT ID FROM TEST WHERE ID = -1")).currentPage().iterator().hasNext()); + int txPrevCnt = txManager.finished(); + + for (int i = 0; i < ROW_COUNT; ++i) { + checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i); } + + assertEquals(ROW_COUNT, txManager.finished() - txPrevCnt); + // No new transactions through ddl. + assertEquals(0, txManager.pending()); + + checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1); + + checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0"); } @Test - public void closeSession() { + public void select() { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().defaultPageSize(2).build(); + Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 4).build(); for (int i = 0; i < ROW_COUNT; ++i) { ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); } - AsyncResultSet ars0 = await(ses.executeAsync(null, "SELECT ID FROM TEST")); + ResultProcessor resultProcessor = execute(4, ses, "SELECT ID FROM TEST"); - await(ses.closeAsync()); + Set<Integer> rs = resultProcessor.result().stream().map(r -> r.intValue(0)).collect(Collectors.toSet()); - // Fetched page is available after cancel. - ars0.currentPage(); - - SqlException sqlEx = assertThrowsSqlException( - Sql.EXECUTION_CANCELLED_ERR, - "The query was cancelled while executing", - () -> await(ars0.fetchNextPage())); + for (int i = 0; i < ROW_COUNT; ++i) { + assertTrue(rs.remove(i), "Results invalid: " + resultProcessor.result()); + } - assertTrue(IgniteTestUtils.hasCause(sqlEx, QueryCancelledException.class, null)); - assertThrowsSqlException(Sql.SESSION_CLOSED_ERR, "Session is closed", () -> await(ses.executeAsync(null, "SELECT ID FROM TEST"))); + assertTrue(rs.isEmpty()); } @Test @@ -724,7 +623,7 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { args.add(i, i); } - long[] batchRes = await(ses.executeBatchAsync(null, "INSERT INTO TEST VALUES (?, ?)", args)); + long[] batchRes = executeBatch(ses, "INSERT INTO TEST VALUES (?, ?)", args); Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r)); @@ -737,13 +636,13 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { SqlBatchException.class, Sql.STMT_VALIDATION_ERR, "Invalid SQL statement type", - () -> await(ses.executeBatchAsync(null, "SELECT * FROM TEST", args))); + () -> executeBatch(ses, "SELECT * FROM TEST", args)); assertThrowsSqlException( SqlBatchException.class, Sql.STMT_VALIDATION_ERR, "Invalid SQL statement type", - () -> await(ses.executeBatchAsync(null, "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL0 INT)", args))); + () -> executeBatch(ses, "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL0 INT)", args)); } @Test @@ -769,33 +668,99 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { SqlBatchException.class, Sql.CONSTRAINT_VIOLATION_ERR, "PK unique constraint is violated", - () -> await(ses.executeBatchAsync(null, "INSERT INTO TEST VALUES (?, ?)", args)) + () -> executeBatch(ses, "INSERT INTO TEST VALUES (?, ?)", args) ); assertEquals(err, ex.updateCounters().length); IntStream.range(0, ex.updateCounters().length).forEach(i -> assertEquals(1, ex.updateCounters()[i])); } - @Test - public void resultSetCloseShouldFinishImplicitTransaction() throws InterruptedException { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); + @ParameterizedTest + @ValueSource(strings = { + "INSERT INTO tst VALUES (2, ?)", + "SELECT * FROM tst WHERE id = ? " + }) + public void runtimeErrorInDmlCausesTransactionToFail(String query) { + sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)"); + sql("INSERT INTO tst VALUES (?,?)", 1, 1); + + try (Session ses = igniteSql().createSession()) { + Transaction tx = igniteTx().begin(); + String dmlQuery = "UPDATE tst SET val = val/(val - ?) + 1"; + + assertThrowsSqlException( + Sql.RUNTIME_ERR, + "/ by zero", + () -> execute(tx, ses, dmlQuery, 1).affectedRows()); + + IgniteException err = assertThrows(IgniteException.class, () -> { + ResultSet<SqlRow> rs = executeForRead(ses, tx, query, 2); + if (rs.hasRowSet()) { + assertTrue(rs.hasNext()); + } else { + assertTrue(rs.wasApplied()); + } + }); + + assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, err.code(), err.toString()); + } + } + + @ParameterizedTest + @ValueSource(strings = { + "INSERT INTO tst VALUES (2, ?)", + "SELECT * FROM tst WHERE id = ? " + }) + public void runtimeErrorInQueryCausesTransactionToFail(String query) { + sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)"); + + sql("INSERT INTO tst VALUES (?,?)", 1, 1); + + try (Session ses = igniteSql().createSession()) { + Transaction tx = igniteTx().begin(); + + assertThrowsSqlException( + Sql.RUNTIME_ERR, + "/ by zero", + () -> execute(tx, ses, "SELECT val/? FROM tst WHERE id=?", 0, 1)); + + IgniteException err = assertThrows(IgniteException.class, () -> { + ResultSet<SqlRow> rs = executeForRead(ses, tx, query, 2); + if (rs.hasRowSet()) { + assertTrue(rs.hasNext()); + } else { + assertTrue(rs.wasApplied()); + } + }); + + assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, err.code(), err.toString()); + } + } + + @Disabled("https://issues.apache.org/jira/browse/IGNITE-20534") + @Test + public void testLockIsNotReleasedAfterTxRollback() { IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().defaultPageSize(2).build(); - for (int i = 0; i < ROW_COUNT; ++i) { - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); + try (Session ses = sql.createSession()) { + checkDdl(true, ses, "CREATE TABLE IF NOT EXISTS tst(id INTEGER PRIMARY KEY, val INTEGER)"); } - CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, "SELECT * FROM TEST"); + try (Session session = sql.createSession()) { + Transaction tx = igniteTx().begin(); + + assertThrows(RuntimeException.class, () -> execute(tx, session, "SELECT 1/0")); + tx.rollback(); + session.execute(tx, "INSERT INTO tst VALUES (1, 1)"); + } - AsyncResultSet<SqlRow> ars = f.join(); - // There should be a pending transaction since not all data was read. - boolean txStarted = waitForCondition(() -> txManager().pending() == 1, 5000); - assertTrue(txStarted, "No pending transactions"); + try (Session session = sql.createSession()) { + Transaction tx = igniteTx().begin(new TransactionOptions().readOnly(false)); - ars.closeAsync().join(); - assertEquals(0, txManager().pending(), "Expected no pending transactions"); + execute(tx, session, "INSERT INTO tst VALUES (1, 1)"); + tx.commit(); + } } @Test @@ -803,127 +768,141 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); IgniteSql sql = igniteSql(); - // Fetch all data in one read. Session ses = sql.sessionBuilder().defaultPageSize(100).build(); - for (int i = 0; i < ROW_COUNT; ++i) { - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); + execute(ses, "INSERT INTO TEST VALUES (?, ?)", i, i); } - CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, "SELECT * FROM TEST"); - - AsyncResultSet<SqlRow> ars = f.join(); - assertFalse(ars.hasMorePages()); + execute(1, ses, "SELECT * FROM TEST"); assertEquals(0, txManager().pending(), "Expected no pending transactions"); } - private static void checkDdl(boolean expectedApplied, Session ses, String sql, Transaction tx) { - CompletableFuture<AsyncResultSet<SqlRow>> fut = ses.executeAsync( - tx, - sql - ); + /** + * DDL is non-transactional. + */ + @Test + public void ddlInTransaction() { + Session ses = igniteSql().createSession(); + sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - AsyncResultSet<SqlRow> asyncRes = await(fut); + { + Transaction tx = igniteTx().begin(); + try { + assertThrowsSqlException( + Sql.STMT_VALIDATION_ERR, + "DDL doesn't support transactions.", + () -> execute(tx, ses, "CREATE TABLE TEST2(ID INT PRIMARY KEY, VAL0 INT)") + ); + } finally { + tx.rollback(); + } + } + { + Transaction tx = igniteTx().begin(); + ResultProcessor result = execute(tx, ses, "INSERT INTO TEST VALUES (?, ?)", -1, -1); + assertEquals(1, result.affectedRows()); - assertEquals(expectedApplied, asyncRes.wasApplied()); - assertFalse(asyncRes.hasMorePages()); - assertFalse(asyncRes.hasRowSet()); - assertEquals(-1, asyncRes.affectedRows()); + assertThrowsSqlException( + Sql.STMT_VALIDATION_ERR, + "DDL doesn't support transactions.", + () -> ses.execute(tx, "CREATE TABLE TEST2(ID INT PRIMARY KEY, VAL0 INT)") + ); + tx.commit(); - assertNull(asyncRes.metadata()); + assertEquals(1, execute(ses, "SELECT ID FROM TEST WHERE ID = -1").result().size()); + } - await(asyncRes.closeAsync()); + assertEquals(0, txManager().pending()); } - private static void checkDdl(boolean expectedApplied, Session ses, String sql) { - checkDdl(expectedApplied, ses, sql, null); + @Test + public void resultSetCloseShouldFinishImplicitTransaction() { + sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); + + IgniteSql sql = igniteSql(); + Session ses = sql.sessionBuilder().defaultPageSize(2).build(); + + for (int i = 0; i < ROW_COUNT; ++i) { + execute(ses, "INSERT INTO TEST VALUES (?, ?)", i, i); + } + + ResultSet<?> rs = executeForRead(ses, "SELECT * FROM TEST"); + assertEquals(1, txManager().pending()); + rs.close(); + assertEquals(0, txManager().pending(), "Expected no pending transactions"); + } + + protected ResultSet<SqlRow> executeForRead(Session ses, String query) { + return executeForRead(ses, null, query); } - private static <T extends IgniteException> T checkError(Class<T> expCls, Integer code, String msg, Session ses, String sql, + protected abstract ResultSet<SqlRow> executeForRead(Session ses, Transaction tx, String query, Object... args); + + protected <T extends IgniteException> T checkError(Class<T> expCls, Integer code, String msg, Session ses, String sql, Object... args) { - return assertThrowsPublicException(() -> await(ses.executeAsync(null, sql, args)), expCls, code, msg); + T ex = assertThrows(expCls, () -> execute(ses, sql, args)); + + if (code != null) { + assertEquals(new IgniteException(code).codeAsString(), ex.codeAsString()); + } + + if (msg != null) { + assertThat(ex.getMessage(), containsString(msg)); + } + + return ex; } - private static SqlException checkSqlError( + protected SqlException checkSqlError( int code, String msg, Session ses, String sql, Object... args ) { - return assertThrowsSqlException(code, msg, () -> await(ses.executeAsync(null, sql, args))); + return assertThrowsSqlException(code, msg, () -> execute(ses, sql, args)); } - protected static void checkDml(int expectedAffectedRows, Session ses, String sql, Transaction tx, Object... args) { - AsyncResultSet asyncRes = await(ses.executeAsync(tx, sql, args)); + protected abstract long[] executeBatch(Session ses, String sql, BatchedArguments args); - assertFalse(asyncRes.wasApplied()); - assertFalse(asyncRes.hasMorePages()); - assertFalse(asyncRes.hasRowSet()); - assertEquals(expectedAffectedRows, asyncRes.affectedRows()); + protected abstract ResultProcessor execute(Integer expectedPages, Transaction tx, Session ses, String sql, Object... args); - assertNull(asyncRes.metadata()); - - await(asyncRes.closeAsync()); + protected ResultProcessor execute(int expectedPages, Session ses, String sql, Object... args) { + return execute(expectedPages, null, ses, sql, args); } - private static void checkDml(int expectedAffectedRows, Session ses, String sql, Object... args) { - checkDml(expectedAffectedRows, ses, sql, null, args); + protected ResultProcessor execute(Transaction tx, Session ses, String sql, Object... args) { + return execute(null, tx, ses, sql, args); } - static <T extends IgniteException> T assertThrowsPublicException( - RunnableX executable, - Class<T> expCls, - @Nullable Integer code, - @Nullable String msgPart - ) { - T ex = assertThrows(expCls, executable::run); - - if (code != null) { - assertEquals(new IgniteException(code).codeAsString(), ex.codeAsString()); - } - - if (msgPart != null) { - assertThat(ex.getMessage(), containsString(msgPart)); - } - - return ex; + protected ResultProcessor execute(Session ses, String sql, Object... args) { + return execute(null, null, ses, sql, args); } - static class TestPageProcessor implements - Function<AsyncResultSet<SqlRow>, CompletionStage<AsyncResultSet<SqlRow>>> { - private int expectedPages; + protected abstract void rollback(Transaction outerTx); - private final List<SqlRow> res = new ArrayList<>(); + protected abstract void commit(Transaction outerTx); - TestPageProcessor(int expectedPages) { - this.expectedPages = expectedPages; - } + protected void checkDml(int expectedAffectedRows, Transaction tx, Session ses, String sql, Object... args) { - @Override - public CompletionStage<AsyncResultSet<SqlRow>> apply(AsyncResultSet<SqlRow> rs) { - expectedPages--; + } - assertTrue(rs.hasRowSet()); - assertFalse(rs.wasApplied()); - assertEquals(-1L, rs.affectedRows()); - assertEquals(expectedPages > 0, rs.hasMorePages(), - "hasMorePages(): [expected=" + (expectedPages > 0) + ", actual=" + rs.hasMorePages() + ']'); + protected void checkDml(int expectedAffectedRows, Session ses, String sql, Object... args) { + checkDml(expectedAffectedRows, null, ses, sql, args); + } - rs.currentPage().forEach(res::add); + protected void checkDdl(boolean expectedApplied, Session ses, String sql) { + checkDdl(expectedApplied, ses, sql, null); + } - if (rs.hasMorePages()) { - return rs.fetchNextPage().thenCompose(this); - } + protected abstract void checkDdl(boolean expectedApplied, Session ses, String sql, Transaction tx); - return rs.closeAsync().thenApply(v -> rs); - } + /** Represent result of running SQL query to hide implementation specific for different version of tests. */ + protected interface ResultProcessor { + List<SqlRow> result(); - public List<SqlRow> result() { - //noinspection AssignmentOrReturnOfFieldWithMutableType - return res; - } + long affectedRows(); } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java index 069db0aab4..77d45cc8f6 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java @@ -17,540 +17,36 @@ package org.apache.ignite.internal.sql.api; -import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan; -import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsTableScan; -import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; -import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.IntStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import org.apache.ignite.internal.catalog.commands.CatalogUtils; -import org.apache.ignite.internal.client.sql.ClientSql; -import org.apache.ignite.internal.sql.api.ColumnMetadataImpl.ColumnOriginImpl; -import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest; -import org.apache.ignite.internal.sql.engine.QueryCancelledException; -import org.apache.ignite.internal.testframework.IgniteTestUtils; -import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX; -import org.apache.ignite.internal.tx.TxManager; -import org.apache.ignite.internal.util.CollectionUtils; -import org.apache.ignite.lang.ColumnAlreadyExistsException; -import org.apache.ignite.lang.ColumnNotFoundException; -import org.apache.ignite.lang.ErrorGroups; -import org.apache.ignite.lang.ErrorGroups.Index; -import org.apache.ignite.lang.ErrorGroups.Sql; -import org.apache.ignite.lang.ErrorGroups.Transactions; -import org.apache.ignite.lang.IgniteException; -import org.apache.ignite.lang.IndexAlreadyExistsException; -import org.apache.ignite.lang.IndexNotFoundException; -import org.apache.ignite.lang.TableAlreadyExistsException; -import org.apache.ignite.lang.TableNotFoundException; +import org.apache.ignite.internal.sql.SyncResultSetAdapter; import org.apache.ignite.sql.BatchedArguments; -import org.apache.ignite.sql.ColumnMetadata; -import org.apache.ignite.sql.ColumnType; -import org.apache.ignite.sql.CursorClosedException; import org.apache.ignite.sql.IgniteSql; -import org.apache.ignite.sql.NoRowSetExpectedException; -import org.apache.ignite.sql.ResultSetMetadata; +import org.apache.ignite.sql.ResultSet; import org.apache.ignite.sql.Session; -import org.apache.ignite.sql.SqlBatchException; -import org.apache.ignite.sql.SqlException; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.sql.async.AsyncResultSet; -import org.apache.ignite.table.Table; import org.apache.ignite.tx.Transaction; -import org.apache.ignite.tx.TransactionOptions; -import org.hamcrest.Matcher; -import org.jetbrains.annotations.Nullable; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** * Tests for asynchronous SQL API. */ @SuppressWarnings("ThrowableNotThrown") -public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { - private static final int ROW_COUNT = 16; - - @AfterEach - public void dropTables() { - for (Table t : CLUSTER_NODES.get(0).tables().tables()) { - sql("DROP TABLE " + t.name()); - } - } - - @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-20096") - public void ddl() throws Exception { - IgniteSql sql = igniteSql(); - Session ses = sql.createSession(); - - // CREATE TABLE - checkDdl(true, ses, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - checkError( - TableAlreadyExistsException.class, - ErrorGroups.Table.TABLE_ALREADY_EXISTS_ERR, - "Table already exists [name=\"PUBLIC\".\"TEST\"]", - ses, - "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)" - ); - checkSqlError( - ErrorGroups.Table.TABLE_DEFINITION_ERR, - "Can't create table with duplicate columns: ID, VAL, VAL", - ses, - "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL INT, VAL INT)" - ); - checkDdl(false, ses, "CREATE TABLE IF NOT EXISTS TEST(ID INT PRIMARY KEY, VAL VARCHAR)"); - - // ADD COLUMN - checkDdl(true, ses, "ALTER TABLE TEST ADD COLUMN VAL1 VARCHAR"); - checkError( - TableNotFoundException.class, - ErrorGroups.Table.TABLE_NOT_FOUND_ERR, - "The table does not exist [name=\"PUBLIC\".\"NOT_EXISTS_TABLE\"]", - ses, - "ALTER TABLE NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR" - ); - checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"); - checkError( - ColumnAlreadyExistsException.class, - ErrorGroups.Table.COLUMN_ALREADY_EXISTS_ERR, - "Column already exists [name=\"VAL1\"]", - ses, - "ALTER TABLE TEST ADD COLUMN VAL1 INT" - ); - - // CREATE INDEX - checkDdl(true, ses, "CREATE INDEX TEST_IDX ON TEST(VAL0)"); - checkError( - IndexAlreadyExistsException.class, - Index.INDEX_ALREADY_EXISTS_ERR, - "Index already exists [name=\"PUBLIC\".\"TEST_IDX\"]", - ses, - "CREATE INDEX TEST_IDX ON TEST(VAL1)" - ); - checkDdl(false, ses, "CREATE INDEX IF NOT EXISTS TEST_IDX ON TEST(VAL1)"); - - // TODO: IGNITE-19150 We are waiting for schema synchronization to avoid races to create and destroy indexes - waitForIndexBuild("TEST", "TEST_IDX"); - - checkDdl(true, ses, "DROP INDEX TESt_iDX"); - checkDdl(true, ses, "CREATE INDEX TEST_IDX1 ON TEST(VAL0)"); - checkDdl(true, ses, "CREATE INDEX TEST_IDX2 ON TEST(VAL0)"); - checkDdl(true, ses, "CREATE INDEX TEST_IDX3 ON TEST(ID, VAL0, VAL1)"); - checkSqlError( - Index.INVALID_INDEX_DEFINITION_ERR, - "Can't create index on duplicate columns: VAL0, VAL0", - ses, - "CREATE INDEX TEST_IDX4 ON TEST(VAL0, VAL0)" - ); - - checkSqlError( - Sql.STMT_VALIDATION_ERR, - "Can`t delete column(s). Column VAL1 is used by indexes [TEST_IDX3].", - ses, - "ALTER TABLE TEST DROP COLUMN val1" - ); - - SqlException ex = checkSqlError( - Sql.STMT_VALIDATION_ERR, - "Can`t delete column(s).", - ses, - "ALTER TABLE TEST DROP COLUMN (val0, val1)" - ); - - String msg = ex.getMessage(); - String explainMsg = "Unexpected error message: " + msg; - - assertTrue(msg.contains("Column VAL0 is used by indexes ["), explainMsg); - assertTrue(msg.contains("TEST_IDX1") && msg.contains("TEST_IDX2") && msg.contains("TEST_IDX3"), explainMsg); - assertTrue(msg.contains("Column VAL1 is used by indexes [TEST_IDX3]"), explainMsg); - - checkSqlError( - Sql.STMT_VALIDATION_ERR, - "Can`t delete column, belongs to primary key: [name=ID]", - ses, - "ALTER TABLE TEST DROP COLUMN id" - ); - - // TODO: IGNITE-19150 We are waiting for schema synchronization to avoid races to create and destroy indexes - waitForIndexBuild("TEST", "TEST_IDX3"); - checkDdl(true, ses, "DROP INDEX TESt_iDX3"); - - // DROP COLUMNS - checkDdl(true, ses, "ALTER TABLE TEST DROP COLUMN VAL1"); - checkError( - TableNotFoundException.class, - ErrorGroups.Table.TABLE_NOT_FOUND_ERR, - "The table does not exist [name=\"PUBLIC\".\"NOT_EXISTS_TABLE\"]", - ses, - "ALTER TABLE NOT_EXISTS_TABLE DROP COLUMN VAL1" - ); - checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE DROP COLUMN VAL1"); - checkError( - ColumnNotFoundException.class, - ErrorGroups.Table.COLUMN_NOT_FOUND_ERR, - "Column does not exist [tableName=\"PUBLIC\".\"TEST\", columnName=\"VAL1\"]", - ses, - "ALTER TABLE TEST DROP COLUMN VAL1" - ); - - // DROP TABLE - checkDdl(false, ses, "DROP TABLE IF EXISTS NOT_EXISTS_TABLE"); - - checkDdl(true, ses, "DROP TABLE TEST"); - checkError( - TableNotFoundException.class, - ErrorGroups.Table.TABLE_NOT_FOUND_ERR, - "The table does not exist [name=\"PUBLIC\".\"TEST\"]", - ses, - "DROP TABLE TEST" - ); - - checkDdl(false, ses, "DROP INDEX IF EXISTS TEST_IDX"); - - checkError( - IndexNotFoundException.class, - Index.INDEX_NOT_FOUND_ERR, - "Index does not exist [name=\"PUBLIC\".\"TEST_IDX\"]", ses, - "DROP INDEX TEST_IDX" - ); - } - - @Test - public void dml() { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = igniteSql(); - Session ses = sql.createSession(); - - for (int i = 0; i < ROW_COUNT; ++i) { - checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i); - } - - checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1); - - checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0"); - } - - /** Check all transactions are processed correctly even with case of sql Exception raised. */ - @Test - public void implicitTransactionsStates() { - IgniteSql sql = igniteSql(); - - if (sql instanceof ClientSql) { - return; - } - - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - Session ses = sql.createSession(); - - TxManager txManager = txManager(); - - for (int i = 0; i < ROW_COUNT; ++i) { - CompletableFuture<AsyncResultSet<SqlRow>> fut = ses.executeAsync(null, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)", i, i); - - AsyncResultSet asyncRes = null; - - try { - asyncRes = await(fut); - } catch (Throwable ignore) { - // No op. - } - - if (asyncRes != null) { - await(asyncRes.closeAsync()); - } - } - - // No new transactions through ddl. - assertEquals(0, txManager.pending()); - } - - /** Check correctness of explicit transaction rollback. */ - @Test - public void checkExplicitTxRollback() { - IgniteSql sql = igniteSql(); - - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - Session ses = sql.createSession(); - - // Outer tx with further commit. - Transaction outerTx = igniteTx().begin(); - - for (int i = 0; i < ROW_COUNT; ++i) { - checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", outerTx, i, i); - } - - await(outerTx.rollbackAsync()); - - AsyncResultSet rs = await(ses.executeAsync(null, "SELECT VAL0 FROM TEST ORDER BY VAL0")); - - assertEquals(0, StreamSupport.stream(rs.currentPage().spliterator(), false).count()); - - await(rs.closeAsync()); - } - - /** Check correctness of implicit and explicit transactions. */ - @Test - public void checkTransactionsWithDml() { - IgniteSql sql = igniteSql(); - - if (sql instanceof ClientSql) { - return; - } - - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - Session ses = sql.createSession(); - - TxManager txManagerInternal = txManager(); - - int txPrevCnt = txManagerInternal.finished(); - - for (int i = 0; i < ROW_COUNT; ++i) { - checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i); - } - - // Outer tx with further commit. - Transaction outerTx = igniteTx().begin(); - - for (int i = ROW_COUNT; i < 2 * ROW_COUNT; ++i) { - checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", outerTx, i, i); - } - - outerTx.commit(); - - // Outdated tx. - Transaction outerTx0 = outerTx; - //ToDo: IGNITE-20387 , here should be used assertThrowsSqlException method with code and message `"Transaction is already finished" - IgniteException e = assertThrows(IgniteException.class, - () -> checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", outerTx0, ROW_COUNT, Integer.MAX_VALUE)); - assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, e.code()); - - assertThrowsSqlException( - Sql.CONSTRAINT_VIOLATION_ERR, - "PK unique constraint is violated", - () -> checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", ROW_COUNT, Integer.MAX_VALUE)); - - AsyncResultSet rs = await(ses.executeAsync(null, "SELECT VAL0 FROM TEST ORDER BY VAL0")); - - assertEquals(2 * ROW_COUNT, StreamSupport.stream(rs.currentPage().spliterator(), false).count()); - - rs.closeAsync(); - - outerTx = igniteTx().begin(); - - rs = await(ses.executeAsync(outerTx, "SELECT VAL0 FROM TEST ORDER BY VAL0")); - - assertEquals(2 * ROW_COUNT, StreamSupport.stream(rs.currentPage().spliterator(), false).count()); - - rs.closeAsync(); - - outerTx.commit(); - - checkDml(2 * ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1); - - checkDml(2 * ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0"); - - assertEquals(ROW_COUNT + 1 + 1 + 1 + 1 + 1 + 1, txManagerInternal.finished() - txPrevCnt); - - assertEquals(0, txManagerInternal.pending()); - } - - /** Check correctness of rw and ro transactions for table scan. */ - @Test - public void checkMixedTransactionsForTable() { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - Matcher<String> planMatcher = containsTableScan("PUBLIC", "TEST"); - - checkMixedTransactions(planMatcher); - } - - /** Check correctness of rw and ro transactions for index scan. */ - @Test - public void checkMixedTransactionsForIndex() throws Exception { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - sql("CREATE INDEX TEST_IDX ON TEST(VAL0)"); - - Matcher<String> planMatcher = containsIndexScan("PUBLIC", "TEST", "TEST_IDX"); - - checkMixedTransactions(planMatcher); - } - - private void checkMixedTransactions(Matcher<String> planMatcher) { - IgniteSql sql = igniteSql(); - - if (sql instanceof ClientSql) { - return; - } - - Session ses = sql.createSession(); - - for (int i = 0; i < ROW_COUNT; ++i) { - sql("INSERT INTO TEST VALUES (?, ?)", i, i); - } - - List<Boolean> booleanList = List.of(Boolean.TRUE, Boolean.FALSE); - for (boolean roTx : booleanList) { - for (boolean commit : booleanList) { - for (boolean explicit : booleanList) { - checkTx(ses, roTx, commit, explicit, planMatcher); - } - } - } - } - - private void checkTx(Session ses, boolean readOnly, boolean commit, boolean explicit, Matcher<String> planMatcher) { - Transaction outerTx = explicit ? (readOnly ? igniteTx().begin(new TransactionOptions().readOnly(true)) : igniteTx().begin()) : null; - - String query = "SELECT VAL0 FROM TEST ORDER BY VAL0"; - - assertQuery(outerTx, query).matches(planMatcher).check(); - - AsyncResultSet rs = await(ses.executeAsync(outerTx, query)); - - assertEquals(ROW_COUNT, StreamSupport.stream(rs.currentPage().spliterator(), false).count()); - - rs.closeAsync(); - - if (outerTx != null) { - if (commit) { - outerTx.commit(); - } else { - outerTx.rollback(); - } - } - } - - @Test - public void select() { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 4).build(); - - for (int i = 0; i < ROW_COUNT; ++i) { - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); - } - - TestPageProcessor pageProc = new TestPageProcessor(4); - await(ses.executeAsync(null, "SELECT ID FROM TEST").thenCompose(pageProc)); - - Set<Integer> rs = pageProc.result().stream().map(r -> r.intValue(0)).collect(Collectors.toSet()); - - for (int i = 0; i < ROW_COUNT; ++i) { - assertTrue(rs.remove(i), "Results invalid: " + pageProc.result()); - } - - assertTrue(rs.isEmpty()); - } - - @Test - public void metadata() { - sql("CREATE TABLE TEST(COL0 BIGINT PRIMARY KEY, COL1 VARCHAR NOT NULL)"); - - IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().build(); - - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", 1L, "some string"); - - AsyncResultSet<SqlRow> rs = await(ses.executeAsync(null, "SELECT COL1, COL0 FROM TEST")); - - // Validate columns metadata. - ResultSetMetadata meta = rs.metadata(); - - assertNotNull(meta); - assertEquals(-1, meta.indexOf("COL")); - assertEquals(0, meta.indexOf("COL1")); - assertEquals(1, meta.indexOf("COL0")); - - checkMetadata(new ColumnMetadataImpl( - "COL1", - ColumnType.STRING, - CatalogUtils.DEFAULT_VARLEN_LENGTH, - ColumnMetadata.UNDEFINED_SCALE, - false, - new ColumnOriginImpl("PUBLIC", "TEST", "COL1")), - meta.columns().get(0)); - checkMetadata(new ColumnMetadataImpl( - "COL0", - ColumnType.INT64, - 19, - 0, - false, - new ColumnOriginImpl("PUBLIC", "TEST", "COL0")), - meta.columns().get(1)); - - // Validate result columns types. - assertTrue(rs.hasRowSet()); - assertEquals(1, rs.currentPageSize()); - - SqlRow row = rs.currentPage().iterator().next(); - - await(rs.closeAsync()); - - assertInstanceOf(meta.columns().get(0).valueClass(), row.value(0)); - assertInstanceOf(meta.columns().get(1).valueClass(), row.value(1)); - } - - @Test - public void sqlRow() { - IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().build(); - - AsyncResultSet<SqlRow> ars = await(ses.executeAsync(null, "SELECT 1 as COL_A, 2 as COL_B")); - - SqlRow r = CollectionUtils.first(ars.currentPage()); - - assertEquals(2, r.columnCount()); - assertEquals(0, r.columnIndex("COL_A")); - assertEquals(0, r.columnIndex("col_a")); - assertEquals(1, r.columnIndex("COL_B")); - assertEquals(-1, r.columnIndex("notExistColumn")); - - assertEquals(1, r.intValue("COL_A")); - assertEquals(1, r.intValue("COL_a")); - assertEquals(2, r.intValue("COL_B")); - - assertThrowsWithCause( - () -> r.intValue("notExistColumn"), - IllegalArgumentException.class, - "Column doesn't exist [name=notExistColumn]" - ); - - assertEquals(1, r.intValue(0)); - assertEquals(2, r.intValue(1)); - assertThrowsWithCause(() -> r.intValue(-2), IndexOutOfBoundsException.class); - assertThrowsWithCause(() -> r.intValue(10), IndexOutOfBoundsException.class); - - await(ars.closeAsync()); - } - +public class ItSqlAsynchronousApiTest extends ItSqlApiBaseTest { @Test public void pageSequence() { sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); @@ -582,7 +78,7 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { .flatMap(p -> StreamSupport.stream(p.spliterator(), false)) .collect(Collectors.toList()); - TestPageProcessor pageProc = new TestPageProcessor(ROW_COUNT - res.size()); + AsyncResultProcessor pageProc = new AsyncResultProcessor(ROW_COUNT - res.size()); await(ars4.fetchNextPage().thenCompose(pageProc)); res.addAll(pageProc.result()); @@ -592,234 +88,50 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { } } - @Test - public void errors() { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)"); - - IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build(); - - for (int i = 0; i < ROW_COUNT; ++i) { - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); - } - - // Parse error. - checkSqlError(Sql.STMT_PARSE_ERR, "Failed to parse query", ses, "SELECT ID FROM"); - - // Validation errors. - checkSqlError(Sql.STMT_VALIDATION_ERR, "Column 'VAL0' does not allow NULLs", ses, - "INSERT INTO TEST VALUES (2, NULL)"); - - checkSqlError(Sql.STMT_VALIDATION_ERR, "Object 'NOT_EXISTING_TABLE' not found", ses, - "SELECT * FROM NOT_EXISTING_TABLE"); - - checkSqlError(Sql.STMT_VALIDATION_ERR, "Column 'NOT_EXISTING_COLUMN' not found", ses, - "SELECT NOT_EXISTING_COLUMN FROM TEST"); - - checkSqlError(Sql.STMT_VALIDATION_ERR, "Multiple statements are not allowed", ses, "SELECT 1; SELECT 2"); - - checkSqlError(Sql.STMT_VALIDATION_ERR, "Table without PRIMARY KEY is not supported", ses, - "CREATE TABLE TEST2 (VAL INT)"); - - // Execute error. - checkSqlError(Sql.RUNTIME_ERR, "/ by zero", ses, "SELECT 1 / ?", 0); - checkSqlError(Sql.RUNTIME_ERR, "/ by zero", ses, "UPDATE TEST SET val0 = val0/(val0 - ?) + " + ROW_COUNT, 0); - checkSqlError(Sql.RUNTIME_ERR, "negative substring length not allowed", ses, "SELECT SUBSTRING('foo', 1, -3)"); - - // No result set error. - { - AsyncResultSet ars = await(ses.executeAsync(null, "CREATE TABLE TEST3 (ID INT PRIMARY KEY)")); - assertThrowsSqlException( - NoRowSetExpectedException.class, - Sql.QUERY_NO_RESULT_SET_ERR, "Query has no result set", - () -> await(ars.fetchNextPage())); - } - - // Cursor closed error. - { - AsyncResultSet ars = await(ses.executeAsync(null, "SELECT * FROM TEST")); - await(ars.closeAsync()); - - assertThrowsSqlException( - CursorClosedException.class, - Sql.CURSOR_CLOSED_ERR, - "Cursor is closed", - () -> await(ars.fetchNextPage())); - } + @Override + protected ResultSet<SqlRow> executeForRead(Session ses, Transaction tx, String query, Object... args) { + return new SyncResultSetAdapter(await(ses.executeAsync(tx, query, args))); } - /** - * DDL is non-transactional. - */ - @Test - public void ddlInTransaction() { - Session ses = igniteSql().createSession(); - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - { - Transaction tx = igniteTx().begin(); - try { - assertThrowsSqlException( - Sql.STMT_VALIDATION_ERR, - "DDL doesn't support transactions.", - () -> await(ses.executeAsync(tx, "CREATE TABLE TEST2(ID INT PRIMARY KEY, VAL0 INT)")) - ); - } finally { - tx.rollback(); - } - } - { - Transaction tx = igniteTx().begin(); - AsyncResultSet<SqlRow> res = await(ses.executeAsync(tx, "INSERT INTO TEST VALUES (?, ?)", -1, -1)); - assertEquals(1, res.affectedRows()); - - assertThrowsSqlException( - Sql.STMT_VALIDATION_ERR, - "DDL doesn't support transactions.", - () -> await(ses.executeAsync(tx, "CREATE TABLE TEST2(ID INT PRIMARY KEY, VAL0 INT)")) - ); - tx.commit(); - - assertTrue(await(ses.executeAsync(null, "SELECT ID FROM TEST WHERE ID = -1")).currentPage().iterator().hasNext()); - } + @Override + protected long[] executeBatch(Session ses, String sql, BatchedArguments args) { + return await(ses.executeBatchAsync(null, sql, args)); } - @Test - public void closeSession() { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().defaultPageSize(2).build(); - - for (int i = 0; i < ROW_COUNT; ++i) { - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); - } - - AsyncResultSet ars0 = await(ses.executeAsync(null, "SELECT ID FROM TEST")); - - await(ses.closeAsync()); - - // Fetched page is available after cancel. - ars0.currentPage(); - - SqlException sqlEx = assertThrowsSqlException( - Sql.EXECUTION_CANCELLED_ERR, - "The query was cancelled while executing", - () -> await(ars0.fetchNextPage())); - - assertTrue(IgniteTestUtils.hasCause(sqlEx, QueryCancelledException.class, null)); - assertThrowsSqlException(Sql.SESSION_CLOSED_ERR, "Session is closed", () -> await(ses.executeAsync(null, "SELECT ID FROM TEST"))); - } - - @Test - public void batch() { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = CLUSTER_NODES.get(0).sql(); - Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build(); - - BatchedArguments args = BatchedArguments.of(0, 0); - - for (int i = 1; i < ROW_COUNT; ++i) { - args.add(i, i); - } - - long[] batchRes = await(ses.executeBatchAsync(null, "INSERT INTO TEST VALUES (?, ?)", args)); - - Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r)); - - // Check that data are inserted OK - List<List<Object>> res = sql("SELECT ID FROM TEST ORDER BY ID"); - IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i, res.get(i).get(0))); + @Override + protected ResultProcessor execute(Integer expectedPages, Transaction tx, Session ses, String sql, Object... args) { + AsyncResultProcessor asyncProcessor = new AsyncResultProcessor(expectedPages); + await(ses.executeAsync(tx, sql, args).thenCompose(asyncProcessor)); - // Check invalid query type - assertThrowsSqlException( - SqlBatchException.class, - Sql.STMT_VALIDATION_ERR, - "Invalid SQL statement type", - () -> await(ses.executeBatchAsync(null, "SELECT * FROM TEST", args))); - - assertThrowsSqlException( - SqlBatchException.class, - Sql.STMT_VALIDATION_ERR, - "Invalid SQL statement type", - () -> await(ses.executeBatchAsync(null, "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL0 INT)", args))); + return asyncProcessor; } - @Test - public void batchIncomplete() { - int err = ROW_COUNT / 2; - - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = CLUSTER_NODES.get(0).sql(); - Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build(); - - BatchedArguments args = BatchedArguments.of(0, 0); - - for (int i = 1; i < ROW_COUNT; ++i) { - if (i == err) { - args.add(1, 1); - } else { - args.add(i, i); - } - } - - SqlBatchException ex = assertThrowsSqlException( - SqlBatchException.class, - Sql.CONSTRAINT_VIOLATION_ERR, - "PK unique constraint is violated", - () -> await(ses.executeBatchAsync(null, "INSERT INTO TEST VALUES (?, ?)", args)) - ); - - assertEquals(err, ex.updateCounters().length); - IntStream.range(0, ex.updateCounters().length).forEach(i -> assertEquals(1, ex.updateCounters()[i])); + @Override + protected void rollback(Transaction tx) { + await(tx.rollbackAsync()); } - @Test - public void resultSetCloseShouldFinishImplicitTransaction() throws InterruptedException { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().defaultPageSize(2).build(); - - for (int i = 0; i < ROW_COUNT; ++i) { - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); - } - - CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, "SELECT * FROM TEST"); - - AsyncResultSet<SqlRow> ars = f.join(); - // There should be a pending transaction since not all data was read. - boolean txStarted = waitForCondition(() -> txManager().pending() == 1, 5000); - assertTrue(txStarted, "No pending transactions"); - - ars.closeAsync().join(); - assertEquals(0, txManager().pending(), "Expected no pending transactions"); + @Override + protected void commit(Transaction tx) { + await(tx.commitAsync()); } - @Test - public void resultSetFullReadShouldFinishImplicitTransaction() { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = igniteSql(); - - // Fetch all data in one read. - Session ses = sql.sessionBuilder().defaultPageSize(100).build(); - - for (int i = 0; i < ROW_COUNT; ++i) { - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); - } + @Override + protected void checkDml(int expectedAffectedRows, Transaction tx, Session ses, String sql, Object... args) { + AsyncResultSet asyncRes = await(ses.executeAsync(tx, sql, args)); - CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null, "SELECT * FROM TEST"); + assertFalse(asyncRes.wasApplied()); + assertFalse(asyncRes.hasMorePages()); + assertFalse(asyncRes.hasRowSet()); + assertEquals(expectedAffectedRows, asyncRes.affectedRows()); - AsyncResultSet<SqlRow> ars = f.join(); - assertFalse(ars.hasMorePages()); + assertNull(asyncRes.metadata()); - assertEquals(0, txManager().pending(), "Expected no pending transactions"); + await(asyncRes.closeAsync()); } - private static void checkDdl(boolean expectedApplied, Session ses, String sql, Transaction tx) { + @Override + protected void checkDdl(boolean expectedApplied, Session ses, String sql, Transaction tx) { CompletableFuture<AsyncResultSet<SqlRow>> fut = ses.executeAsync( tx, sql @@ -837,93 +149,56 @@ public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest { await(asyncRes.closeAsync()); } - private static void checkDdl(boolean expectedApplied, Session ses, String sql) { - checkDdl(expectedApplied, ses, sql, null); - } - - private static <T extends IgniteException> T checkError(Class<T> expCls, Integer code, String msg, Session ses, String sql, - Object... args) { - return assertThrowsPublicException(() -> await(ses.executeAsync(null, sql, args)), expCls, code, msg); - } - - private static SqlException checkSqlError( - int code, - String msg, - Session ses, - String sql, - Object... args - ) { - return assertThrowsSqlException(code, msg, () -> await(ses.executeAsync(null, sql, args))); - } - - protected static void checkDml(int expectedAffectedRows, Session ses, String sql, Transaction tx, Object... args) { - AsyncResultSet asyncRes = await(ses.executeAsync(tx, sql, args)); - - assertFalse(asyncRes.wasApplied()); - assertFalse(asyncRes.hasMorePages()); - assertFalse(asyncRes.hasRowSet()); - assertEquals(expectedAffectedRows, asyncRes.affectedRows()); - - assertNull(asyncRes.metadata()); - - await(asyncRes.closeAsync()); - } - - private static void checkDml(int expectedAffectedRows, Session ses, String sql, Object... args) { - checkDml(expectedAffectedRows, ses, sql, null, args); - } - - static <T extends IgniteException> T assertThrowsPublicException( - RunnableX executable, - Class<T> expCls, - @Nullable Integer code, - @Nullable String msgPart - ) { - T ex = assertThrows(expCls, executable::run); - - if (code != null) { - assertEquals(new IgniteException(code).codeAsString(), ex.codeAsString()); - } - - if (msgPart != null) { - assertThat(ex.getMessage(), containsString(msgPart)); - } - - return ex; - } - - static class TestPageProcessor implements + static class AsyncResultProcessor implements ResultProcessor, Function<AsyncResultSet<SqlRow>, CompletionStage<AsyncResultSet<SqlRow>>> { - private int expectedPages; + private Integer expectedPages; + private long affectedRows; private final List<SqlRow> res = new ArrayList<>(); - TestPageProcessor(int expectedPages) { + AsyncResultProcessor(Integer expectedPages) { this.expectedPages = expectedPages; } @Override public CompletionStage<AsyncResultSet<SqlRow>> apply(AsyncResultSet<SqlRow> rs) { - expectedPages--; - - assertTrue(rs.hasRowSet()); assertFalse(rs.wasApplied()); - assertEquals(-1L, rs.affectedRows()); - assertEquals(expectedPages > 0, rs.hasMorePages(), - "hasMorePages(): [expected=" + (expectedPages > 0) + ", actual=" + rs.hasMorePages() + ']'); + //SELECT + if (rs.hasRowSet()) { + assertEquals(-1L, rs.affectedRows()); + + if (expectedPages != null) { + expectedPages--; + assertEquals(expectedPages > 0, rs.hasMorePages(), + "hasMorePages(): [expected=" + (expectedPages > 0) + ", actual=" + rs.hasMorePages() + ']'); + } - rs.currentPage().forEach(res::add); + rs.currentPage().forEach(res::add); - if (rs.hasMorePages()) { - return rs.fetchNextPage().thenCompose(this); + if (rs.hasMorePages()) { + return rs.fetchNextPage().thenCompose(this); + } + } else { //DML/DDL + affectedRows = rs.affectedRows(); + assertNotEquals(-1L, affectedRows()); } return rs.closeAsync().thenApply(v -> rs); } + @Override public List<SqlRow> result() { //noinspection AssignmentOrReturnOfFieldWithMutableType + if (expectedPages != null) { + assertEquals(0, expectedPages, "Expected to be read more pages"); + } + return res; } + + @Override + public long affectedRows() { + return affectedRows; + } } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java index 356ce99c5d..425a76f56f 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java @@ -57,4 +57,10 @@ public class ItSqlClientAsynchronousApiTest extends ItSqlAsynchronousApiTest { public void closeSession() { super.closeSession(); } + + @Override + @Disabled("https://issues.apache.org/jira/browse/IGNITE-20598") + public void checkTransactionsWithDml() { + super.checkTransactionsWithDml(); + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java index 671f0d3a1d..5bb9e4d9ae 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java @@ -21,11 +21,10 @@ import static org.apache.ignite.internal.runner.app.client.ItAbstractThinClientT import org.apache.ignite.client.IgniteClient; import org.apache.ignite.sql.IgniteSql; -import org.apache.ignite.sql.Session; import org.apache.ignite.tx.IgniteTransactions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Disabled; /** * Tests for synchronous client SQL API. @@ -56,19 +55,14 @@ public class ItSqlClientSynchronousApiTest extends ItSqlSynchronousApiTest { } @Override - @Test - public void dml() { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = igniteSql(); - Session ses = sql.createSession(); - - for (int i = 0; i < ROW_COUNT; ++i) { - checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i); - } - - checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1); + @Disabled("IGNITE-17134") + public void closeSession() { + super.closeSession(); + } - checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0"); + @Override + @Disabled("https://issues.apache.org/jira/browse/IGNITE-20598") + public void checkTransactionsWithDml() { + super.checkTransactionsWithDml(); } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java index d616f75134..acc946bb53 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java @@ -17,539 +17,81 @@ package org.apache.ignite.internal.sql.api; -import static org.apache.ignite.internal.sql.api.ItSqlAsynchronousApiTest.assertThrowsPublicException; -import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import java.util.Arrays; -import java.util.HashSet; +import java.util.ArrayList; import java.util.List; -import java.util.Set; -import java.util.stream.IntStream; -import org.apache.ignite.Ignite; -import org.apache.ignite.internal.app.IgniteImpl; -import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest; -import org.apache.ignite.internal.tx.TxManager; -import org.apache.ignite.lang.ColumnAlreadyExistsException; -import org.apache.ignite.lang.ColumnNotFoundException; -import org.apache.ignite.lang.ErrorGroups; -import org.apache.ignite.lang.ErrorGroups.Index; -import org.apache.ignite.lang.ErrorGroups.Sql; -import org.apache.ignite.lang.ErrorGroups.Transactions; -import org.apache.ignite.lang.IgniteException; -import org.apache.ignite.lang.IndexAlreadyExistsException; -import org.apache.ignite.lang.IndexNotFoundException; -import org.apache.ignite.lang.TableAlreadyExistsException; -import org.apache.ignite.lang.TableNotFoundException; import org.apache.ignite.sql.BatchedArguments; -import org.apache.ignite.sql.CursorClosedException; -import org.apache.ignite.sql.IgniteSql; -import org.apache.ignite.sql.NoRowSetExpectedException; import org.apache.ignite.sql.ResultSet; import org.apache.ignite.sql.Session; -import org.apache.ignite.sql.SqlBatchException; -import org.apache.ignite.sql.SqlException; import org.apache.ignite.sql.SqlRow; -import org.apache.ignite.table.Table; import org.apache.ignite.tx.Transaction; -import org.apache.ignite.tx.TransactionOptions; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; /** * Tests for synchronous SQL API. */ @SuppressWarnings("ThrowableNotThrown") -public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest { - private static final int ROW_COUNT = 16; - - @AfterEach - public void dropTables() { - for (Table t : CLUSTER_NODES.get(0).tables().tables()) { - sql("DROP TABLE " + t.name()); - } +public class ItSqlSynchronousApiTest extends ItSqlApiBaseTest { + @Override + protected ResultSet<SqlRow> executeForRead(Session ses, Transaction tx, String query, Object... args) { + return ses.execute(tx, query, args); } - @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-20096") - public void ddl() throws Exception { - IgniteSql sql = igniteSql(); - Session ses = sql.createSession(); - - // CREATE TABLE - checkDdl(true, ses, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - checkError( - TableAlreadyExistsException.class, - ErrorGroups.Table.TABLE_ALREADY_EXISTS_ERR, - "Table already exists [name=\"PUBLIC\".\"TEST\"]", - ses, - "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)" - ); - checkSqlError( - ErrorGroups.Table.TABLE_DEFINITION_ERR, - "Can't create table with duplicate columns: ID, VAL, VAL", - ses, - "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL INT, VAL INT)" - ); - checkDdl(false, ses, "CREATE TABLE IF NOT EXISTS TEST(ID INT PRIMARY KEY, VAL VARCHAR)"); - - // ADD COLUMN - checkDdl(true, ses, "ALTER TABLE TEST ADD COLUMN VAL1 VARCHAR"); - checkError( - TableNotFoundException.class, - ErrorGroups.Table.TABLE_NOT_FOUND_ERR, - "The table does not exist [name=\"PUBLIC\".\"NOT_EXISTS_TABLE\"]", - ses, - "ALTER TABLE NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR" - ); - checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"); - checkError( - ColumnAlreadyExistsException.class, - ErrorGroups.Table.COLUMN_ALREADY_EXISTS_ERR, - "Column already exists [name=\"VAL1\"]", - ses, - "ALTER TABLE TEST ADD COLUMN VAL1 INT" - ); - - // CREATE INDEX - checkDdl(true, ses, "CREATE INDEX TEST_IDX ON TEST(VAL0)"); - checkError( - IndexAlreadyExistsException.class, - Index.INDEX_ALREADY_EXISTS_ERR, - "Index already exists [name=\"PUBLIC\".\"TEST_IDX\"]", - ses, - "CREATE INDEX TEST_IDX ON TEST(VAL1)" - ); - checkDdl(false, ses, "CREATE INDEX IF NOT EXISTS TEST_IDX ON TEST(VAL1)"); - - // TODO: IGNITE-19150 We are waiting for schema synchronization to avoid races to create and destroy indexes - waitForIndexBuild("TEST", "TEST_IDX"); - - checkDdl(true, ses, "DROP INDEX TESt_iDX"); - checkDdl(true, ses, "CREATE INDEX TEST_IDX1 ON TEST(VAL0)"); - checkDdl(true, ses, "CREATE INDEX TEST_IDX2 ON TEST(VAL0)"); - checkDdl(true, ses, "CREATE INDEX TEST_IDX3 ON TEST(ID, VAL0, VAL1)"); - checkSqlError( - Index.INVALID_INDEX_DEFINITION_ERR, - "Can't create index on duplicate columns: VAL0, VAL0", - ses, - "CREATE INDEX TEST_IDX4 ON TEST(VAL0, VAL0)" - ); - - checkSqlError( - Sql.STMT_VALIDATION_ERR, - "Can`t delete column(s). Column VAL1 is used by indexes [TEST_IDX3].", - ses, - "ALTER TABLE TEST DROP COLUMN val1" - ); - - SqlException ex = checkSqlError( - Sql.STMT_VALIDATION_ERR, - "Can`t delete column(s).", - ses, - "ALTER TABLE TEST DROP COLUMN (val0, val1)" - ); - - String msg = ex.getMessage(); - String explainMsg = "Unexpected error message: " + msg; - - assertTrue(msg.contains("Column VAL0 is used by indexes ["), explainMsg); - assertTrue(msg.contains("TEST_IDX1") && msg.contains("TEST_IDX2") && msg.contains("TEST_IDX3"), explainMsg); - assertTrue(msg.contains("Column VAL1 is used by indexes [TEST_IDX3]"), explainMsg); - - checkSqlError( - Sql.STMT_VALIDATION_ERR, - "Can`t delete column, belongs to primary key: [name=ID]", - ses, - "ALTER TABLE TEST DROP COLUMN id" - ); - - // TODO: IGNITE-19150 We are waiting for schema synchronization to avoid races to create and destroy indexes - waitForIndexBuild("TEST", "TEST_IDX3"); - checkDdl(true, ses, "DROP INDEX TESt_iDX3"); - - // DROP COLUMNS - checkDdl(true, ses, "ALTER TABLE TEST DROP COLUMN VAL1"); - checkError( - TableNotFoundException.class, - ErrorGroups.Table.TABLE_NOT_FOUND_ERR, - "The table does not exist [name=\"PUBLIC\".\"NOT_EXISTS_TABLE\"]", - ses, - "ALTER TABLE NOT_EXISTS_TABLE DROP COLUMN VAL1" - ); - checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE DROP COLUMN VAL1"); - checkError( - ColumnNotFoundException.class, - ErrorGroups.Table.COLUMN_NOT_FOUND_ERR, - "Column does not exist [tableName=\"PUBLIC\".\"TEST\", columnName=\"VAL1\"]", - ses, - "ALTER TABLE TEST DROP COLUMN VAL1" - ); - - // DROP TABLE - checkDdl(false, ses, "DROP TABLE IF EXISTS NOT_EXISTS_TABLE"); - - checkDdl(true, ses, "DROP TABLE TEST"); - checkError( - TableNotFoundException.class, - ErrorGroups.Table.TABLE_NOT_FOUND_ERR, - "The table does not exist [name=\"PUBLIC\".\"TEST\"]", - ses, - "DROP TABLE TEST" - ); - - checkDdl(false, ses, "DROP INDEX IF EXISTS TEST_IDX"); - - checkError( - IndexNotFoundException.class, - Index.INDEX_NOT_FOUND_ERR, - "Index does not exist [name=\"PUBLIC\".\"TEST_IDX\"]", ses, - "DROP INDEX TEST_IDX" - ); - } - - @Test - public void dml() { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = igniteSql(); - Session ses = sql.createSession(); - - TxManager txManagerInternal = txManager(); - - int txPrevCnt = txManagerInternal.finished(); - - for (int i = 0; i < ROW_COUNT; ++i) { - checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i); - } - - assertEquals(ROW_COUNT, txManagerInternal.finished() - txPrevCnt); - - assertEquals(0, txManagerInternal.pending()); - - checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1); - - checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0"); - } - - @SuppressWarnings("UnstableApiUsage") - @Test - public void select() throws Exception { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 4).build(); - - for (int i = 0; i < ROW_COUNT; ++i) { - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); - } - - ResultSet<SqlRow> rs = ses.execute(null, "SELECT ID FROM TEST"); - - Set<Integer> set = new HashSet<>(); - - rs.forEachRemaining(r -> set.add(r.intValue(0))); - - rs.close(); - - for (int i = 0; i < ROW_COUNT; ++i) { - assertTrue(set.remove(i), "Results invalid: " + rs); - } - - assertTrue(set.isEmpty()); - } - - @Test - public void errors() throws InterruptedException { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)"); - IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().defaultPageSize(2).build(); - - for (int i = 0; i < ROW_COUNT; ++i) { - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); - } - - // Parse error. - checkSqlError(Sql.STMT_PARSE_ERR, "Failed to parse query", ses, "SELECT ID FROM"); - - // Validation errors. - checkSqlError(Sql.STMT_VALIDATION_ERR, "Column 'VAL0' does not allow NULLs", ses, - "INSERT INTO TEST VALUES (2, NULL)"); - - checkSqlError(Sql.STMT_VALIDATION_ERR, "Object 'NOT_EXISTING_TABLE' not found", ses, - "SELECT * FROM NOT_EXISTING_TABLE"); - - checkSqlError(Sql.STMT_VALIDATION_ERR, "Column 'NOT_EXISTING_COLUMN' not found", ses, - "SELECT NOT_EXISTING_COLUMN FROM TEST"); - - checkSqlError(Sql.STMT_VALIDATION_ERR, "Multiple statements are not allowed", ses, "SELECT 1; SELECT 2"); - - checkSqlError(Sql.STMT_VALIDATION_ERR, "Table without PRIMARY KEY is not supported", ses, - "CREATE TABLE TEST2 (VAL INT)"); - - // Execute error. - checkSqlError(Sql.RUNTIME_ERR, "/ by zero", ses, "SELECT 1 / ?", 0); - checkSqlError(Sql.RUNTIME_ERR, "/ by zero", ses, "UPDATE TEST SET val0 = val0/(val0 - ?) + " + ROW_COUNT, 0); - checkSqlError(Sql.RUNTIME_ERR, "negative substring length not allowed", ses, "SELECT SUBSTRING('foo', 1, -3)"); - - // No result set error. - { - ResultSet rs = ses.execute(null, "CREATE TABLE TEST3 (ID INT PRIMARY KEY)"); - assertThrowsSqlException(NoRowSetExpectedException.class, Sql.QUERY_NO_RESULT_SET_ERR, "Query has no result set", rs::next); - } - - // Cursor closed error. - { - ResultSet rs = ses.execute(null, "SELECT * FROM TEST"); - Thread.sleep(300); // ResultSetImpl fetches next page in background, wait to it to complete to avoid flakiness. - rs.close(); - assertThrowsSqlException(CursorClosedException.class, Sql.CURSOR_CLOSED_ERR, "Cursor is closed", - () -> rs.forEachRemaining(Object::hashCode)); - } - } - - /** - * DDL is non-transactional. - */ - @Test - public void ddlInTransaction() { - Session ses = igniteSql().createSession(); - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - { - Transaction tx = igniteTx().begin(); - try { - assertThrowsSqlException( - Sql.STMT_VALIDATION_ERR, - "DDL doesn't support transactions.", - () -> ses.execute(tx, "CREATE TABLE TEST2(ID INT PRIMARY KEY, VAL0 INT)") - ); - } finally { - tx.rollback(); - } - } - { - Transaction tx = igniteTx().begin(); - ResultSet<SqlRow> res = ses.execute(tx, "INSERT INTO TEST VALUES (?, ?)", -1, -1); - assertEquals(1, res.affectedRows()); - - assertThrowsSqlException( - Sql.STMT_VALIDATION_ERR, - "DDL doesn't support transactions.", - () -> ses.execute(tx, "CREATE TABLE TEST2(ID INT PRIMARY KEY, VAL0 INT)") - ); - tx.commit(); - - assertTrue(ses.execute(null, "SELECT ID FROM TEST WHERE ID = -1").hasNext()); - } - - assertEquals(0, ((IgniteImpl) CLUSTER_NODES.get(0)).txManager().pending()); + @Override + protected long[] executeBatch(Session ses, String sql, BatchedArguments args) { + return ses.executeBatch(null, sql, args); } - @ParameterizedTest - @ValueSource(strings = { - "INSERT INTO tst VALUES (2, ?)", - "SELECT * FROM tst WHERE id = ? " - }) - public void runtimeErrorInDmlCausesTransactionToFail(String query) { - sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)"); - - sql("INSERT INTO tst VALUES (?,?)", 1, 1); - try (Session ses = igniteSql().createSession()) { - Transaction tx = igniteTx().begin(); - String dmlQuery = "UPDATE tst SET val = val/(val - ?) + 1"; + @Override + protected ResultProcessor execute(Integer expectedPages, Transaction tx, Session ses, String sql, Object... args) { + SyncPageProcessor syncProcessor = new SyncPageProcessor(); - assertThrowsSqlException( - Sql.RUNTIME_ERR, - "/ by zero", - () -> ses.execute(tx, dmlQuery, 1).affectedRows()); + ResultSet<SqlRow> rs = ses.execute(tx, sql, args); + syncProcessor.process(rs); - IgniteException err = assertThrows(IgniteException.class, () -> { - ResultSet<SqlRow> rs = ses.execute(tx, query, 2); - if (rs.hasRowSet()) { - assertTrue(rs.hasNext()); - } else { - assertTrue(rs.wasApplied()); - } - }); - - assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, err.code(), err.toString()); - } + return syncProcessor; } - @ParameterizedTest - @ValueSource(strings = { - "INSERT INTO tst VALUES (2, ?)", - "SELECT * FROM tst WHERE id = ? " - }) - public void runtimeErrorInQueryCausesTransactionToFail(String query) { - sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)"); - - sql("INSERT INTO tst VALUES (?,?)", 1, 1); - - try (Session ses = igniteSql().createSession()) { - Transaction tx = igniteTx().begin(); - - assertThrowsSqlException( - Sql.RUNTIME_ERR, - "/ by zero", - () -> ses.execute(tx, "SELECT val/? FROM tst WHERE id=?", 0, 1).next()); + protected ResultProcessor execute(int expectedPages, Transaction tx, Session ses, String sql, Object... args) { + SyncPageProcessor syncProcessor = new SyncPageProcessor(); - IgniteException err = assertThrows(IgniteException.class, () -> { - ResultSet<SqlRow> rs = ses.execute(tx, query, 2); - if (rs.hasRowSet()) { - assertTrue(rs.hasNext()); - } else { - assertTrue(rs.wasApplied()); - } - }); + ResultSet<SqlRow> rs = ses.execute(tx, sql, args); + syncProcessor.process(rs); - assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR, err.code(), err.toString()); - } + return syncProcessor; } - @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-20534") - public void testLockIsNotReleasedAfterTxRollback() { - Ignite ignite = CLUSTER_NODES.get(0); - IgniteSql sql = ignite.sql(); - - try (Session ses = ignite.sql().createSession()) { - ses.execute(null, "CREATE TABLE IF NOT EXISTS tst(id INTEGER PRIMARY KEY, val INTEGER)").affectedRows(); - } - - try (Session session = sql.createSession()) { - Transaction tx = ignite.transactions().begin(); - - assertThrows(RuntimeException.class, () -> session.execute(tx, "SELECT 1/0")); - - tx.rollback(); - - session.execute(tx, "INSERT INTO tst VALUES (1, 1)"); - } - - try (Session session = sql.createSession()) { - Transaction tx = ignite.transactions().begin(new TransactionOptions().readOnly(false)); - - session.execute(tx, "INSERT INTO tst VALUES (1, 1)"); - - tx.commit(); - } + @Override + protected void rollback(Transaction tx) { + tx.rollback(); } - @Test - public void batch() { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = CLUSTER_NODES.get(0).sql(); - Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build(); - - BatchedArguments args = BatchedArguments.of(0, 0); - - for (int i = 1; i < ROW_COUNT; ++i) { - args.add(i, i); - } - - long[] batchRes = ses.executeBatch(null, "INSERT INTO TEST VALUES (?, ?)", args); - - Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r)); - - // Check that data are inserted OK - List<List<Object>> res = sql("SELECT ID FROM TEST ORDER BY ID"); - IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i, res.get(i).get(0))); - - // Check invalid query type - assertThrowsSqlException( - SqlBatchException.class, - Sql.STMT_VALIDATION_ERR, - "Invalid SQL statement type", - () -> ses.executeBatch(null, "SELECT * FROM TEST", args) - ); - - assertThrowsSqlException( - SqlBatchException.class, - Sql.STMT_VALIDATION_ERR, - "Invalid SQL statement type", - () -> ses.executeBatch(null, "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL0 INT)", args) - ); + @Override + protected void commit(Transaction tx) { + tx.commit(); } - @Test - public void batchIncomplete() { - int err = ROW_COUNT / 2; - - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = CLUSTER_NODES.get(0).sql(); - Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 2).build(); - - BatchedArguments args = BatchedArguments.of(0, 0); - - for (int i = 1; i < ROW_COUNT; ++i) { - if (i == err) { - args.add(1, 1); - } else { - args.add(i, i); - } - } - - SqlBatchException batchEx = assertThrows( - SqlBatchException.class, - () -> ses.executeBatch(null, "INSERT INTO TEST VALUES (?, ?)", args) + @Override + protected void checkDml(int expectedAffectedRows, Transaction tx, Session ses, String sql, Object... args) { + ResultSet res = ses.execute( + tx, + sql, + args ); - assertEquals(Sql.CONSTRAINT_VIOLATION_ERR, batchEx.code()); - assertEquals(err, batchEx.updateCounters().length); - IntStream.range(0, batchEx.updateCounters().length).forEach(i -> assertEquals(1, batchEx.updateCounters()[i])); - } - - @Test - public void resultSetCloseShouldFinishImplicitTransaction() { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - - IgniteSql sql = igniteSql(); - Session ses = sql.sessionBuilder().defaultPageSize(2).build(); - - for (int i = 0; i < ROW_COUNT; ++i) { - ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i); - } - - ResultSet<?> rs = ses.execute(null, "SELECT * FROM TEST"); - assertEquals(1, txManager().pending()); - rs.close(); - assertEquals(0, txManager().pending(), "Expected no pending transactions"); - } - - @Test - public void resultSetFullReadShouldFinishImplicitTransaction() { - sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); - for (int i = 0; i < ROW_COUNT; ++i) { - sql("INSERT INTO TEST VALUES (?, ?)", i, i); - } - - IgniteSql sql = igniteSql(); - - // Fetch all data in one read. - Session ses = sql.sessionBuilder().defaultPageSize(100).build(); - ResultSet<SqlRow> rs = ses.execute(null, "SELECT * FROM TEST"); - - while (rs.hasNext()) { - rs.next(); - } + assertFalse(res.wasApplied()); + assertFalse(res.hasRowSet()); + assertEquals(expectedAffectedRows, res.affectedRows()); - assertEquals(0, txManager().pending(), "Expected no pending transactions"); + res.close(); } - private static void checkDdl(boolean expectedApplied, Session ses, String sql) { + @Override + protected void checkDdl(boolean expectedApplied, Session ses, String sql, Transaction tx) { ResultSet res = ses.execute( - null, + tx, sql ); @@ -560,32 +102,24 @@ public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest { res.close(); } - private static <T extends IgniteException> T checkError(Class<T> expCls, Integer code, String msg, Session ses, String sql, - Object... args) { - return assertThrowsPublicException(() -> ses.execute(null, sql, args), expCls, code, msg); - } + static class SyncPageProcessor implements ResultProcessor { + private final List<SqlRow> res = new ArrayList<>(); + private long affectedRows; - private static SqlException checkSqlError( - int code, - String msg, - Session ses, - String sql, - Object... args - ) { - return assertThrowsSqlException(code, msg, () -> ses.execute(null, sql, args)); - } - - static void checkDml(int expectedAffectedRows, Session ses, String sql, Object... args) { - ResultSet res = ses.execute( - null, - sql, - args - ); + @Override + public List<SqlRow> result() { + //noinspection AssignmentOrReturnOfFieldWithMutableType + return res; + } - assertFalse(res.wasApplied()); - assertFalse(res.hasRowSet()); - assertEquals(expectedAffectedRows, res.affectedRows()); + @Override + public long affectedRows() { + return affectedRows; + } - res.close(); + public void process(ResultSet<SqlRow> resultSet) { + affectedRows = resultSet.affectedRows(); + resultSet.forEachRemaining(res::add); + } } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java index ac9e206455..f46465ea17 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java @@ -30,7 +30,6 @@ import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.lang.ErrorGroups.Sql; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** @@ -119,17 +118,16 @@ public class ItCreateTableDdlTest extends ClusterPerClassIntegrationTest { * Check invalid colocation columns configuration: - not PK columns; - duplicates colocation columns. */ @Test - @Disabled("IGNITE-20149") public void invalidColocationColumns() { assertThrowsSqlException( Sql.STMT_VALIDATION_ERR, - "Colocation columns must be subset of primary key", + "Failed to validate query. Colocation column 'VAL' is not part of PK", () -> sql("CREATE TABLE T0(ID0 INT, ID1 INT, VAL INT, PRIMARY KEY (ID1, ID0)) COLOCATE (ID0, VAL)") ); assertThrowsSqlException( Sql.STMT_VALIDATION_ERR, - "Colocation columns contains duplicates: [duplicates=[ID1]]]", + "Failed to validate query. Colocation column 'ID1' specified more that once", () -> sql("CREATE TABLE T0(ID0 INT, ID1 INT, VAL INT, PRIMARY KEY (ID1, ID0)) COLOCATE (ID1, ID0, ID1)") ); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java new file mode 100644 index 0000000000..a36119ccca --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.lang; + +import static org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.mapToPublicException; +import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; + +import org.apache.ignite.lang.ErrorGroups.Common; +import org.apache.ignite.lang.TraceableException; +import org.apache.ignite.sql.SqlException; + +/** + * This utility class provides an ability to map Ignite internal exceptions to public SqlException. + */ +public class SqlExceptionMapperUtil { + + /** + * This method provides a mapping from internal exception to SQL public ones. + * + * <p>The rules of mapping are the following:</p> + * <ul> + * <li>any instance of {@link Error} is returned as is, except {@link AssertionError} + * that will always be mapped to {@link SqlException} with the {@link Common#INTERNAL_ERR} error code.</li> + * <li>any instance of {@link TraceableException} is wrapped into {@link SqlException} + * with the original {@link TraceableException#traceId() traceUd} and {@link TraceableException#code() code}.</li> + * <li>if there are no any mappers that can do a mapping from the given error to a public exception, + * then {@link SqlException} with the {@link Common#INTERNAL_ERR} error code is returned.</li> + * </ul> + * + * @param origin Exception to be mapped. + * @return Public exception. + */ + public static Throwable mapToPublicSqlException(Throwable origin) { + Throwable e = mapToPublicException(origin); + if (e instanceof Error) { + return e; + } + if (e instanceof SqlException) { + return e; + } + + if (e instanceof TraceableException) { + TraceableException traceable = (TraceableException) e; + return new SqlException(traceable.traceId(), traceable.code(), e.getMessage(), e); + } + + return new SqlException(INTERNAL_ERR, origin); + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java index 6810ec8683..f947e6a7af 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.sql.api; -import static org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.mapToPublicException; +import static org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException; import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow; import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_CLOSED_ERR; @@ -192,7 +192,7 @@ public class SessionImpl implements AbstractSession { ) ); } catch (Exception e) { - return CompletableFuture.failedFuture(mapToPublicException(e)); + return CompletableFuture.failedFuture(mapToPublicSqlException(e)); } finally { busyLock.leaveBusy(); } @@ -205,7 +205,7 @@ public class SessionImpl implements AbstractSession { closeInternal(); } - throw new CompletionException(mapToPublicException(cause)); + throw new CompletionException(mapToPublicSqlException(cause)); }); } @@ -282,7 +282,7 @@ public class SessionImpl implements AbstractSession { throw (CancellationException) cause; } - Throwable t = mapToPublicException(cause); + Throwable t = mapToPublicSqlException(cause); if (t instanceof TraceableException) { throw new SqlBatchException( @@ -305,7 +305,7 @@ public class SessionImpl implements AbstractSession { return resFut; } catch (Exception e) { - return CompletableFuture.failedFuture(mapToPublicException(e)); + return CompletableFuture.failedFuture(mapToPublicSqlException(e)); } finally { busyLock.leaveBusy(); } @@ -367,7 +367,7 @@ public class SessionImpl implements AbstractSession { return qryProc.closeSession(sessionId); } catch (Exception e) { - return CompletableFuture.failedFuture(mapToPublicException(e)); + return CompletableFuture.failedFuture(mapToPublicSqlException(e)); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java index eaffd75867..b81c88f74c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.sql.engine; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil; +import org.apache.ignite.internal.lang.SqlExceptionMapperUtil; import org.apache.ignite.internal.util.AsyncCursor; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.sql.ResultSetMetadata; @@ -98,6 +98,6 @@ public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> { private static Throwable wrapIfNecessary(Throwable t) { Throwable err = ExceptionUtils.unwrapCause(t); - return IgniteExceptionMapperUtil.mapToPublicException(err); + return SqlExceptionMapperUtil.mapToPublicSqlException(err); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java index f82b8807d5..6bbd0a352c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java @@ -40,7 +40,7 @@ import org.apache.calcite.sql.SqlExplain; import org.apache.calcite.sql.SqlExplainLevel; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; -import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil; +import org.apache.ignite.internal.lang.SqlExceptionMapperUtil; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metrics.MetricManager; @@ -192,7 +192,7 @@ public class PrepareServiceImpl implements PrepareService { "Planning of a query aborted due to planner timeout threshold is reached"); } - throw new CompletionException(IgniteExceptionMapperUtil.mapToPublicException(th)); + throw new CompletionException(SqlExceptionMapperUtil.mapToPublicSqlException(th)); } ); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtilTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtilTest.java new file mode 100644 index 0000000000..807909ecdc --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtilTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.lang; + +import static org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException; +import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; +import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertSame; + +import org.apache.ignite.sql.NoRowSetExpectedException; +import org.apache.ignite.sql.SqlException; +import org.junit.jupiter.api.Test; + +/** + * Tests mapping internal exceptions to public SqlException. + */ +class SqlExceptionMapperUtilTest { + /** + * Tests a default mapping of internal exceptions passed from the sql engine. + */ + @Test + public void testSqlInternalExceptionDefaultMapping() { + CustomNoMappingException internalSqlErr = new CustomNoMappingException(EXECUTION_CANCELLED_ERR); + Throwable mappedErr = mapToPublicSqlException(internalSqlErr); + + assertThat(mappedErr, instanceOf(SqlException.class)); + + SqlException mappedSqlErr = (SqlException) mappedErr; + + assertThat("Mapped exception should have the same trace identifier.", mappedSqlErr.traceId(), is(internalSqlErr.traceId())); + assertThat("Mapped exception shouldn't have the same error code.", mappedSqlErr.code(), is(INTERNAL_ERR)); + } + + /** + * Tests a default mapping of internal exceptions passed from the sql engine. + */ + @Test + public void testSqlInternalExceptionDefaultMappingForSqlException() { + NoRowSetExpectedException sqlErr = new NoRowSetExpectedException(); + + Throwable mappedErr = mapToPublicSqlException(sqlErr); + + assertSame(sqlErr, mappedErr); + } + + /** + * Test exception. + */ + public static class CustomNoMappingException extends IgniteInternalException { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** + * Creates a new instance of CustomNoMappingException with given code. + */ + public CustomNoMappingException(int code) { + super(code, "Test internal exception [err=no mapping]"); + } + } +} diff --git a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java index acb0e4f73d..7c7f1e71a6 100644 --- a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java +++ b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java @@ -34,9 +34,12 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.BitSet; +import java.util.Iterator; import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.ignite.Ignite; import org.apache.ignite.internal.sql.engine.type.UuidType; @@ -91,6 +94,11 @@ public class SqlTestUtils { return ex; } + public static <T> Stream<T> asStream(Iterator<T> sourceIterator) { + Iterable<T> iterable = () -> sourceIterator; + return StreamSupport.stream(iterable.spliterator(), false); + } + /** * Convert {@link ColumnType} to string representation of SQL type. *