This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new 57d6953e24a Pipe: Replaced IDeviceID in pipe module (#12556)
57d6953e24a is described below
commit 57d6953e24ae4ed3f3a778be3ddcbf9186a4c950
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 30 14:44:29 2024 +0800
Pipe: Replaced IDeviceID in pipe module (#12556)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 6 ++---
.../tablet/TabletInsertionDataContainer.java | 28 +++++++++++++++++-----
.../tsfile/TsFileInsertionDataContainer.java | 9 ++++---
.../tsfile/TsFileInsertionDataTabletIterator.java | 11 ++++++---
.../common/tsfile/TsFileInsertionPointCounter.java | 8 +++----
.../db/pipe/event/realtime/PipeRealtimeEvent.java | 10 ++++----
.../realtime/epoch/TsFileEpochManager.java | 10 ++++----
.../schemaregion/PipePlanPatternParseVisitor.java | 22 ++++++++++-------
.../pipe/pattern/CachedSchemaPatternMatcher.java | 9 +++----
.../visitor/PipeStatementPatternParseVisitor.java | 9 ++++---
.../pattern/CachedSchemaPatternMatcherTest.java | 21 ++++++++++------
.../db/pipe/pattern/IoTDBPipePatternTest.java | 4 +++-
.../db/pipe/pattern/PrefixPipePatternTest.java | 4 +++-
.../commons/pipe/pattern/IoTDBPipePattern.java | 23 +++++++++++-------
.../iotdb/commons/pipe/pattern/PipePattern.java | 9 +++----
.../commons/pipe/pattern/PrefixPipePattern.java | 22 ++++++++++-------
16 files changed, 127 insertions(+), 78 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 11711fa1d03..c4fd1a133c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -322,10 +322,8 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
? ((InsertRowsNode) node)
.getInsertRowNodeList().stream()
.anyMatch(
- insertRowNode ->
- !pipePattern.coversDevice(
-
insertRowNode.getDevicePath().getFullPath()))
- :
!pipePattern.coversDevice(node.getDevicePath().getFullPath())));
+ insertRowNode ->
!pipePattern.coversDevice(insertRowNode.getDeviceID()))
+ : !pipePattern.coversDevice(node.getDeviceID())));
}
public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index ae17883a487..c7a563bcb05 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -33,6 +33,8 @@ import org.apache.iotdb.pipe.api.collector.RowCollector;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -62,7 +64,9 @@ public class TabletInsertionDataContainer {
private final EnrichedEvent
sourceEvent; // used to report progress and filter value columns by time
range
- private String deviceId;
+ // Used to preserve performance
+ private String deviceStr;
+ private IDeviceID deviceId;
private boolean isAligned;
private IMeasurementSchema[] measurementSchemaList;
private String[] columnNameStringList;
@@ -138,7 +142,9 @@ public class TabletInsertionDataContainer {
final int originColumnSize = insertRowNode.getMeasurements().length;
final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new
Integer[originColumnSize];
- this.deviceId = insertRowNode.getDevicePath().getFullPath();
+ // The full path is always cached when device path is deserialized
+ this.deviceStr = insertRowNode.getDevicePath().getFullPath();
+ this.deviceId = insertRowNode.getDeviceID();
this.isAligned = insertRowNode.isAligned();
final long[] originTimestampColumn = new long[] {insertRowNode.getTime()};
@@ -205,7 +211,9 @@ public class TabletInsertionDataContainer {
final int originColumnSize = insertTabletNode.getMeasurements().length;
final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new
Integer[originColumnSize];
- this.deviceId = insertTabletNode.getDevicePath().getFullPath();
+ // The full path is always cached when device path is deserialized
+ this.deviceStr = insertTabletNode.getDevicePath().getFullPath();
+ this.deviceId = insertTabletNode.getDeviceID();
this.isAligned = insertTabletNode.isAligned();
final long[] originTimestampColumn = insertTabletNode.getTimes();
@@ -288,7 +296,9 @@ public class TabletInsertionDataContainer {
final int originColumnSize = tablet.getSchemas().size();
final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new
Integer[originColumnSize];
- this.deviceId = tablet.getDeviceId();
+ // Only support tree-model tablet
+ this.deviceStr = tablet.getDeviceId();
+ this.deviceId = new StringArrayDeviceID(tablet.getDeviceId());
this.isAligned = isAligned;
final long[] originTimestampColumn =
@@ -567,9 +577,10 @@ public class TabletInsertionDataContainer {
final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta,
sourceEvent);
for (int i = 0; i < rowCount; i++) {
consumer.accept(
+ // Used for tree model
new PipeRow(
i,
- deviceId,
+ getDeviceStr(),
isAligned,
measurementSchemaList,
timestampColumn,
@@ -595,7 +606,8 @@ public class TabletInsertionDataContainer {
return tablet;
}
- final Tablet newTablet = new Tablet(deviceId,
Arrays.asList(measurementSchemaList), rowCount);
+ final Tablet newTablet =
+ new Tablet(getDeviceStr(), Arrays.asList(measurementSchemaList),
rowCount);
newTablet.timestamps = timestampColumn;
newTablet.bitMaps = nullValueColumnBitmaps;
newTablet.values = valueColumns;
@@ -605,4 +617,8 @@ public class TabletInsertionDataContainer {
return tablet;
}
+
+ private String getDeviceStr() {
+ return Objects.nonNull(deviceStr) ? deviceStr : deviceId.toString();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 1eccb7c03be..ee8c4e62bb8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -146,11 +146,10 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
final Map<IDeviceID, List<String>> filteredDeviceMeasurementsMap = new
HashMap<>();
for (Map.Entry<IDeviceID, List<String>> entry :
originalDeviceMeasurementsMap.entrySet()) {
final IDeviceID deviceId = entry.getKey();
- String deviceStr = deviceId.toString();
// case 1: for example, pattern is root.a.b or pattern is null and
device is root.a.b.c
// in this case, all data can be matched without checking the
measurements
- if (Objects.isNull(pattern) || pattern.isRoot() ||
pattern.coversDevice(deviceStr)) {
+ if (Objects.isNull(pattern) || pattern.isRoot() ||
pattern.coversDevice(deviceId)) {
if (!entry.getValue().isEmpty()) {
filteredDeviceMeasurementsMap.put(deviceId, entry.getValue());
}
@@ -158,11 +157,11 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
// case 2: for example, pattern is root.a.b.c and device is root.a.b
// in this case, we need to check the full path
- else if (pattern.mayOverlapWithDevice(deviceStr)) {
+ else if (pattern.mayOverlapWithDevice(deviceId)) {
final List<String> filteredMeasurements = new ArrayList<>();
for (final String measurement : entry.getValue()) {
- if (pattern.matchesMeasurement(deviceStr, measurement)) {
+ if (pattern.matchesMeasurement(deviceId, measurement)) {
filteredMeasurements.add(measurement);
} else {
// Parse pattern iff there are measurements filtered out
@@ -221,7 +220,7 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
new TsFileInsertionDataTabletIterator(
tsFileReader,
measurementDataTypeMap,
- entry.getKey().toString(),
+ entry.getKey(),
entry.getValue(),
timeFilterExpression);
} catch (final IOException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
index f64b1fb353a..05b34ad026f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.TsFileReader;
import org.apache.tsfile.read.common.Field;
import org.apache.tsfile.read.common.Path;
@@ -48,7 +49,7 @@ public class TsFileInsertionDataTabletIterator implements
Iterator<Tablet> {
private final TsFileReader tsFileReader;
private final Map<String, TSDataType> measurementDataTypeMap;
- private final String deviceId;
+ private final IDeviceID deviceId;
private final List<String> measurements;
private final IExpression timeFilterExpression;
@@ -58,7 +59,7 @@ public class TsFileInsertionDataTabletIterator implements
Iterator<Tablet> {
public TsFileInsertionDataTabletIterator(
TsFileReader tsFileReader,
Map<String, TSDataType> measurementDataTypeMap,
- String deviceId,
+ IDeviceID deviceId,
List<String> measurements,
IExpression timeFilterExpression)
throws IOException {
@@ -118,7 +119,11 @@ public class TsFileInsertionDataTabletIterator implements
Iterator<Tablet> {
schemas.add(new MeasurementSchema(measurement, dataType));
}
final Tablet tablet =
- new Tablet(deviceId, schemas,
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+ new Tablet(
+ // Used for tree model
+ deviceId.toString(),
+ schemas,
+ PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
tablet.initBitMaps();
while (queryDataSet.hasNext()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
index f76c220dd2b..4f61e82824c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.slf4j.Logger;
@@ -83,14 +82,13 @@ public class TsFileInsertionPointCounter implements
AutoCloseable {
for (final Map.Entry<IDeviceID, List<String>> entry :
originalDeviceMeasurementsMap.entrySet()) {
- final String deviceId = ((PlainDeviceID) entry.getKey()).toStringID();
+ final IDeviceID deviceId = entry.getKey();
// case 1: for example, pattern is root.a.b or pattern is null and
device is root.a.b.c
// in this case, all data can be matched without checking the
measurements
if (Objects.isNull(pattern) || pattern.isRoot() ||
pattern.coversDevice(deviceId)) {
if (!entry.getValue().isEmpty()) {
- filteredDeviceMeasurementsMap.put(
- new PlainDeviceID(deviceId), new HashSet<>(entry.getValue()));
+ filteredDeviceMeasurementsMap.put(deviceId, new
HashSet<>(entry.getValue()));
}
}
@@ -109,7 +107,7 @@ public class TsFileInsertionPointCounter implements
AutoCloseable {
}
if (!filteredMeasurements.isEmpty()) {
- filteredDeviceMeasurementsMap.put(new PlainDeviceID(deviceId),
filteredMeasurements);
+ filteredDeviceMeasurementsMap.put(deviceId, filteredMeasurements);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 77df1f0292b..0717dc4d5c3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
+import org.apache.tsfile.file.metadata.IDeviceID;
+
import java.util.Map;
/**
@@ -37,12 +39,12 @@ public class PipeRealtimeEvent extends EnrichedEvent {
private final EnrichedEvent event;
private final TsFileEpoch tsFileEpoch;
- private Map<String, String[]> device2Measurements;
+ private Map<IDeviceID, String[]> device2Measurements;
public PipeRealtimeEvent(
final EnrichedEvent event,
final TsFileEpoch tsFileEpoch,
- final Map<String, String[]> device2Measurements,
+ final Map<IDeviceID, String[]> device2Measurements,
final PipePattern pattern) {
this(event, tsFileEpoch, device2Measurements, null, pattern,
Long.MIN_VALUE, Long.MAX_VALUE);
}
@@ -50,7 +52,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
public PipeRealtimeEvent(
final EnrichedEvent event,
final TsFileEpoch tsFileEpoch,
- final Map<String, String[]> device2Measurements,
+ final Map<IDeviceID, String[]> device2Measurements,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
@@ -73,7 +75,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
return tsFileEpoch;
}
- public Map<String, String[]> getSchemaInfo() {
+ public Map<IDeviceID, String[]> getSchemaInfo() {
return device2Measurements;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
index e0bfabb0c63..e4244a1335e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
@@ -26,6 +26,8 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import com.google.common.base.Functions;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +70,7 @@ public class TsFileEpochManager {
event,
epoch,
resource.getDevices().stream()
- .collect(Collectors.toMap(Object::toString, device ->
EMPTY_MEASUREMENT_ARRAY)),
+ .collect(Collectors.toMap(Functions.identity(), device ->
EMPTY_MEASUREMENT_ARRAY)),
event.getPipePattern());
}
@@ -82,16 +84,16 @@ public class TsFileEpochManager {
epoch,
node instanceof InsertRowsNode
? getDevice2MeasurementsMapFromInsertRowsNode((InsertRowsNode)
node)
- : Collections.singletonMap(node.getDevicePath().getFullPath(),
node.getMeasurements()),
+ : Collections.singletonMap(node.getDeviceID(),
node.getMeasurements()),
event.getPipePattern());
}
- private Map<String, String[]> getDevice2MeasurementsMapFromInsertRowsNode(
+ private Map<IDeviceID, String[]> getDevice2MeasurementsMapFromInsertRowsNode(
InsertRowsNode insertRowsNode) {
return insertRowsNode.getInsertRowNodeList().stream()
.collect(
Collectors.toMap(
- insertRowNode -> insertRowNode.getDevicePath().getFullPath(),
+ InsertNode::getDeviceID,
InsertNode::getMeasurements,
(oldMeasurements, newMeasurements) ->
Stream.of(Arrays.asList(oldMeasurements),
Arrays.asList(newMeasurements))
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java
index 79dbf04ba42..472b1de1073 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java
@@ -39,6 +39,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;
import java.util.Arrays;
@@ -74,7 +75,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
public Optional<PlanNode> visitCreateTimeSeries(
final CreateTimeSeriesNode node, final IoTDBPipePattern pattern) {
return pattern.matchesMeasurement(
- node.getPath().getDeviceString(), node.getPath().getMeasurement())
+ node.getPath().getIDeviceID(), node.getPath().getMeasurement())
? Optional.of(node)
: Optional.empty();
}
@@ -87,7 +88,8 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
.filter(
index ->
pattern.matchesMeasurement(
- node.getDevicePath().getFullPath(),
node.getMeasurements().get(index)))
+ node.getDevicePath().getIDeviceIDAsFullDevice(),
+ node.getMeasurements().get(index)))
.toArray();
return filteredIndexes.length > 0
? Optional.of(
@@ -115,7 +117,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
new Pair<>(
entry.getKey(),
trimMeasurementGroup(
- entry.getKey().getFullPath(), entry.getValue(),
pattern)))
+ entry.getKey().getIDeviceIDAsFullDevice(),
entry.getValue(), pattern)))
.filter(pair -> Objects.nonNull(pair.getRight()))
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
return !filteredMeasurementGroupMap.isEmpty()
@@ -125,7 +127,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
}
private static MeasurementGroup trimMeasurementGroup(
- final String device, final MeasurementGroup group, final
IoTDBPipePattern pattern) {
+ final IDeviceID device, final MeasurementGroup group, final
IoTDBPipePattern pattern) {
final int[] filteredIndexes =
IntStream.range(0, group.size())
.filter(index -> pattern.matchesMeasurement(device,
group.getMeasurements().get(index)))
@@ -154,7 +156,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
public Optional<PlanNode> visitAlterTimeSeries(
final AlterTimeSeriesNode node, final IoTDBPipePattern pattern) {
return pattern.matchesMeasurement(
- node.getPath().getDeviceString(), node.getPath().getMeasurement())
+ node.getPath().getIDeviceID(), node.getPath().getMeasurement())
? Optional.of(node)
: Optional.empty();
}
@@ -165,7 +167,9 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
final MeasurementGroup group =
pattern.matchPrefixPath(node.getDevicePath().getFullPath())
? trimMeasurementGroup(
- node.getDevicePath().getFullPath(),
node.getMeasurementGroup(), pattern)
+ node.getDevicePath().getIDeviceIDAsFullDevice(),
+ node.getMeasurementGroup(),
+ pattern)
: null;
return Objects.nonNull(group)
? Optional.of(
@@ -209,7 +213,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
new Pair<>(
entry.getValue().getLeft(),
trimMeasurementGroup(
- entry.getKey().getFullPath(),
+ entry.getKey().getIDeviceIDAsFullDevice(),
entry.getValue().getRight(),
pattern))))
.filter(pair -> Objects.nonNull(pair.getRight().getRight()))
@@ -241,7 +245,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
.filter(
entry ->
pattern.matchesMeasurement(
- entry.getKey().getDeviceString(),
entry.getKey().getMeasurement()))
+ entry.getKey().getIDeviceID(),
entry.getKey().getMeasurement()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return !filteredViewPathToSourceMap.isEmpty()
? Optional.of(new CreateLogicalViewNode(node.getPlanNodeId(),
filteredViewPathToSourceMap))
@@ -256,7 +260,7 @@ public class PipePlanPatternParseVisitor extends
PlanVisitor<Optional<PlanNode>,
.filter(
entry ->
pattern.matchesMeasurement(
- entry.getKey().getDeviceString(),
entry.getKey().getMeasurement()))
+ entry.getKey().getIDeviceID(),
entry.getKey().getMeasurement()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return !filteredViewPathToSourceMap.isEmpty()
? Optional.of(new AlterLogicalViewNode(node.getPlanNodeId(),
filteredViewPathToSourceMap))
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java
index 4be44a0fecc..546d8185c3a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
protected final ReentrantReadWriteLock lock;
protected final Set<PipeRealtimeDataRegionExtractor> extractors;
- protected final Cache<String, Set<PipeRealtimeDataRegionExtractor>>
deviceToExtractorsCache;
+ protected final Cache<IDeviceID, Set<PipeRealtimeDataRegionExtractor>>
deviceToExtractorsCache;
public CachedSchemaPatternMatcher() {
this.lock = new ReentrantReadWriteLock();
@@ -114,8 +115,8 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
.collect(Collectors.toSet());
}
- for (final Map.Entry<String, String[]> entry :
event.getSchemaInfo().entrySet()) {
- final String device = entry.getKey();
+ for (final Map.Entry<IDeviceID, String[]> entry :
event.getSchemaInfo().entrySet()) {
+ final IDeviceID device = entry.getKey();
final String[] measurements = entry.getValue();
// 1. try to get matched extractors from cache, if not success, match
them by device
@@ -176,7 +177,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
return matchedExtractors;
}
- protected Set<PipeRealtimeDataRegionExtractor>
filterExtractorsByDevice(final String device) {
+ protected Set<PipeRealtimeDataRegionExtractor>
filterExtractorsByDevice(final IDeviceID device) {
final Set<PipeRealtimeDataRegionExtractor> filteredExtractors = new
HashSet<>();
for (final PipeRealtimeDataRegionExtractor extractor : extractors) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java
index 6535e6acfe8..26c22f89b3a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java
@@ -59,7 +59,7 @@ public class PipeStatementPatternParseVisitor
public Optional<Statement> visitCreateTimeseries(
final CreateTimeSeriesStatement statement, final IoTDBPipePattern
pattern) {
return pattern.matchesMeasurement(
- statement.getPath().getDeviceString(),
statement.getPath().getMeasurement())
+ statement.getPath().getIDeviceID(),
statement.getPath().getMeasurement())
? Optional.of(statement)
: Optional.empty();
}
@@ -72,7 +72,7 @@ public class PipeStatementPatternParseVisitor
.filter(
index ->
pattern.matchesMeasurement(
- statement.getDevicePath().getFullPath(),
+ statement.getDevicePath().getIDeviceIDAsFullDevice(),
statement.getMeasurements().get(index)))
.toArray();
if (filteredIndexes.length == 0) {
@@ -118,7 +118,10 @@ public class PipeStatementPatternParseVisitor
.filter(
index ->
pattern.matchesMeasurement(
-
createLogicalViewStatement.getTargetPathList().get(index).getDeviceString(),
+ createLogicalViewStatement
+ .getTargetPathList()
+ .get(index)
+ .getIDeviceIDAsFullDevice(),
createLogicalViewStatement.getTargetPathList().get(index).getMeasurement()))
.toArray();
if (filteredIndexes.length == 0) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
index 1e7587153fb..07855ebe829 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
@@ -29,6 +29,8 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -85,7 +87,7 @@ public class CachedSchemaPatternMatcherTest {
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root." +
finalI1);
+ put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root.db" +
finalI1);
}
}),
new PipeTaskRuntimeConfiguration(
@@ -101,7 +103,7 @@ public class CachedSchemaPatternMatcherTest {
{
put(
PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
- "root." + finalI + "." + finalJ);
+ "root.db" + finalI + ".s" + finalJ);
}
}),
new PipeTaskRuntimeConfiguration(
@@ -116,18 +118,22 @@ public class CachedSchemaPatternMatcherTest {
int epochNum = 10000;
int deviceNum = 1000;
int seriesNum = 100;
- Map<String, String[]> deviceMap =
+ Map<IDeviceID, String[]> deviceMap =
IntStream.range(0, deviceNum)
.mapToObj(String::valueOf)
- .collect(Collectors.toMap(s -> "root." + s, s -> new String[0]));
+ .collect(
+ Collectors.toMap(s -> new StringArrayDeviceID("root.db" + s),
s -> new String[0]));
String[] measurements =
- IntStream.range(0,
seriesNum).mapToObj(String::valueOf).toArray(String[]::new);
+ IntStream.range(0, seriesNum).mapToObj(num -> "s" +
num).toArray(String[]::new);
long totalTime = 0;
for (int i = 0; i < epochNum; i++) {
for (int j = 0; j < deviceNum; j++) {
PipeRealtimeEvent event =
new PipeRealtimeEvent(
- null, null, Collections.singletonMap("root." + i,
measurements), null);
+ null,
+ null,
+ Collections.singletonMap(new StringArrayDeviceID("root.db" +
i), measurements),
+ null);
long startTime = System.currentTimeMillis();
matcher.match(event).forEach(extractor -> extractor.extract(event));
totalTime += (System.currentTimeMillis() - startTime);
@@ -174,7 +180,8 @@ public class CachedSchemaPatternMatcherTest {
} else {
match[0] =
match[0]
- || (getPatternString().startsWith(k) ||
k.startsWith(getPatternString()));
+ || (getPatternString().startsWith(k.toString())
+ || k.toString().startsWith(getPatternString()));
}
});
Assert.assertTrue(match[0]);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
index db42d713012..d443577df26 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.pipe.pattern;
import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.junit.Assert;
import org.junit.Test;
@@ -62,7 +64,7 @@ public class IoTDBPipePatternTest {
Assert.assertFalse(new IoTDBPipePattern(t).coversDb(db));
}
- final String device = "root.db.d1";
+ final IDeviceID device = new StringArrayDeviceID("root.db.d1");
// Test pattern cover device
final String[] patternsCoverDevice = {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java
index d81c50a57ce..6008303b3da 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.pipe.pattern;
import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.junit.Assert;
import org.junit.Test;
@@ -57,7 +59,7 @@ public class PrefixPipePatternTest {
Assert.assertFalse(new PrefixPipePattern(t).coversDb(db));
}
- final String device = "root.db.d1";
+ final IDeviceID device = new StringArrayDeviceID("root.db.d1");
// Test pattern cover device
final String[] patternsCoverDevice = {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
index 6c82c7e3d9c..b270a2b0700 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
@@ -28,6 +28,8 @@ import
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -83,7 +85,7 @@ public class IoTDBPipePattern extends PipePattern {
}
@Override
- public boolean coversDevice(final String device) {
+ public boolean coversDevice(final IDeviceID device) {
try {
return patternPartialPath.include(
new PartialPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
@@ -93,7 +95,7 @@ public class IoTDBPipePattern extends PipePattern {
}
@Override
- public boolean mayOverlapWithDevice(final String device) {
+ public boolean mayOverlapWithDevice(final IDeviceID device) {
try {
// Another way is to use patternPath.overlapWith("device.*"),
// there will be no false positives but time cost may be higher.
@@ -104,7 +106,7 @@ public class IoTDBPipePattern extends PipePattern {
}
@Override
- public boolean matchesMeasurement(final String device, final String
measurement) {
+ public boolean matchesMeasurement(final IDeviceID device, final String
measurement) {
// For aligned timeseries, empty measurement is an alias of the time
column.
if (Objects.isNull(measurement) || measurement.isEmpty()) {
return false;
@@ -118,14 +120,17 @@ public class IoTDBPipePattern extends PipePattern {
}
/**
- * Check if the {@link PipePattern} matches the given prefix path.
- *
- * <p>NOTE: In schema transmission, {@link #mayOverlapWithDevice(String)}
can be used to detect
- * whether the given path can act as a parent path of the {@link
PipePattern}, and to transmit
- * possibly used schemas like database creation and template setting.
+ * Check if the {@link PipePattern} matches the given prefix path. In schema
transmission, this
+ * can be used to detect whether the given path can act as a parent path of
the {@link
+ * PipePattern}, and to transmit possibly used schemas like database
creation and template
+ * setting.
*/
public boolean matchPrefixPath(final String path) {
- return mayOverlapWithDevice(path);
+ try {
+ return patternPartialPath.matchPrefixPath(new PartialPath(path));
+ } catch (final IllegalPathException e) {
+ return false;
+ }
}
/**
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java
index 803c24621cf..510cd02b8ed 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.pattern;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,7 +109,7 @@ public abstract class PipePattern {
public abstract boolean coversDb(final String db);
/** Check if a device's all measurements are covered by this pattern. */
- public abstract boolean coversDevice(final String device);
+ public abstract boolean coversDevice(final IDeviceID device);
/**
* Check if a device may have some measurements matched by the pattern.
@@ -118,14 +119,14 @@ public abstract class PipePattern {
* <p>NOTE2: this is just a loose check and may have false positives. To
further check if a
* measurement matches the pattern, please use {@link
PipePattern#matchesMeasurement} after this.
*/
- public abstract boolean mayOverlapWithDevice(final String device);
+ public abstract boolean mayOverlapWithDevice(final IDeviceID device);
/**
* Check if a full path with device and measurement can be matched by
pattern.
*
- * <p>NOTE: this is only called when {@link
PipePattern#mayOverlapWithDevice(String)} is true.
+ * <p>NOTE: this is only called when {@link
PipePattern#mayOverlapWithDevice} is true.
*/
- public abstract boolean matchesMeasurement(final String device, final String
measurement);
+ public abstract boolean matchesMeasurement(final IDeviceID device, final
String measurement);
@Override
public String toString() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java
index 52f70057052..50e3c463a72 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.file.metadata.IDeviceID;
import java.util.Arrays;
@@ -78,28 +79,31 @@ public class PrefixPipePattern extends PipePattern {
}
@Override
- public boolean coversDevice(final String device) {
+ public boolean coversDevice(final IDeviceID device) {
+ final String deviceStr = device.toString();
// for example, pattern is root.a.b and device is root.a.b.c
// in this case, the extractor can be matched without checking the
measurements
- return pattern.length() <= device.length() && device.startsWith(pattern);
+ return pattern.length() <= deviceStr.length() &&
deviceStr.startsWith(pattern);
}
@Override
- public boolean mayOverlapWithDevice(final String device) {
+ public boolean mayOverlapWithDevice(final IDeviceID device) {
+ final String deviceStr = device.toString();
return (
// for example, pattern is root.a.b and device is root.a.b.c
// in this case, the extractor can be matched without checking the
measurements
- pattern.length() <= device.length() && device.startsWith(pattern))
+ pattern.length() <= deviceStr.length() &&
deviceStr.startsWith(pattern))
// for example, pattern is root.a.b.c and device is root.a.b
// in this case, the extractor can be selected as candidate, but the
measurements should
// be checked further
- || (pattern.length() > device.length() && pattern.startsWith(device));
+ || (pattern.length() > deviceStr.length() &&
pattern.startsWith(deviceStr));
}
@Override
- public boolean matchesMeasurement(final String device, final String
measurement) {
+ public boolean matchesMeasurement(final IDeviceID device, String
measurement) {
+ final String deviceStr = device.toString();
// We assume that the device is already matched.
- if (pattern.length() <= device.length()) {
+ if (pattern.length() <= deviceStr.length()) {
return true;
}
@@ -109,9 +113,9 @@ public class PrefixPipePattern extends PipePattern {
final String dotAndMeasurement = TsFileConstant.PATH_SEPARATOR +
measurement;
return
// low cost check comes first
- pattern.length() <= device.length() + dotAndMeasurement.length()
+ pattern.length() <= deviceStr.length() + dotAndMeasurement.length()
// high cost check comes later
- && dotAndMeasurement.startsWith(pattern.substring(device.length()));
+ && dotAndMeasurement.startsWith(pattern.substring(deviceStr.length()));
}
@Override