This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7640df9a2b9 Pipe: Introduce PipeEventCommitManager to manage event 
commit order to avoid losing data during parallel connector scheduling (#11489)
7640df9a2b9 is described below

commit 7640df9a2b91e06b13931429994e72cf0caccc52
Author: Zikun Ma <[email protected]>
AuthorDate: Wed Nov 22 00:13:16 2023 +0800

    Pipe: Introduce PipeEventCommitManager to manage event commit order to 
avoid losing data during parallel connector scheduling (#11489)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../java/org/apache/iotdb/CountPointProcessor.java |   3 +-
 .../db/pipe/agent/plugin/PipePluginAgent.java      |   6 +-
 .../db/pipe/commit/PipeEventCommitManager.java     | 106 +++++++++++++++++++
 .../iotdb/db/pipe/commit/PipeEventCommitter.java   |  79 ++++++++++++++
 .../env/PipeTaskConnectorRuntimeEnvironment.java   |  34 +++++++
 ...oTDBThriftAsyncPipeTransferBatchReqBuilder.java |  49 ---------
 ...IoTDBThriftSyncPipeTransferBatchReqBuilder.java |  45 +-------
 .../builder/PipeTransferBatchReqBuilder.java       |  45 ++++++++
 .../thrift/async/IoTDBThriftAsyncConnector.java    | 113 +++++++--------------
 .../PipeTransferTabletBatchEventHandler.java       |  13 +--
 .../PipeTransferTabletInsertNodeEventHandler.java  |   3 +-
 .../PipeTransferTabletInsertionEventHandler.java   |  39 +++----
 .../handler/PipeTransferTabletRawEventHandler.java |   3 +-
 .../PipeTransferTsFileInsertionEventHandler.java   |  20 ++--
 .../protocol/websocket/WebSocketConnector.java     |  53 ++--------
 .../websocket/WebSocketConnectorServer.java        |  83 ++++++++-------
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  |  39 ++++++-
 .../event/common/heartbeat/PipeHeartbeatEvent.java |  10 +-
 .../db/pipe/event/common/row/PipeRowCollector.java |   8 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  18 +++-
 .../common/tablet/PipeRawTabletInsertionEvent.java |  16 +--
 .../tablet/TabletInsertionDataContainer.java       |  10 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  17 +++-
 .../tsfile/TsFileInsertionDataContainer.java       |  14 ++-
 .../db/pipe/event/realtime/PipeRealtimeEvent.java  |   8 +-
 .../execution/executor/PipeSubtaskExecutor.java    |   1 +
 .../PipeHistoricalDataRegionTsFileExtractor.java   |   3 +
 .../realtime/assigner/PipeDataRegionAssigner.java  |   2 +-
 .../legacy/IoTDBLegacyPipeReceiverAgent.java       |   5 +-
 .../receiver/legacy/loader/DeletionLoader.java     |   5 +-
 .../pipe/receiver/legacy/loader/TsFileLoader.java  |   5 +-
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |   3 +-
 .../pipe/task/connection/PipeEventCollector.java   |  10 +-
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |  17 +++-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |   2 +-
 .../iotdb/db/pipe/task/stage/PipeTaskStage.java    |   6 +-
 .../subtask/connector/PipeConnectorSubtask.java    |  11 ++
 .../connector/PipeConnectorSubtaskLifeCycle.java   |  51 +++++++++-
 .../connector/PipeConnectorSubtaskManager.java     |  32 ++++--
 .../event/TsFileInsertionDataContainerTest.java    |   2 +-
 40 files changed, 633 insertions(+), 356 deletions(-)

diff --git 
a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
 
b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
index 13fd6804aab..766c1ce67b0 100644
--- 
a/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
+++ 
b/example/pipe-count-point-processor/src/main/java/org/apache/iotdb/CountPointProcessor.java
@@ -71,7 +71,8 @@ public class CountPointProcessor implements PipeProcessor {
       tablet.rowSize = 1;
       tablet.addTimestamp(0, System.currentTimeMillis());
       tablet.addValue(aggregateSeries.getMeasurement(), 0, 
writePointCount.get());
-      eventCollector.collect(new PipeRawTabletInsertionEvent(tablet, false, 
null, null, false));
+      eventCollector.collect(
+          new PipeRawTabletInsertionEvent(tablet, false, null, null, null, 
false));
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 9ea0684e15f..7ee2f99b1ea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -218,7 +218,7 @@ public class PipePluginAgent {
       try {
         temporaryExtractor.close();
       } catch (Exception e) {
-        LOGGER.warn("Failed to close temporary extractor: {}", e.getMessage());
+        LOGGER.warn("Failed to close temporary extractor: {}", e.getMessage(), 
e);
       }
     }
 
@@ -231,7 +231,7 @@ public class PipePluginAgent {
       try {
         temporaryProcessor.close();
       } catch (Exception e) {
-        LOGGER.warn("Failed to close temporary processor: {}", e.getMessage());
+        LOGGER.warn("Failed to close temporary processor: {}", e.getMessage(), 
e);
       }
     }
 
@@ -244,7 +244,7 @@ public class PipePluginAgent {
       try {
         temporaryConnector.close();
       } catch (Exception e) {
-        LOGGER.warn("Failed to close temporary connector: {}", e.getMessage());
+        LOGGER.warn("Failed to close temporary connector: {}", e.getMessage(), 
e);
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitManager.java
new file mode 100644
index 00000000000..8305578ba29
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitManager.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.commit;
+
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeEventCommitManager {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeEventCommitManager.class);
+
+  // key: pipeName_dataRegionId
+  private final Map<String, PipeEventCommitter> eventCommitterMap = new 
ConcurrentHashMap<>();
+
+  public void register(String pipeName, int dataRegionId, String 
pipePluginName) {
+    if (pipeName == null || pipePluginName == null) {
+      return;
+    }
+
+    final String committerKey = generateCommitterKey(pipeName, dataRegionId);
+    if (eventCommitterMap.containsKey(committerKey)) {
+      LOGGER.warn(
+          "Pipe with same name is already registered on this data region, 
overwriting: {}",
+          committerKey);
+    }
+    eventCommitterMap.put(committerKey, new PipeEventCommitter());
+    LOGGER.info("Pipe committer registered for pipe on data region: {}", 
committerKey);
+  }
+
+  public void deregister(String pipeName, int dataRegionId) {
+    final String committerKey = generateCommitterKey(pipeName, dataRegionId);
+    eventCommitterMap.remove(committerKey);
+    LOGGER.info("Pipe committer deregistered for pipe on data region: {}", 
committerKey);
+  }
+
+  /**
+   * Assign a commit id and a key for commit. Make sure {@code 
EnrichedEvent.pipeName} is set before
+   * calling this.
+   */
+  public void enrichWithCommitterKeyAndCommitId(EnrichedEvent event, int 
dataRegionId) {
+    if (event == null || event instanceof PipeHeartbeatEvent || 
event.getPipeName() == null) {
+      return;
+    }
+
+    final String committerKey = generateCommitterKey(event.getPipeName(), 
dataRegionId);
+    final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
+    if (committer == null) {
+      return;
+    }
+    event.setCommitterKeyAndCommitId(committerKey, 
committer.generateCommitId());
+  }
+
+  public void commit(EnrichedEvent event, String committerKey) {
+    if (event == null
+        || event instanceof PipeHeartbeatEvent
+        || event.getCommitId() <= EnrichedEvent.NO_COMMIT_ID
+        || committerKey == null) {
+      return;
+    }
+
+    final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
+    if (committer == null) {
+      return;
+    }
+    committer.commit(event);
+  }
+
+  private static String generateCommitterKey(String pipeName, int 
dataRegionId) {
+    return String.format("%s_%s", pipeName, dataRegionId);
+  }
+
+  private PipeEventCommitManager() {
+    // Do nothing but make it private.
+  }
+
+  private static class PipeEventCommitManagerHolder {
+    private static final PipeEventCommitManager INSTANCE = new 
PipeEventCommitManager();
+  }
+
+  public static PipeEventCommitManager getInstance() {
+    return PipeEventCommitManagerHolder.INSTANCE;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitter.java
new file mode 100644
index 00000000000..fff2599f099
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/commit/PipeEventCommitter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.commit;
+
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Used to queue events for one pipe of one dataRegion to commit in order. */
+public class PipeEventCommitter {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeEventCommitter.class);
+
+  private final AtomicLong commitIdGenerator = new AtomicLong(0);
+  private final AtomicLong lastCommitId = new AtomicLong(0);
+
+  private final PriorityBlockingQueue<EnrichedEvent> commitQueue =
+      new PriorityBlockingQueue<>(
+          11,
+          Comparator.comparing(
+              event ->
+                  Objects.requireNonNull(event, "committable event cannot be 
null").getCommitId()));
+
+  PipeEventCommitter() {
+    // Do nothing but make it package-private.
+  }
+
+  public synchronized long generateCommitId() {
+    return commitIdGenerator.incrementAndGet();
+  }
+
+  public synchronized void commit(EnrichedEvent event) {
+    commitQueue.offer(event);
+
+    while (!commitQueue.isEmpty()) {
+      final EnrichedEvent e = commitQueue.peek();
+
+      if (e.getCommitId() <= lastCommitId.get()) {
+        LOGGER.warn(
+            "commit id must be monotonically increasing, lastCommitId: {}, 
event: {}",
+            lastCommitId.get(),
+            e);
+        commitQueue.poll();
+        continue;
+      }
+
+      if (e.getCommitId() != lastCommitId.get() + 1) {
+        break;
+      }
+
+      e.onCommitted();
+      lastCommitId.incrementAndGet();
+      commitQueue.poll();
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
new file mode 100644
index 00000000000..a8fe2187750
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/plugin/env/PipeTaskConnectorRuntimeEnvironment.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.config.plugin.env;
+
+public class PipeTaskConnectorRuntimeEnvironment extends 
PipeTaskRuntimeEnvironment {
+
+  private final int regionId;
+
+  public PipeTaskConnectorRuntimeEnvironment(String pipeName, long 
creationTime, int regionId) {
+    super(pipeName, creationTime);
+    this.regionId = regionId;
+  }
+
+  public int getRegionId() {
+    return regionId;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
index d08b9d50233..1c87115420d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
@@ -19,67 +19,18 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
 
-import org.apache.iotdb.db.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 public class IoTDBThriftAsyncPipeTransferBatchReqBuilder extends 
PipeTransferBatchReqBuilder {
 
-  protected final List<Long> requestCommitIds = new ArrayList<>();
-
   public IoTDBThriftAsyncPipeTransferBatchReqBuilder(PipeParameters 
parameters) {
     super(parameters);
   }
 
-  /**
-   * Try offer event into cache if the given event is not duplicated.
-   *
-   * @param event the given event
-   * @return true if the batch can be transferred
-   */
-  public boolean onEvent(TabletInsertionEvent event, long requestCommitId)
-      throws IOException, WALPipeException {
-    final TPipeTransferReq req = buildTabletInsertionReq(event);
-
-    if (requestCommitIds.isEmpty()
-        || !requestCommitIds.get(requestCommitIds.size() - 
1).equals(requestCommitId)) {
-      reqs.add(req);
-
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) 
event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName());
-      }
-      events.add(event);
-      requestCommitIds.add(requestCommitId);
-
-      if (firstEventProcessingTime == Long.MIN_VALUE) {
-        firstEventProcessingTime = System.currentTimeMillis();
-      }
-
-      bufferSize += req.getBody().length;
-    }
-
-    return bufferSize >= getMaxBatchSizeInBytes()
-        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
-  }
-
-  public void onSuccess() {
-    reqs.clear();
-
-    events.clear();
-    requestCommitIds.clear();
-
-    firstEventProcessingTime = Long.MIN_VALUE;
-
-    bufferSize = 0;
-  }
-
   public List<Event> deepcopyEvents() {
     return new ArrayList<>(events);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
index bd6244ed600..4c292749f41 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
@@ -19,15 +19,9 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
 
-import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-
-import java.io.IOException;
 
 public class IoTDBThriftSyncPipeTransferBatchReqBuilder extends 
PipeTransferBatchReqBuilder {
 
@@ -35,47 +29,16 @@ public class IoTDBThriftSyncPipeTransferBatchReqBuilder 
extends PipeTransferBatc
     super(parameters);
   }
 
-  /**
-   * Try offer event into cache if the given event is not duplicated.
-   *
-   * @param event the given event
-   * @return true if the batch can be transferred
-   */
-  public boolean onEvent(TabletInsertionEvent event) throws IOException, 
WALPipeException {
-    final TPipeTransferReq req = buildTabletInsertionReq(event);
-
-    if (events.isEmpty() || !events.get(events.size() - 1).equals(event)) {
-      reqs.add(req);
-
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) 
event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName());
-      }
-      events.add(event);
-
-      if (firstEventProcessingTime == Long.MIN_VALUE) {
-        firstEventProcessingTime = System.currentTimeMillis();
-      }
-
-      bufferSize += req.getBody().length;
-    }
-
-    return bufferSize >= getMaxBatchSizeInBytes()
-        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
-  }
-
+  @Override
   public void onSuccess() {
-    reqs.clear();
-
     for (final Event event : events) {
       if (event instanceof EnrichedEvent) {
         ((EnrichedEvent) event)
-            .decreaseReferenceCount(IoTDBThriftSyncConnector.class.getName(), 
true);
+            .decreaseReferenceCount(
+                IoTDBThriftSyncPipeTransferBatchReqBuilder.class.getName(), 
true);
       }
     }
-    events.clear();
-
-    firstEventProcessingTime = Long.MIN_VALUE;
 
-    bufferSize = 0;
+    super.onSuccess();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index ff0e066dc8a..9367ec2c184 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
@@ -53,6 +54,7 @@ public abstract class PipeTransferBatchReqBuilder implements 
AutoCloseable {
 
   protected final List<TPipeTransferReq> reqs = new ArrayList<>();
   protected final List<Event> events = new ArrayList<>();
+  protected final List<Long> requestCommitIds = new ArrayList<>();
 
   // limit in delayed time
   protected final int maxDelayInMs;
@@ -98,6 +100,49 @@ public abstract class PipeTransferBatchReqBuilder 
implements AutoCloseable {
     }
   }
 
+  /**
+   * Try offer event into cache if the given event is not duplicated.
+   *
+   * @param event the given event
+   * @return true if the batch can be transferred
+   */
+  public boolean onEvent(TabletInsertionEvent event) throws IOException, 
WALPipeException {
+    if (!(event instanceof EnrichedEvent)) {
+      return false;
+    }
+
+    final TPipeTransferReq req = buildTabletInsertionReq(event);
+    final long requestCommitId = ((EnrichedEvent) event).getCommitId();
+
+    if (requestCommitIds.isEmpty()
+        || !requestCommitIds.get(requestCommitIds.size() - 
1).equals(requestCommitId)) {
+      reqs.add(req);
+      events.add(event);
+      requestCommitIds.add(requestCommitId);
+
+      ((EnrichedEvent) 
event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName());
+
+      if (firstEventProcessingTime == Long.MIN_VALUE) {
+        firstEventProcessingTime = System.currentTimeMillis();
+      }
+
+      bufferSize += req.getBody().length;
+    }
+
+    return bufferSize >= getMaxBatchSizeInBytes()
+        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
+  }
+
+  public void onSuccess() {
+    reqs.clear();
+    events.clear();
+    requestCommitIds.clear();
+
+    firstEventProcessingTime = Long.MIN_VALUE;
+
+    bufferSize = 0;
+  }
+
   public List<TPipeTransferReq> getTPipeTransferReqs() {
     return reqs;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 1e34ef7c4be..b51b4284c8c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -52,23 +52,17 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.Optional;
-import java.util.PriorityQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
@@ -90,13 +84,14 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
       asyncPipeDataTransferClientManager;
 
   private final IoTDBThriftSyncConnector retryConnector = new 
IoTDBThriftSyncConnector();
-  private final PriorityBlockingQueue<Pair<Long, Event>> retryEventQueue =
-      new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
-
-  private final AtomicLong commitIdGenerator = new AtomicLong(0);
-  private final AtomicLong lastCommitId = new AtomicLong(0);
-  private final PriorityQueue<Pair<Long, Runnable>> commitQueue =
-      new PriorityQueue<>(Comparator.comparing(o -> o.left));
+  private final PriorityBlockingQueue<Event> retryEventQueue =
+      new PriorityBlockingQueue<>(
+          11,
+          Comparator.comparing(
+              e ->
+                  // Non-enriched events will be put at the front of the queue,
+                  // because they are more likely to be lost and need to be 
retried first.
+                  e instanceof EnrichedEvent ? ((EnrichedEvent) 
e).getCommitId() : 0));
 
   private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder;
 
@@ -176,13 +171,15 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
       return;
     }
 
+    final long commitId = ((EnrichedEvent) tabletInsertionEvent).getCommitId();
+
     if (isTabletBatchModeEnabled) {
-      final long requestCommitId = commitIdGenerator.incrementAndGet();
-      if (tabletBatchBuilder.onEvent(tabletInsertionEvent, requestCommitId)) {
+      if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
         final PipeTransferTabletBatchEventHandler 
pipeTransferTabletBatchEventHandler =
             new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
 
-        transfer(requestCommitId, pipeTransferTabletBatchEventHandler);
+        transfer(commitId, pipeTransferTabletBatchEventHandler);
+
         tabletBatchBuilder.onSuccess();
       }
     } else {
@@ -195,13 +192,11 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
                     pipeInsertNodeTabletInsertionEvent.getByteBuffer())
                 : PipeTransferTabletInsertNodeReq.toTPipeTransferReq(
                     pipeInsertNodeTabletInsertionEvent.getInsertNode());
-
-        final long requestCommitId = commitIdGenerator.incrementAndGet();
         final PipeTransferTabletInsertNodeEventHandler 
pipeTransferInsertNodeReqHandler =
             new PipeTransferTabletInsertNodeEventHandler(
-                requestCommitId, pipeInsertNodeTabletInsertionEvent, 
pipeTransferReq, this);
+                pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
 
-        transfer(requestCommitId, pipeTransferInsertNodeReqHandler);
+        transfer(commitId, pipeTransferInsertNodeReqHandler);
       } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
         final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
             (PipeRawTabletInsertionEvent) tabletInsertionEvent;
@@ -209,13 +204,11 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
             PipeTransferTabletRawReq.toTPipeTransferReq(
                 pipeRawTabletInsertionEvent.convertToTablet(),
                 pipeRawTabletInsertionEvent.isAligned());
-
-        final long requestCommitId = commitIdGenerator.incrementAndGet();
         final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
             new PipeTransferTabletRawEventHandler(
-                requestCommitId, pipeRawTabletInsertionEvent, 
pipeTransferTabletRawReq, this);
+                pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
 
-        transfer(requestCommitId, pipeTransferTabletReqHandler);
+        transfer(commitId, pipeTransferTabletReqHandler);
       }
     }
   }
@@ -305,12 +298,10 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
       throw new 
FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
     }
 
-    final long requestCommitId = commitIdGenerator.incrementAndGet();
     final PipeTransferTsFileInsertionEventHandler 
pipeTransferTsFileInsertionEventHandler =
-        new PipeTransferTsFileInsertionEventHandler(
-            requestCommitId, pipeTsFileInsertionEvent, this);
+        new PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent, 
this);
 
-    transfer(requestCommitId, pipeTransferTsFileInsertionEventHandler);
+    transfer(pipeTsFileInsertionEvent.getCommitId(), 
pipeTransferTsFileInsertionEventHandler);
   }
 
   private void transfer(
@@ -441,9 +432,7 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
    */
   private synchronized void transferQueuedEventsIfNecessary() throws Exception 
{
     while (!retryEventQueue.isEmpty()) {
-      final Pair<Long, Event> queuedEventPair = retryEventQueue.peek();
-      final long requestCommitId = queuedEventPair.getLeft();
-      final Event event = queuedEventPair.getRight();
+      final Event event = retryEventQueue.peek();
 
       if (event instanceof PipeInsertNodeTabletInsertionEvent) {
         retryConnector.transfer((PipeInsertNodeTabletInsertionEvent) event);
@@ -456,7 +445,10 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
             "IoTDBThriftAsyncConnector does not support transfer generic 
event: {}.", event);
       }
 
-      commit(requestCommitId, event instanceof EnrichedEvent ? (EnrichedEvent) 
event : null);
+      if (event instanceof EnrichedEvent) {
+        ((EnrichedEvent) event)
+            .decreaseReferenceCount(IoTDBThriftAsyncConnector.class.getName(), 
true);
+      }
 
       retryEventQueue.poll();
     }
@@ -481,58 +473,23 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
   }
 
   /**
-   * Commit the event. Decrease the reference count of the event. If the 
reference count is 0, the
-   * progress index of the event will be recalculated and the resources of the 
event will be
-   * released.
-   *
-   * <p>The synchronization is necessary because the commit order must be the 
same as the order of
-   * the events. Concurrent commit may cause the commit order to be 
inconsistent with the order of
-   * the events.
+   * Add failure event to retry queue.
    *
-   * @param requestCommitId commit id of the request
-   * @param enrichedEvent event to commit
+   * @param event event to retry
    */
-  public synchronized void commit(long requestCommitId, @Nullable 
EnrichedEvent enrichedEvent) {
-    commitQueue.offer(
-        new Pair<>(
-            requestCommitId,
-            () ->
-                Optional.ofNullable(enrichedEvent)
-                    .ifPresent(
-                        event ->
-                            event.decreaseReferenceCount(
-                                IoTDBThriftAsyncConnector.class.getName(), 
true))));
-
-    while (!commitQueue.isEmpty()) {
-      final Pair<Long, Runnable> committer = commitQueue.peek();
-
-      // If the commit id is less than or equals to the last commit id, it 
means that
-      // the event has been committed before, and has been retried. So the 
event can
-      // be ignored.
-      if (committer.left <= lastCommitId.get()) {
-        commitQueue.poll();
-        continue;
-      }
-
-      if (committer.left != lastCommitId.get() + 1) {
-        break;
-      }
-
-      committer.right.run();
-      lastCommitId.incrementAndGet();
-
-      commitQueue.poll();
-    }
+  public void addFailureEventToRetryQueue(Event event) {
+    retryEventQueue.offer(event);
   }
 
   /**
-   * Add failure event to retry queue.
-   *
-   * @param requestCommitId commit id of the request
-   * @param event event to retry
+   * When a pipe is dropped, the connector maybe reused and will not be 
closed. So we just discard
+   * its queued events in the output pipe connector.
    */
-  public void addFailureEventToRetryQueue(long requestCommitId, Event event) {
-    retryEventQueue.offer(new Pair<>(requestCommitId, event));
+  public synchronized void discardEventsOfPipe(String pipeNameToDrop) {
+    retryEventQueue.removeIf(
+        event ->
+            event instanceof EnrichedEvent
+                && pipeNameToDrop.equals(((EnrichedEvent) 
event).getPipeName()));
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index a3a316983cf..cb7b8066203 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -73,10 +73,11 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
     }
 
     if (response.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      for (int i = 0; i < events.size(); ++i) {
-        connector.commit(
-            requestCommitIds.get(i),
-            events.get(i) instanceof EnrichedEvent ? (EnrichedEvent) 
events.get(i) : null);
+      for (final Event event : events) {
+        if (event instanceof EnrichedEvent) {
+          ((EnrichedEvent) event)
+              
.decreaseReferenceCount(PipeTransferTabletBatchEventHandler.class.getName(), 
true);
+        }
       }
     } else {
       onError(new PipeException(response.getStatus().getMessage()));
@@ -91,8 +92,8 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
         requestCommitIds,
         exception);
 
-    for (int i = 0; i < events.size(); ++i) {
-      connector.addFailureEventToRetryQueue(requestCommitIds.get(i), 
events.get(i));
+    for (final Event event : events) {
+      connector.addFailureEventToRetryQueue(event);
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index dbdda7df226..c0dcf8670f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -31,11 +31,10 @@ public class PipeTransferTabletInsertNodeEventHandler
     extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
 
   public PipeTransferTabletInsertNodeEventHandler(
-      long requestCommitId,
       PipeInsertNodeTabletInsertionEvent event,
       TPipeTransferReq req,
       IoTDBThriftAsyncConnector connector) {
-    super(requestCommitId, event, req, connector);
+    super(event, req, connector);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 619a0f961b8..d1b0ae18c50 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -22,7 +22,7 @@ package 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
-import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -33,39 +33,27 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Optional;
-
 public abstract class PipeTransferTabletInsertionEventHandler<E extends 
TPipeTransferResp>
     implements AsyncMethodCallback<E> {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class);
 
-  private final long requestCommitId;
-  private final Event event;
+  private final TabletInsertionEvent event;
   private final TPipeTransferReq req;
 
   private final IoTDBThriftAsyncConnector connector;
 
   protected PipeTransferTabletInsertionEventHandler(
-      long requestCommitId,
-      Event event,
-      TPipeTransferReq req,
-      IoTDBThriftAsyncConnector connector) {
-    this.requestCommitId = requestCommitId;
+      TabletInsertionEvent event, TPipeTransferReq req, 
IoTDBThriftAsyncConnector connector) {
     this.event = event;
     this.req = req;
     this.connector = connector;
 
-    Optional.ofNullable(event)
-        .ifPresent(
-            e -> {
-              if (e instanceof EnrichedEvent) {
-                ((EnrichedEvent) e)
-                    .increaseReferenceCount(
-                        
PipeTransferTabletInsertionEventHandler.class.getName());
-              }
-            });
+    if (this.event instanceof EnrichedEvent) {
+      ((EnrichedEvent) this.event)
+          
.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName());
+    }
   }
 
   public void transfer(AsyncPipeDataTransferServiceClient client) throws 
TException {
@@ -84,8 +72,10 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
     }
 
     if (response.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      connector.commit(
-          requestCommitId, event instanceof EnrichedEvent ? (EnrichedEvent) 
event : null);
+      if (event instanceof EnrichedEvent) {
+        ((EnrichedEvent) event)
+            
.decreaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName(),
 true);
+      }
     } else {
       onError(new PipeException(response.getStatus().getMessage()));
     }
@@ -94,11 +84,12 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
   @Override
   public void onError(Exception exception) {
     LOGGER.warn(
-        "Failed to transfer TabletInsertionEvent {} (request commit id={}).",
+        "Failed to transfer TabletInsertionEvent {} (committer key={}, commit 
id={}).",
         event,
-        requestCommitId,
+        event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getCommitterKey() : null,
+        event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId() 
: null,
         exception);
 
-    connector.addFailureEventToRetryQueue(requestCommitId, event);
+    connector.addFailureEventToRetryQueue(event);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
index db0c5f235a1..4cf8a768048 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
@@ -31,11 +31,10 @@ public class PipeTransferTabletRawEventHandler
     extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
 
   public PipeTransferTabletRawEventHandler(
-      long requestCommitId,
       PipeRawTabletInsertionEvent event,
       TPipeTransferReq req,
       IoTDBThriftAsyncConnector connector) {
-    super(requestCommitId, event, req, connector);
+    super(event, req, connector);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index f3b130eae64..70b11ba2ce5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -48,7 +48,6 @@ public class PipeTransferTsFileInsertionEventHandler
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeTransferTsFileInsertionEventHandler.class);
 
-  private final long requestCommitId;
   private final PipeTsFileInsertionEvent event;
   private final IoTDBThriftAsyncConnector connector;
 
@@ -64,9 +63,8 @@ public class PipeTransferTsFileInsertionEventHandler
   private AsyncPipeDataTransferServiceClient client;
 
   public PipeTransferTsFileInsertionEventHandler(
-      long requestCommitId, PipeTsFileInsertionEvent event, 
IoTDBThriftAsyncConnector connector)
+      PipeTsFileInsertionEvent event, IoTDBThriftAsyncConnector connector)
       throws FileNotFoundException {
-    this.requestCommitId = requestCommitId;
     this.event = event;
     this.connector = connector;
 
@@ -79,7 +77,7 @@ public class PipeTransferTsFileInsertionEventHandler
 
     isSealSignalSent = new AtomicBoolean(false);
 
-    
event.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName());
+    
event.increaseReferenceCount(PipeTransferTsFileInsertionEventHandler.class.getName());
   }
 
   public void transfer(AsyncPipeDataTransferServiceClient client) throws 
TException, IOException {
@@ -124,10 +122,13 @@ public class PipeTransferTsFileInsertionEventHandler
       } catch (IOException e) {
         LOGGER.warn("Failed to close file reader when successfully transferred 
file.", e);
       } finally {
-        connector.commit(requestCommitId, event);
+        
event.decreaseReferenceCount(PipeTransferTsFileInsertionEventHandler.class.getName(),
 true);
 
         LOGGER.info(
-            "Successfully transferred file {}. Request commit id is {}.", 
tsFile, requestCommitId);
+            "Successfully transferred file {} (committer key={}, commit 
id={}).",
+            tsFile,
+            event.getCommitterKey(),
+            event.getCommitId());
 
         if (client != null) {
           client.setShouldReturnSelf(true);
@@ -164,9 +165,10 @@ public class PipeTransferTsFileInsertionEventHandler
   @Override
   public void onError(Exception exception) {
     LOGGER.warn(
-        "Failed to transfer TsFileInsertionEvent {} (request commit id {}).",
+        "Failed to transfer TsFileInsertionEvent {} (committer key {}, commit 
id {}).",
         tsFile,
-        requestCommitId,
+        event.getCommitterKey(),
+        event.getCommitId(),
         exception);
 
     try {
@@ -176,7 +178,7 @@ public class PipeTransferTsFileInsertionEventHandler
     } catch (IOException e) {
       LOGGER.warn("Failed to close file reader when failed to transfer file.", 
e);
     } finally {
-      connector.addFailureEventToRetryQueue(requestCommitId, event);
+      connector.addFailureEventToRetryQueue(event);
 
       if (client != null) {
         client.setShouldReturnSelf(true);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 4fc3c9a41e5..75bd571f0b6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -39,15 +39,13 @@ import javax.annotation.Nullable;
 
 import java.net.InetSocketAddress;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Map;
 import java.util.Optional;
-import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 public class WebSocketConnector implements PipeConnector {
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WebSocketConnector.class);
 
   private static final Map<Integer, Pair<AtomicInteger, 
WebSocketConnectorServer>>
@@ -56,11 +54,6 @@ public class WebSocketConnector implements PipeConnector {
   private Integer port;
   private WebSocketConnectorServer server;
 
-  public final AtomicLong commitIdGenerator = new AtomicLong(0);
-  private final AtomicLong lastCommitId = new AtomicLong(0);
-  private final PriorityQueue<Pair<Long, Runnable>> commitQueue =
-      new PriorityQueue<>(Comparator.comparing(o -> o.left));
-
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     // Do nothing
@@ -108,10 +101,11 @@ public class WebSocketConnector implements PipeConnector {
           tabletInsertionEvent);
       return;
     }
-    long commitId = commitIdGenerator.incrementAndGet();
+
     ((EnrichedEvent) tabletInsertionEvent)
         .increaseReferenceCount(WebSocketConnector.class.getName());
-    server.addEvent(new Pair<>(commitId, tabletInsertionEvent));
+
+    server.addEvent(tabletInsertionEvent);
   }
 
   @Override
@@ -122,11 +116,12 @@ public class WebSocketConnector implements PipeConnector {
           tsFileInsertionEvent);
       return;
     }
+
     try {
       for (TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
-        long commitId = commitIdGenerator.incrementAndGet();
         ((EnrichedEvent) 
event).increaseReferenceCount(WebSocketConnector.class.getName());
-        server.addEvent(new Pair<>(commitId, event));
+
+        server.addEvent(event);
       }
     } finally {
       tsFileInsertionEvent.close();
@@ -159,36 +154,8 @@ public class WebSocketConnector implements PipeConnector {
     }
   }
 
-  public synchronized void commit(long requestCommitId, @Nullable 
EnrichedEvent enrichedEvent) {
-    commitQueue.offer(
-        new Pair<>(
-            requestCommitId,
-            () ->
-                Optional.ofNullable(enrichedEvent)
-                    .ifPresent(
-                        event ->
-                            event.decreaseReferenceCount(
-                                WebSocketConnector.class.getName(), true))));
-
-    while (!commitQueue.isEmpty()) {
-      final Pair<Long, Runnable> committer = commitQueue.peek();
-
-      // If the commit id is less than or equals to the last commit id, it 
means that
-      // the event has been committed before, and has been retried. So the 
event can
-      // be ignored.
-      if (committer.left <= lastCommitId.get()) {
-        commitQueue.poll();
-        continue;
-      }
-
-      if (committer.left != lastCommitId.get() + 1) {
-        break;
-      }
-
-      committer.right.run();
-      lastCommitId.incrementAndGet();
-
-      commitQueue.poll();
-    }
+  public synchronized void commit(@Nullable EnrichedEvent enrichedEvent) {
+    Optional.ofNullable(enrichedEvent)
+        .ifPresent(event -> 
event.decreaseReferenceCount(WebSocketConnector.class.getName(), true));
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
index cb4aaef0f97..3ba0f7feb89 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
@@ -40,14 +40,18 @@ import java.util.Comparator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class WebSocketConnectorServer extends WebSocketServer {
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WebSocketConnectorServer.class);
-  private final PriorityBlockingQueue<Pair<Long, Event>> events =
+
+  private final AtomicLong eventIdGenerator = new AtomicLong(0);
+  private final PriorityBlockingQueue<Pair<Long, Event>> 
eventsWaitingForTransfer =
       new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
-  private final WebSocketConnector websocketConnector;
+  private final ConcurrentMap<Long, Event> eventsWaitingForAck = new 
ConcurrentHashMap<>();
 
-  private final ConcurrentMap<Long, Event> eventMap = new 
ConcurrentHashMap<>();
+  private final WebSocketConnector websocketConnector;
 
   public WebSocketConnectorServer(
       InetSocketAddress address, WebSocketConnector websocketConnector) {
@@ -120,12 +124,12 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
     LOGGER.info(log);
   }
 
-  public void addEvent(Pair<Long, Event> event) {
-    if (events.size() >= 5) {
-      synchronized (events) {
-        while (events.size() >= 5) {
+  public void addEvent(Event event) {
+    if (eventsWaitingForTransfer.size() >= 5) {
+      synchronized (eventsWaitingForTransfer) {
+        while (eventsWaitingForTransfer.size() >= 5) {
           try {
-            events.wait();
+            eventsWaitingForTransfer.wait();
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PipeException(e.getMessage());
@@ -133,22 +137,22 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
         }
       }
     }
-    events.put(event);
+
+    eventsWaitingForTransfer.put(new 
Pair<>(eventIdGenerator.incrementAndGet(), event));
   }
 
   private void handleStart(WebSocket webSocket) {
     try {
       while (true) {
-        Pair<Long, Event> eventPair = events.take();
-        synchronized (events) {
-          events.notifyAll();
+        Pair<Long, Event> eventPair = eventsWaitingForTransfer.take();
+        synchronized (eventsWaitingForTransfer) {
+          eventsWaitingForTransfer.notifyAll();
         }
         boolean transferred = transfer(eventPair, webSocket);
         if (transferred) {
           break;
         } else {
           websocketConnector.commit(
-              eventPair.getLeft(),
               eventPair.getRight() instanceof EnrichedEvent
                   ? (EnrichedEvent) eventPair.getRight()
                   : null);
@@ -162,30 +166,6 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
     }
   }
 
-  private void handleAck(WebSocket webSocket, String s) {
-    long commitId = Long.parseLong(s.replace("ACK:", ""));
-    Event event = eventMap.remove(commitId);
-    if (event != null) {
-      websocketConnector.commit(
-          commitId, event instanceof EnrichedEvent ? (EnrichedEvent) event : 
null);
-    }
-    handleStart(webSocket);
-  }
-
-  private void handleError(WebSocket webSocket, String s) {
-    long commitId = Long.parseLong(s.replace("ERROR:", ""));
-    String log =
-        String.format(
-            "The tablet of commitId: %d can't be parsed by client, it will be 
retried later.",
-            commitId);
-    LOGGER.warn(log);
-    Event event = eventMap.remove(commitId);
-    if (event != null) {
-      events.put(new Pair<>(commitId, event));
-    }
-    handleStart(webSocket);
-  }
-
   private boolean transfer(Pair<Long, Event> eventPair, WebSocket webSocket) {
     Long commitId = eventPair.getLeft();
     Event event = eventPair.getRight();
@@ -203,16 +183,41 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
       if (tabletBuffer == null) {
         return false;
       }
+
       ByteBuffer payload = ByteBuffer.allocate(Long.BYTES + 
tabletBuffer.limit());
       payload.putLong(commitId);
       payload.put(tabletBuffer);
       payload.flip();
+
       this.broadcast(payload, Collections.singletonList(webSocket));
-      eventMap.put(eventPair.getLeft(), eventPair.getRight());
+      eventsWaitingForAck.put(eventPair.getLeft(), eventPair.getRight());
     } catch (Exception e) {
-      events.put(eventPair);
+      eventsWaitingForTransfer.put(eventPair);
       throw new PipeException(e.getMessage());
     }
     return true;
   }
+
+  private void handleAck(WebSocket webSocket, String s) {
+    long commitId = Long.parseLong(s.replace("ACK:", ""));
+    Event event = eventsWaitingForAck.remove(commitId);
+    if (event != null) {
+      websocketConnector.commit(event instanceof EnrichedEvent ? 
(EnrichedEvent) event : null);
+    }
+    handleStart(webSocket);
+  }
+
+  private void handleError(WebSocket webSocket, String s) {
+    long commitId = Long.parseLong(s.replace("ERROR:", ""));
+    String log =
+        String.format(
+            "The tablet of commitId: %d can't be parsed by client, it will be 
retried later.",
+            commitId);
+    LOGGER.warn(log);
+    Event event = eventsWaitingForAck.remove(commitId);
+    if (event != null) {
+      eventsWaitingForTransfer.put(new Pair<>(commitId, event));
+    }
+    handleStart(webSocket);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 801b92cc51f..b01cd181ccc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.commit.PipeEventCommitManager;
 import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -42,15 +43,23 @@ public abstract class EnrichedEvent implements Event {
 
   private final AtomicInteger referenceCount;
 
+  protected final String pipeName;
   protected final PipeTaskMeta pipeTaskMeta;
 
+  private String committerKey;
+  public static final long NO_COMMIT_ID = -1;
+  private long commitId = NO_COMMIT_ID;
+
   private final String pattern;
 
   protected boolean isPatternParsed;
   protected boolean isTimeParsed = true;
 
-  protected EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) {
+  private boolean shouldReportOnCommit = false;
+
+  protected EnrichedEvent(String pipeName, PipeTaskMeta pipeTaskMeta, String 
pattern) {
     referenceCount = new AtomicInteger(0);
+    this.pipeName = pipeName;
     this.pipeTaskMeta = pipeTaskMeta;
     this.pattern = pattern;
     isPatternParsed = 
getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
@@ -97,8 +106,9 @@ public abstract class EnrichedEvent implements Event {
       if (referenceCount.get() == 1) {
         isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
         if (shouldReport) {
-          reportProgress();
+          shouldReportOnCommit = true;
         }
+        PipeEventCommitManager.getInstance().commit(this, committerKey);
       }
       final int newReferenceCount = referenceCount.decrementAndGet();
       if (newReferenceCount < 0) {
@@ -154,6 +164,10 @@ public abstract class EnrichedEvent implements Event {
     return referenceCount.get();
   }
 
+  public final String getPipeName() {
+    return pipeName;
+  }
+
   /**
    * Get the pattern of this event.
    *
@@ -176,7 +190,7 @@ public abstract class EnrichedEvent implements Event {
   }
 
   public abstract EnrichedEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      PipeTaskMeta pipeTaskMeta, String pattern);
+      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern);
 
   public void reportException(PipeRuntimeException pipeRuntimeException) {
     if (pipeTaskMeta != null) {
@@ -187,4 +201,23 @@ public abstract class EnrichedEvent implements Event {
   }
 
   public abstract boolean isGeneratedByPipe();
+
+  public void setCommitterKeyAndCommitId(String committerKey, long commitId) {
+    this.committerKey = committerKey;
+    this.commitId = commitId;
+  }
+
+  public String getCommitterKey() {
+    return committerKey;
+  }
+
+  public long getCommitId() {
+    return commitId;
+  }
+
+  public void onCommitted() {
+    if (shouldReportOnCommit) {
+      reportProgress();
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index 2c54d93401a..e94e43e61fc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -68,17 +68,18 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   private final boolean shouldPrintMessage;
 
   public PipeHeartbeatEvent(String dataRegionId, boolean shouldPrintMessage) {
-    super(null, null);
+    super(null, null, null);
     this.dataRegionId = dataRegionId;
     this.shouldPrintMessage = shouldPrintMessage;
   }
 
   public PipeHeartbeatEvent(
+      String pipeName,
       PipeTaskMeta pipeTaskMeta,
       String dataRegionId,
       long timePublished,
       boolean shouldPrintMessage) {
-    super(pipeTaskMeta, null);
+    super(pipeName, pipeTaskMeta, null);
     this.dataRegionId = dataRegionId;
     this.timePublished = timePublished;
     this.shouldPrintMessage = shouldPrintMessage;
@@ -106,9 +107,10 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      PipeTaskMeta pipeTaskMeta, String pattern) {
+      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern) {
     // Should record PipeTaskMeta, for sometimes HeartbeatEvents should report 
exceptions.
-    return new PipeHeartbeatEvent(pipeTaskMeta, dataRegionId, timePublished, 
shouldPrintMessage);
+    return new PipeHeartbeatEvent(
+        pipeName, pipeTaskMeta, dataRegionId, timePublished, 
shouldPrintMessage);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index 76c4016936f..aea3964399b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -95,7 +95,13 @@ public class PipeRowCollector implements RowCollector {
   private void collectTabletInsertionEvent() {
     if (tablet != null) {
       tabletInsertionEventList.add(
-          new PipeRawTabletInsertionEvent(tablet, isAligned, pipeTaskMeta, 
sourceEvent, false));
+          new PipeRawTabletInsertionEvent(
+              tablet,
+              isAligned,
+              sourceEvent == null ? null : sourceEvent.getPipeName(),
+              pipeTaskMeta,
+              sourceEvent,
+              false));
     }
     this.tablet = null;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 9bb99a21bb2..cdc689e6ac0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -57,7 +57,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       ProgressIndex progressIndex,
       boolean isAligned,
       boolean isGeneratedByPipe) {
-    this(walEntryHandler, progressIndex, isAligned, isGeneratedByPipe, null, 
null);
+    this(walEntryHandler, progressIndex, isAligned, isGeneratedByPipe, null, 
null, null);
   }
 
   private PipeInsertNodeTabletInsertionEvent(
@@ -65,9 +65,10 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       ProgressIndex progressIndex,
       boolean isAligned,
       boolean isGeneratedByPipe,
+      String pipeName,
       PipeTaskMeta pipeTaskMeta,
       String pattern) {
-    super(pipeTaskMeta, pattern);
+    super(pipeName, pipeTaskMeta, pattern);
     this.walEntryHandler = walEntryHandler;
     this.progressIndex = progressIndex;
     this.isAligned = isAligned;
@@ -129,9 +130,15 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   @Override
   public PipeInsertNodeTabletInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      PipeTaskMeta pipeTaskMeta, String pattern) {
+      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern) {
     return new PipeInsertNodeTabletInsertionEvent(
-        walEntryHandler, progressIndex, isAligned, isGeneratedByPipe, 
pipeTaskMeta, pattern);
+        walEntryHandler,
+        progressIndex,
+        isAligned,
+        isGeneratedByPipe,
+        pipeName,
+        pipeTaskMeta,
+        pattern);
   }
 
   @Override
@@ -188,7 +195,8 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   /////////////////////////// parsePattern ///////////////////////////
 
   public TabletInsertionEvent parseEventWithPattern() {
-    return new PipeRawTabletInsertionEvent(convertToTablet(), isAligned, 
pipeTaskMeta, this, true);
+    return new PipeRawTabletInsertionEvent(
+        convertToTablet(), isAligned, pipeName, pipeTaskMeta, this, true);
   }
 
   /////////////////////////// Object ///////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 2ad67057995..91269fd1541 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -51,9 +51,10 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
       boolean isAligned,
       EnrichedEvent sourceEvent,
       boolean needToReport,
+      String pipeName,
       PipeTaskMeta pipeTaskMeta,
       String pattern) {
-    super(pipeTaskMeta, pattern);
+    super(pipeName, pipeTaskMeta, pattern);
     this.tablet = Objects.requireNonNull(tablet);
     this.isAligned = isAligned;
     this.sourceEvent = sourceEvent;
@@ -63,20 +64,21 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
   public PipeRawTabletInsertionEvent(
       Tablet tablet,
       boolean isAligned,
+      String pipeName,
       PipeTaskMeta pipeTaskMeta,
       EnrichedEvent sourceEvent,
       boolean needToReport) {
-    this(tablet, isAligned, sourceEvent, needToReport, pipeTaskMeta, null);
+    this(tablet, isAligned, sourceEvent, needToReport, pipeName, pipeTaskMeta, 
null);
   }
 
   @TestOnly
   public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned) {
-    this(tablet, isAligned, null, false, null, null);
+    this(tablet, isAligned, null, false, null, null, null);
   }
 
   @TestOnly
   public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned, String 
pattern) {
-    this(tablet, isAligned, null, false, null, pattern);
+    this(tablet, isAligned, null, false, null, null, pattern);
   }
 
   @Override
@@ -105,9 +107,9 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      PipeTaskMeta pipeTaskMeta, String pattern) {
+      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern) {
     return new PipeRawTabletInsertionEvent(
-        tablet, isAligned, sourceEvent, needToReport, pipeTaskMeta, pattern);
+        tablet, isAligned, sourceEvent, needToReport, pipeName, pipeTaskMeta, 
pattern);
   }
 
   @Override
@@ -162,6 +164,6 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
 
   public TabletInsertionEvent parseEventWithPattern() {
     return new PipeRawTabletInsertionEvent(
-        convertToTablet(), isAligned, pipeTaskMeta, this, needToReport);
+        convertToTablet(), isAligned, pipeName, pipeTaskMeta, this, 
needToReport);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 3e7506adef3..09d8cdea112 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.event.common.tablet;
 
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
 import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector;
@@ -64,10 +65,6 @@ public class TabletInsertionDataContainer {
 
   private Tablet tablet;
 
-  public TabletInsertionDataContainer(InsertNode insertNode, String pattern) {
-    this(null, null, insertNode, pattern);
-  }
-
   public TabletInsertionDataContainer(
       PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent, InsertNode 
insertNode, String pattern) {
     this.pipeTaskMeta = pipeTaskMeta;
@@ -95,6 +92,11 @@ public class TabletInsertionDataContainer {
     parse(tablet, isAligned, pattern);
   }
 
+  @TestOnly
+  public TabletInsertionDataContainer(InsertNode insertNode, String pattern) {
+    this(null, null, insertNode, pattern);
+  }
+
   //////////////////////////// parse ////////////////////////////
 
   private void parse(InsertRowNode insertRowNode, String pattern) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 0acefd60f54..7f49919eb32 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -58,19 +58,29 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
 
   public PipeTsFileInsertionEvent(
       TsFileResource resource, boolean isLoaded, boolean isGeneratedByPipe) {
-    this(resource, isLoaded, isGeneratedByPipe, null, null, Long.MIN_VALUE, 
Long.MAX_VALUE, false);
+    this(
+        resource,
+        isLoaded,
+        isGeneratedByPipe,
+        null,
+        null,
+        null,
+        Long.MIN_VALUE,
+        Long.MAX_VALUE,
+        false);
   }
 
   public PipeTsFileInsertionEvent(
       TsFileResource resource,
       boolean isLoaded,
       boolean isGeneratedByPipe,
+      String pipeName,
       PipeTaskMeta pipeTaskMeta,
       String pattern,
       long startTime,
       long endTime,
       boolean needParseTime) {
-    super(pipeTaskMeta, pattern);
+    super(pipeName, pipeTaskMeta, pattern);
 
     this.startTime = startTime;
     this.endTime = endTime;
@@ -176,11 +186,12 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
 
   @Override
   public PipeTsFileInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      PipeTaskMeta pipeTaskMeta, String pattern) {
+      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern) {
     return new PipeTsFileInsertionEvent(
         resource,
         isLoaded,
         isGeneratedByPipe,
+        pipeName,
         pipeTaskMeta,
         pattern,
         startTime,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 664f0f6804e..10b746b3f36 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -210,12 +210,22 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
             if (!hasNext()) {
               next =
                   new PipeRawTabletInsertionEvent(
-                      tablet, isAligned, pipeTaskMeta, sourceEvent, true);
+                      tablet,
+                      isAligned,
+                      sourceEvent != null ? sourceEvent.getPipeName() : null,
+                      pipeTaskMeta,
+                      sourceEvent,
+                      true);
               close();
             } else {
               next =
                   new PipeRawTabletInsertionEvent(
-                      tablet, isAligned, pipeTaskMeta, sourceEvent, false);
+                      tablet,
+                      isAligned,
+                      sourceEvent != null ? sourceEvent.getPipeName() : null,
+                      pipeTaskMeta,
+                      sourceEvent,
+                      false);
             }
             return next;
           }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 600df727f2c..4375b598b6a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -45,7 +45,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
     // pipeTaskMeta is used to report the progress of the event, the 
PipeRealtimeEvent
     // is only used in the realtime event extractor, which does not need to 
report the progress
     // of the event, so the pipeTaskMeta is always null.
-    super(null, pattern);
+    super(event != null ? event.getPipeName() : null, null, pattern);
 
     this.event = event;
     this.tsFileEpoch = tsFileEpoch;
@@ -61,7 +61,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
     // pipeTaskMeta is used to report the progress of the event, the 
PipeRealtimeEvent
     // is only used in the realtime event extractor, which does not need to 
report the progress
     // of the event, so the pipeTaskMeta is always null.
-    super(pipeTaskMeta, pattern);
+    super(event != null ? event.getPipeName() : null, pipeTaskMeta, pattern);
 
     this.event = event;
     this.tsFileEpoch = tsFileEpoch;
@@ -136,9 +136,9 @@ public class PipeRealtimeEvent extends EnrichedEvent {
 
   @Override
   public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      PipeTaskMeta pipeTaskMeta, String pattern) {
+      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern) {
     return new PipeRealtimeEvent(
-        
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeTaskMeta, 
pattern),
+        event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeName, 
pipeTaskMeta, pattern),
         this.tsFileEpoch,
         this.device2Measurements,
         pipeTaskMeta,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
index d1befd3e08a..0dadae365f9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutor.java
@@ -112,6 +112,7 @@ public abstract class PipeSubtaskExecutor {
     if (subtask != null) {
       try {
         subtask.close();
+        LOGGER.info("The subtask {} is closed successfully.", subTaskID);
       } catch (Exception e) {
         LOGGER.error("Failed to close the subtask {}.", subTaskID, e);
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 9c9fb01ff25..522622bca6c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -72,6 +72,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   private static final Map<Integer, Long> 
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap<>();
   private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000;
 
+  private String pipeName;
   private PipeTaskMeta pipeTaskMeta;
   private ProgressIndex startIndex;
 
@@ -100,6 +101,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     final PipeTaskExtractorRuntimeEnvironment environment =
         (PipeTaskExtractorRuntimeEnvironment) 
configuration.getRuntimeEnvironment();
 
+    pipeName = environment.getPipeName();
     pipeTaskMeta = environment.getPipeTaskMeta();
     startIndex = environment.getPipeTaskMeta().getProgressIndex();
 
@@ -351,6 +353,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
             resource,
             false,
             false,
+            pipeName,
             pipeTaskMeta,
             pattern,
             historicalDataExtractionStartTime,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
index 0042c9d3c91..279f20f2a0c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
@@ -69,7 +69,7 @@ public class PipeDataRegionAssigner {
 
               final PipeRealtimeEvent copiedEvent =
                   event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-                      extractor.getPipeTaskMeta(), extractor.getPattern());
+                      extractor.getPipeName(), extractor.getPipeTaskMeta(), 
extractor.getPattern());
 
               
copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
               extractor.extract(copiedEvent);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
index bc5b79e4c79..8bb22523e51 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -24,10 +24,12 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.connector.payload.legacy.PipeData;
 import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData;
 import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
@@ -48,6 +50,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.time.ZoneId;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -136,7 +139,7 @@ public class IoTDBLegacyPipeReceiverAgent {
               .execute(
                   statement,
                   queryId,
-                  null,
+                  new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault().getId()),
                   "",
                   partitionFetcher,
                   schemaFetcher,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
index f7c2b188277..3f779641530 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
@@ -20,9 +20,11 @@
 package org.apache.iotdb.db.pipe.receiver.legacy.loader;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -35,6 +37,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.ZoneId;
 import java.util.Collections;
 
 /** This loader is used to load deletion plan. */
@@ -61,7 +64,7 @@ public class DeletionLoader implements ILoader {
               .execute(
                   statement,
                   queryId,
-                  null,
+                  new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault().getId()),
                   "",
                   PARTITION_FETCHER,
                   SCHEMA_FETCHER,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
index 19e2cf5d080..6288d74371c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.db.pipe.receiver.legacy.loader;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
@@ -34,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.time.ZoneId;
 
 /** This loader is used to load tsFiles. If .mods file exists, it will be 
loaded as well. */
 public class TsFileLoader implements ILoader {
@@ -63,7 +66,7 @@ public class TsFileLoader implements ILoader {
               .execute(
                   statement,
                   queryId,
-                  null,
+                  new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault().getId()),
                   "",
                   PARTITION_FETCHER,
                   SCHEMA_FETCHER,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 57ed8834a72..3621158e1b0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -55,7 +55,8 @@ public class PipeTaskBuilder {
         new PipeTaskConnectorStage(
             pipeStaticMeta.getPipeName(),
             pipeStaticMeta.getCreationTime(),
-            pipeStaticMeta.getConnectorParameters());
+            pipeStaticMeta.getConnectorParameters(),
+            dataRegionId);
 
     // The processor connects the extractor and connector.
     final PipeTaskProcessorStage processorStage =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index cfb0aabeb9d..b8a5042707f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.task.connection;
 
+import org.apache.iotdb.db.pipe.commit.PipeEventCommitManager;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -33,10 +34,13 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
 
   private final EnrichedDeque<Event> bufferQueue;
 
+  private final int dataRegionId;
+
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
-  public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue) {
+  public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue, 
int dataRegionId) {
     this.pendingQueue = pendingQueue;
+    this.dataRegionId = dataRegionId;
     bufferQueue = new EnrichedDeque<>(new LinkedList<>());
   }
 
@@ -44,6 +48,10 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
   public synchronized void collect(Event event) {
     if (event instanceof EnrichedEvent) {
       ((EnrichedEvent) 
event).increaseReferenceCount(PipeEventCollector.class.getName());
+
+      // Assign a commit id for this event in order to report progress in 
order.
+      PipeEventCommitManager.getInstance()
+          .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, 
dataRegionId);
     }
     if (event instanceof PipeHeartbeatEvent) {
       ((PipeHeartbeatEvent) event).recordBufferQueueSize(bufferQueue);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 131b2e7c32e..a78dcfe9ed8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -19,7 +19,8 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
-import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
 import 
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager;
@@ -29,12 +30,19 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 
 public class PipeTaskConnectorStage extends PipeTaskStage {
 
+  private final String pipeName;
+  private final int dataRegionId;
   protected final PipeParameters pipeConnectorParameters;
 
   protected String connectorSubtaskId;
 
   public PipeTaskConnectorStage(
-      String pipeName, long creationTime, PipeParameters 
pipeConnectorParameters) {
+      String pipeName,
+      long creationTime,
+      PipeParameters pipeConnectorParameters,
+      TConsensusGroupId dataRegionId) {
+    this.pipeName = pipeName;
+    this.dataRegionId = dataRegionId.getId();
     this.pipeConnectorParameters = pipeConnectorParameters;
 
     connectorSubtaskId =
@@ -42,7 +50,8 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
             .register(
                 
PipeSubtaskExecutorManager.getInstance().getConnectorSubtaskExecutor(),
                 pipeConnectorParameters,
-                new PipeTaskRuntimeEnvironment(pipeName, creationTime));
+                new PipeTaskConnectorRuntimeEnvironment(
+                    this.pipeName, creationTime, this.dataRegionId));
   }
 
   @Override
@@ -62,7 +71,7 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
 
   @Override
   public void dropSubtask() throws PipeException {
-    PipeConnectorSubtaskManager.instance().deregister(connectorSubtaskId);
+    PipeConnectorSubtaskManager.instance().deregister(pipeName, dataRegionId, 
connectorSubtaskId);
   }
 
   public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index d050594a9ff..4bcf0e44c76 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -83,7 +83,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
     // old one, so we need creationTime to make their hash code different in 
the map.
     final String taskId = pipeName + "_" + dataRegionId.getId() + "_" + 
creationTime;
     final PipeEventCollector pipeConnectorOutputEventCollector =
-        new PipeEventCollector(pipeConnectorOutputPendingQueue);
+        new PipeEventCollector(pipeConnectorOutputPendingQueue, 
dataRegionId.getId());
     this.pipeProcessorSubtask =
         new PipeProcessorSubtask(
             taskId,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
index 40d257480be..e16c0448274 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -130,7 +130,11 @@ public abstract class PipeTaskStage {
       return;
     }
 
-    // status == PipeStatus.RUNNING or PipeStatus.STOPPED, drop the connector
+    // MUST stop the subtask before dropping it, otherwise
+    // the subtask might be in an inconsistent state!
+    stop();
+
+    // status == PipeStatus.STOPPED, drop the connector
     dropSubtask();
 
     status = PipeStatus.DROPPED;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index c9b8c20a3db..b8acefbada3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask.connector;
 
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
@@ -332,6 +333,16 @@ public class PipeConnectorSubtask extends PipeSubtask {
     }
   }
 
+  /**
+   * When a pipe is dropped, the connector maybe reused and will not be 
closed. So we just discard
+   * its queued events in the output pipe connector.
+   */
+  public void discardEventsOfPipe(String pipeNameToDrop) {
+    if (outputPipeConnector instanceof IoTDBThriftAsyncConnector) {
+      ((IoTDBThriftAsyncConnector) 
outputPipeConnector).discardEventsOfPipe(pipeNameToDrop);
+    }
+  }
+
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
 
   public String getAttributeSortedString() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index 854811028a1..abee2971a11 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -23,8 +23,13 @@ import 
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.event.Event;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConnectorSubtaskLifeCycle.class);
+
   private final PipeConnectorSubtaskExecutor executor;
   private final PipeConnectorSubtask subtask;
   private final BoundedBlockingPendingQueue<Event> pendingQueue;
@@ -56,51 +61,89 @@ public class PipeConnectorSubtaskLifeCycle implements 
AutoCloseable {
     if (aliveTaskCount < 0) {
       throw new IllegalStateException("aliveTaskCount < 0");
     }
+
     if (aliveTaskCount == 0) {
       executor.register(subtask);
       runningTaskCount = 0;
     }
+
     aliveTaskCount++;
+    LOGGER.info(
+        "Register subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+        subtask,
+        runningTaskCount,
+        aliveTaskCount);
   }
 
   /**
    * Deregister the subtask. If the subtask is the last one, close the subtask.
    *
+   * <p>Note that this method should be called after the subtask is stopped. 
Otherwise, the
+   * runningTaskCount might be inconsistent with the aliveTaskCount because of 
parallel connector
+   * scheduling.
+   *
+   * @param pipeNameToDeregister pipe name
    * @return true if the subtask is out of life cycle, indicating that the 
subtask should never be
    *     used again
    * @throws IllegalStateException if aliveTaskCount <= 0
    */
-  public synchronized boolean deregister() {
+  public synchronized boolean deregister(String pipeNameToDeregister) {
     if (aliveTaskCount <= 0) {
       throw new IllegalStateException("aliveTaskCount <= 0");
     }
-    if (aliveTaskCount == 1) {
+
+    subtask.discardEventsOfPipe(pipeNameToDeregister);
+
+    try {
+      if (aliveTaskCount > 1) {
+        return false;
+      }
+
       close();
       // This subtask is out of life cycle, should never be used again
       return true;
+    } finally {
+      aliveTaskCount--;
+      LOGGER.info(
+          "Deregister subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+          subtask,
+          runningTaskCount,
+          aliveTaskCount);
     }
-    aliveTaskCount--;
-    return false;
   }
 
   public synchronized void start() {
     if (runningTaskCount < 0) {
       throw new IllegalStateException("runningTaskCount < 0");
     }
+
     if (runningTaskCount == 0) {
       executor.start(subtask.getTaskID());
     }
+
     runningTaskCount++;
+    LOGGER.info(
+        "Start subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+        subtask,
+        runningTaskCount,
+        aliveTaskCount);
   }
 
   public synchronized void stop() {
     if (runningTaskCount <= 0) {
       throw new IllegalStateException("runningTaskCount <= 0");
     }
+
     if (runningTaskCount == 1) {
       executor.stop(subtask.getTaskID());
     }
+
     runningTaskCount--;
+    LOGGER.info(
+        "Stop subtask {}. runningTaskCount: {}, aliveTaskCount: {}",
+        subtask,
+        runningTaskCount,
+        aliveTaskCount);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 6f1f488785c..e29d09bb103 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -20,13 +20,15 @@
 package org.apache.iotdb.db.pipe.task.subtask.connector;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.commit.PipeEventCommitManager;
 import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
 import 
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
 import 
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
-import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -50,7 +52,18 @@ public class PipeConnectorSubtaskManager {
   public synchronized String register(
       PipeConnectorSubtaskExecutor executor,
       PipeParameters pipeConnectorParameters,
-      PipeRuntimeEnvironment pipeRuntimeEnvironment) {
+      PipeTaskConnectorRuntimeEnvironment environment) {
+    final String connectorKey =
+        pipeConnectorParameters
+            .getStringOrDefault(
+                Arrays.asList(PipeConnectorConstant.CONNECTOR_KEY, 
PipeConnectorConstant.SINK_KEY),
+                BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
+            // Convert the value of `CONNECTOR_KEY` or `SINK_KEY` to lowercase
+            // for matching in `CONNECTOR_CONSTRUCTORS`
+            .toLowerCase();
+    PipeEventCommitManager.getInstance()
+        .register(environment.getPipeName(), environment.getRegionId(), 
connectorKey);
+
     final String attributeSortedString =
         new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
 
@@ -78,7 +91,7 @@ public class PipeConnectorSubtaskManager {
         try {
           pipeConnector.validate(new 
PipeParameterValidator(pipeConnectorParameters));
           pipeConnector.customize(
-              pipeConnectorParameters, new 
PipeTaskRuntimeConfiguration(pipeRuntimeEnvironment));
+              pipeConnectorParameters, new 
PipeTaskRuntimeConfiguration(environment));
           pipeConnector.handshake();
         } catch (Exception e) {
           throw new PipeException(
@@ -90,10 +103,8 @@ public class PipeConnectorSubtaskManager {
             new PipeConnectorSubtask(
                 String.format(
                     "%s_%s_%s",
-                    attributeSortedString,
-                    pipeRuntimeEnvironment.getCreationTime(),
-                    connectorIndex),
-                pipeRuntimeEnvironment.getCreationTime(),
+                    attributeSortedString, environment.getCreationTime(), 
connectorIndex),
+                environment.getCreationTime(),
                 attributeSortedString,
                 connectorIndex,
                 pendingQueue,
@@ -115,18 +126,21 @@ public class PipeConnectorSubtaskManager {
     return attributeSortedString;
   }
 
-  public synchronized void deregister(String attributeSortedString) {
+  public synchronized void deregister(
+      String pipeName, int dataRegionId, String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
 
     final List<PipeConnectorSubtaskLifeCycle> lifeCycles =
         attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
-    lifeCycles.removeIf(PipeConnectorSubtaskLifeCycle::deregister);
+    lifeCycles.removeIf(o -> o.deregister(pipeName));
 
     if (lifeCycles.isEmpty()) {
       attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
     }
+
+    PipeEventCommitManager.getInstance().deregister(pipeName, dataRegionId);
   }
 
   public synchronized void start(String attributeSortedString) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 642a66dc660..da2a7a00667 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -194,7 +194,7 @@ public class TsFileInsertionDataContainerTest {
     try (final TsFileInsertionDataContainer alignedContainer =
             new TsFileInsertionDataContainer(alignedTsFile, "root", startTime, 
endTime);
         final TsFileInsertionDataContainer nonalignedContainer =
-            new TsFileInsertionDataContainer(nonalignedTsFile, "root", 
startTime, endTime); ) {
+            new TsFileInsertionDataContainer(nonalignedTsFile, "root", 
startTime, endTime)) {
       AtomicInteger count1 = new AtomicInteger(0);
       AtomicInteger count2 = new AtomicInteger(0);
       AtomicInteger count3 = new AtomicInteger(0);

Reply via email to