This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 76702b0e2fa Pipe: Pattern parsing pruning: When pattern is at the
level below database, the parsing logic can be skipped if tsfiles / tablets
completely match with the pattern (#12049)
76702b0e2fa is described below
commit 76702b0e2fadff7f919657efdd2c933d53fd6890
Author: Caideyipi <[email protected]>
AuthorDate: Tue Feb 20 16:42:28 2024 +0800
Pipe: Pattern parsing pruning: When pattern is at the level below database,
the parsing logic can be skipped if tsfiles / tablets completely match with the
pattern (#12049)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../apache/iotdb/db/pipe/event/EnrichedEvent.java | 8 +++++--
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 7 ++++++
.../common/tablet/PipeRawTabletInsertionEvent.java | 4 ++--
.../common/tsfile/PipeTsFileInsertionEvent.java | 27 +++++++++++++++++++++-
.../tsfile/TsFileInsertionDataContainer.java | 16 +++++++++++++
.../pipe/task/connection/PipeEventCollector.java | 6 ++---
6 files changed, 60 insertions(+), 8 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 6b83e0023d8..34b337a8c98 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -208,8 +208,12 @@ public abstract class EnrichedEvent implements Event {
isTimeParsed = true;
}
- public boolean shouldParsePatternOrTime() {
- return !isPatternParsed || !isTimeParsed;
+ public boolean shouldParseTimeOrPattern() {
+ return shouldParseTime() || shouldParsePattern();
+ }
+
+ public boolean shouldParsePattern() {
+ return !isPatternParsed;
}
public boolean shouldParseTime() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 652cda59593..c2e0f07abd3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -240,6 +240,13 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
/////////////////////////// parsePatternOrTime ///////////////////////////
+ @Override
+ public boolean shouldParsePattern() {
+ final InsertNode node = getInsertNodeViaCacheIfPossible();
+ return super.shouldParsePattern()
+ && (Objects.isNull(node) || !node.getDevicePath().getFullPath().startsWith(pattern));
+ }
+
public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
return new PipeRawTabletInsertionEvent(
convertToTablet(), isAligned, pipeName, pipeTaskMeta, this, true);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 1b843265515..5593265c40a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -186,7 +186,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
}
public Tablet convertToTablet() {
- if (!shouldParsePatternOrTime()) {
+ if (!shouldParseTimeOrPattern()) {
return tablet;
}
@@ -206,7 +206,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
}
public boolean hasNoNeedParsingAndIsEmpty() {
- return !shouldParsePatternOrTime() && tablet.rowSize == 0;
+ return !shouldParseTimeOrPattern() && tablet.rowSize == 0;
}
/////////////////////////// Object ///////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index ee2f442e104..532ca5a1a7e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -181,8 +181,33 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
/////////////////////////// TsFileInsertionEvent ///////////////////////////
+ @Override
+ public boolean shouldParseTimeOrPattern() {
+ boolean shouldParseTimeOrPattern = false;
+ try {
+ shouldParseTimeOrPattern = super.shouldParseTimeOrPattern();
+ return shouldParseTimeOrPattern;
+ } finally {
+ // Super method will call shouldParsePattern() and then init dataContainer at
+ // shouldParsePattern(). If shouldParsePattern() returns false, dataContainer will
+ // not be used, so we need to close the resource here.
+ if (!shouldParseTimeOrPattern) {
+ close();
+ }
+ }
+ }
+
+ @Override
+ public boolean shouldParsePattern() {
+ return super.shouldParsePattern() && initDataContainer().shouldParsePattern();
+ }
+
@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
+ return initDataContainer().toTabletInsertionEvents();
+ }
+
+ private TsFileInsertionDataContainer initDataContainer() {
try {
if (dataContainer == null) {
waitForTsFileClose();
@@ -190,7 +215,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
new TsFileInsertionDataContainer(
tsFile, getPattern(), startTime, endTime, pipeTaskMeta, this);
}
- return dataContainer.toTabletInsertionEvents();
+ return dataContainer;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
close();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index dbb3125183a..b0f944ac892 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -72,6 +72,8 @@ public class TsFileInsertionDataContainer implements AutoCloseable {
private final Map<String, Boolean> deviceIsAlignedMap;
private final Map<String, TSDataType> measurementDataTypeMap;
+ private boolean shouldParsePattern = false;
+
public TsFileInsertionDataContainer(File tsFile, String pattern, long startTime, long endTime)
throws IOException {
this(tsFile, pattern, startTime, endTime, null, null);
@@ -161,6 +163,9 @@ public class TsFileInsertionDataContainer implements AutoCloseable {
// high cost check comes later
&& pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) {
filteredMeasurements.add(measurement);
+ } else {
+ // Parse pattern iff there are measurements filtered out
+ shouldParsePattern = true;
}
}
@@ -168,6 +173,13 @@ public class TsFileInsertionDataContainer implements AutoCloseable {
filteredDeviceMeasurementsMap.put(deviceId, filteredMeasurements);
}
}
+
+ // case 3: for example, pattern is root.a.b.c and device is root.a.b.d
+ // in this case, no data can be matched
+ else {
+ // Parse pattern iff there are measurements filtered out
+ shouldParsePattern = true;
+ }
}
return filteredDeviceMeasurementsMap;
@@ -254,6 +266,10 @@ public class TsFileInsertionDataContainer implements AutoCloseable {
};
}
+ public boolean shouldParsePattern() {
+ return shouldParsePattern;
+ }
+
@Override
public void close() {
try {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 9ee6ff3f836..b16ecf657b4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -78,7 +78,7 @@ public class PipeEventCollector implements EventCollector, AutoCloseable {
}
private void parseAndCollectEvent(PipeInsertNodeTabletInsertionEvent sourceEvent) {
- if (sourceEvent.shouldParsePatternOrTime()) {
+ if (sourceEvent.shouldParseTimeOrPattern()) {
final PipeRawTabletInsertionEvent parsedEvent =
sourceEvent.parseEventWithPatternOrTime();
if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
collectEvent(parsedEvent);
@@ -89,7 +89,7 @@ public class PipeEventCollector implements EventCollector, AutoCloseable {
}
private void parseAndCollectEvent(PipeRawTabletInsertionEvent sourceEvent) {
- if (sourceEvent.shouldParsePatternOrTime()) {
+ if (sourceEvent.shouldParseTimeOrPattern()) {
final PipeRawTabletInsertionEvent parsedEvent =
sourceEvent.parseEventWithPatternOrTime();
if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
collectEvent(parsedEvent);
@@ -107,7 +107,7 @@ public class PipeEventCollector implements EventCollector, AutoCloseable {
return;
}
- if (!sourceEvent.shouldParsePatternOrTime()) {
+ if (!sourceEvent.shouldParseTimeOrPattern()) {
collectEvent(sourceEvent);
return;
}