This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ff9e052c23d IGNITE-22303 Sql. Retry operation when plan gets outdated 
by the time of execution (#7309)
ff9e052c23d is described below

commit ff9e052c23df364026b03f7587403af814dc85c5
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Mon Jan 5 19:56:15 2026 +0300

    IGNITE-22303 Sql. Retry operation when plan gets outdated by the time of 
execution (#7309)
---
 .../ItSqlConcurrentSchemaModificationTest.java     | 126 ++++++++++
 .../internal/sql/engine/SqlOperationContext.java   |  37 ++-
 .../engine/SqlPlanToTxSchemaVersionValidator.java  |  91 ++++++++
 .../internal/sql/engine/SqlQueryProcessor.java     |   3 +-
 .../sql/engine/exec/ExchangeServiceImpl.java       |   4 +-
 .../sql/engine/exec/ExecutionServiceImpl.java      | 115 ++++++----
 .../sql/engine/exec/SqlPlanOutdatedException.java  |  35 +++
 .../exec/fsm/CursorPublicationPhaseHandler.java    |  10 +-
 .../engine/exec/fsm/OptimizingPhaseHandler.java    |  40 ++--
 .../internal/sql/engine/exec/fsm/Program.java      |   6 +-
 .../sql/engine/exec/fsm/QueryExecutionProgram.java |  45 +++-
 .../sql/engine/prepare/KeyValueGetPlan.java        |   7 +
 .../internal/sql/engine/prepare/QueryPlan.java     |   7 +
 .../sql/engine/exec/ExecutionServiceImplTest.java  |   4 +-
 .../internal/sql/engine/exec/QueryTimeoutTest.java |  12 +-
 .../sql/engine/exec/SqlOutdatedPlanTest.java       | 255 +++++++++++++++++++++
 .../sql/engine/framework/TestBuilders.java         |  55 ++++-
 .../internal/sql/engine/framework/TestNode.java    |  24 +-
 18 files changed, 792 insertions(+), 84 deletions(-)

diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlConcurrentSchemaModificationTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlConcurrentSchemaModificationTest.java
new file mode 100644
index 00000000000..a08391e8e1b
--- /dev/null
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlConcurrentSchemaModificationTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Integration tests to verify SQL query execution during concurrent schema 
updates.
+ */
+public class ItSqlConcurrentSchemaModificationTest extends 
BaseSqlIntegrationTest {
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @Override
+    protected String getNodeBootstrapConfigTemplate() {
+        return "ignite.sql.execution.threadCount: 64";
+    }
+
+    @AfterEach
+    void dropTables() {
+        System.clearProperty("FAST_QUERY_OPTIMIZATION_ENABLED");
+        Commons.resetFastQueryOptimizationFlag();
+        
unwrapIgniteImpl(CLUSTER.aliveNode()).queryEngine().invalidatePlannerCache(Set.of());
+
+        dropAllTables();
+    }
+
+    @ParameterizedTest(name = "FastQueryOptimization={0}")
+    @ValueSource(booleans = {true, false})
+    void dmlWithConcurrentDdl(Boolean fastPlan) throws InterruptedException {
+        System.setProperty("FAST_QUERY_OPTIMIZATION_ENABLED", 
fastPlan.toString());
+
+        IgniteSql sql = CLUSTER.aliveNode().sql();
+
+        sql("CREATE TABLE t(id INT PRIMARY KEY)");
+
+        int iterations = 20;
+
+        for (int i = 0; i < iterations; i++) {
+            log.info("iteration #" + i);
+
+            String ddlQuery = i % 2 == 0
+                    ? "ALTER TABLE t ADD COLUMN val VARCHAR DEFAULT 'abc'"
+                    : "ALTER TABLE t DROP COLUMN val";
+
+            CompletableFuture<AsyncResultSet<SqlRow>> ddlFut = 
sql.executeAsync(null, ddlQuery);
+
+            Thread.sleep(ThreadLocalRandom.current().nextInt(15) * 10);
+
+            assertQuery("INSERT INTO t (id) VALUES (?)")
+                    .withParam(i)
+                    .returns(1L)
+                    .check();
+
+            await(await(ddlFut).closeAsync());
+
+            assertEquals(0, txManager().pending());
+        }
+    }
+
+    @ParameterizedTest(name = "FastQueryOptimization={0}")
+    @ValueSource(booleans = {true, false})
+    void selectWithConcurrentDdl(boolean fastPlan) throws InterruptedException 
{
+        System.setProperty("FAST_QUERY_OPTIMIZATION_ENABLED", 
String.valueOf(fastPlan));
+
+        IgniteSql sql = CLUSTER.aliveNode().sql();
+
+        sql("CREATE TABLE t(id INT PRIMARY KEY, id2 INT)");
+        sql("INSERT INTO t VALUES (0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 
5), (6, 6), (7, 7), (8, 8), (9, 9)");
+
+        int iterations = 20;
+
+        for (int i = 0; i < iterations; i++) {
+            log.info("iteration #" + i);
+
+            String ddlQuery = i % 2 == 0
+                    ? "ALTER TABLE t ADD COLUMN val VARCHAR DEFAULT 'abc'"
+                    : "ALTER TABLE t DROP COLUMN val";
+
+            CompletableFuture<AsyncResultSet<SqlRow>> ddlFut = 
sql.executeAsync(null, ddlQuery);
+            Thread.sleep(ThreadLocalRandom.current().nextInt(15) * 10);
+
+            int id = i % 10;
+
+            assertQuery(format("SELECT id2 FROM t WHERE id={}", id))
+                    .returns(id)
+                    .check();
+
+            await(await(ddlFut).closeAsync());
+
+            assertEquals(0, txManager().pending());
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
index 737270b9ff5..50ea6d21151 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
@@ -23,6 +23,7 @@ import java.time.ZoneId;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -52,6 +53,7 @@ public final class SqlOperationContext {
     private final @Nullable Consumer<Throwable> errorListener;
     private final @Nullable String userName;
     private final @Nullable Long topologyVersion;
+    private final @Nullable AtomicReference<QueryTransactionWrapper> 
retryTxHolder;
 
     /**
      * Private constructor, used by a builder.
@@ -67,7 +69,8 @@ public final class SqlOperationContext {
             @Nullable Consumer<QueryTransactionWrapper> txUsedListener,
             @Nullable Consumer<Throwable> errorListener,
             @Nullable String userName,
-            @Nullable Long topologyVersion
+            @Nullable Long topologyVersion,
+            @Nullable QueryTransactionWrapper retryTx
     ) {
         this.queryId = queryId;
         this.timeZoneId = timeZoneId;
@@ -80,12 +83,36 @@ public final class SqlOperationContext {
         this.errorListener = errorListener;
         this.userName = userName;
         this.topologyVersion = topologyVersion;
+        this.retryTxHolder = new AtomicReference<>(retryTx);
     }
 
     public static Builder builder() {
         return new Builder();
     }
 
+    /**
+     * Copies an existing context preserving the existing transaction.
+     *
+     * <p>Used in case of a retry. If the operation is repeated while 
preserving the running transaction,
+     * the operation time is taken from this transaction.
+     */
+    public SqlOperationContext withTransactionForRetry(QueryTransactionWrapper 
tx) {
+        return new SqlOperationContext(
+                queryId,
+                timeZoneId,
+                parameters,
+                tx.unwrap().schemaTimestamp(),
+                txContext,
+                cancel,
+                defaultSchemaName,
+                txUsedListener,
+                errorListener,
+                userName,
+                topologyVersion,
+                tx
+        );
+    }
+
     /** Returns unique identifier of the query. */
     public UUID queryId() {
         return queryId;
@@ -182,6 +209,11 @@ public final class SqlOperationContext {
         return excludedNodes.isEmpty() ? null : excludedNodes::contains;
     }
 
+    /** Returns transaction used for retry operation or {@code null}. */
+    public @Nullable QueryTransactionWrapper retryTx() {
+        return retryTxHolder.getAndSet(null);
+    }
+
     /**
      * Query context builder.
      */
@@ -268,7 +300,8 @@ public final class SqlOperationContext {
                     txUsedListener,
                     errorListener,
                     userName,
-                    topologyVersion
+                    topologyVersion,
+                    null
             );
         }
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlPlanToTxSchemaVersionValidator.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlPlanToTxSchemaVersionValidator.java
new file mode 100644
index 00000000000..f6972c40f50
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlPlanToTxSchemaVersionValidator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.internal.sql.engine.exec.SqlPlanOutdatedException;
+import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * SQL query execution plan validator.
+ *
+ * <p>Performs validation of the catalog version from the {@link MultiStepPlan 
multi-step plan} relative to the started transaction.
+ */
+public class SqlPlanToTxSchemaVersionValidator {
+    @TestOnly
+    public static final SqlPlanToTxSchemaVersionValidator NOOP = new 
NoopSqlPlanToTxSchemaVersionValidator();
+
+    private final SchemaSyncService schemaSyncService;
+    private final CatalogService catalogService;
+
+    /**
+     * Compares the catalog version from the plan with the version of the 
active catalog
+     * at the {@link InternalTransaction#schemaTimestamp() schema time} of the 
transaction.
+     *
+     * @param plan {@link MultiStepPlan multi-step} execution plan.
+     * @param tx Query transaction wrapper.
+     * @return Successfully completed future if the provided transaction is 
explicit or the catalog versions match.
+     *         Otherwise returns a future completed with an exception {@link 
SqlPlanOutdatedException}.
+     */
+    public CompletableFuture<Void> validate(MultiStepPlan plan, 
QueryTransactionWrapper tx) {
+        if (!tx.implicit()) {
+            return nullCompletedFuture();
+        }
+
+        HybridTimestamp ts = tx.unwrap().schemaTimestamp();
+
+        return schemaSyncService.waitForMetadataCompleteness(ts)
+                .thenRun(() -> {
+                    // TODO https://issues.apache.org/jira/browse/IGNITE-27491 
Avoid re-planning in case of unrelated catalog changes
+                    int requiredCatalog = 
catalogService.activeCatalogVersion(ts.longValue());
+
+                    if (requiredCatalog != plan.catalogVersion()) {
+                        throw new SqlPlanOutdatedException();
+                    }
+                });
+    }
+
+    private SqlPlanToTxSchemaVersionValidator(SchemaSyncService 
schemaSyncService, CatalogService catalogService) {
+        this.schemaSyncService = schemaSyncService;
+        this.catalogService = catalogService;
+    }
+
+    public static SqlPlanToTxSchemaVersionValidator create(SchemaSyncService 
schemaSyncService, CatalogService catalogService) {
+        return new SqlPlanToTxSchemaVersionValidator(schemaSyncService, 
catalogService);
+    }
+
+    private static class NoopSqlPlanToTxSchemaVersionValidator extends 
SqlPlanToTxSchemaVersionValidator {
+        @SuppressWarnings("DataFlowIssue")
+        private NoopSqlPlanToTxSchemaVersionValidator() {
+            super(null, null);
+        }
+
+        @Override
+        public CompletableFuture<Void> validate(MultiStepPlan plan, 
QueryTransactionWrapper tx) {
+            return nullCompletedFuture();
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 1d1c9e7c833..00b7b55a1db 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -396,7 +396,8 @@ public class SqlQueryProcessor implements QueryProcessor, 
SystemViewProvider {
                 clockService,
                 killCommandHandler,
                 expressionFactory,
-                EXECUTION_SERVICE_SHUTDOWN_TIMEOUT
+                EXECUTION_SERVICE_SHUTDOWN_TIMEOUT,
+                SqlPlanToTxSchemaVersionValidator.create(schemaSyncService, 
catalogManager)
         ));
 
         queryExecutor = registerService(new QueryExecutor(
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index a52f9a6bad8..674b8e37f16 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -123,8 +123,10 @@ public class ExchangeServiceImpl implements 
ExchangeService {
         Throwable traceableErr = ExceptionUtils.unwrapCause(error);
 
         if (!(traceableErr instanceof TraceableException)) {
-            traceableErr = error = new IgniteInternalException(INTERNAL_ERR, 
error);
+            traceableErr = new IgniteInternalException(INTERNAL_ERR, error);
+        }
 
+        if (((TraceableException) traceableErr).code() == INTERNAL_ERR) {
             LOG.info(format("Failed to execute query fragment: traceId={}, 
executionId={}, fragmentId={}",
                     ((TraceableException) traceableErr).traceId(), 
executionId, fragmentId), error);
         } else if (LOG.isDebugEnabled()) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index fa858ed8c4d..ce1e20f4a9c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -83,6 +83,7 @@ import org.apache.ignite.internal.sql.engine.QueryCancel;
 import org.apache.ignite.internal.sql.engine.QueryCancelledException;
 import org.apache.ignite.internal.sql.engine.SchemaAwareConverter;
 import org.apache.ignite.internal.sql.engine.SqlOperationContext;
+import org.apache.ignite.internal.sql.engine.SqlPlanToTxSchemaVersionValidator;
 import 
org.apache.ignite.internal.sql.engine.SqlQueryProcessor.PrefetchCallback;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import org.apache.ignite.internal.sql.engine.api.expressions.RowFactoryFactory;
@@ -189,6 +190,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, LogicalTopo
 
     private final SqlExpressionFactory sqlExpressionFactory;
 
+    private final SqlPlanToTxSchemaVersionValidator planValidator;
+
     /**
      * Constructor.
      *
@@ -204,6 +207,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, LogicalTopo
      * @param clockService Clock service.
      * @param killCommandHandler Kill command handler.
      * @param shutdownTimeout Shutdown timeout.
+     * @param planValidator Validator of the catalog version from the plan 
relative to the started transaction.
      */
     public ExecutionServiceImpl(
             MessageService messageService,
@@ -220,7 +224,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, LogicalTopo
             ClockService clockService,
             KillCommandHandler killCommandHandler,
             SqlExpressionFactory sqlExpressionFactory,
-            long shutdownTimeout
+            long shutdownTimeout,
+            SqlPlanToTxSchemaVersionValidator planValidator
     ) {
         this.localNode = topSrvc.localMember();
         this.handler = handler;
@@ -237,6 +242,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, LogicalTopo
         this.killCommandHandler = killCommandHandler;
         this.sqlExpressionFactory = sqlExpressionFactory;
         this.shutdownTimeout = shutdownTimeout;
+        this.planValidator = planValidator;
     }
 
     /**
@@ -259,6 +265,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, LogicalTopo
      * @param clockService Clock service.
      * @param killCommandHandler Kill command handler.
      * @param shutdownTimeout Shutdown timeout.
+     * @param planValidator Validator of the catalog version from the plan 
relative to the started transaction.
      * @return An execution service.
      */
     public static <RowT> ExecutionServiceImpl<RowT> create(
@@ -278,7 +285,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, LogicalTopo
             ClockService clockService,
             KillCommandHandler killCommandHandler,
             SqlExpressionFactory sqlExpressionFactory,
-            long shutdownTimeout
+            long shutdownTimeout,
+            SqlPlanToTxSchemaVersionValidator planValidator
     ) {
         return new ExecutionServiceImpl<>(
                 msgSrvc,
@@ -301,7 +309,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, LogicalTopo
                 clockService,
                 killCommandHandler,
                 sqlExpressionFactory,
-                shutdownTimeout
+                shutdownTimeout,
+                planValidator
         );
     }
 
@@ -330,60 +339,76 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, LogicalTopo
 
         assert old == null;
 
-        QueryTransactionContext txContext = operationContext.txContext();
-
-        assert txContext != null;
-
         boolean readOnly = plan.type().implicitTransactionReadOnlyMode();
-        QueryTransactionWrapper txWrapper = 
txContext.getOrStartSqlManaged(readOnly, false);
 
+        QueryTransactionWrapper txWrapper = 
getOrStartTransaction(operationContext, readOnly);
         InternalTransaction tx = txWrapper.unwrap();
 
-        operationContext.notifyTxUsed(txWrapper);
+        return planValidator.validate(plan, txWrapper)
+                .thenCompose(ignore -> {
+                    PrefetchCallback prefetchCallback = 
queryManager.prefetchCallback;
 
-        PrefetchCallback prefetchCallback = queryManager.prefetchCallback;
+                    CompletableFuture<Void> firstPageReady = 
prefetchCallback.prefetchFuture();
 
-        CompletableFuture<Void> firstPageReady = 
prefetchCallback.prefetchFuture();
+                    if (plan.type() == SqlQueryType.DML) {
+                        // DML is supposed to have a single row response, so 
if the first page is ready, then all
+                        // inputs have been processed, all tables have been 
updated, and now it should be safe to
+                        // commit implicit transaction
+                        firstPageReady = firstPageReady.thenCompose(none -> 
txWrapper.finalise());
+                    }
 
-        if (plan.type() == SqlQueryType.DML) {
-            // DML is supposed to have a single row response, so if the first 
page is ready, then all
-            // inputs have been processed, all tables have been updated, and 
now it should be safe to
-            // commit implicit transaction
-            firstPageReady = firstPageReady.thenCompose(none -> 
txWrapper.finalise());
-        }
+                    CompletableFuture<Void> firstPageReady0 = firstPageReady;
 
-        CompletableFuture<Void> firstPageReady0 = firstPageReady;
-
-        Predicate<String> nodeExclusionFilter = 
operationContext.nodeExclusionFilter();
-
-        CompletableFuture<AsyncDataCursor<InternalSqlRow>> f = 
queryManager.execute(tx, plan, nodeExclusionFilter)
-                .thenApply(dataCursor -> new TxAwareAsyncCursor<>(
-                        txWrapper,
-                        dataCursor,
-                        firstPageReady0,
-                        queryManager::close,
-                        operationContext::notifyError
-                ));
-
-        return f.handle((r, t) -> {
-            if (t != null) {
-                // We were unable to create cursor, hence need to finalise 
transaction wrapper
-                // which were created solely for this operation.
-                return txWrapper.finalise(t).handle((none, finalizationErr) -> 
{
-                    if (finalizationErr != null) {
-                        t.addSuppressed(finalizationErr);
-                    }
+                    Predicate<String> nodeExclusionFilter = 
operationContext.nodeExclusionFilter();
 
-                    // Re-throw the exception, so execution future is 
completed with the same exception.
-                    sneakyThrow(t);
+                    CompletableFuture<AsyncDataCursor<InternalSqlRow>> f = 
queryManager.execute(tx, plan, nodeExclusionFilter)
+                            .thenApply(dataCursor -> new TxAwareAsyncCursor<>(
+                                    txWrapper,
+                                    dataCursor,
+                                    firstPageReady0,
+                                    queryManager::close,
+                                    operationContext::notifyError
+                            ));
+
+                    return f.handle((r, t) -> {
+                        if (t != null) {
+                            // We were unable to create cursor, hence need to 
finalise transaction wrapper
+                            // which were created solely for this operation.
+                            return txWrapper.finalise(t).handle((none, 
finalizationErr) -> {
+                                if (finalizationErr != null) {
+                                    t.addSuppressed(finalizationErr);
+                                }
+
+                                // Re-throw the exception, so execution future 
is completed with the same exception.
+                                sneakyThrow(t);
+
+                                // We must never reach this line.
+                                return (AsyncDataCursor<InternalSqlRow>) null;
+                            });
+                        }
 
-                    // We must never reach this line.
-                    return (AsyncDataCursor<InternalSqlRow>) null;
+                        return completedFuture(r);
+                    }).thenCompose(Function.identity());
                 });
-            }
+    }
+
+    private static QueryTransactionWrapper 
getOrStartTransaction(SqlOperationContext operationContext, boolean readOnly) {
+        QueryTransactionContext txContext = operationContext.txContext();
+
+        assert txContext != null;
+
+        // Try to use previously started transaction.
+        QueryTransactionWrapper txWrapper = operationContext.retryTx();
+
+        if (txWrapper != null) {
+            return txWrapper;
+        }
+
+        txWrapper = txContext.getOrStartSqlManaged(readOnly, false);
+
+        operationContext.notifyTxUsed(txWrapper);
 
-            return completedFuture(r);
-        }).thenCompose(Function.identity());
+        return txWrapper;
     }
 
     private static SqlOperationContext createOperationContext(
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlPlanOutdatedException.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlPlanOutdatedException.java
new file mode 100644
index 00000000000..8c1eacf79f8
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlPlanOutdatedException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import org.apache.ignite.internal.tx.InternalTransaction;
+
+/**
+ * Exception occurs when SQL engine detects that the execution plan is 
outdated after preparation.
+ *
+ * <p>It is used internally to signal the SQL engine to retry the optimization 
step
+ * using the transaction {@link InternalTransaction#schemaTimestamp() schema 
timestamp}.
+ * This exception should never be passed to the user, and has no special error 
code.
+ */
+public class SqlPlanOutdatedException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    public SqlPlanOutdatedException() {
+        super(null, null, false, false);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorPublicationPhaseHandler.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorPublicationPhaseHandler.java
index 527c8c090aa..6ff287147e2 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorPublicationPhaseHandler.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorPublicationPhaseHandler.java
@@ -18,14 +18,14 @@
 package org.apache.ignite.internal.sql.engine.exec.fsm;
 
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
-import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
 import org.apache.ignite.internal.util.AsyncCursor;
 
 /**
  * Handler that postpones moment of publication of the cursor.
  *
  * <p>Without this delay, the cursor is published as soon as all fragments is 
initialized on remote nodes. And here the most confusing side
- * effect: for multi-step DML, if {@link AsyncCursor#closeAsync()} invoked 
immediately after cursor becomes public, all fragments will be 
+ * effect: for multi-step DML, if {@link AsyncCursor#closeAsync()} invoked 
immediately after cursor becomes public, all fragments will be
  * cancelled, and implicit transaction will be commited. This affect script 
processing, because currently it iterates over all te results
  * and just closes them. From user's point of view, such a behaviour renders 
certain DML statements to be skipped since no results are
  * commited.
@@ -41,12 +41,12 @@ class CursorPublicationPhaseHandler implements 
ExecutionPhaseHandler {
     @Override
     public Result handle(Query query) {
         AsyncSqlCursor<?> cursor = query.cursor;
+        QueryPlan plan = query.plan;
 
         assert cursor != null;
+        assert plan != null;
 
-        SqlQueryType queryType = cursor.queryType();
-
-        if (queryType == SqlQueryType.QUERY) {
+        if (plan.lazyCursorPublication()) {
             // Preserve lazy execution for statements that only reads.
             return Result.completed();
         }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/OptimizingPhaseHandler.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/OptimizingPhaseHandler.java
index fd57453007a..76646891c8a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/OptimizingPhaseHandler.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/OptimizingPhaseHandler.java
@@ -45,6 +45,31 @@ class OptimizingPhaseHandler implements 
ExecutionPhaseHandler {
 
         assert result != null : "Query is expected to be parsed at this phase";
 
+        SqlOperationContext operationContext = buildContext(query, result);
+
+        CompletableFuture<Void> awaitFuture = 
query.executor.waitForMetadata(operationContext.operationTime())
+                .thenCompose(none -> query.executor.prepare(result, 
operationContext)
+                        .thenAccept(plan -> {
+                            if (query.txContext.explicitTx() == null) {
+                                // in case of implicit tx we have to update 
observable time to prevent tx manager to start
+                                // implicit transaction too much in the past 
where version of catalog we used to prepare the
+                                // plan was not yet available
+                                
query.txContext.updateObservableTime(query.executor.deriveMinimalRequiredTime(plan));
+                            }
+
+                            query.plan = plan;
+                        }));
+
+        return Result.proceedAfter(awaitFuture);
+    }
+
+    private static SqlOperationContext buildContext(Query query, ParsedResult 
result) {
+        SqlOperationContext retryContext = query.operationContext;
+
+        if (retryContext != null) {
+            return retryContext;
+        }
+
         validateParsedStatement(query.properties, result);
         validateDynamicParameters(result.dynamicParamsCount(), query.params, 
true);
         ensureStatementMatchesTx(result.queryType(), query.txContext);
@@ -70,20 +95,7 @@ class OptimizingPhaseHandler implements 
ExecutionPhaseHandler {
 
         query.operationContext = operationContext;
 
-        CompletableFuture<Void> awaitFuture = 
query.executor.waitForMetadata(operationTime)
-                .thenCompose(none -> query.executor.prepare(result, 
operationContext)
-                        .thenAccept(plan -> {
-                            if (query.txContext.explicitTx() == null) {
-                                // in case of implicit tx we have to update 
observable time to prevent tx manager to start
-                                // implicit transaction too much in the past 
where version of catalog we used to prepare the
-                                // plan was not yet available
-                                
query.txContext.updateObservableTime(query.executor.deriveMinimalRequiredTime(plan));
-                            }
-
-                            query.plan = plan;
-                        }));
-
-        return Result.proceedAfter(awaitFuture);
+        return operationContext;
     }
 
     /** Checks that the statement is allowed within an external/script 
transaction. */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
index fc41f0fb4a3..199dbc69200 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
@@ -120,7 +120,11 @@ class Program<ResultT> {
                     break;
                 }
             }
-        } while (advanceQuery(query, state));
+
+            if (!advanceQuery(query, state)) {
+                break;
+            }
+        } while (true);
     }
 
     private boolean shouldRetry(Query query, Throwable th) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
index 763597b4f0a..5e71c6a485e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
@@ -21,10 +21,14 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator;
 import static org.apache.ignite.lang.ErrorGroups.Transactions;
 
 import java.util.List;
+import 
org.apache.ignite.internal.partition.replicator.schemacompat.InternalSchemaVersionMismatchException;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.SqlOperationContext;
+import org.apache.ignite.internal.sql.engine.exec.SqlPlanOutdatedException;
 import org.apache.ignite.internal.sql.engine.message.UnknownNodeException;
+import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
 import org.apache.ignite.internal.util.ExceptionUtils;
 
 /**
@@ -75,6 +79,32 @@ class QueryExecutionProgram extends 
Program<AsyncSqlCursor<InternalSqlRow>> {
     static boolean errorHandler(Query query, Throwable th) {
         if (canRecover(query, th)) {
             query.error.set(null);
+
+            if (multiStepPlanOutdated(th)) {
+                assert query.plan instanceof MultiStepPlan;
+                assert query.currentPhase() == 
ExecutionPhase.CURSOR_INITIALIZATION : query.currentPhase();
+
+                SqlOperationContext context = query.operationContext;
+                QueryTransactionWrapper tx = query.usedTransaction;
+
+                assert context != null;
+                assert tx != null;
+
+                query.operationContext = context.withTransactionForRetry(tx);
+
+                query.moveTo(ExecutionPhase.OPTIMIZING);
+
+                return true;
+            }
+
+            if (fastPlanSchemaVersionMismatch(th) || 
incompatibleSchemaChange(th)) {
+                query.operationContext = null;
+
+                query.moveTo(ExecutionPhase.OPTIMIZING);
+
+                return true;
+            }
+
             if (query.currentPhase() == ExecutionPhase.CURSOR_PUBLICATION) {
                 // Should initialize a new cursor.
                 query.moveTo(ExecutionPhase.CURSOR_INITIALIZATION);
@@ -114,7 +144,8 @@ class QueryExecutionProgram extends 
Program<AsyncSqlCursor<InternalSqlRow>> {
             return false;
         }
 
-        return nodeLeft(th) || lockConflict(th) || replicaMiss(th) || 
groupOverloaded(th);
+        return nodeLeft(th) || lockConflict(th) || replicaMiss(th) || 
groupOverloaded(th)
+                || multiStepPlanOutdated(th) || incompatibleSchemaChange(th) 
|| fastPlanSchemaVersionMismatch(th);
     }
 
     private static boolean nodeLeft(Throwable th) {
@@ -132,4 +163,16 @@ class QueryExecutionProgram extends 
Program<AsyncSqlCursor<InternalSqlRow>> {
     private static boolean groupOverloaded(Throwable th) {
         return ExceptionUtils.extractCodeFrom(th) == 
Replicator.GROUP_OVERLOADED_ERR;
     }
+
+    private static boolean multiStepPlanOutdated(Throwable th)  {
+        return th instanceof SqlPlanOutdatedException;
+    }
+
+    private static boolean incompatibleSchemaChange(Throwable th) {
+        return ExceptionUtils.extractCodeFrom(th) == 
Transactions.TX_INCOMPATIBLE_SCHEMA_ERR;
+    }
+
+    private static boolean fastPlanSchemaVersionMismatch(Throwable th) {
+        return ExceptionUtils.hasCause(th, 
InternalSchemaVersionMismatchException.class);
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
index e7d4318cfaa..6f01c9073b1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
@@ -316,4 +316,11 @@ public class KeyValueGetPlan implements ExplainablePlan, 
ExecutablePlan {
     public int catalogVersion() {
         return catalogVersion;
     }
+
+    @Override
+    public boolean lazyCursorPublication() {
+        // Let's postpone cursor publication so we can recover from errors 
during
+        // plan execution, like `InternalSchemaVersionMismatchException`.
+        return false;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlan.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlan.java
index 7c37a1996a9..40945404177 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlan.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlan.java
@@ -62,4 +62,11 @@ public interface QueryPlan {
      * Returns the number of source relations used by this plan. Each relation 
is reported the number of times it is used. 
      */
     int numSources();
+
+    /**
+     * Returns a flag indicating that the query cursor can be published 
without waiting for the first page preloading.
+     */
+    default boolean lazyCursorPublication() {
+        return type() == SqlQueryType.QUERY;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 68be9802cf9..7e367802eb1 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -109,6 +109,7 @@ import 
org.apache.ignite.internal.sql.engine.NodeLeftException;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
 import org.apache.ignite.internal.sql.engine.QueryCancelledException;
 import org.apache.ignite.internal.sql.engine.SqlOperationContext;
+import org.apache.ignite.internal.sql.engine.SqlPlanToTxSchemaVersionValidator;
 import 
org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import org.apache.ignite.internal.sql.engine.exec.exp.SqlExpressionFactoryImpl;
@@ -1393,7 +1394,8 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
                 new SqlExpressionFactoryImpl(
                         Commons.typeFactory(), 1024, 
CaffeineCacheFactory.INSTANCE
                 ),
-                SHUTDOWN_TIMEOUT
+                SHUTDOWN_TIMEOUT,
+                SqlPlanToTxSchemaVersionValidator.NOOP
         );
 
         taskExecutor.start();
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
index d12cc3cb8f5..8a0c4979f27 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
@@ -147,14 +147,10 @@ public class QueryTimeoutTest extends 
BaseIgniteAbstractTest {
 
     @Test
     void testTimeoutKvGet() {
-        AsyncSqlCursor<?> cursor = 
gatewayNode.executeQuery(PROPS_WITH_TIMEOUT, "SELECT * FROM my_table WHERE id = 
?", 2);
-
-        assertThat(
-                cursor.requestNextAsync(1),
-                willThrowWithCauseOrSuppressed(
-                        QueryCancelledException.class,
-                        QueryCancelledException.TIMEOUT_MSG
-                )
+        assertThrows(
+                SqlException.class,
+                () -> gatewayNode.executeQuery(PROPS_WITH_TIMEOUT, "SELECT * 
FROM my_table WHERE id = ?", 2),
+                QueryCancelledException.TIMEOUT_MSG
         );
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SqlOutdatedPlanTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SqlOutdatedPlanTest.java
new file mode 100644
index 00000000000..844e45d5a1b
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SqlOutdatedPlanTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParams;
+import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnCommand;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.SqlPlanToTxSchemaVersionValidator;
+import org.apache.ignite.internal.sql.engine.SqlProperties;
+import org.apache.ignite.internal.sql.engine.exec.QueryRecoveryTest.TxType;
+import 
org.apache.ignite.internal.sql.engine.framework.NoOpTransactionalOperationTracker;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders.PrepareServiceWithPrepareCallback;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapperImpl;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.awaitility.Awaitility;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests that outdated SQL query execution plan is re-planned using the
+ * {@link InternalTransaction#schemaTimestamp() schema time} of the started 
transaction.
+ *
+ * <p>Currently, the transaction starts after query planning is completed, if 
the schema has changed during planning
+ * (the catalog version used in the plan does not match the catalog version of 
the corresponding transaction start time),
+ * then this plan is considered outdated and the planning phase should be 
repeated using the transaction start time.
+ *
+ * @see SqlPlanOutdatedException
+ * @see SqlPlanToTxSchemaVersionValidator
+ */
+public class SqlOutdatedPlanTest extends BaseIgniteAbstractTest {
+    private static final List<String> DATA_NODES = List.of("DATA_1", "DATA_2");
+    private static final String GATEWAY_NODE_NAME = "gateway";
+
+    private TestCluster cluster;
+
+    @BeforeAll
+    static void warmUpCluster() throws Exception {
+        TestBuilders.warmupTestCluster();
+    }
+
+    @BeforeEach
+    void startCluster() {
+        cluster = TestBuilders.cluster()
+                .nodes(GATEWAY_NODE_NAME, DATA_NODES.toArray(new String[0]))
+                .build();
+
+        cluster.start();
+
+        TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+
+        cluster.setAssignmentsProvider("T1",
+                (partitionCount, b) -> IntStream.range(0, partitionCount)
+                        .mapToObj(i -> DATA_NODES)
+                        .collect(Collectors.toList())
+        );
+
+        gatewayNode.initSchema("CREATE TABLE t1 (id INT PRIMARY KEY)");
+
+        cluster.setDataProvider("T1", TestBuilders.tableScan((nodeName, 
partId) ->
+                Collections.singleton(new Object[]{partId}))
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TxType.class)
+    void planningIsRepeatedUsingTheSameTransaction(TxType type) {
+        TestTxContext txContext = new TestTxContext(type);
+        TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+        PrepareServiceSpy prepareServiceSpy = new 
PrepareServiceSpy(gatewayNode);
+
+        CompletableFuture<Lock> lockFut1 = 
prepareServiceSpy.resetLockAndBlockNextCall();
+
+        CompletableFuture<AsyncSqlCursor<InternalSqlRow>> fut =
+                gatewayNode.executeQueryAsync(new SqlProperties(), txContext, 
"SELECT id FROM t1");
+
+        await(lockFut1);
+        assertThat(prepareServiceSpy.callsCounter.get(), is(1));
+        assertThat(txContext.startedTxCounter.get(), is(0));
+
+        // Simulate concurrent schema modification.
+        await(cluster.catalogManager().execute(
+                makeAddColumnCommand("VAL1")));
+
+        CompletableFuture<Lock> lockFut2 = 
prepareServiceSpy.resetLockAndBlockNextCall();
+        Lock lock2 = await(lockFut2);
+        assertThat(prepareServiceSpy.callsCounter.get(), is(2));
+        assertThat(txContext.startedTxCounter.get(), is(1));
+
+        // Simulate another one schema modification.
+        await(cluster.catalogManager().execute(
+                makeAddColumnCommand("VAL2")));
+
+        lock2.unlock();
+
+        await(await(fut).closeAsync());
+
+        // Planning must be repeated, but only once.
+        assertThat(prepareServiceSpy.callsCounter.get(), is(2));
+
+        // Transaction should be started only once.
+        assertThat(txContext.startedTxCounter.get(), is(1));
+    }
+
+    @Test
+    void schemaChangedAndNodeDisconnectedDuringPlanning() {
+        TestTxContext txContext = new TestTxContext(TxType.RO);
+        TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+        PrepareServiceSpy prepareServiceSpy = new 
PrepareServiceSpy(gatewayNode);
+
+        CompletableFuture<Lock> lockFut = 
prepareServiceSpy.resetLockAndBlockNextCall();
+
+        CompletableFuture<AsyncSqlCursor<InternalSqlRow>> fut =
+                gatewayNode.executeQueryAsync(new SqlProperties(), txContext, 
"SELECT id FROM t1");
+
+        Lock lock = await(lockFut);
+        assertThat(prepareServiceSpy.callsCounter.get(), is(1));
+        assertThat(txContext.startedTxCounter.get(), is(0));
+
+        // Simulate concurrent schema modification.
+        await(cluster.catalogManager().execute(
+                makeAddColumnCommand("VAL1")));
+
+        // And node disconnection.
+        cluster.node(DATA_NODES.get(0)).disconnect();
+
+        lock.unlock();
+
+        await(await(fut).closeAsync());
+
+        // Planning must be repeated, but only once.
+        assertThat(prepareServiceSpy.callsCounter.get(), is(2));
+
+        // The plan execution must be repeated using a new transaction.
+        assertThat(txContext.startedTxCounter.get(), is(2));
+    }
+
+    private static CatalogCommand makeAddColumnCommand(String columnName) {
+        return AlterTableAddColumnCommand.builder()
+                .schemaName(DEFAULT_SCHEMA_NAME)
+                .tableName("T1")
+                .columns(List.of(columnParams(columnName, INT32)))
+                .build();
+    }
+
+    private static class PrepareServiceSpy {
+        private final AtomicInteger callsCounter = new AtomicInteger();
+        private final AtomicReference<ReentrantLock> prepareBlockHolder = new 
AtomicReference<>();
+
+        PrepareServiceSpy(TestNode gatewayNode) {
+            ((PrepareServiceWithPrepareCallback) gatewayNode.prepareService())
+                    .setPrepareCallback(() -> {
+                        callsCounter.incrementAndGet();
+
+                        Lock lock = prepareBlockHolder.get();
+
+                        try {
+                            lock.tryLock(10, TimeUnit.SECONDS);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        } finally {
+                            lock.unlock();
+                        }
+                    });
+        }
+
+        CompletableFuture<Lock> resetLockAndBlockNextCall() {
+            ReentrantLock nextLock = new ReentrantLock();
+
+            //noinspection LockAcquiredButNotSafelyReleased
+            nextLock.lock();
+
+            ReentrantLock prevLock = prepareBlockHolder.getAndSet(nextLock);
+
+            if (prevLock != null) {
+                prevLock.unlock();
+            }
+
+            return CompletableFuture.supplyAsync(() -> {
+                Awaitility.await().until(nextLock::getQueueLength, is(1));
+
+                return nextLock;
+            });
+        }
+    }
+
+    private static class TestTxContext implements QueryTransactionContext {
+        private final TxType txType;
+        private final AtomicInteger startedTxCounter = new AtomicInteger();
+
+        TestTxContext(TxType txType) {
+            this.txType = txType;
+        }
+
+        @Override
+        public QueryTransactionWrapper getOrStartSqlManaged(boolean 
readOnlyIgnored, boolean implicit) {
+            startedTxCounter.incrementAndGet();
+
+            return new QueryTransactionWrapperImpl(txType.create(), true, 
NoOpTransactionalOperationTracker.INSTANCE);
+        }
+
+        @Override
+        public void updateObservableTime(HybridTimestamp time) {
+            // NO-OP
+        }
+
+        @Override
+        public @Nullable QueryTransactionWrapper explicitTx() {
+            return null;
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 12c9dfc91a5..f1a82d670bb 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -96,6 +96,7 @@ import 
org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
 import 
org.apache.ignite.internal.partitiondistribution.TokenizedAssignmentsImpl;
 import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.internal.sql.engine.SqlOperationContext;
 import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
 import org.apache.ignite.internal.sql.engine.api.kill.OperationKillHandler;
 import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
@@ -114,7 +115,10 @@ import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionDistributionProvider;
 import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
 import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.PreparedPlan;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
 import 
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
 import 
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPrunerImpl;
 import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
@@ -130,6 +134,7 @@ import 
org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
+import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
 import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
 import org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManager;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
@@ -718,6 +723,8 @@ public class TestBuilders {
                     configurationValue
             );
 
+            PrepareServiceWithPrepareCallback prepareSvcWithCallback = new 
PrepareServiceWithPrepareCallback(prepareService);
+
             Map<String, List<String>> systemViewsByNode = new HashMap<>();
 
             for (Entry<String, Set<String>> entry : 
nodeName2SystemView.entrySet()) {
@@ -793,7 +800,7 @@ public class TestBuilders {
                                 catalogManager,
                                 (TestClusterService) 
clusterService.forNode(name),
                                 parserService,
-                                prepareService,
+                                prepareSvcWithCallback,
                                 schemaManager,
                                 mappingService,
                                 new TestExecutableTableRegistry(
@@ -821,7 +828,7 @@ public class TestBuilders {
                     assignmentsProviderByTableName,
                     nodes,
                     catalogManager,
-                    prepareService,
+                    prepareSvcWithCallback,
                     clockWaiter,
                     initClosure,
                     stopClosure
@@ -1702,6 +1709,50 @@ public class TestBuilders {
         }
     }
 
+    /**
+     * A wrapper for {@link PrepareService} that executes a specified callback 
each time the
+     * {@link #prepareAsync(ParsedResult, SqlOperationContext)} method is 
called.
+     */
+    public static class PrepareServiceWithPrepareCallback implements 
PrepareService {
+        private final PrepareService delegate;
+
+        private volatile Runnable prepareCallback = null;
+
+        PrepareServiceWithPrepareCallback(PrepareService delegate) {
+            this.delegate = delegate;
+        }
+
+        public void setPrepareCallback(Runnable callback) {
+            prepareCallback = callback;
+        }
+
+        @Override
+        public CompletableFuture<QueryPlan> prepareAsync(ParsedResult 
parsedResult, SqlOperationContext ctx) {
+            Runnable callback = prepareCallback;
+
+            if (callback != null) {
+                callback.run();
+            }
+
+            return delegate.prepareAsync(parsedResult, ctx);
+        }
+
+        @Override
+        public Set<PreparedPlan> preparedPlans() {
+            return delegate.preparedPlans();
+        }
+
+        @Override
+        public void start() {
+            delegate.start();
+        }
+
+        @Override
+        public void stop() throws Exception {
+            delegate.stop();
+        }
+    }
+
     /**
      * Creates a cluster and runs a simple query to facilitate loading of 
necessary classes to prepare and execute sql queries.
      *
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index d2316ad0d58..0c7b948c62e 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
 import org.apache.ignite.internal.sql.engine.SqlOperationContext;
+import org.apache.ignite.internal.sql.engine.SqlPlanToTxSchemaVersionValidator;
 import org.apache.ignite.internal.sql.engine.SqlProperties;
 import org.apache.ignite.internal.sql.engine.api.expressions.RowFactoryFactory;
 import org.apache.ignite.internal.sql.engine.api.kill.OperationKillHandler;
@@ -206,7 +207,8 @@ public class TestNode implements LifecycleAware {
                 new SqlExpressionFactoryImpl(
                         Commons.typeFactory(), 1024, 
CaffeineCacheFactory.INSTANCE
                 ),
-                5_000
+                5_000,
+                SqlPlanToTxSchemaVersionValidator.create(new 
AlwaysSyncedSchemaSyncService(), catalogService)
         ));
 
         registerService(new 
IgniteComponentLifecycleAwareAdapter(systemViewManager));
@@ -289,6 +291,10 @@ public class TestNode implements LifecycleAware {
         return clockService;
     }
 
+    public PrepareService prepareService() {
+        return prepareService;
+    }
+
     /**
      * Prepares (aka parses, validates, and optimizes) the given query string
      * and returns the plan to execute.
@@ -396,11 +402,10 @@ public class TestNode implements LifecycleAware {
     public AsyncSqlCursor<InternalSqlRow> executeQuery(
             SqlProperties properties, QueryTransactionContext txContext, 
String query, Object... params
     ) {
-        return await(queryExecutor.executeQuery(
+        return await(executeQueryAsync(
                 properties,
                 txContext,
                 query,
-                null,
                 params
         ));
     }
@@ -414,6 +419,19 @@ public class TestNode implements LifecycleAware {
         return executeQuery(properties, ImplicitTxContext.create(), query, 
params);
     }
 
+    /** Executes the given query. */
+    public CompletableFuture<AsyncSqlCursor<InternalSqlRow>> executeQueryAsync(
+            SqlProperties properties, QueryTransactionContext txContext, 
String query, Object... params
+    ) {
+        return queryExecutor.executeQuery(
+                properties,
+                txContext,
+                query,
+                null,
+                params
+        );
+    }
+
     public List<QueryInfo> runningQueries() {
         return queryExecutor.runningQueries();
     }

Reply via email to