This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ff9e052c23d IGNITE-22303 Sql. Retry operation when plan gets outdated
by the time of execution (#7309)
ff9e052c23d is described below
commit ff9e052c23df364026b03f7587403af814dc85c5
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Mon Jan 5 19:56:15 2026 +0300
IGNITE-22303 Sql. Retry operation when plan gets outdated by the time of
execution (#7309)
---
.../ItSqlConcurrentSchemaModificationTest.java | 126 ++++++++++
.../internal/sql/engine/SqlOperationContext.java | 37 ++-
.../engine/SqlPlanToTxSchemaVersionValidator.java | 91 ++++++++
.../internal/sql/engine/SqlQueryProcessor.java | 3 +-
.../sql/engine/exec/ExchangeServiceImpl.java | 4 +-
.../sql/engine/exec/ExecutionServiceImpl.java | 115 ++++++----
.../sql/engine/exec/SqlPlanOutdatedException.java | 35 +++
.../exec/fsm/CursorPublicationPhaseHandler.java | 10 +-
.../engine/exec/fsm/OptimizingPhaseHandler.java | 40 ++--
.../internal/sql/engine/exec/fsm/Program.java | 6 +-
.../sql/engine/exec/fsm/QueryExecutionProgram.java | 45 +++-
.../sql/engine/prepare/KeyValueGetPlan.java | 7 +
.../internal/sql/engine/prepare/QueryPlan.java | 7 +
.../sql/engine/exec/ExecutionServiceImplTest.java | 4 +-
.../internal/sql/engine/exec/QueryTimeoutTest.java | 12 +-
.../sql/engine/exec/SqlOutdatedPlanTest.java | 255 +++++++++++++++++++++
.../sql/engine/framework/TestBuilders.java | 55 ++++-
.../internal/sql/engine/framework/TestNode.java | 24 +-
18 files changed, 792 insertions(+), 84 deletions(-)
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlConcurrentSchemaModificationTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlConcurrentSchemaModificationTest.java
new file mode 100644
index 00000000000..a08391e8e1b
--- /dev/null
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlConcurrentSchemaModificationTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Integration tests to verify SQL query execution during concurrent schema
updates.
+ */
+public class ItSqlConcurrentSchemaModificationTest extends
BaseSqlIntegrationTest {
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @Override
+ protected String getNodeBootstrapConfigTemplate() {
+ return "ignite.sql.execution.threadCount: 64";
+ }
+
+ @AfterEach
+ void dropTables() {
+ System.clearProperty("FAST_QUERY_OPTIMIZATION_ENABLED");
+ Commons.resetFastQueryOptimizationFlag();
+
unwrapIgniteImpl(CLUSTER.aliveNode()).queryEngine().invalidatePlannerCache(Set.of());
+
+ dropAllTables();
+ }
+
+ @ParameterizedTest(name = "FastQueryOptimization={0}")
+ @ValueSource(booleans = {true, false})
+ void dmlWithConcurrentDdl(Boolean fastPlan) throws InterruptedException {
+ System.setProperty("FAST_QUERY_OPTIMIZATION_ENABLED",
fastPlan.toString());
+
+ IgniteSql sql = CLUSTER.aliveNode().sql();
+
+ sql("CREATE TABLE t(id INT PRIMARY KEY)");
+
+ int iterations = 20;
+
+ for (int i = 0; i < iterations; i++) {
+ log.info("iteration #" + i);
+
+ String ddlQuery = i % 2 == 0
+ ? "ALTER TABLE t ADD COLUMN val VARCHAR DEFAULT 'abc'"
+ : "ALTER TABLE t DROP COLUMN val";
+
+ CompletableFuture<AsyncResultSet<SqlRow>> ddlFut =
sql.executeAsync(null, ddlQuery);
+
+ Thread.sleep(ThreadLocalRandom.current().nextInt(15) * 10);
+
+ assertQuery("INSERT INTO t (id) VALUES (?)")
+ .withParam(i)
+ .returns(1L)
+ .check();
+
+ await(await(ddlFut).closeAsync());
+
+ assertEquals(0, txManager().pending());
+ }
+ }
+
+ @ParameterizedTest(name = "FastQueryOptimization={0}")
+ @ValueSource(booleans = {true, false})
+ void selectWithConcurrentDdl(boolean fastPlan) throws InterruptedException
{
+ System.setProperty("FAST_QUERY_OPTIMIZATION_ENABLED",
String.valueOf(fastPlan));
+
+ IgniteSql sql = CLUSTER.aliveNode().sql();
+
+ sql("CREATE TABLE t(id INT PRIMARY KEY, id2 INT)");
+ sql("INSERT INTO t VALUES (0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5,
5), (6, 6), (7, 7), (8, 8), (9, 9)");
+
+ int iterations = 20;
+
+ for (int i = 0; i < iterations; i++) {
+ log.info("iteration #" + i);
+
+ String ddlQuery = i % 2 == 0
+ ? "ALTER TABLE t ADD COLUMN val VARCHAR DEFAULT 'abc'"
+ : "ALTER TABLE t DROP COLUMN val";
+
+ CompletableFuture<AsyncResultSet<SqlRow>> ddlFut =
sql.executeAsync(null, ddlQuery);
+ Thread.sleep(ThreadLocalRandom.current().nextInt(15) * 10);
+
+ int id = i % 10;
+
+ assertQuery(format("SELECT id2 FROM t WHERE id={}", id))
+ .returns(id)
+ .check();
+
+ await(await(ddlFut).closeAsync());
+
+ assertEquals(0, txManager().pending());
+ }
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
index 737270b9ff5..50ea6d21151 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java
@@ -23,6 +23,7 @@ import java.time.ZoneId;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -52,6 +53,7 @@ public final class SqlOperationContext {
private final @Nullable Consumer<Throwable> errorListener;
private final @Nullable String userName;
private final @Nullable Long topologyVersion;
+ private final @Nullable AtomicReference<QueryTransactionWrapper>
retryTxHolder;
/**
* Private constructor, used by a builder.
@@ -67,7 +69,8 @@ public final class SqlOperationContext {
@Nullable Consumer<QueryTransactionWrapper> txUsedListener,
@Nullable Consumer<Throwable> errorListener,
@Nullable String userName,
- @Nullable Long topologyVersion
+ @Nullable Long topologyVersion,
+ @Nullable QueryTransactionWrapper retryTx
) {
this.queryId = queryId;
this.timeZoneId = timeZoneId;
@@ -80,12 +83,36 @@ public final class SqlOperationContext {
this.errorListener = errorListener;
this.userName = userName;
this.topologyVersion = topologyVersion;
+ this.retryTxHolder = new AtomicReference<>(retryTx);
}
public static Builder builder() {
return new Builder();
}
+ /**
+ * Copies an existing context preserving the existing transaction.
+ *
+ * <p>Used in case of a retry. If the operation is repeated while
preserving the running transaction,
+ * the operation time is taken from this transaction.
+ */
+ public SqlOperationContext withTransactionForRetry(QueryTransactionWrapper
tx) {
+ return new SqlOperationContext(
+ queryId,
+ timeZoneId,
+ parameters,
+ tx.unwrap().schemaTimestamp(),
+ txContext,
+ cancel,
+ defaultSchemaName,
+ txUsedListener,
+ errorListener,
+ userName,
+ topologyVersion,
+ tx
+ );
+ }
+
/** Returns unique identifier of the query. */
public UUID queryId() {
return queryId;
@@ -182,6 +209,11 @@ public final class SqlOperationContext {
return excludedNodes.isEmpty() ? null : excludedNodes::contains;
}
+ /** Returns transaction used for retry operation or {@code null}. */
+ public @Nullable QueryTransactionWrapper retryTx() {
+ return retryTxHolder.getAndSet(null);
+ }
+
/**
* Query context builder.
*/
@@ -268,7 +300,8 @@ public final class SqlOperationContext {
txUsedListener,
errorListener,
userName,
- topologyVersion
+ topologyVersion,
+ null
);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlPlanToTxSchemaVersionValidator.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlPlanToTxSchemaVersionValidator.java
new file mode 100644
index 00000000000..f6972c40f50
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlPlanToTxSchemaVersionValidator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.internal.sql.engine.exec.SqlPlanOutdatedException;
+import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * SQL query execution plan validator.
+ *
+ * <p>Performs validation of the catalog version from the {@link MultiStepPlan
multi-step plan} relative to the started transaction.
+ */
+public class SqlPlanToTxSchemaVersionValidator {
+ @TestOnly
+ public static final SqlPlanToTxSchemaVersionValidator NOOP = new
NoopSqlPlanToTxSchemaVersionValidator();
+
+ private final SchemaSyncService schemaSyncService;
+ private final CatalogService catalogService;
+
+ /**
+ * Compares the catalog version from the plan with the version of the
active catalog
+ * at the {@link InternalTransaction#schemaTimestamp() schema time} of the
transaction.
+ *
+ * @param plan {@link MultiStepPlan multi-step} execution plan.
+ * @param tx Query transaction wrapper.
+ * @return Successfully completed future if the provided transaction is
explicit or the catalog versions match.
+ * Otherwise returns a future completed with an exception {@link
SqlPlanOutdatedException}.
+ */
+ public CompletableFuture<Void> validate(MultiStepPlan plan,
QueryTransactionWrapper tx) {
+ if (!tx.implicit()) {
+ return nullCompletedFuture();
+ }
+
+ HybridTimestamp ts = tx.unwrap().schemaTimestamp();
+
+ return schemaSyncService.waitForMetadataCompleteness(ts)
+ .thenRun(() -> {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27491
Avoid re-planning in case of unrelated catalog changes
+ int requiredCatalog =
catalogService.activeCatalogVersion(ts.longValue());
+
+ if (requiredCatalog != plan.catalogVersion()) {
+ throw new SqlPlanOutdatedException();
+ }
+ });
+ }
+
+ private SqlPlanToTxSchemaVersionValidator(SchemaSyncService
schemaSyncService, CatalogService catalogService) {
+ this.schemaSyncService = schemaSyncService;
+ this.catalogService = catalogService;
+ }
+
+ public static SqlPlanToTxSchemaVersionValidator create(SchemaSyncService
schemaSyncService, CatalogService catalogService) {
+ return new SqlPlanToTxSchemaVersionValidator(schemaSyncService,
catalogService);
+ }
+
+ private static class NoopSqlPlanToTxSchemaVersionValidator extends
SqlPlanToTxSchemaVersionValidator {
+ @SuppressWarnings("DataFlowIssue")
+ private NoopSqlPlanToTxSchemaVersionValidator() {
+ super(null, null);
+ }
+
+ @Override
+ public CompletableFuture<Void> validate(MultiStepPlan plan,
QueryTransactionWrapper tx) {
+ return nullCompletedFuture();
+ }
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 1d1c9e7c833..00b7b55a1db 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -396,7 +396,8 @@ public class SqlQueryProcessor implements QueryProcessor,
SystemViewProvider {
clockService,
killCommandHandler,
expressionFactory,
- EXECUTION_SERVICE_SHUTDOWN_TIMEOUT
+ EXECUTION_SERVICE_SHUTDOWN_TIMEOUT,
+ SqlPlanToTxSchemaVersionValidator.create(schemaSyncService,
catalogManager)
));
queryExecutor = registerService(new QueryExecutor(
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index a52f9a6bad8..674b8e37f16 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -123,8 +123,10 @@ public class ExchangeServiceImpl implements
ExchangeService {
Throwable traceableErr = ExceptionUtils.unwrapCause(error);
if (!(traceableErr instanceof TraceableException)) {
- traceableErr = error = new IgniteInternalException(INTERNAL_ERR,
error);
+ traceableErr = new IgniteInternalException(INTERNAL_ERR, error);
+ }
+ if (((TraceableException) traceableErr).code() == INTERNAL_ERR) {
LOG.info(format("Failed to execute query fragment: traceId={},
executionId={}, fragmentId={}",
((TraceableException) traceableErr).traceId(),
executionId, fragmentId), error);
} else if (LOG.isDebugEnabled()) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index fa858ed8c4d..ce1e20f4a9c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -83,6 +83,7 @@ import org.apache.ignite.internal.sql.engine.QueryCancel;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.sql.engine.SchemaAwareConverter;
import org.apache.ignite.internal.sql.engine.SqlOperationContext;
+import org.apache.ignite.internal.sql.engine.SqlPlanToTxSchemaVersionValidator;
import
org.apache.ignite.internal.sql.engine.SqlQueryProcessor.PrefetchCallback;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactoryFactory;
@@ -189,6 +190,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
private final SqlExpressionFactory sqlExpressionFactory;
+ private final SqlPlanToTxSchemaVersionValidator planValidator;
+
/**
* Constructor.
*
@@ -204,6 +207,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
* @param clockService Clock service.
* @param killCommandHandler Kill command handler.
* @param shutdownTimeout Shutdown timeout.
+ * @param planValidator Validator of the catalog version from the plan
relative to the started transaction.
*/
public ExecutionServiceImpl(
MessageService messageService,
@@ -220,7 +224,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
ClockService clockService,
KillCommandHandler killCommandHandler,
SqlExpressionFactory sqlExpressionFactory,
- long shutdownTimeout
+ long shutdownTimeout,
+ SqlPlanToTxSchemaVersionValidator planValidator
) {
this.localNode = topSrvc.localMember();
this.handler = handler;
@@ -237,6 +242,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
this.killCommandHandler = killCommandHandler;
this.sqlExpressionFactory = sqlExpressionFactory;
this.shutdownTimeout = shutdownTimeout;
+ this.planValidator = planValidator;
}
/**
@@ -259,6 +265,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
* @param clockService Clock service.
* @param killCommandHandler Kill command handler.
* @param shutdownTimeout Shutdown timeout.
+ * @param planValidator Validator of the catalog version from the plan
relative to the started transaction.
* @return An execution service.
*/
public static <RowT> ExecutionServiceImpl<RowT> create(
@@ -278,7 +285,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
ClockService clockService,
KillCommandHandler killCommandHandler,
SqlExpressionFactory sqlExpressionFactory,
- long shutdownTimeout
+ long shutdownTimeout,
+ SqlPlanToTxSchemaVersionValidator planValidator
) {
return new ExecutionServiceImpl<>(
msgSrvc,
@@ -301,7 +309,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
clockService,
killCommandHandler,
sqlExpressionFactory,
- shutdownTimeout
+ shutdownTimeout,
+ planValidator
);
}
@@ -330,60 +339,76 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
assert old == null;
- QueryTransactionContext txContext = operationContext.txContext();
-
- assert txContext != null;
-
boolean readOnly = plan.type().implicitTransactionReadOnlyMode();
- QueryTransactionWrapper txWrapper =
txContext.getOrStartSqlManaged(readOnly, false);
+ QueryTransactionWrapper txWrapper =
getOrStartTransaction(operationContext, readOnly);
InternalTransaction tx = txWrapper.unwrap();
- operationContext.notifyTxUsed(txWrapper);
+ return planValidator.validate(plan, txWrapper)
+ .thenCompose(ignore -> {
+ PrefetchCallback prefetchCallback =
queryManager.prefetchCallback;
- PrefetchCallback prefetchCallback = queryManager.prefetchCallback;
+ CompletableFuture<Void> firstPageReady =
prefetchCallback.prefetchFuture();
- CompletableFuture<Void> firstPageReady =
prefetchCallback.prefetchFuture();
+ if (plan.type() == SqlQueryType.DML) {
+ // DML is supposed to have a single row response, so
if the first page is ready, then all
+ // inputs have been processed, all tables have been
updated, and now it should be safe to
+ // commit implicit transaction
+ firstPageReady = firstPageReady.thenCompose(none ->
txWrapper.finalise());
+ }
- if (plan.type() == SqlQueryType.DML) {
- // DML is supposed to have a single row response, so if the first
page is ready, then all
- // inputs have been processed, all tables have been updated, and
now it should be safe to
- // commit implicit transaction
- firstPageReady = firstPageReady.thenCompose(none ->
txWrapper.finalise());
- }
+ CompletableFuture<Void> firstPageReady0 = firstPageReady;
- CompletableFuture<Void> firstPageReady0 = firstPageReady;
-
- Predicate<String> nodeExclusionFilter =
operationContext.nodeExclusionFilter();
-
- CompletableFuture<AsyncDataCursor<InternalSqlRow>> f =
queryManager.execute(tx, plan, nodeExclusionFilter)
- .thenApply(dataCursor -> new TxAwareAsyncCursor<>(
- txWrapper,
- dataCursor,
- firstPageReady0,
- queryManager::close,
- operationContext::notifyError
- ));
-
- return f.handle((r, t) -> {
- if (t != null) {
- // We were unable to create cursor, hence need to finalise
transaction wrapper
- // which were created solely for this operation.
- return txWrapper.finalise(t).handle((none, finalizationErr) ->
{
- if (finalizationErr != null) {
- t.addSuppressed(finalizationErr);
- }
+ Predicate<String> nodeExclusionFilter =
operationContext.nodeExclusionFilter();
- // Re-throw the exception, so execution future is
completed with the same exception.
- sneakyThrow(t);
+ CompletableFuture<AsyncDataCursor<InternalSqlRow>> f =
queryManager.execute(tx, plan, nodeExclusionFilter)
+ .thenApply(dataCursor -> new TxAwareAsyncCursor<>(
+ txWrapper,
+ dataCursor,
+ firstPageReady0,
+ queryManager::close,
+ operationContext::notifyError
+ ));
+
+ return f.handle((r, t) -> {
+ if (t != null) {
+ // We were unable to create cursor, hence need to
finalise transaction wrapper
+ // which were created solely for this operation.
+ return txWrapper.finalise(t).handle((none,
finalizationErr) -> {
+ if (finalizationErr != null) {
+ t.addSuppressed(finalizationErr);
+ }
+
+ // Re-throw the exception, so execution future
is completed with the same exception.
+ sneakyThrow(t);
+
+ // We must never reach this line.
+ return (AsyncDataCursor<InternalSqlRow>) null;
+ });
+ }
- // We must never reach this line.
- return (AsyncDataCursor<InternalSqlRow>) null;
+ return completedFuture(r);
+ }).thenCompose(Function.identity());
});
- }
+ }
+
+ private static QueryTransactionWrapper
getOrStartTransaction(SqlOperationContext operationContext, boolean readOnly) {
+ QueryTransactionContext txContext = operationContext.txContext();
+
+ assert txContext != null;
+
+ // Try to use previously started transaction.
+ QueryTransactionWrapper txWrapper = operationContext.retryTx();
+
+ if (txWrapper != null) {
+ return txWrapper;
+ }
+
+ txWrapper = txContext.getOrStartSqlManaged(readOnly, false);
+
+ operationContext.notifyTxUsed(txWrapper);
- return completedFuture(r);
- }).thenCompose(Function.identity());
+ return txWrapper;
}
private static SqlOperationContext createOperationContext(
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlPlanOutdatedException.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlPlanOutdatedException.java
new file mode 100644
index 00000000000..8c1eacf79f8
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlPlanOutdatedException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import org.apache.ignite.internal.tx.InternalTransaction;
+
+/**
+ * Exception occurs when SQL engine detects that the execution plan is
outdated after preparation.
+ *
+ * <p>It is used internally to signal the SQL engine to retry the optimization
step
+ * using the transaction {@link InternalTransaction#schemaTimestamp() schema
timestamp}.
+ * This exception should never be passed to the user, and has no special error
code.
+ */
+public class SqlPlanOutdatedException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public SqlPlanOutdatedException() {
+ super(null, null, false, false);
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorPublicationPhaseHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorPublicationPhaseHandler.java
index 527c8c090aa..6ff287147e2 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorPublicationPhaseHandler.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorPublicationPhaseHandler.java
@@ -18,14 +18,14 @@
package org.apache.ignite.internal.sql.engine.exec.fsm;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
-import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.util.AsyncCursor;
/**
* Handler that postpones moment of publication of the cursor.
*
* <p>Without this delay, the cursor is published as soon as all fragments is
initialized on remote nodes. And here the most confusing side
- * effect: for multi-step DML, if {@link AsyncCursor#closeAsync()} invoked
immediately after cursor becomes public, all fragments will be
+ * effect: for multi-step DML, if {@link AsyncCursor#closeAsync()} invoked
immediately after cursor becomes public, all fragments will be
* cancelled, and implicit transaction will be commited. This affect script
processing, because currently it iterates over all te results
* and just closes them. From user's point of view, such a behaviour renders
certain DML statements to be skipped since no results are
* commited.
@@ -41,12 +41,12 @@ class CursorPublicationPhaseHandler implements
ExecutionPhaseHandler {
@Override
public Result handle(Query query) {
AsyncSqlCursor<?> cursor = query.cursor;
+ QueryPlan plan = query.plan;
assert cursor != null;
+ assert plan != null;
- SqlQueryType queryType = cursor.queryType();
-
- if (queryType == SqlQueryType.QUERY) {
+ if (plan.lazyCursorPublication()) {
// Preserve lazy execution for statements that only reads.
return Result.completed();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/OptimizingPhaseHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/OptimizingPhaseHandler.java
index fd57453007a..76646891c8a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/OptimizingPhaseHandler.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/OptimizingPhaseHandler.java
@@ -45,6 +45,31 @@ class OptimizingPhaseHandler implements
ExecutionPhaseHandler {
assert result != null : "Query is expected to be parsed at this phase";
+ SqlOperationContext operationContext = buildContext(query, result);
+
+ CompletableFuture<Void> awaitFuture =
query.executor.waitForMetadata(operationContext.operationTime())
+ .thenCompose(none -> query.executor.prepare(result,
operationContext)
+ .thenAccept(plan -> {
+ if (query.txContext.explicitTx() == null) {
+ // in case of implicit tx we have to update
observable time to prevent tx manager to start
+ // implicit transaction too much in the past
where version of catalog we used to prepare the
+ // plan was not yet available
+
query.txContext.updateObservableTime(query.executor.deriveMinimalRequiredTime(plan));
+ }
+
+ query.plan = plan;
+ }));
+
+ return Result.proceedAfter(awaitFuture);
+ }
+
+ private static SqlOperationContext buildContext(Query query, ParsedResult
result) {
+ SqlOperationContext retryContext = query.operationContext;
+
+ if (retryContext != null) {
+ return retryContext;
+ }
+
validateParsedStatement(query.properties, result);
validateDynamicParameters(result.dynamicParamsCount(), query.params,
true);
ensureStatementMatchesTx(result.queryType(), query.txContext);
@@ -70,20 +95,7 @@ class OptimizingPhaseHandler implements
ExecutionPhaseHandler {
query.operationContext = operationContext;
- CompletableFuture<Void> awaitFuture =
query.executor.waitForMetadata(operationTime)
- .thenCompose(none -> query.executor.prepare(result,
operationContext)
- .thenAccept(plan -> {
- if (query.txContext.explicitTx() == null) {
- // in case of implicit tx we have to update
observable time to prevent tx manager to start
- // implicit transaction too much in the past
where version of catalog we used to prepare the
- // plan was not yet available
-
query.txContext.updateObservableTime(query.executor.deriveMinimalRequiredTime(plan));
- }
-
- query.plan = plan;
- }));
-
- return Result.proceedAfter(awaitFuture);
+ return operationContext;
}
/** Checks that the statement is allowed within an external/script
transaction. */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
index fc41f0fb4a3..199dbc69200 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
@@ -120,7 +120,11 @@ class Program<ResultT> {
break;
}
}
- } while (advanceQuery(query, state));
+
+ if (!advanceQuery(query, state)) {
+ break;
+ }
+ } while (true);
}
private boolean shouldRetry(Query query, Throwable th) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
index 763597b4f0a..5e71c6a485e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
@@ -21,10 +21,14 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator;
import static org.apache.ignite.lang.ErrorGroups.Transactions;
import java.util.List;
+import
org.apache.ignite.internal.partition.replicator.schemacompat.InternalSchemaVersionMismatchException;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.SqlOperationContext;
+import org.apache.ignite.internal.sql.engine.exec.SqlPlanOutdatedException;
import org.apache.ignite.internal.sql.engine.message.UnknownNodeException;
+import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
import org.apache.ignite.internal.util.ExceptionUtils;
/**
@@ -75,6 +79,32 @@ class QueryExecutionProgram extends
Program<AsyncSqlCursor<InternalSqlRow>> {
static boolean errorHandler(Query query, Throwable th) {
if (canRecover(query, th)) {
query.error.set(null);
+
+ if (multiStepPlanOutdated(th)) {
+ assert query.plan instanceof MultiStepPlan;
+ assert query.currentPhase() ==
ExecutionPhase.CURSOR_INITIALIZATION : query.currentPhase();
+
+ SqlOperationContext context = query.operationContext;
+ QueryTransactionWrapper tx = query.usedTransaction;
+
+ assert context != null;
+ assert tx != null;
+
+ query.operationContext = context.withTransactionForRetry(tx);
+
+ query.moveTo(ExecutionPhase.OPTIMIZING);
+
+ return true;
+ }
+
+ if (fastPlanSchemaVersionMismatch(th) ||
incompatibleSchemaChange(th)) {
+ query.operationContext = null;
+
+ query.moveTo(ExecutionPhase.OPTIMIZING);
+
+ return true;
+ }
+
if (query.currentPhase() == ExecutionPhase.CURSOR_PUBLICATION) {
// Should initialize a new cursor.
query.moveTo(ExecutionPhase.CURSOR_INITIALIZATION);
@@ -114,7 +144,8 @@ class QueryExecutionProgram extends
Program<AsyncSqlCursor<InternalSqlRow>> {
return false;
}
- return nodeLeft(th) || lockConflict(th) || replicaMiss(th) ||
groupOverloaded(th);
+ return nodeLeft(th) || lockConflict(th) || replicaMiss(th) ||
groupOverloaded(th)
+ || multiStepPlanOutdated(th) || incompatibleSchemaChange(th)
|| fastPlanSchemaVersionMismatch(th);
}
private static boolean nodeLeft(Throwable th) {
@@ -132,4 +163,16 @@ class QueryExecutionProgram extends
Program<AsyncSqlCursor<InternalSqlRow>> {
private static boolean groupOverloaded(Throwable th) {
return ExceptionUtils.extractCodeFrom(th) ==
Replicator.GROUP_OVERLOADED_ERR;
}
+
+ private static boolean multiStepPlanOutdated(Throwable th) {
+ return th instanceof SqlPlanOutdatedException;
+ }
+
+ private static boolean incompatibleSchemaChange(Throwable th) {
+ return ExceptionUtils.extractCodeFrom(th) ==
Transactions.TX_INCOMPATIBLE_SCHEMA_ERR;
+ }
+
+ private static boolean fastPlanSchemaVersionMismatch(Throwable th) {
+ return ExceptionUtils.hasCause(th,
InternalSchemaVersionMismatchException.class);
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
index e7d4318cfaa..6f01c9073b1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
@@ -316,4 +316,11 @@ public class KeyValueGetPlan implements ExplainablePlan,
ExecutablePlan {
public int catalogVersion() {
return catalogVersion;
}
+
+ @Override
+ public boolean lazyCursorPublication() {
+ // Let's postpone cursor publication so we can recover from errors
during
+ // plan execution, like `InternalSchemaVersionMismatchException`.
+ return false;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlan.java
index 7c37a1996a9..40945404177 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlan.java
@@ -62,4 +62,11 @@ public interface QueryPlan {
* Returns the number of source relations used by this plan. Each relation
is reported the number of times it is used.
*/
int numSources();
+
+ /**
+ * Returns a flag indicating that the query cursor can be published
without waiting for the first page preloading.
+ */
+ default boolean lazyCursorPublication() {
+ return type() == SqlQueryType.QUERY;
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 68be9802cf9..7e367802eb1 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -109,6 +109,7 @@ import
org.apache.ignite.internal.sql.engine.NodeLeftException;
import org.apache.ignite.internal.sql.engine.QueryCancel;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.sql.engine.SqlOperationContext;
+import org.apache.ignite.internal.sql.engine.SqlPlanToTxSchemaVersionValidator;
import
org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.exp.SqlExpressionFactoryImpl;
@@ -1393,7 +1394,8 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
new SqlExpressionFactoryImpl(
Commons.typeFactory(), 1024,
CaffeineCacheFactory.INSTANCE
),
- SHUTDOWN_TIMEOUT
+ SHUTDOWN_TIMEOUT,
+ SqlPlanToTxSchemaVersionValidator.NOOP
);
taskExecutor.start();
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
index d12cc3cb8f5..8a0c4979f27 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
@@ -147,14 +147,10 @@ public class QueryTimeoutTest extends
BaseIgniteAbstractTest {
@Test
void testTimeoutKvGet() {
- AsyncSqlCursor<?> cursor =
gatewayNode.executeQuery(PROPS_WITH_TIMEOUT, "SELECT * FROM my_table WHERE id =
?", 2);
-
- assertThat(
- cursor.requestNextAsync(1),
- willThrowWithCauseOrSuppressed(
- QueryCancelledException.class,
- QueryCancelledException.TIMEOUT_MSG
- )
+ assertThrows(
+ SqlException.class,
+ () -> gatewayNode.executeQuery(PROPS_WITH_TIMEOUT, "SELECT *
FROM my_table WHERE id = ?", 2),
+ QueryCancelledException.TIMEOUT_MSG
);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SqlOutdatedPlanTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SqlOutdatedPlanTest.java
new file mode 100644
index 00000000000..844e45d5a1b
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SqlOutdatedPlanTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParams;
+import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnCommand;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.SqlPlanToTxSchemaVersionValidator;
+import org.apache.ignite.internal.sql.engine.SqlProperties;
+import org.apache.ignite.internal.sql.engine.exec.QueryRecoveryTest.TxType;
+import
org.apache.ignite.internal.sql.engine.framework.NoOpTransactionalOperationTracker;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.PrepareServiceWithPrepareCallback;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapperImpl;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.awaitility.Awaitility;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests that outdated SQL query execution plan is re-planned using the
+ * {@link InternalTransaction#schemaTimestamp() schema time} of the started
transaction.
+ *
+ * <p>Currently, the transaction starts after query planning is completed, if
the schema has changed during planning
+ * (the catalog version used in the plan does not match the catalog version of
the corresponding transaction start time),
+ * then this plan is considered outdated and the planning phase should be
repeated using the transaction start time.
+ *
+ * @see SqlPlanOutdatedException
+ * @see SqlPlanToTxSchemaVersionValidator
+ */
+public class SqlOutdatedPlanTest extends BaseIgniteAbstractTest {
+ private static final List<String> DATA_NODES = List.of("DATA_1", "DATA_2");
+ private static final String GATEWAY_NODE_NAME = "gateway";
+
+ private TestCluster cluster;
+
+ @BeforeAll
+ static void warmUpCluster() throws Exception {
+ TestBuilders.warmupTestCluster();
+ }
+
+ @BeforeEach
+ void startCluster() {
+ cluster = TestBuilders.cluster()
+ .nodes(GATEWAY_NODE_NAME, DATA_NODES.toArray(new String[0]))
+ .build();
+
+ cluster.start();
+
+ TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+
+ cluster.setAssignmentsProvider("T1",
+ (partitionCount, b) -> IntStream.range(0, partitionCount)
+ .mapToObj(i -> DATA_NODES)
+ .collect(Collectors.toList())
+ );
+
+ gatewayNode.initSchema("CREATE TABLE t1 (id INT PRIMARY KEY)");
+
+ cluster.setDataProvider("T1", TestBuilders.tableScan((nodeName,
partId) ->
+ Collections.singleton(new Object[]{partId}))
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TxType.class)
+ void planningIsRepeatedUsingTheSameTransaction(TxType type) {
+ TestTxContext txContext = new TestTxContext(type);
+ TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+ PrepareServiceSpy prepareServiceSpy = new
PrepareServiceSpy(gatewayNode);
+
+ CompletableFuture<Lock> lockFut1 =
prepareServiceSpy.resetLockAndBlockNextCall();
+
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> fut =
+ gatewayNode.executeQueryAsync(new SqlProperties(), txContext,
"SELECT id FROM t1");
+
+ await(lockFut1);
+ assertThat(prepareServiceSpy.callsCounter.get(), is(1));
+ assertThat(txContext.startedTxCounter.get(), is(0));
+
+ // Simulate concurrent schema modification.
+ await(cluster.catalogManager().execute(
+ makeAddColumnCommand("VAL1")));
+
+ CompletableFuture<Lock> lockFut2 =
prepareServiceSpy.resetLockAndBlockNextCall();
+ Lock lock2 = await(lockFut2);
+ assertThat(prepareServiceSpy.callsCounter.get(), is(2));
+ assertThat(txContext.startedTxCounter.get(), is(1));
+
+ // Simulate another one schema modification.
+ await(cluster.catalogManager().execute(
+ makeAddColumnCommand("VAL2")));
+
+ lock2.unlock();
+
+ await(await(fut).closeAsync());
+
+ // Planning must be repeated, but only once.
+ assertThat(prepareServiceSpy.callsCounter.get(), is(2));
+
+ // Transaction should be started only once.
+ assertThat(txContext.startedTxCounter.get(), is(1));
+ }
+
+ @Test
+ void schemaChangedAndNodeDisconnectedDuringPlanning() {
+ TestTxContext txContext = new TestTxContext(TxType.RO);
+ TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+ PrepareServiceSpy prepareServiceSpy = new
PrepareServiceSpy(gatewayNode);
+
+ CompletableFuture<Lock> lockFut =
prepareServiceSpy.resetLockAndBlockNextCall();
+
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> fut =
+ gatewayNode.executeQueryAsync(new SqlProperties(), txContext,
"SELECT id FROM t1");
+
+ Lock lock = await(lockFut);
+ assertThat(prepareServiceSpy.callsCounter.get(), is(1));
+ assertThat(txContext.startedTxCounter.get(), is(0));
+
+ // Simulate concurrent schema modification.
+ await(cluster.catalogManager().execute(
+ makeAddColumnCommand("VAL1")));
+
+ // And node disconnection.
+ cluster.node(DATA_NODES.get(0)).disconnect();
+
+ lock.unlock();
+
+ await(await(fut).closeAsync());
+
+ // Planning must be repeated, but only once.
+ assertThat(prepareServiceSpy.callsCounter.get(), is(2));
+
+ // The plan execution must be repeated using a new transaction.
+ assertThat(txContext.startedTxCounter.get(), is(2));
+ }
+
+ private static CatalogCommand makeAddColumnCommand(String columnName) {
+ return AlterTableAddColumnCommand.builder()
+ .schemaName(DEFAULT_SCHEMA_NAME)
+ .tableName("T1")
+ .columns(List.of(columnParams(columnName, INT32)))
+ .build();
+ }
+
+ private static class PrepareServiceSpy {
+ private final AtomicInteger callsCounter = new AtomicInteger();
+ private final AtomicReference<ReentrantLock> prepareBlockHolder = new
AtomicReference<>();
+
+ PrepareServiceSpy(TestNode gatewayNode) {
+ ((PrepareServiceWithPrepareCallback) gatewayNode.prepareService())
+ .setPrepareCallback(() -> {
+ callsCounter.incrementAndGet();
+
+ Lock lock = prepareBlockHolder.get();
+
+ try {
+ lock.tryLock(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ lock.unlock();
+ }
+ });
+ }
+
+ CompletableFuture<Lock> resetLockAndBlockNextCall() {
+ ReentrantLock nextLock = new ReentrantLock();
+
+ //noinspection LockAcquiredButNotSafelyReleased
+ nextLock.lock();
+
+ ReentrantLock prevLock = prepareBlockHolder.getAndSet(nextLock);
+
+ if (prevLock != null) {
+ prevLock.unlock();
+ }
+
+ return CompletableFuture.supplyAsync(() -> {
+ Awaitility.await().until(nextLock::getQueueLength, is(1));
+
+ return nextLock;
+ });
+ }
+ }
+
+ private static class TestTxContext implements QueryTransactionContext {
+ private final TxType txType;
+ private final AtomicInteger startedTxCounter = new AtomicInteger();
+
+ TestTxContext(TxType txType) {
+ this.txType = txType;
+ }
+
+ @Override
+ public QueryTransactionWrapper getOrStartSqlManaged(boolean
readOnlyIgnored, boolean implicit) {
+ startedTxCounter.incrementAndGet();
+
+ return new QueryTransactionWrapperImpl(txType.create(), true,
NoOpTransactionalOperationTracker.INSTANCE);
+ }
+
+ @Override
+ public void updateObservableTime(HybridTimestamp time) {
+ // NO-OP
+ }
+
+ @Override
+ public @Nullable QueryTransactionWrapper explicitTx() {
+ return null;
+ }
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 12c9dfc91a5..f1a82d670bb 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -96,6 +96,7 @@ import
org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
import
org.apache.ignite.internal.partitiondistribution.TokenizedAssignmentsImpl;
import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.internal.sql.engine.SqlOperationContext;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
import org.apache.ignite.internal.sql.engine.api.kill.OperationKillHandler;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
@@ -114,7 +115,10 @@ import
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionDistributionProvider;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.PreparedPlan;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
import
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPrunerImpl;
import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
@@ -130,6 +134,7 @@ import
org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
+import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
import org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManager;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
@@ -718,6 +723,8 @@ public class TestBuilders {
configurationValue
);
+ PrepareServiceWithPrepareCallback prepareSvcWithCallback = new
PrepareServiceWithPrepareCallback(prepareService);
+
Map<String, List<String>> systemViewsByNode = new HashMap<>();
for (Entry<String, Set<String>> entry :
nodeName2SystemView.entrySet()) {
@@ -793,7 +800,7 @@ public class TestBuilders {
catalogManager,
(TestClusterService)
clusterService.forNode(name),
parserService,
- prepareService,
+ prepareSvcWithCallback,
schemaManager,
mappingService,
new TestExecutableTableRegistry(
@@ -821,7 +828,7 @@ public class TestBuilders {
assignmentsProviderByTableName,
nodes,
catalogManager,
- prepareService,
+ prepareSvcWithCallback,
clockWaiter,
initClosure,
stopClosure
@@ -1702,6 +1709,50 @@ public class TestBuilders {
}
}
+ /**
+ * A wrapper for {@link PrepareService} that executes a specified callback
each time the
+ * {@link #prepareAsync(ParsedResult, SqlOperationContext)} method is
called.
+ */
+ public static class PrepareServiceWithPrepareCallback implements
PrepareService {
+ private final PrepareService delegate;
+
+ private volatile Runnable prepareCallback = null;
+
+ PrepareServiceWithPrepareCallback(PrepareService delegate) {
+ this.delegate = delegate;
+ }
+
+ public void setPrepareCallback(Runnable callback) {
+ prepareCallback = callback;
+ }
+
+ @Override
+ public CompletableFuture<QueryPlan> prepareAsync(ParsedResult
parsedResult, SqlOperationContext ctx) {
+ Runnable callback = prepareCallback;
+
+ if (callback != null) {
+ callback.run();
+ }
+
+ return delegate.prepareAsync(parsedResult, ctx);
+ }
+
+ @Override
+ public Set<PreparedPlan> preparedPlans() {
+ return delegate.preparedPlans();
+ }
+
+ @Override
+ public void start() {
+ delegate.start();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ delegate.stop();
+ }
+ }
+
/**
* Creates a cluster and runs a simple query to facilitate loading of
necessary classes to prepare and execute sql queries.
*
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index d2316ad0d58..0c7b948c62e 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.QueryCancel;
import org.apache.ignite.internal.sql.engine.SqlOperationContext;
+import org.apache.ignite.internal.sql.engine.SqlPlanToTxSchemaVersionValidator;
import org.apache.ignite.internal.sql.engine.SqlProperties;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactoryFactory;
import org.apache.ignite.internal.sql.engine.api.kill.OperationKillHandler;
@@ -206,7 +207,8 @@ public class TestNode implements LifecycleAware {
new SqlExpressionFactoryImpl(
Commons.typeFactory(), 1024,
CaffeineCacheFactory.INSTANCE
),
- 5_000
+ 5_000,
+ SqlPlanToTxSchemaVersionValidator.create(new
AlwaysSyncedSchemaSyncService(), catalogService)
));
registerService(new
IgniteComponentLifecycleAwareAdapter(systemViewManager));
@@ -289,6 +291,10 @@ public class TestNode implements LifecycleAware {
return clockService;
}
+ public PrepareService prepareService() {
+ return prepareService;
+ }
+
/**
* Prepares (aka parses, validates, and optimizes) the given query string
* and returns the plan to execute.
@@ -396,11 +402,10 @@ public class TestNode implements LifecycleAware {
public AsyncSqlCursor<InternalSqlRow> executeQuery(
SqlProperties properties, QueryTransactionContext txContext,
String query, Object... params
) {
- return await(queryExecutor.executeQuery(
+ return await(executeQueryAsync(
properties,
txContext,
query,
- null,
params
));
}
@@ -414,6 +419,19 @@ public class TestNode implements LifecycleAware {
return executeQuery(properties, ImplicitTxContext.create(), query,
params);
}
+ /** Executes the given query. */
+ public CompletableFuture<AsyncSqlCursor<InternalSqlRow>> executeQueryAsync(
+ SqlProperties properties, QueryTransactionContext txContext,
String query, Object... params
+ ) {
+ return queryExecutor.executeQuery(
+ properties,
+ txContext,
+ query,
+ null,
+ params
+ );
+ }
+
public List<QueryInfo> runningQueries() {
return queryExecutor.runningQueries();
}