This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8c694e434f6dc53787058dad603be773a34f4a1c
Author: Caideyipi <[email protected]>
AuthorDate: Thu Aug 7 09:28:36 2025 +0800

    [To dev/1.3] Pipe: Do not use the fork join pool in TerminateEvent #16113 (#16114)
    
    * fix
    
    * optimize
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  2 +-
 .../event/common/terminate/PipeTerminateEvent.java | 37 ++++++++++++++++++----
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 21 ++++++++++--
 .../iotdb/commons/concurrent/ThreadName.java       |  1 +
 4 files changed, 52 insertions(+), 9 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index f800c15c3f8..56e4ef9159c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -132,7 +132,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
           0L,
           TimeUnit.SECONDS,
           new ArrayBlockingQueue<>(
-              IoTDBDescriptor.getInstance().getConfig().getSchemaThreadCount()),
+              IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()),
           new IoTThreadFactory(ThreadName.PIPE_PARALLEL_EXECUTION_POOL.getName()),
           ThreadName.PIPE_PARALLEL_EXECUTION_POOL.getName(),
           new ThreadPoolExecutor.CallerRunsPolicy());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index 91d38cf3361..3e0475a3d60 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -19,16 +19,22 @@
 
 package org.apache.iotdb.db.pipe.event.common.terminate;
 
+import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls the termination of pipe,
@@ -40,13 +46,29 @@ public class PipeTerminateEvent extends EnrichedEvent {
 
   private final int dataRegionId;
 
+  private final boolean shouldMark;
+
+  // Do not use call run policy to avoid deadlock
+  private static final ExecutorService terminateExecutor =
+      new WrappedThreadPoolExecutor(
+          0,
+          IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount(),
+          0L,
+          TimeUnit.SECONDS,
+          new ArrayBlockingQueue<>(
+              IoTDBDescriptor.getInstance().getConfig().getPipeTaskThreadCount()),
+          new IoTThreadFactory(ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName()),
+          ThreadName.PIPE_TERMINATE_EXECUTION_POOL.getName());
+
   public PipeTerminateEvent(
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
-      final int dataRegionId) {
+      final int dataRegionId,
+      final boolean shouldMark) {
     super(pipeName, creationTime, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
     this.dataRegionId = dataRegionId;
+    this.shouldMark = shouldMark;
   }
 
   @Override
@@ -74,7 +96,7 @@ public class PipeTerminateEvent extends EnrichedEvent {
       final long endTime) {
     // Should record PipeTaskMeta, for the terminateEvent shall report progress to
     // notify the pipeTask it's completed.
-    return new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId);
+    return new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId, shouldMark);
   }
 
   @Override
@@ -95,13 +117,16 @@ public class PipeTerminateEvent extends EnrichedEvent {
   @Override
   public void reportProgress() {
     // To avoid deadlock
-    CompletableFuture.runAsync(
-        () -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId));
+    if (shouldMark) {
+      terminateExecutor.submit(
+          () -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId));
+    }
   }
 
   @Override
   public String toString() {
-    return String.format("PipeTerminateEvent{dataRegionId=%s}", dataRegionId)
+    return String.format(
+            "PipeTerminateEvent{dataRegionId=%s, shouldMark=%s}", dataRegionId, shouldMark)
         + " - "
         + super.toString();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 25759654a5b..efd03a3709c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -77,6 +77,10 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
@@ -86,11 +90,11 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
 
 public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDataRegionExtractor {
-
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileExtractor.class);
 
@@ -117,6 +121,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
   private boolean shouldExtractInsertion;
   private boolean shouldTransferModFile; // Whether to transfer mods
 
+  private boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
   private boolean isTerminateSignalSent = false;
 
   private boolean isForwardingPipeRequests;
@@ -294,6 +299,13 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
                 || // Should extract deletion
                 listeningOptionPair.getRight());
 
+    final String extractorModeValue =
+        parameters.getStringOrDefault(
+            Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE);
+    shouldTerminatePipeOnAllHistoricalEventsConsumed =
+        extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
+            || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
+
     isForwardingPipeRequests =
         parameters.getBooleanOrDefault(
             Arrays.asList(
@@ -554,7 +566,12 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
 
     if (resource == null) {
       final PipeTerminateEvent terminateEvent =
-          new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, dataRegionId);
+          new PipeTerminateEvent(
+              pipeName,
+              creationTime,
+              pipeTaskMeta,
+              dataRegionId,
+              shouldTerminatePipeOnAllHistoricalEventsConsumed);
       if (!terminateEvent.increaseReferenceCount(
           PipeHistoricalDataRegionTsFileExtractor.class.getName())) {
         LOGGER.warn(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 72a0e4ebb0c..6f81aaba419 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -145,6 +145,7 @@ public enum ThreadName {
   PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
   PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
   PIPE_PARALLEL_EXECUTION_POOL("Pipe-Parallel-Execution-Pool"),
+  PIPE_TERMINATE_EXECUTION_POOL("Pipe-Terminate-Execution-Pool"),
   LOAD_DATATYPE_CONVERT_POOL("Load-Datatype-Convert-Pool"),
   SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"),
   SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"),

Reply via email to