This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 08281208099 [IOTDB-5977][IOTDB-5979][IOTDB-5981] Pipe: several bug 
fixes in pipe execution engine (#10087)
08281208099 is described below

commit 082812080998c12964cecbd1f8da209b39c0420c
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Jun 9 01:50:23 2023 +0800

    [IOTDB-5977][IOTDB-5979][IOTDB-5981] Pipe: several bug fixes in pipe 
execution engine (#10087)
    
    * [IOTDB-5977] Pipe: start-time and end-time in collector.history are not 
working correctly
    
    * [IOTDB-5979] Pipe: validation and customization failures during the first 
run of the PipeProcessor will affect the creation of subsequent pipes
    
    * [IOTDB-5981] Pipe: fail to process realtime TEXT data with pattern filter
---
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  96 ++++++++----
 .../PipeHistoricalDataRegionTsFileCollector.java   |  48 ++++--
 .../event/common/row/PipeBinaryTransformer.java    |  37 +++++
 .../iotdb/db/pipe/event/common/row/PipeRow.java    |  10 +-
 .../db/pipe/event/common/row/PipeRowCollector.java |  10 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  23 ++-
 .../tsfile/TsFileInsertionDataContainer.java       |  87 +++++++----
 .../tsfile/TsFileInsertionDataTabletIterator.java  |  10 +-
 .../db/pipe/processor/PipeDoNothingProcessor.java  |  11 +-
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |   1 -
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |  49 +++---
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  42 ++---
 .../event/TsFileInsertionDataContainerTest.java    | 169 +++++++++++++++------
 13 files changed, 397 insertions(+), 196 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index d21d8ed5d4e..e3fe56a52ec 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
 import org.apache.iotdb.db.pipe.task.PipeTaskManager;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -86,42 +87,52 @@ public class PipeTaskAgent {
       return;
     }
 
+    final List<Exception> exceptions = new ArrayList<>();
+
     // iterate through pipe meta list from config node, check if pipe meta 
exists on data node
     // or has changed
     for (final PipeMeta metaFromConfigNode : pipeMetaListFromConfigNode) {
       final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
 
-      final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
-
-      // if pipe meta does not exist on data node, create a new pipe
-      if (metaOnDataNode == null) {
-        if (createPipe(metaFromConfigNode)) {
-          // if the status recorded in config node is RUNNING, start the pipe
-          startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
+      try {
+        final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
+
+        // if pipe meta does not exist on data node, create a new pipe
+        if (metaOnDataNode == null) {
+          if (createPipe(metaFromConfigNode)) {
+            // if the status recorded in config node is RUNNING, start the pipe
+            startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
+          }
+          // if the status recorded in config node is STOPPED or DROPPED, do 
nothing
+          continue;
         }
-        // if the status recorded in config node is STOPPED or DROPPED, do 
nothing
-        continue;
-      }
 
-      // if pipe meta exists on data node, check if it has changed
-      final PipeStaticMeta staticMetaOnDataNode = 
metaOnDataNode.getStaticMeta();
-      final PipeStaticMeta staticMetaFromConfigNode = 
metaFromConfigNode.getStaticMeta();
-
-      // first check if pipe static meta has changed, if so, drop the pipe and 
create a new one
-      if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
-        dropPipe(pipeName);
-        if (createPipe(metaFromConfigNode)) {
-          startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
+        // if pipe meta exists on data node, check if it has changed
+        final PipeStaticMeta staticMetaOnDataNode = 
metaOnDataNode.getStaticMeta();
+        final PipeStaticMeta staticMetaFromConfigNode = 
metaFromConfigNode.getStaticMeta();
+
+        // first check if pipe static meta has changed, if so, drop the pipe 
and create a new one
+        if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
+          dropPipe(pipeName);
+          if (createPipe(metaFromConfigNode)) {
+            startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
+          }
+          // if the status is STOPPED or DROPPED, do nothing
+          continue;
         }
-        // if the status is STOPPED or DROPPED, do nothing
-        continue;
-      }
 
-      // then check if pipe runtime meta has changed, if so, update the pipe
-      final PipeRuntimeMeta runtimeMetaOnDataNode = 
metaOnDataNode.getRuntimeMeta();
-      final PipeRuntimeMeta runtimeMetaFromConfigNode = 
metaFromConfigNode.getRuntimeMeta();
-      handlePipeRuntimeMetaChanges(
-          staticMetaFromConfigNode, runtimeMetaFromConfigNode, 
runtimeMetaOnDataNode);
+        // then check if pipe runtime meta has changed, if so, update the pipe
+        final PipeRuntimeMeta runtimeMetaOnDataNode = 
metaOnDataNode.getRuntimeMeta();
+        final PipeRuntimeMeta runtimeMetaFromConfigNode = 
metaFromConfigNode.getRuntimeMeta();
+        handlePipeRuntimeMetaChanges(
+            staticMetaFromConfigNode, runtimeMetaFromConfigNode, 
runtimeMetaOnDataNode);
+      } catch (Exception e) {
+        final String errorMessage =
+            String.format(
+                "Failed to handle pipe meta changes for %s, because %s", 
pipeName, e.getMessage());
+        LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
+        exceptions.add(new PipeException(errorMessage, e));
+      }
     }
 
     // check if there are pipes on data node that do not exist on config node, 
if so, drop them
@@ -130,10 +141,26 @@ public class PipeTaskAgent {
             .map(meta -> meta.getStaticMeta().getPipeName())
             .collect(Collectors.toSet());
     for (final PipeMeta metaOnDataNode : pipeMetaKeeper.getPipeMetaList()) {
-      if 
(!pipeNamesFromConfigNode.contains(metaOnDataNode.getStaticMeta().getPipeName()))
 {
-        dropPipe(metaOnDataNode.getStaticMeta().getPipeName());
+      final String pipeName = metaOnDataNode.getStaticMeta().getPipeName();
+
+      try {
+        if (!pipeNamesFromConfigNode.contains(pipeName)) {
+          dropPipe(metaOnDataNode.getStaticMeta().getPipeName());
+        }
+      } catch (Exception e) {
+        final String errorMessage =
+            String.format(
+                "Failed to handle pipe meta changes for %s, because %s", 
pipeName, e.getMessage());
+        LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
+        exceptions.add(new PipeException(errorMessage, e));
       }
     }
+
+    if (!exceptions.isEmpty()) {
+      throw new PipeException(
+          String.format(
+              "Failed to handle pipe meta changes on data node, because: %s", 
exceptions));
+    }
   }
 
   private void handlePipeRuntimeMetaChanges(
@@ -239,7 +266,16 @@ public class PipeTaskAgent {
 
   public synchronized void dropAllPipeTasks() {
     for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
-      dropPipe(pipeMeta.getStaticMeta().getPipeName(), 
pipeMeta.getStaticMeta().getCreationTime());
+      try {
+        dropPipe(
+            pipeMeta.getStaticMeta().getPipeName(), 
pipeMeta.getStaticMeta().getCreationTime());
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Failed to drop pipe {} with creation time {}",
+            pipeMeta.getStaticMeta().getPipeName(),
+            pipeMeta.getStaticMeta().getCreationTime(),
+            e);
+      }
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 8a6f21603e9..f39923fd67a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -44,6 +44,7 @@ import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME;
 import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME;
 import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.DATA_REGION_KEY;
@@ -56,12 +57,13 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
   private final PipeTaskMeta pipeTaskMeta;
   private final ProgressIndex startIndex;
 
-  private String pattern;
   private int dataRegionId;
 
-  private final long historicalDataCollectionTimeLowerBound;
-  private long historicalDataCollectionStartTime;
-  private long historicalDataCollectionEndTime;
+  private String pattern;
+
+  private final long historicalDataCollectionTimeLowerBound; // arrival time
+  private long historicalDataCollectionStartTime; // event time
+  private long historicalDataCollectionEndTime; // event time
 
   private Queue<PipeTsFileInsertionEvent> pendingQueue;
 
@@ -81,18 +83,24 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
   @Override
   public void customize(
       PipeParameters parameters, PipeCollectorRuntimeConfiguration 
configuration) {
+    dataRegionId = parameters.getInt(DATA_REGION_KEY);
+
     pattern =
         parameters.getStringOrDefault(
             PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
             PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE);
-    dataRegionId = parameters.getInt(DATA_REGION_KEY);
+
+    // user may set the COLLECTOR_HISTORY_START_TIME and 
COLLECTOR_HISTORY_END_TIME without
+    // enabling the historical data collection, which may affect the realtime 
data collection.
+    final boolean isHistoricalCollectorEnabledByUser =
+        parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true);
     historicalDataCollectionStartTime =
-        parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME)
+        isHistoricalCollectorEnabledByUser && 
parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME)
             ? DateTimeUtils.convertDatetimeStrToLong(
                 parameters.getString(COLLECTOR_HISTORY_START_TIME), 
ZoneId.systemDefault())
             : Long.MIN_VALUE;
     historicalDataCollectionEndTime =
-        parameters.hasAttribute(COLLECTOR_HISTORY_END_TIME)
+        isHistoricalCollectorEnabledByUser && 
parameters.hasAttribute(COLLECTOR_HISTORY_END_TIME)
             ? DateTimeUtils.convertDatetimeStrToLong(
                 parameters.getString(COLLECTOR_HISTORY_END_TIME), 
ZoneId.systemDefault())
             : Long.MAX_VALUE;
@@ -158,7 +166,14 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
                         
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
                             && 
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
-                .map(resource -> new PipeTsFileInsertionEvent(resource, 
pipeTaskMeta, pattern))
+                .map(
+                    resource ->
+                        new PipeTsFileInsertionEvent(
+                            resource,
+                            pipeTaskMeta,
+                            pattern,
+                            historicalDataCollectionStartTime,
+                            historicalDataCollectionEndTime))
                 .collect(Collectors.toList()));
         pendingQueue.addAll(
             tsFileManager.getTsFileList(false).stream()
@@ -167,7 +182,14 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
                         
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
                             && 
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
-                .map(resource -> new PipeTsFileInsertionEvent(resource, 
pipeTaskMeta, pattern))
+                .map(
+                    resource ->
+                        new PipeTsFileInsertionEvent(
+                            resource,
+                            pipeTaskMeta,
+                            pattern,
+                            historicalDataCollectionStartTime,
+                            historicalDataCollectionEndTime))
                 .collect(Collectors.toList()));
         pendingQueue.forEach(
             event ->
@@ -182,8 +204,8 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
   }
 
   private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource 
resource) {
-    return !(resource.getFileEndTime() < historicalDataCollectionStartTime
-        || historicalDataCollectionEndTime < resource.getFileStartTime());
+    return historicalDataCollectionStartTime <= resource.getFileEndTime()
+        && resource.getFileStartTime() <= historicalDataCollectionEndTime;
   }
 
   private boolean 
isTsFileGeneratedAfterCollectionTimeLowerBound(TsFileResource resource) {
@@ -192,7 +214,9 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
           <= 
TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
     } catch (IOException e) {
       LOGGER.warn(
-          String.format("failed to get the generation time of TsFile %s", 
resource.getTsFilePath()),
+          String.format(
+              "failed to get the generation time of TsFile %s, collect it 
anyway",
+              resource.getTsFilePath()),
           e);
       // If failed to get the generation time of the TsFile, we will collect 
the data in the TsFile
       // anyway.
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeBinaryTransformer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeBinaryTransformer.java
new file mode 100644
index 00000000000..70d4c362c70
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeBinaryTransformer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.row;
+
+public class PipeBinaryTransformer {
+
+  public static org.apache.iotdb.tsfile.utils.Binary transformToBinary(
+      org.apache.iotdb.pipe.api.type.Binary binary) {
+    return binary == null ? null : new 
org.apache.iotdb.tsfile.utils.Binary(binary.getValues());
+  }
+
+  public static org.apache.iotdb.pipe.api.type.Binary transformToPipeBinary(
+      org.apache.iotdb.tsfile.utils.Binary binary) {
+    return binary == null ? null : new 
org.apache.iotdb.pipe.api.type.Binary(binary.getValues());
+  }
+
+  private PipeBinaryTransformer() {
+    // util class
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
index 8b9e10ebf91..24ad5e108fc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.event.common.row;
 
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
-import org.apache.iotdb.pipe.api.type.Binary;
 import org.apache.iotdb.pipe.api.type.Type;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -95,13 +94,16 @@ public class PipeRow implements Row {
   }
 
   @Override
-  public Binary getBinary(int columnIndex) {
-    return ((Binary[]) valueColumns[columnIndex])[rowIndex];
+  public org.apache.iotdb.pipe.api.type.Binary getBinary(int columnIndex) {
+    return PipeBinaryTransformer.transformToPipeBinary(
+        ((org.apache.iotdb.tsfile.utils.Binary[]) 
valueColumns[columnIndex])[rowIndex]);
   }
 
   @Override
   public String getString(int columnIndex) {
-    return ((Binary[]) valueColumns[columnIndex])[rowIndex].getStringValue();
+    final org.apache.iotdb.tsfile.utils.Binary binary =
+        ((org.apache.iotdb.tsfile.utils.Binary[]) 
valueColumns[columnIndex])[rowIndex];
+    return binary == null ? null : binary.getStringValue();
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index 983a7877713..ea989d48027 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -56,7 +56,15 @@ public class PipeRowCollector implements RowCollector {
     final int rowIndex = tablet.rowSize;
     tablet.addTimestamp(rowIndex, row.getTime());
     for (int i = 0; i < row.size(); i++) {
-      tablet.addValue(measurementSchemaArray[i].getMeasurementId(), rowIndex, 
row.getObject(i));
+      final Object value = row.getObject(i);
+      if (value instanceof org.apache.iotdb.pipe.api.type.Binary) {
+        tablet.addValue(
+            measurementSchemaArray[i].getMeasurementId(),
+            rowIndex,
+            
PipeBinaryTransformer.transformToBinary((org.apache.iotdb.pipe.api.type.Binary) 
value));
+      } else {
+        tablet.addValue(measurementSchemaArray[i].getMeasurementId(), 
rowIndex, value);
+      }
       if (row.isNull(i)) {
         tablet.bitMaps[i].mark(rowIndex);
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 785948b66a4..d82313dad8d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -41,6 +41,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
 
+  // used to filter data
+  private final long startTime;
+  private final long endTime;
+
   private final TsFileResource resource;
   private File tsFile;
 
@@ -49,13 +53,20 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   private TsFileInsertionDataContainer dataContainer;
 
   public PipeTsFileInsertionEvent(TsFileResource resource) {
-    this(resource, null, null);
+    this(resource, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
   public PipeTsFileInsertionEvent(
-      TsFileResource resource, PipeTaskMeta pipeTaskMeta, String pattern) {
+      TsFileResource resource,
+      PipeTaskMeta pipeTaskMeta,
+      String pattern,
+      long startTime,
+      long endTime) {
     super(pipeTaskMeta, pattern);
 
+    this.startTime = startTime;
+    this.endTime = endTime;
+
     this.resource = resource;
     tsFile = resource.getTsFile();
 
@@ -89,6 +100,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
     return tsFile;
   }
 
+  public boolean hasTimeFilter() {
+    return startTime != Long.MIN_VALUE || endTime != Long.MAX_VALUE;
+  }
+
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
@@ -138,7 +153,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   @Override
   public PipeTsFileInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       PipeTaskMeta pipeTaskMeta, String pattern) {
-    return new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern);
+    return new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern, 
startTime, endTime);
   }
 
   /////////////////////////// TsFileInsertionEvent ///////////////////////////
@@ -148,7 +163,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
     try {
       if (dataContainer == null) {
         waitForTsFileClose();
-        dataContainer = new TsFileInsertionDataContainer(tsFile, getPattern());
+        dataContainer = new TsFileInsertionDataContainer(tsFile, getPattern(), 
startTime, endTime);
       }
       return dataContainer.toTabletInsertionEvents();
     } catch (InterruptedException e) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 6426e8dccad..e9347d31202 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -26,6 +26,10 @@ import 
org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TsFileReader;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +47,9 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
 
+  // used to filter data
   private final String pattern;
+  private final IExpression timeFilterExpression;
 
   private final TsFileSequenceReader tsFileSequenceReader;
   private final TsFileReader tsFileReader;
@@ -51,14 +57,27 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
   private final Iterator<Map.Entry<String, List<String>>> 
deviceMeasurementsMapIterator;
   private final Map<String, TSDataType> measurementDataTypeMap;
 
-  public TsFileInsertionDataContainer(File tsFile, String pattern) throws 
IOException {
+  public TsFileInsertionDataContainer(File tsFile, String pattern, long 
startTime, long endTime)
+      throws IOException {
     this.pattern = pattern;
-
-    tsFileSequenceReader = new TsFileSequenceReader(tsFile.getAbsolutePath());
-    tsFileReader = new TsFileReader(tsFileSequenceReader);
-
-    deviceMeasurementsMapIterator = 
filterDeviceMeasurementsMapByPattern().entrySet().iterator();
-    measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
+    timeFilterExpression =
+        (startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE)
+            ? null
+            : BinaryExpression.and(
+                new GlobalTimeExpression(TimeFilter.gtEq(startTime)),
+                new GlobalTimeExpression(TimeFilter.ltEq(endTime)));
+
+    try {
+      tsFileSequenceReader = new 
TsFileSequenceReader(tsFile.getAbsolutePath());
+      tsFileReader = new TsFileReader(tsFileSequenceReader);
+
+      deviceMeasurementsMapIterator = 
filterDeviceMeasurementsMapByPattern().entrySet().iterator();
+      measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
+    } catch (Exception e) {
+      LOGGER.error("failed to create TsFileInsertionDataContainer", e);
+      close();
+      throw e;
+    }
   }
 
   private Map<String, List<String>> filterDeviceMeasurementsMapByPattern() 
throws IOException {
@@ -109,19 +128,9 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
 
           @Override
           public boolean hasNext() {
-            return (tabletIterator != null && tabletIterator.hasNext())
-                || deviceMeasurementsMapIterator.hasNext();
-          }
-
-          @Override
-          public TabletInsertionEvent next() {
-            if (!hasNext()) {
-              throw new NoSuchElementException();
-            }
-
             while (tabletIterator == null || !tabletIterator.hasNext()) {
               if (!deviceMeasurementsMapIterator.hasNext()) {
-                throw new NoSuchElementException();
+                return false;
               }
 
               final Map.Entry<String, List<String>> entry = 
deviceMeasurementsMapIterator.next();
@@ -129,21 +138,32 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
               try {
                 tabletIterator =
                     new TsFileInsertionDataTabletIterator(
-                        tsFileReader, measurementDataTypeMap, entry.getKey(), 
entry.getValue());
+                        tsFileReader,
+                        measurementDataTypeMap,
+                        entry.getKey(),
+                        entry.getValue(),
+                        timeFilterExpression);
               } catch (IOException e) {
+                close();
                 throw new PipeException("failed to create 
TsFileInsertionDataTabletIterator", e);
               }
             }
 
+            return true;
+          }
+
+          @Override
+          public TabletInsertionEvent next() {
+            if (!hasNext()) {
+              close();
+              throw new NoSuchElementException();
+            }
+
             final TabletInsertionEvent next =
                 new PipeRawTabletInsertionEvent(tabletIterator.next());
 
             if (!hasNext()) {
-              try {
-                close();
-              } catch (Exception e) {
-                LOGGER.warn("Failed to close TsFileInsertionDataContainer", e);
-              }
+              close();
             }
 
             return next;
@@ -152,12 +172,21 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
   }
 
   @Override
-  public void close() throws Exception {
-    if (tsFileReader != null) {
-      tsFileReader.close();
+  public void close() {
+    try {
+      if (tsFileReader != null) {
+        tsFileReader.close();
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Failed to close TsFileReader", e);
     }
-    if (tsFileSequenceReader != null) {
-      tsFileSequenceReader.close();
+
+    try {
+      if (tsFileSequenceReader != null) {
+        tsFileSequenceReader.close();
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Failed to close TsFileSequenceReader", e);
     }
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
index 8b3b1e14fe5..63b2b15d403 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.read.TsFileReader;
 import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.write.record.Tablet;
@@ -47,13 +48,16 @@ public class TsFileInsertionDataTabletIterator implements 
Iterator<Tablet> {
   private final String deviceId;
   private final List<String> measurements;
 
+  private final IExpression timeFilterExpression;
+
   private final QueryDataSet queryDataSet;
 
   public TsFileInsertionDataTabletIterator(
       TsFileReader tsFileReader,
       Map<String, TSDataType> measurementDataTypeMap,
       String deviceId,
-      List<String> measurements)
+      List<String> measurements,
+      IExpression timeFilterExpression)
       throws IOException {
     this.tsFileReader = tsFileReader;
     this.measurementDataTypeMap = measurementDataTypeMap;
@@ -68,6 +72,8 @@ public class TsFileInsertionDataTabletIterator implements 
Iterator<Tablet> {
             .sorted()
             .collect(Collectors.toList());
 
+    this.timeFilterExpression = timeFilterExpression;
+
     this.queryDataSet = buildQueryDataSet();
   }
 
@@ -76,7 +82,7 @@ public class TsFileInsertionDataTabletIterator implements 
Iterator<Tablet> {
     for (String measurement : measurements) {
       paths.add(new Path(deviceId, measurement, false));
     }
-    return tsFileReader.query(QueryExpression.create(paths, null));
+    return tsFileReader.query(QueryExpression.create(paths, 
timeFilterExpression));
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
index a976358611e..c01751ed824 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.processor;
 
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -82,11 +83,11 @@ public class PipeDoNothingProcessor implements 
PipeProcessor {
   @Override
   public void process(TsFileInsertionEvent tsFileInsertionEvent, 
EventCollector eventCollector)
       throws IOException {
-    if (tsFileInsertionEvent instanceof EnrichedEvent) {
-      final EnrichedEvent enrichedEvent = (EnrichedEvent) tsFileInsertionEvent;
-      if (enrichedEvent
-          .getPattern()
-          .equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
+    if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
+      final PipeTsFileInsertionEvent enrichedEvent =
+          (PipeTsFileInsertionEvent) tsFileInsertionEvent;
+      if 
(enrichedEvent.getPattern().equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)
+          && !enrichedEvent.hasTimeFilter()) {
         eventCollector.collect(tsFileInsertionEvent);
       } else {
         for (final TabletInsertionEvent tabletInsertionEvent :
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 715816cfbcc..7ad1bffa2a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -58,7 +58,6 @@ public class PipeTaskBuilder {
             pipeStaticMeta.getPipeName(),
             dataRegionId,
             collectorStage.getEventSupplier(),
-            collectorStage.getCollectorPendingQueue(),
             pipeStaticMeta.getProcessorParameters(),
             connectorStage.getPipeConnectorPendingQueue());
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 9e03458bfa0..ca1ec8fda9b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -31,28 +31,12 @@ import org.apache.iotdb.pipe.api.PipeCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 import 
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import java.util.HashMap;
 
 public class PipeTaskCollectorStage extends PipeTaskStage {
 
-  private final PipeParameters collectorParameters;
-
-  /**
-   * TODO: have a better way to control busy/idle status of 
PipeTaskCollectorStage.
-   *
-   * <p>Currently, this field is for IoTDBDataRegionCollector only. 
IoTDBDataRegionCollector uses
-   * collectorPendingQueue as an internal data structure to store realtime 
events.
-   *
-   * <p>PendingQueue can detect whether the queue is empty or not, and it can 
notify the
-   * PipeTaskProcessorStage to stop processing data when the queue is empty to 
avoid unnecessary
-   * processing, and it also can notify the PipeTaskProcessorStage to start 
processing data when the
-   * queue is not empty.
-   */
-  private UnboundedBlockingPendingQueue<Event> collectorPendingQueue;
-
   private final PipeCollector pipeCollector;
 
   public PipeTaskCollectorStage(
@@ -60,6 +44,8 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
       PipeTaskMeta pipeTaskMeta,
       long creationTime,
       PipeParameters collectorParameters) {
+    PipeParameters localizedCollectorParameters;
+
     // TODO: avoid if-else, use reflection to create collector all the time
     if (collectorParameters
         .getStringOrDefault(
@@ -69,40 +55,43 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
       // we want to pass data region id to collector, so we need to create a 
new collector
       // parameters and put data region id into it. we can't put data region 
id into collector
       // parameters directly, because the given collector parameters may be 
used by other pipe task.
-      this.collectorParameters =
+      localizedCollectorParameters =
           new PipeParameters(new 
HashMap<>(collectorParameters.getAttribute()));
       // set data region id to collector parameters, so that collector can get 
data region id inside
       // collector
-      this.collectorParameters
+      localizedCollectorParameters
           .getAttribute()
           .put(PipeCollectorConstant.DATA_REGION_KEY, 
String.valueOf(dataRegionId.getId()));
 
-      collectorPendingQueue = new UnboundedBlockingPendingQueue<>();
       this.pipeCollector =
-          new IoTDBDataRegionCollector(pipeTaskMeta, creationTime, 
collectorPendingQueue);
+          new IoTDBDataRegionCollector(
+              pipeTaskMeta, creationTime, new 
UnboundedBlockingPendingQueue<>());
     } else {
-      this.collectorParameters = collectorParameters;
+      localizedCollectorParameters = collectorParameters;
 
-      this.pipeCollector = 
PipeAgent.plugin().reflectCollector(collectorParameters);
+      this.pipeCollector = 
PipeAgent.plugin().reflectCollector(localizedCollectorParameters);
     }
-  }
 
-  @Override
-  public void createSubtask() throws PipeException {
+    // validate and customize should be called before createSubtask. this 
allows the collector to expose
+    // exceptions in advance.
     try {
       // 1. validate collector parameters
-      pipeCollector.validate(new PipeParameterValidator(collectorParameters));
+      pipeCollector.validate(new 
PipeParameterValidator(localizedCollectorParameters));
 
       // 2. customize collector
       final PipeCollectorRuntimeConfiguration runtimeConfiguration =
           new PipeCollectorRuntimeConfiguration();
-      pipeCollector.customize(collectorParameters, runtimeConfiguration);
-      // TODO: use runtimeConfiguration to configure collector
+      pipeCollector.customize(localizedCollectorParameters, 
runtimeConfiguration);
     } catch (Exception e) {
       throw new PipeException(e.getMessage(), e);
     }
   }
 
+  @Override
+  public void createSubtask() throws PipeException {
+    // do nothing
+  }
+
   @Override
   public void startSubtask() throws PipeException {
     try {
@@ -129,8 +118,4 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
   public EventSupplier getEventSupplier() {
     return pipeCollector::supply;
   }
-
-  public UnboundedBlockingPendingQueue<Event> getCollectorPendingQueue() {
-    return collectorPendingQueue;
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 571b04773a4..02c6576ce94 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
 import 
org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.processor.PipeDoNothingProcessor;
-import org.apache.iotdb.db.pipe.task.connection.BlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
 import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
@@ -38,8 +37,6 @@ import 
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfig
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-import javax.annotation.Nullable;
-
 public class PipeTaskProcessorStage extends PipeTaskStage {
 
   protected final PipeProcessorSubtaskExecutor executor =
@@ -49,15 +46,10 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
   protected final PipeProcessor pipeProcessor;
   protected final PipeProcessorSubtask pipeProcessorSubtask;
 
-  protected final BlockingPendingQueue<Event> pipeCollectorInputPendingQueue;
-  protected final BlockingPendingQueue<Event> pipeConnectorOutputPendingQueue;
-
   /**
    * @param pipeName pipe name
    * @param dataRegionId data region id
    * @param pipeCollectorInputEventSupplier used to input events from pipe 
collector
-   * @param pipeCollectorInputPendingQueue used to listen whether pipe 
collector event queue is from
-   *     empty to not empty or from not empty to empty, null means no need to 
listen
    * @param pipeProcessorParameters used to create pipe processor
    * @param pipeConnectorOutputPendingQueue used to output events to pipe 
connector
    */
@@ -65,12 +57,10 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       String pipeName,
       TConsensusGroupId dataRegionId,
       EventSupplier pipeCollectorInputEventSupplier,
-      @Nullable BlockingPendingQueue<Event> pipeCollectorInputPendingQueue,
       PipeParameters pipeProcessorParameters,
       BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
     this.pipeProcessorParameters = pipeProcessorParameters;
 
-    final String taskId = pipeName + "_" + dataRegionId;
     pipeProcessor =
         pipeProcessorParameters
                 .getStringOrDefault(
@@ -79,22 +69,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
                 
.equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
             ? new PipeDoNothingProcessor()
             : PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
-    final PipeEventCollector pipeConnectorOutputEventCollector =
-        new PipeEventCollector(pipeConnectorOutputPendingQueue);
-
-    this.pipeProcessorSubtask =
-        new PipeProcessorSubtask(
-            taskId,
-            pipeCollectorInputEventSupplier,
-            pipeProcessor,
-            pipeConnectorOutputEventCollector);
-
-    this.pipeCollectorInputPendingQueue = pipeCollectorInputPendingQueue;
-    this.pipeConnectorOutputPendingQueue = pipeConnectorOutputPendingQueue;
-  }
-
-  @Override
-  public void createSubtask() throws PipeException {
+    // validate and customize should be called before createSubtask. this 
allows the processor to expose
+    // exceptions in advance.
     try {
       // 1. validate processor parameters
       pipeProcessor.validate(new 
PipeParameterValidator(pipeProcessorParameters));
@@ -103,11 +79,23 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       final PipeProcessorRuntimeConfiguration runtimeConfiguration =
           new PipeProcessorRuntimeConfiguration();
       pipeProcessor.customize(pipeProcessorParameters, runtimeConfiguration);
-      // TODO: use runtimeConfiguration to configure processor
     } catch (Exception e) {
       throw new PipeException(e.getMessage(), e);
     }
 
+    final String taskId = pipeName + "_" + dataRegionId;
+    final PipeEventCollector pipeConnectorOutputEventCollector =
+        new PipeEventCollector(pipeConnectorOutputPendingQueue);
+    this.pipeProcessorSubtask =
+        new PipeProcessorSubtask(
+            taskId,
+            pipeCollectorInputEventSupplier,
+            pipeProcessor,
+            pipeConnectorOutputEventCollector);
+  }
+
+  @Override
+  public void createSubtask() throws PipeException {
     executor.register(pipeProcessorSubtask);
   }
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index aa0b7506cbf..8b797abcfff 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.TsFileInsertionDataContainer;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
 
 import org.junit.After;
@@ -45,6 +46,8 @@ public class TsFileInsertionDataContainerTest {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(TsFileInsertionDataContainerTest.class);
 
+  private static final long TSFILE_START_TIME = 300L;
+
   private File alignedTsFile;
   private File nonalignedTsFile;
 
@@ -68,40 +71,83 @@ public class TsFileInsertionDataContainerTest {
     measurementNumbers.add(1);
     measurementNumbers.add(2);
 
-    for (int deviceNumber : deviceNumbers) {
-      for (int measurementNumber : measurementNumbers) {
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 0);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 2);
+    Set<Pair<Long, Long>> startEndTimes = new HashSet<>();
+    startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME - 1));
+    startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME));
+    startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME + 1));
 
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 999);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001);
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME - 1, TSFILE_START_TIME - 
1));
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME, TSFILE_START_TIME));
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 
1));
 
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 999 * 2 + 
1);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001 * 2 
- 1);
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 
1));
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 
10));
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 
100));
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 
10000));
 
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025);
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1000000, 
TSFILE_START_TIME + 2000000));
 
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023 * 2 
+ 1);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024 * 2);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025 * 2 
- 1);
+    startEndTimes.add(new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
 
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 10001);
+    for (int deviceNumber : deviceNumbers) {
+      for (int measurementNumber : measurementNumbers) {
+        for (Pair<Long, Long> startEndTime : startEndTimes) {
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 0, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 2, startEndTime.left, 
startEndTime.right);
+
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 999, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1000, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1001, startEndTime.left, 
startEndTime.right);
+
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 999 * 2 + 1, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1000, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1001 * 2 - 1, 
startEndTime.left, startEndTime.right);
+
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1023, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1024, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1025, startEndTime.left, 
startEndTime.right);
+
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1023 * 2 + 1, 
startEndTime.left, startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1024 * 2, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1025 * 2 - 1, 
startEndTime.left, startEndTime.right);
+
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 10001, startEndTime.left, 
startEndTime.right);
+        }
       }
     }
   }
 
   private void testToTabletInsertionEvents(
-      int deviceNumber, int measurementNumber, int rowNumberInOneDevice) 
throws Exception {
+      int deviceNumber,
+      int measurementNumber,
+      int rowNumberInOneDevice,
+      long startTime,
+      long endTime)
+      throws Exception {
     LOGGER.info(
-        "testToTabletInsertionEvents: deviceNumber = {}, measurementNumber = 
{}, rowNumberInOneDevice = {}",
+        "testToTabletInsertionEvents: deviceNumber: {}, measurementNumber: {}, 
rowNumberInOneDevice: {}, startTime: {}, endTime: {}",
         deviceNumber,
         measurementNumber,
-        rowNumberInOneDevice);
+        rowNumberInOneDevice,
+        startTime,
+        endTime);
 
     alignedTsFile =
         TsFileGeneratorUtils.generateAlignedTsFile(
@@ -109,7 +155,7 @@ public class TsFileInsertionDataContainerTest {
             deviceNumber,
             measurementNumber,
             rowNumberInOneDevice,
-            300,
+            (int) TSFILE_START_TIME,
             10000,
             700,
             50);
@@ -119,15 +165,36 @@ public class TsFileInsertionDataContainerTest {
             deviceNumber,
             measurementNumber,
             rowNumberInOneDevice,
-            300,
+            (int) TSFILE_START_TIME,
             10000,
             700,
             50);
 
+    final int tsfileEndTime = (int) TSFILE_START_TIME + rowNumberInOneDevice - 
1;
+
+    int expectedRowNumber = rowNumberInOneDevice;
+    Assert.assertTrue(startTime <= endTime);
+    if (startTime != Long.MIN_VALUE && endTime != Long.MAX_VALUE) {
+      if (startTime < TSFILE_START_TIME) {
+        if (endTime < TSFILE_START_TIME) {
+          expectedRowNumber = 0;
+        } else {
+          expectedRowNumber =
+              Math.min((int) (endTime - TSFILE_START_TIME + 1), 
rowNumberInOneDevice);
+        }
+      } else if (tsfileEndTime < startTime) {
+        expectedRowNumber = 0;
+      } else {
+        expectedRowNumber =
+            Math.min(
+                (int) (Math.min(endTime, tsfileEndTime) - startTime + 1), 
rowNumberInOneDevice);
+      }
+    }
+
     try (final TsFileInsertionDataContainer alignedContainer =
-            new TsFileInsertionDataContainer(alignedTsFile, "root");
+            new TsFileInsertionDataContainer(alignedTsFile, "root", startTime, 
endTime);
         final TsFileInsertionDataContainer nonalignedContainer =
-            new TsFileInsertionDataContainer(nonalignedTsFile, "root"); ) {
+            new TsFileInsertionDataContainer(nonalignedTsFile, "root", 
startTime, endTime); ) {
       AtomicInteger count1 = new AtomicInteger(0);
       AtomicInteger count2 = new AtomicInteger(0);
       AtomicInteger count3 = new AtomicInteger(0);
@@ -178,9 +245,9 @@ public class TsFileInsertionDataContainerTest {
                                                         });
                                               }))));
 
-      Assert.assertEquals(count1.getAndSet(0), deviceNumber * 
rowNumberInOneDevice);
-      Assert.assertEquals(count2.getAndSet(0), deviceNumber * 
rowNumberInOneDevice);
-      Assert.assertEquals(count3.getAndSet(0), deviceNumber * 
rowNumberInOneDevice);
+      Assert.assertEquals(count1.getAndSet(0), deviceNumber * 
expectedRowNumber);
+      Assert.assertEquals(count2.getAndSet(0), deviceNumber * 
expectedRowNumber);
+      Assert.assertEquals(count3.getAndSet(0), deviceNumber * 
expectedRowNumber);
 
       nonalignedContainer
           .toTabletInsertionEvents()
@@ -228,9 +295,9 @@ public class TsFileInsertionDataContainerTest {
                                                 }
                                               }))));
 
-      Assert.assertEquals(count1.get(), deviceNumber * rowNumberInOneDevice);
-      Assert.assertEquals(count2.get(), deviceNumber * rowNumberInOneDevice);
-      Assert.assertEquals(count3.get(), deviceNumber * rowNumberInOneDevice);
+      Assert.assertEquals(count1.get(), deviceNumber * expectedRowNumber);
+      Assert.assertEquals(count2.get(), deviceNumber * expectedRowNumber);
+      Assert.assertEquals(count3.get(), deviceNumber * expectedRowNumber);
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -272,10 +339,11 @@ public class TsFileInsertionDataContainerTest {
     }
 
     try (final TsFileInsertionDataContainer alignedContainer =
-            new TsFileInsertionDataContainer(alignedTsFile, 
oneDeviceInAlignedTsFile.get());
+            new TsFileInsertionDataContainer(
+                alignedTsFile, oneDeviceInAlignedTsFile.get(), startTime, 
endTime);
         final TsFileInsertionDataContainer nonalignedContainer =
             new TsFileInsertionDataContainer(
-                nonalignedTsFile, oneDeviceInUnalignedTsFile.get()); ) {
+                nonalignedTsFile, oneDeviceInUnalignedTsFile.get(), startTime, 
endTime); ) {
       AtomicInteger count1 = new AtomicInteger(0);
       AtomicInteger count2 = new AtomicInteger(0);
       AtomicInteger count3 = new AtomicInteger(0);
@@ -326,9 +394,9 @@ public class TsFileInsertionDataContainerTest {
                                                         });
                                               }))));
 
-      Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice);
-      Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice);
-      Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice);
+      Assert.assertEquals(count1.getAndSet(0), expectedRowNumber);
+      Assert.assertEquals(count2.getAndSet(0), expectedRowNumber);
+      Assert.assertEquals(count3.getAndSet(0), expectedRowNumber);
 
       nonalignedContainer
           .toTabletInsertionEvents()
@@ -376,19 +444,20 @@ public class TsFileInsertionDataContainerTest {
                                                 }
                                               }))));
 
-      Assert.assertEquals(count1.get(), rowNumberInOneDevice);
-      Assert.assertEquals(count2.get(), rowNumberInOneDevice);
-      Assert.assertEquals(count3.get(), rowNumberInOneDevice);
+      Assert.assertEquals(count1.get(), expectedRowNumber);
+      Assert.assertEquals(count2.get(), expectedRowNumber);
+      Assert.assertEquals(count3.get(), expectedRowNumber);
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
     try (final TsFileInsertionDataContainer alignedContainer =
-            new TsFileInsertionDataContainer(alignedTsFile, 
oneMeasurementInAlignedTsFile.get());
+            new TsFileInsertionDataContainer(
+                alignedTsFile, oneMeasurementInAlignedTsFile.get(), startTime, 
endTime);
         final TsFileInsertionDataContainer nonalignedContainer =
             new TsFileInsertionDataContainer(
-                nonalignedTsFile, oneMeasurementInUnalignedTsFile.get()); ) {
+                nonalignedTsFile, oneMeasurementInUnalignedTsFile.get(), 
startTime, endTime); ) {
       AtomicInteger count1 = new AtomicInteger(0);
       AtomicInteger count2 = new AtomicInteger(0);
       AtomicInteger count3 = new AtomicInteger(0);
@@ -438,9 +507,9 @@ public class TsFileInsertionDataContainerTest {
                                                         });
                                               }))));
 
-      Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice);
-      Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice);
-      Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice);
+      Assert.assertEquals(count1.getAndSet(0), expectedRowNumber);
+      Assert.assertEquals(count2.getAndSet(0), expectedRowNumber);
+      Assert.assertEquals(count3.getAndSet(0), expectedRowNumber);
 
       nonalignedContainer
           .toTabletInsertionEvents()
@@ -487,18 +556,20 @@ public class TsFileInsertionDataContainerTest {
                                                 }
                                               }))));
 
-      Assert.assertEquals(count1.get(), rowNumberInOneDevice);
-      Assert.assertEquals(count2.get(), rowNumberInOneDevice);
-      Assert.assertEquals(count3.get(), rowNumberInOneDevice);
+      Assert.assertEquals(count1.get(), expectedRowNumber);
+      Assert.assertEquals(count2.get(), expectedRowNumber);
+      Assert.assertEquals(count3.get(), expectedRowNumber);
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
     try (final TsFileInsertionDataContainer alignedContainer =
-            new TsFileInsertionDataContainer(alignedTsFile, 
"not-exist-pattern");
+            new TsFileInsertionDataContainer(
+                alignedTsFile, "not-exist-pattern", startTime, endTime);
         final TsFileInsertionDataContainer nonalignedContainer =
-            new TsFileInsertionDataContainer(nonalignedTsFile, 
"not-exist-pattern"); ) {
+            new TsFileInsertionDataContainer(
+                nonalignedTsFile, "not-exist-pattern", startTime, endTime); ) {
       AtomicInteger count1 = new AtomicInteger(0);
       AtomicInteger count2 = new AtomicInteger(0);
       AtomicInteger count3 = new AtomicInteger(0);

Reply via email to