This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-leak-1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d8cc34270ab87233dd6e258700ce9a12e318f346
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Aug 30 00:55:16 2024 +0800

    Pipe: Fix reference count leak when tasks restart (#13250)
    
    (cherry picked from commit b7369ee2e17fd90e9b03457506461299be4d64ab)
---
 .../protocol/IoTDBConfigRegionAirGapConnector.java |  20 ++--
 .../protocol/IoTDBConfigRegionConnector.java       |  20 ++--
 .../pipe/execution/PipeConfigNodeSubtask.java      |  14 ++-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  97 ++++++----------
 .../evolvable/batch/PipeTabletEventBatch.java      |   7 +-
 .../airgap/IoTDBDataNodeAirGapConnector.java       |  10 +-
 .../airgap/IoTDBDataRegionAirGapConnector.java     |  30 ++---
 .../airgap/IoTDBSchemaRegionAirGapConnector.java   |  10 +-
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  |  30 ++---
 .../connector/protocol/opcua/OpcUaConnector.java   |  18 +--
 .../pipeconsensus/PipeConsensusAsyncConnector.java |   4 -
 .../pipeconsensus/PipeConsensusSyncConnector.java  |  10 +-
 .../async/IoTDBDataRegionAsyncConnector.java       |   6 -
 .../thrift/sync/IoTDBDataNodeSyncConnector.java    |  10 +-
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  |  30 ++---
 .../thrift/sync/IoTDBSchemaRegionConnector.java    |  10 +-
 .../protocol/websocket/WebSocketConnector.java     |   9 +-
 .../protocol/writeback/WriteBackConnector.java     |  18 +--
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  42 ++++---
 .../realtime/assigner/PipeDataRegionAssigner.java  |  25 +++-
 .../db/pipe/metric/PipeDataRegionEventCounter.java |  12 +-
 .../pipe/task/connection/PipeEventCollector.java   |  17 ++-
 .../subtask/connector/PipeConnectorSubtask.java    |  34 ++----
 .../PipeRealtimePriorityBlockingQueue.java         |  34 +++++-
 .../subtask/processor/PipeProcessorSubtask.java    |   9 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  10 ++
 .../iotdb/commons/conf/CommonDescriptor.java       |   5 +
 .../iotdb/commons/pipe/config/PipeConfig.java      |   7 ++
 .../iotdb/commons/pipe/event/EnrichedEvent.java    | 128 +++++++++++++--------
 .../commons/pipe/metric/PipeEventCounter.java      |   6 +-
 .../pipe/task/connection/BlockingPendingQueue.java |  32 +++++-
 .../task/subtask/PipeAbstractConnectorSubtask.java |   5 +-
 .../pipe/task/subtask/PipeReportableSubtask.java   |   2 +-
 .../commons/pipe/task/subtask/PipeSubtask.java     |  31 ++++-
 34 files changed, 447 insertions(+), 305 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index 0d466605d8f..e615616e2ba 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -137,12 +137,12 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
       final AirGapSocket socket,
       final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
       throws PipeException, IOException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeConfigRegionWritePlanEvent.increaseReferenceCount(
+        IoTDBConfigRegionAirGapConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeConfigRegionWritePlanEvent.increaseReferenceCount(
-          IoTDBConfigRegionAirGapConnector.class.getName())) {
-        return;
-      }
       doTransfer(socket, pipeConfigRegionWritePlanEvent);
     } finally {
       pipeConfigRegionWritePlanEvent.decreaseReferenceCount(
@@ -178,12 +178,12 @@ public class IoTDBConfigRegionAirGapConnector extends 
IoTDBAirGapConnector {
   private void doTransferWrapper(
       final AirGapSocket socket, final PipeConfigRegionSnapshotEvent 
pipeConfigRegionSnapshotEvent)
       throws PipeException, IOException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount(
+        IoTDBConfigRegionAirGapConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount(
-          IoTDBConfigRegionAirGapConnector.class.getName())) {
-        return;
-      }
       doTransfer(socket, pipeConfigRegionSnapshotEvent);
     } finally {
       pipeConfigRegionSnapshotEvent.decreaseReferenceCount(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 1e2f3c19e5a..3e4ca8ac9c5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -114,12 +114,12 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
 
   private void doTransferWrapper(
       final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent) 
throws PipeException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeConfigRegionWritePlanEvent.increaseReferenceCount(
+        IoTDBConfigRegionConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeConfigRegionWritePlanEvent.increaseReferenceCount(
-          IoTDBConfigRegionConnector.class.getName())) {
-        return;
-      }
       doTransfer(pipeConfigRegionWritePlanEvent);
     } finally {
       pipeConfigRegionWritePlanEvent.decreaseReferenceCount(
@@ -175,12 +175,12 @@ public class IoTDBConfigRegionConnector extends 
IoTDBSslSyncConnector {
 
   private void doTransferWrapper(final PipeConfigRegionSnapshotEvent 
pipeConfigRegionSnapshotEvent)
       throws PipeException, IOException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount(
+        IoTDBConfigRegionConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount(
-          IoTDBConfigRegionConnector.class.getName())) {
-        return;
-      }
       doTransfer(pipeConfigRegionSnapshotEvent);
     } finally {
       pipeConfigRegionSnapshotEvent.decreaseReferenceCount(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
index 02cb5373359..cfd16480f62 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeC
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -181,10 +182,11 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractConnectorSubtask {
         return false;
       }
 
-      outputPipeConnector.transfer(event);
-      decreaseReferenceCountAndReleaseLastEvent(true);
-
-      PipeConfigRegionConnectorMetrics.getInstance().markConfigEvent(taskID);
+      if (!(event instanceof ProgressReportEvent)) {
+        outputPipeConnector.transfer(event);
+        PipeConfigRegionConnectorMetrics.getInstance().markConfigEvent(taskID);
+      }
+      decreaseReferenceCountAndReleaseLastEvent(event, true);
     } catch (final PipeException e) {
       setLastExceptionEvent(event);
       if (!isClosed.get()) {
@@ -194,7 +196,7 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractConnectorSubtask {
             "{} in pipe transfer, ignored because pipe is dropped.",
             e.getClass().getSimpleName(),
             e);
-        clearReferenceCountAndReleaseLastEvent();
+        clearReferenceCountAndReleaseLastEvent(event);
       }
     } catch (final Exception e) {
       setLastExceptionEvent(event);
@@ -205,7 +207,7 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractConnectorSubtask {
             e);
       } else {
         LOGGER.info("Exception in pipe transfer, ignored because pipe is 
dropped.", e);
-        clearReferenceCountAndReleaseLastEvent();
+        clearReferenceCountAndReleaseLastEvent(event);
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 888f7c50c96..8a73a19684f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -83,6 +83,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -92,6 +93,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
   protected static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
+  private static final AtomicLong LAST_FORCED_RESTART_TIME =
+      new AtomicLong(System.currentTimeMillis());
+
   ////////////////////////// Pipe Task Management Entry 
//////////////////////////
 
   @Override
@@ -475,18 +479,32 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     if (!tryWriteLockWithTimeOut(5)) {
       return;
     }
+
+    final Set<PipeMeta> stuckPipes;
     try {
-      restartAllStuckPipesInternal();
+      stuckPipes = findAllStuckPipes();
     } finally {
       releaseWriteLock();
     }
+
+    // Restart all stuck pipes
+    stuckPipes.parallelStream().forEach(this::restartStuckPipe);
   }
 
-  private void restartAllStuckPipesInternal() {
+  private Set<PipeMeta> findAllStuckPipes() {
+    final Set<PipeMeta> stuckPipes = new HashSet<>();
+
+    if (System.currentTimeMillis() - LAST_FORCED_RESTART_TIME.get()
+        > 
PipeConfig.getInstance().getPipeSubtaskExecutorForcedRestartIntervalMs()) {
+      LAST_FORCED_RESTART_TIME.set(System.currentTimeMillis());
+      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+        stuckPipes.add(pipeMeta);
+      }
+      return stuckPipes;
+    }
+
     final Map<String, IoTDBDataRegionExtractor> taskId2ExtractorMap =
         PipeDataRegionExtractorMetrics.getInstance().getExtractorMap();
-
-    final Set<PipeMeta> stuckPipes = new HashSet<>();
     for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
       final String pipeName = pipeMeta.getStaticMeta().getPipeName();
       final List<IoTDBDataRegionExtractor> extractors =
@@ -525,8 +543,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       }
     }
 
-    // Restart all stuck pipes
-    stuckPipes.parallelStream().forEach(this::restartStuckPipe);
+    return stuckPipes;
   }
 
   private boolean mayDeletedTsFileSizeReachDangerousThreshold() {
@@ -565,59 +582,21 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
   private void restartStuckPipe(final PipeMeta pipeMeta) {
     LOGGER.warn("Pipe {} will be restarted because of stuck.", 
pipeMeta.getStaticMeta());
-    final long startTime = System.currentTimeMillis();
-    changePipeStatusBeforeRestart(pipeMeta.getStaticMeta().getPipeName());
-    handleSinglePipeMetaChangesInternal(pipeMeta);
-    LOGGER.warn(
-        "Pipe {} was restarted because of stuck, time cost: {} ms.",
-        pipeMeta.getStaticMeta(),
-        System.currentTimeMillis() - startTime);
-  }
-
-  private void changePipeStatusBeforeRestart(final String pipeName) {
-    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
-    final Map<Integer, PipeTask> pipeTasks = 
pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta());
-    final Set<Integer> taskRegionIds = new HashSet<>(pipeTasks.keySet());
-    final Set<Integer> dataRegionIds =
-        StorageEngine.getInstance().getAllDataRegionIds().stream()
-            .map(DataRegionId::getId)
-            .collect(Collectors.toSet());
-    final Set<PipeTask> dataRegionPipeTasks =
-        taskRegionIds.stream()
-            .filter(dataRegionIds::contains)
-            .map(regionId -> 
pipeTaskManager.removePipeTask(pipeMeta.getStaticMeta(), regionId))
-            .filter(Objects::nonNull)
-            .collect(Collectors.toSet());
-
-    // Drop data region tasks
-    dataRegionPipeTasks.parallelStream().forEach(PipeTask::drop);
-
-    // Stop schema region tasks
-    
pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta()).values().parallelStream()
-        .forEach(PipeTask::stop);
-
-    // Re-create data region tasks
-    dataRegionPipeTasks.parallelStream()
-        .forEach(
-            pipeTask -> {
-              final PipeTask newPipeTask =
-                  new PipeDataNodeTaskBuilder(
-                          pipeMeta.getStaticMeta(),
-                          ((PipeDataNodeTask) pipeTask).getRegionId(),
-                          pipeMeta
-                              .getRuntimeMeta()
-                              .getConsensusGroupId2TaskMetaMap()
-                              .get(((PipeDataNodeTask) 
pipeTask).getRegionId()))
-                      .build();
-              newPipeTask.create();
-              pipeTaskManager.addPipeTask(
-                  pipeMeta.getStaticMeta(),
-                  ((PipeDataNodeTask) pipeTask).getRegionId(),
-                  newPipeTask);
-            });
-
-    // Set pipe meta status to STOPPED
-    pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
+    acquireWriteLock();
+    try {
+      final long startTime = System.currentTimeMillis();
+      final PipeMeta originalPipeMeta = pipeMeta.deepCopy();
+      handleDropPipe(pipeMeta.getStaticMeta().getPipeName());
+      handleSinglePipeMetaChanges(originalPipeMeta);
+      LOGGER.warn(
+          "Pipe {} was restarted because of stuck, time cost: {} ms.",
+          originalPipeMeta.getStaticMeta(),
+          System.currentTimeMillis() - startTime);
+    } catch (final Exception e) {
+      LOGGER.warn("Failed to restart stuck pipe {}.", 
pipeMeta.getStaticMeta(), e);
+    } finally {
+      releaseWriteLock();
+    }
   }
 
   ///////////////////////// Terminate Logic /////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index bae98a9efa2..75d4b2f9e3c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import org.apache.tsfile.exception.write.WriteProcessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -34,6 +36,8 @@ import java.util.Objects;
 
 public abstract class PipeTabletEventBatch implements AutoCloseable {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletEventBatch.class);
+
   protected final List<EnrichedEvent> events = new ArrayList<>();
 
   private final int maxDelayInMs;
@@ -74,8 +78,7 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
           firstEventProcessingTime = System.currentTimeMillis();
         }
       } else {
-        ((EnrichedEvent) event)
-            
.decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false);
+        LOGGER.warn("Cannot increase reference count for event: {}, ignore it 
in batch.", event);
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 221fa516448..33be8e002f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -113,12 +113,12 @@ public abstract class IoTDBDataNodeAirGapConnector 
extends IoTDBAirGapConnector
       final AirGapSocket socket,
       final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
       throws PipeException, IOException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeSchemaRegionWritePlanEvent.increaseReferenceCount(
+        IoTDBDataNodeAirGapConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeSchemaRegionWritePlanEvent.increaseReferenceCount(
-          IoTDBDataNodeAirGapConnector.class.getName())) {
-        return;
-      }
       doTransfer(socket, pipeSchemaRegionWritePlanEvent);
     } finally {
       pipeSchemaRegionWritePlanEvent.decreaseReferenceCount(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index 9874741c7f0..7b6c4376d0a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -150,12 +150,12 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
       final AirGapSocket socket,
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
       throws PipeException, WALPipeException, IOException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+        IoTDBDataRegionAirGapConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
-          IoTDBDataRegionAirGapConnector.class.getName())) {
-        return;
-      }
       doTransfer(socket, pipeInsertNodeTabletInsertionEvent);
     } finally {
       pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
@@ -195,12 +195,12 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
   private void doTransferWrapper(
       final AirGapSocket socket, final PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
       throws PipeException, IOException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
+        IoTDBDataRegionAirGapConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
-          IoTDBDataRegionAirGapConnector.class.getName())) {
-        return;
-      }
       doTransfer(socket, pipeRawTabletInsertionEvent);
     } finally {
       pipeRawTabletInsertionEvent.decreaseReferenceCount(
@@ -233,12 +233,12 @@ public class IoTDBDataRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnector
   private void doTransferWrapper(
       final AirGapSocket socket, final PipeTsFileInsertionEvent 
pipeTsFileInsertionEvent)
       throws PipeException, IOException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeTsFileInsertionEvent.increaseReferenceCount(
+        IoTDBDataRegionAirGapConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeTsFileInsertionEvent.increaseReferenceCount(
-          IoTDBDataRegionAirGapConnector.class.getName())) {
-        return;
-      }
       doTransfer(socket, pipeTsFileInsertionEvent);
     } finally {
       pipeTsFileInsertionEvent.decreaseReferenceCount(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
index e53423779d0..342807a5dfb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
@@ -86,12 +86,12 @@ public class IoTDBSchemaRegionAirGapConnector extends 
IoTDBDataNodeAirGapConnect
   private void doTransferWrapper(
       final AirGapSocket socket, final PipeSchemaRegionSnapshotEvent 
pipeSchemaRegionSnapshotEvent)
       throws PipeException, IOException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeSchemaRegionSnapshotEvent.increaseReferenceCount(
+        IoTDBSchemaRegionAirGapConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeSchemaRegionSnapshotEvent.increaseReferenceCount(
-          IoTDBSchemaRegionAirGapConnector.class.getName())) {
-        return;
-      }
       doTransfer(socket, pipeSchemaRegionSnapshotEvent);
     } finally {
       pipeSchemaRegionSnapshotEvent.decreaseReferenceCount(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 5edf485adfb..62a1f040624 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -313,12 +313,12 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
   private void doTransferWrapper(
       final PipeInsertNodeTabletInsertionEvent pipeInsertNodeInsertionEvent)
       throws IoTDBConnectionException, StatementExecutionException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeInsertNodeInsertionEvent.increaseReferenceCount(
+        IoTDBLegacyPipeConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeInsertNodeInsertionEvent.increaseReferenceCount(
-          IoTDBLegacyPipeConnector.class.getName())) {
-        return;
-      }
       doTransfer(pipeInsertNodeInsertionEvent);
     } finally {
       pipeInsertNodeInsertionEvent.decreaseReferenceCount(
@@ -344,12 +344,12 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
 
   private void doTransferWrapper(final PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
       throws PipeException, IoTDBConnectionException, 
StatementExecutionException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
+        IoTDBLegacyPipeConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
-          IoTDBLegacyPipeConnector.class.getName())) {
-        return;
-      }
       doTransfer(pipeRawTabletInsertionEvent);
     } finally {
       pipeRawTabletInsertionEvent.decreaseReferenceCount(
@@ -369,12 +369,12 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
 
   private void doTransferWrapper(final PipeTsFileInsertionEvent 
pipeTsFileInsertionEvent)
       throws PipeException, TException, IOException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeTsFileInsertionEvent.increaseReferenceCount(
+        IoTDBLegacyPipeConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeTsFileInsertionEvent.increaseReferenceCount(
-          IoTDBLegacyPipeConnector.class.getName())) {
-        return;
-      }
       doTransfer(pipeTsFileInsertionEvent);
     } finally {
       pipeTsFileInsertionEvent.decreaseReferenceCount(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index 390b2fe73bb..1885f181677 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -204,12 +204,12 @@ public class OpcUaConnector implements PipeConnector {
   private void transferTabletWrapper(
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
       throws UaException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+        OpcUaConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
-          OpcUaConnector.class.getName())) {
-        return;
-      }
       for (final Tablet tablet : 
pipeInsertNodeTabletInsertionEvent.convertToTablets()) {
         nameSpace.transfer(tablet);
       }
@@ -221,11 +221,11 @@ public class OpcUaConnector implements PipeConnector {
 
   private void transferTabletWrapper(final PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
       throws UaException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if 
(!pipeRawTabletInsertionEvent.increaseReferenceCount(OpcUaConnector.class.getName()))
 {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if 
(!pipeRawTabletInsertionEvent.increaseReferenceCount(OpcUaConnector.class.getName()))
 {
-        return;
-      }
       nameSpace.transfer(pipeRawTabletInsertionEvent.convertToTablet());
     } finally {
       
pipeRawTabletInsertionEvent.decreaseReferenceCount(OpcUaConnector.class.getName(),
 false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 85ba921e24d..e4489799652 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -274,8 +274,6 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
       // We increase the reference count for this event to determine if the 
event may be released.
       if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
           PipeConsensusAsyncConnector.class.getName())) {
-        pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
-            PipeConsensusAsyncConnector.class.getName(), false);
         return;
       }
 
@@ -354,8 +352,6 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
     // We increase the reference count for this event to determine if the 
event may be released.
     if (!pipeTsFileInsertionEvent.increaseReferenceCount(
         PipeConsensusAsyncConnector.class.getName())) {
-      pipeTsFileInsertionEvent.decreaseReferenceCount(
-          PipeConsensusAsyncConnector.class.getName(), false);
       return;
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index ad7ae47b302..3d274812091 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -224,12 +224,12 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
   private void doTransferWrapper(
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
       throws PipeException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+        PipeConsensusSyncConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
-          PipeConsensusSyncConnector.class.getName())) {
-        return;
-      }
       doTransfer(pipeInsertNodeTabletInsertionEvent);
     } finally {
       pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 083f12485ba..5e13c4e0243 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -214,8 +214,6 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       // We increase the reference count for this event to determine if the 
event may be released.
       if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
           IoTDBDataRegionAsyncConnector.class.getName())) {
-        pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
-            IoTDBDataRegionAsyncConnector.class.getName(), false);
         return;
       }
 
@@ -240,8 +238,6 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       // We increase the reference count for this event to determine if the 
event may be released.
       if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
           IoTDBDataRegionAsyncConnector.class.getName())) {
-        pipeRawTabletInsertionEvent.decreaseReferenceCount(
-            IoTDBDataRegionAsyncConnector.class.getName(), false);
         return;
       }
 
@@ -318,8 +314,6 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     // We increase the reference count for this event to determine if the 
event may be released.
     if (!pipeTsFileInsertionEvent.increaseReferenceCount(
         IoTDBDataRegionAsyncConnector.class.getName())) {
-      pipeTsFileInsertionEvent.decreaseReferenceCount(
-          IoTDBDataRegionAsyncConnector.class.getName(), false);
       return;
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 9580346d25b..294e69a769a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -104,12 +104,12 @@ public abstract class IoTDBDataNodeSyncConnector extends 
IoTDBSslSyncConnector {
 
   protected void doTransferWrapper(
       final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent) 
throws PipeException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeSchemaRegionWritePlanEvent.increaseReferenceCount(
+        IoTDBDataNodeSyncConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeSchemaRegionWritePlanEvent.increaseReferenceCount(
-          IoTDBDataNodeSyncConnector.class.getName())) {
-        return;
-      }
       doTransfer(pipeSchemaRegionWritePlanEvent);
     } finally {
       pipeSchemaRegionWritePlanEvent.decreaseReferenceCount(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index a92790c5e80..772a55b0e17 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -268,12 +268,12 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
   private void doTransferWrapper(
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
       throws PipeException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+        IoTDBDataRegionSyncConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
-          IoTDBDataRegionSyncConnector.class.getName())) {
-        return;
-      }
       doTransfer(pipeInsertNodeTabletInsertionEvent);
     } finally {
       pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
@@ -336,12 +336,12 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
 
   private void doTransferWrapper(final PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
       throws PipeException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
+        IoTDBDataRegionSyncConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
-          IoTDBDataRegionSyncConnector.class.getName())) {
-        return;
-      }
       doTransfer(pipeRawTabletInsertionEvent);
     } finally {
       pipeRawTabletInsertionEvent.decreaseReferenceCount(
@@ -395,12 +395,12 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
 
   private void doTransferWrapper(final PipeTsFileInsertionEvent 
pipeTsFileInsertionEvent)
       throws PipeException, IOException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeTsFileInsertionEvent.increaseReferenceCount(
+        IoTDBDataRegionSyncConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeTsFileInsertionEvent.increaseReferenceCount(
-          IoTDBDataRegionSyncConnector.class.getName())) {
-        return;
-      }
       doTransfer(
           Collections.singletonMap(
               new Pair<>(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
index 878430e027b..f70e18c0651 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
@@ -74,12 +74,12 @@ public class IoTDBSchemaRegionConnector extends 
IoTDBDataNodeSyncConnector {
 
   private void doTransferWrapper(final PipeSchemaRegionSnapshotEvent 
pipeSchemaRegionSnapshotEvent)
       throws PipeException, IOException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeSchemaRegionSnapshotEvent.increaseReferenceCount(
+        IoTDBSchemaRegionConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeSchemaRegionSnapshotEvent.increaseReferenceCount(
-          IoTDBSchemaRegionConnector.class.getName())) {
-        return;
-      }
       doTransfer(pipeSchemaRegionSnapshotEvent);
     } finally {
       pipeSchemaRegionSnapshotEvent.decreaseReferenceCount(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index e1e5a5dd44c..cf258452346 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -118,8 +118,13 @@ public class WebSocketConnector implements PipeConnector {
       return;
     }
 
-    ((EnrichedEvent) tabletInsertionEvent)
-        .increaseReferenceCount(WebSocketConnector.class.getName());
+    if (!((EnrichedEvent) tabletInsertionEvent)
+        .increaseReferenceCount(WebSocketConnector.class.getName())) {
+      LOGGER.warn(
+          "WebsocketConnector failed to increase the reference count of the 
event. Ignore it. Current event: {}.",
+          tabletInsertionEvent);
+      return;
+    }
 
     server.addEvent(tabletInsertionEvent, this);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
index 68789bd3ef5..7889b778d86 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
@@ -111,12 +111,12 @@ public class WriteBackConnector implements PipeConnector {
   private void doTransferWrapper(
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
       throws PipeException, WALPipeException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+        WriteBackConnector.class.getName())) {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
-          WriteBackConnector.class.getName())) {
-        return;
-      }
       doTransfer(pipeInsertNodeTabletInsertionEvent);
     } finally {
       pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
@@ -155,11 +155,11 @@ public class WriteBackConnector implements PipeConnector {
 
   private void doTransferWrapper(final PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
       throws PipeException {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if 
(!pipeRawTabletInsertionEvent.increaseReferenceCount(WriteBackConnector.class.getName()))
 {
+      return;
+    }
     try {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if 
(!pipeRawTabletInsertionEvent.increaseReferenceCount(WriteBackConnector.class.getName()))
 {
-        return;
-      }
       doTransfer(pipeRawTabletInsertionEvent);
     } finally {
       
pipeRawTabletInsertionEvent.decreaseReferenceCount(WriteBackConnector.class.getName(),
 false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index ca1888b0200..16f6c6b7d91 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -574,11 +574,17 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
 
     final TsFileResource resource = pendingQueue.poll();
     if (resource == null) {
-      isTerminateSignalSent = true;
       final PipeTerminateEvent terminateEvent =
           new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta, 
dataRegionId);
-      terminateEvent.increaseReferenceCount(
-          PipeHistoricalDataRegionTsFileExtractor.class.getName());
+      if (!terminateEvent.increaseReferenceCount(
+          PipeHistoricalDataRegionTsFileExtractor.class.getName())) {
+        LOGGER.warn(
+            "Pipe {}@{}: failed to increase reference count for terminate 
event, will resend it",
+            pipeName,
+            dataRegionId);
+        return null;
+      }
+      isTerminateSignalSent = true;
       return terminateEvent;
     }
 
@@ -603,18 +609,28 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       event.skipParsingTime();
     }
 
-    
event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());
     try {
-      PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource);
-    } catch (final IOException e) {
-      LOGGER.warn(
-          "Pipe {}@{}: failed to unpin TsFileResource after creating event, 
original path: {}",
-          pipeName,
-          dataRegionId,
-          resource.getTsFilePath());
+      final boolean isReferenceCountIncreased =
+          
event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());
+      if (!isReferenceCountIncreased) {
+        LOGGER.warn(
+            "Pipe {}@{}: failed to increase reference count for historical 
event {}, will discard it",
+            pipeName,
+            dataRegionId,
+            event);
+      }
+      return isReferenceCountIncreased ? event : null;
+    } finally {
+      try {
+        PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource);
+      } catch (final IOException e) {
+        LOGGER.warn(
+            "Pipe {}@{}: failed to unpin TsFileResource after creating event, 
original path: {}",
+            pipeName,
+            dataRegionId,
+            resource.getTsFilePath());
+      }
     }
-
-    return event;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 762577d2f84..c36a8877dff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -32,10 +32,15 @@ import 
org.apache.iotdb.db.pipe.pattern.CachedSchemaPatternMatcher;
 import org.apache.iotdb.db.pipe.pattern.PipeDataRegionMatcher;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Closeable;
 
 public class PipeDataRegionAssigner implements Closeable {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeDataRegionAssigner.class);
+
   private static final int nonForwardingEventsProgressReportInterval =
       
PipeConfig.getInstance().getPipeNonForwardingEventsProgressReportInterval();
 
@@ -64,7 +69,11 @@ public class PipeDataRegionAssigner implements Closeable {
   }
 
   public void publishToAssign(final PipeRealtimeEvent event) {
-    event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
+    if (!event.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) 
{
+      LOGGER.warn(
+          "The reference count of the realtime event {} cannot be increased, 
skipping it.", event);
+      return;
+    }
 
     disruptor.publish(event);
 
@@ -99,7 +108,12 @@ public class PipeDataRegionAssigner implements Closeable {
                         extractor.getRealtimeDataExtractionStartTime(),
                         extractor.getRealtimeDataExtractionEndTime());
                 reportEvent.bindProgressIndex(event.getProgressIndex());
-                
reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
+                if 
(!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
+                  LOGGER.warn(
+                      "The reference count of the event {} cannot be 
increased, skipping it.",
+                      reportEvent);
+                  return;
+                }
                 
extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
                 return;
               }
@@ -118,7 +132,12 @@ public class PipeDataRegionAssigner implements Closeable {
                     
.disableMod4NonTransferPipes(extractor.isShouldTransferModFile());
               }
 
-              
copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
+              if 
(!copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
+                LOGGER.warn(
+                    "The reference count of the event {} cannot be increased, 
skipping it.",
+                    copiedEvent);
+                return;
+              }
               extractor.extract(copiedEvent);
 
               if (innerEvent instanceof PipeHeartbeatEvent) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionEventCounter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionEventCounter.java
index 0aed61801ef..b1405628f37 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionEventCounter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionEventCounter.java
@@ -36,17 +36,17 @@ public class PipeDataRegionEventCounter extends 
PipeEventCounter {
   private final AtomicInteger pipeHeartbeatEventCount = new AtomicInteger(0);
 
   @Override
-  public Integer getTsFileInsertionEventCount() {
+  public int getTsFileInsertionEventCount() {
     return tsFileInsertionEventCount.get();
   }
 
   @Override
-  public Integer getTabletInsertionEventCount() {
+  public int getTabletInsertionEventCount() {
     return tabletInsertionEventCount.get();
   }
 
   @Override
-  public Integer getPipeHeartbeatEventCount() {
+  public int getPipeHeartbeatEventCount() {
     return pipeHeartbeatEventCount.get();
   }
 
@@ -76,11 +76,11 @@ public class PipeDataRegionEventCounter extends 
PipeEventCounter {
       return;
     }
     if (event instanceof PipeHeartbeatEvent) {
-      pipeHeartbeatEventCount.decrementAndGet();
+      pipeHeartbeatEventCount.getAndUpdate(count -> count > 0 ? count - 1 : 0);
     } else if (event instanceof TabletInsertionEvent) {
-      tabletInsertionEventCount.decrementAndGet();
+      tabletInsertionEventCount.getAndUpdate(count -> count > 0 ? count - 1 : 
0);
     } else if (event instanceof TsFileInsertionEvent) {
-      tsFileInsertionEventCount.decrementAndGet();
+      tsFileInsertionEventCount.getAndUpdate(count -> count > 0 ? count - 1 : 
0);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index c4c91f80f58..de0ede99623 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -56,6 +56,7 @@ public class PipeEventCollector implements EventCollector {
 
   private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
   private boolean hasNoGeneratedEvent = true;
+  private boolean isFailedToIncreaseReferenceCount = false;
 
   public PipeEventCollector(
       final UnboundedBlockingPendingQueue<Event> pendingQueue,
@@ -162,10 +163,12 @@ public class PipeEventCollector implements EventCollector 
{
   }
 
   private void collectEvent(final Event event) {
-    collectInvocationCount.incrementAndGet();
-
     if (event instanceof EnrichedEvent) {
-      ((EnrichedEvent) 
event).increaseReferenceCount(PipeEventCollector.class.getName());
+      if (!((EnrichedEvent) 
event).increaseReferenceCount(PipeEventCollector.class.getName())) {
+        LOGGER.warn("PipeEventCollector: The event {} is already released, 
skipping it.", event);
+        isFailedToIncreaseReferenceCount = true;
+        return;
+      }
 
       // Assign a commit id for this event in order to report progress in 
order.
       PipeEventCommitManager.getInstance()
@@ -180,11 +183,13 @@ public class PipeEventCollector implements EventCollector 
{
     }
 
     pendingQueue.directOffer(event);
+    collectInvocationCount.incrementAndGet();
   }
 
-  public void resetCollectInvocationCountAndGenerateFlag() {
+  public void resetFlags() {
     collectInvocationCount.set(0);
     hasNoGeneratedEvent = true;
+    isFailedToIncreaseReferenceCount = false;
   }
 
   public long getCollectInvocationCount() {
@@ -198,4 +203,8 @@ public class PipeEventCollector implements EventCollector {
   public boolean hasNoGeneratedEvent() {
     return hasNoGeneratedEvent;
   }
+
+  public boolean isFailedToIncreaseReferenceCount() {
+    return isFailedToIncreaseReferenceCount;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index a832d43f73e..d55f18f9838 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -32,7 +32,6 @@ import 
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
 import org.apache.iotdb.db.pipe.metric.PipeDataRegionConnectorMetrics;
 import org.apache.iotdb.db.pipe.metric.PipeSchemaRegionConnectorMetrics;
-import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.utils.ErrorHandlingUtils;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -100,6 +99,10 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
             : UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll());
     // Record this event for retrying on connection failure or other exceptions
     setLastEvent(event);
+    if (event instanceof EnrichedEvent && ((EnrichedEvent) 
event).isReleased()) {
+      lastEvent = null;
+      return true;
+    }
 
     try {
       if (event == null) {
@@ -133,7 +136,7 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
                 : event);
       }
 
-      decreaseReferenceCountAndReleaseLastEvent(true);
+      decreaseReferenceCountAndReleaseLastEvent(event, true);
     } catch (final PipeException e) {
       if (!isClosed.get()) {
         setLastExceptionEvent(event);
@@ -143,7 +146,7 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
             "{} in pipe transfer, ignored because the connector subtask is 
dropped.",
             e.getClass().getSimpleName(),
             e);
-        clearReferenceCountAndReleaseLastEvent();
+        clearReferenceCountAndReleaseLastEvent(event);
       }
     } catch (final Exception e) {
       if (!isClosed.get()) {
@@ -160,7 +163,7 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
       } else {
         LOGGER.info(
             "Exception in pipe transfer, ignored because the connector subtask 
is dropped.", e);
-        clearReferenceCountAndReleaseLastEvent();
+        clearReferenceCountAndReleaseLastEvent(event);
       }
     }
 
@@ -204,13 +207,7 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
           ErrorHandlingUtils.getRootCause(e).getMessage(),
           e);
     } finally {
-      inputPendingQueue.forEach(
-          event -> {
-            if (event instanceof EnrichedEvent) {
-              ((EnrichedEvent) 
event).clearReferenceCount(PipeEventCollector.class.getName());
-            }
-          });
-      inputPendingQueue.clear();
+      inputPendingQueue.discardAllEvents();
 
       // Should be called after outputPipeConnector.close()
       super.close();
@@ -223,18 +220,9 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
    */
   public void discardEventsOfPipe(final String pipeNameToDrop) {
     // Try to remove the events as much as possible
-    inputPendingQueue.removeIf(
-        event -> {
-          if (event instanceof EnrichedEvent
-              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) 
{
-            ((EnrichedEvent) event)
-                
.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
-            return true;
-          }
-          return false;
-        });
-
-    // synchronized to use the lastEvent and lastExceptionEvent
+    inputPendingQueue.discardEventsOfPipe(pipeNameToDrop);
+
+    // synchronized to use the lastEvent & lastExceptionEvent
     synchronized (this) {
       // Here we discard the last event, and re-submit the pipe task to avoid 
that the pipe task has
       // stopped submission but will not be stopped by critical exceptions, 
because when it acquires
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index a169cb6a338..6b33c248bfb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask.connector;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.task.connection.BlockingPendingQueue;
 import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
@@ -33,7 +34,6 @@ import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
 
 public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQueue<Event> {
 
@@ -153,9 +153,35 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
   }
 
   @Override
-  public void removeIf(final Predicate<? super Event> filter) {
-    super.removeIf(filter);
-    pendingQueue.removeIf(filter);
+  public void discardAllEvents() {
+    super.discardAllEvents();
+    tsfileInsertEventDeque.removeIf(
+        event -> {
+          if (event instanceof EnrichedEvent) {
+            if (((EnrichedEvent) 
event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
+              eventCounter.decreaseEventCount(event);
+            }
+          }
+          return true;
+        });
+    eventCounter.reset();
+  }
+
+  @Override
+  public void discardEventsOfPipe(final String pipeNameToDrop) {
+    super.discardEventsOfPipe(pipeNameToDrop);
+    tsfileInsertEventDeque.removeIf(
+        event -> {
+          if (event instanceof EnrichedEvent
+              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) 
{
+            if (((EnrichedEvent) event)
+                
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
+              eventCounter.decreaseEventCount(event);
+            }
+            return true;
+          }
+          return false;
+        });
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 812b47a978a..ff17a11d8b1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -129,7 +129,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
       return false;
     }
 
-    outputEventCollector.resetCollectInvocationCountAndGenerateFlag();
+    outputEventCollector.resetFlags();
     try {
       // event can be supplied after the subtask is closed, so we need to 
check isClosed here
       if (!isClosed.get()) {
@@ -168,6 +168,9 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
               // of the event must be zero in the processor stage, at this 
time, the progress of the
               // event needs to be reported.
               && outputEventCollector.hasNoGeneratedEvent()
+              // If the event's reference count cannot be increased, it means 
that the event has
+              // been released, and the progress of the event can not be 
reported.
+              && !outputEventCollector.isFailedToIncreaseReferenceCount()
               // Events generated from consensusPipe's transferred data should 
never be reported.
               && !(pipeProcessor instanceof PipeConsensusProcessor);
       if (shouldReport
@@ -182,7 +185,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
         PipeEventCommitManager.getInstance()
             .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, 
creationTime, regionId);
       }
-      decreaseReferenceCountAndReleaseLastEvent(shouldReport);
+      decreaseReferenceCountAndReleaseLastEvent(event, shouldReport);
     } catch (final PipeRuntimeOutOfMemoryCriticalException e) {
       LOGGER.info(
           "Temporarily out of memory in pipe event processing, will wait for 
the memory to release.",
@@ -201,7 +204,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
             e);
       } else {
         LOGGER.info("Exception in pipe event processing, ignored because pipe 
is dropped.", e);
-        clearReferenceCountAndReleaseLastEvent();
+        clearReferenceCountAndReleaseLastEvent(event);
       }
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index d3dba643e26..ef8f013f7a1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -208,6 +208,7 @@ public class CommonConfig {
   private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 
1000L;
   private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
   private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
+  private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE;
 
   private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
   private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50; 
// 50B
@@ -883,6 +884,15 @@ public class CommonConfig {
         pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds;
   }
 
+  public long getPipeSubtaskExecutorForcedRestartIntervalMs() {
+    return pipeSubtaskExecutorForcedRestartIntervalMs;
+  }
+
+  public void setPipeSubtaskExecutorForcedRestartIntervalMs(
+      long pipeSubtaskExecutorForcedRestartIntervalMs) {
+    this.pipeSubtaskExecutorForcedRestartIntervalMs = 
pipeSubtaskExecutorForcedRestartIntervalMs;
+  }
+
   public int getPipeRealTimeQueuePollHistoryThreshold() {
     return pipeRealTimeQueuePollHistoryThreshold;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 5183ab1b776..76e0a853dc2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -318,6 +318,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_subtask_executor_cron_heartbeat_event_interval_seconds",
                 
String.valueOf(config.getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()))));
+    config.setPipeSubtaskExecutorForcedRestartIntervalMs(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_subtask_executor_forced_restart_interval_ms",
+                
String.valueOf(config.getPipeSubtaskExecutorForcedRestartIntervalMs()))));
 
     config.setPipeExtractorAssignerDisruptorRingBufferSize(
         Integer.parseInt(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index fc9fb0911a0..449bd149e67 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -92,6 +92,10 @@ public class PipeConfig {
     return 
COMMON_CONFIG.getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
   }
 
+  public long getPipeSubtaskExecutorForcedRestartIntervalMs() {
+    return COMMON_CONFIG.getPipeSubtaskExecutorForcedRestartIntervalMs();
+  }
+
   /////////////////////////////// Extractor ///////////////////////////////
 
   public int getPipeExtractorAssignerDisruptorRingBufferSize() {
@@ -341,6 +345,9 @@ public class PipeConfig {
     LOGGER.info(
         "PipeSubtaskExecutorCronHeartbeatEventIntervalSeconds: {}",
         getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
+    LOGGER.info(
+        "PipeSubtaskExecutorForcedRestartIntervalMs: {}",
+        getPipeSubtaskExecutorForcedRestartIntervalMs());
 
     LOGGER.info(
         "PipeExtractorAssignerDisruptorRingBufferSize: {}",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index cbdbef1e4e9..57a2ee6a6eb 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -67,7 +67,7 @@ public abstract class EnrichedEvent implements Event {
   protected boolean isPatternParsed;
   protected boolean isTimeParsed;
 
-  protected boolean shouldReportOnCommit = true;
+  protected volatile boolean shouldReportOnCommit = true;
   protected List<Supplier<Void>> onCommittedHooks = new ArrayList<>();
 
   protected EnrichedEvent(
@@ -106,29 +106,32 @@ public abstract class EnrichedEvent implements Event {
    *     {@code false} otherwise; {@link EnrichedEvent#referenceCount} will be 
incremented
    *     regardless of the circumstances
    */
-  public boolean increaseReferenceCount(final String holderMessage) {
+  public synchronized boolean increaseReferenceCount(final String 
holderMessage) {
     boolean isSuccessful = true;
-    synchronized (this) {
-      if (isReleased.get()) {
-        LOGGER.warn(
-            "re-increase reference count to event that has already been 
released: {}, stack trace: {}",
-            coreReportMessage(),
-            Thread.currentThread().getStackTrace());
-        isSuccessful = false;
-        // Here we still increase the reference count, to remain consistent 
with the behavior after
-        // internal increase failure.
-        referenceCount.incrementAndGet();
-      } else {
-        if (referenceCount.get() == 0) {
-          // We assume that this function will not throw any exceptions.
-          isSuccessful = 
internallyIncreaseResourceReferenceCount(holderMessage);
-        }
-        referenceCount.incrementAndGet();
-      }
+
+    if (isReleased.get()) {
+      LOGGER.warn(
+          "re-increase reference count to event that has already been 
released: {}, stack trace: {}",
+          coreReportMessage(),
+          Thread.currentThread().getStackTrace());
+      isSuccessful = false;
+      return isSuccessful;
     }
-    if (!isSuccessful) {
-      LOGGER.warn("increase reference count failed, EnrichedEvent: {}", 
coreReportMessage());
+
+    if (referenceCount.get() == 0) {
+      // We assume that this function will not throw any exceptions.
+      isSuccessful = internallyIncreaseResourceReferenceCount(holderMessage);
+    }
+
+    if (isSuccessful) {
+      referenceCount.incrementAndGet();
+    } else {
+      LOGGER.warn(
+          "increase reference count failed, EnrichedEvent: {}, stack trace: 
{}",
+          coreReportMessage(),
+          Thread.currentThread().getStackTrace());
     }
+
     return isSuccessful;
   }
 
@@ -156,31 +159,53 @@ public abstract class EnrichedEvent implements Event {
    *     {@code false} otherwise; {@link EnrichedEvent#referenceCount} will be 
decremented
    *     regardless of the circumstances
    */
-  public boolean decreaseReferenceCount(final String holderMessage, final 
boolean shouldReport) {
+  public synchronized boolean decreaseReferenceCount(
+      final String holderMessage, final boolean shouldReport) {
     boolean isSuccessful = true;
-    synchronized (this) {
-      if (referenceCount.get() == 1 && !isReleased.get()) {
-        // We assume that this function will not throw any exceptions.
-        isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
-        if (!shouldReport) {
-          shouldReportOnCommit = false;
-        }
-        PipeEventCommitManager.getInstance().commit(this, committerKey);
+
+    if (isReleased.get()) {
+      LOGGER.warn(
+          "decrease reference count to event that has already been released: 
{}, stack trace: {}",
+          coreReportMessage(),
+          Thread.currentThread().getStackTrace());
+      isSuccessful = false;
+      return isSuccessful;
+    }
+
+    if (referenceCount.get() == 1) {
+      // We assume that this function will not throw any exceptions.
+      if (!internallyDecreaseResourceReferenceCount(holderMessage)) {
+        LOGGER.warn(
+            "resource reference count is decreased to 0, but failed to release 
the resource, EnrichedEvent: {}, stack trace: {}",
+            coreReportMessage(),
+            Thread.currentThread().getStackTrace());
       }
-      final int newReferenceCount = referenceCount.decrementAndGet();
-      if (newReferenceCount == 0) {
-        isReleased.set(true);
+      if (!shouldReport) {
+        shouldReportOnCommit = false;
       }
+      PipeEventCommitManager.getInstance().commit(this, committerKey);
+    }
+
+    // No matter whether the resource is released, we should decrease the 
reference count.
+    final int newReferenceCount = referenceCount.decrementAndGet();
+    if (newReferenceCount <= 0) {
+      isReleased.set(true);
+      isSuccessful = newReferenceCount == 0;
       if (newReferenceCount < 0) {
-        LOGGER.debug(
+        LOGGER.warn(
             "reference count is decreased to {}, event: {}, stack trace: {}",
             newReferenceCount,
             coreReportMessage(),
             Thread.currentThread().getStackTrace());
+        referenceCount.set(0);
       }
     }
+
     if (!isSuccessful) {
-      LOGGER.warn("decrease reference count failed, EnrichedEvent: {}", 
coreReportMessage());
+      LOGGER.warn(
+          "decrease reference count failed, EnrichedEvent: {}, stack trace: 
{}",
+          coreReportMessage(),
+          Thread.currentThread().getStackTrace());
     }
     return isSuccessful;
   }
@@ -195,20 +220,29 @@ public abstract class EnrichedEvent implements Event {
    *     {@code false} otherwise; {@link EnrichedEvent#referenceCount} will be 
reset to zero
    *     regardless of the circumstances
    */
-  public boolean clearReferenceCount(final String holderMessage) {
-    boolean isSuccessful = true;
-    synchronized (this) {
-      if (referenceCount.get() >= 1 && !isReleased.get()) {
-        // We assume that this function will not throw any exceptions.
-        isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
-        isReleased.set(true);
-      }
-      referenceCount.set(0);
+  public synchronized boolean clearReferenceCount(final String holderMessage) {
+    if (isReleased.get()) {
+      LOGGER.warn(
+          "clear reference count to event that has already been released: {}, 
stack trace: {}",
+          coreReportMessage(),
+          Thread.currentThread().getStackTrace());
+      return false;
     }
-    if (!isSuccessful) {
-      LOGGER.warn("clear reference count failed, EnrichedEvent: {}", 
coreReportMessage());
+
+    if (referenceCount.get() >= 1) {
+      shouldReportOnCommit = false;
+      // We assume that this function will not throw any exceptions.
+      if (!internallyDecreaseResourceReferenceCount(holderMessage)) {
+        LOGGER.warn(
+            "resource reference count is decreased to 0, but failed to release 
the resource, EnrichedEvent: {}, stack trace: {}",
+            coreReportMessage(),
+            Thread.currentThread().getStackTrace());
+      }
     }
-    return isSuccessful;
+
+    referenceCount.set(0);
+    isReleased.set(true);
+    return true;
   }
 
   /**
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCounter.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCounter.java
index c62e1ef60ff..591ec500dae 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCounter.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCounter.java
@@ -23,15 +23,15 @@ import org.apache.iotdb.pipe.api.event.Event;
 
 public abstract class PipeEventCounter {
 
-  public Integer getTsFileInsertionEventCount() {
+  public int getTsFileInsertionEventCount() {
     return 0;
   }
 
-  public Integer getTabletInsertionEventCount() {
+  public int getTabletInsertionEventCount() {
     return 0;
   }
 
-  public Integer getPipeHeartbeatEventCount() {
+  public int getPipeHeartbeatEventCount() {
     return 0;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
index 04983a984d9..c67e80181f9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.task.connection;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -29,7 +30,6 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
 
 public abstract class BlockingPendingQueue<E extends Event> {
 
@@ -40,7 +40,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
 
   protected final BlockingQueue<E> pendingQueue;
 
-  private final PipeEventCounter eventCounter;
+  protected final PipeEventCounter eventCounter;
 
   protected BlockingPendingQueue(
       final BlockingQueue<E> pendingQueue, final PipeEventCounter 
eventCounter) {
@@ -106,12 +106,36 @@ public abstract class BlockingPendingQueue<E extends 
Event> {
     eventCounter.reset();
   }
 
+  /** DO NOT FORGET to set eventCounter to new value after invoking this 
method. */
   public void forEach(final Consumer<? super E> action) {
     pendingQueue.forEach(action);
   }
 
-  public void removeIf(final Predicate<? super E> filter) {
-    pendingQueue.removeIf(filter);
+  public void discardAllEvents() {
+    pendingQueue.removeIf(
+        event -> {
+          if (event instanceof EnrichedEvent) {
+            if (((EnrichedEvent) 
event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
+              eventCounter.decreaseEventCount(event);
+            }
+          }
+          return true;
+        });
+    eventCounter.reset();
+  }
+
+  public void discardEventsOfPipe(final String pipeNameToDrop) {
+    pendingQueue.removeIf(
+        event -> {
+          if (event instanceof EnrichedEvent
+              && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())) 
{
+            if (((EnrichedEvent) 
event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
+              eventCounter.decreaseEventCount(event);
+            }
+            return true;
+          }
+          return false;
+        });
   }
 
   public boolean isEmpty() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
index 25ddb438806..6e520de1b76 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
@@ -84,7 +84,7 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
       LOGGER.info(
           "onFailure in pipe transfer, ignored because the connector subtask 
is dropped.",
           throwable);
-      clearReferenceCountAndReleaseLastEvent();
+      clearReferenceCountAndReleaseLastEvent(null);
       return;
     }
 
@@ -227,7 +227,8 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
 
   protected synchronized void 
clearReferenceCountAndReleaseLastExceptionEvent() {
     if (lastExceptionEvent != null) {
-      if (lastExceptionEvent instanceof EnrichedEvent) {
+      if (lastExceptionEvent instanceof EnrichedEvent
+          && !((EnrichedEvent) lastExceptionEvent).isReleased()) {
         ((EnrichedEvent) 
lastExceptionEvent).clearReferenceCount(PipeSubtask.class.getName());
       }
       lastExceptionEvent = null;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
index c366b4d62e4..cff90b88481 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
@@ -39,7 +39,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
   public synchronized void onFailure(final Throwable throwable) {
     if (isClosed.get()) {
       LOGGER.info("onFailure in pipe subtask, ignored because pipe is 
dropped.", throwable);
-      clearReferenceCountAndReleaseLastEvent();
+      clearReferenceCountAndReleaseLastEvent(null);
       return;
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
index a6f11bcae9b..493e272e799 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
@@ -148,26 +148,47 @@ public abstract class PipeSubtask
 
   @Override
   public void close() {
-    clearReferenceCountAndReleaseLastEvent();
+    clearReferenceCountAndReleaseLastEvent(null);
   }
 
   protected synchronized void decreaseReferenceCountAndReleaseLastEvent(
-      final boolean shouldReport) {
+      final Event actualLastEvent, final boolean shouldReport) {
+    // lastEvent may be set to null due to 
PipeConnectorSubtask#discardEventsOfPipe
     if (lastEvent != null) {
-      if (lastEvent instanceof EnrichedEvent) {
+      if (lastEvent instanceof EnrichedEvent && !((EnrichedEvent) 
lastEvent).isReleased()) {
         ((EnrichedEvent) lastEvent)
             .decreaseReferenceCount(PipeSubtask.class.getName(), shouldReport);
       }
       lastEvent = null;
+      return;
+    }
+
+    // If lastEvent is set to null due to 
PipeConnectorSubtask#discardEventsOfPipe (connector close)
+    // and finally exception occurs, we need to release the actual last event 
from the connector
+    // given by the parameter
+    if (actualLastEvent instanceof EnrichedEvent
+        && !((EnrichedEvent) actualLastEvent).isReleased()) {
+      ((EnrichedEvent) actualLastEvent)
+          .decreaseReferenceCount(PipeSubtask.class.getName(), shouldReport);
     }
   }
 
-  protected synchronized void clearReferenceCountAndReleaseLastEvent() {
+  protected synchronized void clearReferenceCountAndReleaseLastEvent(final 
Event actualLastEvent) {
+    // lastEvent may be set to null due to 
PipeConnectorSubtask#discardEventsOfPipe
     if (lastEvent != null) {
-      if (lastEvent instanceof EnrichedEvent) {
+      if (lastEvent instanceof EnrichedEvent && !((EnrichedEvent) 
lastEvent).isReleased()) {
         ((EnrichedEvent) 
lastEvent).clearReferenceCount(PipeSubtask.class.getName());
       }
       lastEvent = null;
+      return;
+    }
+
+    // If lastEvent is set to null due to 
PipeConnectorSubtask#discardEventsOfPipe (connector close)
+    // and finally exception occurs, we need to release the actual last event 
from the connector
+    // given by the parameter
+    if (actualLastEvent instanceof EnrichedEvent
+        && !((EnrichedEvent) actualLastEvent).isReleased()) {
+      ((EnrichedEvent) 
actualLastEvent).clearReferenceCount(PipeSubtask.class.getName());
     }
   }
 

Reply via email to