This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c24e15f3255 IGNITE-28502 SQL Calcite: Add deadlock detection for
queries initiated by UDF - Fixes #13004.
c24e15f3255 is described below
commit c24e15f3255e2eb1d731a9aac18e55a9814b1ec0
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Tue Apr 14 17:20:34 2026 +0300
IGNITE-28502 SQL Calcite: Add deadlock detection for queries initiated by
UDF - Fixes #13004.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/CalciteQueryProcessor.java | 29 --
.../query/calcite/exec/ExecutionServiceImpl.java | 395 ++++++++++++---------
.../integration/AbstractBasicIntegrationTest.java | 14 +-
.../integration/SqlDiagnosticIntegrationTest.java | 114 +++---
4 files changed, 298 insertions(+), 254 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 015ae25b6b9..ca6fbd6f390 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -82,7 +82,6 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecuto
import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService;
import
org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl;
-import
org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig;
@@ -272,9 +271,6 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
/** */
private final InjectResourcesService injectSvc;
- /** */
- private final AtomicBoolean udfQryWarned = new AtomicBoolean();
-
/** */
private volatile boolean started;
@@ -545,8 +541,6 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
) throws IgniteSQLException {
ensureTransactionModeSupported(qryCtx);
- checkUdfQuery();
-
SchemaPlus schema = schemaHolder.schema(schemaName);
assert schema != null : "Schema not found: " + schemaName;
@@ -704,29 +698,6 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
IgniteTxManager.ensureTransactionModeSupported(ctx.cache().context().tm().tx(ver).isolation());
}
- /** Checks that query is initiated by UDF and print message to log if
needed. */
- private void checkUdfQuery() {
- if (udfQryWarned.get())
- return;
-
- if
(Thread.currentThread().getName().startsWith(AbstractQueryTaskExecutor.THREAD_PREFIX)
- && udfQryWarned.compareAndSet(false, true)) {
- if (taskExecutor instanceof QueryBlockingTaskExecutor) {
- log.info("Detected query initiated by user-defined function. "
+
- "In some circumstances, this can lead to thread pool
starvation and deadlock. Ensure that " +
- "the pool size is properly configured (property
IgniteConfiguration.QueryThreadPoolSize). " +
- "The pool size should be greater than the maximum number
of concurrent queries initiated by UDFs.");
- }
- else {
- log.warning("Detected query initiated by user-defined
function. " +
- "When a striped query task executor (the default
configuration) is used, tasks for such queries " +
- "can be assigned to the same thread as that held by the
initial query, which can lead to a " +
- "deadlock. To switch to a blocking tasks executor, set the
following parameter: " +
- "-DIGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR=true.");
- }
- }
- }
-
/** */
private <T> T processQuery(
@Nullable QueryContext qryCtx,
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 497520f96ef..24e3f8002c0 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.calcite.plan.Context;
@@ -66,6 +67,8 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.ddl.DdlCommandHa
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
+import
org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor;
+import
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.GlobalMemoryTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.IoTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
@@ -202,6 +205,9 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
/** */
private InjectResourcesService injectSvc;
+ /** Limit for nested queries, initiated by UDF. */
+ private final AtomicInteger udfQryLimit = new AtomicInteger();
+
/** */
private final Map<String, FragmentPlan> fragmentPlanCache = new
GridBoundedConcurrentLinkedHashMap<>(1024);
@@ -465,6 +471,8 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
memoryTracker = cfg.getGlobalMemoryQuota() > 0 ? new
GlobalMemoryTracker(cfg.getGlobalMemoryQuota()) :
NoOpMemoryTracker.INSTANCE;
+ udfQryLimit.set(ctx.config().getQueryThreadPoolSize() - 1);
+
init();
}
@@ -574,6 +582,38 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
}
}
+ /**
+ * Checks that query is initiated by UDF.
+ *
+ * @return {@code True} if query is initiated by UDF (in this case UDF
query limit is affected).
+ * @throws IgniteSQLException If query execution can lead to deadlocks.
+ */
+ private boolean checkUdfQuery() {
+ if
(Thread.currentThread().getName().startsWith(AbstractQueryTaskExecutor.THREAD_PREFIX))
{
+ if (taskExecutor instanceof QueryBlockingTaskExecutor) {
+ if (udfQryLimit.getAndDecrement() <= 0) {
+ udfQryLimit.getAndIncrement();
+
+ throw new IgniteSQLException("Detected thread pool
starvation by queries initiated by " +
+ "user-defined functions. Starting more queries from
UDF will lead to deadlock. Ensure that " +
+ "the pool size is properly configured (property
IgniteConfiguration.QueryThreadPoolSize). " +
+ "The pool size should be greater than the maximum
number of concurrent queries initiated by UDFs.");
+ }
+
+ return true;
+ }
+ else {
+ throw new IgniteSQLException("Detected query initiated by
user-defined function. " +
+ "When a striped query task executor (the default
configuration) is used, tasks for such queries " +
+ "can be assigned to the same thread as that held by the
initial query, which can lead to a " +
+ "deadlock. To avoid deadlocks switch to a blocking tasks
executor (set the parameter: " +
+ "-DIGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR=true)");
+ }
+ }
+
+ return false;
+ }
+
/** */
private ListFieldsQueryCursor<?> mapAndExecutePlan(
RootQuery<Row> qry,
@@ -594,207 +634,218 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
checkPermissions(fragment.root());
}
- // Local execution
- Fragment fragment = F.first(fragments);
+ boolean udfQry = checkUdfQuery();
+
+ try {
+ // Local execution
+ Fragment fragment = F.first(fragments);
- if (U.assertionsEnabled()) {
- assert fragment != null;
+ if (U.assertionsEnabled()) {
+ assert fragment != null;
- FragmentMapping mapping = execPlan.mapping(fragment);
+ FragmentMapping mapping = execPlan.mapping(fragment);
- assert mapping != null;
+ assert mapping != null;
- List<UUID> nodes = mapping.nodeIds();
+ List<UUID> nodes = mapping.nodeIds();
- assert nodes != null && (nodes.size() == 1 &&
F.first(nodes).equals(localNodeId()) || nodes.isEmpty())
+ assert nodes != null && (nodes.size() == 1 &&
F.first(nodes).equals(localNodeId()) || nodes.isEmpty())
: "nodes=" + nodes + ", localNode=" + localNodeId();
- }
+ }
- long timeout = qry.remainingTime();
+ long timeout = qry.remainingTime();
- if (timeout == 0) {
- throw new IgniteSQLException("The query was cancelled due to
timeout", IgniteQueryErrorCode.QUERY_CANCELED,
- new QueryCancelledException());
- }
+ if (timeout == 0) {
+ throw new IgniteSQLException("The query was cancelled due to
timeout", IgniteQueryErrorCode.QUERY_CANCELED,
+ new QueryCancelledException());
+ }
- FragmentDescription fragmentDesc = new FragmentDescription(
- fragment.fragmentId(),
- execPlan.mapping(fragment),
- execPlan.target(fragment),
- execPlan.remotes(fragment));
-
- MemoryTracker qryMemoryTracker =
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());
-
- final GridNearTxLocal userTx = Commons.queryTransaction(qry.context(),
ctx.cache().context());
-
- ExecutionContext<Row> ectx = new ExecutionContext<>(
- qry.context(),
- taskExecutor(),
- injectSvc,
- qry.id(),
- locNodeId,
- locNodeId,
- mapCtx.topologyVersion(),
- fragmentDesc,
- handler,
- qryMemoryTracker,
- createIoTracker(locNodeId, qry.localQueryId()),
- timeout,
- qryParams,
- userTx == null ? null :
ExecutionContext.transactionChanges(userTx.writeEntries()));
-
- Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(),
mailboxRegistry(),
- exchangeService(), failureProcessor()).go(fragment.root());
-
- qry.run(ectx, execPlan, plan.fieldsMetadata(), node);
-
- Map<UUID, Long> fragmentsPerNode = fragments.stream()
- .skip(1)
- .flatMap(f -> f.mapping().nodeIds().stream())
- .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
-
- // Start remote execution.
- for (int i = 1; i < fragments.size(); i++) {
- fragment = fragments.get(i);
- fragmentDesc = new FragmentDescription(
+ FragmentDescription fragmentDesc = new FragmentDescription(
fragment.fragmentId(),
execPlan.mapping(fragment),
execPlan.target(fragment),
execPlan.remotes(fragment));
- Throwable ex = null;
- byte[] parametersMarshalled = null;
-
- for (UUID nodeId : fragmentDesc.nodeIds()) {
- if (ex != null)
- qry.onResponse(nodeId, fragment.fragmentId(), ex);
- else {
- try {
- SessionContextImpl sesCtx =
qry.context().unwrap(SessionContextImpl.class);
-
- QueryProperties props =
qry.context().unwrap(QueryProperties.class);
- boolean keepBinaryMode = props == null ||
props.keepBinary();
-
- QueryStartRequest req = new QueryStartRequest(
- qry.id(),
- qry.localQueryId(),
- qry.context().schemaName(),
- fragment.serialized(),
- ectx.topologyVersion(),
- fragmentDesc,
- fragmentsPerNode.get(nodeId).intValue(),
- qry.parameters(),
- parametersMarshalled,
- timeout,
- ectx.getQryTxEntries(),
- sesCtx == null ? null : sesCtx.attributes(),
- keepBinaryMode
- );
-
- messageService().send(nodeId, req);
-
- // Avoid marshaling of the same parameters for other
nodes.
- if (parametersMarshalled == null)
- parametersMarshalled = req.parametersMarshalled();
- }
- catch (Throwable e) {
- qry.onResponse(nodeId, fragment.fragmentId(), ex = e);
+ MemoryTracker qryMemoryTracker =
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());
+
+ final GridNearTxLocal userTx =
Commons.queryTransaction(qry.context(), ctx.cache().context());
+
+ ExecutionContext<Row> ectx = new ExecutionContext<>(
+ qry.context(),
+ taskExecutor(),
+ injectSvc,
+ qry.id(),
+ locNodeId,
+ locNodeId,
+ mapCtx.topologyVersion(),
+ fragmentDesc,
+ handler,
+ qryMemoryTracker,
+ createIoTracker(locNodeId, qry.localQueryId()),
+ timeout,
+ qryParams,
+ userTx == null ? null :
ExecutionContext.transactionChanges(userTx.writeEntries()));
+
+ Node<Row> node = new LogicalRelImplementor<>(ectx,
partitionService(), mailboxRegistry(),
+ exchangeService(), failureProcessor()).go(fragment.root());
+
+ qry.run(ectx, execPlan, plan.fieldsMetadata(), node);
+
+ Map<UUID, Long> fragmentsPerNode = fragments.stream()
+ .skip(1)
+ .flatMap(f -> f.mapping().nodeIds().stream())
+ .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
+
+ QueryProperties qryProps =
qry.context().unwrap(QueryProperties.class);
+ boolean keepBinary = qryProps == null || qryProps.keepBinary();
+
+ // Start remote execution.
+ for (int i = 1; i < fragments.size(); i++) {
+ fragment = fragments.get(i);
+ fragmentDesc = new FragmentDescription(
+ fragment.fragmentId(),
+ execPlan.mapping(fragment),
+ execPlan.target(fragment),
+ execPlan.remotes(fragment));
+
+ Throwable ex = null;
+ byte[] parametersMarshalled = null;
+
+ for (UUID nodeId : fragmentDesc.nodeIds()) {
+ if (ex != null)
+ qry.onResponse(nodeId, fragment.fragmentId(), ex);
+ else {
+ try {
+ SessionContextImpl sesCtx =
qry.context().unwrap(SessionContextImpl.class);
+
+ QueryStartRequest req = new QueryStartRequest(
+ qry.id(),
+ qry.localQueryId(),
+ qry.context().schemaName(),
+ fragment.serialized(),
+ ectx.topologyVersion(),
+ fragmentDesc,
+ fragmentsPerNode.get(nodeId).intValue(),
+ qry.parameters(),
+ parametersMarshalled,
+ timeout,
+ ectx.getQryTxEntries(),
+ sesCtx == null ? null : sesCtx.attributes(),
+ keepBinary
+ );
+
+ messageService().send(nodeId, req);
+
+ // Avoid marshaling of the same parameters for
other nodes.
+ if (parametersMarshalled == null)
+ parametersMarshalled =
req.parametersMarshalled();
+ }
+ catch (Throwable e) {
+ qry.onResponse(nodeId, fragment.fragmentId(), ex =
e);
+ }
}
}
}
- }
-
- if (perfStatProc.enabled()) {
- perfStatProc.queryProperty(
- GridCacheQueryType.SQL_FIELDS,
- qry.initiatorNodeId(),
- qry.localQueryId(),
- "Query plan",
- plan.textPlan()
- );
- }
- if (ctx.query().runningQueryManager().planHistoryTracker().enabled()) {
- ctx.query().runningQueryManager().planHistoryTracker().addPlan(
- plan.textPlan(),
- qry.sql(),
- qry.context().schemaName(),
- qry.context().isLocal(),
- CalciteQueryEngineConfiguration.ENGINE_NAME
- );
- }
+ if (perfStatProc.enabled()) {
+ perfStatProc.queryProperty(
+ GridCacheQueryType.SQL_FIELDS,
+ qry.initiatorNodeId(),
+ qry.localQueryId(),
+ "Query plan",
+ plan.textPlan()
+ );
+ }
- QueryProperties qryProps = qry.context().unwrap(QueryProperties.class);
+ if
(ctx.query().runningQueryManager().planHistoryTracker().enabled()) {
+ ctx.query().runningQueryManager().planHistoryTracker().addPlan(
+ plan.textPlan(),
+ qry.sql(),
+ qry.context().schemaName(),
+ qry.context().isLocal(),
+ CalciteQueryEngineConfiguration.ENGINE_NAME
+ );
+ }
- Function<Object, Object> fieldConverter = (qryProps == null ||
qryProps.keepBinary()) ? null :
- o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, false,
true, null);
+ Function<Object, Object> fieldConverter = keepBinary ? null :
+ o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o,
false, true, null);
+
+ HeavyQueriesTracker.ResultSetChecker resultSetChecker =
ctx.query().runningQueryManager()
+ .heavyQueriesTracker().resultSetChecker(qry);
+
+ Function<List<Object>, List<Object>> rowConverter;
+
+ // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before
return result to cursor.
+ if (qryProps != null && qryProps.cacheName() != null &&
evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
+ ClusterNode locNode = ctx.discovery().localNode();
+ UUID subjId = SecurityUtils.securitySubjectId(ctx);
+
+ rowConverter = row -> {
+ evtMgr.record(new CacheQueryReadEvent<>(
+ locNode,
+ "SQL fields query result set row read.",
+ EVT_CACHE_QUERY_OBJECT_READ,
+ CacheQueryType.SQL_FIELDS.name(),
+ qryProps.cacheName(),
+ null,
+ qry.sql(),
+ null,
+ null,
+ qry.parameters(),
+ subjId,
+ null,
+ null,
+ null,
+ null,
+ row));
+
+ resultSetChecker.checkOnFetchNext();
+
+ return row;
+ };
+ }
+ else {
+ rowConverter = row -> {
+ resultSetChecker.checkOnFetchNext();
- HeavyQueriesTracker.ResultSetChecker resultSetChecker =
ctx.query().runningQueryManager()
- .heavyQueriesTracker().resultSetChecker(qry);
+ return row;
+ };
+ }
- Function<List<Object>, List<Object>> rowConverter;
+ Runnable onClose = () -> {
+ if (udfQry) // Restore UDF queries limit.
+ udfQryLimit.getAndIncrement();
+
+ if (perfStatProc.enabled()) {
+ perfStatProc.queryRowsProcessed(
+ GridCacheQueryType.SQL_FIELDS,
+ qry.initiatorNodeId(),
+ qry.localQueryId(),
+ "Fetched",
+ resultSetChecker.fetchedSize()
+ );
+ }
- // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before return
result to cursor.
- if (qryProps != null && qryProps.cacheName() != null &&
evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
- ClusterNode locNode = ctx.discovery().localNode();
- UUID subjId = SecurityUtils.securitySubjectId(ctx);
+ resultSetChecker.checkOnClose();
+ };
- rowConverter = row -> {
- evtMgr.record(new CacheQueryReadEvent<>(
- locNode,
- "SQL fields query result set row read.",
- EVT_CACHE_QUERY_OBJECT_READ,
- CacheQueryType.SQL_FIELDS.name(),
- qryProps.cacheName(),
- null,
- qry.sql(),
- null,
- null,
- qry.parameters(),
- subjId,
- null,
- null,
- null,
- null,
- row));
+ Iterator<List<?>> it = iteratorsHolder().iterator(new
ConvertingClosableIterator<>(qry.iterator(), ectx,
+ fieldConverter, rowConverter, onClose));
- resultSetChecker.checkOnFetchNext();
+ // Make yet another tracking layer for cursor.getAll(), so
tracking hierarchy will look like:
+ // Row tracker -> Cursor memory tracker -> Query memory tracker ->
Global memory tracker.
+ // It's required, since query memory tracker can be closed
concurrently during getAll() and
+ // tracked data for cursor can be lost without additional tracker.
+ MemoryTracker curMemoryTracker =
QueryMemoryTracker.create(qryMemoryTracker, cfg.getQueryMemoryQuota());
- return row;
- };
+ return new ListFieldsQueryCursor<>(plan, it, ectx,
curMemoryTracker);
}
- else {
- rowConverter = row -> {
- resultSetChecker.checkOnFetchNext();
+ catch (Exception e) {
+ if (udfQry) // Restore UDF queries limit.
+ udfQryLimit.getAndIncrement();
- return row;
- };
+ throw e;
}
-
- Runnable onClose = () -> {
- if (perfStatProc.enabled()) {
- perfStatProc.queryRowsProcessed(
- GridCacheQueryType.SQL_FIELDS,
- qry.initiatorNodeId(),
- qry.localQueryId(),
- "Fetched",
- resultSetChecker.fetchedSize()
- );
- }
-
- resultSetChecker.checkOnClose();
- };
-
- Iterator<List<?>> it = new
ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx,
- fieldConverter, rowConverter, onClose);
-
- // Make yet another tracking layer for cursor.getAll(), so tracking
hierarchy will look like:
- // Row tracker -> Cursor memory tracker -> Query memory tracker ->
Global memory tracker.
- // It's required, since query memory tracker can be closed
concurrently during getAll() and
- // tracked data for cursor can be lost without additional tracker.
- MemoryTracker curMemoryTracker =
QueryMemoryTracker.create(qryMemoryTracker, cfg.getQueryMemoryQuota());
-
- return new ListFieldsQueryCursor<>(plan, it, ectx, curMemoryTracker);
}
/** */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 7e64f373575..f60fb59014d 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -174,7 +174,7 @@ public class AbstractBasicIntegrationTest extends
GridCommonAbstractTest {
}
/**
- * Asserts that executeSql throws an exception.
+ * Asserts that query throws an exception.
*
* @param sql Query.
* @param cls Exception class.
@@ -184,6 +184,18 @@ public class AbstractBasicIntegrationTest extends
GridCommonAbstractTest {
assertThrowsAnyCause(log, () -> sql(sql, args), cls, msg);
}
+ /**
+ * Asserts that query throws an exception.
+ *
+ * @param ignite Ignite instance.
+ * @param sql Query.
+ * @param cls Exception class.
+ * @param msg Error message.
+ */
+ protected void assertThrows(IgniteEx ignite, String sql, Class<? extends
Exception> cls, String msg, Object... args) {
+ assertThrowsAnyCause(log, () -> sql(ignite, sql, args), cls, msg);
+ }
+
/** */
protected void createAndPopulateTable() {
createAndPopulateTable(client, 2, CacheMode.PARTITIONED);
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index dc72fa9b314..5eb2f511ec0 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -22,6 +22,7 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -68,11 +69,11 @@ import
org.apache.ignite.internal.processors.query.calcite.RootQuery;
import
org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
-import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker;
import org.apache.ignite.internal.util.GridTestClockTimer;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.spi.metric.LongMetric;
@@ -103,6 +104,8 @@ import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTr
import static
org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_QRY_HIST_VIEW;
import static
org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_USER_QUERIES_REG_NAME;
import static org.apache.ignite.internal.util.lang.GridFunc.first;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.runMultiThreaded;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
@@ -119,7 +122,7 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
private static final int BIG_RESULT_SET_THRESHOLD = 10_000;
/** */
- private static final int POOL_SIZE = 2;
+ private static final int POOL_SIZE = 5;
/** */
private ListeningTestLogger log;
@@ -921,73 +924,72 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
/** */
@Test
- public void testUdfQueryWarningStripedExecutor() throws Exception {
- assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof
StripedQueryTaskExecutor);
+ public void testUdfQueryDeadlockDetectionStripedExecutor() throws
Exception {
+ IgniteEx ignite = grid(0);
+
+ assertTrue(queryProcessor(ignite).taskExecutor() instanceof
StripedQueryTaskExecutor);
+
+ client.getOrCreateCache(new CacheConfiguration<Integer,
Integer>(DEFAULT_CACHE_NAME)
+ .setSqlFunctionClasses(FunctionsLibrary.class)
+ .setSqlSchema("PUBLIC")
+ );
+
+ // Expect message with tips about switching to query blocking task
executor.
+ String expMsg = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR +
"=true";
-
checkUdfQueryWarning("-DIGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR=true");
+ // Check that error is thrown for UDF initiated query.
+ assertThrows(ignite, "SELECT innerSql(?, ?, ?)",
IgniteSQLException.class,
+ expMsg, ignite.name(), DEFAULT_CACHE_NAME, "SELECT 'Test'");
}
/** */
@Test
@WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
- public void testUdfQueryWarningBlockingExecutor() throws Exception {
- assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof
QueryBlockingTaskExecutor);
+ public void testUdfQueryDeadlockDetectionBlockingExecutor() throws
Exception {
+ IgniteEx ignite = grid(0);
- checkUdfQueryWarning("IgniteConfiguration.QueryThreadPoolSize");
- }
+ assertTrue(queryProcessor(ignite).taskExecutor() instanceof
QueryBlockingTaskExecutor);
- /** */
- private void checkUdfQueryWarning(String tipsMsg) throws Exception {
client.getOrCreateCache(new CacheConfiguration<Integer,
Integer>(DEFAULT_CACHE_NAME)
.setSqlFunctionClasses(FunctionsLibrary.class)
.setSqlSchema("PUBLIC")
);
- LogListener logLsnr1 = LogListener.matches("Detected query initiated
by user-defined function.").build();
- LogListener logLsnr2 = LogListener.matches(tipsMsg).build();
-
- log.registerListener(logLsnr1);
- log.registerListener(logLsnr2);
-
- // Check that message is not printed for regular query.
- sql(grid(0), "SELECT ?", "Test");
-
- assertFalse(logLsnr1.check());
- assertFalse(logLsnr2.check());
-
- // Check that message is printed for UDF initiated query.
- IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() ->
sql(grid(0), "SELECT innerSql(?, ?, ?)",
- grid(0).name(), DEFAULT_CACHE_NAME, "Test"));
-
- assertTrue(logLsnr1.check(1_000L));
- assertTrue(logLsnr2.check());
-
- cancelAllQueriesAndWaitForCompletion(grid(0), fut);
+ FunctionsLibrary.latch = new CountDownLatch(POOL_SIZE);
- // Check that message is printed only once.
- logLsnr1.reset();
- logLsnr2.reset();
+ List<IgniteInternalFuture<List<List<?>>>> futs = new
ArrayList<>(POOL_SIZE);
- fut = GridTestUtils.runAsync(() -> sql(grid(0), "SELECT innerSql(?, ?,
?)",
- grid(0).name(), DEFAULT_CACHE_NAME, "Test"));
+ for (int i = 0; i < POOL_SIZE; i++) {
+ futs.add(runAsync(() -> sql(ignite,
+ "SELECT countDownLatch(), waitLatch(1000), innerSql(?, ?, ?)",
+ ignite.name(), DEFAULT_CACHE_NAME, "SELECT cast(sleep(500) AS
varchar)")));
+ }
- assertFalse(logLsnr1.check(1_000L));
- assertFalse(logLsnr2.check());
+ // Expect message with tips about query pool size.
+ String expMsg = "IgniteConfiguration.QueryThreadPoolSize";
+ boolean errFound = false;
- cancelAllQueriesAndWaitForCompletion(grid(0), fut);
- }
+ // Check that concurrent inner queries allow to occupy all thread pool
except one thread.
+ for (IgniteInternalFuture<List<List<?>>> fut : futs) {
+ try {
+ assertEquals(F.asList(true, true, "TRUE"),
fut.get(5_000L).get(0));
+ }
+ catch (Exception e) {
+ assertTrue("Unexpected error: " + e, X.hasCause(e, expMsg,
IgniteSQLException.class));
+ assertFalse(errFound);
+ errFound = true;
+ }
+ }
- /** */
- private void cancelAllQueriesAndWaitForCompletion(IgniteEx ignite,
IgniteInternalFuture<?> qryFut) {
-
ignite.context().query().runningQueryManager().runningSqlQueries().forEach(GridRunningQueryInfo::cancel);
+ assertTrue(errFound);
- try {
- // Wait for future completion, it can be successful or
unsuccessful.
- qryFut.get();
- }
- catch (Exception ignore) {
- // No-op.
- }
+ // Check that POOL_SIZE - 1 concurrent inner queries can't block the
execution.
+ runMultiThreaded(() -> {
+ for (int i = 0; i < 1000; i++) {
+ assertEquals("Test", sql("SELECT innerSql(?, ?, ?)",
+ ignite.name(), DEFAULT_CACHE_NAME, "SELECT
'Test'").get(0).get(0));
+ }
+ }, POOL_SIZE - 1, "async-sql");
}
/** Verifies that user-defined query initiator ID is present in the
SQL_QUERY_HISTORY system view and logs. */
@@ -1125,6 +1127,14 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
return true;
}
+ /** */
+ @QuerySqlFunction
+ public static boolean countDownLatch() {
+ latch.countDown();
+
+ return true;
+ }
+
/** */
@QuerySqlFunction
public static boolean sleep(int sleep) {
@@ -1137,10 +1147,10 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
/** */
@QuerySqlFunction
- public static String innerSql(String ignite, String cache, String val)
{
+ public static String innerSql(String ignite, String cache, String sql)
{
return (String)Ignition.ignite(ignite)
.cache(cache)
- .query(new SqlFieldsQuery("SELECT ?").setArgs(val))
+ .query(new SqlFieldsQuery(sql))
.getAll().get(0).get(0);
}
}