This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5977
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 20d7004486bf46a0a46d72d4c9e209d7939135ef
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jun 8 20:59:30 2023 +0800

    [IOTDB-5977] Pipe: start-time and end-time in collector.history are not working correctly
---
 .../PipeHistoricalDataRegionTsFileCollector.java   |  48 ++++--
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  23 ++-
 .../tsfile/TsFileInsertionDataContainer.java       |  87 +++++++----
 .../tsfile/TsFileInsertionDataTabletIterator.java  |  10 +-
 .../db/pipe/processor/PipeDoNothingProcessor.java  |  11 +-
 .../event/TsFileInsertionDataContainerTest.java    | 169 +++++++++++++++------
 6 files changed, 247 insertions(+), 101 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 8a6f21603e9..f39923fd67a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -44,6 +44,7 @@ import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME;
 import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME;
 import static 
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.DATA_REGION_KEY;
@@ -56,12 +57,13 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
   private final PipeTaskMeta pipeTaskMeta;
   private final ProgressIndex startIndex;
 
-  private String pattern;
   private int dataRegionId;
 
-  private final long historicalDataCollectionTimeLowerBound;
-  private long historicalDataCollectionStartTime;
-  private long historicalDataCollectionEndTime;
+  private String pattern;
+
+  private final long historicalDataCollectionTimeLowerBound; // arrival time
+  private long historicalDataCollectionStartTime; // event time
+  private long historicalDataCollectionEndTime; // event time
 
   private Queue<PipeTsFileInsertionEvent> pendingQueue;
 
@@ -81,18 +83,24 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
   @Override
   public void customize(
       PipeParameters parameters, PipeCollectorRuntimeConfiguration 
configuration) {
+    dataRegionId = parameters.getInt(DATA_REGION_KEY);
+
     pattern =
         parameters.getStringOrDefault(
             PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
             PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE);
-    dataRegionId = parameters.getInt(DATA_REGION_KEY);
+
+    // user may set the COLLECTOR_HISTORY_START_TIME and 
COLLECTOR_HISTORY_END_TIME without
+    // enabling the historical data collection, which may affect the realtime 
data collection.
+    final boolean isHistoricalCollectorEnabledByUser =
+        parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true);
     historicalDataCollectionStartTime =
-        parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME)
+        isHistoricalCollectorEnabledByUser && 
parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME)
             ? DateTimeUtils.convertDatetimeStrToLong(
                 parameters.getString(COLLECTOR_HISTORY_START_TIME), 
ZoneId.systemDefault())
             : Long.MIN_VALUE;
     historicalDataCollectionEndTime =
-        parameters.hasAttribute(COLLECTOR_HISTORY_END_TIME)
+        isHistoricalCollectorEnabledByUser && 
parameters.hasAttribute(COLLECTOR_HISTORY_END_TIME)
             ? DateTimeUtils.convertDatetimeStrToLong(
                 parameters.getString(COLLECTOR_HISTORY_END_TIME), 
ZoneId.systemDefault())
             : Long.MAX_VALUE;
@@ -158,7 +166,14 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
                         
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
                             && 
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
-                .map(resource -> new PipeTsFileInsertionEvent(resource, 
pipeTaskMeta, pattern))
+                .map(
+                    resource ->
+                        new PipeTsFileInsertionEvent(
+                            resource,
+                            pipeTaskMeta,
+                            pattern,
+                            historicalDataCollectionStartTime,
+                            historicalDataCollectionEndTime))
                 .collect(Collectors.toList()));
         pendingQueue.addAll(
             tsFileManager.getTsFileList(false).stream()
@@ -167,7 +182,14 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
                         
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
                             && 
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
-                .map(resource -> new PipeTsFileInsertionEvent(resource, 
pipeTaskMeta, pattern))
+                .map(
+                    resource ->
+                        new PipeTsFileInsertionEvent(
+                            resource,
+                            pipeTaskMeta,
+                            pattern,
+                            historicalDataCollectionStartTime,
+                            historicalDataCollectionEndTime))
                 .collect(Collectors.toList()));
         pendingQueue.forEach(
             event ->
@@ -182,8 +204,8 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
   }
 
   private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource 
resource) {
-    return !(resource.getFileEndTime() < historicalDataCollectionStartTime
-        || historicalDataCollectionEndTime < resource.getFileStartTime());
+    return historicalDataCollectionStartTime <= resource.getFileEndTime()
+        || resource.getFileStartTime() <= historicalDataCollectionEndTime;
   }
 
   private boolean 
isTsFileGeneratedAfterCollectionTimeLowerBound(TsFileResource resource) {
@@ -192,7 +214,9 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
           <= 
TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
     } catch (IOException e) {
       LOGGER.warn(
-          String.format("failed to get the generation time of TsFile %s", 
resource.getTsFilePath()),
+          String.format(
+              "failed to get the generation time of TsFile %s, collect it 
anyway",
+              resource.getTsFilePath()),
           e);
       // If failed to get the generation time of the TsFile, we will collect 
the data in the TsFile
       // anyway.
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 785948b66a4..d82313dad8d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -41,6 +41,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
 
+  // used to filter data
+  private final long startTime;
+  private final long endTime;
+
   private final TsFileResource resource;
   private File tsFile;
 
@@ -49,13 +53,20 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   private TsFileInsertionDataContainer dataContainer;
 
   public PipeTsFileInsertionEvent(TsFileResource resource) {
-    this(resource, null, null);
+    this(resource, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
   public PipeTsFileInsertionEvent(
-      TsFileResource resource, PipeTaskMeta pipeTaskMeta, String pattern) {
+      TsFileResource resource,
+      PipeTaskMeta pipeTaskMeta,
+      String pattern,
+      long startTime,
+      long endTime) {
     super(pipeTaskMeta, pattern);
 
+    this.startTime = startTime;
+    this.endTime = endTime;
+
     this.resource = resource;
     tsFile = resource.getTsFile();
 
@@ -89,6 +100,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
     return tsFile;
   }
 
+  public boolean hasTimeFilter() {
+    return startTime != Long.MIN_VALUE || endTime != Long.MAX_VALUE;
+  }
+
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
@@ -138,7 +153,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
   @Override
   public PipeTsFileInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       PipeTaskMeta pipeTaskMeta, String pattern) {
-    return new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern);
+    return new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern, 
startTime, endTime);
   }
 
   /////////////////////////// TsFileInsertionEvent ///////////////////////////
@@ -148,7 +163,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
     try {
       if (dataContainer == null) {
         waitForTsFileClose();
-        dataContainer = new TsFileInsertionDataContainer(tsFile, getPattern());
+        dataContainer = new TsFileInsertionDataContainer(tsFile, getPattern(), 
startTime, endTime);
       }
       return dataContainer.toTabletInsertionEvents();
     } catch (InterruptedException e) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 6426e8dccad..e9347d31202 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -26,6 +26,10 @@ import 
org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TsFileReader;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +47,9 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
 
+  // used to filter data
   private final String pattern;
+  private final IExpression timeFilterExpression;
 
   private final TsFileSequenceReader tsFileSequenceReader;
   private final TsFileReader tsFileReader;
@@ -51,14 +57,27 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
   private final Iterator<Map.Entry<String, List<String>>> 
deviceMeasurementsMapIterator;
   private final Map<String, TSDataType> measurementDataTypeMap;
 
-  public TsFileInsertionDataContainer(File tsFile, String pattern) throws 
IOException {
+  public TsFileInsertionDataContainer(File tsFile, String pattern, long 
startTime, long endTime)
+      throws IOException {
     this.pattern = pattern;
-
-    tsFileSequenceReader = new TsFileSequenceReader(tsFile.getAbsolutePath());
-    tsFileReader = new TsFileReader(tsFileSequenceReader);
-
-    deviceMeasurementsMapIterator = 
filterDeviceMeasurementsMapByPattern().entrySet().iterator();
-    measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
+    timeFilterExpression =
+        (startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE)
+            ? null
+            : BinaryExpression.and(
+                new GlobalTimeExpression(TimeFilter.gtEq(startTime)),
+                new GlobalTimeExpression(TimeFilter.ltEq(endTime)));
+
+    try {
+      tsFileSequenceReader = new 
TsFileSequenceReader(tsFile.getAbsolutePath());
+      tsFileReader = new TsFileReader(tsFileSequenceReader);
+
+      deviceMeasurementsMapIterator = 
filterDeviceMeasurementsMapByPattern().entrySet().iterator();
+      measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
+    } catch (Exception e) {
+      LOGGER.error("failed to create TsFileInsertionDataContainer", e);
+      close();
+      throw e;
+    }
   }
 
   private Map<String, List<String>> filterDeviceMeasurementsMapByPattern() 
throws IOException {
@@ -109,19 +128,9 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
 
           @Override
           public boolean hasNext() {
-            return (tabletIterator != null && tabletIterator.hasNext())
-                || deviceMeasurementsMapIterator.hasNext();
-          }
-
-          @Override
-          public TabletInsertionEvent next() {
-            if (!hasNext()) {
-              throw new NoSuchElementException();
-            }
-
             while (tabletIterator == null || !tabletIterator.hasNext()) {
               if (!deviceMeasurementsMapIterator.hasNext()) {
-                throw new NoSuchElementException();
+                return false;
               }
 
               final Map.Entry<String, List<String>> entry = 
deviceMeasurementsMapIterator.next();
@@ -129,21 +138,32 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
               try {
                 tabletIterator =
                     new TsFileInsertionDataTabletIterator(
-                        tsFileReader, measurementDataTypeMap, entry.getKey(), 
entry.getValue());
+                        tsFileReader,
+                        measurementDataTypeMap,
+                        entry.getKey(),
+                        entry.getValue(),
+                        timeFilterExpression);
               } catch (IOException e) {
+                close();
                 throw new PipeException("failed to create 
TsFileInsertionDataTabletIterator", e);
               }
             }
 
+            return true;
+          }
+
+          @Override
+          public TabletInsertionEvent next() {
+            if (!hasNext()) {
+              close();
+              throw new NoSuchElementException();
+            }
+
             final TabletInsertionEvent next =
                 new PipeRawTabletInsertionEvent(tabletIterator.next());
 
             if (!hasNext()) {
-              try {
-                close();
-              } catch (Exception e) {
-                LOGGER.warn("Failed to close TsFileInsertionDataContainer", e);
-              }
+              close();
             }
 
             return next;
@@ -152,12 +172,21 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
   }
 
   @Override
-  public void close() throws Exception {
-    if (tsFileReader != null) {
-      tsFileReader.close();
+  public void close() {
+    try {
+      if (tsFileReader != null) {
+        tsFileReader.close();
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Failed to close TsFileReader", e);
     }
-    if (tsFileSequenceReader != null) {
-      tsFileSequenceReader.close();
+
+    try {
+      if (tsFileSequenceReader != null) {
+        tsFileSequenceReader.close();
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Failed to close TsFileSequenceReader", e);
     }
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
index 8b3b1e14fe5..63b2b15d403 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.read.TsFileReader;
 import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.write.record.Tablet;
@@ -47,13 +48,16 @@ public class TsFileInsertionDataTabletIterator implements 
Iterator<Tablet> {
   private final String deviceId;
   private final List<String> measurements;
 
+  private final IExpression timeFilterExpression;
+
   private final QueryDataSet queryDataSet;
 
   public TsFileInsertionDataTabletIterator(
       TsFileReader tsFileReader,
       Map<String, TSDataType> measurementDataTypeMap,
       String deviceId,
-      List<String> measurements)
+      List<String> measurements,
+      IExpression timeFilterExpression)
       throws IOException {
     this.tsFileReader = tsFileReader;
     this.measurementDataTypeMap = measurementDataTypeMap;
@@ -68,6 +72,8 @@ public class TsFileInsertionDataTabletIterator implements 
Iterator<Tablet> {
             .sorted()
             .collect(Collectors.toList());
 
+    this.timeFilterExpression = timeFilterExpression;
+
     this.queryDataSet = buildQueryDataSet();
   }
 
@@ -76,7 +82,7 @@ public class TsFileInsertionDataTabletIterator implements 
Iterator<Tablet> {
     for (String measurement : measurements) {
       paths.add(new Path(deviceId, measurement, false));
     }
-    return tsFileReader.query(QueryExpression.create(paths, null));
+    return tsFileReader.query(QueryExpression.create(paths, 
timeFilterExpression));
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
index a976358611e..c01751ed824 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.processor;
 
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.PipeProcessor;
 import org.apache.iotdb.pipe.api.collector.EventCollector;
 import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -82,11 +83,11 @@ public class PipeDoNothingProcessor implements 
PipeProcessor {
   @Override
   public void process(TsFileInsertionEvent tsFileInsertionEvent, 
EventCollector eventCollector)
       throws IOException {
-    if (tsFileInsertionEvent instanceof EnrichedEvent) {
-      final EnrichedEvent enrichedEvent = (EnrichedEvent) tsFileInsertionEvent;
-      if (enrichedEvent
-          .getPattern()
-          .equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
+    if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
+      final PipeTsFileInsertionEvent enrichedEvent =
+          (PipeTsFileInsertionEvent) tsFileInsertionEvent;
+      if 
(enrichedEvent.getPattern().equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)
+          && !enrichedEvent.hasTimeFilter()) {
         eventCollector.collect(tsFileInsertionEvent);
       } else {
         for (final TabletInsertionEvent tabletInsertionEvent :
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index aa0b7506cbf..8b797abcfff 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.TsFileInsertionDataContainer;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
 
 import org.junit.After;
@@ -45,6 +46,8 @@ public class TsFileInsertionDataContainerTest {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(TsFileInsertionDataContainerTest.class);
 
+  private static final long TSFILE_START_TIME = 300L;
+
   private File alignedTsFile;
   private File nonalignedTsFile;
 
@@ -68,40 +71,83 @@ public class TsFileInsertionDataContainerTest {
     measurementNumbers.add(1);
     measurementNumbers.add(2);
 
-    for (int deviceNumber : deviceNumbers) {
-      for (int measurementNumber : measurementNumbers) {
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 0);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 2);
+    Set<Pair<Long, Long>> startEndTimes = new HashSet<>();
+    startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME - 1));
+    startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME));
+    startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME + 1));
 
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 999);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001);
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME - 1, TSFILE_START_TIME - 
1));
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME, TSFILE_START_TIME));
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 
1));
 
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 999 * 2 + 
1);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001 * 2 
- 1);
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 
1));
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 
10));
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 
100));
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 
10000));
 
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025);
+    startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1000000, 
TSFILE_START_TIME + 2000000));
 
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023 * 2 
+ 1);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024 * 2);
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025 * 2 
- 1);
+    startEndTimes.add(new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
 
-        testToTabletInsertionEvents(deviceNumber, measurementNumber, 10001);
+    for (int deviceNumber : deviceNumbers) {
+      for (int measurementNumber : measurementNumbers) {
+        for (Pair<Long, Long> startEndTime : startEndTimes) {
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 0, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 2, startEndTime.left, 
startEndTime.right);
+
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 999, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1000, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1001, startEndTime.left, 
startEndTime.right);
+
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 999 * 2 + 1, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1000, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1001 * 2 - 1, 
startEndTime.left, startEndTime.right);
+
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1023, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1024, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1025, startEndTime.left, 
startEndTime.right);
+
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1023 * 2 + 1, 
startEndTime.left, startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1024 * 2, startEndTime.left, 
startEndTime.right);
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 1025 * 2 - 1, 
startEndTime.left, startEndTime.right);
+
+          testToTabletInsertionEvents(
+              deviceNumber, measurementNumber, 10001, startEndTime.left, 
startEndTime.right);
+        }
       }
     }
   }
 
   private void testToTabletInsertionEvents(
-      int deviceNumber, int measurementNumber, int rowNumberInOneDevice) 
throws Exception {
+      int deviceNumber,
+      int measurementNumber,
+      int rowNumberInOneDevice,
+      long startTime,
+      long endTime)
+      throws Exception {
     LOGGER.info(
-        "testToTabletInsertionEvents: deviceNumber = {}, measurementNumber = 
{}, rowNumberInOneDevice = {}",
+        "testToTabletInsertionEvents: deviceNumber: {}, measurementNumber: {}, 
rowNumberInOneDevice: {}, startTime: {}, endTime: {}",
         deviceNumber,
         measurementNumber,
-        rowNumberInOneDevice);
+        rowNumberInOneDevice,
+        startTime,
+        endTime);
 
     alignedTsFile =
         TsFileGeneratorUtils.generateAlignedTsFile(
@@ -109,7 +155,7 @@ public class TsFileInsertionDataContainerTest {
             deviceNumber,
             measurementNumber,
             rowNumberInOneDevice,
-            300,
+            (int) TSFILE_START_TIME,
             10000,
             700,
             50);
@@ -119,15 +165,36 @@ public class TsFileInsertionDataContainerTest {
             deviceNumber,
             measurementNumber,
             rowNumberInOneDevice,
-            300,
+            (int) TSFILE_START_TIME,
             10000,
             700,
             50);
 
+    final int tsfileEndTime = (int) TSFILE_START_TIME + rowNumberInOneDevice - 
1;
+
+    int expectedRowNumber = rowNumberInOneDevice;
+    Assert.assertTrue(startTime <= endTime);
+    if (startTime != Long.MIN_VALUE && endTime != Long.MAX_VALUE) {
+      if (startTime < TSFILE_START_TIME) {
+        if (endTime < TSFILE_START_TIME) {
+          expectedRowNumber = 0;
+        } else {
+          expectedRowNumber =
+              Math.min((int) (endTime - TSFILE_START_TIME + 1), 
rowNumberInOneDevice);
+        }
+      } else if (tsfileEndTime < startTime) {
+        expectedRowNumber = 0;
+      } else {
+        expectedRowNumber =
+            Math.min(
+                (int) (Math.min(endTime, tsfileEndTime) - startTime + 1), 
rowNumberInOneDevice);
+      }
+    }
+
     try (final TsFileInsertionDataContainer alignedContainer =
-            new TsFileInsertionDataContainer(alignedTsFile, "root");
+            new TsFileInsertionDataContainer(alignedTsFile, "root", startTime, 
endTime);
         final TsFileInsertionDataContainer nonalignedContainer =
-            new TsFileInsertionDataContainer(nonalignedTsFile, "root"); ) {
+            new TsFileInsertionDataContainer(nonalignedTsFile, "root", 
startTime, endTime); ) {
       AtomicInteger count1 = new AtomicInteger(0);
       AtomicInteger count2 = new AtomicInteger(0);
       AtomicInteger count3 = new AtomicInteger(0);
@@ -178,9 +245,9 @@ public class TsFileInsertionDataContainerTest {
                                                         });
                                               }))));
 
-      Assert.assertEquals(count1.getAndSet(0), deviceNumber * 
rowNumberInOneDevice);
-      Assert.assertEquals(count2.getAndSet(0), deviceNumber * 
rowNumberInOneDevice);
-      Assert.assertEquals(count3.getAndSet(0), deviceNumber * 
rowNumberInOneDevice);
+      Assert.assertEquals(count1.getAndSet(0), deviceNumber * 
expectedRowNumber);
+      Assert.assertEquals(count2.getAndSet(0), deviceNumber * 
expectedRowNumber);
+      Assert.assertEquals(count3.getAndSet(0), deviceNumber * 
expectedRowNumber);
 
       nonalignedContainer
           .toTabletInsertionEvents()
@@ -228,9 +295,9 @@ public class TsFileInsertionDataContainerTest {
                                                 }
                                               }))));
 
-      Assert.assertEquals(count1.get(), deviceNumber * rowNumberInOneDevice);
-      Assert.assertEquals(count2.get(), deviceNumber * rowNumberInOneDevice);
-      Assert.assertEquals(count3.get(), deviceNumber * rowNumberInOneDevice);
+      Assert.assertEquals(count1.get(), deviceNumber * expectedRowNumber);
+      Assert.assertEquals(count2.get(), deviceNumber * expectedRowNumber);
+      Assert.assertEquals(count3.get(), deviceNumber * expectedRowNumber);
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -272,10 +339,11 @@ public class TsFileInsertionDataContainerTest {
     }
 
     try (final TsFileInsertionDataContainer alignedContainer =
-            new TsFileInsertionDataContainer(alignedTsFile, 
oneDeviceInAlignedTsFile.get());
+            new TsFileInsertionDataContainer(
+                alignedTsFile, oneDeviceInAlignedTsFile.get(), startTime, 
endTime);
         final TsFileInsertionDataContainer nonalignedContainer =
             new TsFileInsertionDataContainer(
-                nonalignedTsFile, oneDeviceInUnalignedTsFile.get()); ) {
+                nonalignedTsFile, oneDeviceInUnalignedTsFile.get(), startTime, 
endTime); ) {
       AtomicInteger count1 = new AtomicInteger(0);
       AtomicInteger count2 = new AtomicInteger(0);
       AtomicInteger count3 = new AtomicInteger(0);
@@ -326,9 +394,9 @@ public class TsFileInsertionDataContainerTest {
                                                         });
                                               }))));
 
-      Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice);
-      Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice);
-      Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice);
+      Assert.assertEquals(count1.getAndSet(0), expectedRowNumber);
+      Assert.assertEquals(count2.getAndSet(0), expectedRowNumber);
+      Assert.assertEquals(count3.getAndSet(0), expectedRowNumber);
 
       nonalignedContainer
           .toTabletInsertionEvents()
@@ -376,19 +444,20 @@ public class TsFileInsertionDataContainerTest {
                                                 }
                                               }))));
 
-      Assert.assertEquals(count1.get(), rowNumberInOneDevice);
-      Assert.assertEquals(count2.get(), rowNumberInOneDevice);
-      Assert.assertEquals(count3.get(), rowNumberInOneDevice);
+      Assert.assertEquals(count1.get(), expectedRowNumber);
+      Assert.assertEquals(count2.get(), expectedRowNumber);
+      Assert.assertEquals(count3.get(), expectedRowNumber);
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
     try (final TsFileInsertionDataContainer alignedContainer =
-            new TsFileInsertionDataContainer(alignedTsFile, 
oneMeasurementInAlignedTsFile.get());
+            new TsFileInsertionDataContainer(
+                alignedTsFile, oneMeasurementInAlignedTsFile.get(), startTime, 
endTime);
         final TsFileInsertionDataContainer nonalignedContainer =
             new TsFileInsertionDataContainer(
-                nonalignedTsFile, oneMeasurementInUnalignedTsFile.get()); ) {
+                nonalignedTsFile, oneMeasurementInUnalignedTsFile.get(), 
startTime, endTime); ) {
       AtomicInteger count1 = new AtomicInteger(0);
       AtomicInteger count2 = new AtomicInteger(0);
       AtomicInteger count3 = new AtomicInteger(0);
@@ -438,9 +507,9 @@ public class TsFileInsertionDataContainerTest {
                                                         });
                                               }))));
 
-      Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice);
-      Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice);
-      Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice);
+      Assert.assertEquals(count1.getAndSet(0), expectedRowNumber);
+      Assert.assertEquals(count2.getAndSet(0), expectedRowNumber);
+      Assert.assertEquals(count3.getAndSet(0), expectedRowNumber);
 
       nonalignedContainer
           .toTabletInsertionEvents()
@@ -487,18 +556,20 @@ public class TsFileInsertionDataContainerTest {
                                                 }
                                               }))));
 
-      Assert.assertEquals(count1.get(), rowNumberInOneDevice);
-      Assert.assertEquals(count2.get(), rowNumberInOneDevice);
-      Assert.assertEquals(count3.get(), rowNumberInOneDevice);
+      Assert.assertEquals(count1.get(), expectedRowNumber);
+      Assert.assertEquals(count2.get(), expectedRowNumber);
+      Assert.assertEquals(count3.get(), expectedRowNumber);
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
     try (final TsFileInsertionDataContainer alignedContainer =
-            new TsFileInsertionDataContainer(alignedTsFile, 
"not-exist-pattern");
+            new TsFileInsertionDataContainer(
+                alignedTsFile, "not-exist-pattern", startTime, endTime);
         final TsFileInsertionDataContainer nonalignedContainer =
-            new TsFileInsertionDataContainer(nonalignedTsFile, 
"not-exist-pattern"); ) {
+            new TsFileInsertionDataContainer(
+                nonalignedTsFile, "not-exist-pattern", startTime, endTime); ) {
       AtomicInteger count1 = new AtomicInteger(0);
       AtomicInteger count2 = new AtomicInteger(0);
       AtomicInteger count3 = new AtomicInteger(0);


Reply via email to