This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 79ea1ef23d2 Pipe: Enable `'sink.format'='tsfile'` to use tsFile as tablet event batch (#12737)
79ea1ef23d2 is described below

commit 79ea1ef23d29ce6b5290a01060c3614ded3e1f13
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 24 23:32:43 2024 +0800

    Pipe: Enable `'sink.format'='tsfile'` to use tsFile as tablet event batch (#12737)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/it/autocreate/IoTDBPipeDataSinkIT.java    |  89 +++++
 .../protocol/IoTDBConfigRegionConnector.java       |  13 +-
 .../evolvable/batch/PipeTabletEventBatch.java      | 133 ++++++++
 .../PipeTabletEventPlainBatch.java}                | 117 ++-----
 .../batch/PipeTabletEventTsFileBatch.java          | 365 +++++++++++++++++++++
 .../PipeTransferBatchReqBuilder.java               |  96 ++++--
 .../request/PipeTransferTabletRawReq.java          |   4 +-
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  |  11 +-
 .../PipeConsensusTransferBatchReqBuilder.java      |   8 +-
 .../async/IoTDBDataRegionAsyncConnector.java       | 228 +++++++------
 .../PipeTransferTabletBatchEventHandler.java       |  31 +-
 ...Handler.java => PipeTransferTsFileHandler.java} | 134 ++++++--
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  | 126 +++++--
 .../thrift/sync/IoTDBSchemaRegionConnector.java    |  13 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  18 +-
 .../tsfile/TsFileInsertionDataContainer.java       |   9 +-
 .../db/pipe/resource/memory/PipeMemoryManager.java |  63 +---
 ...oryWeighUtil.java => PipeMemoryWeightUtil.java} |  63 +++-
 .../pipe/resource/tsfile/PipeTsFileResource.java   |   9 +-
 .../subtask/connector/PipeConnectorSubtask.java    |   2 +-
 .../SubscriptionPrefetchingTabletsQueue.java       |   6 +-
 .../config/constant/PipeConnectorConstant.java     |  12 +-
 .../pipe/connector/protocol/IoTDBConnector.java    |  40 ++-
 .../connector/protocol/IoTDBSslSyncConnector.java  |  16 +-
 .../iotdb/commons/pipe/event/EnrichedEvent.java    |   2 +-
 .../task/subtask/PipeAbstractConnectorSubtask.java |  13 +-
 26 files changed, 1204 insertions(+), 417 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index 27893c49381..d7149a401b6 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -99,6 +99,95 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
     }
   }
 
+  @Test
+  public void testSinkTsFileFormat() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+
+      // Do not fail if the failure has nothing to do with pipe
+      // Because the failures will randomly generate due to resource limitation
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", 
"flush"))) {
+        return;
+      }
+
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor.realtime.mode", "forced-log");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      connectorAttributes.put("connector.format", "tsfile");
+      connectorAttributes.put("connector.realtime-first", "false");
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client
+              .createPipe(
+                  new TCreatePipeReq("testPipe", connectorAttributes)
+                      .setExtractorAttributes(extractorAttributes)
+                      .setProcessorAttributes(processorAttributes))
+              .getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("testPipe").getCode());
+
+      // Do not fail if the failure has nothing to do with pipe
+      // Because the failures will randomly generate due to resource limitation
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList("insert into root.vehicle.d0(time, s1) values (2, 1)", 
"flush"))) {
+        return;
+      }
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select * from root.**",
+          "Time,root.vehicle.d0.s1,",
+          Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", 
"2,1.0,"))));
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.dropPipe("testPipe").getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+          client
+              .createPipe(
+                  new TCreatePipeReq("testPipe", connectorAttributes)
+                      .setExtractorAttributes(extractorAttributes)
+                      .setProcessorAttributes(processorAttributes))
+              .getCode());
+
+      // Do not fail if the failure has nothing to do with pipe
+      // Because the failures will randomly generate due to resource limitation
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.vehicle.d0(time, s1) values (4, 1)",
+              "insert into root.vehicle.d0(time, s1) values (3, 1), (0, 1)",
+              "flush"))) {
+        return;
+      }
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select * from root.**",
+          "Time,root.vehicle.d0.s1,",
+          Collections.unmodifiableSet(
+              new HashSet<>(Arrays.asList("0,1.0,", "1,1.0,", "2,1.0,", 
"3,1.0,", "4,1.0,"))));
+    }
+  }
+
   @Test
   public void testLegacyConnector() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 25fc9de720a..9ba081018b9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -188,9 +189,17 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector {
     final Pair<IoTDBSyncClient, Boolean> clientAndStatus = 
clientManager.getClient();
 
     // 1. Transfer snapshotFile, and template File if exists
-    transferFilePieces(pipeName, creationTime, snapshotFile, clientAndStatus, 
true);
+    transferFilePieces(
+        Collections.singletonMap(new Pair<>(pipeName, creationTime), 1.0),
+        snapshotFile,
+        clientAndStatus,
+        true);
     if (Objects.nonNull(templateFile)) {
-      transferFilePieces(pipeName, creationTime, templateFile, 
clientAndStatus, true);
+      transferFilePieces(
+          Collections.singletonMap(new Pair<>(pipeName, creationTime), 1.0),
+          templateFile,
+          clientAndStatus,
+          true);
     }
     // 2. Transfer file seal signal, which means the snapshots are transferred 
completely
     final TPipeTransferResp resp;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
new file mode 100644
index 00000000000..1f4123c6cc2
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
+
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import org.apache.tsfile.exception.write.WriteProcessException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public abstract class PipeTabletEventBatch implements AutoCloseable {
+
+  protected final List<EnrichedEvent> events = new ArrayList<>();
+
+  private final int maxDelayInMs;
+  private long firstEventProcessingTime = Long.MIN_VALUE;
+
+  protected long totalBufferSize = 0;
+
+  protected volatile boolean isClosed = false;
+
+  protected PipeTabletEventBatch(final int maxDelayInMs) {
+    this.maxDelayInMs = maxDelayInMs;
+  }
+
+  /**
+   * Try offer {@link Event} into batch if the given {@link Event} is not 
duplicated.
+   *
+   * @param event the given {@link Event}
+   * @return {@code true} if the batch can be transferred
+   */
+  synchronized boolean onEvent(final TabletInsertionEvent event)
+      throws WALPipeException, IOException, WriteProcessException {
+    if (isClosed || !(event instanceof EnrichedEvent)) {
+      return false;
+    }
+
+    // The deduplication logic here is to avoid the accumulation of
+    // the same event in a batch when retrying.
+    if (events.isEmpty() || !Objects.equals(events.get(events.size() - 1), 
event)) {
+      // We increase the reference count for this event to determine if the 
event may be released.
+      if (((EnrichedEvent) event)
+          
.increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName())) {
+
+        if (constructBatch(event)) {
+          events.add((EnrichedEvent) event);
+        }
+
+        if (firstEventProcessingTime == Long.MIN_VALUE) {
+          firstEventProcessingTime = System.currentTimeMillis();
+        }
+      } else {
+        ((EnrichedEvent) event)
+            
.decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false);
+      }
+    }
+
+    return shouldEmit();
+  }
+
+  /**
+   * Added an {@link TabletInsertionEvent} into batch.
+   *
+   * @param event the {@link TabletInsertionEvent} in batch
+   * @return {@code true} if the event is calculated into batch, {@code false} 
if the event is
+   *     cached and not emitted in this batch. If there are failure 
encountered, just throw
+   *     exceptions and do not return {@code false} here.
+   */
+  protected abstract boolean constructBatch(final TabletInsertionEvent event)
+      throws WALPipeException, IOException, WriteProcessException;
+
+  private boolean shouldEmit() {
+    return totalBufferSize >= getMaxBatchSizeInBytes()
+        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
+  }
+
+  protected abstract long getMaxBatchSizeInBytes();
+
+  public synchronized void onSuccess() {
+    events.clear();
+
+    totalBufferSize = 0;
+
+    firstEventProcessingTime = Long.MIN_VALUE;
+  }
+
+  @Override
+  public synchronized void close() {
+    isClosed = true;
+
+    clearEventsReferenceCount(PipeTabletEventBatch.class.getName());
+    events.clear();
+  }
+
+  public void decreaseEventsReferenceCount(final String holderMessage, final 
boolean shouldReport) {
+    events.forEach(event -> event.decreaseReferenceCount(holderMessage, 
shouldReport));
+  }
+
+  private void clearEventsReferenceCount(final String holderMessage) {
+    events.forEach(event -> event.clearReferenceCount(holderMessage));
+  }
+
+  public List<EnrichedEvent> deepCopyEvents() {
+    return new ArrayList<>(events);
+  }
+
+  boolean isEmpty() {
+    return events.isEmpty();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
similarity index 61%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index 97070313308..0b97487b04e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
-import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import org.apache.tsfile.utils.Pair;
@@ -45,30 +44,22 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class PipeEventBatch implements AutoCloseable {
+public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeEventBatch.class);
-
-  private final List<Event> events = new ArrayList<>();
-  private final List<Long> requestCommitIds = new ArrayList<>();
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletEventPlainBatch.class);
 
   private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
   private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
   private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
 
-  // limit in delayed time
-  private final int maxDelayInMs;
-  private long firstEventProcessingTime = Long.MIN_VALUE;
-
   // limit in buffer size
   private final PipeMemoryBlock allocatedMemoryBlock;
-  private long totalBufferSize = 0;
 
   // Used to rate limit when transferring data
   private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new 
HashMap<>();
 
-  public PipeEventBatch(int maxDelayInMs, long requestMaxBatchSizeInBytes) {
-    this.maxDelayInMs = maxDelayInMs;
+  PipeTabletEventPlainBatch(final int maxDelayInMs, final long 
requestMaxBatchSizeInBytes) {
+    super(maxDelayInMs);
     this.allocatedMemoryBlock =
         PipeResourceManager.memory()
             .tryAllocate(requestMaxBatchSizeInBytes)
@@ -86,68 +77,34 @@ public class PipeEventBatch implements AutoCloseable {
 
     if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
       LOGGER.info(
-          "PipeTransferBatchReqBuilder: the max batch size is adjusted from {} 
to {} due to the "
+          "PipeTabletEventBatch: the max batch size is adjusted from {} to {} 
due to the "
               + "memory restriction",
           requestMaxBatchSizeInBytes,
           getMaxBatchSizeInBytes());
     }
   }
 
-  /**
-   * Try offer {@link Event} into batch if the given {@link Event} is not 
duplicated.
-   *
-   * @param event the given {@link Event}
-   * @return {@code true} if the batch can be transferred
-   */
-  public synchronized boolean onEvent(final TabletInsertionEvent event)
-      throws IOException, WALPipeException {
-    if (!(event instanceof EnrichedEvent)) {
-      return false;
-    }
-
-    final long requestCommitId = ((EnrichedEvent) event).getCommitId();
-
-    // The deduplication logic here is to avoid the accumulation of the same 
event in a batch when
-    // retrying.
-    if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (((EnrichedEvent) event)
-          
.increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName())) {
-        events.add(event);
-        requestCommitIds.add(requestCommitId);
-
-        final int bufferSize = buildTabletInsertionBuffer(event);
-        totalBufferSize += bufferSize;
-        pipe2BytesAccumulated.compute(
-            new Pair<>(
-                ((EnrichedEvent) event).getPipeName(), ((EnrichedEvent) 
event).getCreationTime()),
-            (pipeName, bytesAccumulated) ->
-                bytesAccumulated == null ? bufferSize : bytesAccumulated + 
bufferSize);
-
-        if (firstEventProcessingTime == Long.MIN_VALUE) {
-          firstEventProcessingTime = System.currentTimeMillis();
-        }
-      } else {
-        ((EnrichedEvent) event)
-            
.decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false);
-      }
-    }
-
-    return totalBufferSize >= getMaxBatchSizeInBytes()
-        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
+  @Override
+  protected boolean constructBatch(final TabletInsertionEvent event)
+      throws WALPipeException, IOException {
+    final int bufferSize = buildTabletInsertionBuffer(event);
+    totalBufferSize += bufferSize;
+    pipe2BytesAccumulated.compute(
+        new Pair<>(
+            ((EnrichedEvent) event).getPipeName(), ((EnrichedEvent) 
event).getCreationTime()),
+        (pipeName, bytesAccumulated) ->
+            bytesAccumulated == null ? bufferSize : bytesAccumulated + 
bufferSize);
+    return true;
   }
 
+  @Override
   public synchronized void onSuccess() {
+    super.onSuccess();
+
     binaryBuffers.clear();
     insertNodeBuffers.clear();
     tabletBuffers.clear();
 
-    events.clear();
-    requestCommitIds.clear();
-
-    firstEventProcessingTime = Long.MIN_VALUE;
-
-    totalBufferSize = 0;
     pipe2BytesAccumulated.clear();
   }
 
@@ -156,22 +113,11 @@ public class PipeEventBatch implements AutoCloseable {
         binaryBuffers, insertNodeBuffers, tabletBuffers);
   }
 
-  private long getMaxBatchSizeInBytes() {
+  @Override
+  protected long getMaxBatchSizeInBytes() {
     return allocatedMemoryBlock.getMemoryUsageInBytes();
   }
 
-  public boolean isEmpty() {
-    return binaryBuffers.isEmpty() && insertNodeBuffers.isEmpty() && 
tabletBuffers.isEmpty();
-  }
-
-  public List<Event> deepCopyEvents() {
-    return new ArrayList<>(events);
-  }
-
-  public List<Long> deepCopyRequestCommitIds() {
-    return new ArrayList<>(requestCommitIds);
-  }
-
   public Map<Pair<String, Long>, Long> deepCopyPipeName2BytesAccumulated() {
     return new HashMap<>(pipe2BytesAccumulated);
   }
@@ -213,23 +159,8 @@ public class PipeEventBatch implements AutoCloseable {
 
   @Override
   public synchronized void close() {
-    clearEventsReferenceCount(PipeTransferBatchReqBuilder.class.getName());
-    allocatedMemoryBlock.close();
-  }
+    super.close();
 
-  public void decreaseEventsReferenceCount(final String holderMessage, final 
boolean shouldReport) {
-    for (final Event event : events) {
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) event).decreaseReferenceCount(holderMessage, 
shouldReport);
-      }
-    }
-  }
-
-  public void clearEventsReferenceCount(final String holderMessage) {
-    for (final Event event : events) {
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) event).clearReferenceCount(holderMessage);
-      }
-    }
+    allocatedMemoryBlock.close();
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
new file mode 100644
index 00000000000..6abca7b3183
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
+import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletEventTsFileBatch.class);
+
+  private static final AtomicReference<FolderManager> FOLDER_MANAGER = new 
AtomicReference<>();
+  private static final AtomicLong BATCH_ID_GENERATOR = new AtomicLong(0);
+  private final AtomicLong currentBatchId = new 
AtomicLong(BATCH_ID_GENERATOR.incrementAndGet());
+  private final File batchFileBaseDir;
+
+  private static final String TS_FILE_PREFIX = "tb"; // tb means tablet batch
+  private final AtomicLong tsFileIdGenerator = new AtomicLong(0);
+
+  private final long maxSizeInBytes;
+
+  private final Map<Pair<String, Long>, Double> pipeName2WeightMap = new 
HashMap<>();
+
+  private final List<Tablet> tabletList = new ArrayList<>();
+  private final List<Boolean> isTabletAlignedList = new ArrayList<>();
+
+  private volatile TsFileWriter fileWriter;
+
+  PipeTabletEventTsFileBatch(final int maxDelayInMs, final long 
requestMaxBatchSizeInBytes) {
+    super(maxDelayInMs);
+
+    this.maxSizeInBytes = requestMaxBatchSizeInBytes;
+    try {
+      this.batchFileBaseDir = getNextBaseDir();
+    } catch (final Exception e) {
+      throw new PipeException(
+          String.format("Failed to create file dir for batch: %s", 
e.getMessage()));
+    }
+  }
+
+  private File getNextBaseDir() throws DiskSpaceInsufficientException {
+    if (FOLDER_MANAGER.get() == null) {
+      synchronized (FOLDER_MANAGER) {
+        if (FOLDER_MANAGER.get() == null) {
+          FOLDER_MANAGER.set(
+              new FolderManager(
+                  
Arrays.stream(IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDirs())
+                      .map(fileDir -> fileDir + File.separator + ".batch")
+                      .collect(Collectors.toList()),
+                  DirectoryStrategyType.SEQUENCE_STRATEGY));
+        }
+      }
+    }
+
+    final File baseDir =
+        new File(FOLDER_MANAGER.get().getNextFolder(), 
Long.toString(currentBatchId.get()));
+    if (baseDir.exists()) {
+      FileUtils.deleteQuietly(baseDir);
+    }
+    if (!baseDir.exists() && !baseDir.mkdirs()) {
+      LOGGER.warn(
+          "Batch id = {}: Failed to create batch file dir {}.",
+          currentBatchId.get(),
+          baseDir.getPath());
+      throw new PipeException(
+          String.format(
+              "Failed to create batch file dir %s. (Batch id = %s)",
+              baseDir.getPath(), currentBatchId.get()));
+    }
+    LOGGER.info(
+        "Batch id = {}: Create batch dir successfully, batch file dir = {}.",
+        currentBatchId.get(),
+        baseDir.getPath());
+    return baseDir;
+  }
+
+  @Override
+  protected boolean constructBatch(final TabletInsertionEvent event) {
+    if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+      final PipeInsertNodeTabletInsertionEvent insertNodeTabletInsertionEvent =
+          (PipeInsertNodeTabletInsertionEvent) event;
+      final List<Tablet> tablets = 
insertNodeTabletInsertionEvent.convertToTablets();
+      for (int i = 0; i < tablets.size(); ++i) {
+        final Tablet tablet = tablets.get(i);
+        if (tablet.rowSize == 0) {
+          continue;
+        }
+        bufferTablet(
+            insertNodeTabletInsertionEvent.getPipeName(),
+            insertNodeTabletInsertionEvent.getCreationTime(),
+            tablet,
+            insertNodeTabletInsertionEvent.isAligned(i));
+      }
+    } else if (event instanceof PipeRawTabletInsertionEvent) {
+      final PipeRawTabletInsertionEvent rawTabletInsertionEvent =
+          (PipeRawTabletInsertionEvent) event;
+      final Tablet tablet = rawTabletInsertionEvent.convertToTablet();
+      if (tablet.rowSize == 0) {
+        return true;
+      }
+      bufferTablet(
+          rawTabletInsertionEvent.getPipeName(),
+          rawTabletInsertionEvent.getCreationTime(),
+          tablet,
+          rawTabletInsertionEvent.isAligned());
+    } else {
+      LOGGER.warn(
+          "Batch id = {}: Unsupported event {} type {} when constructing 
tsfile batch",
+          currentBatchId.get(),
+          event,
+          event.getClass());
+    }
+    return true;
+  }
+
+  private void bufferTablet(
+      final String pipeName, long creationTime, Tablet tablet, boolean 
isAligned) {
+    totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
+
+    pipeName2WeightMap.compute(
+        new Pair<>(pipeName, creationTime),
+        (pipe, weight) -> Objects.nonNull(weight) ? ++weight : 1);
+
+    tabletList.add(tablet);
+    isTabletAlignedList.add(isAligned);
+  }
+
+  public Map<Pair<String, Long>, Double> deepCopyPipe2WeightMap() {
+    final double sum = 
pipeName2WeightMap.values().stream().reduce(Double::sum).orElse(0.0);
+    if (sum == 0.0) {
+      return Collections.emptyMap();
+    }
+    pipeName2WeightMap.entrySet().forEach(entry -> 
entry.setValue(entry.getValue() / sum));
+    return new HashMap<>(pipeName2WeightMap);
+  }
+
+  public synchronized List<File> sealTsFiles() throws IOException, 
WriteProcessException {
+    return isClosed ? Collections.emptyList() : writeTabletsToTsFiles();
+  }
+
+  private List<File> writeTabletsToTsFiles() throws IOException, 
WriteProcessException {
+    final Map<String, List<Tablet>> device2Tablets = new HashMap<>();
+    final Map<String, Boolean> device2Aligned = new HashMap<>();
+
+    // Sort the tablets by device id
+    for (int i = 0, size = tabletList.size(); i < size; ++i) {
+      final Tablet tablet = tabletList.get(i);
+      final String deviceId = tablet.deviceId;
+      device2Tablets.computeIfAbsent(deviceId, k -> new 
ArrayList<>()).add(tablet);
+      device2Aligned.put(deviceId, isTabletAlignedList.get(i));
+    }
+
+    // Sort the tablets by start time in each device
+    for (final List<Tablet> tablets : device2Tablets.values()) {
+      tablets.sort(
+          // Each tablet has at least one timestamp
+          Comparator.comparingLong(tablet -> tablet.timestamps[0]));
+    }
+
+    // Replace ArrayList with LinkedList to improve performance
+    final Map<String, LinkedList<Tablet>> device2TabletsLinkedList = new 
HashMap<>();
+    for (final Map.Entry<String, List<Tablet>> entry : 
device2Tablets.entrySet()) {
+      device2TabletsLinkedList.put(entry.getKey(), new 
LinkedList<>(entry.getValue()));
+    }
+    // Clear the original device2Tablets to release memory
+    device2Tablets.clear();
+
+    // Write the tablets to the tsfile device by device, and the tablets
+    // in the same device are written in order of start time. Tablets in
+    // the same device should not be written if their time ranges overlap.
+    // If overlapped, we try to write the tablets whose device id is not
+    // the same as the previous one. For the tablets not written in the
+    // previous round, we write them in a new tsfile.
+    final List<File> sealedFiles = new ArrayList<>();
+
+    // Try making the tsfile size as large as possible
+    while (!device2TabletsLinkedList.isEmpty()) {
+      if (Objects.isNull(fileWriter)) {
+        fileWriter =
+            new TsFileWriter(
+                new File(
+                    batchFileBaseDir,
+                    TS_FILE_PREFIX
+                        + "_"
+                        + 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
+                        + "_"
+                        + currentBatchId.get()
+                        + "_"
+                        + tsFileIdGenerator.getAndIncrement()
+                        + TsFileConstant.TSFILE_SUFFIX));
+      }
+
+      final Iterator<Map.Entry<String, LinkedList<Tablet>>> iterator =
+          device2TabletsLinkedList.entrySet().iterator();
+
+      while (iterator.hasNext()) {
+        final Map.Entry<String, LinkedList<Tablet>> entry = iterator.next();
+        final String deviceId = entry.getKey();
+        final LinkedList<Tablet> tablets = entry.getValue();
+
+        final List<Tablet> tabletsToWrite = new ArrayList<>();
+
+        Tablet lastTablet = null;
+        while (!tablets.isEmpty()) {
+          final Tablet tablet = tablets.peekFirst();
+          if (Objects.isNull(lastTablet)
+              // lastTablet.rowSize is not 0
+              || lastTablet.timestamps[lastTablet.rowSize - 1] < 
tablet.timestamps[0]) {
+            tabletsToWrite.add(tablet);
+            lastTablet = tablet;
+            tablets.pollFirst();
+          } else {
+            break;
+          }
+        }
+
+        if (tablets.isEmpty()) {
+          iterator.remove();
+        }
+
+        final boolean isAligned = device2Aligned.get(deviceId);
+        for (final Tablet tablet : tabletsToWrite) {
+          if (isAligned) {
+            try {
+              fileWriter.registerAlignedTimeseries(new Path(tablet.deviceId), 
tablet.getSchemas());
+            } catch (final WriteProcessException ignore) {
+              // Do nothing if the timeSeries has been registered
+            }
+
+            fileWriter.writeAligned(tablet);
+          } else {
+            for (final MeasurementSchema schema : tablet.getSchemas()) {
+              try {
+                fileWriter.registerTimeseries(new Path(tablet.deviceId), 
schema);
+              } catch (final WriteProcessException ignore) {
+                // Do nothing if the timeSeries has been registered
+              }
+            }
+
+            fileWriter.write(tablet);
+          }
+        }
+      }
+
+      fileWriter.close();
+      final File sealedFile = fileWriter.getIOWriter().getFile();
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "Batch id = {}: Seal tsfile {} successfully.",
+            currentBatchId.get(),
+            sealedFile.getPath());
+      }
+      sealedFiles.add(sealedFile);
+      fileWriter = null;
+    }
+
+    return sealedFiles;
+  }
+
+  /** Returns the size limit of this batch in bytes, as held in {@code maxSizeInBytes}. */
+  @Override
+  protected long getMaxBatchSizeInBytes() {
+    return maxSizeInBytes;
+  }
+
+  /**
+   * Resets the state of this batch: runs the parent's {@code onSuccess()}, clears the per-pipe
+   * weight map, clears the buffered tablets together with their aligned flags, and drops the
+   * reference to the current tsfile writer without closing or deleting anything.
+   */
+  @Override
+  public synchronized void onSuccess() {
+    super.onSuccess();
+
+    pipeName2WeightMap.clear();
+
+    tabletList.clear();
+    isTabletAlignedList.clear();
+
+    // We don't need to delete the tsFile here, because the tsFile
+    // will be deleted after the file is transferred.
+    fileWriter = null;
+  }
+
+  /**
+   * Closes this batch: runs the parent's {@code close()}, clears the per-pipe weight map and the
+   * buffered tablets with their aligned flags, and, if a tsfile writer is still referenced,
+   * closes it and deletes its underlying file. Failures of either step are logged and not
+   * rethrown, so the other step and the final reset of {@code fileWriter} still happen.
+   */
+  @Override
+  public synchronized void close() {
+    super.close();
+
+    pipeName2WeightMap.clear();
+
+    tabletList.clear();
+    isTabletAlignedList.clear();
+
+    // NOTE(review): a non-null writer here presumably means a tsfile was started but never
+    // sealed (sealing resets the field to null) -- confirm against sealTsFiles().
+    if (Objects.nonNull(fileWriter)) {
+      // Step 1: best-effort close of the writer; the exception is passed as the trailing
+      // logger argument in addition to its message.
+      try {
+        fileWriter.close();
+      } catch (final Exception e) {
+        LOGGER.info(
+            "Batch id = {}: Failed to close the tsfile {} when trying to close 
batch, because {}",
+            currentBatchId.get(),
+            fileWriter.getIOWriter().getFile().getPath(),
+            e.getMessage(),
+            e);
+      }
+
+      // Step 2: best-effort removal of the file, attempted even if the close above failed.
+      try {
+        FileUtils.delete(fileWriter.getIOWriter().getFile());
+      } catch (final Exception e) {
+        LOGGER.info(
+            "Batch id = {}: Failed to delete the tsfile {} when trying to 
close batch, because {}",
+            currentBatchId.get(),
+            fileWriter.getIOWriter().getFile().getPath(),
+            e.getMessage(),
+            e);
+      }
+
+      fileWriter = null;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
similarity index 55%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 33b524b1a9a..f28f19e4da1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
+import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,12 +42,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY;
@@ -62,28 +69,53 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
 
   // If the leader cache is disabled (or unable to find the endpoint of event 
in the leader cache),
   // the event will be stored in the default batch.
-  private final PipeEventBatch defaultBatch;
+  private final PipeTabletEventBatch defaultBatch;
   // If the leader cache is enabled, the batch will be divided by the leader 
endpoint,
   // each endpoint has a batch.
-  private final Map<TEndPoint, PipeEventBatch> endPointToBatch = new 
HashMap<>();
+  // This is only used in plain batch since tsfile does not return redirection 
info.
+  private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch = 
new HashMap<>();
 
   public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
+    final boolean usingTsFileBatch =
+        parameters
+            .getStringOrDefault(
+                Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY), 
CONNECTOR_FORMAT_HYBRID_VALUE)
+            .equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
+
     useLeaderCache =
-        parameters.getBooleanOrDefault(
-            Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, 
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
-            CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE);
-
-    requestMaxDelayInMs =
-        parameters.getIntOrDefault(
-                Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
SINK_IOTDB_BATCH_DELAY_KEY),
-                CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
-            * 1000;
-    requestMaxBatchSizeInBytes =
-        parameters.getLongOrDefault(
-            Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
-            CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
-
-    this.defaultBatch = new PipeEventBatch(requestMaxDelayInMs, 
requestMaxBatchSizeInBytes);
+        !usingTsFileBatch
+            && parameters.getBooleanOrDefault(
+                Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, 
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
+                CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE);
+
+    final int requestMaxDelayInSeconds;
+    if (usingTsFileBatch) {
+      requestMaxDelayInSeconds =
+          parameters.getIntOrDefault(
+              Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
SINK_IOTDB_BATCH_DELAY_KEY),
+              CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE);
+      requestMaxDelayInMs =
+          requestMaxDelayInSeconds < 0 ? Integer.MAX_VALUE : 
requestMaxDelayInSeconds * 1000;
+      requestMaxBatchSizeInBytes =
+          parameters.getLongOrDefault(
+              Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
+              CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE);
+      this.defaultBatch =
+          new PipeTabletEventTsFileBatch(requestMaxDelayInMs, 
requestMaxBatchSizeInBytes);
+    } else {
+      requestMaxDelayInSeconds =
+          parameters.getIntOrDefault(
+              Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
SINK_IOTDB_BATCH_DELAY_KEY),
+              CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE);
+      requestMaxDelayInMs =
+          requestMaxDelayInSeconds < 0 ? Integer.MAX_VALUE : 
requestMaxDelayInSeconds * 1000;
+      requestMaxBatchSizeInBytes =
+          parameters.getLongOrDefault(
+              Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
+              CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
+      this.defaultBatch =
+          new PipeTabletEventPlainBatch(requestMaxDelayInMs, 
requestMaxBatchSizeInBytes);
+    }
   }
 
   /**
@@ -91,12 +123,13 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
    * duplicated.
    *
    * @param event the given {@link Event}
-   * @return {@link Pair}<{@link TEndPoint}, {@link PipeEventBatch}> not null 
means this {@link
-   *     PipeEventBatch} can be transferred. the first element is the leader 
endpoint to transfer to
-   *     (might be null), the second element is the batch to be transferred.
+   * @return {@link Pair}<{@link TEndPoint}, {@link 
PipeTabletEventBatch}> not null means this
+   *     {@link PipeTabletEventBatch} can be transferred. The first 
element is the leader
+   *     endpoint to transfer to (might be null), the second element is the 
batch to be transferred.
    */
-  public synchronized Pair<TEndPoint, PipeEventBatch> onEvent(final 
TabletInsertionEvent event)
-      throws IOException, WALPipeException {
+  public synchronized Pair<TEndPoint, PipeTabletEventBatch> onEvent(
+      final TabletInsertionEvent event)
+      throws IOException, WALPipeException, WriteProcessException {
     if (!(event instanceof EnrichedEvent)) {
       LOGGER.warn(
           "Unsupported event {} type {} when building transfer request", 
event, event.getClass());
@@ -124,15 +157,16 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
       return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) : 
null;
     }
 
-    final PipeEventBatch batch =
+    final PipeTabletEventPlainBatch batch =
         endPointToBatch.computeIfAbsent(
-            endPoint, k -> new PipeEventBatch(requestMaxDelayInMs, 
requestMaxBatchSizeInBytes));
+            endPoint,
+            k -> new PipeTabletEventPlainBatch(requestMaxDelayInMs, 
requestMaxBatchSizeInBytes));
     return batch.onEvent(event) ? new Pair<>(endPoint, batch) : null;
   }
 
   /** Get all batches that have at least 1 event. */
-  public synchronized List<Pair<TEndPoint, PipeEventBatch>> 
getAllNonEmptyBatches() {
-    final List<Pair<TEndPoint, PipeEventBatch>> nonEmptyBatches = new 
ArrayList<>();
+  public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>> 
getAllNonEmptyBatches() {
+    final List<Pair<TEndPoint, PipeTabletEventBatch>> nonEmptyBatches = new 
ArrayList<>();
     if (!defaultBatch.isEmpty()) {
       nonEmptyBatches.add(new Pair<>(null, defaultBatch));
     }
@@ -147,12 +181,12 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
 
   public boolean isEmpty() {
     return defaultBatch.isEmpty()
-        && endPointToBatch.values().stream().allMatch(PipeEventBatch::isEmpty);
+        && 
endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
   }
 
   @Override
   public synchronized void close() {
     defaultBatch.close();
-    endPointToBatch.values().forEach(PipeEventBatch::close);
+    endPointToBatch.values().forEach(PipeTabletEventPlainBatch::close);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
index f6d677389e7..33a98d84e42 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -91,7 +91,7 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
     }
   }
 
-  private static boolean checkSorted(final Tablet tablet) {
+  public static boolean checkSorted(final Tablet tablet) {
     for (int i = 1; i < tablet.rowSize; i++) {
       if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
         return false;
@@ -100,7 +100,7 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
     return true;
   }
 
-  private static void sortTablet(final Tablet tablet) {
+  public static void sortTablet(final Tablet tablet) {
     /*
      * following part of code sort the batch data by time,
      * so we can insert continuous data in value list to get a better 
performance
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 355d354c385..e964ddffd26 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -65,6 +65,8 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -325,8 +327,13 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
 
   private void doTransfer(final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeInsertionEvent)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (final Tablet tablet : 
pipeInsertNodeInsertionEvent.convertToTablets()) {
-      if (pipeInsertNodeInsertionEvent.isAligned()) {
+    final List<Tablet> tablets = 
pipeInsertNodeInsertionEvent.convertToTablets();
+    for (int i = 0; i < tablets.size(); ++i) {
+      final Tablet tablet = tablets.get(i);
+      if (Objects.isNull(tablet) || tablet.rowSize == 0) {
+        continue;
+      }
+      if (pipeInsertNodeInsertionEvent.isAligned(i)) {
         sessionPool.insertAlignedTablet(tablet);
       } else {
         sessionPool.insertTablet(tablet);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
index daa70051d59..bf116c36798 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
@@ -46,10 +46,10 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
 
@@ -76,7 +76,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder 
implements AutoClosea
     maxDelayInMs =
         parameters.getIntOrDefault(
                 Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
SINK_IOTDB_BATCH_DELAY_KEY),
-                CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
+                CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE)
             * 1000;
 
     this.consensusGroupId = consensusGroupId;
@@ -85,7 +85,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder 
implements AutoClosea
     final long requestMaxBatchSizeInBytes =
         parameters.getLongOrDefault(
             Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
-            CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
+            CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
 
     allocatedMemoryBlock =
         PipeResourceManager.memory()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 3edcedc31d8..b5b4397fd60 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -24,21 +24,24 @@ import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeEventBatch;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeTransferBatchReqBuilder;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTransferBatchReqBuilder;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletBatchEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletInsertNodeEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletRawEventHandler;
-import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileInsertionEventHandler;
+import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -49,24 +52,26 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
+import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
@@ -112,13 +117,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
configuration)
       throws Exception {
     super.customize(parameters, configuration);
-
-    // Disable batch mode for retry connector, in case retry events are never 
sent again
-    final PipeParameters retryParameters =
-        new PipeParameters(new HashMap<>(parameters.getAttribute()));
-    retryParameters.getAttribute().put(SINK_IOTDB_BATCH_MODE_ENABLE_KEY, 
"false");
-    retryParameters.getAttribute().put(CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, 
"false");
-    retryConnector.customize(retryParameters, configuration);
+    retryConnector.customize(parameters, configuration);
 
     clientManager =
         new IoTDBDataNodeAsyncClientManager(
@@ -157,69 +156,104 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       return;
     }
 
-    transferWithoutCheck(tabletInsertionEvent);
-  }
-
-  private void transferWithoutCheck(final TabletInsertionEvent 
tabletInsertionEvent)
-      throws Exception {
     if (isTabletBatchModeEnabled) {
-      final Pair<TEndPoint, PipeEventBatch> endPointAndBatch =
+      final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch =
           tabletBatchBuilder.onEvent(tabletInsertionEvent);
-      if (Objects.nonNull(endPointAndBatch)) {
-        transfer(
-            endPointAndBatch.getLeft(),
-            new 
PipeTransferTabletBatchEventHandler(endPointAndBatch.getRight(), this));
-        endPointAndBatch.getRight().onSuccess();
+      if (Objects.isNull(endPointAndBatch)) {
+        return;
       }
+      transferInBatchWithoutCheck(endPointAndBatch);
     } else {
-      if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
-        final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent =
-            (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
-        // We increase the reference count for this event to determine if the 
event may be released.
-        if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
-            IoTDBDataRegionAsyncConnector.class.getName())) {
-          pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
-              IoTDBDataRegionAsyncConnector.class.getName(), false);
-          return;
-        }
+      transferInEventWithoutCheck(tabletInsertionEvent);
+    }
+  }
 
-        final InsertNode insertNode =
-            
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
-        final TPipeTransferReq pipeTransferReq =
-            compressIfNeeded(
-                Objects.isNull(insertNode)
-                    ? PipeTransferTabletBinaryReq.toTPipeTransferReq(
-                        pipeInsertNodeTabletInsertionEvent.getByteBuffer())
-                    : 
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
-        final PipeTransferTabletInsertNodeEventHandler 
pipeTransferInsertNodeReqHandler =
-            new PipeTransferTabletInsertNodeEventHandler(
-                pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
+  private void transferInBatchWithoutCheck(
+      final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch)
+      throws IOException, WriteProcessException {
+    final PipeTabletEventBatch batch = endPointAndBatch.getRight();
 
+    if (batch instanceof PipeTabletEventPlainBatch) {
+      transfer(
+          endPointAndBatch.getLeft(),
+          new PipeTransferTabletBatchEventHandler((PipeTabletEventPlainBatch) 
batch, this));
+    } else if (batch instanceof PipeTabletEventTsFileBatch) {
+      final PipeTabletEventTsFileBatch tsFileBatch = 
(PipeTabletEventTsFileBatch) batch;
+      final List<File> sealedFiles = tsFileBatch.sealTsFiles();
+      final Map<Pair<String, Long>, Double> pipe2WeightMap = 
tsFileBatch.deepCopyPipe2WeightMap();
+      final List<EnrichedEvent> events = tsFileBatch.deepCopyEvents();
+      final AtomicInteger eventsReferenceCount = new 
AtomicInteger(sealedFiles.size());
+      final AtomicBoolean eventsHadBeenAddedToRetryQueue = new 
AtomicBoolean(false);
+
+      for (final File sealedFile : sealedFiles) {
         transfer(
-            // getDeviceId() may return null for InsertRowsNode
-            pipeInsertNodeTabletInsertionEvent.getDeviceId(), 
pipeTransferInsertNodeReqHandler);
-      } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
-        final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
-            (PipeRawTabletInsertionEvent) tabletInsertionEvent;
-        // We increase the reference count for this event to determine if the 
event may be released.
-        if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
-            IoTDBDataRegionAsyncConnector.class.getName())) {
-          pipeRawTabletInsertionEvent.decreaseReferenceCount(
-              IoTDBDataRegionAsyncConnector.class.getName(), false);
-          return;
-        }
+            new PipeTransferTsFileHandler(
+                this,
+                pipe2WeightMap,
+                events,
+                eventsReferenceCount,
+                eventsHadBeenAddedToRetryQueue,
+                sealedFile,
+                null,
+                false));
+      }
+    } else {
+      LOGGER.warn(
+          "Unsupported batch type {} when transferring tablet insertion 
event.", batch.getClass());
+    }
 
-        final TPipeTransferReq pipeTransferTabletRawReq =
-            compressIfNeeded(
-                PipeTransferTabletRawReq.toTPipeTransferReq(
-                    pipeRawTabletInsertionEvent.convertToTablet(),
-                    pipeRawTabletInsertionEvent.isAligned()));
-        final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
-            new PipeTransferTabletRawEventHandler(
-                pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
+    endPointAndBatch.getRight().onSuccess();
+  }
+
+  private void transferInEventWithoutCheck(final TabletInsertionEvent 
tabletInsertionEvent)
+      throws Exception {
+    if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+      final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent =
+          (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
+      // We increase the reference count for this event to determine if the 
event may be released.
+      if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+          IoTDBDataRegionAsyncConnector.class.getName())) {
+        pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
+            IoTDBDataRegionAsyncConnector.class.getName(), false);
+        return;
+      }
 
-        transfer(pipeRawTabletInsertionEvent.getDeviceId(), 
pipeTransferTabletReqHandler);
+      final InsertNode insertNode =
+          pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
+      final TPipeTransferReq pipeTransferReq =
+          compressIfNeeded(
+              Objects.isNull(insertNode)
+                  ? PipeTransferTabletBinaryReq.toTPipeTransferReq(
+                      pipeInsertNodeTabletInsertionEvent.getByteBuffer())
+                  : 
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
+      final PipeTransferTabletInsertNodeEventHandler 
pipeTransferInsertNodeReqHandler =
+          new PipeTransferTabletInsertNodeEventHandler(
+              pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
+
+      transfer(
+          // getDeviceId() may return null for InsertRowsNode
+          pipeInsertNodeTabletInsertionEvent.getDeviceId(), 
pipeTransferInsertNodeReqHandler);
+    } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
+      final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
+          (PipeRawTabletInsertionEvent) tabletInsertionEvent;
+      // We increase the reference count for this event to determine if the 
event may be released.
+      if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
+          IoTDBDataRegionAsyncConnector.class.getName())) {
+        pipeRawTabletInsertionEvent.decreaseReferenceCount(
+            IoTDBDataRegionAsyncConnector.class.getName(), false);
+        return;
       }
+
+      final TPipeTransferReq pipeTransferTabletRawReq =
+          compressIfNeeded(
+              PipeTransferTabletRawReq.toTPipeTransferReq(
+                  pipeRawTabletInsertionEvent.convertToTablet(),
+                  pipeRawTabletInsertionEvent.isAligned()));
+      final PipeTransferTabletRawEventHandler pipeTransferTabletReqHandler =
+          new PipeTransferTabletRawEventHandler(
+              pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
+
+      transfer(pipeRawTabletInsertionEvent.getDeviceId(), 
pipeTransferTabletReqHandler);
     }
   }
 
@@ -295,11 +329,24 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
         throw new 
FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
       }
 
-      final PipeTransferTsFileInsertionEventHandler 
pipeTransferTsFileInsertionEventHandler =
-          new 
PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent, this);
-
-      transfer(pipeTransferTsFileInsertionEventHandler);
-    } catch (Exception e) {
+      final PipeTransferTsFileHandler pipeTransferTsFileHandler =
+          new PipeTransferTsFileHandler(
+              this,
+              Collections.singletonMap(
+                  new Pair<>(
+                      pipeTsFileInsertionEvent.getPipeName(),
+                      pipeTsFileInsertionEvent.getCreationTime()),
+                  1.0),
+              Collections.singletonList(pipeTsFileInsertionEvent),
+              new AtomicInteger(1),
+              new AtomicBoolean(false),
+              pipeTsFileInsertionEvent.getTsFile(),
+              pipeTsFileInsertionEvent.getModFile(),
+              pipeTsFileInsertionEvent.isWithMod()
+                  && clientManager.supportModsIfIsDataNodeReceiver());
+
+      transfer(pipeTransferTsFileHandler);
+    } catch (final Exception e) {
       // Just in case. To avoid the case that exception occurred when 
constructing the handler.
       pipeTsFileInsertionEvent.decreaseReferenceCount(
           IoTDBDataRegionAsyncConnector.class.getName(), false);
@@ -307,15 +354,14 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     }
   }
 
-  private void transfer(
-      final PipeTransferTsFileInsertionEventHandler 
pipeTransferTsFileInsertionEventHandler) {
+  private void transfer(final PipeTransferTsFileHandler 
pipeTransferTsFileHandler) {
     AsyncPipeDataTransferServiceClient client = null;
     try {
       client = clientManager.borrowClient();
-      pipeTransferTsFileInsertionEventHandler.transfer(clientManager, client);
+      pipeTransferTsFileHandler.transfer(clientManager, client);
     } catch (final Exception ex) {
       logOnClientException(client, ex);
-      pipeTransferTsFileInsertionEventHandler.onError(ex);
+      pipeTransferTsFileHandler.onError(ex);
     }
   }
 
@@ -401,27 +447,27 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
         }
       }
     }
+
+    // Trigger cron heartbeat event in retry connector to send batch in time
+    retryConnector.transfer(PipeConnectorSubtask.CRON_HEARTBEAT_EVENT);
   }
 
   /** Try its best to commit data in order. Flush can also be a trigger to 
transfer batched data. */
-  private void transferBatchedEventsIfNecessary() throws IOException {
+  private void transferBatchedEventsIfNecessary() throws IOException, 
WriteProcessException {
     if (!isTabletBatchModeEnabled || tabletBatchBuilder.isEmpty()) {
       return;
     }
 
-    for (final Pair<TEndPoint, PipeEventBatch> endPointAndBatch :
+    for (final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch :
         tabletBatchBuilder.getAllNonEmptyBatches()) {
-      transfer(
-          endPointAndBatch.getLeft(),
-          new PipeTransferTabletBatchEventHandler(endPointAndBatch.getRight(), 
this));
-      endPointAndBatch.getRight().onSuccess();
+      transferInBatchWithoutCheck(endPointAndBatch);
     }
   }
 
   /**
-   * Add failure event to retry queue.
+   * Add failure {@link Event} to retry queue.
    *
-   * @param event event to retry
+   * @param event {@link Event} to retry
    */
   public void addFailureEventToRetryQueue(final Event event) {
     if (isClosed.get()) {
@@ -444,14 +490,12 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   }
 
   /**
-   * Add failure events to retry queue.
+   * Add failure {@link EnrichedEvent}s to retry queue.
    *
-   * @param events events to retry
+   * @param events {@link EnrichedEvent}s to retry
    */
-  public void addFailureEventsToRetryQueue(final Iterable<Event> events) {
-    for (final Event event : events) {
-      addFailureEventToRetryQueue(event);
-    }
+  public void addFailureEventsToRetryQueue(final Iterable<EnrichedEvent> 
events) {
+    events.forEach(this::addFailureEventToRetryQueue);
   }
 
   public synchronized void clearRetryEventsReferenceCount() {
@@ -463,10 +507,6 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     }
   }
 
-  public boolean supportModsIfIsDataNodeReceiver() {
-    return clientManager.supportModsIfIsDataNodeReceiver();
-  }
-
   //////////////////////////// Operations for close 
////////////////////////////
 
   /**
@@ -520,7 +560,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
             }
           });
       return count.get();
-    } catch (Exception e) {
+    } catch (final Exception e) {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("Failed to get retry event count for pipe {}.", pipeName, 
e);
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index adc508e47e8..fb879c7b1dc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -24,10 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeEventBatch;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
 import org.apache.iotdb.db.pipe.connector.util.LeaderCacheUtils;
-import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -49,8 +48,7 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeTransferTabletBatchEventHandler.class);
 
-  private final List<Long> requestCommitIds;
-  private final List<Event> events;
+  private final List<EnrichedEvent> events;
   private final Map<Pair<String, Long>, Long> pipeName2BytesAccumulated;
 
   private final TPipeTransferReq req;
@@ -59,10 +57,9 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
   private final IoTDBDataRegionAsyncConnector connector;
 
   public PipeTransferTabletBatchEventHandler(
-      final PipeEventBatch batch, final IoTDBDataRegionAsyncConnector 
connector)
+      final PipeTabletEventPlainBatch batch, final 
IoTDBDataRegionAsyncConnector connector)
       throws IOException {
-    // Deep copy to keep Ids' and events' reference
-    requestCommitIds = batch.deepCopyRequestCommitIds();
+    // Deep copy to keep events' reference
     events = batch.deepCopyEvents();
     pipeName2BytesAccumulated = batch.deepCopyPipeName2BytesAccumulated();
 
@@ -111,12 +108,10 @@ public class PipeTransferTabletBatchEventHandler 
implements AsyncMethodCallback<
         connector.updateLeaderCache(redirectPair.getLeft(), 
redirectPair.getRight());
       }
 
-      for (final Event event : events) {
-        if (event instanceof EnrichedEvent) {
-          ((EnrichedEvent) event)
-              
.decreaseReferenceCount(PipeTransferTabletBatchEventHandler.class.getName(), 
true);
-        }
-      }
+      events.forEach(
+          event ->
+              event.decreaseReferenceCount(
+                  PipeTransferTabletBatchEventHandler.class.getName(), true));
     } catch (final Exception e) {
       onError(e);
     }
@@ -127,14 +122,8 @@ public class PipeTransferTabletBatchEventHandler 
implements AsyncMethodCallback<
     try {
       LOGGER.warn(
           "Failed to transfer TabletInsertionEvent batch {} (request commit 
ids={}).",
-          events.stream()
-              .map(
-                  event ->
-                      event instanceof EnrichedEvent
-                          ? ((EnrichedEvent) event).coreReportMessage()
-                          : event.toString())
-              .collect(Collectors.toList()),
-          requestCommitIds,
+          
events.stream().map(EnrichedEvent::coreReportMessage).collect(Collectors.toList()),
+          
events.stream().map(EnrichedEvent::getCommitId).collect(Collectors.toList()),
           exception);
     } finally {
       connector.addFailureEventsToRetryQueue(events);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
similarity index 66%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 0c15b3560fe..a5f126d7123 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
@@ -35,8 +36,10 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,18 +48,29 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
-public class PipeTransferTsFileInsertionEventHandler
-    implements AsyncMethodCallback<TPipeTransferResp> {
+public class PipeTransferTsFileHandler implements 
AsyncMethodCallback<TPipeTransferResp> {
 
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(PipeTransferTsFileInsertionEventHandler.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
 
-  private final PipeTsFileInsertionEvent event;
+  // Used to transfer the file
   private final IoTDBDataRegionAsyncConnector connector;
 
+  // Used to rate limit the transfer
+  private final Map<Pair<String, Long>, Double> pipeName2WeightMap;
+
+  // The original events to be transferred, can be multiple events if
+  // the file is batched with other events
+  private final List<EnrichedEvent> events;
+  private final AtomicInteger eventsReferenceCount;
+  private final AtomicBoolean eventsHadBeenAddedToRetryQueue;
+
   private final File tsFile;
   private final File modFile;
   private File currentFile;
@@ -74,15 +88,27 @@ public class PipeTransferTsFileInsertionEventHandler
   private IoTDBDataNodeAsyncClientManager clientManager;
   private AsyncPipeDataTransferServiceClient client;
 
-  public PipeTransferTsFileInsertionEventHandler(
-      final PipeTsFileInsertionEvent event, final 
IoTDBDataRegionAsyncConnector connector)
+  public PipeTransferTsFileHandler(
+      final IoTDBDataRegionAsyncConnector connector,
+      final Map<Pair<String, Long>, Double> pipeName2WeightMap,
+      final List<EnrichedEvent> events,
+      final AtomicInteger eventsReferenceCount,
+      final AtomicBoolean eventsHadBeenAddedToRetryQueue,
+      final File tsFile,
+      final File modFile,
+      final boolean transferMod)
       throws FileNotFoundException {
-    this.event = event;
     this.connector = connector;
 
-    tsFile = event.getTsFile();
-    modFile = event.getModFile();
-    transferMod = event.isWithMod() && 
connector.supportModsIfIsDataNodeReceiver();
+    this.pipeName2WeightMap = pipeName2WeightMap;
+
+    this.events = events;
+    this.eventsReferenceCount = eventsReferenceCount;
+    this.eventsHadBeenAddedToRetryQueue = eventsHadBeenAddedToRetryQueue;
+
+    this.tsFile = tsFile;
+    this.modFile = modFile;
+    this.transferMod = transferMod;
     currentFile = transferMod ? modFile : tsFile;
 
     readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
@@ -98,7 +124,7 @@ public class PipeTransferTsFileInsertionEventHandler
   }
 
   public void transfer(
-      IoTDBDataNodeAsyncClientManager clientManager,
+      final IoTDBDataNodeAsyncClientManager clientManager,
       final AsyncPipeDataTransferServiceClient client)
       throws TException, IOException {
     this.clientManager = clientManager;
@@ -134,11 +160,13 @@ public class PipeTransferTsFileInsertionEventHandler
                     uncompressedReq, connector.getCompressors())
                 : uncompressedReq;
 
-        connector.rateLimitIfNeeded(
-            event.getPipeName(),
-            event.getCreationTime(),
-            client.getEndPoint(),
-            req.getBody().length);
+        pipeName2WeightMap.forEach(
+            (pipePair, weight) ->
+                connector.rateLimitIfNeeded(
+                    pipePair.getLeft(),
+                    pipePair.getRight(),
+                    client.getEndPoint(),
+                    (long) (req.getBody().length * weight)));
 
         client.pipeTransfer(req, this);
       }
@@ -161,8 +189,13 @@ public class PipeTransferTsFileInsertionEventHandler
                 uncompressedReq, connector.getCompressors())
             : uncompressedReq;
 
-    connector.rateLimitIfNeeded(
-        event.getPipeName(), event.getCreationTime(), client.getEndPoint(), 
req.getBody().length);
+    pipeName2WeightMap.forEach(
+        (pipePair, weight) ->
+            connector.rateLimitIfNeeded(
+                pipePair.getLeft(),
+                pipePair.getRight(),
+                client.getEndPoint(),
+                (long) (req.getBody().length * weight)));
 
     client.pipeTransfer(req, this);
 
@@ -194,16 +227,35 @@ public class PipeTransferTsFileInsertionEventHandler
         if (reader != null) {
           reader.close();
         }
+
+        // Delete current file when using tsFile as batch
+        if (events.stream().anyMatch(event -> !(event instanceof 
PipeTsFileInsertionEvent))) {
+          FileUtils.delete(currentFile);
+        }
       } catch (final IOException e) {
-        LOGGER.warn("Failed to close file reader when successfully transferred 
file.", e);
+        LOGGER.warn(
+            "Failed to close file reader or delete tsFile when successfully 
transferred file.", e);
       } finally {
-        
event.decreaseReferenceCount(PipeTransferTsFileInsertionEventHandler.class.getName(),
 true);
+        final int referenceCount = eventsReferenceCount.decrementAndGet();
+        if (referenceCount <= 0) {
+          events.forEach(
+              event ->
+                  
event.decreaseReferenceCount(PipeTransferTsFileHandler.class.getName(), true));
+        }
 
-        LOGGER.info(
-            "Successfully transferred file {} (committer key={}, commit 
id={}).",
-            tsFile,
-            event.getCommitterKey(),
-            event.getCommitId());
+        if (events.size() <= 1 || LOGGER.isDebugEnabled()) {
+          LOGGER.info(
+              "Successfully transferred file {} (committer key={}, commit 
id={}, reference count={}).",
+              tsFile,
+              
events.stream().map(EnrichedEvent::getCommitterKey).collect(Collectors.toList()),
+              
events.stream().map(EnrichedEvent::getCommitId).collect(Collectors.toList()),
+              referenceCount);
+        } else {
+          LOGGER.info(
+              "Successfully transferred file {} (batched TableInsertionEvents, 
reference count={}).",
+              tsFile,
+              referenceCount);
+        }
 
         if (client != null) {
           client.setShouldReturnSelf(true);
@@ -246,12 +298,19 @@ public class PipeTransferTsFileInsertionEventHandler
   @Override
   public void onError(final Exception exception) {
     try {
-      LOGGER.warn(
-          "Failed to transfer TsFileInsertionEvent {} (committer key {}, 
commit id {}).",
-          tsFile,
-          event.getCommitterKey(),
-          event.getCommitId(),
-          exception);
+      if (events.size() <= 1 || LOGGER.isDebugEnabled()) {
+        LOGGER.warn(
+            "Failed to transfer TsFileInsertionEvent {} (committer key {}, 
commit id {}).",
+            tsFile,
+            
events.stream().map(EnrichedEvent::getCommitterKey).collect(Collectors.toList()),
+            
events.stream().map(EnrichedEvent::getCommitId).collect(Collectors.toList()),
+            exception);
+      } else {
+        LOGGER.warn(
+            "Failed to transfer TsFileInsertionEvent {} (batched 
TableInsertionEvents)",
+            tsFile,
+            exception);
+      }
     } catch (final Exception e) {
       LOGGER.warn("Failed to log error when failed to transfer file.", e);
     }
@@ -268,8 +327,13 @@ public class PipeTransferTsFileInsertionEventHandler
       if (reader != null) {
         reader.close();
       }
+
+      // Delete current file when using tsFile as batch
+      if (events.stream().anyMatch(event -> !(event instanceof 
PipeTsFileInsertionEvent))) {
+        FileUtils.delete(currentFile);
+      }
     } catch (final IOException e) {
-      LOGGER.warn("Failed to close file reader when failed to transfer file.", 
e);
+      LOGGER.warn("Failed to close file reader or delete tsFile when failed to 
transfer file.", e);
     } finally {
       try {
         if (client != null) {
@@ -277,7 +341,9 @@ public class PipeTransferTsFileInsertionEventHandler
           client.returnSelf();
         }
       } finally {
-        connector.addFailureEventToRetryQueue(event);
+        if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
+          connector.addFailureEventsToRetryQueue(events);
+        }
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 0509c53beed..3b664ac50c1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -24,8 +24,10 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeEventBatch;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeTransferBatchReqBuilder;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTransferBatchReqBuilder;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
@@ -51,12 +53,17 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -105,10 +112,10 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
 
     try {
       if (isTabletBatchModeEnabled) {
-        final Pair<TEndPoint, PipeEventBatch> endPointAndBatch =
+        final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch =
             tabletBatchBuilder.onEvent(tabletInsertionEvent);
         if (Objects.nonNull(endPointAndBatch)) {
-          doTransfer(endPointAndBatch);
+          doTransferWrapper(endPointAndBatch);
         }
       } else {
         if (tabletInsertionEvent instanceof 
PipeInsertNodeTabletInsertionEvent) {
@@ -139,7 +146,7 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
     try {
       // In order to commit in order
       if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
-        doTransfer();
+        doTransferWrapper();
       }
 
       doTransferWrapper((PipeTsFileInsertionEvent) tsFileInsertionEvent);
@@ -162,7 +169,7 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
 
     // in order to commit in order
     if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
-      doTransfer();
+      doTransferWrapper();
     }
 
     if (!(event instanceof PipeHeartbeatEvent)) {
@@ -171,10 +178,30 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
     }
   }
 
-  private void doTransfer(final Pair<TEndPoint, PipeEventBatch> 
endPointAndBatch) {
-    final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
-        clientManager.getClient(endPointAndBatch.getLeft());
-    final PipeEventBatch batchToTransfer = endPointAndBatch.getRight();
+  private void doTransferWrapper() throws IOException, WriteProcessException {
+    for (final Pair<TEndPoint, PipeTabletEventBatch> nonEmptyBatch :
+        tabletBatchBuilder.getAllNonEmptyBatches()) {
+      doTransferWrapper(nonEmptyBatch);
+    }
+  }
+
+  private void doTransferWrapper(final Pair<TEndPoint, PipeTabletEventBatch> 
endPointAndBatch)
+      throws IOException, WriteProcessException {
+    final PipeTabletEventBatch batch = endPointAndBatch.getRight();
+    if (batch instanceof PipeTabletEventPlainBatch) {
+      doTransfer(endPointAndBatch.getLeft(), (PipeTabletEventPlainBatch) 
batch);
+    } else if (batch instanceof PipeTabletEventTsFileBatch) {
+      doTransfer((PipeTabletEventTsFileBatch) batch);
+    } else {
+      LOGGER.warn("Unsupported batch type {}.", batch.getClass());
+    }
+    
batch.decreaseEventsReferenceCount(IoTDBDataRegionSyncConnector.class.getName(),
 true);
+    batch.onSuccess();
+  }
+
+  private void doTransfer(
+      final TEndPoint endPoint, final PipeTabletEventPlainBatch 
batchToTransfer) {
+    final Pair<IoTDBSyncClient, Boolean> clientAndStatus = 
clientManager.getClient(endPoint);
 
     final TPipeTransferResp resp;
     try {
@@ -217,14 +244,24 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
         LeaderCacheUtils.parseRecommendedRedirections(status)) {
       clientManager.updateLeaderCache(redirectPair.getLeft(), 
redirectPair.getRight());
     }
-
-    batchToTransfer.decreaseEventsReferenceCount(
-        IoTDBDataRegionSyncConnector.class.getName(), true);
-    batchToTransfer.onSuccess();
   }
 
-  private void doTransfer() {
-    tabletBatchBuilder.getAllNonEmptyBatches().forEach(this::doTransfer);
+  private void doTransfer(final PipeTabletEventTsFileBatch batchToTransfer)
+      throws IOException, WriteProcessException {
+    final List<File> sealedFiles = batchToTransfer.sealTsFiles();
+    final Map<Pair<String, Long>, Double> pipe2WeightMap = 
batchToTransfer.deepCopyPipe2WeightMap();
+
+    for (final File tsFile : sealedFiles) {
+      doTransfer(pipe2WeightMap, tsFile, null);
+      try {
+        FileUtils.delete(tsFile);
+      } catch (final NoSuchFileException e) {
+        LOGGER.info("The file {} is not found, may already be deleted.", 
tsFile);
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Failed to delete batch file {}, this file should be deleted 
manually later", tsFile);
+      }
+    }
   }
 
   private void doTransferWrapper(
@@ -363,37 +400,49 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
           IoTDBDataRegionSyncConnector.class.getName())) {
         return;
       }
-      doTransfer(pipeTsFileInsertionEvent);
+      doTransfer(
+          Collections.singletonMap(
+              new Pair<>(
+                  pipeTsFileInsertionEvent.getPipeName(),
+                  pipeTsFileInsertionEvent.getCreationTime()),
+              1.0),
+          pipeTsFileInsertionEvent.getTsFile(),
+          pipeTsFileInsertionEvent.isWithMod() ? 
pipeTsFileInsertionEvent.getModFile() : null);
     } finally {
       pipeTsFileInsertionEvent.decreaseReferenceCount(
           IoTDBDataRegionSyncConnector.class.getName(), false);
     }
   }
 
-  private void doTransfer(final PipeTsFileInsertionEvent 
pipeTsFileInsertionEvent)
+  private void doTransfer(
+      final Map<Pair<String, Long>, Double> pipeName2WeightMap,
+      final File tsFile,
+      final File modFile)
       throws PipeException, IOException {
-    final String pipeName = pipeTsFileInsertionEvent.getPipeName();
-    final long creationTime = pipeTsFileInsertionEvent.getCreationTime();
-    final File tsFile = pipeTsFileInsertionEvent.getTsFile();
-    final File modFile = pipeTsFileInsertionEvent.getModFile();
+
     final Pair<IoTDBSyncClient, Boolean> clientAndStatus = 
clientManager.getClient();
     final TPipeTransferResp resp;
 
     // 1. Transfer tsFile, and mod file if exists and receiver's version >= 2
-    if (pipeTsFileInsertionEvent.isWithMod() && 
clientManager.supportModsIfIsDataNodeReceiver()) {
-      transferFilePieces(pipeName, creationTime, modFile, clientAndStatus, 
true);
-      transferFilePieces(pipeName, creationTime, tsFile, clientAndStatus, 
true);
+    if (Objects.nonNull(modFile) && 
clientManager.supportModsIfIsDataNodeReceiver()) {
+      transferFilePieces(pipeName2WeightMap, modFile, clientAndStatus, true);
+      transferFilePieces(pipeName2WeightMap, tsFile, clientAndStatus, true);
+
       // 2. Transfer file seal signal with mod, which means the file is 
transferred completely
       try {
         final TPipeTransferReq req =
             compressIfNeeded(
                 PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
                     modFile.getName(), modFile.length(), tsFile.getName(), 
tsFile.length()));
-        rateLimitIfNeeded(
-            pipeTsFileInsertionEvent.getPipeName(),
-            pipeTsFileInsertionEvent.getCreationTime(),
-            clientAndStatus.getLeft().getEndPoint(),
-            req.getBody().length);
+
+        pipeName2WeightMap.forEach(
+            (pipePair, weight) ->
+                rateLimitIfNeeded(
+                    pipePair.getLeft(),
+                    pipePair.getRight(),
+                    clientAndStatus.getLeft().getEndPoint(),
+                    (long) (req.getBody().length * weight)));
+
         resp = clientAndStatus.getLeft().pipeTransfer(req);
       } catch (final Exception e) {
         clientAndStatus.setRight(false);
@@ -403,17 +452,22 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
             e);
       }
     } else {
-      transferFilePieces(pipeName, creationTime, tsFile, clientAndStatus, 
false);
+      transferFilePieces(pipeName2WeightMap, tsFile, clientAndStatus, false);
+
       // 2. Transfer file seal signal without mod, which means the file is 
transferred completely
       try {
         final TPipeTransferReq req =
             compressIfNeeded(
                 PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), 
tsFile.length()));
-        rateLimitIfNeeded(
-            pipeTsFileInsertionEvent.getPipeName(),
-            pipeTsFileInsertionEvent.getCreationTime(),
-            clientAndStatus.getLeft().getEndPoint(),
-            req.getBody().length);
+
+        pipeName2WeightMap.forEach(
+            (pipePair, weight) ->
+                rateLimitIfNeeded(
+                    pipePair.getLeft(),
+                    pipePair.getRight(),
+                    clientAndStatus.getLeft().getEndPoint(),
+                    (long) (req.getBody().length * weight)));
+
         resp = clientAndStatus.getLeft().pipeTransfer(req);
       } catch (final Exception e) {
         clientAndStatus.setRight(false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
index 93ec5e4ea43..878430e027b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Objects;
 
 public class IoTDBSchemaRegionConnector extends IoTDBDataNodeSyncConnector {
@@ -96,9 +97,17 @@ public class IoTDBSchemaRegionConnector extends 
IoTDBDataNodeSyncConnector {
     final TPipeTransferResp resp;
 
     // 1. Transfer mTreeSnapshotFile, and tLog file if exists
-    transferFilePieces(pipeName, creationTime, mTreeSnapshotFile, 
clientAndStatus, true);
+    transferFilePieces(
+        Collections.singletonMap(new Pair<>(pipeName, creationTime), 1.0),
+        mTreeSnapshotFile,
+        clientAndStatus,
+        true);
     if (Objects.nonNull(tagLogSnapshotFile)) {
-      transferFilePieces(pipeName, creationTime, tagLogSnapshotFile, 
clientAndStatus, true);
+      transferFilePieces(
+          Collections.singletonMap(new Pair<>(pipeName, creationTime), 1.0),
+          tagLogSnapshotFile,
+          clientAndStatus,
+          true);
     }
     // 2. Transfer file seal signal, which means the snapshots are transferred 
completely
     try {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index abb0fcb572c..faba8482c4d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -303,8 +303,8 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   /////////////////////////// convertToTablet ///////////////////////////
 
-  public boolean isAligned() {
-    return isAligned;
+  public boolean isAligned(final int i) {
+    return initDataContainers().get(i).isAligned();
   }
 
   public List<Tablet> convertToTablets() {
@@ -345,7 +345,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       }
 
       return dataContainers;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new PipeException("Initialize data container error.", e);
     }
   }
@@ -362,11 +362,17 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
     final List<PipeRawTabletInsertionEvent> events =
-        convertToTablets().stream()
+        initDataContainers().stream()
             .map(
-                tablet ->
+                container ->
                     new PipeRawTabletInsertionEvent(
-                        tablet, isAligned, pipeName, creationTime, 
pipeTaskMeta, this, false))
+                        container.convertToTablet(),
+                        container.isAligned(),
+                        pipeName,
+                        creationTime,
+                        pipeTaskMeta,
+                        this,
+                        false))
             .filter(event -> !event.hasNoNeedParsingAndIsEmpty())
             .collect(Collectors.toList());
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 8030cce9324..6366cf8bb95 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeighUtil;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -124,18 +124,19 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
       } else {
         // We need to create these objects here and remove them later.
         deviceIsAlignedMap = readDeviceIsAlignedMap();
-        memoryRequiredInBytes += 
PipeMemoryWeighUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
+        memoryRequiredInBytes += 
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
 
         // Filter devices that may overlap with pattern first
         // to avoid reading all time-series of all devices.
         final Set<IDeviceID> devices = 
filterDevicesByPattern(deviceIsAlignedMap.keySet());
 
         measurementDataTypeMap = readFilteredFullPathDataTypeMap(devices);
-        memoryRequiredInBytes += 
PipeMemoryWeighUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
+        memoryRequiredInBytes +=
+            
PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
 
         deviceMeasurementsMap = readFilteredDeviceMeasurementsMap(devices);
         memoryRequiredInBytes +=
-            
PipeMemoryWeighUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
+            
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
       }
       allocatedMemoryBlock = 
PipeResourceManager.memory().forceAllocate(memoryRequiredInBytes);
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 68e377ecfd3..8c7bc2c8ed2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -24,10 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,7 +107,8 @@ public class PipeMemoryManager {
 
     synchronized (this) {
       final PipeTabletMemoryBlock block =
-          (PipeTabletMemoryBlock) 
forceAllocate(calculateTabletSizeInBytes(tablet), true);
+          (PipeTabletMemoryBlock)
+              
forceAllocate(PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet), true);
       usedMemorySizeInBytesOfTablets += block.getMemoryUsageInBytes();
       return block;
     }
@@ -183,63 +181,6 @@ public class PipeMemoryManager {
     return null;
   }
 
-  public static long calculateTabletSizeInBytes(Tablet tablet) {
-    long totalSizeInBytes = 0;
-
-    if (tablet == null) {
-      return totalSizeInBytes;
-    }
-
-    // timestamps
-    if (tablet.timestamps != null) {
-      totalSizeInBytes += tablet.timestamps.length * 8L;
-    }
-
-    // values
-    final List<MeasurementSchema> timeseries = tablet.getSchemas();
-    if (timeseries != null) {
-      for (int column = 0; column < timeseries.size(); column++) {
-        final MeasurementSchema measurementSchema = timeseries.get(column);
-        if (measurementSchema == null) {
-          continue;
-        }
-
-        final TSDataType tsDataType = measurementSchema.getType();
-        if (tsDataType == null) {
-          continue;
-        }
-
-        if (tsDataType.isBinary()) {
-          if (tablet.values == null || tablet.values.length <= column) {
-            continue;
-          }
-          final Binary[] values = ((Binary[]) tablet.values[column]);
-          if (values == null) {
-            continue;
-          }
-          for (Binary value : values) {
-            totalSizeInBytes +=
-                value == null ? 0 : (value.getLength() == -1 ? 0 : 
value.getLength());
-          }
-        } else {
-          totalSizeInBytes += (long) tablet.timestamps.length * 
tsDataType.getDataTypeSize();
-        }
-      }
-    }
-
-    // bitMaps
-    if (tablet.bitMaps != null) {
-      for (int i = 0; i < tablet.bitMaps.length; i++) {
-        totalSizeInBytes += tablet.bitMaps[i] == null ? 0 : 
tablet.bitMaps[i].getSize();
-      }
-    }
-
-    // estimate other dataStructures size
-    totalSizeInBytes += 100;
-
-    return totalSizeInBytes;
-  }
-
   public synchronized PipeMemoryBlock tryAllocate(long sizeInBytes) {
     return tryAllocate(sizeInBytes, currentSize -> currentSize * 2 / 3);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeighUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
similarity index 56%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeighUtil.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index d93f946a087..f93d8f33fbf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeighUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -23,11 +23,15 @@ import org.apache.iotdb.db.utils.MemUtils;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.util.List;
 import java.util.Map;
 
-public class PipeMemoryWeighUtil {
+public class PipeMemoryWeightUtil {
+
   /** Estimates memory usage of a {@link Map}<{@link IDeviceID}, {@link 
Boolean}>. */
   public static long memoryOfIDeviceId2Bool(Map<IDeviceID, Boolean> map) {
     long usageInBytes = 0L;
@@ -57,4 +61,61 @@ public class PipeMemoryWeighUtil {
     }
     return usageInBytes + 16L; // add the overhead of map
   }
+
+  public static long calculateTabletSizeInBytes(Tablet tablet) {
+    long totalSizeInBytes = 0;
+
+    if (tablet == null) {
+      return totalSizeInBytes;
+    }
+
+    // timestamps
+    if (tablet.timestamps != null) {
+      totalSizeInBytes += tablet.timestamps.length * 8L;
+    }
+
+    // values
+    final List<MeasurementSchema> timeseries = tablet.getSchemas();
+    if (timeseries != null) {
+      for (int column = 0; column < timeseries.size(); column++) {
+        final MeasurementSchema measurementSchema = timeseries.get(column);
+        if (measurementSchema == null) {
+          continue;
+        }
+
+        final TSDataType tsDataType = measurementSchema.getType();
+        if (tsDataType == null) {
+          continue;
+        }
+
+        if (tsDataType.isBinary()) {
+          if (tablet.values == null || tablet.values.length <= column) {
+            continue;
+          }
+          final Binary[] values = ((Binary[]) tablet.values[column]);
+          if (values == null) {
+            continue;
+          }
+          for (Binary value : values) {
+            totalSizeInBytes +=
+                value == null ? 0 : (value.getLength() == -1 ? 0 : 
value.getLength());
+          }
+        } else {
+          totalSizeInBytes += (long) tablet.timestamps.length * 
tsDataType.getDataTypeSize();
+        }
+      }
+    }
+
+    // bitMaps
+    if (tablet.bitMaps != null) {
+      for (int i = 0; i < tablet.bitMaps.length; i++) {
+        totalSizeInBytes += tablet.bitMaps[i] == null ? 0 : 
tablet.bitMaps[i].getSize();
+      }
+    }
+
+    // estimate other dataStructures size
+    totalSizeInBytes += 100;
+
+    return totalSizeInBytes;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index b49ca5de4e2..090af91fb3c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.resource.tsfile;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeighUtil;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 
@@ -205,7 +205,8 @@ public class PipeTsFileResource implements AutoCloseable {
     try (TsFileSequenceReader sequenceReader =
         new TsFileSequenceReader(hardlinkOrCopiedFile.getPath(), true, true)) {
       deviceMeasurementsMap = sequenceReader.getDeviceMeasurementsMap();
-      memoryRequiredInBytes += 
PipeMemoryWeighUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
+      memoryRequiredInBytes +=
+          
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
 
       deviceIsAlignedMap = new HashMap<>();
       final TsFileDeviceIterator deviceIsAlignedIterator =
@@ -214,10 +215,10 @@ public class PipeTsFileResource implements AutoCloseable {
         final Pair<IDeviceID, Boolean> deviceIsAlignedPair = 
deviceIsAlignedIterator.next();
         deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(), 
deviceIsAlignedPair.getRight());
       }
-      memoryRequiredInBytes += 
PipeMemoryWeighUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
+      memoryRequiredInBytes += 
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
 
       measurementDataTypeMap = sequenceReader.getFullPathDataTypeMap();
-      memoryRequiredInBytes += 
PipeMemoryWeighUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
+      memoryRequiredInBytes += 
PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
     }
     // Release memory of TsFileSequenceReader.
     allocatedMemoryBlock.close();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index cae63102111..2994f41a3f9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -62,7 +62,7 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
   // to trigger the general event transfer function, causing potentially such 
as
   // the random delay of the batch transmission. Therefore, here we inject 
cron events
   // when no event can be pulled.
-  private static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
+  public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
       new PipeHeartbeatEvent("cron", false);
   private static final long CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS =
       
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()
 * 1000;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
index 935494537b3..67b78e6a046 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
@@ -27,7 +27,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEventBinaryCache;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -158,7 +158,7 @@ public class SubscriptionPrefetchingTabletsQueue extends 
SubscriptionPrefetching
         tablets.addAll(currentTablets);
         calculatedTabletsSizeInBytes +=
             currentTablets.stream()
-                .map((PipeMemoryManager::calculateTabletSizeInBytes))
+                .map((PipeMemoryWeightUtil::calculateTabletSizeInBytes))
                 .reduce(Long::sum)
                 .orElse(0L);
         enrichedEvents.add((EnrichedEvent) event);
@@ -172,7 +172,7 @@ public class SubscriptionPrefetchingTabletsQueue extends 
SubscriptionPrefetching
           tablets.addAll(currentTablets);
           calculatedTabletsSizeInBytes +=
               currentTablets.stream()
-                  .map((PipeMemoryManager::calculateTabletSizeInBytes))
+                  .map((PipeMemoryWeightUtil::calculateTabletSizeInBytes))
                   .reduce(Long::sum)
                   .orElse(0L);
         }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index dfcf35c03ca..61e773da3e4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -65,11 +65,13 @@ public class PipeConnectorConstant {
 
   public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY = 
"connector.batch.max-delay-seconds";
   public static final String SINK_IOTDB_BATCH_DELAY_KEY = 
"sink.batch.max-delay-seconds";
-  public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 1;
+  public static final int CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE = 1;
+  public static final int CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE = 
5;
 
   public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY = 
"connector.batch.size-bytes";
   public static final String SINK_IOTDB_BATCH_SIZE_KEY = 
"sink.batch.size-bytes";
-  public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
+  public static final long CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE = 16 
* MB;
+  public static final long CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE = 
80 * MB;
 
   public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
   public static final String SINK_IOTDB_USER_KEY = "sink.user";
@@ -191,6 +193,12 @@ public class PipeConnectorConstant {
   public static final String SINK_RATE_LIMIT_KEY = 
"sink.rate-limit-bytes-per-second";
   public static final double CONNECTOR_RATE_LIMIT_DEFAULT_VALUE = -1;
 
+  public static final String CONNECTOR_FORMAT_KEY = "connector.format";
+  public static final String SINK_FORMAT_KEY = "sink.format";
+  public static final String CONNECTOR_FORMAT_TABLET_VALUE = "tablet";
+  public static final String CONNECTOR_FORMAT_TS_FILE_VALUE = "tsfile";
+  public static final String CONNECTOR_FORMAT_HYBRID_VALUE = "hybrid";
+
   public static final String SINK_TOPIC_KEY = "sink.topic";
   public static final String SINK_CONSUMER_GROUP_KEY = "sink.consumer-group";
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index aa398d6e9ff..6ac6f9432df 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -66,11 +66,17 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_HOST_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
@@ -84,7 +90,9 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_HOST_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
@@ -144,6 +152,18 @@ public abstract class IoTDBConnector implements 
PipeConnector {
         parameters.hasAttribute(SINK_IOTDB_HOST_KEY),
         parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
 
+    validator.validate(
+        requestMaxBatchSizeInBytes -> (long) requestMaxBatchSizeInBytes > 0,
+        String.format(
+            "%s must be > 0, but got %s",
+            SINK_IOTDB_BATCH_SIZE_KEY,
+            parameters.getLongOrDefault(
+                Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
+                CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE)),
+        parameters.getLongOrDefault(
+            Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
+            CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE));
+
     loadBalanceStrategy =
         parameters
             .getStringOrDefault(
@@ -230,6 +250,15 @@ public abstract class IoTDBConnector implements 
PipeConnector {
                 CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
             .trim()
             .toLowerCase());
+
+    validator.validateAttributeValueRange(
+        validator.getParameters().hasAttribute(CONNECTOR_FORMAT_KEY)
+            ? CONNECTOR_FORMAT_KEY
+            : SINK_FORMAT_KEY,
+        true,
+        CONNECTOR_FORMAT_TABLET_VALUE,
+        CONNECTOR_FORMAT_HYBRID_VALUE,
+        CONNECTOR_FORMAT_TS_FILE_VALUE);
   }
 
   @Override
@@ -242,8 +271,15 @@ public abstract class IoTDBConnector implements 
PipeConnector {
 
     isTabletBatchModeEnabled =
         parameters.getBooleanOrDefault(
-            Arrays.asList(CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, 
SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
-            CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE);
+                Arrays.asList(
+                    CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, 
SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
+                CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE)
+            || parameters
+                .getStringOrDefault(
+                    Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
+                    CONNECTOR_FORMAT_HYBRID_VALUE)
+                .equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
+
     LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", 
isTabletBatchModeEnabled);
 
     receiverStatusHandler =
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index a51edf46cd4..be93018bf9c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -44,6 +44,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
@@ -144,8 +145,7 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
   }
 
   protected void transferFilePieces(
-      final String pipeName,
-      final long creationTime,
+      final Map<Pair<String, Long>, Double> pipe2WeightMap,
       final File file,
       final Pair<IoTDBSyncClient, Boolean> clientAndStatus,
       final boolean isMultiFile)
@@ -171,11 +171,13 @@ public abstract class IoTDBSslSyncConnector extends 
IoTDBConnector {
                   isMultiFile
                       ? getTransferMultiFilePieceReq(file.getName(), position, 
payLoad)
                       : getTransferSingleFilePieceReq(file.getName(), 
position, payLoad));
-          rateLimitIfNeeded(
-              pipeName,
-              creationTime,
-              clientAndStatus.getLeft().getEndPoint(),
-              req.getBody().length);
+          pipe2WeightMap.forEach(
+              (namePair, weight) ->
+                  rateLimitIfNeeded(
+                      namePair.getLeft(),
+                      namePair.getRight(),
+                      clientAndStatus.getLeft().getEndPoint(),
+                      (long) (req.getBody().length * weight)));
           resp =
               PipeTransferFilePieceResp.fromTPipeTransferResp(
                   clientAndStatus.getLeft().pipeTransfer(req));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 6d68b607294..6d44d26c332 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -161,7 +161,7 @@ public abstract class EnrichedEvent implements Event {
         isReleased.set(true);
       }
       if (newReferenceCount < 0) {
-        LOGGER.warn(
+        LOGGER.debug(
             "reference count is decreased to {}, event: {}, stack trace: {}",
             newReferenceCount,
             coreReportMessage(),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
index 78e913d6575..a95badf6a94 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
@@ -94,10 +94,15 @@ public abstract class PipeAbstractConnectorSubtask extends 
PipeReportableSubtask
     // Notice that the PipeRuntimeConnectorCriticalException must be thrown 
here
     // because the upper layer relies on this to stop all the related pipe 
tasks
     // Other exceptions may cause the subtask to stop forever and can not be 
restarted
-    super.onFailure(
-        throwable instanceof PipeRuntimeConnectorCriticalException
-            ? throwable
-            : new 
PipeRuntimeConnectorCriticalException(throwable.getMessage()));
+    if (throwable instanceof PipeRuntimeConnectorCriticalException) {
+      super.onFailure(throwable);
+    } else {
+      // Print stack trace for better debugging
+      LOGGER.warn(
+          "A non PipeRuntimeConnectorCriticalException occurred, will throw a 
PipeRuntimeConnectorCriticalException.",
+          throwable);
+      super.onFailure(new 
PipeRuntimeConnectorCriticalException(throwable.getMessage()));
+    }
   }
 
   /**

Reply via email to