This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch term in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 46cd268672be3c680d880c1545c3414ed4f7ffbb
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 12 12:07:44 2026 +0800

    temp
---
 .../event/common/terminate/PipeTerminateEvent.java | 31 +++++++++++++++-------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index 3b933b92b73..64209242cc4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -49,17 +49,28 @@ public class PipeTerminateEvent extends EnrichedEvent {
 
   private final boolean shouldMark;
 
+  private static final int TERMINATE_EXECUTOR_THREAD_COUNT =
+      IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount();
+
+  private static final int TERMINATE_EXECUTOR_QUEUE_SIZE =
+      Math.max(1024, TERMINATE_EXECUTOR_THREAD_COUNT * 64);
+
   // Do not use call run policy to avoid deadlock
-  private static final ExecutorService terminateExecutor =
-      new WrappedThreadPoolExecutor(
-          0,
-          IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount(),
-          0L,
-          TimeUnit.SECONDS,
-          new ArrayBlockingQueue<>(
-              IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()),
-          new IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
-          ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
+  private static final ExecutorService terminateExecutor = createTerminateExecutor();
+
+  private static ExecutorService createTerminateExecutor() {
+    final WrappedThreadPoolExecutor executor =
+        new WrappedThreadPoolExecutor(
+            TERMINATE_EXECUTOR_THREAD_COUNT,
+            TERMINATE_EXECUTOR_THREAD_COUNT,
+            60L,
+            TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(TERMINATE_EXECUTOR_QUEUE_SIZE),
+            new IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
+            ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
+    executor.allowCoreThreadTimeOut(true);
+    return executor;
+  }
 
   public PipeTerminateEvent(
       final String pipeName,
