This is an automated email from the ASF dual-hosted git repository.

asf-gitbox-commits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 45c02d66b23 IGNITE-28568 SQL Calcite: Fix sync tx key waiting for DML 
operations - Fixes #13042.
45c02d66b23 is described below

commit 45c02d66b236d4d5cdb24209a3d8f1bc85532eac
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Apr 23 16:41:24 2026 +0300

    IGNITE-28568 SQL Calcite: Fix sync tx key waiting for DML operations - 
Fixes #13042.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../query/calcite/exec/rel/ModifyNode.java         |  83 ++++++--
 .../calcite/exec/rel/AbstractExecutionTest.java    |   5 +-
 .../processors/tx/TxThreadLockingTest.java         | 219 +++++++++++++++++++++
 .../ignite/testsuites/IntegrationTestSuite.java    |   3 +-
 4 files changed, 291 insertions(+), 19 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index 76220c84251..fcf2b8ebd33 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -29,6 +29,8 @@ import javax.cache.processor.MutableEntry;
 import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cache.context.SessionContextImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -50,6 +52,12 @@ import static 
org.apache.ignite.internal.processors.query.QueryUtils.cacheForDML
  *
  */
 public class ModifyNode<Row> extends AbstractNode<Row> implements 
SingleNode<Row>, Downstream<Row> {
+    /**
+     * Timeout to wait for async invoke operation to complete. In case this 
timeout exceeded, release thread and
+     * continue execution after invoke operation finished.
+     */
+    private static final long INVOKE_TIMEOUT = 100;
+
     /** */
     protected final CacheTableDescriptor desc;
 
@@ -77,6 +85,9 @@ public class ModifyNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
     /** */
     private State state = State.UPDATING;
 
+    /** */
+    private IgniteInternalFuture<Map<Object, EntryProcessorResult<Long>>> 
invokeFut;
+
     /**
      * @param ctx Execution context.
      * @param desc Table descriptor.
@@ -119,22 +130,23 @@ public class ModifyNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
 
         waiting--;
 
-        switch (op) {
-            case DELETE:
-            case UPDATE:
-            case INSERT:
-            case MERGE:
-                tuples.add(desc.toTuple(context(), row, op, cols));
+        tuples.add(desc.toTuple(context(), row, op, cols));
 
-                flushTuples(false);
+        if (invokeFut != null) // Still waiting for previous invocation result.
+            return;
 
-                break;
-            default:
-                throw new UnsupportedOperationException(op.name());
-        }
+        flushTuples(false);
 
-        if (waiting == 0)
-            source().request(waiting = MODIFY_BATCH_SIZE);
+        if (invokeFut != null) {
+            invokeFut.listen(f -> {
+                // Push new task to execute in correct thread.
+                context().execute(() -> {
+                    processInvokeResult(f.get());
+
+                    flushTuples(false);
+                }, this::onError);
+            });
+        }
     }
 
     /** {@inheritDoc} */
@@ -171,8 +183,27 @@ public class ModifyNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
             source().request(waiting = MODIFY_BATCH_SIZE);
 
         if (state == State.UPDATED && requested > 0) {
+            if (invokeFut != null) { // Still waiting for previous invocation 
result.
+                invokeFut.listen(f -> context().execute(this::tryEnd, 
this::onError));
+
+                return;
+            }
+
             flushTuples(true);
 
+            if (invokeFut != null) {
+                invokeFut.listen(f -> {
+                    // Push new task to execute in correct thread.
+                    context().execute(() -> {
+                        processInvokeResult(f.get());
+
+                        tryEnd();
+                    }, this::onError);
+                });
+
+                return;
+            }
+
             state = State.END;
 
             inLoop = true;
@@ -193,7 +224,7 @@ public class ModifyNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
 
     /** */
     @SuppressWarnings("unchecked")
-    private void flushTuples(boolean force) throws IgniteCheckedException {
+    private void flushTuples(boolean force) throws Exception {
         if (F.isEmpty(tuples) || !force && tuples.size() < MODIFY_BATCH_SIZE)
             return;
 
@@ -225,7 +256,7 @@ public class ModifyNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
     private void invokeOutsideTransaction(
         List<ModifyTuple> tuples,
         IgniteInternalCache<Object, Object> cache
-    ) throws IgniteCheckedException {
+    ) throws Exception {
         SessionContextImpl sesCtx = context().unwrap(SessionContextImpl.class);
         Map<String, String> sesAttrs = sesCtx == null ? null : 
sesCtx.attributes();
 
@@ -233,7 +264,22 @@ public class ModifyNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
             cache = cache.withApplicationAttributes(sesAttrs);
 
         Map<Object, EntryProcessor<Object, Object, Long>> map = 
invokeMap(tuples);
-        Map<Object, EntryProcessorResult<Long>> res = 
cacheForDML(cache).invokeAll(map);
+        invokeFut = cacheForDML(cache).invokeAllAsync(map);
+
+        try {
+            // Shortcut - give a chance for operation to be executed in sync 
mode (it will simplify workflow).
+            Map<Object, EntryProcessorResult<Long>> res = 
invokeFut.get(INVOKE_TIMEOUT);
+
+            processInvokeResult(res);
+        }
+        catch (IgniteFutureTimeoutCheckedException ignore) {
+            // No-op. Result processing task will be scheduled by caller if 
invokeFut != null.
+        }
+    }
+
+    /** */
+    private void processInvokeResult(Map<Object, EntryProcessorResult<Long>> 
res) throws Exception {
+        invokeFut = null;
 
         long updated = 
res.values().stream().mapToLong(EntryProcessorResult::get).sum();
 
@@ -247,6 +293,9 @@ public class ModifyNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
         }
 
         updatedRows += updated;
+
+        if (waiting == 0)
+            source().request(waiting = MODIFY_BATCH_SIZE);
     }
 
     /**
@@ -260,7 +309,7 @@ public class ModifyNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
         List<ModifyTuple> tuples,
         IgniteInternalCache<Object, Object> cache,
         GridNearTxLocal userTx
-    ) throws IgniteCheckedException {
+    ) throws Exception {
         userTx.resume();
 
         try {
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index e67cd18885e..b7f5880259b 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -87,7 +87,10 @@ public class AbstractExecutionTest extends 
GridCommonAbstractTest {
     protected static final String PARAMS_STRING = "Task executor = {0}, 
Execution strategy = {1}";
 
     /** */
-    protected static final int IN_BUFFER_SIZE = AbstractNode.IN_BUFFER_SIZE;
+    public static final int IN_BUFFER_SIZE = AbstractNode.IN_BUFFER_SIZE;
+
+    /** */
+    public static final int MODIFY_BATCH_SIZE = AbstractNode.MODIFY_BATCH_SIZE;
 
     /** */
     private Throwable lastE;
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TxThreadLockingTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TxThreadLockingTest.java
new file mode 100644
index 00000000000..6c451ac68db
--- /dev/null
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TxThreadLockingTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.tx;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractExecutionTest;
+import 
org.apache.ignite.internal.processors.query.calcite.integration.AbstractBasicIntegrationTest;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+
+/** */
+@RunWith(Parameterized.class)
+public class TxThreadLockingTest extends AbstractBasicIntegrationTest {
+    /** */
+    private static final long TIMEOUT = 10_000L;
+
+    /** */
+    private static final int POOL_SIZE = 10;
+
+    /** */
+    private static final int IN_BUFFER_SIZE = 
AbstractExecutionTest.IN_BUFFER_SIZE;
+
+    /** */
+    private static final int MODIFY_BATCH_SIZE = 
AbstractExecutionTest.MODIFY_BATCH_SIZE;
+
+    /** Use query blocking executor. */
+    @Parameterized.Parameter(0)
+    public boolean qryBlockingExecutor;
+
+    /** */
+    @Parameterized.Parameters(name = "qryBlockingExecutor={0}")
+    public static Collection<?> parameters() {
+        return List.of(false, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        System.setProperty(IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR, 
String.valueOf(qryBlockingExecutor));
+
+        startGrid(0);
+
+        client = startClientGrid();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        System.clearProperty(IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        return 
super.getConfiguration(igniteInstanceName).setQueryThreadPoolSize(POOL_SIZE);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected <K, V> CacheConfiguration<K, V> cacheConfiguration() {
+        return super.<K, 
V>cacheConfiguration().setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+    }
+
+    /** */
+    @Test
+    public void testThreadsLockingByDml() throws Exception {
+        createAndPopulateTable();
+
+        CountDownLatch unlockLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> lockKeyFut = lockKeyAndWaitForLatch(0, 
unlockLatch);
+
+        IgniteInternalFuture<?> updFut = 
GridTestUtils.runMultiThreadedAsync(() -> {
+            sql("UPDATE person SET salary = 1 WHERE id = 0");
+        }, POOL_SIZE + 1, "update-thread");
+
+        // Ensure update queries are blocked.
+        assertFalse(GridTestUtils.waitForCondition(updFut::isDone, 200));
+
+        IgniteInternalFuture<?> selectFut = 
GridTestUtils.runMultiThreadedAsync(() -> {
+            for (int i = 0; i < 100; i++)
+                sql("SELECT * FROM person WHERE id = 1");
+        }, POOL_SIZE + 1, "select-thread");
+
+        // Ensure other queries are not blocked by update queries.
+        selectFut.get(TIMEOUT, TimeUnit.MILLISECONDS);
+
+        unlockLatch.countDown();
+
+        lockKeyFut.get();
+
+        // Ensure update queries are released.
+        updFut.get(TIMEOUT, TimeUnit.MILLISECONDS);
+    }
+
+    /** */
+    @Test
+    public void testDifferentBlockedBatchSize() throws Exception {
+        assertTrue("Unexpected constants [MODIFY_BATCH_SIZE=" + 
MODIFY_BATCH_SIZE +
+            ", IN_BUFFER_SIZE=" + IN_BUFFER_SIZE + ']', MODIFY_BATCH_SIZE < 
IN_BUFFER_SIZE);
+
+        createAndPopulateTable();
+
+        IgniteCache<Integer, Employer> cache = client.cache(TABLE_NAME);
+
+        for (int i = 0; i < IN_BUFFER_SIZE + 1; i++)
+            cache.put(i, new Employer("Test" + i, (double)i));
+
+        for (int size : new int[] {MODIFY_BATCH_SIZE, MODIFY_BATCH_SIZE + 1, 
IN_BUFFER_SIZE, IN_BUFFER_SIZE + 1}) {
+            log.info("Blocked batch size: " + size);
+
+            CountDownLatch unlockLatch = new CountDownLatch(1);
+
+            IgniteInternalFuture<?> lockKeyFut = lockKeyAndWaitForLatch(0, 
unlockLatch);
+
+            IgniteInternalFuture<List<List<?>>> updFut = 
GridTestUtils.runAsync(() ->
+                sql("UPDATE person SET salary = 1 WHERE id < ?", size));
+
+            // Ensure update query is blocked.
+            assertFalse(GridTestUtils.waitForCondition(updFut::isDone, 200));
+
+            unlockLatch.countDown();
+
+            lockKeyFut.get();
+
+            List<List<?>> res = updFut.get(TIMEOUT, TimeUnit.MILLISECONDS);
+
+            assertEquals((long)size, res.get(0).get(0));
+        }
+    }
+
+    /** */
+    @Test
+    public void testErrorAfterAsyncWait() throws Exception {
+        createAndPopulateTable();
+
+        CountDownLatch unlockLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> lockKeyFut = lockKeyAndWaitForLatch(0, 
unlockLatch);
+
+        IgniteInternalFuture<List<List<?>>> insFut = GridTestUtils.runAsync(() 
->
+            sql("INSERT INTO person (id, name, salary) VALUES (0, 'Test', 
0)"));
+
+        // Ensure insert query is blocked.
+        assertFalse(GridTestUtils.waitForCondition(insFut::isDone, 200));
+
+        unlockLatch.countDown();
+
+        lockKeyFut.get();
+
+        // Ensure exception is propagated to query initiator after async batch 
execution finished.
+        try {
+            insFut.get(TIMEOUT, TimeUnit.MILLISECONDS);
+
+            fail("Exception wasn't thrown");
+        }
+        catch (IgniteCheckedException expected) {
+            assertTrue(X.hasCause(expected, "Failed to INSERT some keys 
because they are already in cache",
+                IgniteSQLException.class));
+        }
+    }
+
+    /** */
+    private IgniteInternalFuture<?> lockKeyAndWaitForLatch(int key, 
CountDownLatch unlockLatch) throws Exception {
+        CountDownLatch lockLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> lockKeyFut = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+                client.cache(TABLE_NAME).put(key, new Employer("Test", 0d));
+
+                lockLatch.countDown();
+
+                assertTrue(unlockLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
+
+                tx.commit();
+            }
+        });
+
+        assertTrue(lockLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
+
+        return lockKeyFut;
+    }
+}
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index af08a923f51..f4e123c6a7c 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -94,6 +94,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.rules.JoinOrderOptimi
 import 
org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest;
 import 
org.apache.ignite.internal.processors.query.calcite.rules.ProjectScanMergeRuleTest;
 import 
org.apache.ignite.internal.processors.query.calcite.thin.MultiLineQueryTest;
+import org.apache.ignite.internal.processors.tx.TxThreadLockingTest;
 import 
org.apache.ignite.internal.processors.tx.TxWithExceptionalInterceptorTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
@@ -180,7 +181,7 @@ import org.junit.runners.Suite;
     TxWithExceptionalInterceptorTest.class,
     UserDefinedTxAwareFunctionsIntegrationTest.class,
     CacheWithInterceptorIntegrationTest.class,
-    TxWithExceptionalInterceptorTest.class,
+    TxThreadLockingTest.class,
     SelectByKeyFieldTest.class,
 })
 public class IntegrationTestSuite {

Reply via email to