This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 89eb752c99 IGNITE-19493 Sql. Change query execution flow - Fixes #2162.
89eb752c99 is described below

commit 89eb752c9981f0880de70249329637196558bb46
Author: zstan <stanilov...@gmail.com>
AuthorDate: Thu Jun 8 10:56:07 2023 +0300

    IGNITE-19493 Sql. Change query execution flow - Fixes #2162.
    
    Signed-off-by: zstan <stanilov...@gmail.com>
---
 .../ignite/lang/SchemaNotFoundException.java       |  49 ++++++++++
 .../ignite/internal/sql/api/ItCommonApiTest.java   | 100 +++++++++++++++++++++
 .../types/timestamp/test_incorrect_timestamp.test  |   1 -
 .../internal/sql/engine/SqlQueryProcessor.java     |  37 +++++---
 .../prepare/ddl/DdlSqlToCommandConverter.java      |   4 +-
 .../internal/sql/engine/schema/IgniteSchema.java   |   2 +-
 .../sql/engine/schema/SqlSchemaManagerImpl.java    |   6 +-
 .../engine/framework/PredefinedSchemaManager.java  |   2 +-
 8 files changed, 179 insertions(+), 22 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/lang/SchemaNotFoundException.java 
b/modules/api/src/main/java/org/apache/ignite/lang/SchemaNotFoundException.java
new file mode 100644
index 0000000000..559bb091d6
--- /dev/null
+++ 
b/modules/api/src/main/java/org/apache/ignite/lang/SchemaNotFoundException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import static org.apache.ignite.lang.ErrorGroups.Sql.SCHEMA_NOT_FOUND_ERR;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.util.UUID;
+
+/**
+ * Exception is thrown when a specified schema cannot be found.
+ */
+public class SchemaNotFoundException extends IgniteException {
+    /**
+     * Creates an exception with the given schema name.
+     *
+     * @param schemaName Schema name.
+     */
+    public SchemaNotFoundException(String schemaName) {
+        super(SCHEMA_NOT_FOUND_ERR, format("Schema not found [schemaName={}]", 
schemaName));
+    }
+
+    /**
+     * Creates an exception with the given trace ID, error code, detailed 
message, and cause.
+     *
+     * @param traceId Unique identifier of the exception.
+     * @param code Full error code.
+     * @param message Detailed message.
+     * @param cause Optional nested exception (can be {@code null}).
+     */
+    public SchemaNotFoundException(UUID traceId, int code, String message, 
Throwable cause) {
+        super(traceId, code, message, cause);
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
index 1b535794d0..326177814c 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
@@ -23,11 +23,14 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.lang.ErrorGroups.Sql;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.time.Instant;
 import java.time.LocalDateTime;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.calcite.schema.SchemaPlus;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.app.IgniteImpl;
 import 
org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter;
@@ -38,7 +41,12 @@ import 
org.apache.ignite.internal.schema.testutils.definition.TableDefinition;
 import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest;
 import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
@@ -46,6 +54,8 @@ import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 
 /** Test common SQL API. */
@@ -168,4 +178,94 @@ public class ItCommonApiTest extends 
ClusterPerClassIntegrationTest {
             assertEquals(ins, res.next().timestampValue(3));
         }
     }
+
+    /** Check transaction change status with erroneous statements.  */
+    @Test
+    public void testTxStateChangedOnErroneousOp() {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+        // TODO: need to be refactored after 
https://issues.apache.org/jira/browse/IGNITE-19663
+        TxManager txManagerInternal =
+                (TxManager) 
IgniteTestUtils.getFieldValue(CLUSTER_NODES.get(0), IgniteImpl.class, 
"txManager");
+
+        SqlQueryProcessor queryProc =
+                (SqlQueryProcessor) 
IgniteTestUtils.getFieldValue(CLUSTER_NODES.get(0), IgniteImpl.class, 
"qryEngine");
+
+        SqlSchemaManager oldManager =
+                (SqlSchemaManager) IgniteTestUtils.getFieldValue(queryProc, 
SqlQueryProcessor.class, "sqlSchemaManager");
+
+        int txPrevCnt = txManagerInternal.finished();
+
+        Transaction tx = CLUSTER_NODES.get(0).transactions().begin();
+
+        try {
+            sql(tx, "INSERT INTO PUBLIC.TEST VALUES(1, 1)");
+            sql(tx, "INSERT INTO NOTEXIST.TEST VALUES(1, 1)");
+        } catch (Throwable ignore) {
+            // No op.
+        }
+
+        assertEquals(0, txManagerInternal.finished() - txPrevCnt);
+        InternalTransaction tx0 = (InternalTransaction) tx;
+        assertNull(tx0.state());
+
+        tx.rollback();
+        assertEquals(1, txManagerInternal.finished() - txPrevCnt);
+
+        sql("INSERT INTO TEST VALUES(1, 1)");
+        assertEquals(2, txManagerInternal.finished() - txPrevCnt);
+
+        var schemaManager = new ErroneousSchemaManager();
+
+        // TODO: refactor after 
https://issues.apache.org/jira/browse/IGNITE-17694
+        IgniteTestUtils.setFieldValue(queryProc, "sqlSchemaManager", 
schemaManager);
+
+        try {
+            sql("SELECT a FROM NOTEXIST.TEST");
+        } catch (Throwable ignore) {
+            // No op.
+        }
+
+        try {
+            sql("INSERT INTO NOTEXIST.TEST VALUES(1, 1)");
+        } catch (Throwable ignore) {
+            // No op.
+        }
+
+        assertEquals(2, txManagerInternal.finished() - txPrevCnt);
+
+        IgniteTestUtils.setFieldValue(queryProc, "sqlSchemaManager", 
oldManager);
+    }
+
+    private static class ErroneousSchemaManager implements SqlSchemaManager {
+        /** {@inheritDoc} */
+        @Override
+        public SchemaPlus schema(@Nullable String schema) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public SchemaPlus schema(@Nullable String name, int version) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public IgniteTable tableById(int id) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<SchemaPlus> actualSchemaAsync(long ver) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public SchemaPlus activeSchema(@Nullable String name, long timestamp) {
+            throw new UnsupportedOperationException();
+        }
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/sql/types/timestamp/test_incorrect_timestamp.test
 
b/modules/runner/src/integrationTest/sql/types/timestamp/test_incorrect_timestamp.test
index 8901fc6e3b..cc64d410c1 100644
--- 
a/modules/runner/src/integrationTest/sql/types/timestamp/test_incorrect_timestamp.test
+++ 
b/modules/runner/src/integrationTest/sql/types/timestamp/test_incorrect_timestamp.test
@@ -1,7 +1,6 @@
 # name: test/sql/types/timestamp/test_incorrect_timestamp.test
 # description: Test out of range/incorrect timestamp formats
 # group: [timestamp]
-# Ignore https://issues.apache.org/jira/browse/IGNITE-15623
 
 statement ok
 PRAGMA enable_verification
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 526c472b6c..64d1ffaf0c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine;
 import static 
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
 import static org.apache.ignite.lang.ErrorGroups.Sql.OPERATION_INTERRUPTED_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_INVALID_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Sql.SCHEMA_NOT_FOUND_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_EXPIRED_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_NOT_FOUND_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_DDL_OPERATION_ERR;
@@ -35,6 +34,7 @@ import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import java.util.function.Supplier;
@@ -95,6 +95,7 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.lang.SchemaNotFoundException;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.sql.SqlException;
 import org.jetbrains.annotations.NotNull;
@@ -386,13 +387,6 @@ public class SqlQueryProcessor implements QueryProcessor {
 
         String schemaName = 
session.properties().get(QueryProperty.DEFAULT_SCHEMA);
 
-        SchemaPlus schema = sqlSchemaManager.schema(schemaName);
-
-        if (schema == null) {
-            return CompletableFuture.failedFuture(
-                    new IgniteInternalException(SCHEMA_NOT_FOUND_ERR, 
format("Schema not found [schemaName={}]", schemaName)));
-        }
-
         InternalTransaction outerTx = 
context.unwrap(InternalTransaction.class);
 
         QueryCancel queryCancel = new QueryCancel();
@@ -413,6 +407,8 @@ public class SqlQueryProcessor implements QueryProcessor {
 
         CompletableFuture<Void> start = new CompletableFuture<>();
 
+        AtomicReference<InternalTransaction> tx = new AtomicReference<>();
+
         CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start
                 .thenApply(v -> {
                     StatementParseResult parseResult = 
IgniteSqlParser.parse(sql, StatementParseResult.MODE);
@@ -425,6 +421,16 @@ public class SqlQueryProcessor implements QueryProcessor {
                 .thenCompose(sqlNode -> {
                     boolean rwOp = dataModificationOp(sqlNode);
 
+                    boolean implicitTxRequired = outerTx == null;
+
+                    tx.set(implicitTxRequired ? txManager.begin(!rwOp) : 
outerTx);
+
+                    SchemaPlus schema = sqlSchemaManager.schema(schemaName);
+
+                    if (schema == null) {
+                        return CompletableFuture.failedFuture(new 
SchemaNotFoundException(schemaName));
+                    }
+
                     BaseQueryContext ctx = BaseQueryContext.builder()
                             .frameworkConfig(
                                     
Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
@@ -439,11 +445,7 @@ public class SqlQueryProcessor implements QueryProcessor {
 
                     return prepareSvc.prepareAsync(sqlNode, ctx)
                             .thenApply(plan -> {
-                                boolean implicitTxRequired = outerTx == null;
-
-                                InternalTransaction tx = implicitTxRequired ? 
txManager.begin(!rwOp) : outerTx;
-
-                                var dataCursor = executionSrvc.executePlan(tx, 
plan, ctx);
+                                var dataCursor = 
executionSrvc.executePlan(tx.get(), plan, ctx);
 
                                 SqlQueryType queryType = plan.type();
                                 assert queryType != null : "Expected a full 
plan but got a fragment: " + plan;
@@ -451,7 +453,7 @@ public class SqlQueryProcessor implements QueryProcessor {
                                 return new AsyncSqlCursorImpl<>(
                                         queryType,
                                         plan.metadata(),
-                                        implicitTxRequired ? tx : null,
+                                        implicitTxRequired ? tx.get() : null,
                                         new AsyncCursor<List<Object>>() {
                                             @Override
                                             public 
CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
@@ -475,6 +477,13 @@ public class SqlQueryProcessor implements QueryProcessor {
             if (ex instanceof CancellationException) {
                 queryCancel.cancel();
             }
+
+            if (ex != null && outerTx == null) {
+                InternalTransaction tx0 = tx.get();
+                if (tx0 != null) {
+                    tx0.rollback();
+                }
+            }
         });
 
         start.completeAsync(() -> null, taskExecutor);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
index 2447f950d7..9bc1b486fb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
@@ -32,7 +32,6 @@ import static 
org.apache.ignite.lang.ErrorGroups.Sql.PRIMARY_KEYS_MULTIPLE_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.PRIMARY_KEY_MISSING_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_INVALID_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_VALIDATION_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Sql.SCHEMA_NOT_FOUND_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.SQL_TO_REL_CONVERSION_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Sql.STORAGE_ENGINE_NOT_VALID_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_DDL_OPERATION_ERR;
@@ -96,6 +95,7 @@ import 
org.apache.ignite.internal.sql.engine.sql.IgniteSqlIndexType;
 import org.apache.ignite.internal.sql.engine.sql.IgniteSqlZoneOption;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.SchemaNotFoundException;
 import org.apache.ignite.sql.ColumnType;
 import org.apache.ignite.sql.SqlException;
 import org.jetbrains.annotations.Nullable;
@@ -709,7 +709,7 @@ public class DdlSqlToCommandConverter {
 
     private void ensureSchemaExists(PlanningContext ctx, String schemaName) {
         if (ctx.catalogReader().getRootSchema().getSubSchema(schemaName, true) 
== null) {
-            throw new SqlException(SCHEMA_NOT_FOUND_ERR, "Schema with name " + 
schemaName + " not found");
+            throw new SchemaNotFoundException(schemaName);
         }
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
index 9aa3ef3b44..27320aa2a8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
@@ -28,7 +28,7 @@ import org.jetbrains.annotations.Nullable;
  * Ignite schema.
  */
 public class IgniteSchema extends AbstractSchema {
-    public static final long INITIAL_VERSION = -1;
+    static final long INITIAL_VERSION = -1;
 
     private final String schemaName;
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 816f0cbc3b..b7c32e5c74 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -149,11 +149,11 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
     /** {@inheritDoc} */
     @Override
     public SchemaPlus schema(@Nullable String schema) {
-        SchemaPlus schemaPlus = calciteSchemaVv.latest();
-
         // stub for waiting pk indexes, more clear place is IgniteSchema
         
CompletableFuture.allOf(pkIdxReady.values().toArray(CompletableFuture[]::new)).join();
 
+        SchemaPlus schemaPlus = calciteSchemaVv.latest();
+
         return schema != null ? schemaPlus.getSubSchema(schema) : 
schemaPlus.getSubSchema(DEFAULT_SCHEMA_NAME);
     }
 
@@ -171,7 +171,7 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
         }
         try {
             if (ver == IgniteSchema.INITIAL_VERSION) {
-                return completedFuture(null);
+                return completedFuture(calciteSchemaVv.latest());
             }
 
             CompletableFuture<SchemaPlus> lastSchemaFut;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
index 73ddc9098b..c98b745510 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
@@ -81,7 +81,7 @@ public class PredefinedSchemaManager implements 
SqlSchemaManager {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<?> actualSchemaAsync(long ver) {
-        return CompletableFuture.completedFuture(null);
+        return CompletableFuture.completedFuture(root);
     }
 
     /** {@inheritDoc} */

Reply via email to