This is an automated email from the ASF dual-hosted git repository.
asf-gitbox-commits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 45c02d66b23 IGNITE-28568 SQL Calcite: Fix sync tx key waiting for DML
operations - Fixes #13042.
45c02d66b23 is described below
commit 45c02d66b236d4d5cdb24209a3d8f1bc85532eac
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Apr 23 16:41:24 2026 +0300
IGNITE-28568 SQL Calcite: Fix sync tx key waiting for DML operations -
Fixes #13042.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/exec/rel/ModifyNode.java | 83 ++++++--
.../calcite/exec/rel/AbstractExecutionTest.java | 5 +-
.../processors/tx/TxThreadLockingTest.java | 219 +++++++++++++++++++++
.../ignite/testsuites/IntegrationTestSuite.java | 3 +-
4 files changed, 291 insertions(+), 19 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index 76220c84251..fcf2b8ebd33 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -29,6 +29,8 @@ import javax.cache.processor.MutableEntry;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cache.context.SessionContextImpl;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -50,6 +52,12 @@ import static
org.apache.ignite.internal.processors.query.QueryUtils.cacheForDML
*
*/
public class ModifyNode<Row> extends AbstractNode<Row> implements
SingleNode<Row>, Downstream<Row> {
+ /**
+ * Timeout to wait for async invoke operation to complete. In case this
timeout exceeded, release thread and
+ * continue execution after invoke operation finished.
+ */
+ private static final long INVOKE_TIMEOUT = 100;
+
/** */
protected final CacheTableDescriptor desc;
@@ -77,6 +85,9 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
/** */
private State state = State.UPDATING;
+ /** */
+ private IgniteInternalFuture<Map<Object, EntryProcessorResult<Long>>>
invokeFut;
+
/**
* @param ctx Execution context.
* @param desc Table descriptor.
@@ -119,22 +130,23 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
waiting--;
- switch (op) {
- case DELETE:
- case UPDATE:
- case INSERT:
- case MERGE:
- tuples.add(desc.toTuple(context(), row, op, cols));
+ tuples.add(desc.toTuple(context(), row, op, cols));
- flushTuples(false);
+ if (invokeFut != null) // Still waiting for previous invocation result.
+ return;
- break;
- default:
- throw new UnsupportedOperationException(op.name());
- }
+ flushTuples(false);
- if (waiting == 0)
- source().request(waiting = MODIFY_BATCH_SIZE);
+ if (invokeFut != null) {
+ invokeFut.listen(f -> {
+ // Push new task to execute in correct thread.
+ context().execute(() -> {
+ processInvokeResult(f.get());
+
+ flushTuples(false);
+ }, this::onError);
+ });
+ }
}
/** {@inheritDoc} */
@@ -171,8 +183,27 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
source().request(waiting = MODIFY_BATCH_SIZE);
if (state == State.UPDATED && requested > 0) {
+ if (invokeFut != null) { // Still waiting for previous invocation
result.
+ invokeFut.listen(f -> context().execute(this::tryEnd,
this::onError));
+
+ return;
+ }
+
flushTuples(true);
+ if (invokeFut != null) {
+ invokeFut.listen(f -> {
+ // Push new task to execute in correct thread.
+ context().execute(() -> {
+ processInvokeResult(f.get());
+
+ tryEnd();
+ }, this::onError);
+ });
+
+ return;
+ }
+
state = State.END;
inLoop = true;
@@ -193,7 +224,7 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
/** */
@SuppressWarnings("unchecked")
- private void flushTuples(boolean force) throws IgniteCheckedException {
+ private void flushTuples(boolean force) throws Exception {
if (F.isEmpty(tuples) || !force && tuples.size() < MODIFY_BATCH_SIZE)
return;
@@ -225,7 +256,7 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
private void invokeOutsideTransaction(
List<ModifyTuple> tuples,
IgniteInternalCache<Object, Object> cache
- ) throws IgniteCheckedException {
+ ) throws Exception {
SessionContextImpl sesCtx = context().unwrap(SessionContextImpl.class);
Map<String, String> sesAttrs = sesCtx == null ? null :
sesCtx.attributes();
@@ -233,7 +264,22 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
cache = cache.withApplicationAttributes(sesAttrs);
Map<Object, EntryProcessor<Object, Object, Long>> map =
invokeMap(tuples);
- Map<Object, EntryProcessorResult<Long>> res =
cacheForDML(cache).invokeAll(map);
+ invokeFut = cacheForDML(cache).invokeAllAsync(map);
+
+ try {
+ // Shortcut - give a chance for operation to be executed in sync
mode (it will simplify workflow).
+ Map<Object, EntryProcessorResult<Long>> res =
invokeFut.get(INVOKE_TIMEOUT);
+
+ processInvokeResult(res);
+ }
+ catch (IgniteFutureTimeoutCheckedException ignore) {
+ // No-op. Result processing task will be scheduled by caller if
invokeFut != null.
+ }
+ }
+
+ /** */
+ private void processInvokeResult(Map<Object, EntryProcessorResult<Long>>
res) throws Exception {
+ invokeFut = null;
long updated =
res.values().stream().mapToLong(EntryProcessorResult::get).sum();
@@ -247,6 +293,9 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
}
updatedRows += updated;
+
+ if (waiting == 0)
+ source().request(waiting = MODIFY_BATCH_SIZE);
}
/**
@@ -260,7 +309,7 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
List<ModifyTuple> tuples,
IgniteInternalCache<Object, Object> cache,
GridNearTxLocal userTx
- ) throws IgniteCheckedException {
+ ) throws Exception {
userTx.resume();
try {
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index e67cd18885e..b7f5880259b 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -87,7 +87,10 @@ public class AbstractExecutionTest extends
GridCommonAbstractTest {
protected static final String PARAMS_STRING = "Task executor = {0},
Execution strategy = {1}";
/** */
- protected static final int IN_BUFFER_SIZE = AbstractNode.IN_BUFFER_SIZE;
+ public static final int IN_BUFFER_SIZE = AbstractNode.IN_BUFFER_SIZE;
+
+ /** */
+ public static final int MODIFY_BATCH_SIZE = AbstractNode.MODIFY_BATCH_SIZE;
/** */
private Throwable lastE;
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TxThreadLockingTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TxThreadLockingTest.java
new file mode 100644
index 00000000000..6c451ac68db
--- /dev/null
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TxThreadLockingTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.tx;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import
org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractExecutionTest;
+import
org.apache.ignite.internal.processors.query.calcite.integration.AbstractBasicIntegrationTest;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+
+/** */
+@RunWith(Parameterized.class)
+public class TxThreadLockingTest extends AbstractBasicIntegrationTest {
+ /** */
+ private static final long TIMEOUT = 10_000L;
+
+ /** */
+ private static final int POOL_SIZE = 10;
+
+ /** */
+ private static final int IN_BUFFER_SIZE =
AbstractExecutionTest.IN_BUFFER_SIZE;
+
+ /** */
+ private static final int MODIFY_BATCH_SIZE =
AbstractExecutionTest.MODIFY_BATCH_SIZE;
+
+ /** Use query blocking executor. */
+ @Parameterized.Parameter(0)
+ public boolean qryBlockingExecutor;
+
+ /** */
+ @Parameterized.Parameters(name = "qryBlockingExecutor={0}")
+ public static Collection<?> parameters() {
+ return List.of(false, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ System.setProperty(IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
String.valueOf(qryBlockingExecutor));
+
+ startGrid(0);
+
+ client = startClientGrid();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ System.clearProperty(IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return
super.getConfiguration(igniteInstanceName).setQueryThreadPoolSize(POOL_SIZE);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected <K, V> CacheConfiguration<K, V> cacheConfiguration() {
+ return super.<K,
V>cacheConfiguration().setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ }
+
+ /** */
+ @Test
+ public void testThreadsLockingByDml() throws Exception {
+ createAndPopulateTable();
+
+ CountDownLatch unlockLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> lockKeyFut = lockKeyAndWaitForLatch(0,
unlockLatch);
+
+ IgniteInternalFuture<?> updFut =
GridTestUtils.runMultiThreadedAsync(() -> {
+ sql("UPDATE person SET salary = 1 WHERE id = 0");
+ }, POOL_SIZE + 1, "update-thread");
+
+ // Ensure update queries are blocked.
+ assertFalse(GridTestUtils.waitForCondition(updFut::isDone, 200));
+
+ IgniteInternalFuture<?> selectFut =
GridTestUtils.runMultiThreadedAsync(() -> {
+ for (int i = 0; i < 100; i++)
+ sql("SELECT * FROM person WHERE id = 1");
+ }, POOL_SIZE + 1, "select-thread");
+
+ // Ensure other queries are not blocked by update queries.
+ selectFut.get(TIMEOUT, TimeUnit.MILLISECONDS);
+
+ unlockLatch.countDown();
+
+ lockKeyFut.get();
+
+ // Ensure update queries are released.
+ updFut.get(TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ /** */
+ @Test
+ public void testDifferentBlockedBatchSize() throws Exception {
+ assertTrue("Unexpected constants [MODIFY_BATCH_SIZE=" +
MODIFY_BATCH_SIZE +
+ ", IN_BUFFER_SIZE=" + IN_BUFFER_SIZE + ']', MODIFY_BATCH_SIZE <
IN_BUFFER_SIZE);
+
+ createAndPopulateTable();
+
+ IgniteCache<Integer, Employer> cache = client.cache(TABLE_NAME);
+
+ for (int i = 0; i < IN_BUFFER_SIZE + 1; i++)
+ cache.put(i, new Employer("Test" + i, (double)i));
+
+ for (int size : new int[] {MODIFY_BATCH_SIZE, MODIFY_BATCH_SIZE + 1,
IN_BUFFER_SIZE, IN_BUFFER_SIZE + 1}) {
+ log.info("Blocked batch size: " + size);
+
+ CountDownLatch unlockLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> lockKeyFut = lockKeyAndWaitForLatch(0,
unlockLatch);
+
+ IgniteInternalFuture<List<List<?>>> updFut =
GridTestUtils.runAsync(() ->
+ sql("UPDATE person SET salary = 1 WHERE id < ?", size));
+
+ // Ensure update query is blocked.
+ assertFalse(GridTestUtils.waitForCondition(updFut::isDone, 200));
+
+ unlockLatch.countDown();
+
+ lockKeyFut.get();
+
+ List<List<?>> res = updFut.get(TIMEOUT, TimeUnit.MILLISECONDS);
+
+ assertEquals((long)size, res.get(0).get(0));
+ }
+ }
+
+ /** */
+ @Test
+ public void testErrorAfterAsyncWait() throws Exception {
+ createAndPopulateTable();
+
+ CountDownLatch unlockLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> lockKeyFut = lockKeyAndWaitForLatch(0,
unlockLatch);
+
+ IgniteInternalFuture<List<List<?>>> insFut = GridTestUtils.runAsync(()
->
+ sql("INSERT INTO person (id, name, salary) VALUES (0, 'Test',
0)"));
+
+ // Ensure insert query is blocked.
+ assertFalse(GridTestUtils.waitForCondition(insFut::isDone, 200));
+
+ unlockLatch.countDown();
+
+ lockKeyFut.get();
+
+ // Ensure exception is propagated to query initiator after async batch
execution finished.
+ try {
+ insFut.get(TIMEOUT, TimeUnit.MILLISECONDS);
+
+ fail("Exception wasn't thrown");
+ }
+ catch (IgniteCheckedException expected) {
+ assertTrue(X.hasCause(expected, "Failed to INSERT some keys
because they are already in cache",
+ IgniteSQLException.class));
+ }
+ }
+
+ /** */
+ private IgniteInternalFuture<?> lockKeyAndWaitForLatch(int key,
CountDownLatch unlockLatch) throws Exception {
+ CountDownLatch lockLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> lockKeyFut = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = client.transactions().txStart(PESSIMISTIC,
READ_COMMITTED)) {
+ client.cache(TABLE_NAME).put(key, new Employer("Test", 0d));
+
+ lockLatch.countDown();
+
+ assertTrue(unlockLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
+
+ tx.commit();
+ }
+ });
+
+ assertTrue(lockLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
+
+ return lockKeyFut;
+ }
+}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index af08a923f51..f4e123c6a7c 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -94,6 +94,7 @@ import
org.apache.ignite.internal.processors.query.calcite.rules.JoinOrderOptimi
import
org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest;
import
org.apache.ignite.internal.processors.query.calcite.rules.ProjectScanMergeRuleTest;
import
org.apache.ignite.internal.processors.query.calcite.thin.MultiLineQueryTest;
+import org.apache.ignite.internal.processors.tx.TxThreadLockingTest;
import
org.apache.ignite.internal.processors.tx.TxWithExceptionalInterceptorTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -180,7 +181,7 @@ import org.junit.runners.Suite;
TxWithExceptionalInterceptorTest.class,
UserDefinedTxAwareFunctionsIntegrationTest.class,
CacheWithInterceptorIntegrationTest.class,
- TxWithExceptionalInterceptorTest.class,
+ TxThreadLockingTest.class,
SelectByKeyFieldTest.class,
})
public class IntegrationTestSuite {