This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 4afcfc0afb0 IGNITE-24751 SQL Calcite: Add thread pool starvation
warning, add UDF query warning - Fixes #11980.
4afcfc0afb0 is described below
commit 4afcfc0afb0856a97405b021cb99f537b3169514
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Tue Apr 8 09:48:32 2025 +0300
IGNITE-24751 SQL Calcite: Add thread pool starvation warning, add UDF query
warning - Fixes #11980.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/CalciteQueryProcessor.java | 29 +++++
.../exec/task/AbstractQueryTaskExecutor.java | 3 +
.../exec/task/QueryBlockingTaskExecutor.java | 4 +-
.../exec/task/StripedQueryTaskExecutor.java | 4 +-
.../calcite/exec/rel/AbstractExecutionTest.java | 4 +-
.../exec/task/QueryBlockingTaskExecutorTest.java | 3 +
.../integration/SqlDiagnosticIntegrationTest.java | 141 +++++++++++++++++++++
.../org/apache/ignite/IgniteSystemProperties.java | 2 +-
.../org/apache/ignite/internal/IgniteKernal.java | 88 -------------
.../internal/processors/pool/PoolProcessor.java | 117 +++++++++++++++++
.../thread/IgniteStripedThreadPoolExecutor.java | 18 ++-
11 files changed, 318 insertions(+), 95 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 9d63235146f..3096977b106 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -82,6 +82,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecuto
import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService;
import
org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl;
+import
org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig;
@@ -250,6 +251,9 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
/** */
private final InjectResourcesService injectSvc;
+ /** */
+ private final AtomicBoolean udfQryWarned = new AtomicBoolean();
+
/** */
private volatile boolean started;
@@ -516,6 +520,8 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
) throws IgniteSQLException {
ensureTransactionModeSupported(qryCtx);
+ checkUdfQuery();
+
SchemaPlus schema = schemaHolder.schema(schemaName);
assert schema != null : "Schema not found: " + schemaName;
@@ -673,6 +679,29 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
IgniteTxManager.ensureTransactionModeSupported(ctx.cache().context().tm().tx(ver).isolation());
}
+ /** Checks whether the query is initiated by a UDF and prints a message to the log if
needed. */
+ private void checkUdfQuery() {
+ if (udfQryWarned.get())
+ return;
+
+ if
(Thread.currentThread().getName().startsWith(AbstractQueryTaskExecutor.THREAD_PREFIX)
+ && udfQryWarned.compareAndSet(false, true)) {
+ if (taskExecutor instanceof QueryBlockingTaskExecutor) {
+ log.info("Detected query initiated by user-defined function. "
+
+ "In some circumstances, this can lead to thread pool
starvation and deadlock. Ensure that " +
+ "the pool size is properly configured (property
IgniteConfiguration.QueryThreadPoolSize). " +
+ "The pool size should be greater than the maximum number
of concurrent queries initiated by UDFs.");
+ }
+ else {
+ log.warning("Detected query initiated by user-defined
function. " +
+ "When a striped query task executor (the default
configuration) is used, tasks for such queries " +
+ "can be assigned to the same thread as that held by the
initial query, which can lead to a " +
+ "deadlock. To switch to a blocking tasks executor, set the
following parameter: " +
+ "-DIGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR=true.");
+ }
+ }
+ }
+
/** */
private <T> T processQuery(
@Nullable QueryContext qryCtx,
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/AbstractQueryTaskExecutor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/AbstractQueryTaskExecutor.java
index ffed0735085..74b195b7b2e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/AbstractQueryTaskExecutor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/AbstractQueryTaskExecutor.java
@@ -30,6 +30,9 @@ public abstract class AbstractQueryTaskExecutor extends
AbstractService implemen
/** */
public static final String THREAD_POOL_NAME = "CalciteQueryExecutor";
+ /** */
+ public static final String THREAD_PREFIX = "calciteQry";
+
/** */
protected final GridKernalContext ctx;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
index c19b3dad31a..c4feb74b8e9 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutor.java
@@ -58,7 +58,7 @@ public class QueryBlockingTaskExecutor extends
AbstractQueryTaskExecutor {
super.onStart(ctx);
executor = new IgniteThreadPoolExecutor(
- "calciteQry",
+ THREAD_PREFIX,
ctx.igniteInstanceName(),
ctx.config().getQueryThreadPoolSize(),
ctx.config().getQueryThreadPoolSize(),
@@ -79,6 +79,8 @@ public class QueryBlockingTaskExecutor extends
AbstractQueryTaskExecutor {
executor.prestartAllCoreThreads();
executor.registerMetrics(ctx.metric().registry(metricName(THREAD_POOLS,
THREAD_POOL_NAME)));
+
+ ctx.pools().addExecutorForStarvationDetection("calcite", executor);
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/StripedQueryTaskExecutor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/StripedQueryTaskExecutor.java
index 213c7cbdda0..d1023f543f3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/StripedQueryTaskExecutor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/task/StripedQueryTaskExecutor.java
@@ -59,7 +59,7 @@ public class StripedQueryTaskExecutor extends
AbstractQueryTaskExecutor {
IgniteStripedThreadPoolExecutor executor = new
IgniteStripedThreadPoolExecutor(
ctx.config().getQueryThreadPoolSize(),
ctx.igniteInstanceName(),
- "calciteQry",
+ THREAD_PREFIX,
this,
false,
0
@@ -68,6 +68,8 @@ public class StripedQueryTaskExecutor extends
AbstractQueryTaskExecutor {
stripedThreadPoolExecutor(executor);
executor.registerMetrics(ctx.metric().registry(metricName(THREAD_POOLS,
THREAD_POOL_NAME)));
+
+ ctx.pools().addExecutorForStarvationDetection("calcite", executor);
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index 03275e19036..46e4992816a 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -39,6 +39,7 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.query.calcite.QueryRegistryImpl;
import
org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
@@ -186,6 +187,7 @@ public class AbstractExecutionTest extends
GridCommonAbstractTest {
kernal.add(new GridTimeoutProcessor(kernal));
kernal.add(new NoOpIgniteSecurityProcessor(kernal));
kernal.add(new GridCacheProcessor(kernal));
+ kernal.add(new PoolProcessor(kernal));
AbstractQueryTaskExecutor taskExecutor;
@@ -196,7 +198,7 @@ public class AbstractExecutionTest extends
GridCommonAbstractTest {
execStgy,
kernal.config().getQueryThreadPoolSize(),
kernal.igniteInstanceName(),
- "calciteQry",
+ AbstractQueryTaskExecutor.THREAD_PREFIX,
this::handle,
true,
DFLT_THREAD_KEEP_ALIVE_TIME
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutorTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutorTest.java
index 9f4a7a6c8a1..22d354b6180 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutorTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/task/QueryBlockingTaskExecutorTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
import
org.apache.ignite.internal.processors.security.NoOpIgniteSecurityProcessor;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
@@ -37,6 +38,7 @@ public class QueryBlockingTaskExecutorTest extends
GridCommonAbstractTest {
public void testConcurrentTasks() throws Exception {
GridTestKernalContext ctx = newContext(new
IgniteConfiguration().setQueryThreadPoolSize(10));
ctx.add(new NoOpIgniteSecurityProcessor(ctx));
+ ctx.add(new PoolProcessor(ctx));
QueryBlockingTaskExecutor executor = new
QueryBlockingTaskExecutor(ctx);
executor.onStart(ctx);
@@ -77,6 +79,7 @@ public class QueryBlockingTaskExecutorTest extends
GridCommonAbstractTest {
public void testSameQueryTasks() throws Exception {
GridTestKernalContext ctx = newContext(new
IgniteConfiguration().setQueryThreadPoolSize(10));
ctx.add(new NoOpIgniteSecurityProcessor(ctx));
+ ctx.add(new PoolProcessor(ctx));
QueryBlockingTaskExecutor executor = new
QueryBlockingTaskExecutor(ctx);
executor.onStart(ctx);
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 1c33edd3b06..141801edea5 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
@@ -50,6 +51,7 @@ import org.apache.ignite.configuration.SqlConfiguration;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.events.SqlQueryExecutionEvent;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
@@ -60,7 +62,11 @@ import
org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.calcite.Query;
import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
import
org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor;
+import
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
+import
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
+import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.metric.MetricRegistry;
@@ -68,8 +74,10 @@ import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.junit.Test;
+import static
org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
@@ -82,6 +90,7 @@ import static
org.apache.ignite.internal.processors.performancestatistics.Abstra
import static
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.startCollectStatistics;
import static
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.stopCollectStatisticsAndRead;
import static
org.apache.ignite.internal.processors.query.QueryParserMetricsHolder.QUERY_PARSER_METRIC_GROUP_NAME;
+import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.BIG_RESULT_SET_MSG;
import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_ERROR_MSG;
import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG;
@@ -101,6 +110,9 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
/** */
private static final int BIG_RESULT_SET_THRESHOLD = 10_000;
+ /** */
+ private static final int POOL_SIZE = 2;
+
/** */
private ListeningTestLogger log;
@@ -112,6 +124,7 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
return super.getConfiguration(igniteInstanceName)
.setGridLogger(log)
.setAuthenticationEnabled(true)
+ .setQueryThreadPoolSize(POOL_SIZE)
.setSqlConfiguration(new SqlConfiguration()
.setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration())
.setLongQueryWarningTimeout(LONG_QRY_TIMEOUT))
@@ -793,6 +806,125 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
assertTrue(logLsnr2.check(1000L));
}
+
+ /** */
+ @Test
+ @WithSystemProperty(key = IGNITE_STARVATION_CHECK_INTERVAL, value = "100")
+ public void testStarvationMessageStripedExecutor() throws Exception {
+ assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof
StripedQueryTaskExecutor);
+
+ checkStarvation();
+ }
+
+ /** */
+ @Test
+ @WithSystemProperty(key = IGNITE_STARVATION_CHECK_INTERVAL, value = "100")
+ @WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
+ public void testStarvationMessageBlockingExecutor() throws Exception {
+ assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof
QueryBlockingTaskExecutor);
+
+ checkStarvation();
+ }
+
+ /** */
+ private void checkStarvation() throws Exception {
+ client.getOrCreateCache(new CacheConfiguration<Integer,
Integer>("func_cache")
+ .setSqlFunctionClasses(FunctionsLibrary.class)
+ .setSqlSchema("PUBLIC")
+ );
+
+ LogListener logLsnr = LogListener.matches("Possible thread pool
starvation detected " +
+ "(no task completed in last 100ms, is calcite thread pool size
large enough?)").build();
+
+ log.registerListener(logLsnr);
+
+ FunctionsLibrary.latch = new CountDownLatch(1);
+
+ GridCompoundFuture<List<List<?>>, ?> fut = new GridCompoundFuture<>();
+
+ for (int i = 0; i < POOL_SIZE + 1; i++)
+ fut.add(GridTestUtils.runAsync(() -> sql(grid(0), "SELECT
waitLatch(10000)")));
+
+ fut.markInitialized();
+
+ assertTrue(logLsnr.check(10_000L));
+
+ FunctionsLibrary.latch.countDown();
+
+ fut.get();
+ }
+
+ /** */
+ @Test
+ public void testUdfQueryWarningStripedExecutor() throws Exception {
+ assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof
StripedQueryTaskExecutor);
+
+
checkUdfQueryWarning("-DIGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR=true");
+ }
+
+ /** */
+ @Test
+ @WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR,
value = "true")
+ public void testUdfQueryWarningBlockingExecutor() throws Exception {
+ assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof
QueryBlockingTaskExecutor);
+
+ checkUdfQueryWarning("IgniteConfiguration.QueryThreadPoolSize");
+ }
+
+ /** */
+ private void checkUdfQueryWarning(String tipsMsg) throws Exception {
+ client.getOrCreateCache(new CacheConfiguration<Integer,
Integer>(DEFAULT_CACHE_NAME)
+ .setSqlFunctionClasses(FunctionsLibrary.class)
+ .setSqlSchema("PUBLIC")
+ );
+
+ LogListener logLsnr1 = LogListener.matches("Detected query initiated
by user-defined function.").build();
+ LogListener logLsnr2 = LogListener.matches(tipsMsg).build();
+
+ log.registerListener(logLsnr1);
+ log.registerListener(logLsnr2);
+
+ // Check that message is not printed for regular query.
+ sql(grid(0), "SELECT ?", "Test");
+
+ assertFalse(logLsnr1.check());
+ assertFalse(logLsnr2.check());
+
+ // Check that message is printed for UDF initiated query.
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() ->
sql(grid(0), "SELECT innerSql(?, ?, ?)",
+ grid(0).name(), DEFAULT_CACHE_NAME, "Test"));
+
+ assertTrue(logLsnr1.check(1_000L));
+ assertTrue(logLsnr2.check());
+
+ cancelAllQueriesAndWaitForCompletion(grid(0), fut);
+
+ // Check that message is printed only once.
+ logLsnr1.reset();
+ logLsnr2.reset();
+
+ fut = GridTestUtils.runAsync(() -> sql(grid(0), "SELECT innerSql(?, ?,
?)",
+ grid(0).name(), DEFAULT_CACHE_NAME, "Test"));
+
+ assertFalse(logLsnr1.check(1_000L));
+ assertFalse(logLsnr2.check());
+
+ cancelAllQueriesAndWaitForCompletion(grid(0), fut);
+ }
+
+ /** */
+ private void cancelAllQueriesAndWaitForCompletion(IgniteEx ignite,
IgniteInternalFuture<?> qryFut) {
+
ignite.context().query().runningQueryManager().runningSqlQueries().forEach(GridRunningQueryInfo::cancel);
+
+ try {
+ // Wait for future completion, it can be successful or
unsuccessful.
+ qryFut.get();
+ }
+ catch (Exception ignore) {
+ // No-op.
+ }
+ }
+
/** */
public static class FunctionsLibrary {
/** */
@@ -811,5 +943,14 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
return true;
}
+
+ /** */
+ @QuerySqlFunction
+ public static String innerSql(String ignite, String cache, String val)
{
+ return (String)Ignition.ignite(ignite)
+ .cache(cache)
+ .query(new SqlFieldsQuery("SELECT ?").setArgs(val))
+ .getAll().get(0).get(0);
+ }
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index e89dca81178..5dfb01f5ded 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -56,7 +56,6 @@ import static
org.apache.ignite.cache.CacheManager.DFLT_JCACHE_DEFAULT_ISOLATED;
import static
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_USE_ASYNC_FILE_IO_FACTORY;
import static
org.apache.ignite.internal.IgniteKernal.DFLT_LOG_CLASSPATH_CONTENT_ON_STARTUP;
import static
org.apache.ignite.internal.IgniteKernal.DFLT_LONG_OPERATIONS_DUMP_TIMEOUT;
-import static
org.apache.ignite.internal.IgniteKernal.DFLT_PERIODIC_STARVATION_CHECK_FREQ;
import static
org.apache.ignite.internal.LongJVMPauseDetector.DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD;
import static
org.apache.ignite.internal.LongJVMPauseDetector.DFLT_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT;
import static
org.apache.ignite.internal.LongJVMPauseDetector.DFLT_JVM_PAUSE_DETECTOR_PRECISION;
@@ -133,6 +132,7 @@ import static
org.apache.ignite.internal.processors.performancestatistics.FilePe
import static
org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.DFLT_FILE_MAX_SIZE;
import static
org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.DFLT_FLUSH_SIZE;
import static
org.apache.ignite.internal.processors.platform.client.ClientRequestHandler.DFLT_ASYNC_REQUEST_WAIT_TIMEOUT_MILLIS;
+import static
org.apache.ignite.internal.processors.pool.PoolProcessor.DFLT_PERIODIC_STARVATION_CHECK_FREQ;
import static
org.apache.ignite.internal.processors.query.QueryUtils.DFLT_INDEXING_DISCOVERY_HISTORY_SIZE;
import static
org.apache.ignite.internal.processors.query.schema.SchemaIndexCachePartitionWorker.DFLT_IGNITE_INDEX_REBUILD_BATCH_SIZE;
import static
org.apache.ignite.internal.processors.rest.GridRestProcessor.DFLT_SES_TIMEOUT;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 92d8c3b587d..ca27892e20d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -191,7 +191,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -225,7 +224,6 @@ import static java.util.Optional.ofNullable;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
-import static
org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.internal.GridKernalState.DISCONNECTED;
import static org.apache.ignite.internal.GridKernalState.STARTED;
@@ -362,14 +360,6 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
/** Description of the configuration system view. */
public static final String CFG_VIEW_DESC = "Node configuration";
- /**
- * Default interval of checking thread pool state for the starvation. Will
be used only if the
- * {@link IgniteSystemProperties#IGNITE_STARVATION_CHECK_INTERVAL} system
property is not set.
- * <p>
- * Value is {@code 30 sec}.
- */
- public static final long DFLT_PERIODIC_STARVATION_CHECK_FREQ = 1000 * 30;
-
/** Object is used to force completion the previous reconnection attempt.
See {@link ReconnectState} for details. */
private static final Object STOP_RECONNECT = new Object();
@@ -421,13 +411,6 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
/** Spring context, potentially {@code null}. */
private GridSpringResourceContext rsrcCtx;
- /**
- * The instance of scheduled thread pool starvation checker. {@code null}
if starvation checks have been
- * disabled by the value of {@link
IgniteSystemProperties#IGNITE_STARVATION_CHECK_INTERVAL} system property.
- */
- @GridToStringExclude
- private GridTimeoutProcessor.CancelableTask starveTask;
-
/**
* The instance of scheduled metrics logger. {@code null} means that the
metrics loggin have been disabled
* by configuration. See {@link
IgniteConfiguration#getMetricsLogFrequency()} for details.
@@ -1295,74 +1278,6 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
// Mark start timestamp.
startTime = U.currentTimeMillis();
- String intervalStr =
IgniteSystemProperties.getString(IGNITE_STARVATION_CHECK_INTERVAL);
-
- // Start starvation checker if enabled.
- boolean starveCheck = !"0".equals(intervalStr);
-
- if (starveCheck) {
- final long interval = F.isEmpty(intervalStr) ?
DFLT_PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);
-
- starveTask = ctx.timeout().schedule(new Runnable() {
- /** Last completed task count. */
- private long lastCompletedCntPub;
-
- /** Last completed task count. */
- private long lastCompletedCntSys;
-
- /** Last completed task count. */
- private long lastCompletedCntQry;
-
- @Override public void run() {
- if (ctx.pools().getExecutorService() instanceof
ThreadPoolExecutor) {
- ThreadPoolExecutor exec =
(ThreadPoolExecutor)ctx.pools().getExecutorService();
-
- lastCompletedCntPub = checkPoolStarvation(exec,
lastCompletedCntPub, "public");
- }
-
- if (ctx.pools().getSystemExecutorService() instanceof
ThreadPoolExecutor) {
- ThreadPoolExecutor exec =
(ThreadPoolExecutor)ctx.pools().getSystemExecutorService();
-
- lastCompletedCntSys = checkPoolStarvation(exec,
lastCompletedCntSys, "system");
- }
-
- if (ctx.pools().getQueryExecutorService() instanceof
ThreadPoolExecutor) {
- ThreadPoolExecutor exec =
(ThreadPoolExecutor)ctx.pools().getQueryExecutorService();
-
- lastCompletedCntQry = checkPoolStarvation(exec,
lastCompletedCntQry, "query");
- }
-
- if (ctx.pools().getStripedExecutorService() != null)
-
ctx.pools().getStripedExecutorService().detectStarvation();
- }
-
- /**
- * @param exec Thread pool executor to check.
- * @param lastCompletedCnt Last completed tasks count.
- * @param pool Pool name for message.
- * @return Current completed tasks count.
- */
- private long checkPoolStarvation(
- ThreadPoolExecutor exec,
- long lastCompletedCnt,
- String pool
- ) {
- long completedCnt = exec.getCompletedTaskCount();
-
- // If all threads are active and no task has completed
since last time and there is
- // at least one waiting request, then it is possible
starvation.
- if (exec.getPoolSize() == exec.getActiveCount() &&
completedCnt == lastCompletedCnt &&
- !exec.getQueue().isEmpty())
- LT.warn(
- log,
- "Possible thread pool starvation detected (no task
completed in last " +
- interval + "ms, is " + pool + " thread pool
size large enough?)");
-
- return completedCnt;
- }
- }, interval, interval);
- }
-
Ignite g = this;
long metricsLogFreq = cfg.getMetricsLogFrequency();
@@ -1861,9 +1776,6 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
}
}
- if (starveTask != null)
- starveTask.close();
-
if (metricsLogTask != null)
metricsLogTask.close();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index 704067ca228..9073f8e89c9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.ExecutorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureContext;
@@ -47,10 +48,12 @@ import
org.apache.ignite.internal.processors.security.thread.SecurityAwareIoPool
import
org.apache.ignite.internal.processors.security.thread.SecurityAwareStripedExecutor;
import
org.apache.ignite.internal.processors.security.thread.SecurityAwareStripedThreadPoolExecutor;
import
org.apache.ignite.internal.processors.security.thread.SecurityAwareThreadPoolExecutor;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorkerListener;
import org.apache.ignite.internal.worker.WorkersRegistry;
@@ -63,6 +66,7 @@ import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.apache.ignite.thread.SameThreadExecutor;
import org.jetbrains.annotations.Nullable;
+import static
org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
import static
org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_RUNNER_THREAD_PREFIX;
@@ -139,6 +143,14 @@ public class PoolProcessor extends GridProcessorAdapter {
/** Histogram buckets for the task execution time metric (in
milliseconds). */
public static final long[] TASK_EXEC_TIME_HISTOGRAM_BUCKETS = new long[]
{10, 50, 100, 500, 1000};
+ /**
+ * Default interval of checking thread pool state for the starvation. Will
be used only if the
+ * {@link IgniteSystemProperties#IGNITE_STARVATION_CHECK_INTERVAL} system
property is not set.
+ * <p>
+ * Value is {@code 30 sec}.
+ */
+ public static final long DFLT_PERIODIC_STARVATION_CHECK_FREQ = 30 * 1000L;
+
/** Executor service. */
@GridToStringExclude
private ThreadPoolExecutor execSvc;
@@ -224,6 +236,16 @@ public class PoolProcessor extends GridProcessorAdapter {
/** Custom named pools. */
private Map<String, ThreadPoolExecutor> customExecs;
+ /** Pools to check for starvation. */
+ private Map<String, ExecutorService> starvationExecs = new HashMap<>();
+
+ /**
+ * The instance of scheduled thread pool starvation checker. {@code null}
if starvation checks have been
+ * disabled by the value of {@link
IgniteSystemProperties#IGNITE_STARVATION_CHECK_INTERVAL} system property.
+ */
+ @GridToStringExclude
+ private GridTimeoutProcessor.CancelableTask starveTask;
+
/**
* Constructor.
*
@@ -292,6 +314,8 @@ public class PoolProcessor extends GridProcessorAdapter {
execSvc.allowCoreThreadTimeOut(true);
+ addExecutorForStarvationDetection("public", execSvc);
+
validateThreadPoolSize(cfg.getServiceThreadPoolSize(), "service");
svcExecSvc = createExecutorService(
@@ -320,6 +344,8 @@ public class PoolProcessor extends GridProcessorAdapter {
sysExecSvc.allowCoreThreadTimeOut(true);
+ addExecutorForStarvationDetection("system", sysExecSvc);
+
validateThreadPoolSize(cfg.getStripedPoolSize(), "stripedPool");
WorkersRegistry workerRegistry = ctx.workersRegistry();
@@ -338,6 +364,8 @@ public class PoolProcessor extends GridProcessorAdapter {
workerRegistry,
cfg.getFailureDetectionTimeout());
+ addExecutorForStarvationDetection("striped", stripedExecSvc);
+
// Note that since we use 'LinkedBlockingQueue', number of
// maximum threads has no effect.
// Note, that we do not pre-start threads here as management pool may
@@ -490,6 +518,8 @@ public class PoolProcessor extends GridProcessorAdapter {
qryExecSvc.allowCoreThreadTimeOut(true);
+ addExecutorForStarvationDetection("query", qryExecSvc);
+
schemaExecSvc = createExecutorService(
"schema",
cfg.getIgniteInstanceName(),
@@ -641,6 +671,81 @@ public class PoolProcessor extends GridProcessorAdapter {
StripedExecutorTaskView::new);
}
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Schedules the periodic thread pool starvation check unless it is disabled by setting the
+  * {@link IgniteSystemProperties#IGNITE_STARVATION_CHECK_INTERVAL} system property to {@code 0}.
+  * If the property is not set, {@link #DFLT_PERIODIC_STARVATION_CHECK_FREQ} is used as the interval.
+  */
+ @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
+     super.onKernalStart(active);
+
+     String intervalStr = IgniteSystemProperties.getString(IGNITE_STARVATION_CHECK_INTERVAL);
+
+     // Value "0" disables the starvation checker.
+     if ("0".equals(intervalStr))
+         return;
+
+     // NOTE: a non-numeric property value makes Long.parseLong throw NumberFormatException here.
+     final long interval = F.isEmpty(intervalStr)
+         ? DFLT_PERIODIC_STARVATION_CHECK_FREQ
+         : Long.parseLong(intervalStr);
+
+     starveTask = ctx.timeout().schedule(new Runnable() {
+         /** Last completed task count by pool name. */
+         private final Map<String, Long> lastCompleted = new HashMap<>();
+
+         @Override public void run() {
+             // Read the field once: it is reset to null elsewhere in this class.
+             Map<String, ExecutorService> execs = starvationExecs;
+
+             if (execs == null)
+                 return;
+
+             for (Map.Entry<String, ExecutorService> entry : execs.entrySet()) {
+                 String name = entry.getKey();
+                 ExecutorService exec = entry.getValue();
+
+                 // The executor kinds are mutually exclusive, so exactly one branch handles each pool.
+                 if (exec instanceof ThreadPoolExecutor) {
+                     ThreadPoolExecutor exec0 = (ThreadPoolExecutor)exec;
+
+                     checkPoolStarvation(name, exec0.getCompletedTaskCount(), exec0.getPoolSize(),
+                         exec0.getActiveCount(), exec0.getQueue().isEmpty());
+                 }
+                 else if (exec instanceof IgniteStripedThreadPoolExecutor) {
+                     IgniteStripedThreadPoolExecutor exec0 = (IgniteStripedThreadPoolExecutor)exec;
+
+                     checkPoolStarvation(name, exec0.completedTaskCount(), exec0.poolSize(),
+                         exec0.activeCount(), exec0.queueEmpty());
+                 }
+                 else if (exec instanceof StripedExecutor)
+                     ((StripedExecutor)exec).detectStarvation();
+             }
+         }
+
+         /**
+          * Logs a warning if the pool looks starved and remembers its completed task count
+          * for the next check.
+          *
+          * @param pool Pool name.
+          * @param completedCnt Number of tasks completed by the pool so far.
+          * @param poolSize Current number of threads in the pool.
+          * @param activeCnt Number of threads currently executing tasks.
+          * @param queueEmpty {@code True} if the pool has no queued tasks.
+          */
+         private void checkPoolStarvation(
+             String pool,
+             long completedCnt,
+             int poolSize,
+             int activeCnt,
+             boolean queueEmpty
+         ) {
+             long lastCompletedCnt = lastCompleted.getOrDefault(pool, 0L);
+
+             // If all threads are active, no task has completed since the last check and there is
+             // at least one waiting task, then the pool is possibly starved.
+             if (poolSize == activeCnt && completedCnt == lastCompletedCnt && !queueEmpty) {
+                 LT.warn(log, "Possible thread pool starvation detected (no task completed in last " +
+                     interval + "ms, is " + pool + " thread pool size large enough?)");
+             }
+
+             if (completedCnt != lastCompletedCnt)
+                 lastCompleted.put(pool, completedCnt);
+         }
+     }, interval, interval);
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ super.onKernalStop(cancel);
+
+ // The task is null when the starvation check was never scheduled.
+ if (starveTask != null)
+ starveTask.close();
+ }
+
/**
* Get executor service for policy.
*
@@ -923,6 +1028,16 @@ public class PoolProcessor extends GridProcessorAdapter {
return reencryptExecSvc;
}
+ /**
+ * Registers a pool to be inspected by the starvation check.
+ * An executor already registered under the same name is replaced.
+ *
+ * @param name Executor name, used as the map key and as the pool name passed to the starvation check.
+ * @param execSvc Executor service.
+ */
+ public void addExecutorForStarvationDetection(String name, ExecutorService
execSvc) {
+ starvationExecs.put(name, execSvc);
+ }
+
/**
* Creates a {@link MetricRegistry} for an executor.
*
@@ -1050,6 +1165,8 @@ public class PoolProcessor extends GridProcessorAdapter {
customExecs = null;
}
+
+ starvationExecs = null;
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index 215843c8795..9509e9d87b0 100644
---
a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++
b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -273,6 +273,18 @@ public class IgniteStripedThreadPoolExecutor implements
ExecutorService, Metrics
return queueSize;
}
+ /**
+ * Checks the task queues of all underlying executors.
+ *
+ * @return {@code True} if the task queue of every underlying executor is empty.
+ */
+ public boolean queueEmpty() {
+ for (IgniteThreadPoolExecutor exec : execs) {
+ // A single non-empty queue is enough to answer.
+ if (!exec.getQueue().isEmpty())
+ return false;
+ }
+
+ return true;
+ }
+
/**
* @return Approximate total number of tasks that have ever been scheduled
for execution. Because the states of
* tasks and threads may change dynamically during computation, the
returned value is only an approximation.
@@ -291,7 +303,7 @@ public class IgniteStripedThreadPoolExecutor implements
ExecutorService, Metrics
* may change dynamically during computation, the returned value is only
an approximation, but one that does not
* ever decrease across successive calls.
*/
- private long completedTaskCount() {
+ public long completedTaskCount() {
long completedTaskCnt = 0;
for (IgniteThreadPoolExecutor exec : execs)
@@ -303,7 +315,7 @@ public class IgniteStripedThreadPoolExecutor implements
ExecutorService, Metrics
/**
* @return Approximate number of threads that are actively executing tasks.
*/
- private int activeCount() {
+ public int activeCount() {
int activeCnt = 0;
for (IgniteThreadPoolExecutor exec : execs)
@@ -315,7 +327,7 @@ public class IgniteStripedThreadPoolExecutor implements
ExecutorService, Metrics
/**
* @return current number of threads in the pool.
*/
- private int poolSize() {
+ public int poolSize() {
int poolSize = 0;
for (IgniteThreadPoolExecutor exec : execs)