Copilot commented on code in PR #7283:
URL: https://github.com/apache/ignite-3/pull/7283#discussion_r2646135239
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorPublicationPhaseHandler.java:
##########
@@ -46,7 +47,7 @@ public Result handle(Query query) {
SqlQueryType queryType = cursor.queryType();
- if (queryType == SqlQueryType.QUERY) {
+ if (queryType == SqlQueryType.QUERY && !(query.plan instanceof KeyValueGetPlan)) {
// Preserve lazy execution for statements that only reads.
Review Comment:
The condition on line 50 now excludes KeyValueGetPlan from lazy execution,
but no comment explains why KeyValueGetPlan should not preserve lazy
execution like other QUERY types. This special-case handling could confuse
future maintainers. Consider adding a comment that explains the rationale
for treating KeyValueGetPlan differently.
```suggestion
// For regular QUERY plans we can preserve lazy execution because they are pure reads:
// if the cursor is closed early, no stateful work (like DML, KV operations or transaction
// side effects) is cancelled. KeyValueGetPlan, although classified as QUERY, is executed
// via the key-value path and participates in the same lifecycle/transaction semantics as
// non-QUERY operations, so it intentionally does NOT use lazy publication here.
if (queryType == SqlQueryType.QUERY && !(query.plan instanceof KeyValueGetPlan)) {
// Preserve lazy execution for statements that only read.
```
##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SqlOutdatedPlanTest.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParams;
+import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnCommand;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.SqlPlanToTxSchemaVersionValidator;
+import org.apache.ignite.internal.sql.engine.SqlProperties;
+import org.apache.ignite.internal.sql.engine.exec.QueryRecoveryTest.TxType;
+import org.apache.ignite.internal.sql.engine.framework.NoOpTransactionalOperationTracker;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders.PrepareServiceWithPrepareCallback;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapperImpl;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.awaitility.Awaitility;
+import org.hamcrest.Matcher;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests that outdated SQL query execution plan is re-planned using the
+ * {@link InternalTransaction#schemaTimestamp() schema time} of the started transaction.
+ *
+ * <p>Currently, the transaction starts after query planning is completed, if the schema has changed during planning
+ * (the catalog version used in the plan does not match the catalog version of the corresponding transaction start time),
+ * then this plan is considered outdated and the planning phase should be repeated using the transaction start time.
+ *
+ * @see SqlPlanOutdatedException
+ * @see SqlPlanToTxSchemaVersionValidator
+ */
+public class SqlOutdatedPlanTest extends BaseIgniteAbstractTest {
+ private static final List<String> DATA_NODES = List.of("DATA_1", "DATA_2");
+ private static final String GATEWAY_NODE_NAME = "gateway";
+
+ private TestCluster cluster;
+
+ @BeforeAll
+ static void warmUpCluster() throws Exception {
+ TestBuilders.warmupTestCluster();
+ }
+
+ @BeforeEach
+ void startCluster() {
+ cluster = TestBuilders.cluster()
+ .nodes(GATEWAY_NODE_NAME, DATA_NODES.toArray(new String[0]))
+ .build();
+
+ cluster.start();
+
+ TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+
+ cluster.setAssignmentsProvider("T1",
+ (partitionCount, b) -> IntStream.range(0, partitionCount)
+ .mapToObj(i -> DATA_NODES)
+ .collect(Collectors.toList())
+ );
+
+ gatewayNode.initSchema("CREATE TABLE t1 (id INT PRIMARY KEY)");
+
+ cluster.setDataProvider("T1", TestBuilders.tableScan((nodeName, partId) ->
+ Collections.singleton(new Object[]{partId}))
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(TxType.class)
+ void planningIsRepeatedUsingTheSameTransaction(TxType type) throws Exception {
+ TestTxContext txContext = new TestTxContext(type);
+ TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+ PrepareServiceSpy prepareServiceSpy = new PrepareServiceSpy(gatewayNode);
+
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> fut =
+ gatewayNode.executeQueryAsync(new SqlProperties(), txContext, "SELECT id FROM t1");
+
+ prepareServiceSpy.waitUntilCallsCount(is(1));
+ assertThat(txContext.startedTxCounter.get(), is(0));
+
+ // Simulate concurrent schema modification.
+ await(cluster.catalogManager().execute(
+ makeAddColumnCommand("VAL1")));
+
+ prepareServiceSpy.unblock();
+ prepareServiceSpy.waitUntilCallsCount(is(2));
+
+ // Simulate another one schema modification.
+ await(cluster.catalogManager().execute(
+ makeAddColumnCommand("VAL2")));
+
+ prepareServiceSpy.unblock();
+
+ await(await(fut).closeAsync());
+
+ // Planning must be repeated, but only once.
+ assertThat(prepareServiceSpy.callsCounter.get(), is(2));
+
+ // Transaction should be started only once.
+ assertThat(txContext.startedTxCounter.get(), is(1));
+ }
+
+ @Test
+ void schemaChangedAndNodeDisconnectedDuringPlanning() {
+ TestTxContext txContext = new TestTxContext(TxType.RO);
+ TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+ PrepareServiceSpy prepareServiceSpy = new PrepareServiceSpy(gatewayNode);
+
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> fut =
+ gatewayNode.executeQueryAsync(new SqlProperties(), txContext, "SELECT id FROM t1");
+
+ prepareServiceSpy.waitUntilCallsCount(is(1));
+ assertThat(txContext.startedTxCounter.get(), is(0));
+
+ // Simulate concurrent schema modification.
+ await(cluster.catalogManager().execute(
+ makeAddColumnCommand("VAL1")));
+
+ // And node disconnection.
+ cluster.node(DATA_NODES.get(0)).disconnect();
+
+ prepareServiceSpy.unblock();
+ prepareServiceSpy.waitUntilCallsCount(is(2));
+ prepareServiceSpy.unblock();
+
+ await(await(fut).closeAsync());
+
+ // Planning must be repeated, but only once.
+ assertThat(prepareServiceSpy.callsCounter.get(), is(2));
+
+ // The plan execution must be repeated using a new transaction.
+ assertThat(txContext.startedTxCounter.get(), is(2));
+ }
+
+ private static CatalogCommand makeAddColumnCommand(String columnName) {
+ return AlterTableAddColumnCommand.builder()
+ .schemaName(DEFAULT_SCHEMA_NAME)
+ .tableName("T1")
+ .columns(List.of(columnParams(columnName, INT32)))
+ .build();
+ }
+
+ private static class PrepareServiceSpy {
+ private final AtomicInteger callsCounter = new AtomicInteger();
+ private final CyclicBarrier prepareBarrier = new CyclicBarrier(2);
+
+ PrepareServiceSpy(TestNode gatewayNode) {
+ ((PrepareServiceWithPrepareCallback) gatewayNode.prepareService())
+ .setPrepareCallback(() -> {
+ callsCounter.incrementAndGet();
+
+ awaitBarrier();
+ });
+ }
+
+ public void waitUntilCallsCount(Matcher<Integer> integerMatcher) {
+ Awaitility.await().until(callsCounter::get, integerMatcher);
+ }
+
+ void unblock() {
+ awaitBarrier();
+ }
+
+ private void awaitBarrier() {
+ try {
+ prepareBarrier.await(5, TimeUnit.SECONDS);
Review Comment:
The CyclicBarrier is initialized with parties=2, and awaitBarrier() is
called from both the test thread (via unblock()) and the callback thread (from
setPrepareCallback). After each await() completes, the barrier automatically
resets for the next use. At lines 161-163 there are two consecutive calls:
prepareServiceSpy.unblock() followed by
prepareServiceSpy.waitUntilCallsCount(is(2)). If the callback executes and
increments the counter before the barrier is awaited again, there's a risk of a
thread stalling at the barrier until the 5-second timeout expires. Consider
using a CountDownLatch or another synchronization mechanism that better fits
the sequential nature of this test.
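
One possible shape for such a mechanism, offered only as a sketch: a
Semaphore-based gate in which the test thread never blocks. `PrepareGate` and
its method names are hypothetical and do not exist in the PR; the 5-second
timeout mirrors the one used in awaitBarrier(). Unlike a barrier, a released
permit is remembered, so an unblock() that arrives before the callback lets the
next callback pass straight through.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical alternative to the barrier-based spy; all names are illustrative. */
class PrepareGate {
    private final AtomicInteger calls = new AtomicInteger();
    private final Semaphore permits = new Semaphore(0);

    /** Runs on the planning thread: records the call, then waits for one permit. */
    void onPrepare() {
        calls.incrementAndGet();
        try {
            if (!permits.tryAcquire(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for unblock()");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fail the callback.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Runs on the test thread: releases one permit and returns immediately. */
    void unblock() {
        permits.release();
    }

    /** Number of times onPrepare() has been entered so far. */
    int calls() {
        return calls.get();
    }
}
```

With this shape the test would still call waitUntilCallsCount(...) before each
schema change to make sure planning is actually paused; the gate only removes
the need for both threads to meet at the same point.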
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java:
##########
@@ -142,6 +145,14 @@ public ZoneId timeZoneId() {
return topologyVersion;
}
+ public @Nullable QueryTransactionWrapper usedTx() {
+ return usedTx;
+ }
+
+ public void usedTx(@Nullable QueryTransactionWrapper usedTx) {
Review Comment:
The `usedTx` field is declared as `volatile`, but there are potential
thread-safety issues. The getter and setter methods are not synchronized, and
there's a check-then-act pattern in OptimizingPhaseHandler where the value is
read and used multiple times. This could lead to race conditions if multiple
threads access this field concurrently. Consider using proper synchronization
or an AtomicReference if concurrent access is expected.
```suggestion
public synchronized @Nullable QueryTransactionWrapper usedTx() {
return usedTx;
}
public synchronized void usedTx(@Nullable QueryTransactionWrapper usedTx) {
```
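
As a rough illustration of the AtomicReference option mentioned above (a sketch
under assumptions, not the PR's code): synchronizing the getter and the setter
individually does not make a read followed by a write atomic, whereas a
compare-and-set does. `TxHolder` and its method names are hypothetical stand-ins
for the context class, and whether any of this is needed depends on whether the
field is really accessed from multiple threads.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative stand-in for the context class; not the actual SqlOperationContext. */
class TxHolder<T> {
    private final AtomicReference<T> usedTx = new AtomicReference<>();

    /** Returns the currently stored value, or null if none is set. */
    T usedTx() {
        return usedTx.get();
    }

    /** Atomically stores the candidate only if nothing is stored yet; returns true on success. */
    boolean trySet(T candidate) {
        return usedTx.compareAndSet(null, candidate);
    }

    /** Atomically returns the stored value and clears the holder in one step. */
    T take() {
        return usedTx.getAndSet(null);
    }
}
```

A caller that needs "use the existing transaction or register a new one" would
call trySet(...) and fall back to usedTx() when it returns false, instead of
reading the field twice.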