This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch term
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 46cd268672be3c680d880c1545c3414ed4f7ffbb
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 12 12:07:44 2026 +0800

    temp
---
 .../event/common/terminate/PipeTerminateEvent.java | 31 +++++++++++++++-------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index 3b933b92b73..64209242cc4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -49,17 +49,28 @@ public class PipeTerminateEvent extends EnrichedEvent {
 
   private final boolean shouldMark;
 
+  private static final int TERMINATE_EXECUTOR_THREAD_COUNT =
+      IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount();
+
+  private static final int TERMINATE_EXECUTOR_QUEUE_SIZE =
+      Math.max(1024, TERMINATE_EXECUTOR_THREAD_COUNT * 64);
+
   // Do not use call run policy to avoid deadlock
-  private static final ExecutorService terminateExecutor =
-      new WrappedThreadPoolExecutor(
-          0,
-          IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount(),
-          0L,
-          TimeUnit.SECONDS,
-          new ArrayBlockingQueue<>(
-              
IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()),
-          new 
IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
-          ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
+  private static final ExecutorService terminateExecutor = 
createTerminateExecutor();
+
+  private static ExecutorService createTerminateExecutor() {
+    final WrappedThreadPoolExecutor executor =
+        new WrappedThreadPoolExecutor(
+            TERMINATE_EXECUTOR_THREAD_COUNT,
+            TERMINATE_EXECUTOR_THREAD_COUNT,
+            60L,
+            TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(TERMINATE_EXECUTOR_QUEUE_SIZE),
+            new 
IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
+            ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
+    executor.allowCoreThreadTimeOut(true);
+    return executor;
+  }
 
   public PipeTerminateEvent(
       final String pipeName,

Reply via email to