This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c24e15f3255 IGNITE-28502 SQL Calcite: Add deadlock detection for queries initiated by UDF - Fixes #13004.
c24e15f3255 is described below

commit c24e15f3255e2eb1d731a9aac18e55a9814b1ec0
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Tue Apr 14 17:20:34 2026 +0300

    IGNITE-28502 SQL Calcite: Add deadlock detection for queries initiated by UDF - Fixes #13004.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../query/calcite/CalciteQueryProcessor.java       |  29 --
 .../query/calcite/exec/ExecutionServiceImpl.java   | 395 ++++++++++++---------
 .../integration/AbstractBasicIntegrationTest.java  |  14 +-
 .../integration/SqlDiagnosticIntegrationTest.java  | 114 +++---
 4 files changed, 298 insertions(+), 254 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 015ae25b6b9..ca6fbd6f390 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -82,7 +82,6 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecuto
 import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl;
-import 
org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
 import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig;
@@ -272,9 +271,6 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
     /** */
     private final InjectResourcesService injectSvc;
 
-    /** */
-    private final AtomicBoolean udfQryWarned = new AtomicBoolean();
-
     /** */
     private volatile boolean started;
 
@@ -545,8 +541,6 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
     ) throws IgniteSQLException {
         ensureTransactionModeSupported(qryCtx);
 
-        checkUdfQuery();
-
         SchemaPlus schema = schemaHolder.schema(schemaName);
 
         assert schema != null : "Schema not found: " + schemaName;
@@ -704,29 +698,6 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
         
IgniteTxManager.ensureTransactionModeSupported(ctx.cache().context().tm().tx(ver).isolation());
     }
 
-    /** Checks that query is initiated by UDF and print message to log if 
needed. */
-    private void checkUdfQuery() {
-        if (udfQryWarned.get())
-            return;
-
-        if 
(Thread.currentThread().getName().startsWith(AbstractQueryTaskExecutor.THREAD_PREFIX)
-            && udfQryWarned.compareAndSet(false, true)) {
-            if (taskExecutor instanceof QueryBlockingTaskExecutor) {
-                log.info("Detected query initiated by user-defined function. " 
+
-                    "In some circumstances, this can lead to thread pool 
starvation and deadlock. Ensure that " +
-                    "the pool size is properly configured (property 
IgniteConfiguration.QueryThreadPoolSize). " +
-                    "The pool size should be greater than the maximum number 
of concurrent queries initiated by UDFs.");
-            }
-            else {
-                log.warning("Detected query initiated by user-defined 
function. " +
-                    "When a striped query task executor (the default 
configuration) is used, tasks for such queries " +
-                    "can be assigned to the same thread as that held by the 
initial query, which can lead to a " +
-                    "deadlock. To switch to a blocking tasks executor, set the 
following parameter: " +
-                    "-DIGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR=true.");
-            }
-        }
-    }
-
     /** */
     private <T> T processQuery(
         @Nullable QueryContext qryCtx,
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 497520f96ef..24e3f8002c0 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.calcite.plan.Context;
@@ -66,6 +67,8 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.ddl.DdlCommandHa
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.GlobalMemoryTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.IoTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
@@ -202,6 +205,9 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
     /** */
     private InjectResourcesService injectSvc;
 
+    /** Limit for nested queries, initiated by UDF. */
+    private final AtomicInteger udfQryLimit = new AtomicInteger();
+
     /** */
     private final Map<String, FragmentPlan> fragmentPlanCache = new 
GridBoundedConcurrentLinkedHashMap<>(1024);
 
@@ -465,6 +471,8 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
         memoryTracker = cfg.getGlobalMemoryQuota() > 0 ? new 
GlobalMemoryTracker(cfg.getGlobalMemoryQuota()) :
             NoOpMemoryTracker.INSTANCE;
 
+        udfQryLimit.set(ctx.config().getQueryThreadPoolSize() - 1);
+
         init();
     }
 
@@ -574,6 +582,38 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
         }
     }
 
+    /**
+     * Checks that query is initiated by UDF.
+     *
+     * @return {@code True} if query is initiated by UDF (in this case UDF 
query limit is affected).
+     * @throws IgniteSQLException If query execution can lead to deadlocks.
+     */
+    private boolean checkUdfQuery() {
+        if 
(Thread.currentThread().getName().startsWith(AbstractQueryTaskExecutor.THREAD_PREFIX))
 {
+            if (taskExecutor instanceof QueryBlockingTaskExecutor) {
+                if (udfQryLimit.getAndDecrement() <= 0) {
+                    udfQryLimit.getAndIncrement();
+
+                    throw new IgniteSQLException("Detected thread pool 
starvation by queries initiated by " +
+                        "user-defined functions. Starting more queries from 
UDF will lead to deadlock. Ensure that " +
+                        "the pool size is properly configured (property 
IgniteConfiguration.QueryThreadPoolSize). " +
+                        "The pool size should be greater than the maximum 
number of concurrent queries initiated by UDFs.");
+                }
+
+                return true;
+            }
+            else {
+                throw new IgniteSQLException("Detected query initiated by 
user-defined function. " +
+                    "When a striped query task executor (the default 
configuration) is used, tasks for such queries " +
+                    "can be assigned to the same thread as that held by the 
initial query, which can lead to a " +
+                    "deadlock. To avoid deadlocks switch to a blocking tasks 
executor (set the parameter: " +
+                    "-DIGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR=true)");
+            }
+        }
+
+        return false;
+    }
+
     /** */
     private ListFieldsQueryCursor<?> mapAndExecutePlan(
         RootQuery<Row> qry,
@@ -594,207 +634,218 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
                 checkPermissions(fragment.root());
         }
 
-        // Local execution
-        Fragment fragment = F.first(fragments);
+        boolean udfQry = checkUdfQuery();
+
+        try {
+            // Local execution
+            Fragment fragment = F.first(fragments);
 
-        if (U.assertionsEnabled()) {
-            assert fragment != null;
+            if (U.assertionsEnabled()) {
+                assert fragment != null;
 
-            FragmentMapping mapping = execPlan.mapping(fragment);
+                FragmentMapping mapping = execPlan.mapping(fragment);
 
-            assert mapping != null;
+                assert mapping != null;
 
-            List<UUID> nodes = mapping.nodeIds();
+                List<UUID> nodes = mapping.nodeIds();
 
-            assert nodes != null && (nodes.size() == 1 && 
F.first(nodes).equals(localNodeId()) || nodes.isEmpty())
+                assert nodes != null && (nodes.size() == 1 && 
F.first(nodes).equals(localNodeId()) || nodes.isEmpty())
                     : "nodes=" + nodes + ", localNode=" + localNodeId();
-        }
+            }
 
-        long timeout = qry.remainingTime();
+            long timeout = qry.remainingTime();
 
-        if (timeout == 0) {
-            throw new IgniteSQLException("The query was cancelled due to 
timeout", IgniteQueryErrorCode.QUERY_CANCELED,
-                new QueryCancelledException());
-        }
+            if (timeout == 0) {
+                throw new IgniteSQLException("The query was cancelled due to 
timeout", IgniteQueryErrorCode.QUERY_CANCELED,
+                    new QueryCancelledException());
+            }
 
-        FragmentDescription fragmentDesc = new FragmentDescription(
-            fragment.fragmentId(),
-            execPlan.mapping(fragment),
-            execPlan.target(fragment),
-            execPlan.remotes(fragment));
-
-        MemoryTracker qryMemoryTracker = 
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());
-
-        final GridNearTxLocal userTx = Commons.queryTransaction(qry.context(), 
ctx.cache().context());
-
-        ExecutionContext<Row> ectx = new ExecutionContext<>(
-            qry.context(),
-            taskExecutor(),
-            injectSvc,
-            qry.id(),
-            locNodeId,
-            locNodeId,
-            mapCtx.topologyVersion(),
-            fragmentDesc,
-            handler,
-            qryMemoryTracker,
-            createIoTracker(locNodeId, qry.localQueryId()),
-            timeout,
-            qryParams,
-            userTx == null ? null : 
ExecutionContext.transactionChanges(userTx.writeEntries()));
-
-        Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(), 
mailboxRegistry(),
-            exchangeService(), failureProcessor()).go(fragment.root());
-
-        qry.run(ectx, execPlan, plan.fieldsMetadata(), node);
-
-        Map<UUID, Long> fragmentsPerNode = fragments.stream()
-            .skip(1)
-            .flatMap(f -> f.mapping().nodeIds().stream())
-            .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()));
-
-        // Start remote execution.
-        for (int i = 1; i < fragments.size(); i++) {
-            fragment = fragments.get(i);
-            fragmentDesc = new FragmentDescription(
+            FragmentDescription fragmentDesc = new FragmentDescription(
                 fragment.fragmentId(),
                 execPlan.mapping(fragment),
                 execPlan.target(fragment),
                 execPlan.remotes(fragment));
 
-            Throwable ex = null;
-            byte[] parametersMarshalled = null;
-
-            for (UUID nodeId : fragmentDesc.nodeIds()) {
-                if (ex != null)
-                    qry.onResponse(nodeId, fragment.fragmentId(), ex);
-                else {
-                    try {
-                        SessionContextImpl sesCtx = 
qry.context().unwrap(SessionContextImpl.class);
-
-                        QueryProperties props = 
qry.context().unwrap(QueryProperties.class);
-                        boolean keepBinaryMode = props == null || 
props.keepBinary();
-
-                        QueryStartRequest req = new QueryStartRequest(
-                            qry.id(),
-                            qry.localQueryId(),
-                            qry.context().schemaName(),
-                            fragment.serialized(),
-                            ectx.topologyVersion(),
-                            fragmentDesc,
-                            fragmentsPerNode.get(nodeId).intValue(),
-                            qry.parameters(),
-                            parametersMarshalled,
-                            timeout,
-                            ectx.getQryTxEntries(),
-                            sesCtx == null ? null : sesCtx.attributes(),
-                            keepBinaryMode
-                        );
-
-                        messageService().send(nodeId, req);
-
-                        // Avoid marshaling of the same parameters for other 
nodes.
-                        if (parametersMarshalled == null)
-                            parametersMarshalled = req.parametersMarshalled();
-                    }
-                    catch (Throwable e) {
-                        qry.onResponse(nodeId, fragment.fragmentId(), ex = e);
+            MemoryTracker qryMemoryTracker = 
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());
+
+            final GridNearTxLocal userTx = 
Commons.queryTransaction(qry.context(), ctx.cache().context());
+
+            ExecutionContext<Row> ectx = new ExecutionContext<>(
+                qry.context(),
+                taskExecutor(),
+                injectSvc,
+                qry.id(),
+                locNodeId,
+                locNodeId,
+                mapCtx.topologyVersion(),
+                fragmentDesc,
+                handler,
+                qryMemoryTracker,
+                createIoTracker(locNodeId, qry.localQueryId()),
+                timeout,
+                qryParams,
+                userTx == null ? null : 
ExecutionContext.transactionChanges(userTx.writeEntries()));
+
+            Node<Row> node = new LogicalRelImplementor<>(ectx, 
partitionService(), mailboxRegistry(),
+                exchangeService(), failureProcessor()).go(fragment.root());
+
+            qry.run(ectx, execPlan, plan.fieldsMetadata(), node);
+
+            Map<UUID, Long> fragmentsPerNode = fragments.stream()
+                .skip(1)
+                .flatMap(f -> f.mapping().nodeIds().stream())
+                .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()));
+
+            QueryProperties qryProps = 
qry.context().unwrap(QueryProperties.class);
+            boolean keepBinary = qryProps == null || qryProps.keepBinary();
+
+            // Start remote execution.
+            for (int i = 1; i < fragments.size(); i++) {
+                fragment = fragments.get(i);
+                fragmentDesc = new FragmentDescription(
+                    fragment.fragmentId(),
+                    execPlan.mapping(fragment),
+                    execPlan.target(fragment),
+                    execPlan.remotes(fragment));
+
+                Throwable ex = null;
+                byte[] parametersMarshalled = null;
+
+                for (UUID nodeId : fragmentDesc.nodeIds()) {
+                    if (ex != null)
+                        qry.onResponse(nodeId, fragment.fragmentId(), ex);
+                    else {
+                        try {
+                            SessionContextImpl sesCtx = 
qry.context().unwrap(SessionContextImpl.class);
+
+                            QueryStartRequest req = new QueryStartRequest(
+                                qry.id(),
+                                qry.localQueryId(),
+                                qry.context().schemaName(),
+                                fragment.serialized(),
+                                ectx.topologyVersion(),
+                                fragmentDesc,
+                                fragmentsPerNode.get(nodeId).intValue(),
+                                qry.parameters(),
+                                parametersMarshalled,
+                                timeout,
+                                ectx.getQryTxEntries(),
+                                sesCtx == null ? null : sesCtx.attributes(),
+                                keepBinary
+                            );
+
+                            messageService().send(nodeId, req);
+
+                            // Avoid marshaling of the same parameters for 
other nodes.
+                            if (parametersMarshalled == null)
+                                parametersMarshalled = 
req.parametersMarshalled();
+                        }
+                        catch (Throwable e) {
+                            qry.onResponse(nodeId, fragment.fragmentId(), ex = 
e);
+                        }
                     }
                 }
             }
-        }
-
-        if (perfStatProc.enabled()) {
-            perfStatProc.queryProperty(
-                GridCacheQueryType.SQL_FIELDS,
-                qry.initiatorNodeId(),
-                qry.localQueryId(),
-                "Query plan",
-                plan.textPlan()
-            );
-        }
 
-        if (ctx.query().runningQueryManager().planHistoryTracker().enabled()) {
-            ctx.query().runningQueryManager().planHistoryTracker().addPlan(
-                plan.textPlan(),
-                qry.sql(),
-                qry.context().schemaName(),
-                qry.context().isLocal(),
-                CalciteQueryEngineConfiguration.ENGINE_NAME
-            );
-        }
+            if (perfStatProc.enabled()) {
+                perfStatProc.queryProperty(
+                    GridCacheQueryType.SQL_FIELDS,
+                    qry.initiatorNodeId(),
+                    qry.localQueryId(),
+                    "Query plan",
+                    plan.textPlan()
+                );
+            }
 
-        QueryProperties qryProps = qry.context().unwrap(QueryProperties.class);
+            if 
(ctx.query().runningQueryManager().planHistoryTracker().enabled()) {
+                ctx.query().runningQueryManager().planHistoryTracker().addPlan(
+                    plan.textPlan(),
+                    qry.sql(),
+                    qry.context().schemaName(),
+                    qry.context().isLocal(),
+                    CalciteQueryEngineConfiguration.ENGINE_NAME
+                );
+            }
 
-        Function<Object, Object> fieldConverter = (qryProps == null || 
qryProps.keepBinary()) ? null :
-            o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, false, 
true, null);
+            Function<Object, Object> fieldConverter = keepBinary ? null :
+                o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, 
false, true, null);
+
+            HeavyQueriesTracker.ResultSetChecker resultSetChecker = 
ctx.query().runningQueryManager()
+                .heavyQueriesTracker().resultSetChecker(qry);
+
+            Function<List<Object>, List<Object>> rowConverter;
+
+            // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before 
return result to cursor.
+            if (qryProps != null && qryProps.cacheName() != null && 
evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
+                ClusterNode locNode = ctx.discovery().localNode();
+                UUID subjId = SecurityUtils.securitySubjectId(ctx);
+
+                rowConverter = row -> {
+                    evtMgr.record(new CacheQueryReadEvent<>(
+                        locNode,
+                        "SQL fields query result set row read.",
+                        EVT_CACHE_QUERY_OBJECT_READ,
+                        CacheQueryType.SQL_FIELDS.name(),
+                        qryProps.cacheName(),
+                        null,
+                        qry.sql(),
+                        null,
+                        null,
+                        qry.parameters(),
+                        subjId,
+                        null,
+                        null,
+                        null,
+                        null,
+                        row));
+
+                    resultSetChecker.checkOnFetchNext();
+
+                    return row;
+                };
+            }
+            else {
+                rowConverter = row -> {
+                    resultSetChecker.checkOnFetchNext();
 
-        HeavyQueriesTracker.ResultSetChecker resultSetChecker = 
ctx.query().runningQueryManager()
-            .heavyQueriesTracker().resultSetChecker(qry);
+                    return row;
+                };
+            }
 
-        Function<List<Object>, List<Object>> rowConverter;
+            Runnable onClose = () -> {
+                if (udfQry) // Restore UDF queries limit.
+                    udfQryLimit.getAndIncrement();
+
+                if (perfStatProc.enabled()) {
+                    perfStatProc.queryRowsProcessed(
+                        GridCacheQueryType.SQL_FIELDS,
+                        qry.initiatorNodeId(),
+                        qry.localQueryId(),
+                        "Fetched",
+                        resultSetChecker.fetchedSize()
+                    );
+                }
 
-        // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before return 
result to cursor.
-        if (qryProps != null && qryProps.cacheName() != null && 
evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
-            ClusterNode locNode = ctx.discovery().localNode();
-            UUID subjId = SecurityUtils.securitySubjectId(ctx);
+                resultSetChecker.checkOnClose();
+            };
 
-            rowConverter = row -> {
-                evtMgr.record(new CacheQueryReadEvent<>(
-                    locNode,
-                    "SQL fields query result set row read.",
-                    EVT_CACHE_QUERY_OBJECT_READ,
-                    CacheQueryType.SQL_FIELDS.name(),
-                    qryProps.cacheName(),
-                    null,
-                    qry.sql(),
-                    null,
-                    null,
-                    qry.parameters(),
-                    subjId,
-                    null,
-                    null,
-                    null,
-                    null,
-                    row));
+            Iterator<List<?>> it = iteratorsHolder().iterator(new 
ConvertingClosableIterator<>(qry.iterator(), ectx,
+                fieldConverter, rowConverter, onClose));
 
-                resultSetChecker.checkOnFetchNext();
+            // Make yet another tracking layer for cursor.getAll(), so 
tracking hierarchy will look like:
+            // Row tracker -> Cursor memory tracker -> Query memory tracker -> 
Global memory tracker.
+            // It's required, since query memory tracker can be closed 
concurrently during getAll() and
+            // tracked data for cursor can be lost without additional tracker.
+            MemoryTracker curMemoryTracker = 
QueryMemoryTracker.create(qryMemoryTracker, cfg.getQueryMemoryQuota());
 
-                return row;
-            };
+            return new ListFieldsQueryCursor<>(plan, it, ectx, 
curMemoryTracker);
         }
-        else {
-            rowConverter = row -> {
-                resultSetChecker.checkOnFetchNext();
+        catch (Exception e) {
+            if (udfQry) // Restore UDF queries limit.
+                udfQryLimit.getAndIncrement();
 
-                return row;
-            };
+            throw e;
         }
-
-        Runnable onClose = () -> {
-            if (perfStatProc.enabled()) {
-                perfStatProc.queryRowsProcessed(
-                    GridCacheQueryType.SQL_FIELDS,
-                    qry.initiatorNodeId(),
-                    qry.localQueryId(),
-                    "Fetched",
-                    resultSetChecker.fetchedSize()
-                );
-            }
-
-            resultSetChecker.checkOnClose();
-        };
-
-        Iterator<List<?>> it = new 
ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx,
-            fieldConverter, rowConverter, onClose);
-
-        // Make yet another tracking layer for cursor.getAll(), so tracking 
hierarchy will look like:
-        // Row tracker -> Cursor memory tracker -> Query memory tracker -> 
Global memory tracker.
-        // It's required, since query memory tracker can be closed 
concurrently during getAll() and
-        // tracked data for cursor can be lost without additional tracker.
-        MemoryTracker curMemoryTracker = 
QueryMemoryTracker.create(qryMemoryTracker, cfg.getQueryMemoryQuota());
-
-        return new ListFieldsQueryCursor<>(plan, it, ectx, curMemoryTracker);
     }
 
     /** */
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 7e64f373575..f60fb59014d 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -174,7 +174,7 @@ public class AbstractBasicIntegrationTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * Asserts that executeSql throws an exception.
+     * Asserts that query throws an exception.
      *
      * @param sql Query.
      * @param cls Exception class.
@@ -184,6 +184,18 @@ public class AbstractBasicIntegrationTest extends 
GridCommonAbstractTest {
         assertThrowsAnyCause(log, () -> sql(sql, args), cls, msg);
     }
 
+    /**
+     * Asserts that query throws an exception.
+     *
+     * @param ignite Ignite instance.
+     * @param sql Query.
+     * @param cls Exception class.
+     * @param msg Error message.
+     */
+    protected void assertThrows(IgniteEx ignite, String sql, Class<? extends 
Exception> cls, String msg, Object... args) {
+        assertThrowsAnyCause(log, () -> sql(ignite, sql, args), cls, msg);
+    }
+
     /** */
     protected void createAndPopulateTable() {
         createAndPopulateTable(client, 2, CacheMode.PARTITIONED);
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index dc72fa9b314..5eb2f511ec0 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -22,6 +22,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -68,11 +69,11 @@ import 
org.apache.ignite.internal.processors.query.calcite.RootQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
-import 
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker;
 import org.apache.ignite.internal.util.GridTestClockTimer;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.metric.MetricRegistry;
 import org.apache.ignite.spi.metric.LongMetric;
@@ -103,6 +104,8 @@ import static 
org.apache.ignite.internal.processors.query.running.HeavyQueriesTr
 import static 
org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_QRY_HIST_VIEW;
 import static 
org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_USER_QUERIES_REG_NAME;
 import static org.apache.ignite.internal.util.lang.GridFunc.first;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.runMultiThreaded;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
@@ -119,7 +122,7 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
     private static final int BIG_RESULT_SET_THRESHOLD = 10_000;
 
     /** */
-    private static final int POOL_SIZE = 2;
+    private static final int POOL_SIZE = 5;
 
     /** */
     private ListeningTestLogger log;
@@ -921,73 +924,72 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
 
     /** */
     @Test
-    public void testUdfQueryWarningStripedExecutor() throws Exception {
-        assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof 
StripedQueryTaskExecutor);
+    public void testUdfQueryDeadlockDetectionStripedExecutor() throws 
Exception {
+        IgniteEx ignite = grid(0);
+
+        assertTrue(queryProcessor(ignite).taskExecutor() instanceof 
StripedQueryTaskExecutor);
+
+        client.getOrCreateCache(new CacheConfiguration<Integer, 
Integer>(DEFAULT_CACHE_NAME)
+            .setSqlFunctionClasses(FunctionsLibrary.class)
+            .setSqlSchema("PUBLIC")
+        );
+
+        // Expect the error message to contain a tip about switching to the query 
blocking task executor.
+        String expMsg = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR + 
"=true";
 
-        
checkUdfQueryWarning("-DIGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR=true");
+        // Check that error is thrown for UDF initiated query.
+        assertThrows(ignite, "SELECT innerSql(?, ?, ?)", 
IgniteSQLException.class,
+            expMsg, ignite.name(), DEFAULT_CACHE_NAME, "SELECT 'Test'");
     }
 
     /** */
     @Test
     @WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR, 
value = "true")
-    public void testUdfQueryWarningBlockingExecutor() throws Exception {
-        assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof 
QueryBlockingTaskExecutor);
+    public void testUdfQueryDeadlockDetectionBlockingExecutor() throws 
Exception {
+        IgniteEx ignite = grid(0);
 
-        checkUdfQueryWarning("IgniteConfiguration.QueryThreadPoolSize");
-    }
+        assertTrue(queryProcessor(ignite).taskExecutor() instanceof 
QueryBlockingTaskExecutor);
 
-    /** */
-    private void checkUdfQueryWarning(String tipsMsg) throws Exception {
         client.getOrCreateCache(new CacheConfiguration<Integer, 
Integer>(DEFAULT_CACHE_NAME)
             .setSqlFunctionClasses(FunctionsLibrary.class)
             .setSqlSchema("PUBLIC")
         );
 
-        LogListener logLsnr1 = LogListener.matches("Detected query initiated 
by user-defined function.").build();
-        LogListener logLsnr2 = LogListener.matches(tipsMsg).build();
-
-        log.registerListener(logLsnr1);
-        log.registerListener(logLsnr2);
-
-        // Check that message is not printed for regular query.
-        sql(grid(0), "SELECT ?", "Test");
-
-        assertFalse(logLsnr1.check());
-        assertFalse(logLsnr2.check());
-
-        // Check that message is printed for UDF initiated query.
-        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> 
sql(grid(0), "SELECT innerSql(?, ?, ?)",
-            grid(0).name(), DEFAULT_CACHE_NAME, "Test"));
-
-        assertTrue(logLsnr1.check(1_000L));
-        assertTrue(logLsnr2.check());
-
-        cancelAllQueriesAndWaitForCompletion(grid(0), fut);
+        FunctionsLibrary.latch = new CountDownLatch(POOL_SIZE);
 
-        // Check that message is printed only once.
-        logLsnr1.reset();
-        logLsnr2.reset();
+        List<IgniteInternalFuture<List<List<?>>>> futs = new 
ArrayList<>(POOL_SIZE);
 
-        fut = GridTestUtils.runAsync(() -> sql(grid(0), "SELECT innerSql(?, ?, 
?)",
-            grid(0).name(), DEFAULT_CACHE_NAME, "Test"));
+        for (int i = 0; i < POOL_SIZE; i++) {
+            futs.add(runAsync(() -> sql(ignite,
+                "SELECT countDownLatch(), waitLatch(1000), innerSql(?, ?, ?)",
+                ignite.name(), DEFAULT_CACHE_NAME, "SELECT cast(sleep(500) AS 
varchar)")));
+        }
 
-        assertFalse(logLsnr1.check(1_000L));
-        assertFalse(logLsnr2.check());
+        // Expect message with tips about query pool size.
+        String expMsg = "IgniteConfiguration.QueryThreadPoolSize";
+        boolean errFound = false;
 
-        cancelAllQueriesAndWaitForCompletion(grid(0), fut);
-    }
+        // Check that concurrent inner queries can occupy all but one thread of the 
pool, so exactly one of the queries is expected to fail.
+        for (IgniteInternalFuture<List<List<?>>> fut : futs) {
+            try {
+                assertEquals(F.asList(true, true, "TRUE"), 
fut.get(5_000L).get(0));
+            }
+            catch (Exception e) {
+                assertTrue("Unexpected error: " + e, X.hasCause(e, expMsg, 
IgniteSQLException.class));
+                assertFalse(errFound);
+                errFound = true;
+            }
+        }
 
-    /** */
-    private void cancelAllQueriesAndWaitForCompletion(IgniteEx ignite, 
IgniteInternalFuture<?> qryFut) {
-        
ignite.context().query().runningQueryManager().runningSqlQueries().forEach(GridRunningQueryInfo::cancel);
+        assertTrue(errFound);
 
-        try {
-            // Wait for future completion, it can be successful or 
unsuccessful.
-            qryFut.get();
-        }
-        catch (Exception ignore) {
-            // No-op.
-        }
+        // Check that POOL_SIZE - 1 concurrent inner queries can't block the 
execution.
+        runMultiThreaded(() -> {
+            for (int i = 0; i < 1000; i++) {
+                assertEquals("Test", sql("SELECT innerSql(?, ?, ?)",
+                    ignite.name(), DEFAULT_CACHE_NAME, "SELECT 
'Test'").get(0).get(0));
+            }
+        }, POOL_SIZE - 1, "async-sql");
     }
 
     /** Verifies that user-defined query initiator ID is present in the 
SQL_QUERY_HISTORY system view and logs. */
@@ -1125,6 +1127,14 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
             return true;
         }
 
+        /** Counts down the shared {@code latch} once and always returns {@code true}. */
+        @QuerySqlFunction
+        public static boolean countDownLatch() {
+            latch.countDown();
+
+            return true;
+        }
+
         /** */
         @QuerySqlFunction
         public static boolean sleep(int sleep) {
@@ -1137,10 +1147,10 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
 
         /** */
         @QuerySqlFunction
-        public static String innerSql(String ignite, String cache, String val) 
{
+        public static String innerSql(String ignite, String cache, String sql) 
{
             return (String)Ignition.ignite(ignite)
                 .cache(cache)
-                .query(new SqlFieldsQuery("SELECT ?").setArgs(val))
+                .query(new SqlFieldsQuery(sql))
                 .getAll().get(0).get(0);
         }
     }


Reply via email to