This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 4afcfc0afb0 IGNITE-24751 SQL Calcite: Add thread pool starvation warning, add UDF query warning - Fixes #11980.
4afcfc0afb0 is described below

commit 4afcfc0afb0856a97405b021cb99f537b3169514
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Tue Apr 8 09:48:32 2025 +0300

    IGNITE-24751 SQL Calcite: Add thread pool starvation warning, add UDF query warning - Fixes #11980.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../query/calcite/CalciteQueryProcessor.java       |  29 +++++
 .../exec/task/AbstractQueryTaskExecutor.java       |   3 +
 .../exec/task/QueryBlockingTaskExecutor.java       |   4 +-
 .../exec/task/StripedQueryTaskExecutor.java        |   4 +-
 .../calcite/exec/rel/AbstractExecutionTest.java    |   4 +-
 .../exec/task/QueryBlockingTaskExecutorTest.java   |   3 +
 .../integration/SqlDiagnosticIntegrationTest.java  | 141 +++++++++++++++++++++
 .../org/apache/ignite/IgniteSystemProperties.java  |   2 +-
 .../org/apache/ignite/internal/IgniteKernal.java   |  88 -------------
 .../internal/processors/pool/PoolProcessor.java    | 117 +++++++++++++++++
 .../thread/IgniteStripedThreadPoolExecutor.java    |  18 ++-
 11 files changed, 318 insertions(+), 95 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 9d63235146f..3096977b106 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -82,6 +82,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecuto
 import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
 import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig;
@@ -250,6 +251,9 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
     /** */
     private final InjectResourcesService injectSvc;
 
+    /** */
+    private final AtomicBoolean udfQryWarned = new AtomicBoolean();
+
     /** */
     private volatile boolean started;
 
@@ -516,6 +520,8 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
     ) throws IgniteSQLException {
         ensureTransactionModeSupported(qryCtx);
 
+        checkUdfQuery();
+
         SchemaPlus schema = schemaHolder.schema(schemaName);
 
         assert schema != null : "Schema not found: " + schemaName;
@@ -673,6 +679,29 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
         
IgniteTxManager.ensureTransactionModeSupported(ctx.cache().context().tm().tx(ver).isolation());
     }
 
+    /** Checks whether the query is initiated by a UDF and prints a message to the log if needed. */
+    private void checkUdfQuery() {
+        if (udfQryWarned.get())
+            return;
+
+        if 
(Thread.currentThread().getName().startsWith(AbstractQueryTaskExecutor.THREAD_PREFIX)
+            && udfQryWarned.compareAndSet(false, true)) {
+            if (taskExecutor instanceof QueryBlockingTaskExecutor) {
+                log.info("Detected query initiated by user-defined function. " 
+
+                    "In some circumstances, this can lead to thread pool 
starvation and deadlock. Ensure that " +
+                    "the pool size is properly configured (property 
IgniteConfiguration.QueryThreadPoolSize). " +
+                    "The pool size should be greater than the maximum number 
of concurrent queries initiated by UDFs.");
+            }
+            else {
+                log.warning("Detected query initiated by user-defined 
function. " +
+                    "When a striped query task executor (the default 
configuration) is used, tasks for such queries " +
+                    "can be assigned to the same thread as that held by the 
initial query, which can lead to a " +
+                    "deadlock. To switch to a blocking tasks executor, set the 
following parameter: " +
+                    "-DIGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR=true.");
+            }
+        }
+    }
+
     /** */
     private <T> T processQuery(
         @Nullable QueryContext qryCtx,
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/AbstractQueryTaskExecutor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/AbstractQueryTaskExecutor.java
index ffed0735085..74b195b7b2e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/AbstractQueryTaskExecutor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/AbstractQueryTaskExecutor.java
@@ -30,6 +30,9 @@ public abstract class AbstractQueryTaskExecutor extends 
AbstractService implemen
     /** */
     public static final String THREAD_POOL_NAME = "CalciteQueryExecutor";
 
+    /** */
+    public static final String THREAD_PREFIX = "calciteQry";
+
     /** */
     protected final GridKernalContext ctx;
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
index c19b3dad31a..c4feb74b8e9 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
@@ -58,7 +58,7 @@ public class QueryBlockingTaskExecutor extends 
AbstractQueryTaskExecutor {
         super.onStart(ctx);
 
         executor = new IgniteThreadPoolExecutor(
-            "calciteQry",
+            THREAD_PREFIX,
             ctx.igniteInstanceName(),
             ctx.config().getQueryThreadPoolSize(),
             ctx.config().getQueryThreadPoolSize(),
@@ -79,6 +79,8 @@ public class QueryBlockingTaskExecutor extends 
AbstractQueryTaskExecutor {
         executor.prestartAllCoreThreads();
 
         
executor.registerMetrics(ctx.metric().registry(metricName(THREAD_POOLS, 
THREAD_POOL_NAME)));
+
+        ctx.pools().addExecutorForStarvationDetection("calcite", executor);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/StripedQueryTaskExecutor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/StripedQueryTaskExecutor.java
index 213c7cbdda0..d1023f543f3 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/StripedQueryTaskExecutor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/StripedQueryTaskExecutor.java
@@ -59,7 +59,7 @@ public class StripedQueryTaskExecutor extends 
AbstractQueryTaskExecutor {
         IgniteStripedThreadPoolExecutor executor = new 
IgniteStripedThreadPoolExecutor(
             ctx.config().getQueryThreadPoolSize(),
             ctx.igniteInstanceName(),
-            "calciteQry",
+            THREAD_PREFIX,
             this,
             false,
             0
@@ -68,6 +68,8 @@ public class StripedQueryTaskExecutor extends 
AbstractQueryTaskExecutor {
         stripedThreadPoolExecutor(executor);
 
         
executor.registerMetrics(ctx.metric().registry(metricName(THREAD_POOLS, 
THREAD_POOL_NAME)));
+
+        ctx.pools().addExecutorForStarvationDetection("calcite", executor);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index 03275e19036..46e4992816a 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -39,6 +39,7 @@ import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.query.calcite.QueryRegistryImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
@@ -186,6 +187,7 @@ public class AbstractExecutionTest extends 
GridCommonAbstractTest {
             kernal.add(new GridTimeoutProcessor(kernal));
             kernal.add(new NoOpIgniteSecurityProcessor(kernal));
             kernal.add(new GridCacheProcessor(kernal));
+            kernal.add(new PoolProcessor(kernal));
 
             AbstractQueryTaskExecutor taskExecutor;
 
@@ -196,7 +198,7 @@ public class AbstractExecutionTest extends 
GridCommonAbstractTest {
                     execStgy,
                     kernal.config().getQueryThreadPoolSize(),
                     kernal.igniteInstanceName(),
-                    "calciteQry",
+                    AbstractQueryTaskExecutor.THREAD_PREFIX,
                     this::handle,
                     true,
                     DFLT_THREAD_KEEP_ALIVE_TIME
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutorTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutorTest.java
index 9f4a7a6c8a1..22d354b6180 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutorTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutorTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import 
org.apache.ignite.internal.processors.security.NoOpIgniteSecurityProcessor;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
@@ -37,6 +38,7 @@ public class QueryBlockingTaskExecutorTest extends 
GridCommonAbstractTest {
     public void testConcurrentTasks() throws Exception {
         GridTestKernalContext ctx = newContext(new 
IgniteConfiguration().setQueryThreadPoolSize(10));
         ctx.add(new NoOpIgniteSecurityProcessor(ctx));
+        ctx.add(new PoolProcessor(ctx));
         QueryBlockingTaskExecutor executor = new 
QueryBlockingTaskExecutor(ctx);
         executor.onStart(ctx);
 
@@ -77,6 +79,7 @@ public class QueryBlockingTaskExecutorTest extends 
GridCommonAbstractTest {
     public void testSameQueryTasks() throws Exception {
         GridTestKernalContext ctx = newContext(new 
IgniteConfiguration().setQueryThreadPoolSize(10));
         ctx.add(new NoOpIgniteSecurityProcessor(ctx));
+        ctx.add(new PoolProcessor(ctx));
         QueryBlockingTaskExecutor executor = new 
QueryBlockingTaskExecutor(ctx);
         executor.onStart(ctx);
 
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 1c33edd3b06..141801edea5 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
@@ -50,6 +51,7 @@ import org.apache.ignite.configuration.SqlConfiguration;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.events.SqlQueryExecutionEvent;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
@@ -60,7 +62,11 @@ import 
org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.calcite.Query;
 import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
+import 
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.metric.MetricRegistry;
@@ -68,8 +74,10 @@ import org.apache.ignite.spi.metric.LongMetric;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.ListeningTestLogger;
 import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.junit.Test;
 
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
@@ -82,6 +90,7 @@ import static 
org.apache.ignite.internal.processors.performancestatistics.Abstra
 import static 
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.startCollectStatistics;
 import static 
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.stopCollectStatisticsAndRead;
 import static 
org.apache.ignite.internal.processors.query.QueryParserMetricsHolder.QUERY_PARSER_METRIC_GROUP_NAME;
+import static 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
 import static 
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.BIG_RESULT_SET_MSG;
 import static 
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_ERROR_MSG;
 import static 
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG;
@@ -101,6 +110,9 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
     /** */
     private static final int BIG_RESULT_SET_THRESHOLD = 10_000;
 
+    /** */
+    private static final int POOL_SIZE = 2;
+
     /** */
     private ListeningTestLogger log;
 
@@ -112,6 +124,7 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
         return super.getConfiguration(igniteInstanceName)
             .setGridLogger(log)
             .setAuthenticationEnabled(true)
+            .setQueryThreadPoolSize(POOL_SIZE)
             .setSqlConfiguration(new SqlConfiguration()
                 .setQueryEnginesConfiguration(new 
CalciteQueryEngineConfiguration())
                 .setLongQueryWarningTimeout(LONG_QRY_TIMEOUT))
@@ -793,6 +806,125 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
         assertTrue(logLsnr2.check(1000L));
     }
 
+
+    /** */
+    @Test
+    @WithSystemProperty(key = IGNITE_STARVATION_CHECK_INTERVAL, value = "100")
+    public void testStarvationMessageStripedExecutor() throws Exception {
+        assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof 
StripedQueryTaskExecutor);
+
+        checkStarvation();
+    }
+
+    /** */
+    @Test
+    @WithSystemProperty(key = IGNITE_STARVATION_CHECK_INTERVAL, value = "100")
+    @WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR, 
value = "true")
+    public void testStarvationMessageBlockingExecutor() throws Exception {
+        assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof 
QueryBlockingTaskExecutor);
+
+        checkStarvation();
+    }
+
+    /** */
+    private void checkStarvation() throws Exception {
+        client.getOrCreateCache(new CacheConfiguration<Integer, 
Integer>("func_cache")
+            .setSqlFunctionClasses(FunctionsLibrary.class)
+            .setSqlSchema("PUBLIC")
+        );
+
+        LogListener logLsnr = LogListener.matches("Possible thread pool 
starvation detected " +
+            "(no task completed in last 100ms, is calcite thread pool size 
large enough?)").build();
+
+        log.registerListener(logLsnr);
+
+        FunctionsLibrary.latch = new CountDownLatch(1);
+
+        GridCompoundFuture<List<List<?>>, ?> fut = new GridCompoundFuture<>();
+
+        for (int i = 0; i < POOL_SIZE + 1; i++)
+            fut.add(GridTestUtils.runAsync(() -> sql(grid(0), "SELECT 
waitLatch(10000)")));
+
+        fut.markInitialized();
+
+        assertTrue(logLsnr.check(10_000L));
+
+        FunctionsLibrary.latch.countDown();
+
+        fut.get();
+    }
+
+    /** */
+    @Test
+    public void testUdfQueryWarningStripedExecutor() throws Exception {
+        assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof 
StripedQueryTaskExecutor);
+
+        
checkUdfQueryWarning("-DIGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR=true");
+    }
+
+    /** */
+    @Test
+    @WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR, 
value = "true")
+    public void testUdfQueryWarningBlockingExecutor() throws Exception {
+        assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof 
QueryBlockingTaskExecutor);
+
+        checkUdfQueryWarning("IgniteConfiguration.QueryThreadPoolSize");
+    }
+
+    /** */
+    private void checkUdfQueryWarning(String tipsMsg) throws Exception {
+        client.getOrCreateCache(new CacheConfiguration<Integer, 
Integer>(DEFAULT_CACHE_NAME)
+            .setSqlFunctionClasses(FunctionsLibrary.class)
+            .setSqlSchema("PUBLIC")
+        );
+
+        LogListener logLsnr1 = LogListener.matches("Detected query initiated 
by user-defined function.").build();
+        LogListener logLsnr2 = LogListener.matches(tipsMsg).build();
+
+        log.registerListener(logLsnr1);
+        log.registerListener(logLsnr2);
+
+        // Check that message is not printed for regular query.
+        sql(grid(0), "SELECT ?", "Test");
+
+        assertFalse(logLsnr1.check());
+        assertFalse(logLsnr2.check());
+
+        // Check that message is printed for UDF initiated query.
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> 
sql(grid(0), "SELECT innerSql(?, ?, ?)",
+            grid(0).name(), DEFAULT_CACHE_NAME, "Test"));
+
+        assertTrue(logLsnr1.check(1_000L));
+        assertTrue(logLsnr2.check());
+
+        cancelAllQueriesAndWaitForCompletion(grid(0), fut);
+
+        // Check that message is printed only once.
+        logLsnr1.reset();
+        logLsnr2.reset();
+
+        fut = GridTestUtils.runAsync(() -> sql(grid(0), "SELECT innerSql(?, ?, 
?)",
+            grid(0).name(), DEFAULT_CACHE_NAME, "Test"));
+
+        assertFalse(logLsnr1.check(1_000L));
+        assertFalse(logLsnr2.check());
+
+        cancelAllQueriesAndWaitForCompletion(grid(0), fut);
+    }
+
+    /** */
+    private void cancelAllQueriesAndWaitForCompletion(IgniteEx ignite, 
IgniteInternalFuture<?> qryFut) {
+        
ignite.context().query().runningQueryManager().runningSqlQueries().forEach(GridRunningQueryInfo::cancel);
+
+        try {
+            // Wait for future completion, it can be successful or 
unsuccessful.
+            qryFut.get();
+        }
+        catch (Exception ignore) {
+            // No-op.
+        }
+    }
+
     /** */
     public static class FunctionsLibrary {
         /** */
@@ -811,5 +943,14 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
 
             return true;
         }
+
+        /** */
+        @QuerySqlFunction
+        public static String innerSql(String ignite, String cache, String val) 
{
+            return (String)Ignition.ignite(ignite)
+                .cache(cache)
+                .query(new SqlFieldsQuery("SELECT ?").setArgs(val))
+                .getAll().get(0).get(0);
+        }
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index e89dca81178..5dfb01f5ded 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -56,7 +56,6 @@ import static 
org.apache.ignite.cache.CacheManager.DFLT_JCACHE_DEFAULT_ISOLATED;
 import static 
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_USE_ASYNC_FILE_IO_FACTORY;
 import static 
org.apache.ignite.internal.IgniteKernal.DFLT_LOG_CLASSPATH_CONTENT_ON_STARTUP;
 import static 
org.apache.ignite.internal.IgniteKernal.DFLT_LONG_OPERATIONS_DUMP_TIMEOUT;
-import static 
org.apache.ignite.internal.IgniteKernal.DFLT_PERIODIC_STARVATION_CHECK_FREQ;
 import static 
org.apache.ignite.internal.LongJVMPauseDetector.DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD;
 import static 
org.apache.ignite.internal.LongJVMPauseDetector.DFLT_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT;
 import static 
org.apache.ignite.internal.LongJVMPauseDetector.DFLT_JVM_PAUSE_DETECTOR_PRECISION;
@@ -133,6 +132,7 @@ import static 
org.apache.ignite.internal.processors.performancestatistics.FilePe
 import static 
org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.DFLT_FILE_MAX_SIZE;
 import static 
org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.DFLT_FLUSH_SIZE;
 import static 
org.apache.ignite.internal.processors.platform.client.ClientRequestHandler.DFLT_ASYNC_REQUEST_WAIT_TIMEOUT_MILLIS;
+import static 
org.apache.ignite.internal.processors.pool.PoolProcessor.DFLT_PERIODIC_STARVATION_CHECK_FREQ;
 import static 
org.apache.ignite.internal.processors.query.QueryUtils.DFLT_INDEXING_DISCOVERY_HISTORY_SIZE;
 import static 
org.apache.ignite.internal.processors.query.schema.SchemaIndexCachePartitionWorker.DFLT_IGNITE_INDEX_REBUILD_BATCH_SIZE;
 import static 
org.apache.ignite.internal.processors.rest.GridRestProcessor.DFLT_SES_TIMEOUT;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 92d8c3b587d..ca27892e20d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -191,7 +191,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -225,7 +224,6 @@ import static java.util.Optional.ofNullable;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
-import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.internal.GridKernalState.DISCONNECTED;
 import static org.apache.ignite.internal.GridKernalState.STARTED;
@@ -362,14 +360,6 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
     /** Description of the configuration system view. */
     public static final String CFG_VIEW_DESC = "Node configuration";
 
-    /**
-     * Default interval of checking thread pool state for the starvation. Will 
be used only if the
-     * {@link IgniteSystemProperties#IGNITE_STARVATION_CHECK_INTERVAL} system 
property is not set.
-     * <p>
-     * Value is {@code 30 sec}.
-     */
-    public static final long DFLT_PERIODIC_STARVATION_CHECK_FREQ = 1000 * 30;
-
     /** Object is used to force completion the previous reconnection attempt. 
See {@link ReconnectState} for details. */
     private static final Object STOP_RECONNECT = new Object();
 
@@ -421,13 +411,6 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
     /** Spring context, potentially {@code null}. */
     private GridSpringResourceContext rsrcCtx;
 
-    /**
-     * The instance of scheduled thread pool starvation checker. {@code null} 
if starvation checks have been
-     * disabled by the value of {@link 
IgniteSystemProperties#IGNITE_STARVATION_CHECK_INTERVAL} system property.
-     */
-    @GridToStringExclude
-    private GridTimeoutProcessor.CancelableTask starveTask;
-
     /**
      * The instance of scheduled metrics logger. {@code null} means that the 
metrics loggin have been disabled
      * by configuration. See {@link 
IgniteConfiguration#getMetricsLogFrequency()} for details.
@@ -1295,74 +1278,6 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
         // Mark start timestamp.
         startTime = U.currentTimeMillis();
 
-        String intervalStr = 
IgniteSystemProperties.getString(IGNITE_STARVATION_CHECK_INTERVAL);
-
-        // Start starvation checker if enabled.
-        boolean starveCheck = !"0".equals(intervalStr);
-
-        if (starveCheck) {
-            final long interval = F.isEmpty(intervalStr) ? 
DFLT_PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);
-
-            starveTask = ctx.timeout().schedule(new Runnable() {
-                /** Last completed task count. */
-                private long lastCompletedCntPub;
-
-                /** Last completed task count. */
-                private long lastCompletedCntSys;
-
-                /** Last completed task count. */
-                private long lastCompletedCntQry;
-
-                @Override public void run() {
-                    if (ctx.pools().getExecutorService() instanceof 
ThreadPoolExecutor) {
-                        ThreadPoolExecutor exec = 
(ThreadPoolExecutor)ctx.pools().getExecutorService();
-
-                        lastCompletedCntPub = checkPoolStarvation(exec, 
lastCompletedCntPub, "public");
-                    }
-
-                    if (ctx.pools().getSystemExecutorService() instanceof 
ThreadPoolExecutor) {
-                        ThreadPoolExecutor exec = 
(ThreadPoolExecutor)ctx.pools().getSystemExecutorService();
-
-                        lastCompletedCntSys = checkPoolStarvation(exec, 
lastCompletedCntSys, "system");
-                    }
-
-                    if (ctx.pools().getQueryExecutorService() instanceof 
ThreadPoolExecutor) {
-                        ThreadPoolExecutor exec = 
(ThreadPoolExecutor)ctx.pools().getQueryExecutorService();
-
-                        lastCompletedCntQry = checkPoolStarvation(exec, 
lastCompletedCntQry, "query");
-                    }
-
-                    if (ctx.pools().getStripedExecutorService() != null)
-                        
ctx.pools().getStripedExecutorService().detectStarvation();
-                }
-
-                /**
-                 * @param exec Thread pool executor to check.
-                 * @param lastCompletedCnt Last completed tasks count.
-                 * @param pool Pool name for message.
-                 * @return Current completed tasks count.
-                 */
-                private long checkPoolStarvation(
-                    ThreadPoolExecutor exec,
-                    long lastCompletedCnt,
-                    String pool
-                ) {
-                    long completedCnt = exec.getCompletedTaskCount();
-
-                    // If all threads are active and no task has completed 
since last time and there is
-                    // at least one waiting request, then it is possible 
starvation.
-                    if (exec.getPoolSize() == exec.getActiveCount() && 
completedCnt == lastCompletedCnt &&
-                        !exec.getQueue().isEmpty())
-                        LT.warn(
-                            log,
-                            "Possible thread pool starvation detected (no task 
completed in last " +
-                                interval + "ms, is " + pool + " thread pool 
size large enough?)");
-
-                    return completedCnt;
-                }
-            }, interval, interval);
-        }
-
         Ignite g = this;
         long metricsLogFreq = cfg.getMetricsLogFrequency();
 
@@ -1861,9 +1776,6 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
                 }
             }
 
-            if (starveTask != null)
-                starveTask.close();
-
             if (metricsLogTask != null)
                 metricsLogTask.close();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index 704067ca228..9073f8e89c9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.ExecutorConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.failure.FailureContext;
@@ -47,10 +48,12 @@ import 
org.apache.ignite.internal.processors.security.thread.SecurityAwareIoPool
 import 
org.apache.ignite.internal.processors.security.thread.SecurityAwareStripedExecutor;
 import 
org.apache.ignite.internal.processors.security.thread.SecurityAwareStripedThreadPoolExecutor;
 import 
org.apache.ignite.internal.processors.security.thread.SecurityAwareThreadPoolExecutor;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorkerListener;
 import org.apache.ignite.internal.worker.WorkersRegistry;
@@ -63,6 +66,7 @@ import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.apache.ignite.thread.SameThreadExecutor;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
 import static 
org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_RUNNER_THREAD_PREFIX;
@@ -139,6 +143,14 @@ public class PoolProcessor extends GridProcessorAdapter {
     /** Histogram buckets for the task execution time metric (in 
milliseconds). */
     public static final long[] TASK_EXEC_TIME_HISTOGRAM_BUCKETS = new long[] 
{10, 50, 100, 500, 1000};
 
+    /**
+     * Default interval of checking thread pool state for the starvation. Will 
be used only if the
+     * {@link IgniteSystemProperties#IGNITE_STARVATION_CHECK_INTERVAL} system 
property is not set.
+     * <p>
+     * Value is {@code 30 sec}.
+     */
+    public static final long DFLT_PERIODIC_STARVATION_CHECK_FREQ = 30 * 1000L;
+
     /** Executor service. */
     @GridToStringExclude
     private ThreadPoolExecutor execSvc;
@@ -224,6 +236,16 @@ public class PoolProcessor extends GridProcessorAdapter {
     /** Custom named pools. */
     private Map<String, ThreadPoolExecutor> customExecs;
 
+    /** Pools to check for starvation. */
+    private Map<String, ExecutorService> starvationExecs = new HashMap<>();
+
+    /**
+     * The instance of scheduled thread pool starvation checker. {@code null} 
if starvation checks have been
+     * disabled by the value of {@link 
IgniteSystemProperties#IGNITE_STARVATION_CHECK_INTERVAL} system property.
+     */
+    @GridToStringExclude
+    private GridTimeoutProcessor.CancelableTask starveTask;
+
     /**
      * Constructor.
      *
@@ -292,6 +314,8 @@ public class PoolProcessor extends GridProcessorAdapter {
 
         execSvc.allowCoreThreadTimeOut(true);
 
+        addExecutorForStarvationDetection("public", execSvc);
+
         validateThreadPoolSize(cfg.getServiceThreadPoolSize(), "service");
 
         svcExecSvc = createExecutorService(
@@ -320,6 +344,8 @@ public class PoolProcessor extends GridProcessorAdapter {
 
         sysExecSvc.allowCoreThreadTimeOut(true);
 
+        addExecutorForStarvationDetection("system", sysExecSvc);
+
         validateThreadPoolSize(cfg.getStripedPoolSize(), "stripedPool");
 
         WorkersRegistry workerRegistry = ctx.workersRegistry();
@@ -338,6 +364,8 @@ public class PoolProcessor extends GridProcessorAdapter {
             workerRegistry,
             cfg.getFailureDetectionTimeout());
 
+        addExecutorForStarvationDetection("striped", stripedExecSvc);
+
         // Note that since we use 'LinkedBlockingQueue', number of
         // maximum threads has no effect.
         // Note, that we do not pre-start threads here as management pool may
@@ -490,6 +518,8 @@ public class PoolProcessor extends GridProcessorAdapter {
 
         qryExecSvc.allowCoreThreadTimeOut(true);
 
+        addExecutorForStarvationDetection("query", qryExecSvc);
+
         schemaExecSvc = createExecutorService(
             "schema",
             cfg.getIgniteInstanceName(),
@@ -641,6 +671,81 @@ public class PoolProcessor extends GridProcessorAdapter {
             StripedExecutorTaskView::new);
     }
 
+    /** {@inheritDoc} */
+    @Override public void onKernalStart(boolean active) throws 
IgniteCheckedException {
+        super.onKernalStart(active);
+
+        String intervalStr = 
IgniteSystemProperties.getString(IGNITE_STARVATION_CHECK_INTERVAL);
+
+        // Starvation checker is disabled when the interval property is set to "0".
+        if ("0".equals(intervalStr))
+            return;
+
+        final long interval = F.isEmpty(intervalStr) ? 
DFLT_PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);
+
+        starveTask = ctx.timeout().schedule(new Runnable() {
+            /** Last completed task count by pool name. */
+            private final Map<String, Long> lastCompleted = new HashMap<>();
+
+            @Override public void run() {
+                Map<String, ExecutorService> execs = starvationExecs;
+
+                if (execs == null)
+                    return;
+
+                for (Map.Entry<String, ExecutorService> entry : 
execs.entrySet()) {
+                    String name = entry.getKey();
+                    ExecutorService exec = entry.getValue();
+
+                    if (exec instanceof ThreadPoolExecutor) {
+                        ThreadPoolExecutor exec0 = (ThreadPoolExecutor)exec;
+
+                        checkPoolStarvation(name, 
exec0.getCompletedTaskCount(), exec0.getPoolSize(),
+                            exec0.getActiveCount(), 
exec0.getQueue().isEmpty());
+                    }
+                    else if (exec instanceof IgniteStripedThreadPoolExecutor) {
+                        IgniteStripedThreadPoolExecutor exec0 = 
(IgniteStripedThreadPoolExecutor)exec;
+
+                        checkPoolStarvation(name, exec0.completedTaskCount(), 
exec0.poolSize(),
+                            exec0.activeCount(), exec0.queueEmpty());
+                    }
+                    else if (exec instanceof StripedExecutor)
+                        ((StripedExecutor)exec).detectStarvation();
+                }
+            }
+
+            /** Warns when a saturated pool with queued tasks completed nothing since the previous check. */
+            private void checkPoolStarvation(
+                String pool,
+                long completedCnt,
+                int poolSize,
+                int activeCnt,
+                boolean queueEmpty
+            ) {
+                long lastCompletedCnt = lastCompleted.getOrDefault(pool, 0L);
+
+                // If all threads are active and no task has completed since 
last time and there is
+                // at least one waiting request, then it is possible 
starvation.
+                if (poolSize == activeCnt && completedCnt == lastCompletedCnt 
&& !queueEmpty) {
+                    LT.warn(log, "Possible thread pool starvation detected (no 
task completed in last " +
+                        interval + "ms, is " + pool + " thread pool size large 
enough?)");
+                }
+
+                if (completedCnt != lastCompletedCnt)
+                    lastCompleted.put(pool, completedCnt);
+            }
+        }, interval, interval);
+    }
+
+
+    /** {@inheritDoc} Also closes the scheduled starvation check task, if one was started. */
+    @Override public void onKernalStop(boolean cancel) {
+        super.onKernalStop(cancel);
+
+        if (starveTask != null)
+            starveTask.close();
+    }
+
     /**
      * Get executor service for policy.
      *
@@ -923,6 +1028,16 @@ public class PoolProcessor extends GridProcessorAdapter {
         return reencryptExecSvc;
     }
 
+    /**
+     * Registers a pool for the periodic starvation check; an existing entry with the same name is replaced.
+     *
+     * @param name Executor name; reported in the starvation warning for thread-pool based executors.
+     * @param execSvc Executor service.
+     */
+    public void addExecutorForStarvationDetection(String name, ExecutorService 
execSvc) {
+        starvationExecs.put(name, execSvc);
+    }
+
     /**
      * Creates a {@link MetricRegistry} for an executor.
      *
@@ -1050,6 +1165,8 @@ public class PoolProcessor extends GridProcessorAdapter {
 
             customExecs = null;
         }
+
+        starvationExecs = null;
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
 
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index 215843c8795..9509e9d87b0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -273,6 +273,18 @@ public class IgniteStripedThreadPoolExecutor implements 
ExecutorService, Metrics
         return queueSize;
     }
 
+    /**
+     * @return {@code True} if the task queue of every underlying executor is empty.
+     */
+    public boolean queueEmpty() {
+        for (IgniteThreadPoolExecutor exec : execs) {
+            if (!exec.getQueue().isEmpty())
+                return false;
+        }
+
+        return true;
+    }
+
     /**
      * @return Approximate total number of tasks that have ever been scheduled 
for execution. Because the states of
      * tasks and threads may change dynamically during computation, the 
returned value is only an approximation.
@@ -291,7 +303,7 @@ public class IgniteStripedThreadPoolExecutor implements 
ExecutorService, Metrics
      * may change dynamically during computation, the returned value is only 
an approximation, but one that does not
      * ever decrease across successive calls.
      */
-    private long completedTaskCount() {
+    public long completedTaskCount() {
         long completedTaskCnt = 0;
 
         for (IgniteThreadPoolExecutor exec : execs)
@@ -303,7 +315,7 @@ public class IgniteStripedThreadPoolExecutor implements 
ExecutorService, Metrics
     /**
      * @return Approximate number of threads that are actively executing tasks.
      */
-    private int activeCount() {
+    public int activeCount() {
         int activeCnt = 0;
 
         for (IgniteThreadPoolExecutor exec : execs)
@@ -315,7 +327,7 @@ public class IgniteStripedThreadPoolExecutor implements 
ExecutorService, Metrics
     /**
      * @return current number of threads in the pool.
      */
-    private int poolSize() {
+    public int poolSize() {
         int poolSize = 0;
 
         for (IgniteThreadPoolExecutor exec : execs)


Reply via email to