This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/TableModelGrammar by this push:
     new 57d6953e24a Pipe: Replaced IDeviceID in pipe module (#12556)
57d6953e24a is described below

commit 57d6953e24ae4ed3f3a778be3ddcbf9186a4c950
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 30 14:44:29 2024 +0800

    Pipe: Replaced IDeviceID in pipe module (#12556)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  6 ++---
 .../tablet/TabletInsertionDataContainer.java       | 28 +++++++++++++++++-----
 .../tsfile/TsFileInsertionDataContainer.java       |  9 ++++---
 .../tsfile/TsFileInsertionDataTabletIterator.java  | 11 ++++++---
 .../common/tsfile/TsFileInsertionPointCounter.java |  8 +++----
 .../db/pipe/event/realtime/PipeRealtimeEvent.java  | 10 ++++----
 .../realtime/epoch/TsFileEpochManager.java         | 10 ++++----
 .../schemaregion/PipePlanPatternParseVisitor.java  | 22 ++++++++++-------
 .../pipe/pattern/CachedSchemaPatternMatcher.java   |  9 +++----
 .../visitor/PipeStatementPatternParseVisitor.java  |  9 ++++---
 .../pattern/CachedSchemaPatternMatcherTest.java    | 21 ++++++++++------
 .../db/pipe/pattern/IoTDBPipePatternTest.java      |  4 +++-
 .../db/pipe/pattern/PrefixPipePatternTest.java     |  4 +++-
 .../commons/pipe/pattern/IoTDBPipePattern.java     | 23 +++++++++++-------
 .../iotdb/commons/pipe/pattern/PipePattern.java    |  9 +++----
 .../commons/pipe/pattern/PrefixPipePattern.java    | 22 ++++++++++-------
 16 files changed, 127 insertions(+), 78 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 11711fa1d03..c4fd1a133c2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -322,10 +322,8 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
                 ? ((InsertRowsNode) node)
                     .getInsertRowNodeList().stream()
                         .anyMatch(
-                            insertRowNode ->
-                                !pipePattern.coversDevice(
-                                    
insertRowNode.getDevicePath().getFullPath()))
-                : 
!pipePattern.coversDevice(node.getDevicePath().getFullPath())));
+                            insertRowNode -> 
!pipePattern.coversDevice(insertRowNode.getDeviceID()))
+                : !pipePattern.coversDevice(node.getDeviceID())));
   }
 
   public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index ae17883a487..c7a563bcb05 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -33,6 +33,8 @@ import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -62,7 +64,9 @@ public class TabletInsertionDataContainer {
   private final EnrichedEvent
       sourceEvent; // used to report progress and filter value columns by time 
range
 
-  private String deviceId;
+  // Used to preserve performance
+  private String deviceStr;
+  private IDeviceID deviceId;
   private boolean isAligned;
   private IMeasurementSchema[] measurementSchemaList;
   private String[] columnNameStringList;
@@ -138,7 +142,9 @@ public class TabletInsertionDataContainer {
     final int originColumnSize = insertRowNode.getMeasurements().length;
     final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new 
Integer[originColumnSize];
 
-    this.deviceId = insertRowNode.getDevicePath().getFullPath();
+    // The full path is always cached when device path is deserialized
+    this.deviceStr = insertRowNode.getDevicePath().getFullPath();
+    this.deviceId = insertRowNode.getDeviceID();
     this.isAligned = insertRowNode.isAligned();
 
     final long[] originTimestampColumn = new long[] {insertRowNode.getTime()};
@@ -205,7 +211,9 @@ public class TabletInsertionDataContainer {
     final int originColumnSize = insertTabletNode.getMeasurements().length;
     final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new 
Integer[originColumnSize];
 
-    this.deviceId = insertTabletNode.getDevicePath().getFullPath();
+    // The full path is always cached when device path is deserialized
+    this.deviceStr = insertTabletNode.getDevicePath().getFullPath();
+    this.deviceId = insertTabletNode.getDeviceID();
     this.isAligned = insertTabletNode.isAligned();
 
     final long[] originTimestampColumn = insertTabletNode.getTimes();
@@ -288,7 +296,9 @@ public class TabletInsertionDataContainer {
     final int originColumnSize = tablet.getSchemas().size();
     final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new 
Integer[originColumnSize];
 
-    this.deviceId = tablet.getDeviceId();
+    // Only support tree-model tablet
+    this.deviceStr = tablet.getDeviceId();
+    this.deviceId = new StringArrayDeviceID(tablet.getDeviceId());
     this.isAligned = isAligned;
 
     final long[] originTimestampColumn =
@@ -567,9 +577,10 @@ public class TabletInsertionDataContainer {
     final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, 
sourceEvent);
     for (int i = 0; i < rowCount; i++) {
       consumer.accept(
+          // Used for tree model
           new PipeRow(
               i,
-              deviceId,
+              getDeviceStr(),
               isAligned,
               measurementSchemaList,
               timestampColumn,
@@ -595,7 +606,8 @@ public class TabletInsertionDataContainer {
       return tablet;
     }
 
-    final Tablet newTablet = new Tablet(deviceId, 
Arrays.asList(measurementSchemaList), rowCount);
+    final Tablet newTablet =
+        new Tablet(getDeviceStr(), Arrays.asList(measurementSchemaList), 
rowCount);
     newTablet.timestamps = timestampColumn;
     newTablet.bitMaps = nullValueColumnBitmaps;
     newTablet.values = valueColumns;
@@ -605,4 +617,8 @@ public class TabletInsertionDataContainer {
 
     return tablet;
   }
+
+  private String getDeviceStr() {
+    return Objects.nonNull(deviceStr) ? deviceStr : deviceId.toString();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 1eccb7c03be..ee8c4e62bb8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -146,11 +146,10 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
     final Map<IDeviceID, List<String>> filteredDeviceMeasurementsMap = new 
HashMap<>();
     for (Map.Entry<IDeviceID, List<String>> entry : 
originalDeviceMeasurementsMap.entrySet()) {
       final IDeviceID deviceId = entry.getKey();
-      String deviceStr = deviceId.toString();
 
       // case 1: for example, pattern is root.a.b or pattern is null and 
device is root.a.b.c
       // in this case, all data can be matched without checking the 
measurements
-      if (Objects.isNull(pattern) || pattern.isRoot() || 
pattern.coversDevice(deviceStr)) {
+      if (Objects.isNull(pattern) || pattern.isRoot() || 
pattern.coversDevice(deviceId)) {
         if (!entry.getValue().isEmpty()) {
           filteredDeviceMeasurementsMap.put(deviceId, entry.getValue());
         }
@@ -158,11 +157,11 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
 
       // case 2: for example, pattern is root.a.b.c and device is root.a.b
       // in this case, we need to check the full path
-      else if (pattern.mayOverlapWithDevice(deviceStr)) {
+      else if (pattern.mayOverlapWithDevice(deviceId)) {
         final List<String> filteredMeasurements = new ArrayList<>();
 
         for (final String measurement : entry.getValue()) {
-          if (pattern.matchesMeasurement(deviceStr, measurement)) {
+          if (pattern.matchesMeasurement(deviceId, measurement)) {
             filteredMeasurements.add(measurement);
           } else {
             // Parse pattern iff there are measurements filtered out
@@ -221,7 +220,7 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
                     new TsFileInsertionDataTabletIterator(
                         tsFileReader,
                         measurementDataTypeMap,
-                        entry.getKey().toString(),
+                        entry.getKey(),
                         entry.getValue(),
                         timeFilterExpression);
               } catch (final IOException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
index f64b1fb353a..05b34ad026f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataTabletIterator.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.read.TsFileReader;
 import org.apache.tsfile.read.common.Field;
 import org.apache.tsfile.read.common.Path;
@@ -48,7 +49,7 @@ public class TsFileInsertionDataTabletIterator implements 
Iterator<Tablet> {
   private final TsFileReader tsFileReader;
   private final Map<String, TSDataType> measurementDataTypeMap;
 
-  private final String deviceId;
+  private final IDeviceID deviceId;
   private final List<String> measurements;
 
   private final IExpression timeFilterExpression;
@@ -58,7 +59,7 @@ public class TsFileInsertionDataTabletIterator implements 
Iterator<Tablet> {
   public TsFileInsertionDataTabletIterator(
       TsFileReader tsFileReader,
       Map<String, TSDataType> measurementDataTypeMap,
-      String deviceId,
+      IDeviceID deviceId,
       List<String> measurements,
       IExpression timeFilterExpression)
       throws IOException {
@@ -118,7 +119,11 @@ public class TsFileInsertionDataTabletIterator implements 
Iterator<Tablet> {
       schemas.add(new MeasurementSchema(measurement, dataType));
     }
     final Tablet tablet =
-        new Tablet(deviceId, schemas, 
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+        new Tablet(
+            // Used for tree model
+            deviceId.toString(),
+            schemas,
+            PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
     tablet.initBitMaps();
 
     while (queryDataSet.hasNext()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
index f76c220dd2b..4f61e82824c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionPointCounter.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 
 import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.slf4j.Logger;
@@ -83,14 +82,13 @@ public class TsFileInsertionPointCounter implements 
AutoCloseable {
 
     for (final Map.Entry<IDeviceID, List<String>> entry :
         originalDeviceMeasurementsMap.entrySet()) {
-      final String deviceId = ((PlainDeviceID) entry.getKey()).toStringID();
+      final IDeviceID deviceId = entry.getKey();
 
       // case 1: for example, pattern is root.a.b or pattern is null and 
device is root.a.b.c
       // in this case, all data can be matched without checking the 
measurements
       if (Objects.isNull(pattern) || pattern.isRoot() || 
pattern.coversDevice(deviceId)) {
         if (!entry.getValue().isEmpty()) {
-          filteredDeviceMeasurementsMap.put(
-              new PlainDeviceID(deviceId), new HashSet<>(entry.getValue()));
+          filteredDeviceMeasurementsMap.put(deviceId, new 
HashSet<>(entry.getValue()));
         }
       }
 
@@ -109,7 +107,7 @@ public class TsFileInsertionPointCounter implements 
AutoCloseable {
         }
 
         if (!filteredMeasurements.isEmpty()) {
-          filteredDeviceMeasurementsMap.put(new PlainDeviceID(deviceId), 
filteredMeasurements);
+          filteredDeviceMeasurementsMap.put(deviceId, filteredMeasurements);
         }
       }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 77df1f0292b..0717dc4d5c3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
 
+import org.apache.tsfile.file.metadata.IDeviceID;
+
 import java.util.Map;
 
 /**
@@ -37,12 +39,12 @@ public class PipeRealtimeEvent extends EnrichedEvent {
   private final EnrichedEvent event;
   private final TsFileEpoch tsFileEpoch;
 
-  private Map<String, String[]> device2Measurements;
+  private Map<IDeviceID, String[]> device2Measurements;
 
   public PipeRealtimeEvent(
       final EnrichedEvent event,
       final TsFileEpoch tsFileEpoch,
-      final Map<String, String[]> device2Measurements,
+      final Map<IDeviceID, String[]> device2Measurements,
       final PipePattern pattern) {
     this(event, tsFileEpoch, device2Measurements, null, pattern, 
Long.MIN_VALUE, Long.MAX_VALUE);
   }
@@ -50,7 +52,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
   public PipeRealtimeEvent(
       final EnrichedEvent event,
       final TsFileEpoch tsFileEpoch,
-      final Map<String, String[]> device2Measurements,
+      final Map<IDeviceID, String[]> device2Measurements,
       final PipeTaskMeta pipeTaskMeta,
       final PipePattern pattern,
       final long startTime,
@@ -73,7 +75,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
     return tsFileEpoch;
   }
 
-  public Map<String, String[]> getSchemaInfo() {
+  public Map<IDeviceID, String[]> getSchemaInfo() {
     return device2Measurements;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
index e0bfabb0c63..e4244a1335e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
@@ -26,6 +26,8 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
+import com.google.common.base.Functions;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,7 +70,7 @@ public class TsFileEpochManager {
         event,
         epoch,
         resource.getDevices().stream()
-            .collect(Collectors.toMap(Object::toString, device -> 
EMPTY_MEASUREMENT_ARRAY)),
+            .collect(Collectors.toMap(Functions.identity(), device -> 
EMPTY_MEASUREMENT_ARRAY)),
         event.getPipePattern());
   }
 
@@ -82,16 +84,16 @@ public class TsFileEpochManager {
         epoch,
         node instanceof InsertRowsNode
             ? getDevice2MeasurementsMapFromInsertRowsNode((InsertRowsNode) 
node)
-            : Collections.singletonMap(node.getDevicePath().getFullPath(), 
node.getMeasurements()),
+            : Collections.singletonMap(node.getDeviceID(), 
node.getMeasurements()),
         event.getPipePattern());
   }
 
-  private Map<String, String[]> getDevice2MeasurementsMapFromInsertRowsNode(
+  private Map<IDeviceID, String[]> getDevice2MeasurementsMapFromInsertRowsNode(
       InsertRowsNode insertRowsNode) {
     return insertRowsNode.getInsertRowNodeList().stream()
         .collect(
             Collectors.toMap(
-                insertRowNode -> insertRowNode.getDevicePath().getFullPath(),
+                InsertNode::getDeviceID,
                 InsertNode::getMeasurements,
                 (oldMeasurements, newMeasurements) ->
                     Stream.of(Arrays.asList(oldMeasurements), 
Arrays.asList(newMeasurements))
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java
index 79dbf04ba42..472b1de1073 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/PipePlanPatternParseVisitor.java
@@ -39,6 +39,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.vie
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.utils.Pair;
 
 import java.util.Arrays;
@@ -74,7 +75,7 @@ public class PipePlanPatternParseVisitor extends 
PlanVisitor<Optional<PlanNode>,
   public Optional<PlanNode> visitCreateTimeSeries(
       final CreateTimeSeriesNode node, final IoTDBPipePattern pattern) {
     return pattern.matchesMeasurement(
-            node.getPath().getDeviceString(), node.getPath().getMeasurement())
+            node.getPath().getIDeviceID(), node.getPath().getMeasurement())
         ? Optional.of(node)
         : Optional.empty();
   }
@@ -87,7 +88,8 @@ public class PipePlanPatternParseVisitor extends 
PlanVisitor<Optional<PlanNode>,
             .filter(
                 index ->
                     pattern.matchesMeasurement(
-                        node.getDevicePath().getFullPath(), 
node.getMeasurements().get(index)))
+                        node.getDevicePath().getIDeviceIDAsFullDevice(),
+                        node.getMeasurements().get(index)))
             .toArray();
     return filteredIndexes.length > 0
         ? Optional.of(
@@ -115,7 +117,7 @@ public class PipePlanPatternParseVisitor extends 
PlanVisitor<Optional<PlanNode>,
                     new Pair<>(
                         entry.getKey(),
                         trimMeasurementGroup(
-                            entry.getKey().getFullPath(), entry.getValue(), 
pattern)))
+                            entry.getKey().getIDeviceIDAsFullDevice(), 
entry.getValue(), pattern)))
             .filter(pair -> Objects.nonNull(pair.getRight()))
             .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
     return !filteredMeasurementGroupMap.isEmpty()
@@ -125,7 +127,7 @@ public class PipePlanPatternParseVisitor extends 
PlanVisitor<Optional<PlanNode>,
   }
 
   private static MeasurementGroup trimMeasurementGroup(
-      final String device, final MeasurementGroup group, final 
IoTDBPipePattern pattern) {
+      final IDeviceID device, final MeasurementGroup group, final 
IoTDBPipePattern pattern) {
     final int[] filteredIndexes =
         IntStream.range(0, group.size())
             .filter(index -> pattern.matchesMeasurement(device, 
group.getMeasurements().get(index)))
@@ -154,7 +156,7 @@ public class PipePlanPatternParseVisitor extends 
PlanVisitor<Optional<PlanNode>,
   public Optional<PlanNode> visitAlterTimeSeries(
       final AlterTimeSeriesNode node, final IoTDBPipePattern pattern) {
     return pattern.matchesMeasurement(
-            node.getPath().getDeviceString(), node.getPath().getMeasurement())
+            node.getPath().getIDeviceID(), node.getPath().getMeasurement())
         ? Optional.of(node)
         : Optional.empty();
   }
@@ -165,7 +167,9 @@ public class PipePlanPatternParseVisitor extends 
PlanVisitor<Optional<PlanNode>,
     final MeasurementGroup group =
         pattern.matchPrefixPath(node.getDevicePath().getFullPath())
             ? trimMeasurementGroup(
-                node.getDevicePath().getFullPath(), 
node.getMeasurementGroup(), pattern)
+                node.getDevicePath().getIDeviceIDAsFullDevice(),
+                node.getMeasurementGroup(),
+                pattern)
             : null;
     return Objects.nonNull(group)
         ? Optional.of(
@@ -209,7 +213,7 @@ public class PipePlanPatternParseVisitor extends 
PlanVisitor<Optional<PlanNode>,
                         new Pair<>(
                             entry.getValue().getLeft(),
                             trimMeasurementGroup(
-                                entry.getKey().getFullPath(),
+                                entry.getKey().getIDeviceIDAsFullDevice(),
                                 entry.getValue().getRight(),
                                 pattern))))
             .filter(pair -> Objects.nonNull(pair.getRight().getRight()))
@@ -241,7 +245,7 @@ public class PipePlanPatternParseVisitor extends 
PlanVisitor<Optional<PlanNode>,
             .filter(
                 entry ->
                     pattern.matchesMeasurement(
-                        entry.getKey().getDeviceString(), 
entry.getKey().getMeasurement()))
+                        entry.getKey().getIDeviceID(), 
entry.getKey().getMeasurement()))
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     return !filteredViewPathToSourceMap.isEmpty()
         ? Optional.of(new CreateLogicalViewNode(node.getPlanNodeId(), 
filteredViewPathToSourceMap))
@@ -256,7 +260,7 @@ public class PipePlanPatternParseVisitor extends 
PlanVisitor<Optional<PlanNode>,
             .filter(
                 entry ->
                     pattern.matchesMeasurement(
-                        entry.getKey().getDeviceString(), 
entry.getKey().getMeasurement()))
+                        entry.getKey().getIDeviceID(), 
entry.getKey().getMeasurement()))
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     return !filteredViewPathToSourceMap.isEmpty()
         ? Optional.of(new AlterLogicalViewNode(node.getPlanNodeId(), 
filteredViewPathToSourceMap))
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java
index 4be44a0fecc..546d8185c3a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcher.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +47,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
   protected final ReentrantReadWriteLock lock;
 
   protected final Set<PipeRealtimeDataRegionExtractor> extractors;
-  protected final Cache<String, Set<PipeRealtimeDataRegionExtractor>> 
deviceToExtractorsCache;
+  protected final Cache<IDeviceID, Set<PipeRealtimeDataRegionExtractor>> 
deviceToExtractorsCache;
 
   public CachedSchemaPatternMatcher() {
     this.lock = new ReentrantReadWriteLock();
@@ -114,8 +115,8 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
             .collect(Collectors.toSet());
       }
 
-      for (final Map.Entry<String, String[]> entry : 
event.getSchemaInfo().entrySet()) {
-        final String device = entry.getKey();
+      for (final Map.Entry<IDeviceID, String[]> entry : 
event.getSchemaInfo().entrySet()) {
+        final IDeviceID device = entry.getKey();
         final String[] measurements = entry.getValue();
 
         // 1. try to get matched extractors from cache, if not success, match 
them by device
@@ -176,7 +177,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     return matchedExtractors;
   }
 
-  protected Set<PipeRealtimeDataRegionExtractor> 
filterExtractorsByDevice(final String device) {
+  protected Set<PipeRealtimeDataRegionExtractor> 
filterExtractorsByDevice(final IDeviceID device) {
     final Set<PipeRealtimeDataRegionExtractor> filteredExtractors = new 
HashSet<>();
 
     for (final PipeRealtimeDataRegionExtractor extractor : extractors) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java
index 6535e6acfe8..26c22f89b3a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementPatternParseVisitor.java
@@ -59,7 +59,7 @@ public class PipeStatementPatternParseVisitor
   public Optional<Statement> visitCreateTimeseries(
       final CreateTimeSeriesStatement statement, final IoTDBPipePattern 
pattern) {
     return pattern.matchesMeasurement(
-            statement.getPath().getDeviceString(), 
statement.getPath().getMeasurement())
+            statement.getPath().getIDeviceID(), 
statement.getPath().getMeasurement())
         ? Optional.of(statement)
         : Optional.empty();
   }
@@ -72,7 +72,7 @@ public class PipeStatementPatternParseVisitor
             .filter(
                 index ->
                     pattern.matchesMeasurement(
-                        statement.getDevicePath().getFullPath(),
+                        statement.getDevicePath().getIDeviceIDAsFullDevice(),
                         statement.getMeasurements().get(index)))
             .toArray();
     if (filteredIndexes.length == 0) {
@@ -118,7 +118,10 @@ public class PipeStatementPatternParseVisitor
             .filter(
                 index ->
                     pattern.matchesMeasurement(
-                        
createLogicalViewStatement.getTargetPathList().get(index).getDeviceString(),
+                        createLogicalViewStatement
+                            .getTargetPathList()
+                            .get(index)
+                            .getIDeviceIDAsFullDevice(),
                         
createLogicalViewStatement.getTargetPathList().get(index).getMeasurement()))
             .toArray();
     if (filteredIndexes.length == 0) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
index 1e7587153fb..07855ebe829 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
@@ -29,6 +29,8 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -85,7 +87,7 @@ public class CachedSchemaPatternMatcherTest {
           new PipeParameters(
               new HashMap<String, String>() {
                 {
-                  put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root." + 
finalI1);
+                  put(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root.db" + 
finalI1);
                 }
               }),
           new PipeTaskRuntimeConfiguration(
@@ -101,7 +103,7 @@ public class CachedSchemaPatternMatcherTest {
                   {
                     put(
                         PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
-                        "root." + finalI + "." + finalJ);
+                        "root.db" + finalI + ".s" + finalJ);
                   }
                 }),
             new PipeTaskRuntimeConfiguration(
@@ -116,18 +118,22 @@ public class CachedSchemaPatternMatcherTest {
     int epochNum = 10000;
     int deviceNum = 1000;
     int seriesNum = 100;
-    Map<String, String[]> deviceMap =
+    Map<IDeviceID, String[]> deviceMap =
         IntStream.range(0, deviceNum)
             .mapToObj(String::valueOf)
-            .collect(Collectors.toMap(s -> "root." + s, s -> new String[0]));
+            .collect(
+                Collectors.toMap(s -> new StringArrayDeviceID("root.db" + s), 
s -> new String[0]));
     String[] measurements =
-        IntStream.range(0, 
seriesNum).mapToObj(String::valueOf).toArray(String[]::new);
+        IntStream.range(0, seriesNum).mapToObj(num -> "s" + 
num).toArray(String[]::new);
     long totalTime = 0;
     for (int i = 0; i < epochNum; i++) {
       for (int j = 0; j < deviceNum; j++) {
         PipeRealtimeEvent event =
             new PipeRealtimeEvent(
-                null, null, Collections.singletonMap("root." + i, 
measurements), null);
+                null,
+                null,
+                Collections.singletonMap(new StringArrayDeviceID("root.db" + 
i), measurements),
+                null);
         long startTime = System.currentTimeMillis();
         matcher.match(event).forEach(extractor -> extractor.extract(event));
         totalTime += (System.currentTimeMillis() - startTime);
@@ -174,7 +180,8 @@ public class CachedSchemaPatternMatcherTest {
                 } else {
                   match[0] =
                       match[0]
-                          || (getPatternString().startsWith(k) || 
k.startsWith(getPatternString()));
+                          || (getPatternString().startsWith(k.toString())
+                              || k.toString().startsWith(getPatternString()));
                 }
               });
       Assert.assertTrue(match[0]);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
index db42d713012..d443577df26 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.pipe.pattern;
 import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -62,7 +64,7 @@ public class IoTDBPipePatternTest {
       Assert.assertFalse(new IoTDBPipePattern(t).coversDb(db));
     }
 
-    final String device = "root.db.d1";
+    final IDeviceID device = new StringArrayDeviceID("root.db.d1");
 
     // Test pattern cover device
     final String[] patternsCoverDevice = {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java
index d81c50a57ce..6008303b3da 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.pipe.pattern;
 
 import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
 
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -57,7 +59,7 @@ public class PrefixPipePatternTest {
       Assert.assertFalse(new PrefixPipePattern(t).coversDb(db));
     }
 
-    final String device = "root.db.d1";
+    final IDeviceID device = new StringArrayDeviceID("root.db.d1");
 
     // Test pattern cover device
     final String[] patternsCoverDevice = {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
index 6c82c7e3d9c..b270a2b0700 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
@@ -28,6 +28,8 @@ import 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.apache.tsfile.file.metadata.IDeviceID;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -83,7 +85,7 @@ public class IoTDBPipePattern extends PipePattern {
   }
 
   @Override
-  public boolean coversDevice(final String device) {
+  public boolean coversDevice(final IDeviceID device) {
     try {
       return patternPartialPath.include(
           new PartialPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
@@ -93,7 +95,7 @@ public class IoTDBPipePattern extends PipePattern {
   }
 
   @Override
-  public boolean mayOverlapWithDevice(final String device) {
+  public boolean mayOverlapWithDevice(final IDeviceID device) {
     try {
       // Another way is to use patternPath.overlapWith("device.*"),
       // there will be no false positives but time cost may be higher.
@@ -104,7 +106,7 @@ public class IoTDBPipePattern extends PipePattern {
   }
 
   @Override
-  public boolean matchesMeasurement(final String device, final String 
measurement) {
+  public boolean matchesMeasurement(final IDeviceID device, final String 
measurement) {
     // For aligned timeseries, empty measurement is an alias of the time 
column.
     if (Objects.isNull(measurement) || measurement.isEmpty()) {
       return false;
@@ -118,14 +120,17 @@ public class IoTDBPipePattern extends PipePattern {
   }
 
   /**
-   * Check if the {@link PipePattern} matches the given prefix path.
-   *
-   * <p>NOTE: In schema transmission, {@link #mayOverlapWithDevice(String)} 
can be used to detect
-   * whether the given path can act as a parent path of the {@link 
PipePattern}, and to transmit
-   * possibly used schemas like database creation and template setting.
+   * Check if the {@link PipePattern} matches the given prefix path. In schema 
transmission, this
+   * can be used to detect whether the given path can act as a parent path of 
the {@link
+   * PipePattern}, and to transmit possibly used schemas like database 
creation and template
+   * setting.
    */
   public boolean matchPrefixPath(final String path) {
-    return mayOverlapWithDevice(path);
+    try {
+      return patternPartialPath.matchPrefixPath(new PartialPath(path));
+    } catch (final IllegalPathException e) {
+      return false;
+    }
   }
 
   /**
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java
index 803c24621cf..510cd02b8ed 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.pattern;
 
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,7 +109,7 @@ public abstract class PipePattern {
   public abstract boolean coversDb(final String db);
 
   /** Check if a device's all measurements are covered by this pattern. */
-  public abstract boolean coversDevice(final String device);
+  public abstract boolean coversDevice(final IDeviceID device);
 
   /**
    * Check if a device may have some measurements matched by the pattern.
@@ -118,14 +119,14 @@ public abstract class PipePattern {
    * <p>NOTE2: this is just a loose check and may have false positives. To 
further check if a
    * measurement matches the pattern, please use {@link 
PipePattern#matchesMeasurement} after this.
    */
-  public abstract boolean mayOverlapWithDevice(final String device);
+  public abstract boolean mayOverlapWithDevice(final IDeviceID device);
 
   /**
    * Check if a full path with device and measurement can be matched by 
pattern.
    *
-   * <p>NOTE: this is only called when {@link 
PipePattern#mayOverlapWithDevice(String)} is true.
+   * <p>NOTE: this is only called when {@link 
PipePattern#mayOverlapWithDevice} is true.
    */
-  public abstract boolean matchesMeasurement(final String device, final String 
measurement);
+  public abstract boolean matchesMeasurement(final IDeviceID device, final 
String measurement);
 
   @Override
   public String toString() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java
index 52f70057052..50e3c463a72 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.PathUtils;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.file.metadata.IDeviceID;
 
 import java.util.Arrays;
 
@@ -78,28 +79,31 @@ public class PrefixPipePattern extends PipePattern {
   }
 
   @Override
-  public boolean coversDevice(final String device) {
+  public boolean coversDevice(final IDeviceID device) {
+    final String deviceStr = device.toString();
     // for example, pattern is root.a.b and device is root.a.b.c
     // in this case, the extractor can be matched without checking the 
measurements
-    return pattern.length() <= device.length() && device.startsWith(pattern);
+    return pattern.length() <= deviceStr.length() && 
deviceStr.startsWith(pattern);
   }
 
   @Override
-  public boolean mayOverlapWithDevice(final String device) {
+  public boolean mayOverlapWithDevice(final IDeviceID device) {
+    final String deviceStr = device.toString();
     return (
         // for example, pattern is root.a.b and device is root.a.b.c
         // in this case, the extractor can be matched without checking the 
measurements
-        pattern.length() <= device.length() && device.startsWith(pattern))
+        pattern.length() <= deviceStr.length() && 
deviceStr.startsWith(pattern))
         // for example, pattern is root.a.b.c and device is root.a.b
         // in this case, the extractor can be selected as candidate, but the 
measurements should
         // be checked further
-        || (pattern.length() > device.length() && pattern.startsWith(device));
+        || (pattern.length() > deviceStr.length() && 
pattern.startsWith(deviceStr));
   }
 
   @Override
-  public boolean matchesMeasurement(final String device, final String 
measurement) {
+  public boolean matchesMeasurement(final IDeviceID device, String 
measurement) {
+    final String deviceStr = device.toString();
     // We assume that the device is already matched.
-    if (pattern.length() <= device.length()) {
+    if (pattern.length() <= deviceStr.length()) {
       return true;
     }
 
@@ -109,9 +113,9 @@ public class PrefixPipePattern extends PipePattern {
     final String dotAndMeasurement = TsFileConstant.PATH_SEPARATOR + 
measurement;
     return
     // low cost check comes first
-    pattern.length() <= device.length() + dotAndMeasurement.length()
+    pattern.length() <= deviceStr.length() + dotAndMeasurement.length()
         // high cost check comes later
-        && dotAndMeasurement.startsWith(pattern.substring(device.length()));
+        && dotAndMeasurement.startsWith(pattern.substring(deviceStr.length()));
   }
 
   @Override


Reply via email to