Copilot commented on code in PR #7283:
URL: https://github.com/apache/ignite-3/pull/7283#discussion_r2646135239


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/CursorPublicationPhaseHandler.java:
##########
@@ -46,7 +47,7 @@ public Result handle(Query query) {
 
         SqlQueryType queryType = cursor.queryType();
 
-        if (queryType == SqlQueryType.QUERY) {
+        if (queryType == SqlQueryType.QUERY && !(query.plan instanceof KeyValueGetPlan)) {
             // Preserve lazy execution for statements that only reads.

Review Comment:
   The condition on line 50 now excludes KeyValueGetPlan from lazy execution. 
However, there's no comment explaining why KeyValueGetPlan should not preserve 
lazy execution like other QUERY types. This special case handling could be 
confusing for future maintainers. Consider adding a comment explaining the 
rationale for treating KeyValueGetPlan differently.
   ```suggestion
           // For regular QUERY plans we can preserve lazy execution because they are pure reads:
           // if the cursor is closed early, no stateful work (like DML, KV operations or transaction
           // side effects) is cancelled. KeyValueGetPlan, although classified as QUERY, is executed
           // via the key-value path and participates in the same lifecycle/transaction semantics as
           // non-QUERY operations, so it intentionally does NOT use lazy publication here.
           if (queryType == SqlQueryType.QUERY && !(query.plan instanceof KeyValueGetPlan)) {
               // Preserve lazy execution for statements that only read.
   ```



##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SqlOutdatedPlanTest.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParams;
+import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnCommand;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.SqlPlanToTxSchemaVersionValidator;
+import org.apache.ignite.internal.sql.engine.SqlProperties;
+import org.apache.ignite.internal.sql.engine.exec.QueryRecoveryTest.TxType;
+import org.apache.ignite.internal.sql.engine.framework.NoOpTransactionalOperationTracker;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders.PrepareServiceWithPrepareCallback;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
+import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapperImpl;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.awaitility.Awaitility;
+import org.hamcrest.Matcher;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests that outdated SQL query execution plan is re-planned using the
+ * {@link InternalTransaction#schemaTimestamp() schema time} of the started transaction.
+ *
+ * <p>Currently, the transaction starts after query planning is completed, if the schema has changed during planning
+ * (the catalog version used in the plan does not match the catalog version of the corresponding transaction start time),
+ * then this plan is considered outdated and the planning phase should be repeated using the transaction start time.
+ *
+ * @see SqlPlanOutdatedException
+ * @see SqlPlanToTxSchemaVersionValidator
+ */
+public class SqlOutdatedPlanTest extends BaseIgniteAbstractTest {
+    private static final List<String> DATA_NODES = List.of("DATA_1", "DATA_2");
+    private static final String GATEWAY_NODE_NAME = "gateway";
+
+    private TestCluster cluster;
+
+    @BeforeAll
+    static void warmUpCluster() throws Exception {
+        TestBuilders.warmupTestCluster();
+    }
+
+    @BeforeEach
+    void startCluster() {
+        cluster = TestBuilders.cluster()
+                .nodes(GATEWAY_NODE_NAME, DATA_NODES.toArray(new String[0]))
+                .build();
+
+        cluster.start();
+
+        TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+
+        cluster.setAssignmentsProvider("T1",
+                (partitionCount, b) -> IntStream.range(0, partitionCount)
+                        .mapToObj(i -> DATA_NODES)
+                        .collect(Collectors.toList())
+        );
+
+        gatewayNode.initSchema("CREATE TABLE t1 (id INT PRIMARY KEY)");
+
+        cluster.setDataProvider("T1", TestBuilders.tableScan((nodeName, partId) ->
+                Collections.singleton(new Object[]{partId}))
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TxType.class)
+    void planningIsRepeatedUsingTheSameTransaction(TxType type) throws Exception {
+        TestTxContext txContext = new TestTxContext(type);
+        TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+        PrepareServiceSpy prepareServiceSpy = new PrepareServiceSpy(gatewayNode);
+
+        CompletableFuture<AsyncSqlCursor<InternalSqlRow>> fut =
+                gatewayNode.executeQueryAsync(new SqlProperties(), txContext, "SELECT id FROM t1");
+
+        prepareServiceSpy.waitUntilCallsCount(is(1));
+        assertThat(txContext.startedTxCounter.get(), is(0));
+
+        // Simulate concurrent schema modification.
+        await(cluster.catalogManager().execute(
+                makeAddColumnCommand("VAL1")));
+
+        prepareServiceSpy.unblock();
+        prepareServiceSpy.waitUntilCallsCount(is(2));
+
+        // Simulate another one schema modification.
+        await(cluster.catalogManager().execute(
+                makeAddColumnCommand("VAL2")));
+
+        prepareServiceSpy.unblock();
+
+        await(await(fut).closeAsync());
+
+        // Planning must be repeated, but only once.
+        assertThat(prepareServiceSpy.callsCounter.get(), is(2));
+
+        // Transaction should be started only once.
+        assertThat(txContext.startedTxCounter.get(), is(1));
+    }
+
+    @Test
+    void schemaChangedAndNodeDisconnectedDuringPlanning() {
+        TestTxContext txContext = new TestTxContext(TxType.RO);
+        TestNode gatewayNode = cluster.node(GATEWAY_NODE_NAME);
+        PrepareServiceSpy prepareServiceSpy = new PrepareServiceSpy(gatewayNode);
+
+        CompletableFuture<AsyncSqlCursor<InternalSqlRow>> fut =
+                gatewayNode.executeQueryAsync(new SqlProperties(), txContext, "SELECT id FROM t1");
+
+        prepareServiceSpy.waitUntilCallsCount(is(1));
+        assertThat(txContext.startedTxCounter.get(), is(0));
+
+        // Simulate concurrent schema modification.
+        await(cluster.catalogManager().execute(
+                makeAddColumnCommand("VAL1")));
+
+        // And node disconnection.
+        cluster.node(DATA_NODES.get(0)).disconnect();
+
+        prepareServiceSpy.unblock();
+        prepareServiceSpy.waitUntilCallsCount(is(2));
+        prepareServiceSpy.unblock();
+
+        await(await(fut).closeAsync());
+
+        // Planning must be repeated, but only once.
+        assertThat(prepareServiceSpy.callsCounter.get(), is(2));
+
+        // The plan execution must be repeated using a new transaction.
+        assertThat(txContext.startedTxCounter.get(), is(2));
+    }
+
+    private static CatalogCommand makeAddColumnCommand(String columnName) {
+        return AlterTableAddColumnCommand.builder()
+                .schemaName(DEFAULT_SCHEMA_NAME)
+                .tableName("T1")
+                .columns(List.of(columnParams(columnName, INT32)))
+                .build();
+    }
+
+    private static class PrepareServiceSpy {
+        private final AtomicInteger callsCounter = new AtomicInteger();
+        private final CyclicBarrier prepareBarrier = new CyclicBarrier(2);
+
+        PrepareServiceSpy(TestNode gatewayNode) {
+            ((PrepareServiceWithPrepareCallback) gatewayNode.prepareService())
+                    .setPrepareCallback(() -> {
+                        callsCounter.incrementAndGet();
+
+                        awaitBarrier();
+                    });
+        }
+
+        public void waitUntilCallsCount(Matcher<Integer> integerMatcher) {
+            Awaitility.await().until(callsCounter::get, integerMatcher);
+        }
+
+        void unblock() {
+            awaitBarrier();
+        }
+
+        private void awaitBarrier() {
+            try {
+                prepareBarrier.await(5, TimeUnit.SECONDS);

Review Comment:
   The CyclicBarrier is initialized with parties=2, and awaitBarrier() is 
called from both the test thread (via unblock()) and the callback thread (from 
setPrepareCallback). After each rendezvous the barrier resets automatically for 
the next use. At lines 161-163 of the test there are two consecutive calls: 
prepareServiceSpy.unblock() followed by 
prepareServiceSpy.waitUntilCallsCount(is(2)). If the two threads do not reach 
the barrier within the 5-second timeout of each other, the waiting thread does 
not block indefinitely: await() throws a TimeoutException and the barrier is 
left broken, so later await() calls fail as well. Consider using CountDownLatch 
or another synchronization mechanism that better fits the sequential nature of 
this test.
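
   One possible shape for such a replacement, as a minimal sketch rather than 
the PR's code: a Semaphore remembers permits, so it does not matter whether the 
test thread or the planning thread arrives first. The class name PrepareGate 
and its methods are hypothetical; only Semaphore, AtomicInteger and TimeUnit 
are real JDK APIs.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical replacement for the barrier-based spy; not part of the PR. */
class PrepareGate {
    private final AtomicInteger callsCounter = new AtomicInteger();

    // Permits are remembered, so unblock() may run before or after the callback starts waiting.
    private final Semaphore permits = new Semaphore(0);

    /** Called on the planning thread: counts the call, then waits for one permit. */
    void onPrepare() {
        callsCounter.incrementAndGet();

        try {
            if (!permits.tryAcquire(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Planning was not unblocked in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new IllegalStateException(e);
        }
    }

    /** Called on the test thread: lets exactly one pending or future onPrepare() proceed. */
    void unblock() {
        permits.release();
    }

    /** Number of onPrepare() invocations observed so far. */
    int calls() {
        return callsCounter.get();
    }
}
```

   Unlike a two-party barrier, the test thread never blocks in unblock(), and a 
timeout on one side does not poison later iterations.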



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlOperationContext.java:
##########
@@ -142,6 +145,14 @@ public ZoneId timeZoneId() {
         return topologyVersion;
     }
 
+    public @Nullable QueryTransactionWrapper usedTx() {
+        return usedTx;
+    }
+
+    public void usedTx(@Nullable QueryTransactionWrapper usedTx) {

Review Comment:
   The `usedTx` field is declared as `volatile` but there are potential 
thread-safety issues. The getter and setter methods are not synchronized, and 
there's a check-then-act pattern in OptimizingPhaseHandler where the value is 
read and used multiple times. This could lead to race conditions if multiple 
threads access this field concurrently. Consider using proper synchronization 
or an AtomicReference if concurrent access is expected.
   ```suggestion
       public synchronized @Nullable QueryTransactionWrapper usedTx() {
           return usedTx;
       }
   
       public synchronized void usedTx(@Nullable QueryTransactionWrapper usedTx) {
   ```
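
   Note that synchronizing the individual accessors does not by itself make a 
read-then-write sequence atomic; the whole check-then-act has to happen in one 
step. If concurrent access is actually expected (the diff does not show 
whether it is), the AtomicReference option could look roughly like the sketch 
below. UsedTxHolder and setIfAbsent are hypothetical names used only for 
illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical holder illustrating an atomic "set once" for a transaction wrapper. */
class UsedTxHolder<T> {
    private final AtomicReference<T> usedTx = new AtomicReference<>();

    /** Returns the recorded value, or null if nothing was recorded yet. */
    T get() {
        return usedTx.get();
    }

    /**
     * Records {@code tx} only if nothing was recorded yet and returns the value
     * that ended up stored, so concurrent callers agree on a single winner.
     */
    T setIfAbsent(T tx) {
        return usedTx.updateAndGet(current -> current != null ? current : tx);
    }
}
```

   A caller would then use the returned value instead of reading the field a 
second time, which removes the window between the check and the act.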


