This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 08281208099 [IOTDB-5977][IOTDB-5979][IOTDB-5981] Pipe: several bug
fixes in pipe execution engine (#10087)
08281208099 is described below
commit 082812080998c12964cecbd1f8da209b39c0420c
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Jun 9 01:50:23 2023 +0800
[IOTDB-5977][IOTDB-5979][IOTDB-5981] Pipe: several bug fixes in pipe
execution engine (#10087)
* [IOTDB-5977] Pipe: start-time and end-time in collector.history are not
working correctly
* [IOTDB-5979] Pipe: validation and customization failures during the first
run of the PipeProcessor will affect the creation of subsequent pipes
* [IOTDB-5981] Pipe: fail to process realtime TEXT data with pattern filter
---
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 96 ++++++++----
.../PipeHistoricalDataRegionTsFileCollector.java | 48 ++++--
.../event/common/row/PipeBinaryTransformer.java | 37 +++++
.../iotdb/db/pipe/event/common/row/PipeRow.java | 10 +-
.../db/pipe/event/common/row/PipeRowCollector.java | 10 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 23 ++-
.../tsfile/TsFileInsertionDataContainer.java | 87 +++++++----
.../tsfile/TsFileInsertionDataTabletIterator.java | 10 +-
.../db/pipe/processor/PipeDoNothingProcessor.java | 11 +-
.../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 1 -
.../db/pipe/task/stage/PipeTaskCollectorStage.java | 49 +++---
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 42 ++---
.../event/TsFileInsertionDataContainerTest.java | 169 +++++++++++++++------
13 files changed, 397 insertions(+), 196 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index d21d8ed5d4e..e3fe56a52ec 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
import org.apache.iotdb.db.pipe.task.PipeTaskManager;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -86,42 +87,52 @@ public class PipeTaskAgent {
return;
}
+ final List<Exception> exceptions = new ArrayList<>();
+
// iterate through pipe meta list from config node, check if pipe meta
exists on data node
// or has changed
for (final PipeMeta metaFromConfigNode : pipeMetaListFromConfigNode) {
final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
- final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
-
- // if pipe meta does not exist on data node, create a new pipe
- if (metaOnDataNode == null) {
- if (createPipe(metaFromConfigNode)) {
- // if the status recorded in config node is RUNNING, start the pipe
- startPipe(pipeName,
metaFromConfigNode.getStaticMeta().getCreationTime());
+ try {
+ final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
+
+ // if pipe meta does not exist on data node, create a new pipe
+ if (metaOnDataNode == null) {
+ if (createPipe(metaFromConfigNode)) {
+ // if the status recorded in config node is RUNNING, start the pipe
+ startPipe(pipeName,
metaFromConfigNode.getStaticMeta().getCreationTime());
+ }
+ // if the status recorded in config node is STOPPED or DROPPED, do
nothing
+ continue;
}
- // if the status recorded in config node is STOPPED or DROPPED, do
nothing
- continue;
- }
- // if pipe meta exists on data node, check if it has changed
- final PipeStaticMeta staticMetaOnDataNode =
metaOnDataNode.getStaticMeta();
- final PipeStaticMeta staticMetaFromConfigNode =
metaFromConfigNode.getStaticMeta();
-
- // first check if pipe static meta has changed, if so, drop the pipe and
create a new one
- if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
- dropPipe(pipeName);
- if (createPipe(metaFromConfigNode)) {
- startPipe(pipeName,
metaFromConfigNode.getStaticMeta().getCreationTime());
+ // if pipe meta exists on data node, check if it has changed
+ final PipeStaticMeta staticMetaOnDataNode =
metaOnDataNode.getStaticMeta();
+ final PipeStaticMeta staticMetaFromConfigNode =
metaFromConfigNode.getStaticMeta();
+
+ // first check if pipe static meta has changed, if so, drop the pipe
and create a new one
+ if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
+ dropPipe(pipeName);
+ if (createPipe(metaFromConfigNode)) {
+ startPipe(pipeName,
metaFromConfigNode.getStaticMeta().getCreationTime());
+ }
+ // if the status is STOPPED or DROPPED, do nothing
+ continue;
}
- // if the status is STOPPED or DROPPED, do nothing
- continue;
- }
- // then check if pipe runtime meta has changed, if so, update the pipe
- final PipeRuntimeMeta runtimeMetaOnDataNode =
metaOnDataNode.getRuntimeMeta();
- final PipeRuntimeMeta runtimeMetaFromConfigNode =
metaFromConfigNode.getRuntimeMeta();
- handlePipeRuntimeMetaChanges(
- staticMetaFromConfigNode, runtimeMetaFromConfigNode,
runtimeMetaOnDataNode);
+ // then check if pipe runtime meta has changed, if so, update the pipe
+ final PipeRuntimeMeta runtimeMetaOnDataNode =
metaOnDataNode.getRuntimeMeta();
+ final PipeRuntimeMeta runtimeMetaFromConfigNode =
metaFromConfigNode.getRuntimeMeta();
+ handlePipeRuntimeMetaChanges(
+ staticMetaFromConfigNode, runtimeMetaFromConfigNode,
runtimeMetaOnDataNode);
+ } catch (Exception e) {
+ final String errorMessage =
+ String.format(
+ "Failed to handle pipe meta changes for %s, because %s",
pipeName, e.getMessage());
+ LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
+ exceptions.add(new PipeException(errorMessage, e));
+ }
}
// check if there are pipes on data node that do not exist on config node,
if so, drop them
@@ -130,10 +141,26 @@ public class PipeTaskAgent {
.map(meta -> meta.getStaticMeta().getPipeName())
.collect(Collectors.toSet());
for (final PipeMeta metaOnDataNode : pipeMetaKeeper.getPipeMetaList()) {
- if
(!pipeNamesFromConfigNode.contains(metaOnDataNode.getStaticMeta().getPipeName()))
{
- dropPipe(metaOnDataNode.getStaticMeta().getPipeName());
+ final String pipeName = metaOnDataNode.getStaticMeta().getPipeName();
+
+ try {
+ if (!pipeNamesFromConfigNode.contains(pipeName)) {
+ dropPipe(metaOnDataNode.getStaticMeta().getPipeName());
+ }
+ } catch (Exception e) {
+ final String errorMessage =
+ String.format(
+ "Failed to handle pipe meta changes for %s, because %s",
pipeName, e.getMessage());
+ LOGGER.warn("Failed to handle pipe meta changes for {}", pipeName, e);
+ exceptions.add(new PipeException(errorMessage, e));
}
}
+
+ if (!exceptions.isEmpty()) {
+ throw new PipeException(
+ String.format(
+ "Failed to handle pipe meta changes on data node, because: %s",
exceptions));
+ }
}
private void handlePipeRuntimeMetaChanges(
@@ -239,7 +266,16 @@ public class PipeTaskAgent {
public synchronized void dropAllPipeTasks() {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
- dropPipe(pipeMeta.getStaticMeta().getPipeName(),
pipeMeta.getStaticMeta().getCreationTime());
+ try {
+ dropPipe(
+ pipeMeta.getStaticMeta().getPipeName(),
pipeMeta.getStaticMeta().getCreationTime());
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to drop pipe {} with creation time {}",
+ pipeMeta.getStaticMeta().getPipeName(),
+ pipeMeta.getStaticMeta().getCreationTime(),
+ e);
+ }
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 8a6f21603e9..f39923fd67a 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -44,6 +44,7 @@ import java.util.ArrayDeque;
import java.util.Queue;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_END_TIME;
import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.COLLECTOR_HISTORY_START_TIME;
import static
org.apache.iotdb.db.pipe.config.PipeCollectorConstant.DATA_REGION_KEY;
@@ -56,12 +57,13 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
private final PipeTaskMeta pipeTaskMeta;
private final ProgressIndex startIndex;
- private String pattern;
private int dataRegionId;
- private final long historicalDataCollectionTimeLowerBound;
- private long historicalDataCollectionStartTime;
- private long historicalDataCollectionEndTime;
+ private String pattern;
+
+ private final long historicalDataCollectionTimeLowerBound; // arrival time
+ private long historicalDataCollectionStartTime; // event time
+ private long historicalDataCollectionEndTime; // event time
private Queue<PipeTsFileInsertionEvent> pendingQueue;
@@ -81,18 +83,24 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
@Override
public void customize(
PipeParameters parameters, PipeCollectorRuntimeConfiguration
configuration) {
+ dataRegionId = parameters.getInt(DATA_REGION_KEY);
+
pattern =
parameters.getStringOrDefault(
PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE);
- dataRegionId = parameters.getInt(DATA_REGION_KEY);
+
+ // user may set the COLLECTOR_HISTORY_START_TIME and
COLLECTOR_HISTORY_END_TIME without
+ // enabling the historical data collection, which may affect the realtime
data collection.
+ final boolean isHistoricalCollectorEnabledByUser =
+ parameters.getBooleanOrDefault(COLLECTOR_HISTORY_ENABLE_KEY, true);
historicalDataCollectionStartTime =
- parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME)
+ isHistoricalCollectorEnabledByUser &&
parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME)
? DateTimeUtils.convertDatetimeStrToLong(
parameters.getString(COLLECTOR_HISTORY_START_TIME),
ZoneId.systemDefault())
: Long.MIN_VALUE;
historicalDataCollectionEndTime =
- parameters.hasAttribute(COLLECTOR_HISTORY_END_TIME)
+ isHistoricalCollectorEnabledByUser &&
parameters.hasAttribute(COLLECTOR_HISTORY_END_TIME)
? DateTimeUtils.convertDatetimeStrToLong(
parameters.getString(COLLECTOR_HISTORY_END_TIME),
ZoneId.systemDefault())
: Long.MAX_VALUE;
@@ -158,7 +166,14 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&&
isTsFileResourceOverlappedWithTimeRange(resource)
&&
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
- .map(resource -> new PipeTsFileInsertionEvent(resource,
pipeTaskMeta, pattern))
+ .map(
+ resource ->
+ new PipeTsFileInsertionEvent(
+ resource,
+ pipeTaskMeta,
+ pattern,
+ historicalDataCollectionStartTime,
+ historicalDataCollectionEndTime))
.collect(Collectors.toList()));
pendingQueue.addAll(
tsFileManager.getTsFileList(false).stream()
@@ -167,7 +182,14 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&&
isTsFileResourceOverlappedWithTimeRange(resource)
&&
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
- .map(resource -> new PipeTsFileInsertionEvent(resource,
pipeTaskMeta, pattern))
+ .map(
+ resource ->
+ new PipeTsFileInsertionEvent(
+ resource,
+ pipeTaskMeta,
+ pattern,
+ historicalDataCollectionStartTime,
+ historicalDataCollectionEndTime))
.collect(Collectors.toList()));
pendingQueue.forEach(
event ->
@@ -182,8 +204,8 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
}
private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource
resource) {
- return !(resource.getFileEndTime() < historicalDataCollectionStartTime
- || historicalDataCollectionEndTime < resource.getFileStartTime());
+ return historicalDataCollectionStartTime <= resource.getFileEndTime()
+ || resource.getFileStartTime() <= historicalDataCollectionEndTime;
}
private boolean
isTsFileGeneratedAfterCollectionTimeLowerBound(TsFileResource resource) {
@@ -192,7 +214,9 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
<=
TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
} catch (IOException e) {
LOGGER.warn(
- String.format("failed to get the generation time of TsFile %s",
resource.getTsFilePath()),
+ String.format(
+ "failed to get the generation time of TsFile %s, collect it
anyway",
+ resource.getTsFilePath()),
e);
// If failed to get the generation time of the TsFile, we will collect
the data in the TsFile
// anyway.
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeBinaryTransformer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeBinaryTransformer.java
new file mode 100644
index 00000000000..70d4c362c70
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeBinaryTransformer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.row;
+
+public class PipeBinaryTransformer {
+
+ public static org.apache.iotdb.tsfile.utils.Binary transformToBinary(
+ org.apache.iotdb.pipe.api.type.Binary binary) {
+ return binary == null ? null : new
org.apache.iotdb.tsfile.utils.Binary(binary.getValues());
+ }
+
+ public static org.apache.iotdb.pipe.api.type.Binary transformToPipeBinary(
+ org.apache.iotdb.tsfile.utils.Binary binary) {
+ return binary == null ? null : new
org.apache.iotdb.pipe.api.type.Binary(binary.getValues());
+ }
+
+ private PipeBinaryTransformer() {
+ // util class
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
index 8b9e10ebf91..24ad5e108fc 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.event.common.row;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
-import org.apache.iotdb.pipe.api.type.Binary;
import org.apache.iotdb.pipe.api.type.Type;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -95,13 +94,16 @@ public class PipeRow implements Row {
}
@Override
- public Binary getBinary(int columnIndex) {
- return ((Binary[]) valueColumns[columnIndex])[rowIndex];
+ public org.apache.iotdb.pipe.api.type.Binary getBinary(int columnIndex) {
+ return PipeBinaryTransformer.transformToPipeBinary(
+ ((org.apache.iotdb.tsfile.utils.Binary[])
valueColumns[columnIndex])[rowIndex]);
}
@Override
public String getString(int columnIndex) {
- return ((Binary[]) valueColumns[columnIndex])[rowIndex].getStringValue();
+ final org.apache.iotdb.tsfile.utils.Binary binary =
+ ((org.apache.iotdb.tsfile.utils.Binary[])
valueColumns[columnIndex])[rowIndex];
+ return binary == null ? null : binary.getStringValue();
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index 983a7877713..ea989d48027 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -56,7 +56,15 @@ public class PipeRowCollector implements RowCollector {
final int rowIndex = tablet.rowSize;
tablet.addTimestamp(rowIndex, row.getTime());
for (int i = 0; i < row.size(); i++) {
- tablet.addValue(measurementSchemaArray[i].getMeasurementId(), rowIndex,
row.getObject(i));
+ final Object value = row.getObject(i);
+ if (value instanceof org.apache.iotdb.pipe.api.type.Binary) {
+ tablet.addValue(
+ measurementSchemaArray[i].getMeasurementId(),
+ rowIndex,
+
PipeBinaryTransformer.transformToBinary((org.apache.iotdb.pipe.api.type.Binary)
value));
+ } else {
+ tablet.addValue(measurementSchemaArray[i].getMeasurementId(),
rowIndex, value);
+ }
if (row.isNull(i)) {
tablet.bitMaps[i].mark(rowIndex);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 785948b66a4..d82313dad8d 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -41,6 +41,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
+ // used to filter data
+ private final long startTime;
+ private final long endTime;
+
private final TsFileResource resource;
private File tsFile;
@@ -49,13 +53,20 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
private TsFileInsertionDataContainer dataContainer;
public PipeTsFileInsertionEvent(TsFileResource resource) {
- this(resource, null, null);
+ this(resource, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
}
public PipeTsFileInsertionEvent(
- TsFileResource resource, PipeTaskMeta pipeTaskMeta, String pattern) {
+ TsFileResource resource,
+ PipeTaskMeta pipeTaskMeta,
+ String pattern,
+ long startTime,
+ long endTime) {
super(pipeTaskMeta, pattern);
+ this.startTime = startTime;
+ this.endTime = endTime;
+
this.resource = resource;
tsFile = resource.getTsFile();
@@ -89,6 +100,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
return tsFile;
}
+ public boolean hasTimeFilter() {
+ return startTime != Long.MIN_VALUE || endTime != Long.MAX_VALUE;
+ }
+
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
@@ -138,7 +153,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
@Override
public PipeTsFileInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
PipeTaskMeta pipeTaskMeta, String pattern) {
- return new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern);
+ return new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern,
startTime, endTime);
}
/////////////////////////// TsFileInsertionEvent ///////////////////////////
@@ -148,7 +163,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
try {
if (dataContainer == null) {
waitForTsFileClose();
- dataContainer = new TsFileInsertionDataContainer(tsFile, getPattern());
+ dataContainer = new TsFileInsertionDataContainer(tsFile, getPattern(),
startTime, endTime);
}
return dataContainer.toTabletInsertionEvents();
} catch (InterruptedException e) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 6426e8dccad..e9347d31202 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -26,6 +26,10 @@ import
org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TsFileReader;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +47,9 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
+ // used to filter data
private final String pattern;
+ private final IExpression timeFilterExpression;
private final TsFileSequenceReader tsFileSequenceReader;
private final TsFileReader tsFileReader;
@@ -51,14 +57,27 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
private final Iterator<Map.Entry<String, List<String>>>
deviceMeasurementsMapIterator;
private final Map<String, TSDataType> measurementDataTypeMap;
- public TsFileInsertionDataContainer(File tsFile, String pattern) throws
IOException {
+ public TsFileInsertionDataContainer(File tsFile, String pattern, long
startTime, long endTime)
+ throws IOException {
this.pattern = pattern;
-
- tsFileSequenceReader = new TsFileSequenceReader(tsFile.getAbsolutePath());
- tsFileReader = new TsFileReader(tsFileSequenceReader);
-
- deviceMeasurementsMapIterator =
filterDeviceMeasurementsMapByPattern().entrySet().iterator();
- measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
+ timeFilterExpression =
+ (startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE)
+ ? null
+ : BinaryExpression.and(
+ new GlobalTimeExpression(TimeFilter.gtEq(startTime)),
+ new GlobalTimeExpression(TimeFilter.ltEq(endTime)));
+
+ try {
+ tsFileSequenceReader = new
TsFileSequenceReader(tsFile.getAbsolutePath());
+ tsFileReader = new TsFileReader(tsFileSequenceReader);
+
+ deviceMeasurementsMapIterator =
filterDeviceMeasurementsMapByPattern().entrySet().iterator();
+ measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
+ } catch (Exception e) {
+ LOGGER.error("failed to create TsFileInsertionDataContainer", e);
+ close();
+ throw e;
+ }
}
private Map<String, List<String>> filterDeviceMeasurementsMapByPattern()
throws IOException {
@@ -109,19 +128,9 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
@Override
public boolean hasNext() {
- return (tabletIterator != null && tabletIterator.hasNext())
- || deviceMeasurementsMapIterator.hasNext();
- }
-
- @Override
- public TabletInsertionEvent next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
while (tabletIterator == null || !tabletIterator.hasNext()) {
if (!deviceMeasurementsMapIterator.hasNext()) {
- throw new NoSuchElementException();
+ return false;
}
final Map.Entry<String, List<String>> entry =
deviceMeasurementsMapIterator.next();
@@ -129,21 +138,32 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
try {
tabletIterator =
new TsFileInsertionDataTabletIterator(
- tsFileReader, measurementDataTypeMap, entry.getKey(),
entry.getValue());
+ tsFileReader,
+ measurementDataTypeMap,
+ entry.getKey(),
+ entry.getValue(),
+ timeFilterExpression);
} catch (IOException e) {
+ close();
throw new PipeException("failed to create
TsFileInsertionDataTabletIterator", e);
}
}
+ return true;
+ }
+
+ @Override
+ public TabletInsertionEvent next() {
+ if (!hasNext()) {
+ close();
+ throw new NoSuchElementException();
+ }
+
final TabletInsertionEvent next =
new PipeRawTabletInsertionEvent(tabletIterator.next());
if (!hasNext()) {
- try {
- close();
- } catch (Exception e) {
- LOGGER.warn("Failed to close TsFileInsertionDataContainer", e);
- }
+ close();
}
return next;
@@ -152,12 +172,21 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
}
@Override
- public void close() throws Exception {
- if (tsFileReader != null) {
- tsFileReader.close();
+ public void close() {
+ try {
+ if (tsFileReader != null) {
+ tsFileReader.close();
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Failed to close TsFileReader", e);
}
- if (tsFileSequenceReader != null) {
- tsFileSequenceReader.close();
+
+ try {
+ if (tsFileSequenceReader != null) {
+ tsFileSequenceReader.close();
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Failed to close TsFileSequenceReader", e);
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
index 8b3b1e14fe5..63b2b15d403 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.tsfile.read.TsFileReader;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.write.record.Tablet;
@@ -47,13 +48,16 @@ public class TsFileInsertionDataTabletIterator implements
Iterator<Tablet> {
private final String deviceId;
private final List<String> measurements;
+ private final IExpression timeFilterExpression;
+
private final QueryDataSet queryDataSet;
public TsFileInsertionDataTabletIterator(
TsFileReader tsFileReader,
Map<String, TSDataType> measurementDataTypeMap,
String deviceId,
- List<String> measurements)
+ List<String> measurements,
+ IExpression timeFilterExpression)
throws IOException {
this.tsFileReader = tsFileReader;
this.measurementDataTypeMap = measurementDataTypeMap;
@@ -68,6 +72,8 @@ public class TsFileInsertionDataTabletIterator implements
Iterator<Tablet> {
.sorted()
.collect(Collectors.toList());
+ this.timeFilterExpression = timeFilterExpression;
+
this.queryDataSet = buildQueryDataSet();
}
@@ -76,7 +82,7 @@ public class TsFileInsertionDataTabletIterator implements
Iterator<Tablet> {
for (String measurement : measurements) {
paths.add(new Path(deviceId, measurement, false));
}
- return tsFileReader.query(QueryExpression.create(paths, null));
+ return tsFileReader.query(QueryExpression.create(paths,
timeFilterExpression));
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
index a976358611e..c01751ed824 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/processor/PipeDoNothingProcessor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.processor;
import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
@@ -82,11 +83,11 @@ public class PipeDoNothingProcessor implements
PipeProcessor {
@Override
public void process(TsFileInsertionEvent tsFileInsertionEvent,
EventCollector eventCollector)
throws IOException {
- if (tsFileInsertionEvent instanceof EnrichedEvent) {
- final EnrichedEvent enrichedEvent = (EnrichedEvent) tsFileInsertionEvent;
- if (enrichedEvent
- .getPattern()
- .equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
+ if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
+ final PipeTsFileInsertionEvent enrichedEvent =
+ (PipeTsFileInsertionEvent) tsFileInsertionEvent;
+ if
(enrichedEvent.getPattern().equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)
+ && !enrichedEvent.hasTimeFilter()) {
eventCollector.collect(tsFileInsertionEvent);
} else {
for (final TabletInsertionEvent tabletInsertionEvent :
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 715816cfbcc..7ad1bffa2a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -58,7 +58,6 @@ public class PipeTaskBuilder {
pipeStaticMeta.getPipeName(),
dataRegionId,
collectorStage.getEventSupplier(),
- collectorStage.getCollectorPendingQueue(),
pipeStaticMeta.getProcessorParameters(),
connectorStage.getPipeConnectorPendingQueue());
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 9e03458bfa0..ca1ec8fda9b 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -31,28 +31,12 @@ import org.apache.iotdb.pipe.api.PipeCollector;
import org.apache.iotdb.pipe.api.customizer.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.PipeParameters;
import
org.apache.iotdb.pipe.api.customizer.collector.PipeCollectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import java.util.HashMap;
public class PipeTaskCollectorStage extends PipeTaskStage {
- private final PipeParameters collectorParameters;
-
- /**
- * TODO: have a better way to control busy/idle status of
PipeTaskCollectorStage.
- *
- * <p>Currently, this field is for IoTDBDataRegionCollector only.
IoTDBDataRegionCollector uses
- * collectorPendingQueue as an internal data structure to store realtime
events.
- *
- * <p>PendingQueue can detect whether the queue is empty or not, and it can
notify the
- * PipeTaskProcessorStage to stop processing data when the queue is empty to
avoid unnecessary
- * processing, and it also can notify the PipeTaskProcessorStage to start
processing data when the
- * queue is not empty.
- */
- private UnboundedBlockingPendingQueue<Event> collectorPendingQueue;
-
private final PipeCollector pipeCollector;
public PipeTaskCollectorStage(
@@ -60,6 +44,8 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
PipeTaskMeta pipeTaskMeta,
long creationTime,
PipeParameters collectorParameters) {
+ PipeParameters localizedCollectorParameters;
+
// TODO: avoid if-else, use reflection to create collector all the time
if (collectorParameters
.getStringOrDefault(
@@ -69,40 +55,43 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
// we want to pass data region id to collector, so we need to create a
new collector
// parameters and put data region id into it. we can't put data region
id into collector
// parameters directly, because the given collector parameters may be
used by other pipe task.
- this.collectorParameters =
+ localizedCollectorParameters =
new PipeParameters(new
HashMap<>(collectorParameters.getAttribute()));
// set data region id to collector parameters, so that collector can get
data region id inside
// collector
- this.collectorParameters
+ localizedCollectorParameters
.getAttribute()
.put(PipeCollectorConstant.DATA_REGION_KEY,
String.valueOf(dataRegionId.getId()));
- collectorPendingQueue = new UnboundedBlockingPendingQueue<>();
this.pipeCollector =
- new IoTDBDataRegionCollector(pipeTaskMeta, creationTime,
collectorPendingQueue);
+ new IoTDBDataRegionCollector(
+ pipeTaskMeta, creationTime, new
UnboundedBlockingPendingQueue<>());
} else {
- this.collectorParameters = collectorParameters;
+ localizedCollectorParameters = collectorParameters;
- this.pipeCollector =
PipeAgent.plugin().reflectCollector(collectorParameters);
+ this.pipeCollector =
PipeAgent.plugin().reflectCollector(localizedCollectorParameters);
}
- }
- @Override
- public void createSubtask() throws PipeException {
+ // validate and customize should be called before createSubtask. this
allows the collector to expose
+ // exceptions in advance.
try {
// 1. validate collector parameters
- pipeCollector.validate(new PipeParameterValidator(collectorParameters));
+ pipeCollector.validate(new
PipeParameterValidator(localizedCollectorParameters));
// 2. customize collector
final PipeCollectorRuntimeConfiguration runtimeConfiguration =
new PipeCollectorRuntimeConfiguration();
- pipeCollector.customize(collectorParameters, runtimeConfiguration);
- // TODO: use runtimeConfiguration to configure collector
+ pipeCollector.customize(localizedCollectorParameters,
runtimeConfiguration);
} catch (Exception e) {
throw new PipeException(e.getMessage(), e);
}
}
+ @Override
+ public void createSubtask() throws PipeException {
+ // do nothing
+ }
+
@Override
public void startSubtask() throws PipeException {
try {
@@ -129,8 +118,4 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
public EventSupplier getEventSupplier() {
return pipeCollector::supply;
}
-
- public UnboundedBlockingPendingQueue<Event> getCollectorPendingQueue() {
- return collectorPendingQueue;
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 571b04773a4..02c6576ce94 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.pipe.config.PipeProcessorConstant;
import
org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.processor.PipeDoNothingProcessor;
-import org.apache.iotdb.db.pipe.task.connection.BlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
@@ -38,8 +37,6 @@ import
org.apache.iotdb.pipe.api.customizer.processor.PipeProcessorRuntimeConfig
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
-import javax.annotation.Nullable;
-
public class PipeTaskProcessorStage extends PipeTaskStage {
protected final PipeProcessorSubtaskExecutor executor =
@@ -49,15 +46,10 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
protected final PipeProcessor pipeProcessor;
protected final PipeProcessorSubtask pipeProcessorSubtask;
- protected final BlockingPendingQueue<Event> pipeCollectorInputPendingQueue;
- protected final BlockingPendingQueue<Event> pipeConnectorOutputPendingQueue;
-
/**
* @param pipeName pipe name
* @param dataRegionId data region id
* @param pipeCollectorInputEventSupplier used to input events from pipe
collector
- * @param pipeCollectorInputPendingQueue used to listen whether pipe
collector event queue is from
- * empty to not empty or from not empty to empty, null means no need to
listen
* @param pipeProcessorParameters used to create pipe processor
* @param pipeConnectorOutputPendingQueue used to output events to pipe
connector
*/
@@ -65,12 +57,10 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
String pipeName,
TConsensusGroupId dataRegionId,
EventSupplier pipeCollectorInputEventSupplier,
- @Nullable BlockingPendingQueue<Event> pipeCollectorInputPendingQueue,
PipeParameters pipeProcessorParameters,
BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue) {
this.pipeProcessorParameters = pipeProcessorParameters;
- final String taskId = pipeName + "_" + dataRegionId;
pipeProcessor =
pipeProcessorParameters
.getStringOrDefault(
@@ -79,22 +69,8 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
.equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName())
? new PipeDoNothingProcessor()
: PipeAgent.plugin().reflectProcessor(pipeProcessorParameters);
- final PipeEventCollector pipeConnectorOutputEventCollector =
- new PipeEventCollector(pipeConnectorOutputPendingQueue);
-
- this.pipeProcessorSubtask =
- new PipeProcessorSubtask(
- taskId,
- pipeCollectorInputEventSupplier,
- pipeProcessor,
- pipeConnectorOutputEventCollector);
-
- this.pipeCollectorInputPendingQueue = pipeCollectorInputPendingQueue;
- this.pipeConnectorOutputPendingQueue = pipeConnectorOutputPendingQueue;
- }
-
- @Override
- public void createSubtask() throws PipeException {
+ // validate and customize should be called before createSubtask. this
allows the processor to expose
+ // exceptions in advance.
try {
// 1. validate processor parameters
pipeProcessor.validate(new
PipeParameterValidator(pipeProcessorParameters));
@@ -103,11 +79,23 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
final PipeProcessorRuntimeConfiguration runtimeConfiguration =
new PipeProcessorRuntimeConfiguration();
pipeProcessor.customize(pipeProcessorParameters, runtimeConfiguration);
- // TODO: use runtimeConfiguration to configure processor
} catch (Exception e) {
throw new PipeException(e.getMessage(), e);
}
+ final String taskId = pipeName + "_" + dataRegionId;
+ final PipeEventCollector pipeConnectorOutputEventCollector =
+ new PipeEventCollector(pipeConnectorOutputPendingQueue);
+ this.pipeProcessorSubtask =
+ new PipeProcessorSubtask(
+ taskId,
+ pipeCollectorInputEventSupplier,
+ pipeProcessor,
+ pipeConnectorOutputEventCollector);
+ }
+
+ @Override
+ public void createSubtask() throws PipeException {
executor.register(pipeProcessorSubtask);
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
b/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index aa0b7506cbf..8b797abcfff 100644
---
a/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.TsFileInsertionDataContainer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
import org.junit.After;
@@ -45,6 +46,8 @@ public class TsFileInsertionDataContainerTest {
private static final Logger LOGGER =
LoggerFactory.getLogger(TsFileInsertionDataContainerTest.class);
+ private static final long TSFILE_START_TIME = 300L;
+
private File alignedTsFile;
private File nonalignedTsFile;
@@ -68,40 +71,83 @@ public class TsFileInsertionDataContainerTest {
measurementNumbers.add(1);
measurementNumbers.add(2);
- for (int deviceNumber : deviceNumbers) {
- for (int measurementNumber : measurementNumbers) {
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 0);
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 1);
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 2);
+ Set<Pair<Long, Long>> startEndTimes = new HashSet<>();
+ startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME - 1));
+ startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME));
+ startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME + 1));
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 999);
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000);
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001);
+ startEndTimes.add(new Pair<>(TSFILE_START_TIME - 1, TSFILE_START_TIME -
1));
+ startEndTimes.add(new Pair<>(TSFILE_START_TIME, TSFILE_START_TIME));
+ startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME +
1));
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 999 * 2 +
1);
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000);
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001 * 2
- 1);
+ startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME +
1));
+ startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME +
10));
+ startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME +
100));
+ startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1, TSFILE_START_TIME +
10000));
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023);
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024);
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025);
+ startEndTimes.add(new Pair<>(TSFILE_START_TIME + 1000000,
TSFILE_START_TIME + 2000000));
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023 * 2
+ 1);
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024 * 2);
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025 * 2
- 1);
+ startEndTimes.add(new Pair<>(Long.MIN_VALUE, Long.MAX_VALUE));
- testToTabletInsertionEvents(deviceNumber, measurementNumber, 10001);
+ for (int deviceNumber : deviceNumbers) {
+ for (int measurementNumber : measurementNumbers) {
+ for (Pair<Long, Long> startEndTime : startEndTimes) {
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 0, startEndTime.left,
startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 1, startEndTime.left,
startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 2, startEndTime.left,
startEndTime.right);
+
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 999, startEndTime.left,
startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 1000, startEndTime.left,
startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 1001, startEndTime.left,
startEndTime.right);
+
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 999 * 2 + 1, startEndTime.left,
startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 1000, startEndTime.left,
startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 1001 * 2 - 1,
startEndTime.left, startEndTime.right);
+
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 1023, startEndTime.left,
startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 1024, startEndTime.left,
startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 1025, startEndTime.left,
startEndTime.right);
+
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 1023 * 2 + 1,
startEndTime.left, startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 1024 * 2, startEndTime.left,
startEndTime.right);
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 1025 * 2 - 1,
startEndTime.left, startEndTime.right);
+
+ testToTabletInsertionEvents(
+ deviceNumber, measurementNumber, 10001, startEndTime.left,
startEndTime.right);
+ }
}
}
}
private void testToTabletInsertionEvents(
- int deviceNumber, int measurementNumber, int rowNumberInOneDevice)
throws Exception {
+ int deviceNumber,
+ int measurementNumber,
+ int rowNumberInOneDevice,
+ long startTime,
+ long endTime)
+ throws Exception {
LOGGER.info(
- "testToTabletInsertionEvents: deviceNumber = {}, measurementNumber =
{}, rowNumberInOneDevice = {}",
+ "testToTabletInsertionEvents: deviceNumber: {}, measurementNumber: {},
rowNumberInOneDevice: {}, startTime: {}, endTime: {}",
deviceNumber,
measurementNumber,
- rowNumberInOneDevice);
+ rowNumberInOneDevice,
+ startTime,
+ endTime);
alignedTsFile =
TsFileGeneratorUtils.generateAlignedTsFile(
@@ -109,7 +155,7 @@ public class TsFileInsertionDataContainerTest {
deviceNumber,
measurementNumber,
rowNumberInOneDevice,
- 300,
+ (int) TSFILE_START_TIME,
10000,
700,
50);
@@ -119,15 +165,36 @@ public class TsFileInsertionDataContainerTest {
deviceNumber,
measurementNumber,
rowNumberInOneDevice,
- 300,
+ (int) TSFILE_START_TIME,
10000,
700,
50);
+ final int tsfileEndTime = (int) TSFILE_START_TIME + rowNumberInOneDevice -
1;
+
+ int expectedRowNumber = rowNumberInOneDevice;
+ Assert.assertTrue(startTime <= endTime);
+ if (startTime != Long.MIN_VALUE && endTime != Long.MAX_VALUE) {
+ if (startTime < TSFILE_START_TIME) {
+ if (endTime < TSFILE_START_TIME) {
+ expectedRowNumber = 0;
+ } else {
+ expectedRowNumber =
+ Math.min((int) (endTime - TSFILE_START_TIME + 1),
rowNumberInOneDevice);
+ }
+ } else if (tsfileEndTime < startTime) {
+ expectedRowNumber = 0;
+ } else {
+ expectedRowNumber =
+ Math.min(
+ (int) (Math.min(endTime, tsfileEndTime) - startTime + 1),
rowNumberInOneDevice);
+ }
+ }
+
try (final TsFileInsertionDataContainer alignedContainer =
- new TsFileInsertionDataContainer(alignedTsFile, "root");
+ new TsFileInsertionDataContainer(alignedTsFile, "root", startTime,
endTime);
final TsFileInsertionDataContainer nonalignedContainer =
- new TsFileInsertionDataContainer(nonalignedTsFile, "root"); ) {
+ new TsFileInsertionDataContainer(nonalignedTsFile, "root",
startTime, endTime); ) {
AtomicInteger count1 = new AtomicInteger(0);
AtomicInteger count2 = new AtomicInteger(0);
AtomicInteger count3 = new AtomicInteger(0);
@@ -178,9 +245,9 @@ public class TsFileInsertionDataContainerTest {
});
}))));
- Assert.assertEquals(count1.getAndSet(0), deviceNumber *
rowNumberInOneDevice);
- Assert.assertEquals(count2.getAndSet(0), deviceNumber *
rowNumberInOneDevice);
- Assert.assertEquals(count3.getAndSet(0), deviceNumber *
rowNumberInOneDevice);
+ Assert.assertEquals(count1.getAndSet(0), deviceNumber *
expectedRowNumber);
+ Assert.assertEquals(count2.getAndSet(0), deviceNumber *
expectedRowNumber);
+ Assert.assertEquals(count3.getAndSet(0), deviceNumber *
expectedRowNumber);
nonalignedContainer
.toTabletInsertionEvents()
@@ -228,9 +295,9 @@ public class TsFileInsertionDataContainerTest {
}
}))));
- Assert.assertEquals(count1.get(), deviceNumber * rowNumberInOneDevice);
- Assert.assertEquals(count2.get(), deviceNumber * rowNumberInOneDevice);
- Assert.assertEquals(count3.get(), deviceNumber * rowNumberInOneDevice);
+ Assert.assertEquals(count1.get(), deviceNumber * expectedRowNumber);
+ Assert.assertEquals(count2.get(), deviceNumber * expectedRowNumber);
+ Assert.assertEquals(count3.get(), deviceNumber * expectedRowNumber);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
@@ -272,10 +339,11 @@ public class TsFileInsertionDataContainerTest {
}
try (final TsFileInsertionDataContainer alignedContainer =
- new TsFileInsertionDataContainer(alignedTsFile,
oneDeviceInAlignedTsFile.get());
+ new TsFileInsertionDataContainer(
+ alignedTsFile, oneDeviceInAlignedTsFile.get(), startTime,
endTime);
final TsFileInsertionDataContainer nonalignedContainer =
new TsFileInsertionDataContainer(
- nonalignedTsFile, oneDeviceInUnalignedTsFile.get()); ) {
+ nonalignedTsFile, oneDeviceInUnalignedTsFile.get(), startTime,
endTime); ) {
AtomicInteger count1 = new AtomicInteger(0);
AtomicInteger count2 = new AtomicInteger(0);
AtomicInteger count3 = new AtomicInteger(0);
@@ -326,9 +394,9 @@ public class TsFileInsertionDataContainerTest {
});
}))));
- Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice);
- Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice);
- Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice);
+ Assert.assertEquals(count1.getAndSet(0), expectedRowNumber);
+ Assert.assertEquals(count2.getAndSet(0), expectedRowNumber);
+ Assert.assertEquals(count3.getAndSet(0), expectedRowNumber);
nonalignedContainer
.toTabletInsertionEvents()
@@ -376,19 +444,20 @@ public class TsFileInsertionDataContainerTest {
}
}))));
- Assert.assertEquals(count1.get(), rowNumberInOneDevice);
- Assert.assertEquals(count2.get(), rowNumberInOneDevice);
- Assert.assertEquals(count3.get(), rowNumberInOneDevice);
+ Assert.assertEquals(count1.get(), expectedRowNumber);
+ Assert.assertEquals(count2.get(), expectedRowNumber);
+ Assert.assertEquals(count3.get(), expectedRowNumber);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
try (final TsFileInsertionDataContainer alignedContainer =
- new TsFileInsertionDataContainer(alignedTsFile,
oneMeasurementInAlignedTsFile.get());
+ new TsFileInsertionDataContainer(
+ alignedTsFile, oneMeasurementInAlignedTsFile.get(), startTime,
endTime);
final TsFileInsertionDataContainer nonalignedContainer =
new TsFileInsertionDataContainer(
- nonalignedTsFile, oneMeasurementInUnalignedTsFile.get()); ) {
+ nonalignedTsFile, oneMeasurementInUnalignedTsFile.get(),
startTime, endTime); ) {
AtomicInteger count1 = new AtomicInteger(0);
AtomicInteger count2 = new AtomicInteger(0);
AtomicInteger count3 = new AtomicInteger(0);
@@ -438,9 +507,9 @@ public class TsFileInsertionDataContainerTest {
});
}))));
- Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice);
- Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice);
- Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice);
+ Assert.assertEquals(count1.getAndSet(0), expectedRowNumber);
+ Assert.assertEquals(count2.getAndSet(0), expectedRowNumber);
+ Assert.assertEquals(count3.getAndSet(0), expectedRowNumber);
nonalignedContainer
.toTabletInsertionEvents()
@@ -487,18 +556,20 @@ public class TsFileInsertionDataContainerTest {
}
}))));
- Assert.assertEquals(count1.get(), rowNumberInOneDevice);
- Assert.assertEquals(count2.get(), rowNumberInOneDevice);
- Assert.assertEquals(count3.get(), rowNumberInOneDevice);
+ Assert.assertEquals(count1.get(), expectedRowNumber);
+ Assert.assertEquals(count2.get(), expectedRowNumber);
+ Assert.assertEquals(count3.get(), expectedRowNumber);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
try (final TsFileInsertionDataContainer alignedContainer =
- new TsFileInsertionDataContainer(alignedTsFile,
"not-exist-pattern");
+ new TsFileInsertionDataContainer(
+ alignedTsFile, "not-exist-pattern", startTime, endTime);
final TsFileInsertionDataContainer nonalignedContainer =
- new TsFileInsertionDataContainer(nonalignedTsFile,
"not-exist-pattern"); ) {
+ new TsFileInsertionDataContainer(
+ nonalignedTsFile, "not-exist-pattern", startTime, endTime); ) {
AtomicInteger count1 = new AtomicInteger(0);
AtomicInteger count2 = new AtomicInteger(0);
AtomicInteger count3 = new AtomicInteger(0);