This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 511d08f6994 Allow idle Pipe worker threads to time out (#18000)
511d08f6994 is described below
commit 511d08f69942a55b45b37792173269f45bc583f2
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jun 22 17:51:08 2026 +0800
Allow idle Pipe worker threads to time out (#18000)
---
.../commons/concurrent/IoTDBThreadPoolFactory.java | 17 +++++++++++++++++
.../pipe/agent/task/execution/PipeSubtaskExecutor.java | 9 ++++++++-
.../iotdb/commons/IoTDBThreadPoolFactoryTest.java | 16 ++++++++++++++++
3 files changed, 41 insertions(+), 1 deletion(-)
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
index 73d6ff22430..14dc807cd2f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
@@ -127,6 +127,23 @@ public class IoTDBThreadPoolFactory {
poolName);
}
+ public static ExecutorService newFixedThreadPoolWithIdleThreadTimeout(
+ int nThreads, long keepAliveTime, TimeUnit unit, String poolName) {
+ logger.info(NEW_FIXED_THREAD_POOL_LOGGER_FORMAT, poolName, nThreads);
+
+ WrappedThreadPoolExecutor executor =
+ new WrappedThreadPoolExecutor(
+ nThreads,
+ nThreads,
+ keepAliveTime,
+ unit,
+ new LinkedBlockingQueue<>(),
+ new IoTThreadFactory(poolName),
+ poolName);
+ executor.allowCoreThreadTimeOut(true);
+ return executor;
+ }
+
/**
* see {@link Executors#newSingleThreadExecutor(java.util.concurrent.ThreadFactory)}.
*
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
index f14bb483e53..4782988868a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
@@ -38,11 +38,14 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
public abstract class PipeSubtaskExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeSubtaskExecutor.class);
+ private static final long WORKER_THREAD_KEEP_ALIVE_TIME_IN_SECONDS = 60L;
+
private static final ExecutorService globalSubtaskCallbackListeningExecutor =
IoTDBThreadPoolFactory.newSingleThreadExecutor(
ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName());
@@ -78,7 +81,11 @@ public abstract class PipeSubtaskExecutor {
: ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName();
underlyingThreadPool =
(WrappedThreadPoolExecutor)
- IoTDBThreadPoolFactory.newFixedThreadPool(corePoolSize, workingThreadName);
+ IoTDBThreadPoolFactory.newFixedThreadPoolWithIdleThreadTimeout(
+ corePoolSize,
+ WORKER_THREAD_KEEP_ALIVE_TIME_IN_SECONDS,
+ TimeUnit.SECONDS,
+ workingThreadName);
if (disableLogInThreadPool) {
underlyingThreadPool.disableErrorLog();
}
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/IoTDBThreadPoolFactoryTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/IoTDBThreadPoolFactoryTest.java
index 1db18789fbf..b735468bed0 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/IoTDBThreadPoolFactoryTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/IoTDBThreadPoolFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.commons;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.WrappedRunnable;
+import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadPoolServer.Args;
@@ -110,6 +111,21 @@ public class IoTDBThreadPoolFactoryTest {
}
}
+ @Test
+ public void testNewFixedThreadPoolWithIdleThreadTimeout() throws Exception {
+ ExecutorService exec =
+ IoTDBThreadPoolFactory.newFixedThreadPoolWithIdleThreadTimeout(
+ 1, 1, TimeUnit.MILLISECONDS, POOL_NAME);
+
+ exec.submit(() -> {}).get();
+ assertEquals(1, ((WrappedThreadPoolExecutor) exec).getLargestPoolSize());
+
+ Thread.sleep(100);
+ assertEquals(0, ((WrappedThreadPoolExecutor) exec).getPoolSize());
+
+ exec.shutdown();
+ }
+
@Test
public void testNewSingleThreadScheduledExecutor() throws InterruptedException {
String reason = "(can be ignored in Tests) NewSingleThreadScheduledExecutor";