This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 96754ec2fb2 Pipe: Optimized the thread executor of the terminate event 
(#17638) (#17647)
96754ec2fb2 is described below

commit 96754ec2fb2dfd3b069ce2d87ef85bb1f48d011d
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 13 10:16:37 2026 +0800

    Pipe: Optimized the thread executor of the terminate event (#17638) (#17647)
---
 .../event/common/terminate/PipeTerminateEvent.java | 31 +++++++++++++++-------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index a8bf19ebf3b..4bf79a3df03 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -48,17 +48,28 @@ public class PipeTerminateEvent extends EnrichedEvent {
 
   private final boolean shouldMark;
 
+  private static final int TERMINATE_EXECUTOR_THREAD_COUNT =
+      IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount();
+
+  private static final int TERMINATE_EXECUTOR_QUEUE_SIZE =
+      Math.max(1024, TERMINATE_EXECUTOR_THREAD_COUNT * 64);
+
   // Do not use call run policy to avoid deadlock
-  private static final ExecutorService terminateExecutor =
-      new WrappedThreadPoolExecutor(
-          0,
-          IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount(),
-          0L,
-          TimeUnit.SECONDS,
-          new ArrayBlockingQueue<>(
-              
IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()),
-          new 
IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
-          ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
+  private static final ExecutorService terminateExecutor = 
createTerminateExecutor();
+
+  private static ExecutorService createTerminateExecutor() {
+    final WrappedThreadPoolExecutor executor =
+        new WrappedThreadPoolExecutor(
+            TERMINATE_EXECUTOR_THREAD_COUNT,
+            TERMINATE_EXECUTOR_THREAD_COUNT,
+            60L,
+            TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(TERMINATE_EXECUTOR_QUEUE_SIZE),
+            new 
IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
+            ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
+    executor.allowCoreThreadTimeOut(true);
+    return executor;
+  }
 
   public PipeTerminateEvent(
       final String pipeName,

Reply via email to