This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 15f21221355 IGNITE-28352 Calcite. User defined sql function miss 
entries are written under the same tx lock (#12936)
15f21221355 is described below

commit 15f21221355904003ac41b7a6d6f29e2fa546ce2
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Wed Apr 8 08:15:44 2026 +0300

    IGNITE-28352 Calcite. User defined sql function miss entries are written 
under the same tx lock (#12936)
---
 .../query/calcite/exec/ExecutionContext.java       |  10 +-
 ...UserDefinedTxAwareFunctionsIntegrationTest.java | 212 +++++++++++++++++++++
 .../query/calcite/planner/PlanExecutionTest.java   |   3 +-
 .../ignite/testsuites/IntegrationTestSuite.java    |   2 +
 4 files changed, 225 insertions(+), 2 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 1e7bda269da..41a477ef083 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -145,6 +145,9 @@ public class ExecutionContext<Row> extends 
AbstractQueryContext implements DataC
     /** */
     private Object[] correlations = new Object[16];
 
+    /** Entries holder per execution thread. */
+    private static final ThreadLocal<Collection<QueryTxEntry>> txEntriesHolder 
= new ThreadLocal<>();
+
     /**
      * @param qctx Parent base query context.
      * @param qryId Query ID.
@@ -182,7 +185,7 @@ public class ExecutionContext<Row> extends 
AbstractQueryContext implements DataC
         this.ioTracker = ioTracker;
         this.params = params;
         this.timeout = timeout;
-        this.qryTxEntries = qryTxEntries;
+        this.qryTxEntries = qryTxEntries == null ? txEntriesHolder.get() : 
qryTxEntries;
 
         startTs = U.currentTimeMillis();
 
@@ -421,12 +424,17 @@ public class ExecutionContext<Row> extends 
AbstractQueryContext implements DataC
 
         executor.execute(qryId, fragmentId(), () -> {
             try {
+                txEntriesHolder.set(qryTxEntries);
+
                 if (!isCancelled())
                     task.run();
             }
             catch (Throwable e) {
                 onError.accept(e);
             }
+            finally {
+                txEntriesHolder.remove();
+            }
         });
     }
 
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java
new file mode 100644
index 00000000000..e9824582388
--- /dev/null
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/UserDefinedTxAwareFunctionsIntegrationTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration test for user defined functions with tx aware.
+ */
+@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR, 
value = "true")
+public class UserDefinedTxAwareFunctionsIntegrationTest extends 
AbstractBasicIntegrationTest {
+    /** */
+    private static final int THREAD_NUM = 10;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.getSqlConfiguration().setQueryEnginesConfiguration(new 
CalciteQueryEngineConfiguration());
+        cfg.getTransactionConfiguration().setTxAwareQueriesEnabled(true);
+        cfg.setQueryThreadPoolSize(2 * THREAD_NUM + 1);
+
+        return cfg;
+    }
+
+    /** Check tx aware UDF execution results. */
+    @Test
+    public void testTxAwareUserDefinedFunc() {
+        assertTrue(nodeCount() > 1);
+        int nodeCnt = nodeCount();
+
+        List<List<Object>> refResults = new ArrayList<>();
+
+        IgniteCache<Integer, Object> cache = 
client.getOrCreateCache(cacheConfig());
+
+        refResults.add(List.of(0, Integer.toString(0)));
+        // Insert outside tx.
+        cache.query(new SqlFieldsQuery("INSERT INTO Employer(id, name) VALUES 
(?, ?)").setArgs(0, 0)).getAll();
+
+        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED)) {
+            for (int i = 1; i < 2 * nodeCnt; ++i) {
+                refResults.add(List.of(i, Integer.toString(i)));
+                cache.query(new SqlFieldsQuery("INSERT INTO Employer(id, name) 
VALUES (?, ?)").setArgs(i, i)).getAll();
+            }
+
+            // Simple select without UDF.
+            List<List<?>> selectResult = cache
+                .query(new SqlFieldsQuery("SELECT id, name FROM Employer ORDER 
BY id"))
+                .getAll();
+
+            assertThat(selectResult, equalTo(refResults));
+
+            // Select with UDF.
+            List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT 
customTableFunc() AS result")).getAll();
+
+            assertThat(res.get(0).get(0), equalTo(refResults));
+
+            // Select with nested UDF.
+            res = cache.query(new SqlFieldsQuery("SELECT 
customNestedTableFunc() AS result")).getAll();
+
+            assertThat(res.get(0).get(0), equalTo(refResults));
+
+            // UDF participate in DML.
+            cache.query(new SqlFieldsQuery("INSERT INTO Employer(id, name) 
VALUES (100, nameAsStr(1))")).getAll();
+
+            res = cache.query(new SqlFieldsQuery("SELECT name FROM Employer 
WHERE id = 100")).getAll();
+
+            assertEquals("1", res.get(0).get(0));
+
+            for (int i = 0; i < 2 * nodeCnt; ++i) {
+                // A bit different case of UDF.
+                List<List<?>> res1 = cache.query(new SqlFieldsQuery("SELECT 
nameTableFunc(?) AS result").setArgs(i)).getAll();
+
+                List<List<?>> res2 = (List<List<?>>)res1.get(0).get(0);
+
+                assertThat(res2.get(0).get(0), equalTo(Integer.toString(i)));
+            }
+
+            tx.commit();
+        }
+    }
+
+    /** */
+    @Test
+    public void testIsolationCorrectnessWithUdf() throws 
IgniteCheckedException {
+        assertTrue(nodeCount() > 1);
+        int nodeCnt = nodeCount();
+
+        IgniteCache<Integer, Object> cache = 
client.getOrCreateCache(cacheConfig());
+
+        /* The pool size should be greater than the maximum number of 
concurrent queries initiated by UDFs. */
+        IgniteInternalFuture<Long> fut = 
GridTestUtils.runMultiThreadedAsync(() -> {
+            for (int iter = 0; iter < 10; ++iter) {
+                try (Transaction tx = 
client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+                    List<List<Object>> refResults = new ArrayList<>();
+
+                    for (int i = 0; i < 2 * nodeCnt; ++i) {
+                        refResults.add(List.of(i, Integer.toString(i)));
+                        cache.query(new SqlFieldsQuery("INSERT INTO 
Employer(id, name) VALUES (?, ?)").setArgs(i, i)).getAll();
+                    }
+
+                    List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT 
customNestedTableFunc() AS result")).getAll();
+
+                    assertThat(res.get(0).get(0), equalTo(refResults));
+
+                    tx.rollback();
+                }
+            }
+        }, THREAD_NUM, "calcite-tx-with-udf");
+
+        fut.get(30_000);
+    }
+
+    /** */
+    private CacheConfiguration<Integer, Object> cacheConfig() {
+        return this.<Integer, Object>cacheConfiguration()
+            .setName(DEFAULT_CACHE_NAME)
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, 
Employer.class)
+                .setTableName("Employer")
+                .addQueryField("ID", Integer.class.getName(), null)
+                .setKeyFieldName("ID")
+            ))
+            .setSqlFunctionClasses(InnerSqlFunctionsLibrary.class)
+            .setAtomicityMode(TRANSACTIONAL);
+    }
+
+    /** */
+    public static class InnerSqlFunctionsLibrary {
+        /** */
+        @QuerySqlFunction
+        public List<List<?>> customTableFunc() {
+            Ignite ignite = Ignition.localIgnite();
+
+            return ignite.cache(DEFAULT_CACHE_NAME)
+                .query(new SqlFieldsQuery("SELECT id, name FROM Employer ORDER 
BY id"))
+                .getAll();
+        }
+
+        /** */
+        @QuerySqlFunction
+        public List<List<?>> customNestedTableFunc() {
+            Ignite ignite = Ignition.localIgnite();
+
+            Object res = ignite.cache(DEFAULT_CACHE_NAME)
+                .query(new SqlFieldsQuery("SELECT customTableFunc() AS 
result"))
+                .getAll().get(0).get(0);
+
+            return (List<List<?>>)res;
+        }
+
+        /** */
+        @QuerySqlFunction
+        public static List<List<?>> nameTableFunc(int id) {
+            Ignite ignite = Ignition.localIgnite();
+
+            return ignite.cache(DEFAULT_CACHE_NAME)
+                .query(new SqlFieldsQuery("SELECT name FROM Employer WHERE id 
= ?").setArgs(id))
+                .getAll();
+        }
+
+        /** */
+        @QuerySqlFunction
+        public static String nameAsStr(int id) {
+            Ignite ignite = Ignition.localIgnite();
+
+            List<List<?>> res = ignite.cache(DEFAULT_CACHE_NAME)
+                .query(new SqlFieldsQuery("SELECT name FROM Employer WHERE id 
= ?").setArgs(id))
+                .getAll();
+
+            return (String)res.get(0).get(0);
+        }
+    }
+}
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java
index 00eb593739e..11483e74181 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java
@@ -353,7 +353,8 @@ public class PlanExecutionTest extends AbstractPlannerTest {
             NoOpIoTracker.INSTANCE,
             0,
             Commons.parametersMap(ctx.parameters()),
-            null);
+            null
+        );
 
         return new LogicalRelImplementor<>(ectx, c -> r -> 0, mailboxRegistry, 
exchangeSvc,
             new TestFailureProcessor(kernal)).go(fragment.root());
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index 6558537af07..af08a923f51 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -84,6 +84,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.integration.UnstableT
 import 
org.apache.ignite.internal.processors.query.calcite.integration.UserDdlIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedFunctionsIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedFunctionsIntegrationTransactionalTest;
+import 
org.apache.ignite.internal.processors.query.calcite.integration.UserDefinedTxAwareFunctionsIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.ViewsIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.tpch.TpchScale001Test;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.tpch.TpchScale010Test;
@@ -177,6 +178,7 @@ import org.junit.runners.Suite;
     CacheStoreTest.class,
     MultiDcQueryMappingTest.class,
     TxWithExceptionalInterceptorTest.class,
+    UserDefinedTxAwareFunctionsIntegrationTest.class,
     CacheWithInterceptorIntegrationTest.class,
     TxWithExceptionalInterceptorTest.class,
     SelectByKeyFieldTest.class,

Reply via email to