This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-5977 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 20d7004486bf46a0a46d72d4c9e209d7939135ef Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Jun 8 20:59:30 2023 +0800 [IOTDB-5977] Pipe: start-time and end-time in collector.history are not working correctly --- .../PipeHistoricalDataRegionTsFileCollector.java | 48 ++++-- .../common/tsfile/PipeTsFileInsertionEvent.java | 23 ++- .../tsfile/TsFileInsertionDataContainer.java | 87 +++++++---- .../tsfile/TsFileInsertionDataTabletIterator.java | 10 +- .../db/pipe/processor/PipeDoNothingProcessor.java | 11 +- .../event/TsFileInsertionDataContainerTest.java | 169 +++++++++++++++------ 6 files changed, 247 insertions(+), 101 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java index 8a6f21603e9..f39923fd67a 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java @@ -44,6 +44,7 @@ import java.util.ArrayDeque; import java.util.Queue; import java.util.stream.Collectors; +import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY; import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME; import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME; import static org.apache.iotdb.db.pipe.config.PipeCollectorConstant.DATA_REGION_KEY; @@ -56,12 +57,13 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR private final PipeTaskMeta pipeTaskMeta; private final ProgressIndex startIndex; - private String pattern; private int dataRegionId; - private final long historicalDataCollectionTimeLowerBound; - private long historicalDataCollectionStartTime; - private long historicalDataCollectionEndTime; + private String pattern; + + private final long historicalDataCollectionTimeLowerBound; // arrival time + private long historicalDataCollectionStartTime; // event time + private long historicalDataCollectionEndTime; // event time private Queue<PipeTsFileInsertionEvent> pendingQueue; @@ -81,18 +83,24 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR @Override public void customize( PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) { + dataRegionId = parameters.getInt(DATA_REGION_KEY); + pattern = parameters.getStringOrDefault( PipeCollectorConstant.COLLECTOR_PATTERN_KEY, PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE); - dataRegionId = parameters.getInt(DATA_REGION_KEY); + + // user may set the COLLECTOR_HISTORY_START_TIME and COLLECTOR_HISTORY_END_TIME without + // enabling the historical data collection, which may affect the realtime data collection. + final boolean isHistoricalCollectorEnabledByUser = + parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true); historicalDataCollectionStartTime = - parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME) + isHistoricalCollectorEnabledByUser && parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME) ? DateTimeUtils.convertDatetimeStrToLong( parameters.getString(COLLECTOR_HISTORY_START_TIME), ZoneId.systemDefault()) : Long.MIN_VALUE; historicalDataCollectionEndTime = - parameters.hasAttribute(COLLECTOR_HISTORY_END_TIME) + isHistoricalCollectorEnabledByUser && parameters.hasAttribute(COLLECTOR_HISTORY_END_TIME) ? DateTimeUtils.convertDatetimeStrToLong( parameters.getString(COLLECTOR_HISTORY_END_TIME), ZoneId.systemDefault()) : Long.MAX_VALUE; @@ -158,7 +166,14 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(resource) && isTsFileGeneratedAfterCollectionTimeLowerBound(resource)) - .map(resource -> new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern)) + .map( + resource -> + new PipeTsFileInsertionEvent( + resource, + pipeTaskMeta, + pattern, + historicalDataCollectionStartTime, + historicalDataCollectionEndTime)) .collect(Collectors.toList())); pendingQueue.addAll( tsFileManager.getTsFileList(false).stream() @@ -167,7 +182,14 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(resource) && isTsFileGeneratedAfterCollectionTimeLowerBound(resource)) - .map(resource -> new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern)) + .map( + resource -> + new PipeTsFileInsertionEvent( + resource, + pipeTaskMeta, + pattern, + historicalDataCollectionStartTime, + historicalDataCollectionEndTime)) .collect(Collectors.toList())); pendingQueue.forEach( event -> @@ -182,8 +204,8 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR } private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource resource) { - return !(resource.getFileEndTime() < historicalDataCollectionStartTime - || historicalDataCollectionEndTime < resource.getFileStartTime()); + return historicalDataCollectionStartTime <= resource.getFileEndTime() + || resource.getFileStartTime() <= historicalDataCollectionEndTime; } private boolean isTsFileGeneratedAfterCollectionTimeLowerBound(TsFileResource resource) { @@ -192,7 +214,9 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR <= TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime(); } catch (IOException e) { LOGGER.warn( - String.format("failed to get the generation time of TsFile %s", resource.getTsFilePath()), + String.format( + "failed to get the generation time of TsFile %s, collect it anyway", + resource.getTsFilePath()), e); // If failed to get the generation time of the TsFile, we will collect the data in the TsFile // anyway. diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 785948b66a4..d82313dad8d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -41,6 +41,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileInsertionEvent.class); + // used to filter data + private final long startTime; + private final long endTime; + private final TsFileResource resource; private File tsFile; @@ -49,13 +53,20 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns private TsFileInsertionDataContainer dataContainer; public PipeTsFileInsertionEvent(TsFileResource resource) { - this(resource, null, null); + this(resource, null, null, Long.MIN_VALUE, Long.MAX_VALUE); } public PipeTsFileInsertionEvent( - TsFileResource resource, PipeTaskMeta pipeTaskMeta, String pattern) { + TsFileResource resource, + PipeTaskMeta pipeTaskMeta, + String pattern, + long startTime, + long endTime) { super(pipeTaskMeta, pattern); + this.startTime = startTime; + this.endTime = endTime; + this.resource = resource; tsFile = resource.getTsFile(); @@ -89,6 +100,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns return tsFile; } + public boolean hasTimeFilter() { + return startTime != Long.MIN_VALUE || endTime != Long.MAX_VALUE; + } + /////////////////////////// EnrichedEvent /////////////////////////// @Override @@ -138,7 +153,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns @Override public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( PipeTaskMeta pipeTaskMeta, String pattern) { - return new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern); + return new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern, startTime, endTime); } /////////////////////////// TsFileInsertionEvent /////////////////////////// @@ -148,7 +163,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns try { if (dataContainer == null) { waitForTsFileClose(); - dataContainer = new TsFileInsertionDataContainer(tsFile, getPattern()); + dataContainer = new TsFileInsertionDataContainer(tsFile, getPattern(), startTime, endTime); } return dataContainer.toTabletInsertionEvents(); } catch (InterruptedException e) { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java index 6426e8dccad..e9347d31202 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java @@ -26,6 +26,10 @@ import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TsFileReader; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.expression.IExpression; +import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression; +import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; +import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +47,9 @@ public class TsFileInsertionDataContainer implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(TsFileInsertionDataContainer.class); + // used to filter data private final String pattern; + private final IExpression timeFilterExpression; private final TsFileSequenceReader tsFileSequenceReader; private final TsFileReader tsFileReader; @@ -51,14 +57,27 @@ public class TsFileInsertionDataContainer implements AutoCloseable { private final Iterator<Map.Entry<String, List<String>>> deviceMeasurementsMapIterator; private final Map<String, TSDataType> measurementDataTypeMap; - public TsFileInsertionDataContainer(File tsFile, String pattern) throws IOException { + public TsFileInsertionDataContainer(File tsFile, String pattern, long startTime, long endTime) + throws IOException { this.pattern = pattern; - - tsFileSequenceReader = new TsFileSequenceReader(tsFile.getAbsolutePath()); - tsFileReader = new TsFileReader(tsFileSequenceReader); - - deviceMeasurementsMapIterator = filterDeviceMeasurementsMapByPattern().entrySet().iterator(); - measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap(); + timeFilterExpression = + (startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE) + ? null + : BinaryExpression.and( + new GlobalTimeExpression(TimeFilter.gtEq(startTime)), + new GlobalTimeExpression(TimeFilter.ltEq(endTime))); + + try { + tsFileSequenceReader = new TsFileSequenceReader(tsFile.getAbsolutePath()); + tsFileReader = new TsFileReader(tsFileSequenceReader); + + deviceMeasurementsMapIterator = filterDeviceMeasurementsMapByPattern().entrySet().iterator(); + measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap(); + } catch (Exception e) { + LOGGER.error("failed to create TsFileInsertionDataContainer", e); + close(); + throw e; + } } private Map<String, List<String>> filterDeviceMeasurementsMapByPattern() throws IOException { @@ -109,19 +128,9 @@ public class TsFileInsertionDataContainer implements AutoCloseable { @Override public boolean hasNext() { - return (tabletIterator != null && tabletIterator.hasNext()) - || deviceMeasurementsMapIterator.hasNext(); - } - - @Override - public TabletInsertionEvent next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - while (tabletIterator == null || !tabletIterator.hasNext()) { if (!deviceMeasurementsMapIterator.hasNext()) { - throw new NoSuchElementException(); + return false; } final Map.Entry<String, List<String>> entry = deviceMeasurementsMapIterator.next(); @@ -129,21 +138,32 @@ public class TsFileInsertionDataContainer implements AutoCloseable { try { tabletIterator = new TsFileInsertionDataTabletIterator( - tsFileReader, measurementDataTypeMap, entry.getKey(), entry.getValue()); + tsFileReader, + measurementDataTypeMap, + entry.getKey(), + entry.getValue(), + timeFilterExpression); } catch (IOException e) { + close(); throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e); } } + return true; + } + + @Override + public TabletInsertionEvent next() { + if (!hasNext()) { + close(); + throw new NoSuchElementException(); + } + final TabletInsertionEvent next = new PipeRawTabletInsertionEvent(tabletIterator.next()); if (!hasNext()) { - try { - close(); - } catch (Exception e) { - LOGGER.warn("Failed to close TsFileInsertionDataContainer", e); - } + close(); } return next; @@ -152,12 +172,21 @@ public class TsFileInsertionDataContainer implements AutoCloseable { } @Override - public void close() throws Exception { - if (tsFileReader != null) { - tsFileReader.close(); + public void close() { + try { + if (tsFileReader != null) { + tsFileReader.close(); + } + } catch (IOException e) { + LOGGER.warn("Failed to close TsFileReader", e); } - if (tsFileSequenceReader != null) { - tsFileSequenceReader.close(); + + try { + if (tsFileSequenceReader != null) { + tsFileSequenceReader.close(); + } + } catch (IOException e) { + LOGGER.warn("Failed to close TsFileSequenceReader", e); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java index 8b3b1e14fe5..63b2b15d403 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java @@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.read.TsFileReader; import org.apache.iotdb.tsfile.read.common.Field; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.expression.QueryExpression; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.write.record.Tablet; @@ -47,13 +48,16 @@ public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> { private final String deviceId; private final List<String> measurements; + private final IExpression timeFilterExpression; + private final QueryDataSet queryDataSet; public TsFileInsertionDataTabletIterator( TsFileReader tsFileReader, Map<String, TSDataType> measurementDataTypeMap, String deviceId, - List<String> measurements) + List<String> measurements, + IExpression timeFilterExpression) throws IOException { this.tsFileReader = tsFileReader; this.measurementDataTypeMap = measurementDataTypeMap; @@ -68,6 +72,8 @@ public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> { .sorted() .collect(Collectors.toList()); + this.timeFilterExpression = timeFilterExpression; + this.queryDataSet = buildQueryDataSet(); } @@ -76,7 +82,7 @@ public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> { for (String measurement : measurements) { paths.add(new Path(deviceId, measurement, false)); } - return tsFileReader.query(QueryExpression.create(paths, null)); + return tsFileReader.query(QueryExpression.create(paths, timeFilterExpression)); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java index a976358611e..c01751ed824 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.processor; import org.apache.iotdb.db.pipe.config.PipeCollectorConstant; import org.apache.iotdb.db.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.collector.EventCollector; import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator; @@ -82,11 +83,11 @@ public class PipeDoNothingProcessor implements PipeProcessor { @Override public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) throws IOException { - if (tsFileInsertionEvent instanceof EnrichedEvent) { - final EnrichedEvent enrichedEvent = (EnrichedEvent) tsFileInsertionEvent; - if (enrichedEvent - .getPattern() - .equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) { + if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) { + final PipeTsFileInsertionEvent enrichedEvent = + (PipeTsFileInsertionEvent) tsFileInsertionEvent; + if (enrichedEvent.getPattern().equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE) + && !enrichedEvent.hasTimeFilter()) { eventCollector.collect(tsFileInsertionEvent); } else { for (final TabletInsertionEvent tabletInsertionEvent : diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java index aa0b7506cbf..8b797abcfff 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java @@ -23,6 +23,7 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.TsFileInsertionDataContainer; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils; import org.junit.After; @@ -45,6 +46,8 @@ public class TsFileInsertionDataContainerTest { private static final Logger LOGGER = LoggerFactory.getLogger(TsFileInsertionDataContainerTest.class); + private static final long TSFILE_START_TIME = 300L; + private File alignedTsFile; private File nonalignedTsFile; @@ -68,40 +71,83 @@ public class TsFileInsertionDataContainerTest { measurementNumbers.add(1); measurementNumbers.add(2); - for (int deviceNumber : deviceNumbers) { - for (int measurementNumber : measurementNumbers) { - testToTabletInsertionEvents(deviceNumber, measurementNumber, 0); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 1); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 2); + Set<Pair<Long, Long>> startEndTimes = new HashSet<>(); + startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME - 1)); + startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME)); + startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME + 1)); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 999); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001); + startEndTimes.add(new Pair<>(TSFILE_START_TIME - 1, TSFILE_START_TIME - 1)); + startEndTimes.add(new Pair<>(TSFILE_START_TIME, TSFILE_START_TIME)); + startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 1)); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 999 * 2 + 1); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001 * 2 - 1); + startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 1)); + startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 10)); + startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 100)); + startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME + 10000)); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025); + startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1000000, TSFILE_START_TIME + 2000000)); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023 * 2 + 1); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024 * 2); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025 * 2 - 1); + startEndTimes.add(new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE)); - testToTabletInsertionEvents(deviceNumber, measurementNumber, 10001); + for (int deviceNumber : deviceNumbers) { + for (int measurementNumber : measurementNumbers) { + for (Pair<Long, Long> startEndTime : startEndTimes) { + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 0, startEndTime.left, startEndTime.right); + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 1, startEndTime.left, startEndTime.right); + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 2, startEndTime.left, startEndTime.right); + + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 999, startEndTime.left, startEndTime.right); + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 1000, startEndTime.left, startEndTime.right); + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 1001, startEndTime.left, startEndTime.right); + + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 999 * 2 + 1, startEndTime.left, startEndTime.right); + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 1000, startEndTime.left, startEndTime.right); + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 1001 * 2 - 1, startEndTime.left, startEndTime.right); + + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 1023, startEndTime.left, startEndTime.right); + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 1024, startEndTime.left, startEndTime.right); + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 1025, startEndTime.left, startEndTime.right); + + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 1023 * 2 + 1, startEndTime.left, startEndTime.right); + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 1024 * 2, startEndTime.left, startEndTime.right); + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 1025 * 2 - 1, startEndTime.left, startEndTime.right); + + testToTabletInsertionEvents( + deviceNumber, measurementNumber, 10001, startEndTime.left, startEndTime.right); + } } } } private void testToTabletInsertionEvents( - int deviceNumber, int measurementNumber, int rowNumberInOneDevice) throws Exception { + int deviceNumber, + int measurementNumber, + int rowNumberInOneDevice, + long startTime, + long endTime) + throws Exception { LOGGER.info( - "testToTabletInsertionEvents: deviceNumber = {}, measurementNumber = {}, rowNumberInOneDevice = {}", + "testToTabletInsertionEvents: deviceNumber: {}, measurementNumber: {}, rowNumberInOneDevice: {}, startTime: {}, endTime: {}", deviceNumber, measurementNumber, - rowNumberInOneDevice); + rowNumberInOneDevice, + startTime, + endTime); alignedTsFile = TsFileGeneratorUtils.generateAlignedTsFile( @@ -109,7 +155,7 @@ public class TsFileInsertionDataContainerTest { deviceNumber, measurementNumber, rowNumberInOneDevice, - 300, + (int) TSFILE_START_TIME, 10000, 700, 50); @@ -119,15 +165,36 @@ public class TsFileInsertionDataContainerTest { deviceNumber, measurementNumber, rowNumberInOneDevice, - 300, + (int) TSFILE_START_TIME, 10000, 700, 50); + final int tsfileEndTime = (int) TSFILE_START_TIME + rowNumberInOneDevice - 1; + + int expectedRowNumber = rowNumberInOneDevice; + Assert.assertTrue(startTime <= endTime); + if (startTime != Long.MIN_VALUE && endTime != Long.MAX_VALUE) { + if (startTime < TSFILE_START_TIME) { + if (endTime < TSFILE_START_TIME) { + expectedRowNumber = 0; + } else { + expectedRowNumber = + Math.min((int) (endTime - TSFILE_START_TIME + 1), rowNumberInOneDevice); + } + } else if (tsfileEndTime < startTime) { + expectedRowNumber = 0; + } else { + expectedRowNumber = + Math.min( + (int) (Math.min(endTime, tsfileEndTime) - startTime + 1), rowNumberInOneDevice); + } + } + try (final TsFileInsertionDataContainer alignedContainer = - new TsFileInsertionDataContainer(alignedTsFile, "root"); + new TsFileInsertionDataContainer(alignedTsFile, "root", startTime, endTime); final TsFileInsertionDataContainer nonalignedContainer = - new TsFileInsertionDataContainer(nonalignedTsFile, "root"); ) { + new TsFileInsertionDataContainer(nonalignedTsFile, "root", startTime, endTime); ) { AtomicInteger count1 = new AtomicInteger(0); AtomicInteger count2 = new AtomicInteger(0); AtomicInteger count3 = new AtomicInteger(0); @@ -178,9 +245,9 @@ public class TsFileInsertionDataContainerTest { }); })))); - Assert.assertEquals(count1.getAndSet(0), deviceNumber * rowNumberInOneDevice); - Assert.assertEquals(count2.getAndSet(0), deviceNumber * rowNumberInOneDevice); - Assert.assertEquals(count3.getAndSet(0), deviceNumber * rowNumberInOneDevice); + Assert.assertEquals(count1.getAndSet(0), deviceNumber * expectedRowNumber); + Assert.assertEquals(count2.getAndSet(0), deviceNumber * expectedRowNumber); + Assert.assertEquals(count3.getAndSet(0), deviceNumber * expectedRowNumber); nonalignedContainer .toTabletInsertionEvents() @@ -228,9 +295,9 @@ public class TsFileInsertionDataContainerTest { } })))); - Assert.assertEquals(count1.get(), deviceNumber * rowNumberInOneDevice); - Assert.assertEquals(count2.get(), deviceNumber * rowNumberInOneDevice); - Assert.assertEquals(count3.get(), deviceNumber * rowNumberInOneDevice); + Assert.assertEquals(count1.get(), deviceNumber * expectedRowNumber); + Assert.assertEquals(count2.get(), deviceNumber * expectedRowNumber); + Assert.assertEquals(count3.get(), deviceNumber * expectedRowNumber); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -272,10 +339,11 @@ public class TsFileInsertionDataContainerTest { } try (final TsFileInsertionDataContainer alignedContainer = - new TsFileInsertionDataContainer(alignedTsFile, oneDeviceInAlignedTsFile.get()); + new TsFileInsertionDataContainer( + alignedTsFile, oneDeviceInAlignedTsFile.get(), startTime, endTime); final TsFileInsertionDataContainer nonalignedContainer = new TsFileInsertionDataContainer( - nonalignedTsFile, oneDeviceInUnalignedTsFile.get()); ) { + nonalignedTsFile, oneDeviceInUnalignedTsFile.get(), startTime, endTime); ) { AtomicInteger count1 = new AtomicInteger(0); AtomicInteger count2 = new AtomicInteger(0); AtomicInteger count3 = new AtomicInteger(0); @@ -326,9 +394,9 @@ public class TsFileInsertionDataContainerTest { }); })))); - Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice); - Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice); - Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice); + Assert.assertEquals(count1.getAndSet(0), expectedRowNumber); + Assert.assertEquals(count2.getAndSet(0), expectedRowNumber); + Assert.assertEquals(count3.getAndSet(0), expectedRowNumber); nonalignedContainer .toTabletInsertionEvents() @@ -376,19 +444,20 @@ public class TsFileInsertionDataContainerTest { } })))); - Assert.assertEquals(count1.get(), rowNumberInOneDevice); - Assert.assertEquals(count2.get(), rowNumberInOneDevice); - Assert.assertEquals(count3.get(), rowNumberInOneDevice); + Assert.assertEquals(count1.get(), expectedRowNumber); + Assert.assertEquals(count2.get(), expectedRowNumber); + Assert.assertEquals(count3.get(), expectedRowNumber); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } try (final TsFileInsertionDataContainer alignedContainer = - new TsFileInsertionDataContainer(alignedTsFile, oneMeasurementInAlignedTsFile.get()); + new TsFileInsertionDataContainer( + alignedTsFile, oneMeasurementInAlignedTsFile.get(), startTime, endTime); final TsFileInsertionDataContainer nonalignedContainer = new TsFileInsertionDataContainer( - nonalignedTsFile, oneMeasurementInUnalignedTsFile.get()); ) { + nonalignedTsFile, oneMeasurementInUnalignedTsFile.get(), startTime, endTime); ) { AtomicInteger count1 = new AtomicInteger(0); AtomicInteger count2 = new AtomicInteger(0); AtomicInteger count3 = new AtomicInteger(0); @@ -438,9 +507,9 @@ public class TsFileInsertionDataContainerTest { }); })))); - Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice); - Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice); - Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice); + Assert.assertEquals(count1.getAndSet(0), expectedRowNumber); + Assert.assertEquals(count2.getAndSet(0), expectedRowNumber); + Assert.assertEquals(count3.getAndSet(0), expectedRowNumber); nonalignedContainer .toTabletInsertionEvents() @@ -487,18 +556,20 @@ public class TsFileInsertionDataContainerTest { } })))); - Assert.assertEquals(count1.get(), rowNumberInOneDevice); - Assert.assertEquals(count2.get(), rowNumberInOneDevice); - Assert.assertEquals(count3.get(), rowNumberInOneDevice); + Assert.assertEquals(count1.get(), expectedRowNumber); + Assert.assertEquals(count2.get(), expectedRowNumber); + Assert.assertEquals(count3.get(), expectedRowNumber); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } try (final TsFileInsertionDataContainer alignedContainer = - new TsFileInsertionDataContainer(alignedTsFile, "not-exist-pattern"); + new TsFileInsertionDataContainer( + alignedTsFile, "not-exist-pattern", startTime, endTime); final TsFileInsertionDataContainer nonalignedContainer = - new TsFileInsertionDataContainer(nonalignedTsFile, "not-exist-pattern"); ) { + new TsFileInsertionDataContainer( + nonalignedTsFile, "not-exist-pattern", startTime, endTime); ) { AtomicInteger count1 = new AtomicInteger(0); AtomicInteger count2 = new AtomicInteger(0); AtomicInteger count3 = new AtomicInteger(0);
