This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch thread-e in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7327aaff4e72d0af6567cb16a1fe15abc91ba362
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 12 17:13:12 2026 +0800

    Pipe: Optimized the thread executor of the terminate event (#17638)
---
 .../event/common/terminate/PipeTerminateEvent.java | 31 +++++++++++++++-------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index a8bf19ebf3b..4bf79a3df03 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -48,17 +48,28 @@ public class PipeTerminateEvent extends EnrichedEvent {
 
   private final boolean shouldMark;
 
+  private static final int TERMINATE_EXECUTOR_THREAD_COUNT =
+      IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount();
+
+  private static final int TERMINATE_EXECUTOR_QUEUE_SIZE =
+      Math.max(1024, TERMINATE_EXECUTOR_THREAD_COUNT * 64);
+
   // Do not use call run policy to avoid deadlock
-  private static final ExecutorService terminateExecutor =
-      new WrappedThreadPoolExecutor(
-          0,
-          IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount(),
-          0L,
-          TimeUnit.SECONDS,
-          new ArrayBlockingQueue<>(
-              IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()),
-          new IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
-          ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
+  private static final ExecutorService terminateExecutor = createTerminateExecutor();
+
+  private static ExecutorService createTerminateExecutor() {
+    final WrappedThreadPoolExecutor executor =
+        new WrappedThreadPoolExecutor(
+            TERMINATE_EXECUTOR_THREAD_COUNT,
+            TERMINATE_EXECUTOR_THREAD_COUNT,
+            60L,
+            TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(TERMINATE_EXECUTOR_QUEUE_SIZE),
+            new IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
+            ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
+    executor.allowCoreThreadTimeOut(true);
+    return executor;
+  }
 
   public PipeTerminateEvent(
       final String pipeName,
