This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 181b3177774 [IOTDB-5879] Pipe: logical view of pipe event (#9855)
181b3177774 is described below

commit 181b317777484eba467906678d4870cd5e56bffd
Author: Itami Sho <[email protected]>
AuthorDate: Thu Jun 1 11:35:07 2023 +0800

    [IOTDB-5879] Pipe: logical view of pipe event (#9855)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
    Co-authored-by: yschengzi <[email protected]>
---
 .../java/org/apache/iotdb/pipe/api/access/Row.java |  38 +-
 .../event/dml/insertion/TabletInsertionEvent.java  |   9 -
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   4 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |   9 +
 .../iotdb/commons/conf/CommonDescriptor.java       |  14 +-
 .../iotdb/commons/pipe/config/PipeConfig.java      |   4 +
 .../builtin/processor/DoNothingProcessor.java      |  13 +-
 .../commons/pipe/utils/PipeBinaryTransformer.java  |  19 +-
 .../pipe/utils/PipeDataTypeTransformer.java        |  94 +++++
 .../PipeHistoricalDataRegionTsFileCollector.java   |  15 +-
 .../realtime/assigner/PipeDataRegionAssigner.java  |   2 +-
 .../impl/iotdb/v1/IoTDBThriftConnectorV1.java      |  39 +-
 .../impl/iotdb/v1/IoTDBThriftReceiverV1.java       |  59 ++-
 .../connector/impl/iotdb/v1/PipeRequestType.java   |   5 +-
 .../iotdb/v1/request/PipeTransferTabletReq.java    | 222 +++++++++++
 .../iotdb/db/pipe/core/event/EnrichedEvent.java    |  18 +-
 ...vent.java => PipeInsertNodeInsertionEvent.java} |  64 ++-
 .../core/event/impl/PipeTabletInsertionEvent.java  | 111 ++----
 .../core/event/impl/PipeTsFileInsertionEvent.java  |  30 +-
 .../event/realtime/PipeRealtimeCollectEvent.java   |  19 +-
 .../realtime/PipeRealtimeCollectEventFactory.java  |   6 +-
 .../core/event/realtime/TsFileEpochManager.java    |  12 +-
 .../db/pipe/core/event/view/access/PipeRow.java    |  96 +++--
 .../core/event/view/access/PipeRowIterator.java    |  60 ---
 .../event/view/collector/PipeRowCollector.java     |  45 ++-
 .../TabletInsertionDataContainer.java              | 430 +++++++++++++++++++++
 .../TsFileInsertionDataContainer.java              | 136 +++++++
 .../TsFileInsertionDataTabletIterator.java         | 287 ++++++++++++++
 .../core/processor/PipeDoNothingProcessor.java     |  44 ++-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  12 +-
 .../collector/CachedSchemaPatternMatcherTest.java  |   4 +-
 .../apache/iotdb/tsfile/write/record/Tablet.java   | 108 +++---
 .../iotdb/tsfile/write/record/TabletTest.java      |  14 +-
 33 files changed, 1690 insertions(+), 352 deletions(-)

diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
index 54782637484..c27ed608e2e 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/access/Row.java
@@ -34,7 +34,7 @@ public interface Row {
    *
    * @return timestamp
    */
-  long getTime() throws IOException;
+  long getTime();
 
   /**
    * Returns the int value at the specified column in this row.
@@ -44,7 +44,7 @@ public interface Row {
    * @param columnIndex index of the specified column
    * @return the int value at the specified column in this row
    */
-  int getInt(int columnIndex) throws IOException;
+  int getInt(int columnIndex);
 
   /**
    * Returns the long value at the specified column in this row.
@@ -64,7 +64,7 @@ public interface Row {
    * @param columnIndex index of the specified column
    * @return the float value at the specified column in this row
    */
-  float getFloat(int columnIndex) throws IOException;
+  float getFloat(int columnIndex);
 
   /**
    * Returns the double value at the specified column in this row.
@@ -74,7 +74,7 @@ public interface Row {
    * @param columnIndex index of the specified column
    * @return the double value at the specified column in this row
    */
-  double getDouble(int columnIndex) throws IOException;
+  double getDouble(int columnIndex);
 
   /**
    * Returns the boolean value at the specified column in this row.
@@ -84,7 +84,7 @@ public interface Row {
    * @param columnIndex index of the specified column
    * @return the boolean value at the specified column in this row
    */
-  boolean getBoolean(int columnIndex) throws IOException;
+  boolean getBoolean(int columnIndex);
 
   /**
    * Returns the Binary value at the specified column in this row.
@@ -94,7 +94,7 @@ public interface Row {
    * @param columnIndex index of the specified column
    * @return the Binary value at the specified column in this row
    */
-  Binary getBinary(int columnIndex) throws IOException;
+  Binary getBinary(int columnIndex);
 
   /**
    * Returns the String value at the specified column in this row.
@@ -104,7 +104,15 @@ public interface Row {
    * @param columnIndex index of the specified column
    * @return the String value at the specified column in this row
    */
-  String getString(int columnIndex) throws IOException;
+  String getString(int columnIndex);
+
+  /**
+   * Returns the Object value at the specified column in this row.
+   *
+   * @param columnIndex index of the specified column
+   * @return the Object value at the specified column in this row
+   */
+  Object getObject(int columnIndex);
 
   /**
    * Returns the actual data type of the value at the specified column in this 
row.
@@ -123,9 +131,9 @@ public interface Row {
   boolean isNull(int columnIndex);
 
   /**
-   * Returns the number of columns
+   * Returns the number of columns (excluding the timestamp column)
    *
-   * @return the number of columns
+   * @return the number of columns (excluding the timestamp column)
    */
   int size();
 
@@ -139,16 +147,16 @@ public interface Row {
   int getColumnIndex(Path columnName) throws PipeParameterNotValidException;
 
   /**
-   * Returns the column names in the Row
+   * Returns the column data types in the Row
    *
-   * @return the column names in the Row
+   * @return the column data types in the Row
    */
-  List<Path> getColumnNames();
+  List<Type> getColumnTypes();
 
   /**
-   * Returns the column data types in the Row
+   * Returns the device id of the Row
    *
-   * @return the column data types in the Row
+   * @return the device id of the Row
    */
-  List<Type> getColumnTypes();
+  String getDeviceId();
 }
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index 9fd6d89428c..4a8073a5a26 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
-import java.util.Iterator;
 import java.util.function.BiConsumer;
 
 /** TabletInsertionEvent is used to define the event of data insertion. */
@@ -38,14 +37,6 @@ public interface TabletInsertionEvent extends Event {
    */
   TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer);
 
-  /**
-   * The consumer processes the data by the Iterator and collects the results 
by RowCollector.
-   *
-   * @return TabletInsertionEvent a new TabletInsertionEvent contains the 
results collected by the
-   *     RowCollector
-   */
-  TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>, 
RowCollector> consumer);
-
   /**
    * The consumer processes the Tablet directly and collects the results by 
RowCollector.
    *
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 79871f857af..e3092d072fa 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -214,7 +214,9 @@ public enum TSStatusCode {
   PIPE_TYPE_ERROR(1801),
   PIPE_HANDSHAKE_ERROR(1802),
   PIPE_TRANSFER_FILE_OFFSET_RESET(1803),
-  PIPE_TRANSFER_FILE_ERROR(1804);
+  PIPE_TRANSFER_FILE_ERROR(1804),
+  PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR(1805),
+  ;
 
   private final int statusCode;
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index e5a9fae2d8d..5c7c91545ee 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -154,6 +154,7 @@ public class CommonConfig {
   private int pipeConnectorReadFileBufferSize = 8388608;
   private long pipeConnectorRetryIntervalMs = 1000L;
   private int pipeConnectorPendingQueueSize = 1024;
+  private long pipeConnectorSessionId = Long.MAX_VALUE / 2;
 
   private int pipeHeartbeatLoopCyclesForCollectingPipeMeta = 100;
   private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
@@ -549,4 +550,12 @@ public class CommonConfig {
     this.pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs =
         pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
   }
+
+  public long getPipeConnectorSessionId() {
+    return pipeConnectorSessionId;
+  }
+
+  public void setPipeConnectorSessionId(long pipeSessionId) {
+    this.pipeConnectorSessionId = pipeSessionId;
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index bd72a56ae88..e45c4af45c5 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -238,13 +238,13 @@ public class CommonDescriptor {
                 String.valueOf(
                     
config.getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount()))));
     config.setPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration(
-        Integer.parseInt(
+        Long.parseLong(
             properties.getProperty(
                 
"pipe_subtask_executor_basic_check_point_interval_by_time_duration",
                 String.valueOf(
                     
config.getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration()))));
     config.setPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs(
-        Integer.parseInt(
+        Long.parseLong(
             properties.getProperty(
                 "pipe_subtask_executor_pending_queue_max_blocking_time_ms",
                 
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
@@ -276,7 +276,7 @@ public class CommonDescriptor {
                 "pipe_connector_read_file_buffer_size",
                 String.valueOf(config.getPipeConnectorReadFileBufferSize()))));
     config.setPipeConnectorRetryIntervalMs(
-        Integer.parseInt(
+        Long.parseLong(
             properties.getProperty(
                 "pipe_connector_retry_interval_ms",
                 String.valueOf(config.getPipeConnectorRetryIntervalMs()))));
@@ -285,6 +285,10 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_connector_pending_queue_size",
                 String.valueOf(config.getPipeConnectorPendingQueueSize()))));
+    config.setPipeConnectorSessionId(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_connector_session_id", 
String.valueOf(config.getPipeConnectorSessionId()))));
 
     config.setPipeHeartbeatLoopCyclesForCollectingPipeMeta(
         Integer.parseInt(
@@ -292,12 +296,12 @@ public class CommonDescriptor {
                 "pipe_heartbeat_loop_cycles_for_collecting_pipe_meta",
                 
String.valueOf(config.getPipeHeartbeatLoopCyclesForCollectingPipeMeta()))));
     config.setPipeMetaSyncerInitialSyncDelayMinutes(
-        Integer.parseInt(
+        Long.parseLong(
             properties.getProperty(
                 "pipe_meta_syncer_initial_sync_delay_minutes",
                 
String.valueOf(config.getPipeMetaSyncerInitialSyncDelayMinutes()))));
     config.setPipeMetaSyncerSyncIntervalMinutes(
-        Integer.parseInt(
+        Long.parseLong(
             properties.getProperty(
                 "pipe_meta_syncer_sync_interval_minutes",
                 
String.valueOf(config.getPipeMetaSyncerSyncIntervalMinutes()))));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 365ea0b8003..af0fcd379ff 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -85,6 +85,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeConnectorPendingQueueSize();
   }
 
+  public long getPipeConnectorSessionId() {
+    return COMMON_CONFIG.getPipeConnectorSessionId();
+  }
+
   /////////////////////////////// Meta Consistency 
///////////////////////////////
 
   public int getHeartbeatLoopCyclesForCollectingPipeMeta() {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
index bc56a8bb3cf..d083783a716 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
@@ -30,38 +30,39 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import java.io.IOException;
 
+/** This class is a placeholder and should not be used. */
 public class DoNothingProcessor implements PipeProcessor {
 
   @Override
   public void validate(PipeParameterValidator validator) {
-    // do nothing
+    throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 
   @Override
   public void customize(
       PipeParameters parameters, PipeProcessorRuntimeConfiguration 
configuration) {
-    // do nothing
+    throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 
   @Override
   public void process(TabletInsertionEvent tabletInsertionEvent, 
EventCollector eventCollector)
       throws IOException {
-    eventCollector.collect(tabletInsertionEvent);
+    throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 
   @Override
   public void process(TsFileInsertionEvent tsFileInsertionEvent, 
EventCollector eventCollector)
       throws IOException {
-    eventCollector.collect(tsFileInsertionEvent);
+    throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 
   @Override
   public void process(Event event, EventCollector eventCollector) throws 
IOException {
-    eventCollector.collect(event);
+    throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 
   @Override
   public void close() {
-    // do nothing
+    throw new UnsupportedOperationException("This class is a placeholder and 
should not be used.");
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/utils/PipeBinaryTransformer.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/pipe/utils/PipeBinaryTransformer.java
index 687c3e72c14..125de0e9206 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/utils/PipeBinaryTransformer.java
@@ -17,15 +17,16 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.core.event.view.collector;
+package org.apache.iotdb.commons.pipe.utils;
 
-import org.apache.iotdb.pipe.api.access.Row;
-import org.apache.iotdb.pipe.api.collector.RowCollector;
+public class PipeBinaryTransformer {
+  public static org.apache.iotdb.tsfile.utils.Binary transformToBinary(
+      org.apache.iotdb.pipe.api.type.Binary binary) {
+    return binary == null ? null : new 
org.apache.iotdb.tsfile.utils.Binary(binary.getValues());
+  }
 
-import java.io.IOException;
-
-public class PipeRowCollector implements RowCollector {
-
-  @Override
-  public void collectRow(Row row) throws IOException {}
+  public static org.apache.iotdb.pipe.api.type.Binary transformToPipeBinary(
+      org.apache.iotdb.tsfile.utils.Binary binary) {
+    return binary == null ? null : new 
org.apache.iotdb.pipe.api.type.Binary(binary.getValues());
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/utils/PipeDataTypeTransformer.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/utils/PipeDataTypeTransformer.java
new file mode 100644
index 00000000000..7e810f41c28
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/utils/PipeDataTypeTransformer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.utils;
+
+import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Transform between {@link 
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType} and {@link
+ * org.apache.iotdb.pipe.api.type.Type}
+ */
+public class PipeDataTypeTransformer {
+  public static Type transformToPipeDataType(TSDataType tsDataType) {
+    return tsDataType == null ? null : getPipeDataType(tsDataType.getType());
+  }
+
+  public static List<Type> transformToPipeDataTypeList(List<TSDataType> 
tsDataTypeList) {
+    return tsDataTypeList == null
+        ? null
+        : tsDataTypeList.stream()
+            .map(PipeDataTypeTransformer::transformToPipeDataType)
+            .collect(Collectors.toList());
+  }
+
+  private static Type getPipeDataType(byte type) {
+    switch (type) {
+      case 0:
+        return Type.BOOLEAN;
+      case 1:
+        return Type.INT32;
+      case 2:
+        return Type.INT64;
+      case 3:
+        return Type.FLOAT;
+      case 4:
+        return Type.DOUBLE;
+      case 5:
+        return Type.TEXT;
+      default:
+        throw new IllegalArgumentException("Invalid input: " + type);
+    }
+  }
+
+  public static TSDataType transformToTsDataType(Type type) {
+    return type == null ? null : getTsDataType(type.getType());
+  }
+
+  public static List<TSDataType> transformToTsDataTypeList(List<Type> 
typeList) {
+    return typeList == null
+        ? null
+        : typeList.stream()
+            .map(PipeDataTypeTransformer::transformToTsDataType)
+            .collect(Collectors.toList());
+  }
+
+  private static TSDataType getTsDataType(byte type) {
+    switch (type) {
+      case 0:
+        return TSDataType.BOOLEAN;
+      case 1:
+        return TSDataType.INT32;
+      case 2:
+        return TSDataType.INT64;
+      case 3:
+        return TSDataType.FLOAT;
+      case 4:
+        return TSDataType.DOUBLE;
+      case 5:
+        return TSDataType.TEXT;
+      default:
+        throw new IllegalArgumentException("Invalid input: " + type);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index c1157d96979..a1742f9fce3 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -152,7 +153,12 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
                         
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
                             && 
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
-                .map(resource -> new PipeTsFileInsertionEvent(resource, 
pipeTaskMeta))
+                .map(
+                    resource ->
+                        new PipeTsFileInsertionEvent(
+                            resource,
+                            pipeTaskMeta,
+                            
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE))
                 .collect(Collectors.toList()));
         pendingQueue.addAll(
             tsFileManager.getTsFileList(false).stream()
@@ -161,7 +167,12 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
                         
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
                             && 
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
-                .map(resource -> new PipeTsFileInsertionEvent(resource, 
pipeTaskMeta))
+                .map(
+                    resource ->
+                        new PipeTsFileInsertionEvent(
+                            resource,
+                            pipeTaskMeta,
+                            
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE))
                 .collect(Collectors.toList()));
         pendingQueue.forEach(
             event ->
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
index 3e264ce02e9..4b7c0cf8dbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
@@ -55,7 +55,7 @@ public class PipeDataRegionAssigner {
             collector -> {
               final PipeRealtimeCollectEvent copiedEvent =
                   event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-                      collector.getPipeTaskMeta());
+                      collector.getPipeTaskMeta(), collector.getPattern());
               
copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
               collector.collect(copiedEvent);
             });
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 6f7db5bbf97..40c156e3285 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -32,6 +32,8 @@ import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFileSealReq;
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
+import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.wal.exception.WALPipeException;
@@ -113,15 +115,16 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
 
   @Override
   public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
-    // TODO: support more TabletInsertionEvent
     // PipeProcessor can change the type of TabletInsertionEvent
-    if (!(tabletInsertionEvent instanceof PipeTabletInsertionEvent)) {
-      throw new NotImplementedException(
-          "IoTDBThriftConnectorV1 only support PipeTabletInsertionEvent.");
-    }
-
     try {
-      doTransfer((PipeTabletInsertionEvent) tabletInsertionEvent);
+      if (tabletInsertionEvent instanceof PipeInsertNodeInsertionEvent) {
+        doTransfer((PipeInsertNodeInsertionEvent) tabletInsertionEvent);
+      } else if (tabletInsertionEvent instanceof PipeTabletInsertionEvent) {
+        doTransfer((PipeTabletInsertionEvent) tabletInsertionEvent);
+      } else {
+        throw new NotImplementedException(
+            "IoTDBThriftConnectorV1 only support PipeInsertNodeInsertionEvent 
and PipeTabletInsertionEvent.");
+      }
     } catch (TException e) {
       LOGGER.error(
           "Network error when transfer tablet insertion event: {}.", 
tabletInsertionEvent, e);
@@ -133,23 +136,37 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
     }
   }
 
-  private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent)
+  private void doTransfer(PipeInsertNodeInsertionEvent 
pipeInsertNodeInsertionEvent)
       throws PipeException, TException, WALPipeException {
     final TPipeTransferResp resp =
         client.pipeTransfer(
-            
PipeTransferInsertNodeReq.toTPipeTransferReq(pipeTabletInsertionEvent.getInsertNode()));
+            PipeTransferInsertNodeReq.toTPipeTransferReq(
+                pipeInsertNodeInsertionEvent.getInsertNode()));
+
+    if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format(
+              "Transfer PipeInsertNodeInsertionEvent %s error, result status 
%s",
+              pipeInsertNodeInsertionEvent, resp.status));
+    }
+  }
+
+  private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent)
+      throws PipeException, TException, IOException {
+    final TPipeTransferResp resp =
+        client.pipeTransfer(
+            
PipeTransferTabletReq.toTPipeTransferReq(pipeTabletInsertionEvent.convertToTablet()));
 
     if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new PipeException(
           String.format(
-              "Transfer tablet insertion event %s error, result status %s",
+              "Transfer PipeTabletInsertionEvent %s error, result status %s",
               pipeTabletInsertionEvent, resp.status));
     }
   }
 
   @Override
   public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
-    // TODO: support more TsFileInsertionEvent
     // PipeProcessor can change the type of TabletInsertionEvent
     if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
       throw new NotImplementedException(
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
index ea1f221d035..b7fcb594c20 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.pipe.agent.receiver.IoTDBThriftReceiver;
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
@@ -35,6 +36,7 @@ import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFileSealReq;
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
+import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferTabletReq;
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -69,6 +71,9 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
         case TRANSFER_INSERT_NODE:
           return handleTransferInsertNode(
               PipeTransferInsertNodeReq.fromTPipeTransferReq(req), 
partitionFetcher, schemaFetcher);
+        case TRANSFER_TABLET:
+          return handleTransferTablet(
+              PipeTransferTabletReq.fromTPipeTransferReq(req), 
partitionFetcher, schemaFetcher);
         case TRANSFER_FILE_PIECE:
           return 
handleTransferFilePiece(PipeTransferFilePieceReq.fromTPipeTransferReq(req));
         case TRANSFER_FILE_SEAL:
@@ -109,26 +114,13 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
         executeStatement(req.constructStatement(), partitionFetcher, 
schemaFetcher));
   }
 
-  private TSStatus executeStatement(
-      Statement statement, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher) {
-    final long queryId = SessionManager.getInstance().requestQueryId();
-    final ExecutionResult result =
-        Coordinator.getInstance()
-            .execute(
-                statement,
-                queryId,
-                null,
-                "",
-                partitionFetcher,
-                schemaFetcher,
-                
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
-    if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.warn(
-          "failed to execute statement, statement: {}, result status is: {}",
-          statement,
-          result.status);
-    }
-    return result.status;
+  private TPipeTransferResp handleTransferTablet(
+      PipeTransferTabletReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
+    final InsertTabletStatement statement = req.constructStatement(); // null if conversion failed
+    return new TPipeTransferResp(
+        statement != null && statement.isEmpty()
+            ? RpcUtils.SUCCESS_STATUS
+            : executeStatement(statement, partitionFetcher, schemaFetcher)); // rejects null
   }
 
   private TPipeTransferResp handleTransferFilePiece(PipeTransferFilePieceReq 
req) {
@@ -258,6 +250,33 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
     return writingFile != null && writingFile.exists() && writingFileWriter != 
null;
   }
 
+  private TSStatus executeStatement(
+      Statement statement, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher) {
+    if (statement == null) {
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null 
statement.");
+    }
+
+    final long queryId = SessionManager.getInstance().requestQueryId();
+    final ExecutionResult result =
+        Coordinator.getInstance()
+            .execute(
+                statement,
+                queryId,
+                null,
+                "",
+                partitionFetcher,
+                schemaFetcher,
+                
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+    if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.warn(
+          "failed to execute statement, statement: {}, result status is: {}",
+          statement,
+          result.status);
+    }
+    return result.status;
+  }
+
   @Override
   public synchronized void handleExit() {
     try {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/PipeRequestType.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/PipeRequestType.java
index 982243cd87e..c62034353d0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/PipeRequestType.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/PipeRequestType.java
@@ -27,9 +27,10 @@ public enum PipeRequestType {
   HANDSHAKE((short) 1),
 
   TRANSFER_INSERT_NODE((short) 2),
+  TRANSFER_TABLET((short) 3),
 
-  TRANSFER_FILE_PIECE((short) 3),
-  TRANSFER_FILE_SEAL((short) 4),
+  TRANSFER_FILE_PIECE((short) 4),
+  TRANSFER_FILE_SEAL((short) 5),
   ;
 
   private final short type;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferTabletReq.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferTabletReq.java
new file mode 100644
index 00000000000..ad54dd267e5
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/request/PipeTransferTabletReq.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
+import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.PipeRequestType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
+import org.apache.iotdb.session.util.SessionUtils;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+
+public class PipeTransferTabletReq extends TPipeTransferReq {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferTabletReq.class);
+  private Tablet tablet;
+
+  public static TPipeTransferReq toTPipeTransferReq(Tablet tablet) throws 
IOException {
+    final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
+
+    tabletReq.tablet = tablet;
+
+    tabletReq.version = IoTDBThriftConnectorVersion.VERSION_ONE.getVersion();
+    tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType();
+    tabletReq.body = tablet.serialize();
+    return tabletReq;
+  }
+
+  private static boolean checkSorted(Tablet tablet) {
+    for (int i = 1; i < tablet.rowSize; i++) {
+      if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public static void sortTablet(Tablet tablet) {
+    /*
+     * Sort the rows by timestamp so that the value lists hold time-ordered, continuous
+     * data when inserted, which is intended to give better write performance.
+     */
+    // Sort the row indexes by timestamp first, then use them to reorder every value list.
+    Integer[] index = new Integer[tablet.rowSize];
+    for (int i = 0; i < tablet.rowSize; i++) {
+      index[i] = i;
+    }
+    Arrays.sort(index, Comparator.comparingLong(o -> tablet.timestamps[o]));
+    Arrays.sort(tablet.timestamps, 0, tablet.rowSize);
+    int columnIndex = 0;
+    for (int i = 0; i < tablet.getSchemas().size(); i++) {
+      IMeasurementSchema schema = tablet.getSchemas().get(i);
+      if (schema instanceof MeasurementSchema) {
+        tablet.values[columnIndex] = sortList(tablet.values[columnIndex], 
schema.getType(), index);
+        if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
+          tablet.bitMaps[columnIndex] = 
sortBitMap(tablet.bitMaps[columnIndex], index);
+        }
+        columnIndex++;
+      } else {
+        int measurementSize = schema.getSubMeasurementsList().size();
+        for (int j = 0; j < measurementSize; j++) {
+          tablet.values[columnIndex] =
+              sortList(
+                  tablet.values[columnIndex],
+                  schema.getSubMeasurementsTSDataTypeList().get(j),
+                  index);
+          if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
+            tablet.bitMaps[columnIndex] = 
sortBitMap(tablet.bitMaps[columnIndex], index);
+          }
+          columnIndex++;
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a reordered copy of one value column: element i is read from position index[i].
+   *
+   * @param valueList column array; it is cast to the array type matching {@code dataType}
+   * @param dataType column type; anything but BOOLEAN/INT32/INT64/FLOAT/DOUBLE/TEXT is rejected
+   * @param index for each output position, the position to read from in {@code valueList}
+   * @return a newly allocated array with the same length and element type as {@code valueList}
+   */
+  private static Object sortList(Object valueList, TSDataType dataType, 
Integer[] index) {
+    switch (dataType) {
+      case BOOLEAN:
+        boolean[] boolValues = (boolean[]) valueList;
+        boolean[] sortedValues = new boolean[boolValues.length];
+        for (int i = 0; i < index.length; i++) {
+          sortedValues[i] = boolValues[index[i]];
+        }
+        return sortedValues;
+      case INT32:
+        int[] intValues = (int[]) valueList;
+        int[] sortedIntValues = new int[intValues.length];
+        for (int i = 0; i < index.length; i++) {
+          sortedIntValues[i] = intValues[index[i]];
+        }
+        return sortedIntValues;
+      case INT64:
+        long[] longValues = (long[]) valueList;
+        long[] sortedLongValues = new long[longValues.length];
+        for (int i = 0; i < index.length; i++) {
+          sortedLongValues[i] = longValues[index[i]];
+        }
+        return sortedLongValues;
+      case FLOAT:
+        float[] floatValues = (float[]) valueList;
+        float[] sortedFloatValues = new float[floatValues.length];
+        for (int i = 0; i < index.length; i++) {
+          sortedFloatValues[i] = floatValues[index[i]];
+        }
+        return sortedFloatValues;
+      case DOUBLE:
+        double[] doubleValues = (double[]) valueList;
+        double[] sortedDoubleValues = new double[doubleValues.length];
+        for (int i = 0; i < index.length; i++) {
+          sortedDoubleValues[i] = doubleValues[index[i]];
+        }
+        return sortedDoubleValues;
+      case TEXT:
+        Binary[] binaryValues = (Binary[]) valueList;
+        Binary[] sortedBinaryValues = new Binary[binaryValues.length];
+        for (int i = 0; i < index.length; i++) {
+          sortedBinaryValues[i] = binaryValues[index[i]];
+        }
+        return sortedBinaryValues;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Data type %s is not supported.", dataType));
+    }
+  }
+
+  /**
+   * Builds a reordered copy of a bitmap: bit i is marked iff bit index[i] is marked in the input.
+   *
+   * @param bitMap source bitmap whose marks are read through {@code index}
+   * @param index for each output position, the position to read from in {@code bitMap}
+   * @return a newly allocated BitMap created with the same size as {@code bitMap}
+   */
+  private static BitMap sortBitMap(BitMap bitMap, Integer[] index) {
+    BitMap sortedBitMap = new BitMap(bitMap.getSize());
+    for (int i = 0; i < index.length; i++) {
+      if (bitMap.isMarked(index[i])) {
+        sortedBitMap.mark(i);
+      }
+    }
+    return sortedBitMap;
+  }
+
+  public InsertTabletStatement constructStatement() {
+    if (!checkSorted(tablet)) {
+      sortTablet(tablet);
+    }
+    // Returns null (after logging) if building the statement throws MetadataException.
+    try {
+      final TSInsertTabletReq request = new TSInsertTabletReq();
+
+      for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
+        request.addToMeasurements(measurementSchema.getMeasurementId());
+        request.addToTypes(measurementSchema.getType().ordinal());
+      }
+
+      request.setPrefixPath(tablet.deviceId);
+      request.setIsAligned(false);
+      request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
+      request.setValues(SessionUtils.getValueBuffer(tablet));
+      request.setSize(tablet.rowSize);
+      request.setMeasurements(
+          PathUtils.checkIsLegalSingleMeasurementsAndUpdate(request.getMeasurements()));
+
+      return StatementGenerator.createStatement(request);
+    } catch (MetadataException e) {
+      LOGGER.warn("Generate Statement from tablet {} error.", tablet, e);
+      return null;
+    }
+  }
+
+  public static PipeTransferTabletReq fromTPipeTransferReq(TPipeTransferReq 
transferReq) {
+    final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
+
+    tabletReq.tablet = Tablet.deserialize(transferReq.body);
+
+    tabletReq.version = transferReq.version;
+    tabletReq.type = transferReq.type;
+    tabletReq.body = transferReq.body;
+
+    return tabletReq;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
index 9fcc5fd1d90..e972104ce5d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import java.util.concurrent.atomic.AtomicInteger;
@@ -32,13 +33,17 @@ import java.util.concurrent.atomic.AtomicInteger;
  * additional information mainly includes the reference count of the event.
  */
 public abstract class EnrichedEvent implements Event {
+
   private final AtomicInteger referenceCount;
 
   private final PipeTaskMeta pipeTaskMeta;
 
-  public EnrichedEvent(PipeTaskMeta pipeTaskMeta) {
+  private final String pattern;
+
+  public EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) {
     referenceCount = new AtomicInteger(0);
     this.pipeTaskMeta = pipeTaskMeta;
+    this.pattern = pattern;
   }
 
   /**
@@ -114,8 +119,17 @@ public abstract class EnrichedEvent implements Event {
     return referenceCount.get();
   }
 
+  /**
+   * Get the pattern of this event, falling back to the collector default when none was set.
+   *
+   * @return the pattern, or {@code COLLECTOR_PATTERN_DEFAULT_VALUE} if the stored one is null
+   */
+  public final String getPattern() {
+    return pattern == null ? 
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern;
+  }
+
   public abstract EnrichedEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      PipeTaskMeta pipeTaskMeta);
+      PipeTaskMeta pipeTaskMeta, String pattern);
 
   public void reportException(PipeRuntimeException pipeRuntimeException) {
     PipeAgent.runtime().report(this.pipeTaskMeta, pipeRuntimeException);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java
similarity index 65%
copy from 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
copy to 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java
index fb9788d1a95..3b980f10b9d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java
@@ -23,34 +23,41 @@ import 
org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import 
org.apache.iotdb.db.pipe.core.event.view.datastructure.TabletInsertionDataContainer;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.wal.exception.WALPipeException;
 import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
 import java.util.function.BiConsumer;
 
-public class PipeTabletInsertionEvent extends EnrichedEvent implements 
TabletInsertionEvent {
+public class PipeInsertNodeInsertionEvent extends EnrichedEvent implements 
TabletInsertionEvent {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletInsertionEvent.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeInsertNodeInsertionEvent.class);
 
   private final WALEntryHandler walEntryHandler;
   private final ProgressIndex progressIndex;
 
-  public PipeTabletInsertionEvent(WALEntryHandler walEntryHandler, 
ProgressIndex progressIndex) {
-    this(walEntryHandler, progressIndex, null);
+  private TabletInsertionDataContainer dataContainer;
+
+  public PipeInsertNodeInsertionEvent(
+      WALEntryHandler walEntryHandler, ProgressIndex progressIndex) {
+    this(walEntryHandler, progressIndex, null, null);
   }
 
-  private PipeTabletInsertionEvent(
-      WALEntryHandler walEntryHandler, ProgressIndex progressIndex, 
PipeTaskMeta pipeTaskMeta) {
-    super(pipeTaskMeta);
+  private PipeInsertNodeInsertionEvent(
+      WALEntryHandler walEntryHandler,
+      ProgressIndex progressIndex,
+      PipeTaskMeta pipeTaskMeta,
+      String pattern) {
+    super(pipeTaskMeta, pattern);
     this.walEntryHandler = walEntryHandler;
     this.progressIndex = progressIndex;
   }
@@ -97,26 +104,49 @@ public class PipeTabletInsertionEvent extends 
EnrichedEvent implements TabletIns
   }
 
   @Override
-  public PipeTabletInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      PipeTaskMeta pipeTaskMeta) {
-    return new PipeTabletInsertionEvent(walEntryHandler, progressIndex, 
pipeTaskMeta);
+  public PipeInsertNodeInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+      PipeTaskMeta pipeTaskMeta, String pattern) {
+    return new PipeInsertNodeInsertionEvent(walEntryHandler, progressIndex, 
pipeTaskMeta, pattern);
   }
 
   /////////////////////////// TabletInsertionEvent ///////////////////////////
 
   @Override
   public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
-    throw new UnsupportedOperationException("Not implemented yet");
+    try {
+      if (dataContainer == null) {
+        dataContainer = new TabletInsertionDataContainer(getInsertNode(), 
getPattern());
+      }
+      return dataContainer.processRowByRow(consumer);
+    } catch (Exception e) {
+      LOGGER.error("Process row by row error.", e);
+      throw new PipeException("Process row by row error.", e);
+    }
   }
 
   @Override
-  public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>, 
RowCollector> consumer) {
-    throw new UnsupportedOperationException("Not implemented yet");
+  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer) {
+    try {
+      if (dataContainer == null) {
+        dataContainer = new TabletInsertionDataContainer(getInsertNode(), 
getPattern());
+      }
+      return dataContainer.processTablet(consumer);
+    } catch (Exception e) {
+      LOGGER.error("Process tablet error.", e);
+      throw new PipeException("Process tablet error.", e);
+    }
   }
 
-  @Override
-  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer) {
-    throw new UnsupportedOperationException("Not implemented yet");
+  public Tablet convertToTablet() {
+    try {
+      if (dataContainer == null) { // built on first use and then reused by later calls
+        dataContainer = new TabletInsertionDataContainer(getInsertNode(), getPattern());
+      }
+      return dataContainer.convertToTablet();
+    } catch (Exception e) {
+      LOGGER.error("Convert to tablet error.", e);
+      throw new PipeException("Convert to tablet error.", e);
+    }
   }
 
   /////////////////////////// Object ///////////////////////////
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index fb9788d1a95..399b091264d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -19,115 +19,58 @@
 
 package org.apache.iotdb.db.pipe.core.event.impl;
 
-import org.apache.iotdb.commons.consensus.index.ProgressIndex;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
-import org.apache.iotdb.db.wal.exception.WALPipeException;
-import org.apache.iotdb.db.wal.utils.WALEntryHandler;
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import 
org.apache.iotdb.db.pipe.core.event.view.datastructure.TabletInsertionDataContainer;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
+import java.util.Objects;
 import java.util.function.BiConsumer;
 
-public class PipeTabletInsertionEvent extends EnrichedEvent implements 
TabletInsertionEvent {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTabletInsertionEvent.class);
-
-  private final WALEntryHandler walEntryHandler;
-  private final ProgressIndex progressIndex;
-
-  public PipeTabletInsertionEvent(WALEntryHandler walEntryHandler, 
ProgressIndex progressIndex) {
-    this(walEntryHandler, progressIndex, null);
-  }
-
-  private PipeTabletInsertionEvent(
-      WALEntryHandler walEntryHandler, ProgressIndex progressIndex, 
PipeTaskMeta pipeTaskMeta) {
-    super(pipeTaskMeta);
-    this.walEntryHandler = walEntryHandler;
-    this.progressIndex = progressIndex;
-  }
-
-  public InsertNode getInsertNode() throws WALPipeException {
-    return walEntryHandler.getValue();
-  }
+public class PipeTabletInsertionEvent implements TabletInsertionEvent {
 
-  /////////////////////////// EnrichedEvent ///////////////////////////
+  private final Tablet tablet;
+  private final String pattern;
 
-  @Override
-  public boolean increaseResourceReferenceCount(String holderMessage) {
-    try {
-      PipeResourceManager.wal().pin(walEntryHandler.getMemTableId(), 
walEntryHandler);
-      return true;
-    } catch (Exception e) {
-      LOGGER.warn(
-          String.format(
-              "Increase reference count for memtable %d error. Holder Message: 
%s",
-              walEntryHandler.getMemTableId(), holderMessage),
-          e);
-      return false;
-    }
-  }
+  private TabletInsertionDataContainer dataContainer;
 
-  @Override
-  public boolean decreaseResourceReferenceCount(String holderMessage) {
-    try {
-      PipeResourceManager.wal().unpin(walEntryHandler.getMemTableId());
-      return true;
-    } catch (Exception e) {
-      LOGGER.warn(
-          String.format(
-              "Decrease reference count for memtable %d error. Holder Message: 
%s",
-              walEntryHandler.getMemTableId(), holderMessage),
-          e);
-      return false;
-    }
+  public PipeTabletInsertionEvent(Tablet tablet) {
+    this(Objects.requireNonNull(tablet), null);
   }
 
-  @Override
-  public ProgressIndex getProgressIndex() {
-    return progressIndex;
+  public PipeTabletInsertionEvent(Tablet tablet, String pattern) {
+    this.tablet = Objects.requireNonNull(tablet);
+    this.pattern = pattern;
   }
 
-  @Override
-  public PipeTabletInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      PipeTaskMeta pipeTaskMeta) {
-    return new PipeTabletInsertionEvent(walEntryHandler, progressIndex, 
pipeTaskMeta);
+  public String getPattern() {
+    return pattern == null ? 
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE : pattern;
   }
 
   /////////////////////////// TabletInsertionEvent ///////////////////////////
 
   @Override
   public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
-
-  @Override
-  public TabletInsertionEvent processByIterator(BiConsumer<Iterator<Row>, 
RowCollector> consumer) {
-    throw new UnsupportedOperationException("Not implemented yet");
+    if (dataContainer == null) {
+      dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
+    }
+    return dataContainer.processRowByRow(consumer);
   }
 
   @Override
   public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer) {
-    throw new UnsupportedOperationException("Not implemented yet");
+    if (dataContainer == null) {
+      dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
+    }
+    return dataContainer.processTablet(consumer);
   }
 
-  /////////////////////////// Object ///////////////////////////
-
-  @Override
-  public String toString() {
-    return "PipeTabletInsertionEvent{"
-        + "walEntryHandler="
-        + walEntryHandler
-        + ", progressIndex="
-        + progressIndex
-        + '}';
+  public Tablet convertToTablet() {
+    if (dataContainer == null) {
+      dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
+    }
+    return dataContainer.convertToTablet();
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index 4b346b0f915..c813619be9a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -25,9 +25,11 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
+import 
org.apache.iotdb.db.pipe.core.event.view.datastructure.TsFileInsertionDataContainer;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,12 +46,15 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
 
   private final AtomicBoolean isClosed;
 
+  private TsFileInsertionDataContainer dataContainer;
+
   public PipeTsFileInsertionEvent(TsFileResource resource) {
-    this(resource, null);
+    this(resource, null, null);
   }
 
-  public PipeTsFileInsertionEvent(TsFileResource resource, PipeTaskMeta 
pipeTaskMeta) {
-    super(pipeTaskMeta);
+  public PipeTsFileInsertionEvent(
+      TsFileResource resource, PipeTaskMeta pipeTaskMeta, String pattern) {
+    super(pipeTaskMeta, pattern);
 
     this.resource = resource;
     tsFile = resource.getTsFile();
@@ -132,15 +137,28 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
 
   @Override
   public PipeTsFileInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      PipeTaskMeta pipeTaskMeta) {
-    return new PipeTsFileInsertionEvent(resource, pipeTaskMeta);
+      PipeTaskMeta pipeTaskMeta, String pattern) {
+    return new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern);
   }
 
   /////////////////////////// TsFileInsertionEvent ///////////////////////////
 
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
-    throw new UnsupportedOperationException("Not implemented yet");
+    try {
+      if (dataContainer == null) {
+        waitForTsFileClose();
+        dataContainer = new TsFileInsertionDataContainer(tsFile, getPattern());
+      }
+      return dataContainer.toTabletInsertionEvents();
+    } catch (InterruptedException e) {
+      final String errorMsg =
+          String.format("Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath());
+      LOGGER.warn(errorMsg, e);
+      Thread.currentThread().interrupt();
+      // Keep the InterruptedException as the cause so the interruption point is not lost.
+      throw new PipeException(errorMsg, e);
+    }
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index f15ef605617..d91eb07a4b0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -38,11 +38,14 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent 
{
   private Map<String, String[]> device2Measurements;
 
   public PipeRealtimeCollectEvent(
-      EnrichedEvent event, TsFileEpoch tsFileEpoch, Map<String, String[]> 
device2Measurements) {
+      EnrichedEvent event,
+      TsFileEpoch tsFileEpoch,
+      Map<String, String[]> device2Measurements,
+      String pattern) {
     // pipeTaskMeta is used to report the progress of the event, the 
PipeRealtimeCollectEvent
     // is only used in the realtime event collector, which does not need to 
report the progress
     // of the event, so the pipeTaskMeta is always null.
-    super(null);
+    super(null, pattern);
 
     this.event = event;
     this.tsFileEpoch = tsFileEpoch;
@@ -53,11 +56,12 @@ public class PipeRealtimeCollectEvent extends EnrichedEvent 
{
       EnrichedEvent event,
       TsFileEpoch tsFileEpoch,
       Map<String, String[]> device2Measurements,
-      PipeTaskMeta pipeTaskMeta) {
+      PipeTaskMeta pipeTaskMeta,
+      String pattern) {
     // pipeTaskMeta is used to report the progress of the event, the 
PipeRealtimeCollectEvent
     // is only used in the realtime event collector, which does not need to 
report the progress
     // of the event, so the pipeTaskMeta is always null.
-    super(pipeTaskMeta);
+    super(pipeTaskMeta, pattern);
 
     this.event = event;
     this.tsFileEpoch = tsFileEpoch;
@@ -115,12 +119,13 @@ public class PipeRealtimeCollectEvent extends 
EnrichedEvent {
 
   @Override
   public PipeRealtimeCollectEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      PipeTaskMeta pipeTaskMeta) {
+      PipeTaskMeta pipeTaskMeta, String pattern) {
     return new PipeRealtimeCollectEvent(
-        
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeTaskMeta),
+        
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(pipeTaskMeta, 
pattern),
         this.tsFileEpoch,
         this.device2Measurements,
-        pipeTaskMeta);
+        pipeTaskMeta,
+        pattern);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
index 937e3073a27..61d65cd43e8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.realtime;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 
@@ -36,8 +36,8 @@ public class PipeRealtimeCollectEventFactory {
 
   public static PipeRealtimeCollectEvent createCollectEvent(
       WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource 
resource) {
-    return TS_FILE_EPOCH_MANAGER.bindPipeTabletInsertionEvent(
-        new PipeTabletInsertionEvent(walEntryHandler, 
insertNode.getProgressIndex()),
+    return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeInsertionEvent(
+        new PipeInsertNodeInsertionEvent(walEntryHandler, 
insertNode.getProgressIndex()),
         insertNode,
         resource);
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
index b7869778642..1dcc1fb9320 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.realtime;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
 
 import org.slf4j.Logger;
@@ -57,14 +57,16 @@ public class TsFileEpochManager {
         // TsFileEpoch's life cycle
         filePath2Epoch.remove(filePath),
         resource.getDevices().stream()
-            .collect(Collectors.toMap(device -> device, device -> 
EMPTY_MEASUREMENT_ARRAY)));
+            .collect(Collectors.toMap(device -> device, device -> 
EMPTY_MEASUREMENT_ARRAY)),
+        event.getPattern());
   }
 
-  public PipeRealtimeCollectEvent bindPipeTabletInsertionEvent(
-      PipeTabletInsertionEvent event, InsertNode node, TsFileResource 
resource) {
+  public PipeRealtimeCollectEvent bindPipeInsertNodeInsertionEvent(
+      PipeInsertNodeInsertionEvent event, InsertNode node, TsFileResource 
resource) {
     return new PipeRealtimeCollectEvent(
         event,
         filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), 
TsFileEpoch::new),
-        Collections.singletonMap(node.getDevicePath().getFullPath(), 
node.getMeasurements()));
+        Collections.singletonMap(node.getDevicePath().getFullPath(), 
node.getMeasurements()),
+        event.getPattern());
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
index 6c9a5c55f8c..043be5ec83f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
@@ -19,84 +19,130 @@
 
 package org.apache.iotdb.db.pipe.core.event.view.access;
 
+import org.apache.iotdb.commons.pipe.utils.PipeDataTypeTransformer;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 import org.apache.iotdb.pipe.api.type.Binary;
 import org.apache.iotdb.pipe.api.type.Type;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 
 public class PipeRow implements Row {
 
+  private final int rowIndex;
+
+  private final String deviceId;
+  private final MeasurementSchema[] measurementSchemaList;
+
+  private final long[] timestampColumn;
+  private final Object[][] valueColumns;
+  private final TSDataType[] valueColumnTypes;
+
+  private final String[] columnNameStringList;
+
+  public PipeRow(
+      int rowIndex,
+      String deviceId,
+      MeasurementSchema[] measurementSchemaList,
+      long[] timestampColumn,
+      Object[][] valueColumns,
+      TSDataType[] valueColumnTypes,
+      String[] columnNameStringList) {
+    this.rowIndex = rowIndex;
+    this.deviceId = deviceId;
+    this.measurementSchemaList = measurementSchemaList;
+    this.timestampColumn = timestampColumn;
+    this.valueColumns = valueColumns;
+    this.valueColumnTypes = valueColumnTypes;
+    this.columnNameStringList = columnNameStringList;
+  }
+
+  @Override
+  public long getTime() {
+    return timestampColumn[rowIndex];
+  }
+
   @Override
-  public long getTime() throws IOException {
-    return 0;
+  public int getInt(int columnIndex) {
+    return (int) valueColumns[columnIndex][rowIndex];
   }
 
   @Override
-  public int getInt(int columnIndex) throws IOException {
-    return 0;
+  public long getLong(int columnIndex) {
+    return (long) valueColumns[columnIndex][rowIndex];
   }
 
   @Override
-  public long getLong(int columnIndex) throws IOException {
-    return 0;
+  public float getFloat(int columnIndex) {
+    return (float) valueColumns[columnIndex][rowIndex];
   }
 
   @Override
-  public float getFloat(int columnIndex) throws IOException {
-    return 0;
+  public double getDouble(int columnIndex) {
+    return (double) valueColumns[columnIndex][rowIndex];
   }
 
   @Override
-  public double getDouble(int columnIndex) throws IOException {
-    return 0;
+  public boolean getBoolean(int columnIndex) {
+    return (boolean) valueColumns[columnIndex][rowIndex];
   }
 
   @Override
-  public boolean getBoolean(int columnIndex) throws IOException {
-    return false;
+  public Binary getBinary(int columnIndex) {
+    return Binary.valueOf((String) valueColumns[columnIndex][rowIndex]);
   }
 
   @Override
-  public Binary getBinary(int columnIndex) throws IOException {
-    return null;
+  public String getString(int columnIndex) {
+    return (String) valueColumns[columnIndex][rowIndex];
   }
 
   @Override
-  public String getString(int columnIndex) throws IOException {
-    return null;
+  public Object getObject(int columnIndex) {
+    return valueColumns[columnIndex][rowIndex];
   }
 
   @Override
   public Type getDataType(int columnIndex) {
-    return null;
+    return 
PipeDataTypeTransformer.transformToPipeDataType(valueColumnTypes[columnIndex]);
   }
 
   @Override
   public boolean isNull(int columnIndex) {
-    return false;
+    return valueColumns[columnIndex][rowIndex] == null;
   }
 
   @Override
   public int size() {
-    return 0;
+    return valueColumns.length;
   }
 
   @Override
   public int getColumnIndex(Path columnName) throws 
PipeParameterNotValidException {
-    return 0;
+    for (int i = 0; i < columnNameStringList.length; i++) {
+      if (columnNameStringList[i].equals(columnName.getFullPath())) {
+        return i;
+      }
+    }
+    throw new PipeParameterNotValidException(
+        String.format("column %s not found", columnName.getFullPath()));
   }
 
   @Override
-  public List<Path> getColumnNames() {
-    return null;
+  public List<Type> getColumnTypes() {
+    return 
PipeDataTypeTransformer.transformToPipeDataTypeList(Arrays.asList(valueColumnTypes));
   }
 
   @Override
-  public List<Type> getColumnTypes() {
-    return null;
+  public String getDeviceId() {
+    return deviceId;
+  }
+
+  public MeasurementSchema[] getMeasurementSchemaList() {
+    return measurementSchemaList;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRowIterator.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRowIterator.java
deleted file mode 100644
index 73ee4d041a4..00000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRowIterator.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.view.access;
-
-import org.apache.iotdb.pipe.api.access.Row;
-import org.apache.iotdb.pipe.api.access.RowIterator;
-import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
-import org.apache.iotdb.pipe.api.type.Type;
-import org.apache.iotdb.tsfile.read.common.Path;
-
-import java.io.IOException;
-import java.util.List;
-
-public class PipeRowIterator implements RowIterator {
-
-  @Override
-  public boolean hasNextRow() {
-    return false;
-  }
-
-  @Override
-  public Row next() throws IOException {
-    return null;
-  }
-
-  @Override
-  public void reset() {}
-
-  @Override
-  public int getColumnIndex(Path columnName) throws 
PipeParameterNotValidException {
-    return 0;
-  }
-
-  @Override
-  public List<Path> getColumnNames() {
-    return null;
-  }
-
-  @Override
-  public List<Type> getColumnTypes() {
-    return null;
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
index 687c3e72c14..3d39e08c32a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
@@ -19,13 +19,54 @@
 
 package org.apache.iotdb.db.pipe.core.event.view.collector;
 
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 public class PipeRowCollector implements RowCollector {
 
+  private Tablet tablet = null;
+
   @Override
-  public void collectRow(Row row) throws IOException {}
+  public void collectRow(Row row) {
+    // Only rows handed out by the pipe framework (PipeRow) expose the device id and the
+    // measurement schemas needed to rebuild a tablet, so any other Row implementation is rejected.
+    if (!(row instanceof PipeRow)) {
+      throw new PipeException("Row can not be customized");
+    }
+
+    final PipeRow source = (PipeRow) row;
+    final MeasurementSchema[] schemas = source.getMeasurementSchemaList();
+
+    // The backing tablet is created lazily from the first collected row; later rows are appended
+    // to that same tablet.
+    if (tablet == null) {
+      final String device = source.getDeviceId();
+      tablet = new Tablet(device, new ArrayList<>(Arrays.asList(schemas)));
+      tablet.initBitMaps();
+    }
+
+    final int targetRow = tablet.rowSize;
+    tablet.addTimestamp(targetRow, row.getTime());
+
+    int column = 0;
+    while (column < row.size()) {
+      tablet.addValue(schemas[column].getMeasurementId(), targetRow, row.getObject(column));
+      // a null cell is additionally flagged in the bitmap of its column
+      if (row.isNull(column)) {
+        tablet.bitMaps[column].mark(targetRow);
+      }
+      column++;
+    }
+
+    tablet.rowSize++;
+  }
+
+  /**
+   * Wraps the tablet accumulated so far in a {@link PipeTabletInsertionEvent} and resets this
+   * collector so that the next collected row starts a fresh tablet.
+   *
+   * <p>If no row has been collected, the event is constructed with a {@code null} tablet.
+   *
+   * @return the event holding the collected rows
+   */
+  public TabletInsertionEvent toTabletInsertionEvent() {
+    final PipeTabletInsertionEvent collectedEvent = new PipeTabletInsertionEvent(tablet);
+    // drop the reference only after the event has been built successfully
+    tablet = null;
+    return collectedEvent;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
new file mode 100644
index 00000000000..f61d4f623ad
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.view.datastructure;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
+import org.apache.iotdb.db.pipe.core.event.view.collector.PipeRowCollector;
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.stream.IntStream;
+
+public class TabletInsertionDataContainer {
+
+  private String deviceId;
+  private MeasurementSchema[] measurementSchemaList;
+  private String[] columnNameStringList;
+
+  private long[] timestampColumn;
+  private Object[][] valueColumns;
+  private TSDataType[] valueColumnTypes;
+  private BitMap[] nullValueColumnBitmaps;
+  private int rowCount;
+
+  private Tablet tablet;
+
+  /**
+   * Builds the container from an insert plan node.
+   *
+   * @param insertNode an {@link InsertRowNode} or {@link InsertTabletNode}
+   * @param pattern path filter applied to the node's columns; {@code null} keeps every column
+   * @throws UnSupportedDataTypeException if the node is of any other type
+   * @throws PipeException if parsing fails with an {@link IllegalPathException}
+   */
+  public TabletInsertionDataContainer(InsertNode insertNode, String pattern) {
+    try {
+      if (insertNode instanceof InsertRowNode) {
+        parse((InsertRowNode) insertNode, pattern);
+        return;
+      }
+      if (insertNode instanceof InsertTabletNode) {
+        parse((InsertTabletNode) insertNode, pattern);
+        return;
+      }
+      final String nodeTypeName = insertNode.getClass().getName();
+      throw new UnSupportedDataTypeException(
+          String.format("InsertNode type %s is not supported.", nodeTypeName));
+    } catch (IllegalPathException e) {
+      final String message = String.format("Failed to parse insertNode with pattern %s.", pattern);
+      throw new PipeException(message, e);
+    }
+  }
+
+  /**
+   * Builds the container from an existing {@link Tablet} by delegating to {@code parse(Tablet,
+   * String)}.
+   *
+   * @param tablet the tablet whose columns are loaded into this container
+   * @param pattern path filter applied to the tablet's columns; {@code null} keeps every column
+   */
+  public TabletInsertionDataContainer(Tablet tablet, String pattern) {
+    parse(tablet, pattern);
+  }
+
+  //////////////////////////// parse ////////////////////////////
+
+  /**
+   * Loads a single-row insertion into this container, keeping only the measurements selected by
+   * {@code pattern}.
+   *
+   * <p>Each kept value is stored in a one-element array whose component type matches the column's
+   * data type (for example {@code Integer[]} for INT32, {@code String[]} for TEXT). This is the
+   * same representation {@code convertToColumn} produces for tablet sources, and the one {@code
+   * squashFromColumn} casts to; a plain {@code Object[]} row would make those casts fail with a
+   * {@link ClassCastException}. A {@code null} value is also marked in the column's bitmap.
+   *
+   * @param insertRowNode the row insertion to load
+   * @param pattern path filter; {@code null} keeps every column
+   * @throws IllegalPathException declared for symmetry with the other parse overloads
+   */
+  private void parse(InsertRowNode insertRowNode, String pattern) throws IllegalPathException {
+    final int originColumnSize = insertRowNode.getMeasurements().length;
+    final Integer[] columnIndexMapper = new Integer[originColumnSize];
+
+    // deviceId must be assigned before the mapper is generated, because the mapper reads it
+    this.deviceId = insertRowNode.getDevicePath().getFullPath();
+    this.timestampColumn = new long[] {insertRowNode.getTime()};
+
+    generateColumnIndexMapper(insertRowNode, pattern, columnIndexMapper);
+    final int filteredColumnSize =
+        (int) Arrays.stream(columnIndexMapper).filter(Objects::nonNull).count();
+
+    this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
+    this.columnNameStringList = new String[filteredColumnSize];
+    this.valueColumns = new Object[filteredColumnSize][];
+    this.valueColumnTypes = new TSDataType[filteredColumnSize];
+    this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
+
+    final MeasurementSchema[] originMeasurementSchemaList = insertRowNode.getMeasurementSchemas();
+    final String[] originColumnNameStringList = insertRowNode.getMeasurements();
+    final Object[] originValueColumns = insertRowNode.getValues();
+    final TSDataType[] originValueColumnTypes = insertRowNode.getDataTypes();
+
+    for (int i = 0; i < columnIndexMapper.length; i++) {
+      if (columnIndexMapper[i] == null) {
+        continue;
+      }
+      final int filteredColumnIndex = columnIndexMapper[i];
+      final Object[] singleValueColumn =
+          toSingleValueColumn(originValueColumns[i], originValueColumnTypes[i]);
+      final BitMap nullBitMap = new BitMap(1);
+      if (singleValueColumn[0] == null) {
+        nullBitMap.mark(0);
+      }
+
+      this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i];
+      this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i];
+      this.valueColumns[filteredColumnIndex] = singleValueColumn;
+      this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
+      this.nullValueColumnBitmaps[filteredColumnIndex] = nullBitMap;
+    }
+
+    rowCount = 1;
+  }
+
+  /**
+   * Wraps one row value in a one-element array typed according to {@code dataType}.
+   *
+   * <p>TEXT values given as {@link Binary} are converted to {@code String}, as {@code
+   * convertToColumn} does. When the data type is {@code null} or unsupported, or the value does
+   * not have the class expected for its type, the value is wrapped in a plain {@code Object[]}
+   * unchanged, so parsing itself never fails on such input.
+   */
+  private static Object[] toSingleValueColumn(Object value, TSDataType dataType) {
+    if (dataType == null) {
+      return new Object[] {value};
+    }
+    switch (dataType) {
+      case INT32:
+        if (value == null || value instanceof Integer) {
+          return new Integer[] {(Integer) value};
+        }
+        break;
+      case INT64:
+        if (value == null || value instanceof Long) {
+          return new Long[] {(Long) value};
+        }
+        break;
+      case FLOAT:
+        if (value == null || value instanceof Float) {
+          return new Float[] {(Float) value};
+        }
+        break;
+      case DOUBLE:
+        if (value == null || value instanceof Double) {
+          return new Double[] {(Double) value};
+        }
+        break;
+      case BOOLEAN:
+        if (value == null || value instanceof Boolean) {
+          return new Boolean[] {(Boolean) value};
+        }
+        break;
+      case TEXT:
+        if (value instanceof Binary) {
+          return new String[] {((Binary) value).getStringValue()};
+        }
+        if (value == null || value instanceof String) {
+          return new String[] {(String) value};
+        }
+        break;
+      default:
+        break;
+    }
+    return new Object[] {value};
+  }
+
+  /**
+   * Loads a multi-row tablet insertion into this container, keeping only the columns selected by
+   * {@code pattern}.
+   *
+   * <p>Primitive source columns are boxed through {@code convertToColumn}. When the node carries
+   * no bitmaps at all, an empty bitmap per origin column is used instead.
+   *
+   * @param insertTabletNode the tablet insertion to load
+   * @param pattern path filter; {@code null} keeps every column
+   * @throws IllegalPathException declared for symmetry with the other parse overloads
+   */
+  private void parse(InsertTabletNode insertTabletNode, String pattern)
+      throws IllegalPathException {
+    final int sourceColumnCount = insertTabletNode.getMeasurements().length;
+    final Integer[] columnIndexMapper = new Integer[sourceColumnCount];
+
+    // deviceId must be assigned before the mapper is generated, because the mapper reads it
+    this.deviceId = insertTabletNode.getDevicePath().getFullPath();
+    this.timestampColumn = insertTabletNode.getTimes();
+
+    generateColumnIndexMapper(insertTabletNode, pattern, columnIndexMapper);
+
+    final int keptColumnCount =
+        (int) Arrays.stream(columnIndexMapper).filter(Objects::nonNull).count();
+
+    this.measurementSchemaList = new MeasurementSchema[keptColumnCount];
+    this.columnNameStringList = new String[keptColumnCount];
+    this.valueColumns = new Object[keptColumnCount][];
+    this.valueColumnTypes = new TSDataType[keptColumnCount];
+    this.nullValueColumnBitmaps = new BitMap[keptColumnCount];
+
+    final MeasurementSchema[] sourceSchemas = insertTabletNode.getMeasurementSchemas();
+    final String[] sourceNames = insertTabletNode.getMeasurements();
+    final Object[] sourceColumns = insertTabletNode.getColumns();
+    final TSDataType[] sourceTypes = insertTabletNode.getDataTypes();
+    final BitMap[] sourceBitMaps;
+    if (insertTabletNode.getBitMaps() == null) {
+      sourceBitMaps =
+          IntStream.range(0, sourceColumnCount)
+              .mapToObj(ignored -> new BitMap(timestampColumn.length))
+              .toArray(BitMap[]::new);
+    } else {
+      sourceBitMaps = insertTabletNode.getBitMaps();
+    }
+
+    for (int source = 0; source < columnIndexMapper.length; source++) {
+      if (columnIndexMapper[source] == null) {
+        // column filtered out by the pattern
+        continue;
+      }
+      final int target = columnIndexMapper[source];
+      this.measurementSchemaList[target] = sourceSchemas[source];
+      this.columnNameStringList[target] = sourceNames[source];
+      this.valueColumns[target] =
+          convertToColumn(sourceColumns[source], sourceTypes[source], sourceBitMaps[source]);
+      this.valueColumnTypes[target] = sourceTypes[source];
+      this.nullValueColumnBitmaps[target] = sourceBitMaps[source];
+    }
+
+    rowCount = timestampColumn.length;
+  }
+
+  /**
+   * Loads an existing {@link Tablet} into this container, keeping only the columns selected by
+   * {@code pattern}.
+   *
+   * <p>Primitive source columns are boxed through {@code convertToColumn}. A tablet whose {@code
+   * bitMaps} field is {@code null} is treated as having no null cells: an empty bitmap per origin
+   * column is substituted, mirroring what the {@code InsertTabletNode} overload does, instead of
+   * failing with a {@link NullPointerException}.
+   *
+   * @param tablet the tablet to load; its timestamp array is shared, not copied
+   * @param pattern path filter; {@code null} keeps every column
+   */
+  private void parse(Tablet tablet, String pattern) {
+    final List<MeasurementSchema> originMeasurementSchemaList = tablet.getSchemas();
+    final int originColumnSize = originMeasurementSchemaList.size();
+    final Integer[] columnIndexMapper = new Integer[originColumnSize];
+
+    // deviceId must be assigned before the mapper is generated, because the mapper reads it
+    this.deviceId = tablet.deviceId;
+    this.timestampColumn = tablet.timestamps;
+
+    generateColumnIndexMapper(tablet, pattern, columnIndexMapper);
+
+    final int filteredColumnSize =
+        (int) Arrays.stream(columnIndexMapper).filter(Objects::nonNull).count();
+
+    this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
+    this.columnNameStringList = new String[filteredColumnSize];
+    this.valueColumns = new Object[filteredColumnSize][];
+    this.valueColumnTypes = new TSDataType[filteredColumnSize];
+    this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
+
+    final String[] originColumnNameStringList = new String[originColumnSize];
+    final TSDataType[] originValueColumnTypes = new TSDataType[originColumnSize];
+    for (int i = 0; i < originColumnSize; i++) {
+      originColumnNameStringList[i] = originMeasurementSchemaList.get(i).getMeasurementId();
+      originValueColumnTypes[i] = originMeasurementSchemaList.get(i).getType();
+    }
+    final Object[] originValueColumns = tablet.values;
+    // bitMaps may never have been initialised on the tablet; fall back to empty bitmaps
+    final BitMap[] originBitMapList =
+        tablet.bitMaps == null
+            ? IntStream.range(0, originColumnSize)
+                .mapToObj(ignored -> new BitMap(timestampColumn.length))
+                .toArray(BitMap[]::new)
+            : tablet.bitMaps;
+
+    for (int i = 0; i < columnIndexMapper.length; i++) {
+      if (columnIndexMapper[i] == null) {
+        continue;
+      }
+      final int filteredColumnIndex = columnIndexMapper[i];
+      this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList.get(i);
+      this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i];
+      this.valueColumns[filteredColumnIndex] =
+          convertToColumn(originValueColumns[i], originValueColumnTypes[i], originBitMapList[i]);
+      this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
+      this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i];
+    }
+
+    rowCount = tablet.rowSize;
+  }
+
+  /**
+   * Decides which origin columns survive the {@code pattern} filter and where they land.
+   *
+   * <p>On return, entry {@code i} of the mapper holds the filtered index of origin column {@code
+   * i}, or is left untouched ({@code null} for a freshly allocated array) when the column is
+   * filtered out. The {@code deviceId} field must already be set.
+   *
+   * @param originMeasurementList measurement names of the origin columns
+   * @param pattern path prefix / full path to match; {@code null} matches everything
+   * @param originColumnIndex2FilteredColumnIndexMapperList output array, one slot per origin
+   *     column
+   */
+  private void generateColumnIndexMapper(
+      String[] originMeasurementList,
+      String pattern,
+      Integer[] originColumnIndex2FilteredColumnIndexMapperList) {
+    final int originColumnSize = originMeasurementList.length;
+
+    // case 1: pattern is null, or pattern is a prefix of the device id (for example pattern is
+    // root.a.b and device is root.a.b.c); every column matches without checking measurements
+    if (pattern == null
+        || (pattern.length() <= deviceId.length() && deviceId.startsWith(pattern))) {
+      for (int i = 0; i < originColumnSize; i++) {
+        originColumnIndex2FilteredColumnIndexMapperList[i] = i;
+      }
+      return;
+    }
+
+    // case 2: the device id is a proper prefix of the pattern (for example pattern is root.a.b.c
+    // and device is root.a.b); a column matches only if device + separator + measurement equals
+    // the pattern
+    if (pattern.length() > deviceId.length() && pattern.startsWith(deviceId)) {
+      int filteredCount = 0;
+
+      for (int i = 0; i < originColumnSize; i++) {
+        final String measurement = originMeasurementList[i];
+
+        // a missing measurement name cannot match any pattern; skip it instead of dereferencing
+        // NOTE(review): presumably null marks a measurement that failed earlier - confirm
+        if (measurement == null) {
+          continue;
+        }
+
+        // low cost length check first; pattern.startsWith(deviceId) is already known to hold
+        if (pattern.length() == deviceId.length() + measurement.length() + 1
+            && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) {
+          originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++;
+        }
+      }
+    }
+
+    // otherwise pattern and device id are unrelated: no column is selected
+  }
+
+  /**
+   * Convenience overload: filters the columns of an insert node by its measurement names.
+   *
+   * @param insertNode node whose measurement names are matched against the pattern
+   * @param pattern path filter; {@code null} keeps every column
+   * @param originColumnIndex2FilteredColumnIndexMapperList output array, one slot per origin
+   *     column
+   */
+  private void generateColumnIndexMapper(
+      InsertNode insertNode,
+      String pattern,
+      Integer[] originColumnIndex2FilteredColumnIndexMapperList) {
+    final String[] measurementNames = insertNode.getMeasurements();
+    generateColumnIndexMapper(
+        measurementNames, pattern, originColumnIndex2FilteredColumnIndexMapperList);
+  }
+
+  /**
+   * Convenience overload: filters the columns of a tablet by the measurement ids of its schemas.
+   *
+   * @param tablet tablet whose schema measurement ids are matched against the pattern
+   * @param pattern path filter; {@code null} keeps every column
+   * @param originColumnIndex2FilteredColumnIndexMapperList output array, one slot per origin
+   *     column
+   */
+  private void generateColumnIndexMapper(
+      Tablet tablet, String pattern, Integer[] originColumnIndex2FilteredColumnIndexMapperList) {
+    final String[] measurementIds =
+        tablet.getSchemas().stream()
+            .map(MeasurementSchema::getMeasurementId)
+            .toArray(String[]::new);
+    generateColumnIndexMapper(
+        measurementIds, pattern, originColumnIndex2FilteredColumnIndexMapperList);
+  }
+
+  /**
+   * Boxes one primitive (or {@link Binary}) source column into an object array so that cells can
+   * be {@code null}.
+   *
+   * <p>A cell becomes {@code null} when {@code bitMap} is non-null and marks its row; TEXT cells
+   * are also {@code null} when the source {@link Binary} is {@code null}, otherwise they hold the
+   * binary's string value.
+   *
+   * @param originColumn the source array; its runtime type must match {@code dataType}
+   * @param dataType the column's data type
+   * @param bitMap null markers for the column, may be {@code null}
+   * @return a boxed array of the same length as the source
+   * @throws UnSupportedDataTypeException for any data type not handled below
+   */
+  private Object[] convertToColumn(Object originColumn, TSDataType dataType, BitMap bitMap) {
+    switch (dataType) {
+      case INT32:
+        {
+          final int[] source = (int[]) originColumn;
+          final Integer[] boxed = new Integer[source.length];
+          for (int row = 0; row < source.length; row++) {
+            if (bitMap == null || !bitMap.isMarked(row)) {
+              boxed[row] = source[row];
+            }
+          }
+          return boxed;
+        }
+      case INT64:
+        {
+          final long[] source = (long[]) originColumn;
+          final Long[] boxed = new Long[source.length];
+          for (int row = 0; row < source.length; row++) {
+            if (bitMap == null || !bitMap.isMarked(row)) {
+              boxed[row] = source[row];
+            }
+          }
+          return boxed;
+        }
+      case FLOAT:
+        {
+          final float[] source = (float[]) originColumn;
+          final Float[] boxed = new Float[source.length];
+          for (int row = 0; row < source.length; row++) {
+            if (bitMap == null || !bitMap.isMarked(row)) {
+              boxed[row] = source[row];
+            }
+          }
+          return boxed;
+        }
+      case DOUBLE:
+        {
+          final double[] source = (double[]) originColumn;
+          final Double[] boxed = new Double[source.length];
+          for (int row = 0; row < source.length; row++) {
+            if (bitMap == null || !bitMap.isMarked(row)) {
+              boxed[row] = source[row];
+            }
+          }
+          return boxed;
+        }
+      case BOOLEAN:
+        {
+          final boolean[] source = (boolean[]) originColumn;
+          final Boolean[] boxed = new Boolean[source.length];
+          for (int row = 0; row < source.length; row++) {
+            if (bitMap == null || !bitMap.isMarked(row)) {
+              boxed[row] = source[row];
+            }
+          }
+          return boxed;
+        }
+      case TEXT:
+        {
+          final Binary[] source = (Binary[]) originColumn;
+          final String[] texts = new String[source.length];
+          for (int row = 0; row < source.length; row++) {
+            final boolean markedNull = bitMap != null && bitMap.isMarked(row);
+            if (!markedNull && source[row] != null) {
+              texts[row] = source[row].getStringValue();
+            }
+          }
+          return texts;
+        }
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Data type %s is not supported.", dataType));
+    }
+  }
+
+  ////////////////////////////  process  ////////////////////////////
+
+  /**
+   * Feeds every valid row of this container to {@code consumer}, together with a collector the
+   * consumer can push rows into, and returns the collected rows as one event.
+   *
+   * <p>Only the first {@code rowCount} rows are visited. The timestamp array can be longer than
+   * that (a tablet's timestamp array is shared as-is and {@code rowCount} is taken from its {@code
+   * rowSize}), and the trailing slots do not hold inserted data.
+   *
+   * @param consumer callback invoked once per row, in row order
+   * @return the event built from whatever the consumer collected
+   */
+  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) {
+    final PipeRowCollector rowCollector = new PipeRowCollector();
+    for (int i = 0; i < rowCount; i++) {
+      consumer.accept(
+          new PipeRow(
+              i,
+              deviceId,
+              measurementSchemaList,
+              timestampColumn,
+              valueColumns,
+              valueColumnTypes,
+              columnNameStringList),
+          rowCollector);
+    }
+    return rowCollector.toTabletInsertionEvent();
+  }
+
+  /**
+   * Hands the whole container, converted to a {@link Tablet}, to {@code consumer} together with a
+   * collector the consumer can push rows into, and returns the collected rows as one event.
+   *
+   * @param consumer callback invoked exactly once
+   * @return the event built from whatever the consumer collected
+   */
+  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) {
+    final PipeRowCollector collector = new PipeRowCollector();
+    final Tablet wholeTablet = convertToTablet();
+    consumer.accept(wholeTablet, collector);
+    return collector.toTabletInsertionEvent();
+  }
+
+  ////////////////////////////  convert  ////////////////////////////
+
+  /**
+   * Returns this container's data as a {@link Tablet}, building it on first use and caching it.
+   *
+   * <p>The tablet shares the container's timestamp array and null bitmaps; the value columns are
+   * unboxed copies produced by {@code squashFromColumnList}. A container whose pattern filtered
+   * out every column yields a tablet with no value columns instead of failing on {@code
+   * valueColumns[0]}.
+   *
+   * @return the cached or newly built tablet
+   */
+  public Tablet convertToTablet() {
+    if (tablet != null) {
+      return tablet;
+    }
+
+    // with no value column left, size the tablet from the timestamp column instead
+    final int maxRowNumber =
+        valueColumns.length == 0 ? timestampColumn.length : valueColumns[0].length;
+
+    final List<MeasurementSchema> measurementSchemaArrayList =
+        new ArrayList<>(Arrays.asList(measurementSchemaList));
+
+    final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList, maxRowNumber);
+    newTablet.timestamps = timestampColumn;
+    newTablet.bitMaps = nullValueColumnBitmaps;
+    newTablet.values = squashFromColumnList(valueColumns, valueColumnTypes);
+    newTablet.rowSize = rowCount;
+
+    tablet = newTablet;
+    return tablet;
+  }
+
+  /**
+   * Unboxes every column with {@code squashFromColumn}, pairing column {@code i} with type {@code
+   * i}.
+   *
+   * @param valueColumns boxed columns
+   * @param valueColumnTypes data type of each column, same order as {@code valueColumns}
+   * @return one unboxed array per column, in the same order
+   */
+  private Object[] squashFromColumnList(Object[][] valueColumns, TSDataType[] valueColumnTypes) {
+    return IntStream.range(0, valueColumns.length)
+        .mapToObj(column -> squashFromColumn(valueColumns[column], valueColumnTypes[column]))
+        .toArray();
+  }
+
+  /**
+   * Unboxes one column into the primitive (or {@link Binary}) array matching its data type.
+   *
+   * <p>{@code null} cells become the type's zero value ({@code 0}, {@code false}) for primitive
+   * columns and stay {@code null} for TEXT columns.
+   *
+   * @param valueColumn boxed column; its runtime array type must match {@code valueColumnType}
+   * @param valueColumnType the column's data type
+   * @return the unboxed array, same length as the input
+   * @throws UnSupportedDataTypeException for any data type not handled below
+   */
+  private Object squashFromColumn(Object[] valueColumn, TSDataType valueColumnType) {
+    switch (valueColumnType) {
+      case INT32:
+        {
+          final Integer[] boxed = (Integer[]) valueColumn;
+          final int[] unboxed = new int[boxed.length];
+          for (int row = 0; row < boxed.length; row++) {
+            if (boxed[row] != null) {
+              unboxed[row] = boxed[row];
+            }
+          }
+          return unboxed;
+        }
+      case INT64:
+        {
+          final Long[] boxed = (Long[]) valueColumn;
+          final long[] unboxed = new long[boxed.length];
+          for (int row = 0; row < boxed.length; row++) {
+            if (boxed[row] != null) {
+              unboxed[row] = boxed[row];
+            }
+          }
+          return unboxed;
+        }
+      case FLOAT:
+        {
+          final Float[] boxed = (Float[]) valueColumn;
+          final float[] unboxed = new float[boxed.length];
+          for (int row = 0; row < boxed.length; row++) {
+            if (boxed[row] != null) {
+              unboxed[row] = boxed[row];
+            }
+          }
+          return unboxed;
+        }
+      case DOUBLE:
+        {
+          final Double[] boxed = (Double[]) valueColumn;
+          final double[] unboxed = new double[boxed.length];
+          for (int row = 0; row < boxed.length; row++) {
+            if (boxed[row] != null) {
+              unboxed[row] = boxed[row];
+            }
+          }
+          return unboxed;
+        }
+      case BOOLEAN:
+        {
+          final Boolean[] boxed = (Boolean[]) valueColumn;
+          final boolean[] unboxed = new boolean[boxed.length];
+          for (int row = 0; row < boxed.length; row++) {
+            if (boxed[row] != null) {
+              unboxed[row] = boxed[row];
+            }
+          }
+          return unboxed;
+        }
+      case TEXT:
+        {
+          final String[] texts = (String[]) valueColumn;
+          final Binary[] binaries = new Binary[texts.length];
+          for (int row = 0; row < texts.length; row++) {
+            if (texts[row] != null) {
+              binaries[row] = Binary.valueOf(texts[row]);
+            }
+          }
+          return binaries;
+        }
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Data type %s is not supported.", valueColumnType));
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
new file mode 100644
index 00000000000..260814d609f
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.view.datastructure;
+
+import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reads the timeseries metadata of a TsFile once (at construction time), keeps only the series
+ * selected by {@code pattern}, and exposes the selected data as a lazily produced sequence of
+ * {@link TabletInsertionEvent}s.
+ */
+public class TsFileInsertionDataContainer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
+
+  private final File tsFile;
+  private final String pattern;
+
+  private final Map<String, List<TimeseriesMetadata>> device2TimeseriesMetadataMap;
+
+  /**
+   * @param tsFile the TsFile to read
+   * @param pattern path prefix or full series path used to select data; {@code null} selects
+   *     everything
+   */
+  public TsFileInsertionDataContainer(File tsFile, String pattern) {
+    this.tsFile = tsFile;
+    this.pattern = pattern;
+
+    this.device2TimeseriesMetadataMap = collectDevice2TimeseriesMetadataMap();
+  }
+
+  /**
+   * Builds the device -> timeseries metadata map restricted to the series matched by the pattern.
+   *
+   * <p>If the file cannot be read, the error is logged and whatever was collected so far (possibly
+   * nothing) is returned.
+   */
+  private Map<String, List<TimeseriesMetadata>> collectDevice2TimeseriesMetadataMap() {
+    final Map<String, List<TimeseriesMetadata>> result = new HashMap<>();
+
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getPath())) {
+      // match pattern
+      for (Map.Entry<String, List<TimeseriesMetadata>> entry :
+          reader.getAllTimeseriesMetadata(true).entrySet()) {
+        final String device = entry.getKey();
+
+        // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c
+        // in this case, all data can be matched without checking the measurements
+        if (pattern == null
+            || pattern.length() <= device.length() && device.startsWith(pattern)) {
+          result.put(device, entry.getValue());
+        }
+
+        // case 2: for example, pattern is root.a.b.c and device is root.a.b
+        // in this case, we need to check the full path
+        else {
+          final List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+
+          // The VECTOR-typed metadata seen for THIS device, if any. It is kept local on purpose:
+          // carrying it across devices would attach a stale (or null) entry to the list of a
+          // device that has no VECTOR metadata of its own.
+          TimeseriesMetadata vectorTimeseriesMetadata = null;
+          boolean isVectorMetadataAdded = false;
+
+          for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
+            // TODO: test me!!!
+            if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
+              vectorTimeseriesMetadata = timeseriesMetadata;
+              isVectorMetadataAdded = false;
+              continue;
+            }
+
+            final String measurement = timeseriesMetadata.getMeasurementId();
+            // low cost check comes first
+            if (pattern.length() == measurement.length() + device.length() + 1
+                // high cost check comes later
+                && pattern.startsWith(device)
+                && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) {
+              // the VECTOR metadata (when the device has one) must precede the matched series,
+              // and must be added only once
+              if (vectorTimeseriesMetadata != null && !isVectorMetadataAdded) {
+                isVectorMetadataAdded = true;
+                timeseriesMetadataList.add(vectorTimeseriesMetadata);
+              }
+              timeseriesMetadataList.add(timeseriesMetadata);
+            }
+          }
+
+          if (!timeseriesMetadataList.isEmpty()) {
+            result.put(device, timeseriesMetadataList);
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Cannot read TsFile {}.", tsFile.getPath(), e);
+    }
+
+    return result;
+  }
+
+  /**
+   * Returns an iterable whose iterators wrap every tablet read from the selected series into a
+   * {@link PipeTabletInsertionEvent}. Each call to {@code iterator()} starts a fresh pass over the
+   * file.
+   */
+  public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
+    return () ->
+        new Iterator<TabletInsertionEvent>() {
+
+          private final Iterator<Tablet> tabletIterator = constructTabletIterable().iterator();
+
+          @Override
+          public boolean hasNext() {
+            return tabletIterator.hasNext();
+          }
+
+          @Override
+          public TabletInsertionEvent next() {
+            return new PipeTabletInsertionEvent(tabletIterator.next());
+          }
+        };
+  }
+
+  /** Creates a new {@link TsFileInsertionDataTabletIterator} for every requested iteration. */
+  private Iterable<Tablet> constructTabletIterable() {
+    return () ->
+        new TsFileInsertionDataTabletIterator(tsFile.getPath(), device2TimeseriesMetadataMap);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java
new file mode 100644
index 00000000000..7100e68b753
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event.view.datastructure;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterates over the selected timeseries of a TsFile and turns the data of each timeseries into a
+ * {@link Tablet}.
+ *
+ * <p>The underlying {@link TsFileSequenceReader} is opened in the constructor and closed as soon
+ * as {@link #hasNext()} reports that nothing is left. A caller that abandons the iteration early
+ * therefore leaves the reader open.
+ */
+public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TsFileInsertionDataTabletIterator.class);
+
+  private final TsFileSequenceReader reader;
+  // guards against closing the reader more than once when hasNext() is called repeatedly
+  private boolean isReaderClosed;
+  private final String filePath;
+  private final Iterator<Map.Entry<String, List<TimeseriesMetadata>>> entriesIterator;
+  private Map.Entry<String, List<TimeseriesMetadata>> currentEntry;
+  private Iterator<TimeseriesMetadata> timeseriesMetadataIterator;
+  private TimeseriesMetadata currentTimeseriesMetadata;
+  private List<MeasurementSchema> measurementSchemas;
+
+  // state shared by the VECTOR metadata and the series that follow it within one device
+  private boolean isAligned;
+  private final List<long[]> timeBatches;
+  private long[] timestampsForAligned;
+
+  /**
+   * @param filePath path of the TsFile to read
+   * @param device2TimeseriesMetadataMap the timeseries to read, grouped by device
+   * @throws PipeException if the TsFile cannot be opened
+   */
+  public TsFileInsertionDataTabletIterator(
+      String filePath, Map<String, List<TimeseriesMetadata>> device2TimeseriesMetadataMap) {
+    this.filePath = filePath;
+    this.entriesIterator = device2TimeseriesMetadataMap.entrySet().iterator();
+    this.timeBatches = new ArrayList<>();
+    this.currentEntry = null;
+    this.timeseriesMetadataIterator = null;
+    this.currentTimeseriesMetadata = null;
+    this.measurementSchemas = null;
+    this.isAligned = false;
+    this.timestampsForAligned = null;
+    this.isReaderClosed = false;
+    try {
+      this.reader = new TsFileSequenceReader(filePath);
+    } catch (IOException e) {
+      throw new PipeException("Cannot create TsFileSequenceReader for file " + filePath, e);
+    }
+
+    // Initialize timeseriesMetadataIterator if there is a next entry,
+    // otherwise fall back to an iterator that is already exhausted
+    if (entriesIterator.hasNext()) {
+      currentEntry = entriesIterator.next();
+      timeseriesMetadataIterator = currentEntry.getValue().iterator();
+    } else {
+      timeseriesMetadataIterator = new ArrayList<TimeseriesMetadata>().iterator();
+    }
+  }
+
+  /** Returns whether more timeseries are left; closes the reader once the answer is no. */
+  @Override
+  public boolean hasNext() {
+    final boolean hasNext = timeseriesMetadataIterator.hasNext() || entriesIterator.hasNext();
+    if (!hasNext) {
+      closeReader();
+    }
+    return hasNext;
+  }
+
+  /** Closes the reader at most once; a failure to close is logged, not propagated. */
+  private void closeReader() {
+    if (isReaderClosed) {
+      return;
+    }
+    isReaderClosed = true;
+    try {
+      reader.close();
+    } catch (IOException e) {
+      LOGGER.warn("Cannot close TsFileSequenceReader for file {}", filePath, e);
+    }
+  }
+
+  /**
+   * Reads the next timeseries and returns its data as a tablet.
+   *
+   * @throws NoSuchElementException if the iteration is exhausted
+   * @throws UncheckedIOException if reading the TsFile fails
+   */
+  @Override
+  public Tablet next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+
+    if (!timeseriesMetadataIterator.hasNext()) {
+      currentEntry = entriesIterator.next();
+      timeseriesMetadataIterator = currentEntry.getValue().iterator();
+
+      // A new device starts here: forget the aligned state of the previous one, otherwise its
+      // timestamps would be reused for the series of this device.
+      isAligned = false;
+      timestampsForAligned = null;
+      timeBatches.clear();
+    }
+    currentTimeseriesMetadata = timeseriesMetadataIterator.next();
+    measurementSchemas = new ArrayList<>();
+
+    // The reader opened in the constructor is reused for every call; each chunk read repositions
+    // it explicitly, so no per-call reader is required.
+    if (currentTimeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
+      // the VECTOR metadata only provides the timestamps; the tablet is built from the series
+      // that follows it
+      processTimeseriesMetadata(currentTimeseriesMetadata, reader);
+      currentTimeseriesMetadata = timeseriesMetadataIterator.next();
+    }
+    return processTimeseriesMetadata(currentTimeseriesMetadata, reader);
+  }
+
+  /**
+   * Builds a tablet for the current device from the given columns.
+   *
+   * <p>In aligned mode the first call only records {@code timestamps} and returns {@code null};
+   * later calls use the recorded timestamps instead of the ones passed in.
+   */
+  private Tablet createTablet(long[] timestamps, Object[] values, BitMap[] bitMaps) {
+    long[] tmp;
+
+    if (isAligned) {
+      if (timestampsForAligned == null) {
+        timestampsForAligned = timestamps;
+        return null;
+      }
+      tmp = timestampsForAligned;
+    } else {
+      tmp = timestamps;
+    }
+
+    // create tablet
+    // NOTE(review): values is a flat array of boxed per-row values, while Tablet's serialization
+    // code casts each element of Tablet.values to a primitive column array - confirm that this
+    // layout is what consumers of the tablet expect.
+    int rowSize = tmp.length;
+    Tablet tablet = new Tablet(currentEntry.getKey(), measurementSchemas, rowSize);
+    tablet.timestamps = tmp;
+    tablet.values = values;
+    tablet.rowSize = rowSize;
+    tablet.bitMaps = bitMaps;
+
+    return tablet;
+  }
+
+  /**
+   * Reads every chunk of the given timeseries page by page, collects timestamps, values and null
+   * bitmap bytes, and hands them to {@link #createTablet}.
+   *
+   * @throws UncheckedIOException if reading a chunk fails
+   */
+  private Tablet processTimeseriesMetadata(
+      TimeseriesMetadata timeseriesMetadata, TsFileSequenceReader reader) {
+    int pageIndex = 0;
+    if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
+      isAligned = true;
+      timeBatches.clear();
+    } else {
+      MeasurementSchema measurementSchema =
+          new MeasurementSchema(
+              timeseriesMetadata.getMeasurementId(), timeseriesMetadata.getTSDataType());
+      measurementSchemas.add(measurementSchema);
+    }
+
+    List<Byte> bitMapBytes = new ArrayList<>();
+    List<Object> measurementValues = new ArrayList<>();
+    List<Long> measurementTimestamps = new ArrayList<>();
+
+    for (IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) {
+      long offset = chunkMetadata.getOffsetOfChunkHeader();
+      try {
+        reader.position(offset);
+        ChunkHeader header = reader.readChunkHeader(reader.readMarker());
+        int dataSize = header.getDataSize();
+
+        Decoder defaultTimeDecoder =
+            Decoder.getDecoderByType(
+                TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+                TSDataType.INT64);
+        Decoder valueDecoder =
+            Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+        pageIndex = 0;
+        if (header.getDataType() == TSDataType.VECTOR) {
+          timeBatches.clear();
+        }
+
+        while (dataSize > 0) {
+          PageHeader pageHeader =
+              reader.readPageHeader(
+                  header.getDataType(), (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
+          ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+
+          // Time column chunk
+          if ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+              == TsFileConstant.TIME_COLUMN_MASK) {
+            TimePageReader timePageReader =
+                new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
+            long[] timeBatch = timePageReader.getNextTimeBatch();
+            // remembered so that the value pages read later can be matched by page index
+            timeBatches.add(timeBatch);
+
+            for (long time : timeBatch) {
+              measurementTimestamps.add(time);
+            }
+          }
+          // Value column chunk
+          else if ((header.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK)
+              == TsFileConstant.VALUE_COLUMN_MASK) {
+            ValuePageReader valuePageReader =
+                new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder);
+
+            for (byte value : valuePageReader.getBitmap()) {
+              bitMapBytes.add(value);
+            }
+
+            for (TsPrimitiveType value :
+                valuePageReader.nextValueBatch(timeBatches.get(pageIndex))) {
+              measurementValues.add(value.getValue());
+            }
+          }
+
+          // NonAligned Chunk
+          else {
+            PageReader pageReader =
+                new PageReader(
+                    pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null);
+            BatchData batchData = pageReader.getAllSatisfiedPageData();
+            List<Integer> isNullList = new ArrayList<>();
+            int index = 0;
+            while (batchData.hasCurrent()) {
+              measurementTimestamps.add(batchData.currentTime());
+              Object value = batchData.currentValue();
+
+              if (value == null) {
+                isNullList.add(index);
+              }
+              measurementValues.add(value);
+              index++;
+              batchData.next();
+            }
+
+            // NOTE(review): the bitmap is sized by the timestamps accumulated over all pages so
+            // far, while the null positions are page-local indexes - confirm this is intended.
+            BitMap bitmap = new BitMap(measurementTimestamps.size());
+            for (int isNull : isNullList) {
+              bitmap.mark(isNull);
+            }
+            byte[] bytes = bitmap.getByteArray();
+            for (byte value : bytes) {
+              bitMapBytes.add(value);
+            }
+          }
+          pageIndex++;
+          dataSize -= pageHeader.getSerializedPageSize();
+        }
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    long[] timestamps = new long[measurementTimestamps.size()];
+    for (int i = 0; i < measurementTimestamps.size(); i++) {
+      timestamps[i] = measurementTimestamps.get(i);
+    }
+
+    byte[] byteArray = new byte[bitMapBytes.size()];
+    for (int i = 0; i < bitMapBytes.size(); i++) {
+      byteArray[i] = bitMapBytes.get(i);
+    }
+    BitMap[] bitMaps = new BitMap[] {new BitMap(byteArray.length, byteArray)};
+
+    return createTablet(timestamps, measurementValues.toArray(), bitMaps);
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
similarity index 55%
copy from 
node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
copy to 
server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
index bc56a8bb3cf..62979b7d52c 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/processor/DoNothingProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
@@ -17,8 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.pipe.plugin.builtin.processor;
+package org.apache.iotdb.db.pipe.core.processor;
 
+import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
+import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -27,10 +29,11 @@ import 
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfig
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import java.io.IOException;
 
-public class DoNothingProcessor implements PipeProcessor {
+public class PipeDoNothingProcessor implements PipeProcessor {
 
   @Override
   public void validate(PipeParameterValidator validator) {
@@ -46,13 +49,46 @@ public class DoNothingProcessor implements PipeProcessor {
   @Override
   public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
       throws IOException {
-    eventCollector.collect(tabletInsertionEvent);
+    // Events that are not enriched carry no pattern, so they are forwarded untouched.
+    if (tabletInsertionEvent instanceof EnrichedEvent) {
+      final EnrichedEvent enrichedEvent = (EnrichedEvent) tabletInsertionEvent;
+      // Constant-first equals: an event without a pattern (null) must not trigger an NPE.
+      if (PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE.equals(
+          enrichedEvent.getPattern())) {
+        // default pattern: forward the event as is
+        eventCollector.collect(tabletInsertionEvent);
+      } else {
+        // any other pattern: pass the event through row by row and collect the resulting event
+        eventCollector.collect(
+            tabletInsertionEvent.processRowByRow(
+                (row, rowCollector) -> {
+                  try {
+                    rowCollector.collectRow(row);
+                  } catch (IOException e) {
+                    throw new PipeException("Failed to collect row", e);
+                  }
+                }));
+      }
+    } else {
+      eventCollector.collect(tabletInsertionEvent);
+    }
   }
 
   @Override
   public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
       throws IOException {
-    eventCollector.collect(tsFileInsertionEvent);
+    // Events that are not enriched carry no pattern, so they are forwarded untouched.
+    if (tsFileInsertionEvent instanceof EnrichedEvent) {
+      final EnrichedEvent enrichedEvent = (EnrichedEvent) tsFileInsertionEvent;
+      // Constant-first equals: an event without a pattern (null) must not trigger an NPE.
+      if (PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE.equals(
+          enrichedEvent.getPattern())) {
+        // default pattern: forward the whole TsFile event as is
+        eventCollector.collect(tsFileInsertionEvent);
+      } else {
+        // any other pattern: forward the tablet events derived from the TsFile instead
+        for (final TabletInsertionEvent tabletInsertionEvent :
+            tsFileInsertionEvent.toTabletInsertionEvents()) {
+          eventCollector.collect(tabletInsertionEvent);
+        }
+      }
+    } else {
+      eventCollector.collect(tsFileInsertionEvent);
+    }
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 7af0df4ecff..b21a3a85c34 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -20,8 +20,11 @@
 package org.apache.iotdb.db.pipe.task.stage;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
 import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
+import org.apache.iotdb.db.pipe.core.processor.PipeDoNothingProcessor;
 import 
org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.task.queue.BlockingPendingQueue;
@@ -68,7 +71,14 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
     this.pipeProcessorParameters = pipeProcessorParameters;
 
     final String taskId = pipeName + "_" + dataRegionId;
-    pipeProcessor = 
PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
+    pipeProcessor =
+        pipeProcessorParameters
+                .getStringOrDefault(
+                    PipeProcessorConstant.PROCESSOR_KEY,
+                    BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
+                
.equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
+            ? new PipeDoNothingProcessor()
+            : PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
     final PipeEventCollector pipeConnectorOutputEventCollector =
         new PipeEventCollector(pipeConnectorOutputPendingQueue);
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index 6deaed9343d..4bc74e6d929 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -131,12 +131,12 @@ public class CachedSchemaPatternMatcherTest {
       for (int j = 0; j < deviceNum; j++) {
         PipeRealtimeCollectEvent event =
             new PipeRealtimeCollectEvent(
-                null, null, Collections.singletonMap("root." + i, 
measurements));
+                null, null, Collections.singletonMap("root." + i, 
measurements), "root");
         long startTime = System.currentTimeMillis();
         matcher.match(event).forEach(collector -> collector.collect(event));
         totalTime += (System.currentTimeMillis() - startTime);
       }
-      PipeRealtimeCollectEvent event = new PipeRealtimeCollectEvent(null, 
null, deviceMap);
+      PipeRealtimeCollectEvent event = new PipeRealtimeCollectEvent(null, 
null, deviceMap, "root");
       long startTime = System.currentTimeMillis();
       matcher.match(event).forEach(collector -> collector.collect(event));
       totalTime += (System.currentTimeMillis() - startTime);
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
index 24be2f73c6f..ce70aecb1da 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
@@ -378,6 +378,7 @@ public class Tablet {
   }
 
   private void writeTimes(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(timestamps.length, stream);
     for (long time : timestamps) {
       ReadWriteIOUtils.write(time, stream);
     }
@@ -385,14 +386,15 @@ public class Tablet {
 
   /** Serialize bitmaps */
   private void writeBitMaps(DataOutputStream stream) throws IOException {
-    ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), stream);
+    ReadWriteIOUtils.write(bitMaps != null ? 1 : 0, stream);
     if (bitMaps != null) {
       for (BitMap bitMap : bitMaps) {
         if (bitMap == null) {
-          ReadWriteIOUtils.write(BytesUtils.boolToByte(false), stream);
+          ReadWriteIOUtils.write(0, stream);
         } else {
-          ReadWriteIOUtils.write(BytesUtils.boolToByte(true), stream);
-          stream.write(bitMap.getByteArray());
+          ReadWriteIOUtils.write(1, stream);
+          ReadWriteIOUtils.write(bitMap.getSize(), stream);
+          ReadWriteIOUtils.write(new Binary(bitMap.getByteArray()), stream);
         }
       }
     }
@@ -410,38 +412,48 @@ public class Tablet {
     switch (dataType) {
       case INT32:
         int[] intValues = (int[]) column;
-        for (int j = 0; j < rowSize; j++) {
+        ReadWriteIOUtils.write(intValues.length, stream);
+        for (int j = 0; j < intValues.length; j++) {
           ReadWriteIOUtils.write(intValues[j], stream);
         }
         break;
       case INT64:
         long[] longValues = (long[]) column;
-        for (int j = 0; j < rowSize; j++) {
+        ReadWriteIOUtils.write(longValues.length, stream);
+        for (int j = 0; j < longValues.length; j++) {
           ReadWriteIOUtils.write(longValues[j], stream);
         }
         break;
       case FLOAT:
         float[] floatValues = (float[]) column;
-        for (int j = 0; j < rowSize; j++) {
+        ReadWriteIOUtils.write(floatValues.length, stream);
+        for (int j = 0; j < floatValues.length; j++) {
           ReadWriteIOUtils.write(floatValues[j], stream);
         }
         break;
       case DOUBLE:
         double[] doubleValues = (double[]) column;
-        for (int j = 0; j < rowSize; j++) {
+        ReadWriteIOUtils.write(doubleValues.length, stream);
+        for (int j = 0; j < doubleValues.length; j++) {
           ReadWriteIOUtils.write(doubleValues[j], stream);
         }
         break;
       case BOOLEAN:
         boolean[] boolValues = (boolean[]) column;
-        for (int j = 0; j < rowSize; j++) {
-          ReadWriteIOUtils.write(BytesUtils.boolToByte(boolValues[j]), stream);
+        ReadWriteIOUtils.write(boolValues.length, stream);
+        for (int j = 0; j < boolValues.length; j++) {
+          ReadWriteIOUtils.write(boolValues[j] ? 1 : 0, stream);
         }
         break;
       case TEXT:
         Binary[] binaryValues = (Binary[]) column;
-        for (int j = 0; j < rowSize; j++) {
-          ReadWriteIOUtils.write(binaryValues[j], stream);
+        ReadWriteIOUtils.write(binaryValues.length, stream);
+        for (int j = 0; j < binaryValues.length; j++) {
+          boolean isNull = (binaryValues[j] == null);
+          ReadWriteIOUtils.write(isNull ? 1 : 0, stream);
+          if (!isNull) {
+            ReadWriteIOUtils.write(binaryValues[j], stream);
+          }
         }
         break;
       default:
@@ -466,22 +478,23 @@ public class Tablet {
     }
 
     // deserialize times
-    long[] times = new long[rowSize];
-    for (int i = 0; i < rowSize; i++) {
+    int timesSize = ReadWriteIOUtils.readInt(byteBuffer);
+    long[] times = new long[timesSize];
+    for (int i = 0; i < timesSize; i++) {
       times[i] = ReadWriteIOUtils.readLong(byteBuffer);
     }
 
     // deserialize bitmaps
-    boolean hasBitMaps = BytesUtils.byteToBool(byteBuffer.get());
+    boolean hasBitMaps = (ReadWriteIOUtils.readInt(byteBuffer) == 1);
     BitMap[] bitMaps = null;
     if (hasBitMaps) {
-      bitMaps = readBitMapsFromBuffer(byteBuffer, schemaSize, rowSize);
+      bitMaps = readBitMapsFromBuffer(byteBuffer, schemaSize);
     }
 
     // deserialize values
     TSDataType[] dataTypes =
         
schemas.stream().map(MeasurementSchema::getType).toArray(TSDataType[]::new);
-    Object[] values = readTabletValuesFromBuffer(byteBuffer, dataTypes, 
schemaSize, rowSize);
+    Object[] values = readTabletValuesFromBuffer(byteBuffer, dataTypes, 
schemaSize);
 
     Tablet tablet = new Tablet(deviceId, schemas, times, values, bitMaps, 
rowSize);
     tablet.constructMeasurementIndexMap();
@@ -489,19 +502,14 @@ public class Tablet {
   }
 
   /** deserialize bitmaps */
-  public static BitMap[] readBitMapsFromBuffer(ByteBuffer buffer, int columns, 
int size) {
-    if (!buffer.hasRemaining()) {
-      return null;
-    }
+  public static BitMap[] readBitMapsFromBuffer(ByteBuffer buffer, int columns) 
{
     BitMap[] bitMaps = new BitMap[columns];
     for (int i = 0; i < columns; i++) {
-      boolean hasBitMap = BytesUtils.byteToBool(buffer.get());
+      boolean hasBitMap = (ReadWriteIOUtils.readInt(buffer) == 1);
       if (hasBitMap) {
-        byte[] bytes = new byte[size / Byte.SIZE + 1];
-        for (int j = 0; j < bytes.length; j++) {
-          bytes[j] = buffer.get();
-        }
-        bitMaps[i] = new BitMap(size, bytes);
+        int bitMapSize = ReadWriteIOUtils.readInt(buffer);
+        byte[] bytes = ReadWriteIOUtils.readBinary(buffer).getValues();
+        bitMaps[i] = new BitMap(bitMapSize, bytes);
       }
     }
     return bitMaps;
@@ -510,56 +518,56 @@ public class Tablet {
   /**
    * @param buffer data values
    * @param columns column number
-   * @param size value count in each column
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   public static Object[] readTabletValuesFromBuffer(
-      ByteBuffer buffer, TSDataType[] types, int columns, int size) {
+      ByteBuffer buffer, TSDataType[] types, int columns) {
     Object[] values = new Object[columns];
     for (int i = 0; i < columns; i++) {
+      int arraySize = ReadWriteIOUtils.readInt(buffer);
       switch (types[i]) {
         case BOOLEAN:
-          boolean[] boolValues = new boolean[size];
-          for (int index = 0; index < size; index++) {
-            boolValues[index] = BytesUtils.byteToBool(buffer.get());
+          boolean[] boolValues = new boolean[arraySize];
+          for (int index = 0; index < arraySize; index++) {
+            boolValues[index] = ReadWriteIOUtils.readInt(buffer) == 1;
           }
           values[i] = boolValues;
           break;
         case INT32:
-          int[] intValues = new int[size];
-          for (int index = 0; index < size; index++) {
-            intValues[index] = buffer.getInt();
+          int[] intValues = new int[arraySize];
+          for (int index = 0; index < arraySize; index++) {
+            intValues[index] = ReadWriteIOUtils.readInt(buffer);
           }
           values[i] = intValues;
           break;
         case INT64:
-          long[] longValues = new long[size];
-          for (int index = 0; index < size; index++) {
-            longValues[index] = buffer.getLong();
+          long[] longValues = new long[arraySize];
+          for (int index = 0; index < arraySize; index++) {
+            longValues[index] = ReadWriteIOUtils.readLong(buffer);
           }
           values[i] = longValues;
           break;
         case FLOAT:
-          float[] floatValues = new float[size];
-          for (int index = 0; index < size; index++) {
-            floatValues[index] = buffer.getFloat();
+          float[] floatValues = new float[arraySize];
+          for (int index = 0; index < arraySize; index++) {
+            floatValues[index] = ReadWriteIOUtils.readFloat(buffer);
           }
           values[i] = floatValues;
           break;
         case DOUBLE:
-          double[] doubleValues = new double[size];
-          for (int index = 0; index < size; index++) {
-            doubleValues[index] = buffer.getDouble();
+          double[] doubleValues = new double[arraySize];
+          for (int index = 0; index < arraySize; index++) {
+            doubleValues[index] = ReadWriteIOUtils.readDouble(buffer);
           }
           values[i] = doubleValues;
           break;
         case TEXT:
-          Binary[] binaryValues = new Binary[size];
-          for (int index = 0; index < size; index++) {
-            int binarySize = buffer.getInt();
-            byte[] binaryValue = new byte[binarySize];
-            buffer.get(binaryValue);
-            binaryValues[index] = new Binary(binaryValue);
+          Binary[] binaryValues = new Binary[arraySize];
+          for (int index = 0; index < arraySize; index++) {
+            boolean isNull = (ReadWriteIOUtils.readInt(buffer) == 1);
+            if (!isNull) {
+              binaryValues[index] = ReadWriteIOUtils.readBinary(buffer);
+            }
           }
           values[i] = binaryValues;
           break;
diff --git 
a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/record/TabletTest.java 
b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/record/TabletTest.java
index f37712197a8..34ad5a5a7d9 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/record/TabletTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/record/TabletTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.tsfile.write.record;
 
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.Test;
@@ -29,7 +30,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class TabletTest {
@@ -52,11 +53,18 @@ public class TabletTest {
       ((long[]) values[1])[i] = 1;
     }
 
-    Tablet tablet = new Tablet(deviceId, measurementSchemas, timestamps, 
values, null, rowSize);
+    Tablet tablet =
+        new Tablet(
+            deviceId,
+            measurementSchemas,
+            timestamps,
+            values,
+            new BitMap[] {new BitMap(1024), new BitMap(1024)},
+            rowSize);
     try {
       ByteBuffer byteBuffer = tablet.serialize();
       Tablet newTablet = Tablet.deserialize(byteBuffer);
-      assertEquals(newTablet, tablet);
+      assertTrue(newTablet.equals(tablet));
     } catch (Exception e) {
       e.printStackTrace();
       fail();

Reply via email to