This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 89eb752c99 IGNITE-19493 Sql. Change query execution flow - Fixes #2162. 89eb752c99 is described below commit 89eb752c9981f0880de70249329637196558bb46 Author: zstan <stanilov...@gmail.com> AuthorDate: Thu Jun 8 10:56:07 2023 +0300 IGNITE-19493 Sql. Change query execution flow - Fixes #2162. Signed-off-by: zstan <stanilov...@gmail.com> --- .../ignite/lang/SchemaNotFoundException.java | 49 ++++++++++ .../ignite/internal/sql/api/ItCommonApiTest.java | 100 +++++++++++++++++++++ .../types/timestamp/test_incorrect_timestamp.test | 1 - .../internal/sql/engine/SqlQueryProcessor.java | 37 +++++--- .../prepare/ddl/DdlSqlToCommandConverter.java | 4 +- .../internal/sql/engine/schema/IgniteSchema.java | 2 +- .../sql/engine/schema/SqlSchemaManagerImpl.java | 6 +- .../engine/framework/PredefinedSchemaManager.java | 2 +- 8 files changed, 179 insertions(+), 22 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/lang/SchemaNotFoundException.java b/modules/api/src/main/java/org/apache/ignite/lang/SchemaNotFoundException.java new file mode 100644 index 0000000000..559bb091d6 --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/lang/SchemaNotFoundException.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.lang; + +import static org.apache.ignite.lang.ErrorGroups.Sql.SCHEMA_NOT_FOUND_ERR; +import static org.apache.ignite.lang.IgniteStringFormatter.format; + +import java.util.UUID; + +/** + * Exception is thrown when a specified schema cannot be found. + */ +public class SchemaNotFoundException extends IgniteException { + /** + * Creates an exception with the given schema name. + * + * @param schemaName Schema name. + */ + public SchemaNotFoundException(String schemaName) { + super(SCHEMA_NOT_FOUND_ERR, format("Schema not found [schemaName={}]", schemaName)); + } + + /** + * Creates an exception with the given trace ID, error code, detailed message, and cause. + * + * @param traceId Unique identifier of the exception. + * @param code Full error code. + * @param message Detailed message. + * @param cause Optional nested exception (can be {@code null}). + */ + public SchemaNotFoundException(UUID traceId, int code, String message, Throwable cause) { + super(traceId, code, message, cause); + } +} diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java index 1b535794d0..326177814c 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java @@ -23,11 +23,14 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.lang.ErrorGroups.Sql; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import java.time.Instant; import java.time.LocalDateTime; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.calcite.schema.SchemaPlus; import org.apache.ignite.Ignite; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter; @@ -38,7 +41,12 @@ import org.apache.ignite.internal.schema.testutils.definition.TableDefinition; import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest; import org.apache.ignite.internal.sql.engine.SqlQueryProcessor; import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException; +import org.apache.ignite.internal.sql.engine.schema.IgniteTable; +import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; import org.apache.ignite.internal.table.distributed.TableManager; +import org.apache.ignite.internal.testframework.IgniteTestUtils; +import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.ResultSet; import org.apache.ignite.sql.Session; @@ -46,6 +54,8 @@ import org.apache.ignite.sql.SqlException; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.table.Table; import org.apache.ignite.table.Tuple; +import org.apache.ignite.tx.Transaction; +import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.Test; /** Test common SQL API. */ @@ -168,4 +178,94 @@ public class ItCommonApiTest extends ClusterPerClassIntegrationTest { assertEquals(ins, res.next().timestampValue(3)); } } + + /** Check transaction change status with erroneous statements. */ + @Test + public void testTxStateChangedOnErroneousOp() { + sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"); + + // TODO: need to be refactored after https://issues.apache.org/jira/browse/IGNITE-19663 + TxManager txManagerInternal = + (TxManager) IgniteTestUtils.getFieldValue(CLUSTER_NODES.get(0), IgniteImpl.class, "txManager"); + + SqlQueryProcessor queryProc = + (SqlQueryProcessor) IgniteTestUtils.getFieldValue(CLUSTER_NODES.get(0), IgniteImpl.class, "qryEngine"); + + SqlSchemaManager oldManager = + (SqlSchemaManager) IgniteTestUtils.getFieldValue(queryProc, SqlQueryProcessor.class, "sqlSchemaManager"); + + int txPrevCnt = txManagerInternal.finished(); + + Transaction tx = CLUSTER_NODES.get(0).transactions().begin(); + + try { + sql(tx, "INSERT INTO PUBLIC.TEST VALUES(1, 1)"); + sql(tx, "INSERT INTO NOTEXIST.TEST VALUES(1, 1)"); + } catch (Throwable ignore) { + // No op. + } + + assertEquals(0, txManagerInternal.finished() - txPrevCnt); + InternalTransaction tx0 = (InternalTransaction) tx; + assertNull(tx0.state()); + + tx.rollback(); + assertEquals(1, txManagerInternal.finished() - txPrevCnt); + + sql("INSERT INTO TEST VALUES(1, 1)"); + assertEquals(2, txManagerInternal.finished() - txPrevCnt); + + var schemaManager = new ErroneousSchemaManager(); + + // TODO: refactor after https://issues.apache.org/jira/browse/IGNITE-17694 + IgniteTestUtils.setFieldValue(queryProc, "sqlSchemaManager", schemaManager); + + try { + sql("SELECT a FROM NOTEXIST.TEST"); + } catch (Throwable ignore) { + // No op. + } + + try { + sql("INSERT INTO NOTEXIST.TEST VALUES(1, 1)"); + } catch (Throwable ignore) { + // No op. + } + + assertEquals(2, txManagerInternal.finished() - txPrevCnt); + + IgniteTestUtils.setFieldValue(queryProc, "sqlSchemaManager", oldManager); + } + + private static class ErroneousSchemaManager implements SqlSchemaManager { + /** {@inheritDoc} */ + @Override + public SchemaPlus schema(@Nullable String schema) { + return null; + } + + /** {@inheritDoc} */ + @Override + public SchemaPlus schema(@Nullable String name, int version) { + return null; + } + + /** {@inheritDoc} */ + @Override + public IgniteTable tableById(int id) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<SchemaPlus> actualSchemaAsync(long ver) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override + public SchemaPlus activeSchema(@Nullable String name, long timestamp) { + throw new UnsupportedOperationException(); + } + } } diff --git a/modules/runner/src/integrationTest/sql/types/timestamp/test_incorrect_timestamp.test b/modules/runner/src/integrationTest/sql/types/timestamp/test_incorrect_timestamp.test index 8901fc6e3b..cc64d410c1 100644 --- a/modules/runner/src/integrationTest/sql/types/timestamp/test_incorrect_timestamp.test +++ b/modules/runner/src/integrationTest/sql/types/timestamp/test_incorrect_timestamp.test @@ -1,7 +1,6 @@ # name: test/sql/types/timestamp/test_incorrect_timestamp.test # description: Test out of range/incorrect timestamp formats # group: [timestamp] -# Ignore https://issues.apache.org/jira/browse/IGNITE-15623 statement ok PRAGMA enable_verification diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 526c472b6c..64d1ffaf0c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine; import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG; import static org.apache.ignite.lang.ErrorGroups.Sql.OPERATION_INTERRUPTED_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_INVALID_ERR; -import static org.apache.ignite.lang.ErrorGroups.Sql.SCHEMA_NOT_FOUND_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_EXPIRED_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_NOT_FOUND_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_DDL_OPERATION_ERR; @@ -35,6 +34,7 @@ import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.LongFunction; import java.util.function.Supplier; @@ -95,6 +95,7 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.lang.NodeStoppingException; +import org.apache.ignite.lang.SchemaNotFoundException; import org.apache.ignite.network.ClusterService; import org.apache.ignite.sql.SqlException; import org.jetbrains.annotations.NotNull; @@ -386,13 +387,6 @@ public class SqlQueryProcessor implements QueryProcessor { String schemaName = session.properties().get(QueryProperty.DEFAULT_SCHEMA); - SchemaPlus schema = sqlSchemaManager.schema(schemaName); - - if (schema == null) { - return CompletableFuture.failedFuture( - new IgniteInternalException(SCHEMA_NOT_FOUND_ERR, format("Schema not found [schemaName={}]", schemaName))); - } - InternalTransaction outerTx = context.unwrap(InternalTransaction.class); QueryCancel queryCancel = new QueryCancel(); @@ -413,6 +407,8 @@ public class SqlQueryProcessor implements QueryProcessor { CompletableFuture<Void> start = new CompletableFuture<>(); + AtomicReference<InternalTransaction> tx = new AtomicReference<>(); + CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start .thenApply(v -> { StatementParseResult parseResult = IgniteSqlParser.parse(sql, StatementParseResult.MODE); @@ -425,6 +421,16 @@ public class SqlQueryProcessor implements QueryProcessor { .thenCompose(sqlNode -> { boolean rwOp = dataModificationOp(sqlNode); + boolean implicitTxRequired = outerTx == null; + + tx.set(implicitTxRequired ? txManager.begin(!rwOp) : outerTx); + + SchemaPlus schema = sqlSchemaManager.schema(schemaName); + + if (schema == null) { + return CompletableFuture.failedFuture(new SchemaNotFoundException(schemaName)); + } + BaseQueryContext ctx = BaseQueryContext.builder() .frameworkConfig( Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) @@ -439,11 +445,7 @@ public class SqlQueryProcessor implements QueryProcessor { return prepareSvc.prepareAsync(sqlNode, ctx) .thenApply(plan -> { - boolean implicitTxRequired = outerTx == null; - - InternalTransaction tx = implicitTxRequired ? txManager.begin(!rwOp) : outerTx; - - var dataCursor = executionSrvc.executePlan(tx, plan, ctx); + var dataCursor = executionSrvc.executePlan(tx.get(), plan, ctx); SqlQueryType queryType = plan.type(); assert queryType != null : "Expected a full plan but got a fragment: " + plan; @@ -451,7 +453,7 @@ public class SqlQueryProcessor implements QueryProcessor { return new AsyncSqlCursorImpl<>( queryType, plan.metadata(), - implicitTxRequired ? tx : null, + implicitTxRequired ? tx.get() : null, new AsyncCursor<List<Object>>() { @Override public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) { @@ -475,6 +477,13 @@ public class SqlQueryProcessor implements QueryProcessor { if (ex instanceof CancellationException) { queryCancel.cancel(); } + + if (ex != null && outerTx == null) { + InternalTransaction tx0 = tx.get(); + if (tx0 != null) { + tx0.rollback(); + } + } }); start.completeAsync(() -> null, taskExecutor); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java index 2447f950d7..9bc1b486fb 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java @@ -32,7 +32,6 @@ import static org.apache.ignite.lang.ErrorGroups.Sql.PRIMARY_KEYS_MULTIPLE_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.PRIMARY_KEY_MISSING_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_INVALID_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_VALIDATION_ERR; -import static org.apache.ignite.lang.ErrorGroups.Sql.SCHEMA_NOT_FOUND_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.SQL_TO_REL_CONVERSION_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.STORAGE_ENGINE_NOT_VALID_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_DDL_OPERATION_ERR; @@ -96,6 +95,7 @@ import org.apache.ignite.internal.sql.engine.sql.IgniteSqlIndexType; import org.apache.ignite.internal.sql.engine.sql.IgniteSqlZoneOption; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.lang.SchemaNotFoundException; import org.apache.ignite.sql.ColumnType; import org.apache.ignite.sql.SqlException; import org.jetbrains.annotations.Nullable; @@ -709,7 +709,7 @@ public class DdlSqlToCommandConverter { private void ensureSchemaExists(PlanningContext ctx, String schemaName) { if (ctx.catalogReader().getRootSchema().getSubSchema(schemaName, true) == null) { - throw new SqlException(SCHEMA_NOT_FOUND_ERR, "Schema with name " + schemaName + " not found"); + throw new SchemaNotFoundException(schemaName); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java index 9aa3ef3b44..27320aa2a8 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java @@ -28,7 +28,7 @@ import org.jetbrains.annotations.Nullable; * Ignite schema. */ public class IgniteSchema extends AbstractSchema { - public static final long INITIAL_VERSION = -1; + static final long INITIAL_VERSION = -1; private final String schemaName; diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java index 816f0cbc3b..b7c32e5c74 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java @@ -149,11 +149,11 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager { /** {@inheritDoc} */ @Override public SchemaPlus schema(@Nullable String schema) { - SchemaPlus schemaPlus = calciteSchemaVv.latest(); - // stub for waiting pk indexes, more clear place is IgniteSchema CompletableFuture.allOf(pkIdxReady.values().toArray(CompletableFuture[]::new)).join(); + SchemaPlus schemaPlus = calciteSchemaVv.latest(); + return schema != null ? schemaPlus.getSubSchema(schema) : schemaPlus.getSubSchema(DEFAULT_SCHEMA_NAME); } @@ -171,7 +171,7 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager { } try { if (ver == IgniteSchema.INITIAL_VERSION) { - return completedFuture(null); + return completedFuture(calciteSchemaVv.latest()); } CompletableFuture<SchemaPlus> lastSchemaFut; diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java index 73ddc9098b..c98b745510 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java @@ -81,7 +81,7 @@ public class PredefinedSchemaManager implements SqlSchemaManager { /** {@inheritDoc} */ @Override public CompletableFuture<?> actualSchemaAsync(long ver) { - return CompletableFuture.completedFuture(null); + return CompletableFuture.completedFuture(root); } /** {@inheritDoc} */