This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 511d08f6994 Allow idle Pipe worker threads to time out (#18000)
511d08f6994 is described below

commit 511d08f69942a55b45b37792173269f45bc583f2
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jun 22 17:51:08 2026 +0800

    Allow idle Pipe worker threads to time out (#18000)
---
 .../commons/concurrent/IoTDBThreadPoolFactory.java      | 17 +++++++++++++++++
 .../pipe/agent/task/execution/PipeSubtaskExecutor.java  |  9 ++++++++-
 .../iotdb/commons/IoTDBThreadPoolFactoryTest.java       | 16 ++++++++++++++++
 3 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
index 73d6ff22430..14dc807cd2f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
@@ -127,6 +127,23 @@ public class IoTDBThreadPoolFactory {
         poolName);
   }
 
+  public static ExecutorService newFixedThreadPoolWithIdleThreadTimeout(
+      int nThreads, long keepAliveTime, TimeUnit unit, String poolName) {
+    logger.info(NEW_FIXED_THREAD_POOL_LOGGER_FORMAT, poolName, nThreads);
+
+    WrappedThreadPoolExecutor executor =
+        new WrappedThreadPoolExecutor(
+            nThreads,
+            nThreads,
+            keepAliveTime,
+            unit,
+            new LinkedBlockingQueue<>(),
+            new IoTThreadFactory(poolName),
+            poolName);
+    executor.allowCoreThreadTimeOut(true);
+    return executor;
+  }
+
   /**
    * see {@link Executors#newSingleThreadExecutor(java.util.concurrent.ThreadFactory)}.
    *
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
index f14bb483e53..4782988868a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
@@ -38,11 +38,14 @@ import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 public abstract class PipeSubtaskExecutor {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtaskExecutor.class);
 
+  private static final long WORKER_THREAD_KEEP_ALIVE_TIME_IN_SECONDS = 60L;
+
   private static final ExecutorService globalSubtaskCallbackListeningExecutor =
       IoTDBThreadPoolFactory.newSingleThreadExecutor(
           ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName());
@@ -78,7 +81,11 @@ public abstract class PipeSubtaskExecutor {
             : ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName();
     underlyingThreadPool =
         (WrappedThreadPoolExecutor)
-            IoTDBThreadPoolFactory.newFixedThreadPool(corePoolSize, workingThreadName);
+            IoTDBThreadPoolFactory.newFixedThreadPoolWithIdleThreadTimeout(
+                corePoolSize,
+                WORKER_THREAD_KEEP_ALIVE_TIME_IN_SECONDS,
+                TimeUnit.SECONDS,
+                workingThreadName);
     if (disableLogInThreadPool) {
       underlyingThreadPool.disableErrorLog();
     }
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/IoTDBThreadPoolFactoryTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/IoTDBThreadPoolFactoryTest.java
index 1db18789fbf..b735468bed0 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/IoTDBThreadPoolFactoryTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/IoTDBThreadPoolFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.commons;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.WrappedRunnable;
+import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
 
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.server.TThreadPoolServer.Args;
@@ -110,6 +111,21 @@ public class IoTDBThreadPoolFactoryTest {
     }
   }
 
+  @Test
+  public void testNewFixedThreadPoolWithIdleThreadTimeout() throws Exception {
+    ExecutorService exec =
+        IoTDBThreadPoolFactory.newFixedThreadPoolWithIdleThreadTimeout(
+            1, 1, TimeUnit.MILLISECONDS, POOL_NAME);
+
+    exec.submit(() -> {}).get();
+    assertEquals(1, ((WrappedThreadPoolExecutor) exec).getLargestPoolSize());
+
+    Thread.sleep(100);
+    assertEquals(0, ((WrappedThreadPoolExecutor) exec).getPoolSize());
+
+    exec.shutdown();
+  }
+
   @Test
   public void testNewSingleThreadScheduledExecutor() throws InterruptedException {
     String reason = "(can be ignored in Tests) NewSingleThreadScheduledExecutor";

Reply via email to