This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 3d08001d793 Pipe: Fixed the deadlock cause by terminate event 
reporting & Added sink.batch.max-delay-ms to enable delicate control over batch 
delay & Fixed the bug of premature halt in extractor snapshot mode cause by 
real-time-first transfer & Stabilized the trigger of default batch sending & 
Added "isNeedToReport" getter in PipeRawTabletInsertionEvent for user-defined 
plugins & Reduce logs from PipeEventCommitter (#15377) + Fix batch type can not 
be changed & Enhance close() in [...]
3d08001d793 is described below

commit 3d08001d793cc4f34d8b8b854b08a42cd04499a7
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 8 10:41:20 2025 +0800

    Pipe: Fixed the deadlock cause by terminate event reporting & Added 
sink.batch.max-delay-ms to enable delicate control over batch delay & Fixed the 
bug of premature halt in extractor snapshot mode cause by real-time-first 
transfer & Stabilized the trigger of default batch sending & Added 
"isNeedToReport" getter in PipeRawTabletInsertionEvent for user-defined plugins 
& Reduce logs from PipeEventCommitter (#15377) + Fix batch type can not be 
changed & Enhance close() in PipeTsFileResour [...]
    
    * Pipe: Fixed the deadlock cause by terminate event reporting & Added 
sink.batch.max-delay-ms to enable delicate control over batch delay & Fixed the 
bug of premature halt in extractor snapshot mode cause by real-time-first 
transfer & Stabilized the trigger of default batch sending & Added 
"isNeedToReport" getter in PipeRawTabletInsertionEvent for user-defined plugins 
& Reduce logs from PipeEventCommitter (#15377)
    
    * Pipe: Fix batch type can not be changed (Introduced in #15377) & Enhance 
close() in PipeTsFileResource (#15401)
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../it/dual/IoTDBSubscriptionTopicIT.java          |  6 ++--
 .../task/builder/PipeDataNodeTaskBuilder.java      | 40 +++++++++++++++++----
 .../subtask/connector/PipeConnectorSubtask.java    | 13 ++++---
 .../batch/PipeTransferBatchReqBuilder.java         | 41 +++++++++++-----------
 .../PipeConsensusTransferBatchReqBuilder.java      | 19 +++++++---
 .../common/tablet/PipeRawTabletInsertionEvent.java |  5 +++
 .../event/common/terminate/PipeTerminateEvent.java |  6 +++-
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 21 ++---------
 .../pipe/resource/tsfile/PipeTsFileResource.java   | 14 ++++++--
 .../resource/tsfile/PipeTsFileResourceManager.java |  2 +-
 .../agent/task/progress/PipeEventCommitter.java    | 28 ++++++++-------
 .../config/constant/PipeConnectorConstant.java     |  3 ++
 .../pipe/connector/protocol/IoTDBConnector.java    | 10 ++++++
 13 files changed, 131 insertions(+), 77 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 6010430fe00..723090d433e 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -833,7 +833,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     final String topicName = "topic11";
     final String host = senderEnv.getIP();
     final int port = Integer.parseInt(senderEnv.getPort());
-    try (final SubscriptionTreeSession session = new 
SubscriptionTreeSession(host, port)) {
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
       final Properties config = new Properties();
       config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_SNAPSHOT_VALUE);
@@ -848,8 +848,8 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     final Thread thread =
         new Thread(
             () -> {
-              try (final SubscriptionTreePullConsumer consumer =
-                  new SubscriptionTreePullConsumer.Builder()
+              try (final SubscriptionPullConsumer consumer =
+                  new SubscriptionPullConsumer.Builder()
                       .host(host)
                       .port(port)
                       .consumerId("c1")
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
index 1b517419c98..4b78fc49c30 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java
@@ -51,6 +51,11 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY;
 
 public class PipeDataNodeTaskBuilder {
 
@@ -171,14 +176,25 @@ public class PipeDataNodeTaskBuilder {
 
   private void checkConflict(
       final PipeParameters extractorParameters, final PipeParameters 
connectorParameters) {
+    final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair;
+    final boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
 
     try {
-      final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
+      insertionDeletionListeningOptionPair =
           
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(extractorParameters);
-      if (!insertionDeletionListeningOptionPair.right) {
+
+      final String extractorModeValue =
+          extractorParameters.getStringOrDefault(
+              Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), 
EXTRACTOR_MODE_DEFAULT_VALUE);
+      shouldTerminatePipeOnAllHistoricalEventsConsumed =
+          extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
+              || 
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
+
+      if (!insertionDeletionListeningOptionPair.right
+          && !shouldTerminatePipeOnAllHistoricalEventsConsumed) {
         return;
       }
-    } catch (IllegalPathException e) {
+    } catch (final IllegalPathException e) {
       LOGGER.warn(
           "PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' 
parameters: {}",
           e.getMessage(),
@@ -192,14 +208,24 @@ public class PipeDataNodeTaskBuilder {
             PipeConnectorConstant.SINK_REALTIME_FIRST_KEY);
     if (isRealtime == null) {
       
connectorParameters.addAttribute(PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY,
 "false");
-      LOGGER.info(
-          "PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete', 
'realtime-first' is defaulted to 'false' to prevent sync issues after 
deletion.");
+      if (insertionDeletionListeningOptionPair.right) {
+        LOGGER.info(
+            "PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete', 
'realtime-first' is defaulted to 'false' to prevent sync issues after 
deletion.");
+      } else {
+        LOGGER.info(
+            "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 
'realtime-first' is defaulted to 'false' to prevent premature halt before 
transfer completion.");
+      }
       return;
     }
 
     if (isRealtime) {
-      LOGGER.warn(
-          "PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete', 
'realtime-first' set to 'true' may result in data synchronization issues after 
deletion.");
+      if (insertionDeletionListeningOptionPair.right) {
+        LOGGER.warn(
+            "PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete', 
'realtime-first' set to 'true' may result in data synchronization issues after 
deletion.");
+      } else {
+        LOGGER.warn(
+            "PipeDataNodeTaskBuilder: When extractor uses snapshot model, 
'realtime-first' set to 'true' may cause prevent premature halt before transfer 
completion.");
+      }
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index 18673931a96..5e77505186c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -44,6 +44,8 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Objects;
+
 public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConnectorSubtask.class);
@@ -102,11 +104,12 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
     }
 
     try {
-      if (event == null) {
-        if (System.currentTimeMillis() - lastHeartbeatEventInjectTime
-            > CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS) {
-          transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
-        }
+      if (System.currentTimeMillis() - lastHeartbeatEventInjectTime
+          > CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS) {
+        transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
+      }
+
+      if (Objects.isNull(event)) {
         return false;
       }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 74ab3a1ebdc..50160ba61f6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -45,6 +45,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
@@ -54,6 +55,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_MS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY;
 
@@ -87,34 +89,31 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
                 Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, 
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
                 CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE);
 
-    final int requestMaxDelayInSeconds;
-    if (usingTsFileBatch) {
-      requestMaxDelayInSeconds =
+    final Integer requestMaxDelayInMillis =
+        parameters.getIntByKeys(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, 
SINK_IOTDB_BATCH_DELAY_MS_KEY);
+    if (Objects.isNull(requestMaxDelayInMillis)) {
+      final int requestMaxDelayInSeconds =
           parameters.getIntOrDefault(
               Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
SINK_IOTDB_BATCH_DELAY_KEY),
-              CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE);
+              usingTsFileBatch
+                  ? CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE
+                  : CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE);
       requestMaxDelayInMs =
           requestMaxDelayInSeconds < 0 ? Integer.MAX_VALUE : 
requestMaxDelayInSeconds * 1000;
-      requestMaxBatchSizeInBytes =
-          parameters.getLongOrDefault(
-              Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
-              CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE);
-      this.defaultBatch =
-          new PipeTabletEventTsFileBatch(requestMaxDelayInMs, 
requestMaxBatchSizeInBytes);
     } else {
-      requestMaxDelayInSeconds =
-          parameters.getIntOrDefault(
-              Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
SINK_IOTDB_BATCH_DELAY_KEY),
-              CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE);
       requestMaxDelayInMs =
-          requestMaxDelayInSeconds < 0 ? Integer.MAX_VALUE : 
requestMaxDelayInSeconds * 1000;
-      requestMaxBatchSizeInBytes =
-          parameters.getLongOrDefault(
-              Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
-              CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
-      this.defaultBatch =
-          new PipeTabletEventPlainBatch(requestMaxDelayInMs, 
requestMaxBatchSizeInBytes);
+          requestMaxDelayInMillis < 0 ? Integer.MAX_VALUE : 
requestMaxDelayInMillis;
     }
+    requestMaxBatchSizeInBytes =
+        parameters.getLongOrDefault(
+            Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
+            usingTsFileBatch
+                ? CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE
+                : CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
+    this.defaultBatch =
+        usingTsFileBatch
+            ? new PipeTabletEventTsFileBatch(requestMaxDelayInMs, 
requestMaxBatchSizeInBytes)
+            : new PipeTabletEventPlainBatch(requestMaxDelayInMs, 
requestMaxBatchSizeInBytes);
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
index cdecc272772..870cea0ee87 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
@@ -47,10 +47,12 @@ import java.util.List;
 import java.util.Objects;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_MS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
 
 public abstract class PipeConsensusTransferBatchReqBuilder implements 
AutoCloseable {
@@ -73,11 +75,18 @@ public abstract class PipeConsensusTransferBatchReqBuilder 
implements AutoClosea
 
   protected PipeConsensusTransferBatchReqBuilder(
       PipeParameters parameters, TConsensusGroupId consensusGroupId, int 
thisDataNodeId) {
-    maxDelayInMs =
-        parameters.getIntOrDefault(
-                Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
SINK_IOTDB_BATCH_DELAY_KEY),
-                CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE)
-            * 1000;
+    final Integer requestMaxDelayInMillis =
+        parameters.getIntByKeys(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, 
SINK_IOTDB_BATCH_DELAY_MS_KEY);
+    if (Objects.isNull(requestMaxDelayInMillis)) {
+      final int requestMaxDelayInSeconds =
+          parameters.getIntOrDefault(
+              Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
SINK_IOTDB_BATCH_DELAY_KEY),
+              CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE);
+      maxDelayInMs =
+          requestMaxDelayInSeconds < 0 ? Integer.MAX_VALUE : 
requestMaxDelayInSeconds * 1000;
+    } else {
+      maxDelayInMs = requestMaxDelayInMillis < 0 ? Integer.MAX_VALUE : 
requestMaxDelayInMillis;
+    }
 
     this.consensusGroupId = consensusGroupId;
     this.thisDataNodeId = thisDataNodeId;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index f7884076f26..ae9207c6343 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -231,6 +231,11 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent
     this.needToReport = true;
   }
 
+  // This getter is reserved for user-defined plugins
+  public boolean isNeedToReport() {
+    return needToReport;
+  }
+
   public String getDeviceId() {
     // NonNull indicates that the internallyDecreaseResourceReferenceCount has 
not been called.
     return Objects.nonNull(tablet) ? tablet.deviceId : deviceId;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
index dbe8aada88d..1e3c9e4faf9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls 
the termination of pipe,
  * that is, when the historical {@link PipeTsFileInsertionEvent}s are all 
processed, this will be
@@ -91,7 +93,9 @@ public class PipeTerminateEvent extends EnrichedEvent {
 
   @Override
   public void reportProgress() {
-    PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId);
+    // To avoid deadlock
+    CompletableFuture.runAsync(
+        () -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId));
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index dba9561105a..753075f3b20 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -26,7 +26,6 @@ import 
org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressInde
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
@@ -118,7 +117,6 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   private boolean shouldExtractInsertion;
   private boolean shouldTransferModFile; // Whether to transfer mods
 
-  private boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
   private boolean isTerminateSignalSent = false;
 
   private volatile boolean hasBeenStarted = false;
@@ -329,19 +327,9 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                 || // Should extract deletion
                 listeningOptionPair.getRight());
 
-    final String extractorModeValue =
-        parameters.getStringOrDefault(
-            Arrays.asList(
-                PipeExtractorConstant.EXTRACTOR_MODE_KEY, 
PipeExtractorConstant.SOURCE_MODE_KEY),
-            PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE);
-    shouldTerminatePipeOnAllHistoricalEventsConsumed =
-        
extractorModeValue.equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE)
-            || extractorModeValue.equalsIgnoreCase(
-                PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE);
-
     if (LOGGER.isInfoEnabled()) {
       LOGGER.info(
-          "Pipe {}@{}: historical data extraction time range, start time 
{}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should 
transfer mod file {}, should terminate pipe on all historical events consumed 
{}",
+          "Pipe {}@{}: historical data extraction time range, start time 
{}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should 
transfer mod file {}",
           pipeName,
           dataRegionId,
           DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
@@ -350,8 +338,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
           historicalDataExtractionEndTime,
           sloppyPattern,
           sloppyTimeRange,
-          shouldTransferModFile,
-          shouldTerminatePipeOnAllHistoricalEventsConsumed);
+          shouldTransferModFile);
     }
   }
 
@@ -672,9 +659,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     // If the pendingQueue is null when the function is called, it implies 
that the extractor only
     // extracts deletion thus the historical event has nothing to consume.
     return hasBeenStarted
-        && (Objects.isNull(pendingQueue)
-            || pendingQueue.isEmpty()
-                && (!shouldTerminatePipeOnAllHistoricalEventsConsumed || 
isTerminateSignalSent));
+        && (Objects.isNull(pendingQueue) || pendingQueue.isEmpty() && 
isTerminateSignalSent);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index dd3ca9c5bc0..bf789b9732d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -122,7 +122,7 @@ public class PipeTsFileResource implements AutoCloseable {
     return finalReferenceCount;
   }
 
-  public synchronized boolean closeIfOutOfTimeToLive() throws IOException {
+  public synchronized boolean closeIfOutOfTimeToLive() {
     if (referenceCount.get() <= 0
         && (deviceMeasurementsMap == null // Not cached yet.
             || System.currentTimeMillis() - lastUnpinToZeroTime.get()
@@ -135,7 +135,7 @@ public class PipeTsFileResource implements AutoCloseable {
   }
 
   @Override
-  public synchronized void close() throws IOException {
+  public synchronized void close() {
     if (deviceMeasurementsMap != null) {
       deviceMeasurementsMap = null;
     }
@@ -153,7 +153,15 @@ public class PipeTsFileResource implements AutoCloseable {
       allocatedMemoryBlock = null;
     }
 
-    Files.deleteIfExists(hardlinkOrCopiedFile.toPath());
+    try {
+      Files.deleteIfExists(hardlinkOrCopiedFile.toPath());
+    } catch (final Exception e) {
+      LOGGER.error(
+          "PipeTsFileResource: Failed to delete tsfile {} when closing, 
because {}. Please MANUALLY delete it.",
+          hardlinkOrCopiedFile,
+          e.getMessage(),
+          e);
+    }
 
     LOGGER.info("PipeTsFileResource: Closed tsfile {} and cleaned up.", 
hardlinkOrCopiedFile);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 19de7a107be..d00318ae561 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -104,7 +104,7 @@ public class PipeTsFileResourceManager {
                   entry.getValue().getReferenceCount(),
                   entry.getValue().getFileSize()));
         }
-      } catch (final IOException e) {
+      } catch (final Exception e) {
         LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ", 
e);
       } finally {
         segmentLock.unlock(new File(hardlinkOrCopiedFile));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
index ba61f03f841..9479c7a752a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitter.java
@@ -62,19 +62,21 @@ public class PipeEventCommitter {
 
     final int commitQueueSizeBeforeCommit = commitQueue.size();
     if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug(
-          "COMMIT QUEUE OFFER: committer key {}, event commit id {}, last 
commit id {}, commit queue size {}",
-          committerKey,
-          event.getCommitId(),
-          lastCommitId.get(),
-          commitQueueSizeBeforeCommit);
-    } else if (commitQueueSizeBeforeCommit != 0 && commitQueueSizeBeforeCommit 
% 100 == 0) {
-      LOGGER.info(
-          "COMMIT QUEUE OFFER: committer key {}, event commit id {}, last 
commit id {}, commit queue size {}",
-          committerKey,
-          event.getCommitId(),
-          lastCommitId.get(),
-          commitQueueSizeBeforeCommit);
+      if (commitQueueSizeBeforeCommit != 0 && commitQueueSizeBeforeCommit % 
100 == 0) {
+        LOGGER.info(
+            "COMMIT QUEUE OFFER: committer key {}, event commit id {}, last 
commit id {}, commit queue size {}",
+            committerKey,
+            event.getCommitId(),
+            lastCommitId.get(),
+            commitQueueSizeBeforeCommit);
+      } else {
+        LOGGER.debug(
+            "COMMIT QUEUE OFFER: committer key {}, event commit id {}, last 
commit id {}, commit queue size {}",
+            committerKey,
+            event.getCommitId(),
+            lastCommitId.get(),
+            commitQueueSizeBeforeCommit);
+      }
     }
 
     while (!commitQueue.isEmpty()) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 13a9cd8a7f9..8a04d133d60 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -68,6 +68,9 @@ public class PipeConnectorConstant {
   public static final int CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE = 1;
   public static final int CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE = 
5;
 
+  public static final String CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY = 
"connector.batch.max-delay-ms";
+  public static final String SINK_IOTDB_BATCH_DELAY_MS_KEY = 
"sink.batch.max-delay-ms";
+
   public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY = 
"connector.batch.size-bytes";
   public static final String SINK_IOTDB_BATCH_SIZE_KEY = 
"sink.batch.size-bytes";
   public static final long CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE = 16 
* MB;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 9dc7bac704a..6af18db77e9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -72,6 +72,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
@@ -106,6 +108,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_MS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_HOST_KEY;
@@ -201,6 +205,12 @@ public abstract class IoTDBConnector implements 
PipeConnector {
         Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY),
         false);
 
+    // Check coexistence of batch.max-delay-ms and batch.max-delay-seconds
+    validator.validateSynonymAttributes(
+        Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, 
SINK_IOTDB_BATCH_DELAY_MS_KEY),
+        Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
SINK_IOTDB_BATCH_DELAY_KEY),
+        false);
+
     username =
         parameters.getStringOrDefault(
             Arrays.asList(

Reply via email to