This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e1ebf46e93 Pipe: Introduce TsFileInsertionScanDataContainer to read 
data from tsfile sequentially to improve pattern parse performance when filter 
rate is high (#12781)
2e1ebf46e93 is described below

commit 2e1ebf46e930f49c47fa1ff54da902d97cfc65b1
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 27 18:57:27 2024 +0800

    Pipe: Introduce TsFileInsertionScanDataContainer to read data from tsfile 
sequentially to improve pattern parse performance when filter rate is high 
(#12781)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  12 +-
 .../container/TsFileInsertionDataContainer.java    |  78 +++++
 .../TsFileInsertionDataContainerProvider.java      | 123 +++++++
 .../query/TsFileInsertionQueryDataContainer.java}  | 107 +++---
 .../TsFileInsertionQueryDataTabletIterator.java}   |  22 +-
 .../scan/AlignedSinglePageWholeChunkReader.java    | 170 +++++++++
 .../container/scan/SinglePageWholeChunkReader.java | 116 ++++++
 .../scan/TsFileInsertionScanDataContainer.java     | 388 +++++++++++++++++++++
 .../PipeHistoricalDataRegionTsFileExtractor.java   |   3 +-
 .../pipe/resource/tsfile/PipeTsFileResource.java   |  87 ++++-
 .../resource/tsfile/PipeTsFileResourceManager.java |   6 +-
 .../event/TsFileInsertionDataContainerTest.java    | 232 +++++++-----
 .../apache/iotdb/commons/conf/CommonConfig.java    |   9 +
 .../iotdb/commons/conf/CommonDescriptor.java       |   5 +
 .../iotdb/commons/pipe/config/PipeConfig.java      |   5 +
 .../commons/pipe/pattern/IoTDBPipePattern.java     |   4 +
 16 files changed, 1189 insertions(+), 178 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 05a5a6ff0f9..9d4a013b069 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
@@ -179,7 +181,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
 
   // If the previous "isWithMod" is false, the modFile has been set to "null", 
then the isWithMod
   // can't be set to true
-  public void disableMod4NonTransferPipes(boolean isWithMod) {
+  public void disableMod4NonTransferPipes(final boolean isWithMod) {
     this.isWithMod = isWithMod && this.isWithMod;
   }
 
@@ -293,7 +295,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
       final Map<IDeviceID, Boolean> deviceIsAlignedMap =
           PipeDataNodeResourceManager.tsfile()
               .getDeviceIsAlignedMapFromCache(
-                  
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
+                  
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()),
+                  false);
       final Set<IDeviceID> deviceSet =
           Objects.nonNull(deviceIsAlignedMap) ? deviceIsAlignedMap.keySet() : 
resource.getDevices();
       return deviceSet.stream()
@@ -338,8 +341,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
     try {
       if (dataContainer == null) {
         dataContainer =
-            new TsFileInsertionDataContainer(
-                tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this);
+            new TsFileInsertionDataContainerProvider(
+                    tsFile, pipePattern, startTime, endTime, pipeTaskMeta, 
this)
+                .provide();
       }
       return dataContainer;
     } catch (final IOException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
new file mode 100644
index 00000000000..2e8a7ec6efa
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.container;
+
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.tsfile.read.filter.factory.TimeFilterApi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public abstract class TsFileInsertionDataContainer implements AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
+
+  protected final PipePattern pattern; // used to filter data
+  protected final GlobalTimeExpression timeFilterExpression; // used to filter 
data
+
+  protected final PipeTaskMeta pipeTaskMeta; // used to report progress
+  protected final EnrichedEvent sourceEvent; // used to report progress
+
+  protected TsFileSequenceReader tsFileSequenceReader;
+
+  protected TsFileInsertionDataContainer(
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final EnrichedEvent sourceEvent) {
+    this.pattern = pattern;
+    timeFilterExpression =
+        (startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE)
+            ? null
+            : new GlobalTimeExpression(TimeFilterApi.between(startTime, 
endTime));
+
+    this.pipeTaskMeta = pipeTaskMeta;
+    this.sourceEvent = sourceEvent;
+  }
+
+  /**
+   * @return {@link TabletInsertionEvent} in a streaming way
+   */
+  public abstract Iterable<TabletInsertionEvent> toTabletInsertionEvents();
+
+  @Override
+  public void close() {
+    try {
+      if (tsFileSequenceReader != null) {
+        tsFileSequenceReader.close();
+      }
+    } catch (final IOException e) {
+      LOGGER.warn("Failed to close TsFileSequenceReader", e);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
new file mode 100644
index 00000000000..9ce61d70eab
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.container;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class TsFileInsertionDataContainerProvider {
+
+  private final File tsFile;
+  private final PipePattern pattern;
+  private final long startTime;
+  private final long endTime;
+
+  protected final PipeTaskMeta pipeTaskMeta;
+  protected final PipeTsFileInsertionEvent sourceEvent;
+
+  public TsFileInsertionDataContainerProvider(
+      final File tsFile,
+      final PipePattern pipePattern,
+      final long startTime,
+      final long endTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipeTsFileInsertionEvent sourceEvent) {
+    this.tsFile = tsFile;
+    this.pattern = pipePattern;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.pipeTaskMeta = pipeTaskMeta;
+    this.sourceEvent = sourceEvent;
+  }
+
+  public TsFileInsertionDataContainer provide() throws IOException {
+    if (startTime != Long.MIN_VALUE
+        || endTime != Long.MAX_VALUE
+        || pattern instanceof IoTDBPipePattern
+            && !((IoTDBPipePattern) 
pattern).mayMatchMultipleTimeSeriesInOneDevice()) {
+      // 1. If time filter exists, use query here because the scan container 
may filter it
+      // row by row in single page chunk.
+      // 2. If the pattern matches only one time series in one device, use 
query container here
+      // because there is no timestamps merge overhead.
+      //
+      // Note: We judge prefix pattern as matching multiple timeseries in one 
device because it's
+      // hard to know whether it only matches one timeseries, while matching 
multiple is often the
+      // case.
+      return new TsFileInsertionQueryDataContainer(
+          tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+    }
+
+    final Map<IDeviceID, Boolean> deviceIsAlignedMap =
+        
PipeDataNodeResourceManager.tsfile().getDeviceIsAlignedMapFromCache(tsFile, 
false);
+    if (Objects.isNull(deviceIsAlignedMap)) {
+      // If we failed to get from cache, it indicates that the memory usage is 
high.
+      // We use scan data container because it requires less memory.
+      return new TsFileInsertionScanDataContainer(
+          tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+    }
+
+    final int originalSize = deviceIsAlignedMap.size();
+    final Map<IDeviceID, Boolean> filteredDeviceIsAlignedMap =
+        filterDeviceIsAlignedMapByPattern(deviceIsAlignedMap);
+    // Use the scan data container when a large enough share of the devices is needed, since it's 
better to scan than to query.
+    return (double) filteredDeviceIsAlignedMap.size() / originalSize
+            > PipeConfig.getInstance().getPipeTsFileScanParsingThreshold()
+        ? new TsFileInsertionScanDataContainer(
+            tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent)
+        : new TsFileInsertionQueryDataContainer(
+            tsFile,
+            pattern,
+            startTime,
+            endTime,
+            pipeTaskMeta,
+            sourceEvent,
+            filteredDeviceIsAlignedMap);
+  }
+
+  private Map<IDeviceID, Boolean> filterDeviceIsAlignedMapByPattern(
+      final Map<IDeviceID, Boolean> deviceIsAlignedMap) {
+    if (Objects.isNull(pattern) || pattern.isRoot()) {
+      return deviceIsAlignedMap;
+    }
+
+    return deviceIsAlignedMap.entrySet().stream()
+        .filter(
+            entry -> {
+              final String deviceId = ((PlainDeviceID) 
entry.getKey()).toStringID();
+              return pattern.coversDevice(deviceId) || 
pattern.mayOverlapWithDevice(deviceId);
+            })
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
similarity index 80%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
index 4e324bb8afe..2a0c7c410f7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.event.common.tsfile;
+package org.apache.iotdb.db.pipe.event.common.tsfile.container.query;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
@@ -38,10 +39,6 @@ import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.read.TsFileDeviceIterator;
 import org.apache.tsfile.read.TsFileReader;
 import org.apache.tsfile.read.TsFileSequenceReader;
-import org.apache.tsfile.read.expression.IExpression;
-import org.apache.tsfile.read.expression.impl.BinaryExpression;
-import org.apache.tsfile.read.expression.impl.GlobalTimeExpression;
-import org.apache.tsfile.read.filter.factory.TimeFilterApi;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
@@ -59,35 +56,26 @@ import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Set;
 
-public class TsFileInsertionDataContainer implements AutoCloseable {
+public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContainer {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
-
-  private final PipePattern pattern; // used to filter data
-  private final IExpression timeFilterExpression; // used to filter data
-
-  private final PipeTaskMeta pipeTaskMeta; // used to report progress
-  private final EnrichedEvent sourceEvent; // used to report progress
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TsFileInsertionQueryDataContainer.class);
 
   private final PipeMemoryBlock allocatedMemoryBlock;
-
-  private final TsFileSequenceReader tsFileSequenceReader;
   private final TsFileReader tsFileReader;
 
   private final Iterator<Map.Entry<IDeviceID, List<String>>> 
deviceMeasurementsMapIterator;
   private final Map<IDeviceID, Boolean> deviceIsAlignedMap;
   private final Map<String, TSDataType> measurementDataTypeMap;
 
-  private boolean shouldParsePattern = false;
-
   @TestOnly
-  public TsFileInsertionDataContainer(
+  public TsFileInsertionQueryDataContainer(
       final File tsFile, final PipePattern pattern, final long startTime, 
final long endTime)
       throws IOException {
     this(tsFile, pattern, startTime, endTime, null, null);
   }
 
-  public TsFileInsertionDataContainer(
+  public TsFileInsertionQueryDataContainer(
       final File tsFile,
       final PipePattern pattern,
       final long startTime,
@@ -95,16 +83,19 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
       final PipeTaskMeta pipeTaskMeta,
       final EnrichedEvent sourceEvent)
       throws IOException {
-    this.pattern = pattern;
-    timeFilterExpression =
-        (startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE)
-            ? null
-            : BinaryExpression.and(
-                new GlobalTimeExpression(TimeFilterApi.gtEq(startTime)),
-                new GlobalTimeExpression(TimeFilterApi.ltEq(endTime)));
+    this(tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent, null);
+  }
 
-    this.pipeTaskMeta = pipeTaskMeta;
-    this.sourceEvent = sourceEvent;
+  public TsFileInsertionQueryDataContainer(
+      final File tsFile,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final EnrichedEvent sourceEvent,
+      final Map<IDeviceID, Boolean> deviceIsAlignedMap)
+      throws IOException {
+    super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
 
     try {
       final PipeTsFileResourceManager tsFileResourceManager = 
PipeDataNodeResourceManager.tsfile();
@@ -118,17 +109,27 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
 
       if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) {
         // These read-only objects can be found in cache.
-        deviceIsAlignedMap = 
tsFileResourceManager.getDeviceIsAlignedMapFromCache(tsFile);
+        this.deviceIsAlignedMap =
+            Objects.nonNull(deviceIsAlignedMap)
+                ? deviceIsAlignedMap
+                : tsFileResourceManager.getDeviceIsAlignedMapFromCache(tsFile, 
true);
         measurementDataTypeMap = 
tsFileResourceManager.getMeasurementDataTypeMapFromCache(tsFile);
         deviceMeasurementsMap = 
tsFileResourceManager.getDeviceMeasurementsMapFromCache(tsFile);
       } else {
         // We need to create these objects here and remove them later.
-        deviceIsAlignedMap = readDeviceIsAlignedMap();
-        memoryRequiredInBytes += 
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
-
-        // Filter devices that may overlap with pattern first
-        // to avoid reading all time-series of all devices.
-        final Set<IDeviceID> devices = 
filterDevicesByPattern(deviceIsAlignedMap.keySet());
+        final Set<IDeviceID> devices;
+        if (Objects.isNull(deviceIsAlignedMap)) {
+          this.deviceIsAlignedMap = readDeviceIsAlignedMap();
+          memoryRequiredInBytes +=
+              
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(this.deviceIsAlignedMap);
+
+          // Filter devices that may overlap with pattern first
+          // to avoid reading all time-series of all devices.
+          devices = filterDevicesByPattern(this.deviceIsAlignedMap.keySet());
+        } else {
+          this.deviceIsAlignedMap = deviceIsAlignedMap;
+          devices = deviceIsAlignedMap.keySet();
+        }
 
         measurementDataTypeMap = readFilteredFullPathDataTypeMap(devices);
         memoryRequiredInBytes +=
@@ -176,9 +177,6 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
         for (final String measurement : entry.getValue()) {
           if (pattern.matchesMeasurement(deviceId, measurement)) {
             filteredMeasurements.add(measurement);
-          } else {
-            // Parse pattern iff there are measurements filtered out
-            shouldParsePattern = true;
           }
         }
 
@@ -186,13 +184,6 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
           filteredDeviceMeasurementsMap.put(new PlainDeviceID(deviceId), 
filteredMeasurements);
         }
       }
-
-      // case 3: for example, pattern is root.a.b.c and device is root.a.b.d
-      // in this case, no data can be matched
-      else {
-        // Parse pattern iff there are measurements filtered out
-        shouldParsePattern = true;
-      }
     }
 
     return filteredDeviceMeasurementsMap;
@@ -232,7 +223,7 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
       throws IOException {
     final Map<String, TSDataType> result = new HashMap<>();
 
-    for (IDeviceID device : devices) {
+    for (final IDeviceID device : devices) {
       tsFileSequenceReader
           .readDeviceMetadata(device)
           .values()
@@ -256,7 +247,7 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
       final Set<IDeviceID> devices) throws IOException {
     final Map<IDeviceID, List<String>> result = new HashMap<>();
 
-    for (IDeviceID device : devices) {
+    for (final IDeviceID device : devices) {
       tsFileSequenceReader
           .readDeviceMetadata(device)
           .values()
@@ -270,14 +261,12 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
     return result;
   }
 
-  /**
-   * @return {@link TabletInsertionEvent} in a streaming way
-   */
+  @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
     return () ->
         new Iterator<TabletInsertionEvent>() {
 
-          private TsFileInsertionDataTabletIterator tabletIterator = null;
+          private TsFileInsertionQueryDataTabletIterator tabletIterator = null;
 
           @Override
           public boolean hasNext() {
@@ -291,13 +280,13 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
 
               try {
                 tabletIterator =
-                    new TsFileInsertionDataTabletIterator(
+                    new TsFileInsertionQueryDataTabletIterator(
                         tsFileReader,
                         measurementDataTypeMap,
                         ((PlainDeviceID) entry.getKey()).toStringID(),
                         entry.getValue(),
                         timeFilterExpression);
-              } catch (final IOException e) {
+              } catch (final Exception e) {
                 close();
                 throw new PipeException("failed to create 
TsFileInsertionDataTabletIterator", e);
               }
@@ -345,10 +334,6 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
         };
   }
 
-  public boolean shouldParsePattern() {
-    return shouldParsePattern;
-  }
-
   @Override
   public void close() {
     try {
@@ -359,13 +344,7 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
       LOGGER.warn("Failed to close TsFileReader", e);
     }
 
-    try {
-      if (tsFileSequenceReader != null) {
-        tsFileSequenceReader.close();
-      }
-    } catch (final IOException e) {
-      LOGGER.warn("Failed to close TsFileSequenceReader", e);
-    }
+    super.close();
 
     if (allocatedMemoryBlock != null) {
       allocatedMemoryBlock.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
similarity index 89%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
index 1e028ae870e..efe58f5ff60 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.event.common.tsfile;
+package org.apache.iotdb.db.pipe.event.common.tsfile.container.query;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -42,7 +42,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
 
-public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> {
+public class TsFileInsertionQueryDataTabletIterator implements 
Iterator<Tablet> {
 
   private final TsFileReader tsFileReader;
   private final Map<String, TSDataType> measurementDataTypeMap;
@@ -54,12 +54,12 @@ public class TsFileInsertionDataTabletIterator implements 
Iterator<Tablet> {
 
   private final QueryDataSet queryDataSet;
 
-  public TsFileInsertionDataTabletIterator(
-      TsFileReader tsFileReader,
-      Map<String, TSDataType> measurementDataTypeMap,
-      String deviceId,
-      List<String> measurements,
-      IExpression timeFilterExpression)
+  TsFileInsertionQueryDataTabletIterator(
+      final TsFileReader tsFileReader,
+      final Map<String, TSDataType> measurementDataTypeMap,
+      final String deviceId,
+      final List<String> measurements,
+      final IExpression timeFilterExpression)
       throws IOException {
     this.tsFileReader = tsFileReader;
     this.measurementDataTypeMap = measurementDataTypeMap;
@@ -81,7 +81,7 @@ public class TsFileInsertionDataTabletIterator implements 
Iterator<Tablet> {
 
   private QueryDataSet buildQueryDataSet() throws IOException {
     final List<Path> paths = new ArrayList<>();
-    for (String measurement : measurements) {
+    for (final String measurement : measurements) {
       paths.add(new Path(deviceId, measurement, false));
     }
     return tsFileReader.query(QueryExpression.create(paths, 
timeFilterExpression));
@@ -91,7 +91,7 @@ public class TsFileInsertionDataTabletIterator implements 
Iterator<Tablet> {
   public boolean hasNext() {
     try {
       return queryDataSet.hasNext();
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new PipeException("Failed to check next", e);
     }
   }
@@ -104,7 +104,7 @@ public class TsFileInsertionDataTabletIterator implements 
Iterator<Tablet> {
 
     try {
       return buildNextTablet();
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new PipeException("Failed to build tablet", e);
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
new file mode 100644
index 00000000000..740a1523d27
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
+
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.read.reader.page.AlignedPageReader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The {@link AlignedSinglePageWholeChunkReader} is used to read a whole 
single page aligned chunk
+ * without the need to pass in the statistics.
+ */
+public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader {
+
+  // chunk header of the time column
+  private final ChunkHeader timeChunkHeader;
+  // chunk data of the time column
+  private final ByteBuffer timeChunkDataBuffer;
+
+  // chunk headers of all the sub sensors
+  private final List<ChunkHeader> valueChunkHeaderList = new ArrayList<>();
+  // chunk data of all the sub sensors
+  private final List<ByteBuffer> valueChunkDataBufferList = new ArrayList<>();
+  // deleted intervals of all the sub sensors
+  private final List<List<TimeRange>> valueDeleteIntervalsList = new 
ArrayList<>();
+
+  public AlignedSinglePageWholeChunkReader(Chunk timeChunk, List<Chunk> 
valueChunkList)
+      throws IOException {
+    super(Long.MIN_VALUE, null);
+    this.timeChunkHeader = timeChunk.getHeader();
+    this.timeChunkDataBuffer = timeChunk.getData();
+
+    valueChunkList.forEach(
+        chunk -> {
+          this.valueChunkHeaderList.add(chunk == null ? null : 
chunk.getHeader());
+          this.valueChunkDataBufferList.add(chunk == null ? null : 
chunk.getData());
+          this.valueDeleteIntervalsList.add(chunk == null ? null : 
chunk.getDeleteIntervalList());
+        });
+
+    initAllPageReaders();
+  }
+
+  private void initAllPageReaders() throws IOException {
+    while (timeChunkDataBuffer.remaining() > 0) {
+      AlignedPageReader alignedPageReader = deserializeFromSinglePageChunk();
+      if (alignedPageReader != null) {
+        pageReaderList.add(alignedPageReader);
+      }
+    }
+  }
+
+  private AlignedPageReader deserializeFromSinglePageChunk() throws 
IOException {
+    PageHeader timePageHeader =
+        PageHeader.deserializeFrom(timeChunkDataBuffer, (Statistics<? extends 
Serializable>) null);
+    List<PageHeader> valuePageHeaderList = new ArrayList<>();
+
+    boolean isAllNull = true;
+    for (ByteBuffer byteBuffer : valueChunkDataBufferList) {
+      if (byteBuffer != null) {
+        isAllNull = false;
+        valuePageHeaderList.add(
+            PageHeader.deserializeFrom(byteBuffer, (Statistics<? extends 
Serializable>) null));
+      } else {
+        valuePageHeaderList.add(null);
+      }
+    }
+
+    if (isAllNull) {
+      // every value chunk buffer is null, so there is no value column to read for this page:
+      // advance past the page body and produce no reader
+      skipCurrentPage(timePageHeader, valuePageHeaderList);
+      return null;
+    }
+    return constructAlignedPageReader(timePageHeader, valuePageHeaderList);
+  }
+
+  private void skipCurrentPage(PageHeader timePageHeader, List<PageHeader> 
valuePageHeader) {
+    timeChunkDataBuffer.position(
+        timeChunkDataBuffer.position() + timePageHeader.getCompressedSize());
+    for (int i = 0; i < valuePageHeader.size(); i++) {
+      if (valuePageHeader.get(i) != null) {
+        valueChunkDataBufferList
+            .get(i)
+            .position(
+                valueChunkDataBufferList.get(i).position()
+                    + valuePageHeader.get(i).getCompressedSize());
+      }
+    }
+  }
+
+  private AlignedPageReader constructAlignedPageReader(
+      PageHeader timePageHeader, List<PageHeader> rawValuePageHeaderList) 
throws IOException {
+    ByteBuffer timePageData =
+        ChunkReader.deserializePageData(timePageHeader, timeChunkDataBuffer, 
timeChunkHeader);
+
+    List<PageHeader> valuePageHeaderList = new ArrayList<>();
+    List<ByteBuffer> valuePageDataList = new ArrayList<>();
+    List<TSDataType> valueDataTypeList = new ArrayList<>();
+    List<Decoder> valueDecoderList = new ArrayList<>();
+
+    boolean isAllNull = true;
+    for (int i = 0; i < rawValuePageHeaderList.size(); i++) {
+      PageHeader valuePageHeader = rawValuePageHeaderList.get(i);
+
+      if (valuePageHeader == null || valuePageHeader.getUncompressedSize() == 
0) {
+        // Empty Page
+        valuePageHeaderList.add(null);
+        valuePageDataList.add(null);
+        valueDataTypeList.add(null);
+        valueDecoderList.add(null);
+      } else {
+        ChunkHeader valueChunkHeader = valueChunkHeaderList.get(i);
+        valuePageHeaderList.add(valuePageHeader);
+        valuePageDataList.add(
+            ChunkReader.deserializePageData(
+                valuePageHeader, valueChunkDataBufferList.get(i), 
valueChunkHeader));
+        valueDataTypeList.add(valueChunkHeader.getDataType());
+        valueDecoderList.add(
+            Decoder.getDecoderByType(
+                valueChunkHeader.getEncodingType(), 
valueChunkHeader.getDataType()));
+        isAllNull = false;
+      }
+    }
+    if (isAllNull) {
+      return null;
+    }
+    AlignedPageReader alignedPageReader =
+        new AlignedPageReader(
+            timePageHeader,
+            timePageData,
+            defaultTimeDecoder,
+            valuePageHeaderList,
+            valuePageDataList,
+            valueDataTypeList,
+            valueDecoderList,
+            queryFilter);
+    alignedPageReader.setDeleteIntervalList(valueDeleteIntervalsList);
+    return alignedPageReader;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
new file mode 100644
index 00000000000..3f1aadbba0c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
+
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.tsfile.read.reader.page.PageReader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+public class SinglePageWholeChunkReader extends AbstractChunkReader {
+  private final ChunkHeader chunkHeader;
+  private final ByteBuffer chunkDataBuffer;
+
+  public SinglePageWholeChunkReader(Chunk chunk) throws IOException {
+    super(Long.MIN_VALUE, null);
+
+    this.chunkHeader = chunk.getHeader();
+    this.chunkDataBuffer = chunk.getData();
+
+    initAllPageReaders();
+  }
+
+  private void initAllPageReaders() throws IOException {
+    // build a page reader for every page remaining in the chunk buffer; no page is filtered out
+    while (chunkDataBuffer.remaining() > 0) {
+      pageReaderList.add(
+          constructPageReader(
+              PageHeader.deserializeFrom(
+                  chunkDataBuffer, (Statistics<? extends Serializable>) 
null)));
+    }
+  }
+
+  private PageReader constructPageReader(PageHeader pageHeader) throws 
IOException {
+    return new PageReader(
+        pageHeader,
+        deserializePageData(pageHeader, chunkDataBuffer, chunkHeader),
+        chunkHeader.getDataType(),
+        Decoder.getDecoderByType(chunkHeader.getEncodingType(), 
chunkHeader.getDataType()),
+        defaultTimeDecoder,
+        null);
+  }
+
+  
/////////////////////////////////////////////////////////////////////////////////////////////////
+  // util methods
+  
/////////////////////////////////////////////////////////////////////////////////////////////////
+
+  public static ByteBuffer readCompressedPageData(PageHeader pageHeader, ByteBuffer chunkBuffer)
+      throws IOException {
+    int compressedPageBodyLength = pageHeader.getCompressedSize();
+    // validate before allocating, so a corrupt or truncated page fails fast without a large array
+    if (compressedPageBodyLength < 0 || compressedPageBodyLength > chunkBuffer.remaining()) {
+      throw new IOException(
+          "do not has a complete page body. Expected:"
+              + compressedPageBodyLength
+              + ". Actual:"
+              + chunkBuffer.remaining());
+    }
+    byte[] compressedPageBody = new byte[compressedPageBodyLength];
+    chunkBuffer.get(compressedPageBody);
+    return ByteBuffer.wrap(compressedPageBody);
+  }
+
+  public static ByteBuffer uncompressPageData(
+      PageHeader pageHeader, IUnCompressor unCompressor, ByteBuffer compressedPageData)
+      throws IOException {
+    int compressedPageBodyLength = pageHeader.getCompressedSize();
+    byte[] uncompressedPageData = new byte[pageHeader.getUncompressedSize()];
+    try {
+      unCompressor.uncompress(
+          compressedPageData.array(), 0, compressedPageBodyLength, uncompressedPageData, 0);
+    } catch (Exception e) {
+      // keep the original exception as the cause so its type and stack trace are not lost
+      throw new IOException(
+          "Uncompress error! uncompress size: "
+              + pageHeader.getUncompressedSize()
+              + ", compressed size: "
+              + pageHeader.getCompressedSize()
+              + ", page header: "
+              + pageHeader,
+          e);
+    }
+    return ByteBuffer.wrap(uncompressedPageData);
+  }
+
+  public static ByteBuffer deserializePageData(
+      PageHeader pageHeader, ByteBuffer chunkBuffer, ChunkHeader chunkHeader) 
throws IOException {
+    IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
+    ByteBuffer compressedPageBody = readCompressedPageData(pageHeader, 
chunkBuffer);
+    return uncompressPageData(pageHeader, unCompressor, compressedPageBody);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
new file mode 100644
index 00000000000..a841986064e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.MetaMarker;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.filter.basic.Filter;
+import org.apache.tsfile.read.reader.IChunkReader;
+import org.apache.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContainer {
+
+  private final long startTime;
+  private final long endTime;
+  private final Filter filter;
+
+  private IChunkReader chunkReader;
+  private BatchData data;
+
+  private boolean isMultiPage;
+  private String currentDevice;
+  private boolean currentIsAligned;
+  private final List<MeasurementSchema> currentMeasurements = new 
ArrayList<>();
+
+  private byte lastMarker = Byte.MIN_VALUE;
+
+  public TsFileInsertionScanDataContainer(
+      final File tsFile,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final EnrichedEvent sourceEvent)
+      throws IOException {
+    super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+
+    this.startTime = startTime;
+    this.endTime = endTime;
+    filter = Objects.nonNull(timeFilterExpression) ? 
timeFilterExpression.getFilter() : null;
+
+    try {
+      tsFileSequenceReader = new 
TsFileSequenceReader(tsFile.getAbsolutePath(), false, false);
+      tsFileSequenceReader.position((long) 
TSFileConfig.MAGIC_STRING.getBytes().length + 1);
+
+      prepareData();
+    } catch (final Exception e) {
+      close();
+      throw e;
+    }
+  }
+
+  @Override
+  public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
+    return () ->
+        new Iterator<TabletInsertionEvent>() {
+
+          @Override
+          public boolean hasNext() {
+            return Objects.nonNull(chunkReader);
+          }
+
+          @Override
+          public TabletInsertionEvent next() {
+            if (!hasNext()) {
+              close();
+              throw new NoSuchElementException();
+            }
+
+            final Tablet tablet = getNextTablet();
+            final boolean hasNext = hasNext();
+            try {
+              return new PipeRawTabletInsertionEvent(
+                  tablet,
+                  currentIsAligned,
+                  sourceEvent != null ? sourceEvent.getPipeName() : null,
+                  sourceEvent != null ? sourceEvent.getCreationTime() : 0,
+                  pipeTaskMeta,
+                  sourceEvent,
+                  !hasNext);
+            } finally {
+              if (!hasNext) {
+                close();
+              }
+            }
+          }
+        };
+  }
+
+  private Tablet getNextTablet() {
+    try {
+      final Tablet tablet =
+          new Tablet(
+              currentDevice,
+              currentMeasurements,
+              PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+      tablet.initBitMaps();
+
+      while (data.hasCurrent()) {
+        if (isMultiPage || data.currentTime() >= startTime && 
data.currentTime() <= endTime) {
+          final int rowIndex = tablet.rowSize;
+
+          tablet.addTimestamp(rowIndex, data.currentTime());
+          putValueToColumns(data, tablet.values, rowIndex);
+
+          tablet.rowSize++;
+        }
+
+        data.next();
+        while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage()) {
+          data = chunkReader.nextPageData();
+        }
+
+        if (tablet.rowSize == tablet.getMaxRowNumber()) {
+          break;
+        }
+      }
+
+      // Switch chunk reader iff current chunk is all consumed
+      if (!data.hasCurrent()) {
+        prepareData();
+      }
+
+      return tablet;
+    } catch (final Exception e) {
+      close();
+      throw new PipeException("Failed to get next tablet insertion event.", e);
+    }
+  }
+
+  private void prepareData() throws IOException {
+    do {
+      do {
+        moveToNextChunkReader();
+      } while (Objects.nonNull(chunkReader) && 
!chunkReader.hasNextSatisfiedPage());
+
+      if (Objects.isNull(chunkReader)) {
+        close();
+        break;
+      }
+
+      do {
+        data = chunkReader.nextPageData();
+      } while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage());
+    } while (!data.hasCurrent());
+  }
+
+  private void putValueToColumns(final BatchData data, final Object[] columns, 
final int rowIndex) {
+    final TSDataType type = data.getDataType();
+    if (type == TSDataType.VECTOR) {
+      for (int i = 0; i < columns.length; ++i) {
+        final TsPrimitiveType primitiveType = data.getVector()[i];
+        switch (primitiveType.getDataType()) {
+          case BOOLEAN:
+            ((boolean[]) columns[i])[rowIndex] = primitiveType.getBoolean();
+            break;
+          case INT32:
+            ((int[]) columns[i])[rowIndex] = primitiveType.getInt();
+            break;
+          case DATE:
+            ((LocalDate[]) columns[i])[rowIndex] =
+                DateUtils.parseIntToLocalDate(primitiveType.getInt());
+            break;
+          case INT64:
+          case TIMESTAMP:
+            ((long[]) columns[i])[rowIndex] = primitiveType.getLong();
+            break;
+          case FLOAT:
+            ((float[]) columns[i])[rowIndex] = primitiveType.getFloat();
+            break;
+          case DOUBLE:
+            ((double[]) columns[i])[rowIndex] = primitiveType.getDouble();
+            break;
+          case TEXT:
+          case BLOB:
+          case STRING:
+            ((Binary[]) columns[i])[rowIndex] = primitiveType.getBinary();
+            break;
+          default:
+            throw new UnSupportedDataTypeException("UnSupported" + 
primitiveType.getDataType());
+        }
+      }
+    } else {
+      switch (type) {
+        case BOOLEAN:
+          ((boolean[]) columns[0])[rowIndex] = data.getBoolean();
+          break;
+        case INT32:
+          ((int[]) columns[0])[rowIndex] = data.getInt();
+          break;
+        case DATE:
+          ((LocalDate[]) columns[0])[rowIndex] = 
DateUtils.parseIntToLocalDate(data.getInt());
+          break;
+        case INT64:
+        case TIMESTAMP:
+          ((long[]) columns[0])[rowIndex] = data.getLong();
+          break;
+        case FLOAT:
+          ((float[]) columns[0])[rowIndex] = data.getFloat();
+          break;
+        case DOUBLE:
+          ((double[]) columns[0])[rowIndex] = data.getDouble();
+          break;
+        case TEXT:
+        case BLOB:
+        case STRING:
+          ((Binary[]) columns[0])[rowIndex] = data.getBinary();
+          break;
+        default:
+          throw new UnSupportedDataTypeException("UnSupported" + 
data.getDataType());
+      }
+    }
+  }
+
+  private void moveToNextChunkReader() throws IOException, 
IllegalStateException {
+    ChunkHeader chunkHeader;
+    Chunk timeChunk = null;
+    final List<Chunk> valueChunkList = new ArrayList<>();
+    currentMeasurements.clear();
+
+    if (lastMarker == MetaMarker.SEPARATOR) {
+      chunkReader = null;
+      return;
+    }
+
+    byte marker;
+    while ((marker = lastMarker != Byte.MIN_VALUE ? lastMarker : 
tsFileSequenceReader.readMarker())
+        != MetaMarker.SEPARATOR) {
+      lastMarker = Byte.MIN_VALUE;
+      switch (marker) {
+        case MetaMarker.CHUNK_HEADER:
+        case MetaMarker.TIME_CHUNK_HEADER:
+        case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+        case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+          if (Objects.nonNull(timeChunk) && !currentMeasurements.isEmpty()) {
+            chunkReader =
+                isMultiPage
+                    ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
+                    : new AlignedSinglePageWholeChunkReader(timeChunk, 
valueChunkList);
+            currentIsAligned = true;
+            lastMarker = marker;
+            return;
+          }
+
+          isMultiPage = marker == MetaMarker.CHUNK_HEADER || marker == 
MetaMarker.TIME_CHUNK_HEADER;
+
+          chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+
+          if (Objects.isNull(currentDevice)) {
+            tsFileSequenceReader.position(
+                tsFileSequenceReader.position() + chunkHeader.getDataSize());
+            break;
+          }
+
+          if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+              == TsFileConstant.TIME_COLUMN_MASK) {
+            timeChunk =
+                new Chunk(
+                    chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
+            break;
+          }
+
+          if (!pattern.matchesMeasurement(currentDevice, 
chunkHeader.getMeasurementID())) {
+            tsFileSequenceReader.position(
+                tsFileSequenceReader.position() + chunkHeader.getDataSize());
+            break;
+          }
+
+          chunkReader =
+              isMultiPage
+                  ? new ChunkReader(
+                      new Chunk(
+                          chunkHeader,
+                          tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize())),
+                      filter)
+                  : new SinglePageWholeChunkReader(
+                      new Chunk(
+                          chunkHeader,
+                          tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize())));
+          currentIsAligned = false;
+          currentMeasurements.add(
+              new MeasurementSchema(chunkHeader.getMeasurementID(), 
chunkHeader.getDataType()));
+          return;
+        case MetaMarker.VALUE_CHUNK_HEADER:
+        case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+          chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
+
+          if (Objects.isNull(currentDevice)
+              || !pattern.matchesMeasurement(currentDevice, 
chunkHeader.getMeasurementID())) {
+            tsFileSequenceReader.position(
+                tsFileSequenceReader.position() + chunkHeader.getDataSize());
+            break;
+          }
+
+          // Do not record empty chunk
+          if (chunkHeader.getDataSize() > 0) {
+            valueChunkList.add(
+                new Chunk(
+                    chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize())));
+            currentMeasurements.add(
+                new MeasurementSchema(chunkHeader.getMeasurementID(), 
chunkHeader.getDataType()));
+          }
+          break;
+        case MetaMarker.CHUNK_GROUP_HEADER:
+          // Return before "currentDevice" changes
+          if (Objects.nonNull(timeChunk) && !currentMeasurements.isEmpty()) {
+            chunkReader =
+                isMultiPage
+                    ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
+                    : new AlignedSinglePageWholeChunkReader(timeChunk, 
valueChunkList);
+            currentIsAligned = true;
+            lastMarker = marker;
+            return;
+          }
+          // TODO: Replace it by IDeviceID
+          final String deviceID =
+              ((PlainDeviceID) 
tsFileSequenceReader.readChunkGroupHeader().getDeviceID())
+                  .toStringID();
+          currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID : 
null;
+          break;
+        case MetaMarker.OPERATION_INDEX_RANGE:
+          tsFileSequenceReader.readPlanIndex();
+          break;
+        default:
+          MetaMarker.handleUnexpectedMarker(marker);
+      }
+    }
+
+    lastMarker = marker;
+    if (Objects.nonNull(timeChunk) && !currentMeasurements.isEmpty()) {
+      chunkReader =
+          isMultiPage
+              ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
+              : new AlignedSinglePageWholeChunkReader(timeChunk, 
valueChunkList);
+      currentIsAligned = true;
+    } else {
+      chunkReader = null;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 34d331a0b6b..765c48536ab 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -510,7 +510,8 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       final Map<IDeviceID, Boolean> deviceIsAlignedMap =
           PipeDataNodeResourceManager.tsfile()
               .getDeviceIsAlignedMapFromCache(
-                  
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
+                  
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()),
+                  false);
       deviceSet =
           Objects.nonNull(deviceIsAlignedMap) ? deviceIsAlignedMap.keySet() : 
resource.getDevices();
     } catch (final IOException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index 95be9281d5b..7bb67e781f2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -164,9 +164,14 @@ public class PipeTsFileResource implements AutoCloseable {
     return deviceMeasurementsMap;
   }
 
-  public synchronized Map<IDeviceID, Boolean> tryGetDeviceIsAlignedMap() 
throws IOException {
+  public synchronized Map<IDeviceID, Boolean> tryGetDeviceIsAlignedMap(
+      final boolean cacheOtherMetadata) throws IOException {
     if (deviceIsAlignedMap == null && isTsFile) {
-      cacheObjectsIfAbsent();
+      if (cacheOtherMetadata) {
+        cacheObjectsIfAbsent();
+      } else {
+        cacheDeviceIsAlignedMapIfAbsent();
+      }
     }
     return deviceIsAlignedMap;
   }
@@ -178,7 +183,7 @@ public class PipeTsFileResource implements AutoCloseable {
     return measurementDataTypeMap;
   }
 
-  synchronized boolean cacheObjectsIfAbsent() throws IOException {
+  synchronized boolean cacheDeviceIsAlignedMapIfAbsent() throws IOException {
     if (!isTsFile) {
       return false;
     }
@@ -205,11 +210,7 @@ public class PipeTsFileResource implements AutoCloseable {
 
     long memoryRequiredInBytes = 0L;
     try (TsFileSequenceReader sequenceReader =
-        new TsFileSequenceReader(hardlinkOrCopiedFile.getPath(), true, true)) {
-      deviceMeasurementsMap = sequenceReader.getDeviceMeasurementsMap();
-      memoryRequiredInBytes +=
-          
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
-
+        new TsFileSequenceReader(hardlinkOrCopiedFile.getPath(), true, false)) 
{
       deviceIsAlignedMap = new HashMap<>();
       final TsFileDeviceIterator deviceIsAlignedIterator =
           sequenceReader.getAllDevicesIteratorWithIsAligned();
@@ -218,6 +219,76 @@ public class PipeTsFileResource implements AutoCloseable {
         deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(), 
deviceIsAlignedPair.getRight());
       }
       memoryRequiredInBytes += 
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
+    }
+    // Release memory of TsFileSequenceReader.
+    allocatedMemoryBlock.close();
+    allocatedMemoryBlock = null;
+
+    // Allocate again for the cached objects.
+    allocatedMemoryBlock =
+        PipeDataNodeResourceManager.memory()
+            .forceAllocateIfSufficient(memoryRequiredInBytes, 
MEMORY_SUFFICIENT_THRESHOLD);
+    if (allocatedMemoryBlock == null) {
+      LOGGER.info(
+          "PipeTsFileResource: Failed to cache objects for tsfile {} in cache, 
because memory usage is high",
+          hardlinkOrCopiedFile.getPath());
+      deviceIsAlignedMap = null;
+      return false;
+    }
+
+    LOGGER.info(
+        "PipeTsFileResource: Cached deviceIsAlignedMap for tsfile {}.",
+        hardlinkOrCopiedFile.getPath());
+    return true;
+  }
+
+  synchronized boolean cacheObjectsIfAbsent() throws IOException {
+    if (!isTsFile) {
+      return false;
+    }
+
+    if (allocatedMemoryBlock != null) {
+      if (deviceMeasurementsMap != null) {
+        return true;
+      } else {
+        // Recalculate it again because only deviceIsAligned map is cached
+        allocatedMemoryBlock.close();
+        allocatedMemoryBlock = null;
+      }
+    }
+
+    // See if pipe memory is sufficient to be allocated for 
TsFileSequenceReader.
+    // Only allocate when pipe memory used is less than 50%, because memory 
here
+    // is hard to shrink and may consume too much memory.
+    allocatedMemoryBlock =
+        PipeDataNodeResourceManager.memory()
+            .forceAllocateIfSufficient(
+                
PipeConfig.getInstance().getPipeMemoryAllocateForTsFileSequenceReaderInBytes(),
+                MEMORY_SUFFICIENT_THRESHOLD);
+    if (allocatedMemoryBlock == null) {
+      LOGGER.info(
+          "PipeTsFileResource: Failed to create TsFileSequenceReader for 
tsfile {} in cache, because memory usage is high",
+          hardlinkOrCopiedFile.getPath());
+      return false;
+    }
+
+    long memoryRequiredInBytes = 0L;
+    try (TsFileSequenceReader sequenceReader =
+        new TsFileSequenceReader(hardlinkOrCopiedFile.getPath(), true, true)) {
+      deviceMeasurementsMap = sequenceReader.getDeviceMeasurementsMap();
+      memoryRequiredInBytes +=
+          
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
+
+      if (Objects.isNull(deviceIsAlignedMap)) {
+        deviceIsAlignedMap = new HashMap<>();
+        final TsFileDeviceIterator deviceIsAlignedIterator =
+            sequenceReader.getAllDevicesIteratorWithIsAligned();
+        while (deviceIsAlignedIterator.hasNext()) {
+          final Pair<IDeviceID, Boolean> deviceIsAlignedPair = 
deviceIsAlignedIterator.next();
+          deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(), 
deviceIsAlignedPair.getRight());
+        }
+      }
+      memoryRequiredInBytes += 
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
 
       measurementDataTypeMap = sequenceReader.getFullPathDataTypeMap();
       memoryRequiredInBytes += 
PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index e40bafe03b7..e7367956126 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -278,13 +278,13 @@ public class PipeTsFileResourceManager {
     }
   }
 
-  public Map<IDeviceID, Boolean> getDeviceIsAlignedMapFromCache(final File 
hardlinkOrCopiedTsFile)
-      throws IOException {
+  public Map<IDeviceID, Boolean> getDeviceIsAlignedMapFromCache(
+      final File hardlinkOrCopiedTsFile, final boolean cacheOtherMetadata) 
throws IOException {
     lock.lock();
     try {
       final PipeTsFileResource resource =
           
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
-      return resource == null ? null : resource.tryGetDeviceIsAlignedMap();
+      return resource == null ? null : 
resource.tryGetDeviceIsAlignedMap(cacheOtherMetadata);
     } finally {
       lock.unlock();
     }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 3f9f34f722e..74d500006f8 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -23,7 +23,9 @@ import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import 
org.apache.iotdb.db.pipe.event.common.tsfile.TsFileInsertionDataContainer;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
 
 import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.read.TsFileSequenceReader;
@@ -69,20 +71,33 @@ public class TsFileInsertionDataContainerTest {
   }
 
   @Test
-  public void testToTabletInsertionEvents() throws Exception {
-    Set<Integer> deviceNumbers = new HashSet<>();
+  public void testQueryContainer() throws Exception {
+    final long startTime = System.currentTimeMillis();
+    testToTabletInsertionEvents(true);
+    System.out.println(System.currentTimeMillis() - startTime);
+  }
+
+  @Test
+  public void testScanContainer() throws Exception {
+    final long startTime = System.currentTimeMillis();
+    testToTabletInsertionEvents(false);
+    System.out.println(System.currentTimeMillis() - startTime);
+  }
+
+  public void testToTabletInsertionEvents(final boolean isQuery) throws 
Exception {
+    final Set<Integer> deviceNumbers = new HashSet<>();
     deviceNumbers.add(1);
     deviceNumbers.add(2);
 
-    Set<Integer> measurementNumbers = new HashSet<>();
+    final Set<Integer> measurementNumbers = new HashSet<>();
     measurementNumbers.add(1);
     measurementNumbers.add(2);
 
-    Set<String> patternFormats = new HashSet<>();
+    final Set<String> patternFormats = new HashSet<>();
     patternFormats.add(PREFIX_FORMAT);
     patternFormats.add(IOTDB_FORMAT);
 
-    Set<Pair<Long, Long>> startEndTimes = new HashSet<>();
+    final Set<Pair<Long, Long>> startEndTimes = new HashSet<>();
     startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME - 1));
     startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME));
     startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME + 1));
@@ -100,31 +115,34 @@ public class TsFileInsertionDataContainerTest {
 
     startEndTimes.add(new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
 
-    for (int deviceNumber : deviceNumbers) {
-      for (int measurementNumber : measurementNumbers) {
-        for (String patternFormat : patternFormats) {
-          for (Pair<Long, Long> startEndTime : startEndTimes) {
+    for (final int deviceNumber : deviceNumbers) {
+      for (final int measurementNumber : measurementNumbers) {
+        for (final String patternFormat : patternFormats) {
+          for (final Pair<Long, Long> startEndTime : startEndTimes) {
             testToTabletInsertionEvents(
                 deviceNumber,
                 measurementNumber,
                 0,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
             testToTabletInsertionEvents(
                 deviceNumber,
                 measurementNumber,
                 1,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
             testToTabletInsertionEvents(
                 deviceNumber,
                 measurementNumber,
                 2,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
 
             testToTabletInsertionEvents(
                 deviceNumber,
@@ -132,21 +150,24 @@ public class TsFileInsertionDataContainerTest {
                 999,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
             testToTabletInsertionEvents(
                 deviceNumber,
                 measurementNumber,
                 1000,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
             testToTabletInsertionEvents(
                 deviceNumber,
                 measurementNumber,
                 1001,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
 
             testToTabletInsertionEvents(
                 deviceNumber,
@@ -154,21 +175,24 @@ public class TsFileInsertionDataContainerTest {
                 999 * 2 + 1,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
             testToTabletInsertionEvents(
                 deviceNumber,
                 measurementNumber,
                 1000,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
             testToTabletInsertionEvents(
                 deviceNumber,
                 measurementNumber,
                 1001 * 2 - 1,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
 
             testToTabletInsertionEvents(
                 deviceNumber,
@@ -176,21 +200,24 @@ public class TsFileInsertionDataContainerTest {
                 1023,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
             testToTabletInsertionEvents(
                 deviceNumber,
                 measurementNumber,
                 1024,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
             testToTabletInsertionEvents(
                 deviceNumber,
                 measurementNumber,
                 1025,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
 
             testToTabletInsertionEvents(
                 deviceNumber,
@@ -198,21 +225,24 @@ public class TsFileInsertionDataContainerTest {
                 1023 * 2 + 1,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
             testToTabletInsertionEvents(
                 deviceNumber,
                 measurementNumber,
                 1024 * 2,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
             testToTabletInsertionEvents(
                 deviceNumber,
                 measurementNumber,
                 1025 * 2 - 1,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
 
             testToTabletInsertionEvents(
                 deviceNumber,
@@ -220,7 +250,8 @@ public class TsFileInsertionDataContainerTest {
                 10001,
                 patternFormat,
                 startEndTime.left,
-                startEndTime.right);
+                startEndTime.right,
+                isQuery);
           }
         }
       }
@@ -228,12 +259,13 @@ public class TsFileInsertionDataContainerTest {
   }
 
   private void testToTabletInsertionEvents(
-      int deviceNumber,
-      int measurementNumber,
-      int rowNumberInOneDevice,
-      String patternFormat,
-      long startTime,
-      long endTime)
+      final int deviceNumber,
+      final int measurementNumber,
+      final int rowNumberInOneDevice,
+      final String patternFormat,
+      final long startTime,
+      final long endTime,
+      final boolean isQuery)
       throws Exception {
     LOGGER.debug(
         "testToTabletInsertionEvents: deviceNumber: {}, measurementNumber: {}, 
rowNumberInOneDevice: {}, patternFormat: {}, startTime: {}, endTime: {}",
@@ -298,12 +330,20 @@ public class TsFileInsertionDataContainerTest {
     }
 
     try (final TsFileInsertionDataContainer alignedContainer =
-            new TsFileInsertionDataContainer(alignedTsFile, rootPattern, 
startTime, endTime);
+            isQuery
+                ? new TsFileInsertionQueryDataContainer(
+                    alignedTsFile, rootPattern, startTime, endTime)
+                : new TsFileInsertionScanDataContainer(
+                    alignedTsFile, rootPattern, startTime, endTime, null, 
null);
         final TsFileInsertionDataContainer nonalignedContainer =
-            new TsFileInsertionDataContainer(nonalignedTsFile, rootPattern, 
startTime, endTime)) {
-      AtomicInteger count1 = new AtomicInteger(0);
-      AtomicInteger count2 = new AtomicInteger(0);
-      AtomicInteger count3 = new AtomicInteger(0);
+            isQuery
+                ? new TsFileInsertionQueryDataContainer(
+                    nonalignedTsFile, rootPattern, startTime, endTime)
+                : new TsFileInsertionScanDataContainer(
+                    nonalignedTsFile, rootPattern, startTime, endTime, null, 
null)) {
+      final AtomicInteger count1 = new AtomicInteger(0);
+      final AtomicInteger count2 = new AtomicInteger(0);
+      final AtomicInteger count3 = new AtomicInteger(0);
 
       alignedContainer
           .toTabletInsertionEvents()
@@ -366,8 +406,7 @@ public class TsFileInsertionDataContainerTest {
                                       (row, collector) -> {
                                         try {
                                           rowCollector.collectRow(row);
-                                          
Assert.assertEquals(measurementNumber, row.size());
-                                          count1.incrementAndGet();
+                                          count1.addAndGet(row.size());
                                         } catch (IOException e) {
                                           throw new RuntimeException(e);
                                         }
@@ -379,8 +418,7 @@ public class TsFileInsertionDataContainerTest {
                                       (row, collector) -> {
                                         try {
                                           collector.collectRow(row);
-                                          
Assert.assertEquals(measurementNumber, row.size());
-                                          count2.incrementAndGet();
+                                          count2.addAndGet(row.size());
                                         } catch (IOException e) {
                                           throw new RuntimeException(e);
                                         }
@@ -391,31 +429,30 @@ public class TsFileInsertionDataContainerTest {
                                               (row, collector) -> {
                                                 try {
                                                   collector.collectRow(row);
-                                                  Assert.assertEquals(
-                                                      measurementNumber, 
row.size());
-                                                  count3.incrementAndGet();
+                                                  count3.addAndGet(row.size());
                                                 } catch (IOException e) {
                                                   throw new 
RuntimeException(e);
                                                 }
                                               }))));
 
-      Assert.assertEquals(deviceNumber * expectedRowNumber, count1.get());
-      Assert.assertEquals(deviceNumber * expectedRowNumber, count2.get());
-      Assert.assertEquals(deviceNumber * expectedRowNumber, count3.get());
-    } catch (Exception e) {
+      // Calculate points in non-aligned tablets
+      Assert.assertEquals(deviceNumber * expectedRowNumber * 
measurementNumber, count1.get());
+      Assert.assertEquals(deviceNumber * expectedRowNumber * 
measurementNumber, count2.get());
+      Assert.assertEquals(deviceNumber * expectedRowNumber * 
measurementNumber, count3.get());
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
-    AtomicReference<String> oneDeviceInAlignedTsFile = new AtomicReference<>();
-    AtomicReference<String> oneMeasurementInAlignedTsFile = new 
AtomicReference<>();
+    final AtomicReference<String> oneDeviceInAlignedTsFile = new 
AtomicReference<>();
+    final AtomicReference<String> oneMeasurementInAlignedTsFile = new 
AtomicReference<>();
 
-    AtomicReference<String> oneDeviceInUnalignedTsFile = new 
AtomicReference<>();
-    AtomicReference<String> oneMeasurementInUnalignedTsFile = new 
AtomicReference<>();
+    final AtomicReference<String> oneDeviceInUnalignedTsFile = new 
AtomicReference<>();
+    final AtomicReference<String> oneMeasurementInUnalignedTsFile = new 
AtomicReference<>();
 
-    try (TsFileSequenceReader alignedReader =
+    try (final TsFileSequenceReader alignedReader =
             new TsFileSequenceReader(alignedTsFile.getAbsolutePath());
-        TsFileSequenceReader nonalignedReader =
+        final TsFileSequenceReader nonalignedReader =
             new TsFileSequenceReader(nonalignedTsFile.getAbsolutePath())) {
 
       alignedReader
@@ -457,14 +494,20 @@ public class TsFileInsertionDataContainerTest {
     }
 
     try (final TsFileInsertionDataContainer alignedContainer =
-            new TsFileInsertionDataContainer(
-                alignedTsFile, oneAlignedDevicePattern, startTime, endTime);
+            isQuery
+                ? new TsFileInsertionQueryDataContainer(
+                    alignedTsFile, oneAlignedDevicePattern, startTime, endTime)
+                : new TsFileInsertionScanDataContainer(
+                    alignedTsFile, oneAlignedDevicePattern, startTime, 
endTime, null, null);
         final TsFileInsertionDataContainer nonalignedContainer =
-            new TsFileInsertionDataContainer(
-                nonalignedTsFile, oneNonAlignedDevicePattern, startTime, 
endTime)) {
-      AtomicInteger count1 = new AtomicInteger(0);
-      AtomicInteger count2 = new AtomicInteger(0);
-      AtomicInteger count3 = new AtomicInteger(0);
+            isQuery
+                ? new TsFileInsertionQueryDataContainer(
+                    nonalignedTsFile, oneNonAlignedDevicePattern, startTime, 
endTime)
+                : new TsFileInsertionScanDataContainer(
+                    nonalignedTsFile, oneNonAlignedDevicePattern, startTime, 
endTime, null, null)) {
+      final AtomicInteger count1 = new AtomicInteger(0);
+      final AtomicInteger count2 = new AtomicInteger(0);
+      final AtomicInteger count3 = new AtomicInteger(0);
 
       alignedContainer
           .toTabletInsertionEvents()
@@ -527,8 +570,7 @@ public class TsFileInsertionDataContainerTest {
                                       (row, collector) -> {
                                         try {
                                           rowCollector.collectRow(row);
-                                          
Assert.assertEquals(measurementNumber, row.size());
-                                          count1.incrementAndGet();
+                                          count1.addAndGet(row.size());
                                         } catch (IOException e) {
                                           throw new RuntimeException(e);
                                         }
@@ -540,8 +582,7 @@ public class TsFileInsertionDataContainerTest {
                                       (row, collector) -> {
                                         try {
                                           collector.collectRow(row);
-                                          
Assert.assertEquals(measurementNumber, row.size());
-                                          count2.incrementAndGet();
+                                          count2.addAndGet(row.size());
                                         } catch (IOException e) {
                                           throw new RuntimeException(e);
                                         }
@@ -552,18 +593,17 @@ public class TsFileInsertionDataContainerTest {
                                               (row, collector) -> {
                                                 try {
                                                   collector.collectRow(row);
-                                                  Assert.assertEquals(
-                                                      measurementNumber, 
row.size());
-                                                  count3.incrementAndGet();
+                                                  count3.addAndGet(row.size());
                                                 } catch (IOException e) {
                                                   throw new 
RuntimeException(e);
                                                 }
                                               }))));
 
-      Assert.assertEquals(expectedRowNumber, count1.get());
-      Assert.assertEquals(expectedRowNumber, count2.get());
-      Assert.assertEquals(expectedRowNumber, count3.get());
-    } catch (Exception e) {
+      // Calculate points in non-aligned tablets
+      Assert.assertEquals(expectedRowNumber * measurementNumber, count1.get());
+      Assert.assertEquals(expectedRowNumber * measurementNumber, count2.get());
+      Assert.assertEquals(expectedRowNumber * measurementNumber, count3.get());
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
@@ -585,14 +625,25 @@ public class TsFileInsertionDataContainerTest {
     }
 
     try (final TsFileInsertionDataContainer alignedContainer =
-            new TsFileInsertionDataContainer(
-                alignedTsFile, oneAlignedMeasurementPattern, startTime, 
endTime);
+            isQuery
+                ? new TsFileInsertionQueryDataContainer(
+                    alignedTsFile, oneAlignedMeasurementPattern, startTime, 
endTime)
+                : new TsFileInsertionScanDataContainer(
+                    alignedTsFile, oneAlignedMeasurementPattern, startTime, 
endTime, null, null);
         final TsFileInsertionDataContainer nonalignedContainer =
-            new TsFileInsertionDataContainer(
-                nonalignedTsFile, oneNonAlignedMeasurementPattern, startTime, 
endTime)) {
-      AtomicInteger count1 = new AtomicInteger(0);
-      AtomicInteger count2 = new AtomicInteger(0);
-      AtomicInteger count3 = new AtomicInteger(0);
+            isQuery
+                ? new TsFileInsertionQueryDataContainer(
+                    nonalignedTsFile, oneNonAlignedMeasurementPattern, 
startTime, endTime)
+                : new TsFileInsertionScanDataContainer(
+                    nonalignedTsFile,
+                    oneNonAlignedMeasurementPattern,
+                    startTime,
+                    endTime,
+                    null,
+                    null)) {
+      final AtomicInteger count1 = new AtomicInteger(0);
+      final AtomicInteger count2 = new AtomicInteger(0);
+      final AtomicInteger count3 = new AtomicInteger(0);
 
       alignedContainer
           .toTabletInsertionEvents()
@@ -689,7 +740,7 @@ public class TsFileInsertionDataContainerTest {
       Assert.assertEquals(expectedRowNumber, count1.get());
       Assert.assertEquals(expectedRowNumber, count2.get());
       Assert.assertEquals(expectedRowNumber, count3.get());
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
@@ -706,13 +757,20 @@ public class TsFileInsertionDataContainerTest {
     }
 
     try (final TsFileInsertionDataContainer alignedContainer =
-            new TsFileInsertionDataContainer(alignedTsFile, notExistPattern, 
startTime, endTime);
+            isQuery
+                ? new TsFileInsertionQueryDataContainer(
+                    alignedTsFile, notExistPattern, startTime, endTime)
+                : new TsFileInsertionScanDataContainer(
+                    alignedTsFile, notExistPattern, startTime, endTime, null, 
null);
         final TsFileInsertionDataContainer nonalignedContainer =
-            new TsFileInsertionDataContainer(
-                nonalignedTsFile, notExistPattern, startTime, endTime)) {
-      AtomicInteger count1 = new AtomicInteger(0);
-      AtomicInteger count2 = new AtomicInteger(0);
-      AtomicInteger count3 = new AtomicInteger(0);
+            isQuery
+                ? new TsFileInsertionQueryDataContainer(
+                    nonalignedTsFile, notExistPattern, startTime, endTime)
+                : new TsFileInsertionScanDataContainer(
+                    nonalignedTsFile, notExistPattern, startTime, endTime, 
null, null)) {
+      final AtomicInteger count1 = new AtomicInteger(0);
+      final AtomicInteger count2 = new AtomicInteger(0);
+      final AtomicInteger count3 = new AtomicInteger(0);
 
       alignedContainer
           .toTabletInsertionEvents()
@@ -809,7 +867,7 @@ public class TsFileInsertionDataContainerTest {
       Assert.assertEquals(0, count1.get());
       Assert.assertEquals(0, count2.get());
       Assert.assertEquals(0, count3.get());
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 1c6fb77e835..aef1a020a7f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -241,6 +241,7 @@ public class CommonConfig {
   private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
   private PipeRemainingTimeRateAverageTime 
pipeRemainingTimeCommitRateAverageTime =
       PipeRemainingTimeRateAverageTime.MEAN;
+  private double pipeTsFileScanParsingThreshold = 0.05;
 
   private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8 
minutes
   private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; 
// 3 minutes
@@ -1044,6 +1045,14 @@ public class CommonConfig {
     this.pipeRemainingTimeCommitRateAverageTime = 
pipeRemainingTimeCommitRateAverageTime;
   }
 
+  public double getPipeTsFileScanParsingThreshold() {
+    return pipeTsFileScanParsingThreshold;
+  }
+
+  public void setPipeTsFileScanParsingThreshold(double 
pipeTsFileScanParsingThreshold) {
+    this.pipeTsFileScanParsingThreshold = pipeTsFileScanParsingThreshold;
+  }
+
   public double getPipeAllSinksRateLimitBytesPerSecond() {
     return pipeAllSinksRateLimitBytesPerSecond;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index b989e8e5055..135cef409b0 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -559,6 +559,11 @@ public class CommonDescriptor {
                     "pipe_remaining_time_commit_rate_average_time",
                     
String.valueOf(config.getPipeRemainingTimeCommitRateAverageTime()))
                 .trim()));
+    config.setPipeTsFileScanParsingThreshold(
+        Double.parseDouble(
+            properties.getProperty(
+                "pipe_tsfile_scan_parsing_threshold",
+                String.valueOf(config.getPipeTsFileScanParsingThreshold()))));
 
     config.setTwoStageAggregateMaxCombinerLiveTimeInMs(
         Long.parseLong(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 818ef38dd84..270a5d4fb8e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -152,6 +152,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeRemainingTimeCommitRateAverageTime();
   }
 
+  public double getPipeTsFileScanParsingThreshold() {
+    return COMMON_CONFIG.getPipeTsFileScanParsingThreshold();
+  }
+
   /////////////////////////////// Meta Consistency 
///////////////////////////////
 
   public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -341,6 +345,7 @@ public class PipeConfig {
         getPipeRemainingTimeCommitAutoSwitchSeconds());
     LOGGER.info(
         "PipeRemainingTimeCommitRateAverageTime: {}", 
getPipeRemainingTimeCommitRateAverageTime());
+    LOGGER.info("PipeTsFileScanParsingThreshold(): {}", 
getPipeTsFileScanParsingThreshold());
 
     LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", 
getPipeAsyncConnectorSelectorNumber());
     LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", 
getPipeAsyncConnectorMaxClientNumber());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
index 30307d07711..4ca04ef35e1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
@@ -185,6 +185,10 @@ public class IoTDBPipePattern extends PipePattern {
     return !patternPartialPath.hasWildcard();
   }
 
+  public boolean mayMatchMultipleTimeSeriesInOneDevice() {
+    return PathPatternUtil.hasWildcard(patternPartialPath.getTailNode());
+  }
+
   @Override
   public String toString() {
     return "IoTDBPipePattern" + super.toString();


Reply via email to