This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 722e8959493 [To dev/1.3] Pipe: Optimized the path construction 
efficiency in pattern match (#16265) (#16271)
722e8959493 is described below

commit 722e89594937f0fc8893ecd9332ea124829d3764
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 26 19:01:08 2025 +0800

    [To dev/1.3] Pipe: Optimized the path construction efficiency in pattern 
match (#16265) (#16271)
    
    * cp
    
    * some
---
 .../agent/runtime/PipeDataNodeRuntimeAgent.java    |   7 +-
 .../matcher/CachedSchemaPatternMatcher.java        | 118 ++++++++++-----------
 .../realtime/matcher/PipeDataRegionMatcher.java    |   6 +-
 .../execute/utils/CompactionPathUtils.java         |  22 ++--
 .../apache/iotdb/commons/conf/CommonConfig.java    |  14 +--
 .../iotdb/commons/pipe/config/PipeConfig.java      |   6 +-
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |   6 +-
 .../datastructure/pattern/IoTDBPipePattern.java    |  22 +++-
 8 files changed, 116 insertions(+), 85 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 70af01b288e..81702645102 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
 import 
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
@@ -39,6 +40,7 @@ import 
org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStar
 import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.service.ResourcesInformationHolder;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import org.slf4j.Logger;
@@ -69,7 +71,7 @@ public class PipeDataNodeRuntimeAgent implements IService {
   //////////////////////////// System Service Interface 
////////////////////////////
 
   public synchronized void preparePipeResources(
-      ResourcesInformationHolder resourcesInformationHolder) throws 
StartupException {
+      final ResourcesInformationHolder resourcesInformationHolder) throws 
StartupException {
     // Clean sender (connector) hardlink file dir and snapshot dir
     PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean();
 
@@ -78,6 +80,9 @@ public class PipeDataNodeRuntimeAgent implements IService {
 
     PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
     simpleProgressIndexAssigner.start();
+
+    IoTDBPipePattern.setDevicePathGetter(CompactionPathUtils::getPath);
+    IoTDBPipePattern.setMeasurementPathGetter(CompactionPathUtils::getPath);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
index 2c68331705f..a01b50e7e68 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -46,38 +46,38 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
 
   protected final ReentrantReadWriteLock lock;
 
-  protected final Set<PipeRealtimeDataRegionSource> extractors;
-  protected final Cache<String, Set<PipeRealtimeDataRegionSource>> 
deviceToExtractorsCache;
+  protected final Set<PipeRealtimeDataRegionSource> sources;
+  protected final Cache<String, Set<PipeRealtimeDataRegionSource>> 
deviceToSourcesCache;
 
   public CachedSchemaPatternMatcher() {
     this.lock = new ReentrantReadWriteLock();
-    // Should be thread-safe because the extractors will be returned by {@link 
#match} and
-    // iterated by {@link #assignToExtractor}, at the same time the extractors 
may be added or
+    // Should be thread-safe because the sources will be returned by {@link 
#match} and
+    // iterated by {@link #assignToSource}, at the same time the sources may 
be added or
     // removed by {@link #register} and {@link #deregister}.
-    this.extractors = new CopyOnWriteArraySet<>();
-    this.deviceToExtractorsCache =
+    this.sources = new CopyOnWriteArraySet<>();
+    this.deviceToSourcesCache =
         Caffeine.newBuilder()
-            
.maximumSize(PipeConfig.getInstance().getPipeExtractorMatcherCacheSize())
+            
.maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
             .build();
   }
 
   @Override
-  public void register(final PipeRealtimeDataRegionSource extractor) {
+  public void register(final PipeRealtimeDataRegionSource source) {
     lock.writeLock().lock();
     try {
-      extractors.add(extractor);
-      deviceToExtractorsCache.invalidateAll();
+      sources.add(source);
+      deviceToSourcesCache.invalidateAll();
     } finally {
       lock.writeLock().unlock();
     }
   }
 
   @Override
-  public void deregister(final PipeRealtimeDataRegionSource extractor) {
+  public void deregister(final PipeRealtimeDataRegionSource source) {
     lock.writeLock().lock();
     try {
-      extractors.remove(extractor);
-      deviceToExtractorsCache.invalidateAll();
+      sources.remove(source);
+      deviceToSourcesCache.invalidateAll();
     } finally {
       lock.writeLock().unlock();
     }
@@ -87,7 +87,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
   public int getRegisterCount() {
     lock.readLock().lock();
     try {
-      return extractors.size();
+      return sources.size();
     } finally {
       lock.readLock().unlock();
     }
@@ -96,60 +96,60 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
   @Override
   public Pair<Set<PipeRealtimeDataRegionSource>, 
Set<PipeRealtimeDataRegionSource>> match(
       final PipeRealtimeEvent event) {
-    final Set<PipeRealtimeDataRegionSource> matchedExtractors = new 
HashSet<>();
+    final Set<PipeRealtimeDataRegionSource> matchedSources = new HashSet<>();
 
     lock.readLock().lock();
     try {
-      if (extractors.isEmpty()) {
-        return new Pair<>(matchedExtractors, extractors);
+      if (sources.isEmpty()) {
+        return new Pair<>(matchedSources, sources);
       }
 
-      // HeartbeatEvent will be assigned to all extractors
+      // HeartbeatEvent will be assigned to all sources
       if (event.getEvent() instanceof PipeHeartbeatEvent) {
-        return new Pair<>(extractors, Collections.EMPTY_SET);
+        return new Pair<>(sources, Collections.EMPTY_SET);
       }
 
-      // Deletion event will be assigned to extractors listened to it
+      // Deletion event will be assigned to sources listened to it
       if (event.getEvent() instanceof PipeSchemaRegionWritePlanEvent) {
-        extractors.stream()
+        sources.stream()
             .filter(PipeRealtimeDataRegionSource::shouldExtractDeletion)
-            .forEach(matchedExtractors::add);
-        return new Pair<>(matchedExtractors, 
findUnmatchedExtractors(matchedExtractors));
+            .forEach(matchedSources::add);
+        return new Pair<>(matchedSources, 
findUnmatchedSources(matchedSources));
       }
 
       for (final Map.Entry<String, String[]> entry : 
event.getSchemaInfo().entrySet()) {
         final String device = entry.getKey();
         final String[] measurements = entry.getValue();
 
-        // 1. try to get matched extractors from cache, if not success, match 
them by device
-        final Set<PipeRealtimeDataRegionSource> extractorsFilteredByDevice =
-            deviceToExtractorsCache.get(device, 
this::filterExtractorsByDevice);
+        // 1. try to get matched sources from cache, if not success, match 
them by device
+        final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDevice =
+            deviceToSourcesCache.get(device, this::filterSourcesByDevice);
         // this would not happen
-        if (extractorsFilteredByDevice == null) {
+        if (sourcesFilteredByDevice == null) {
           LOGGER.warn("Match result NPE when handle device {}", device);
           continue;
         }
 
-        // 2. filter matched candidate extractors by measurements
+        // 2. filter matched candidate sources by measurements
         if (measurements.length == 0) {
-          // `measurements` is empty (only in case of tsfile event). match all 
extractors.
+          // `measurements` is empty (only in case of tsfile event). match all 
sources.
           //
           // case 1: the pattern can match all measurements of the device.
-          // in this case, the extractor can be matched without checking the 
measurements.
+          // in this case, the source can be matched without checking the 
measurements.
           //
           // case 2: the pattern may match some measurements of the device.
           // in this case, we can't get all measurements efficiently here,
-          // so we just ASSUME the extractor matches and do more checks later.
-          matchedExtractors.addAll(extractorsFilteredByDevice);
+          // so we just ASSUME the source matches and do more checks later.
+          matchedSources.addAll(sourcesFilteredByDevice);
         } else {
           // `measurements` is not empty (only in case of tablet event).
-          // Match extractors by measurements.
-          extractorsFilteredByDevice.forEach(
-              extractor -> {
-                final PipePattern pattern = extractor.getPipePattern();
+          // Match sources by measurements.
+          sourcesFilteredByDevice.forEach(
+              source -> {
+                final PipePattern pattern = source.getPipePattern();
                 if (Objects.isNull(pattern) || pattern.isRoot() || 
pattern.coversDevice(device)) {
                   // The pattern can match all measurements of the device.
-                  matchedExtractors.add(extractor);
+                  matchedSources.add(source);
                 } else {
                   for (final String measurement : measurements) {
                     // Ignore null measurement for partial insert
@@ -158,8 +158,8 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
                     }
 
                     if (pattern.matchesMeasurement(device, measurement)) {
-                      matchedExtractors.add(extractor);
-                      // There would be no more matched extractors because the 
measurements are
+                      matchedSources.add(source);
+                      // There would be no more matched sources because the 
measurements are
                       // unique
                       break;
                     }
@@ -168,53 +168,53 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
               });
         }
 
-        if (matchedExtractors.size() == extractors.size()) {
+        if (matchedSources.size() == sources.size()) {
           break;
         }
       }
 
-      return new Pair<>(matchedExtractors, 
findUnmatchedExtractors(matchedExtractors));
+      return new Pair<>(matchedSources, findUnmatchedSources(matchedSources));
     } finally {
       lock.readLock().unlock();
     }
   }
 
-  private Set<PipeRealtimeDataRegionSource> findUnmatchedExtractors(
-      final Set<PipeRealtimeDataRegionSource> matchedExtractors) {
-    final Set<PipeRealtimeDataRegionSource> unmatchedExtractors = new 
HashSet<>();
-    for (final PipeRealtimeDataRegionSource extractor : extractors) {
-      if (!matchedExtractors.contains(extractor)) {
-        unmatchedExtractors.add(extractor);
+  private Set<PipeRealtimeDataRegionSource> findUnmatchedSources(
+      final Set<PipeRealtimeDataRegionSource> matchedSources) {
+    final Set<PipeRealtimeDataRegionSource> unmatchedSources = new HashSet<>();
+    for (final PipeRealtimeDataRegionSource source : sources) {
+      if (!matchedSources.contains(source)) {
+        unmatchedSources.add(source);
       }
     }
-    return unmatchedExtractors;
+    return unmatchedSources;
   }
 
-  protected Set<PipeRealtimeDataRegionSource> filterExtractorsByDevice(final 
String device) {
-    final Set<PipeRealtimeDataRegionSource> filteredExtractors = new 
HashSet<>();
+  protected Set<PipeRealtimeDataRegionSource> filterSourcesByDevice(final 
String device) {
+    final Set<PipeRealtimeDataRegionSource> filteredSources = new HashSet<>();
 
-    for (final PipeRealtimeDataRegionSource extractor : extractors) {
-      // Return if the extractor only extract deletion
-      if (!extractor.shouldExtractInsertion()) {
+    for (final PipeRealtimeDataRegionSource source : sources) {
+      // Return if the source only extract deletion
+      if (!source.shouldExtractInsertion()) {
         continue;
       }
 
-      final PipePattern pipePattern = extractor.getPipePattern();
+      final PipePattern pipePattern = source.getPipePattern();
       if (Objects.isNull(pipePattern) || 
pipePattern.mayOverlapWithDevice(device)) {
-        filteredExtractors.add(extractor);
+        filteredSources.add(source);
       }
     }
 
-    return filteredExtractors;
+    return filteredSources;
   }
 
   @Override
   public void clear() {
     lock.writeLock().lock();
     try {
-      extractors.clear();
-      deviceToExtractorsCache.invalidateAll();
-      deviceToExtractorsCache.cleanUp();
+      sources.clear();
+      deviceToSourcesCache.invalidateAll();
+      deviceToSourcesCache.cleanUp();
     } finally {
       lock.writeLock().unlock();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/PipeDataRegionMatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/PipeDataRegionMatcher.java
index 810e42a92b2..da5e9318f5a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/PipeDataRegionMatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/PipeDataRegionMatcher.java
@@ -32,10 +32,10 @@ public interface PipeDataRegionMatcher {
    * Register a extractor. If the extractor's pattern matches the event's 
schema info, the event
    * will be assigned to the extractor.
    */
-  void register(PipeRealtimeDataRegionSource extractor);
+  void register(final PipeRealtimeDataRegionSource extractor);
 
   /** Deregister a extractor. */
-  void deregister(PipeRealtimeDataRegionSource extractor);
+  void deregister(final PipeRealtimeDataRegionSource extractor);
 
   /** Get the number of registered extractors in this matcher. */
   int getRegisterCount();
@@ -48,7 +48,7 @@ public interface PipeDataRegionMatcher {
    * @return pair of matched extractors and unmatched extractors.
    */
   Pair<Set<PipeRealtimeDataRegionSource>, Set<PipeRealtimeDataRegionSource>> 
match(
-      PipeRealtimeEvent event);
+      final PipeRealtimeEvent event);
 
   /** Clear all the registered extractors and internal data structures. */
   void clear();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
index 52c4ec4554b..63140f733a1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
@@ -31,18 +31,26 @@ public class CompactionPathUtils {
 
   private CompactionPathUtils() {}
 
-  public static PartialPath getPath(IDeviceID device, String measurement)
+  public static PartialPath getPath(final IDeviceID device, String measurement)
       throws IllegalPathException {
     return getPath(device).concatNode(measurement);
   }
 
-  public static PartialPath getPath(IDeviceID device) throws 
IllegalPathException {
-    PartialPath path;
-    String plainDeviceId = ((PlainDeviceID) device).toStringID();
-    if (plainDeviceId.contains(TsFileConstant.BACK_QUOTE_STRING)) {
-      path = 
DataNodeDevicePathCache.getInstance().getPartialPath(plainDeviceId);
+  public static PartialPath getPath(final IDeviceID device) throws 
IllegalPathException {
+    return getPath(((PlainDeviceID) device).toStringID());
+  }
+
+  public static PartialPath getPath(final String device, String measurement)
+      throws IllegalPathException {
+    return getPath(device).concatNode(measurement);
+  }
+
+  public static PartialPath getPath(final String device) throws 
IllegalPathException {
+    final PartialPath path;
+    if (device.contains(TsFileConstant.BACK_QUOTE_STRING)) {
+      path = DataNodeDevicePathCache.getInstance().getPartialPath(device);
     } else {
-      path = new 
PartialPath(plainDeviceId.split(TsFileConstant.PATH_SEPARATER_NO_REGEX));
+      path = new 
PartialPath(device.split(TsFileConstant.PATH_SEPARATER_NO_REGEX));
     }
     return path;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 1608303e46f..29c06a55589 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -257,7 +257,7 @@ public class CommonConfig {
 
   private int pipeExtractorAssignerDisruptorRingBufferSize = 128;
   private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 72 * 
KB;
-  private long pipeExtractorMatcherCacheSize = 1024;
+  private long pipeSourceMatcherCacheSize = 1024;
 
   private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
   private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
@@ -985,16 +985,16 @@ public class CommonConfig {
         pipeExtractorAssignerDisruptorRingBufferEntrySize);
   }
 
-  public long getPipeExtractorMatcherCacheSize() {
-    return pipeExtractorMatcherCacheSize;
+  public long getPipeSourceMatcherCacheSize() {
+    return pipeSourceMatcherCacheSize;
   }
 
-  public void setPipeExtractorMatcherCacheSize(long 
pipeExtractorMatcherCacheSize) {
-    if (this.pipeExtractorMatcherCacheSize == pipeExtractorMatcherCacheSize) {
+  public void setPipeSourceMatcherCacheSize(long pipeSourceMatcherCacheSize) {
+    if (this.pipeSourceMatcherCacheSize == pipeSourceMatcherCacheSize) {
       return;
     }
-    this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize;
-    logger.info("pipeExtractorMatcherCacheSize is set to {}.", 
pipeExtractorMatcherCacheSize);
+    this.pipeSourceMatcherCacheSize = pipeSourceMatcherCacheSize;
+    logger.info("pipeExtractorMatcherCacheSize is set to {}.", 
pipeSourceMatcherCacheSize);
   }
 
   public int getPipeConnectorHandshakeTimeoutMs() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 00423730877..1dcc849f86c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -159,8 +159,8 @@ public class PipeConfig {
     return 
COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
   }
 
-  public long getPipeExtractorMatcherCacheSize() {
-    return COMMON_CONFIG.getPipeExtractorMatcherCacheSize();
+  public long getPipeSourceMatcherCacheSize() {
+    return COMMON_CONFIG.getPipeSourceMatcherCacheSize();
   }
 
   /////////////////////////////// Connector ///////////////////////////////
@@ -478,7 +478,7 @@ public class PipeConfig {
     LOGGER.info(
         "PipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes: {}",
         getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes());
-    LOGGER.info("PipeExtractorMatcherCacheSize: {}", 
getPipeExtractorMatcherCacheSize());
+    LOGGER.info("PipeSourceMatcherCacheSize: {}", 
getPipeSourceMatcherCacheSize());
 
     LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", 
getPipeConnectorHandshakeTimeoutMs());
     LOGGER.info("PipeConnectorTransferTimeoutMs: {}", 
getPipeConnectorTransferTimeoutMs());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 640652345a1..a60ac57da02 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -322,13 +322,13 @@ public class PipeDescriptor {
                             config
                                 
.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes())))));
 
-    config.setPipeExtractorMatcherCacheSize(
+    config.setPipeSourceMatcherCacheSize(
         Integer.parseInt(
-            
Optional.ofNullable(properties.getProperty("pipe_extractor_matcher_cache_size"))
+            
Optional.ofNullable(properties.getProperty("pipe_source_matcher_cache_size"))
                 .orElse(
                     properties.getProperty(
                         "pipe_extractor_matcher_cache_size",
-                        
String.valueOf(config.getPipeExtractorMatcherCacheSize())))));
+                        
String.valueOf(config.getPipeSourceMatcherCacheSize())))));
 
     config.setPipeConnectorHandshakeTimeoutMs(
         Long.parseLong(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
index e9129ec23e1..9ed5837e625 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
@@ -37,6 +37,8 @@ import java.util.stream.Collectors;
 public class IoTDBPipePattern extends PipePattern {
 
   private final PartialPath patternPartialPath;
+  private static volatile DevicePathGetter devicePathGetter = PartialPath::new;
+  private static volatile MeasurementPathGetter measurementPathGetter = 
PartialPath::new;
 
   public IoTDBPipePattern(final String pattern) {
     super(pattern);
@@ -99,7 +101,7 @@ public class IoTDBPipePattern extends PipePattern {
     try {
       // Another way is to use patternPath.overlapWith("device.*"),
       // there will be no false positives but time cost may be higher.
-      return patternPartialPath.matchPrefixPath(new PartialPath(device));
+      return 
patternPartialPath.matchPrefixPath(devicePathGetter.apply(device));
     } catch (final IllegalPathException e) {
       return false;
     }
@@ -122,7 +124,7 @@ public class IoTDBPipePattern extends PipePattern {
     }
 
     try {
-      return patternPartialPath.matchFullPath(new PartialPath(device, 
measurement));
+      return 
patternPartialPath.matchFullPath(measurementPathGetter.apply(device, 
measurement));
     } catch (final IllegalPathException e) {
       return false;
     }
@@ -198,8 +200,24 @@ public class IoTDBPipePattern extends PipePattern {
     return PathPatternUtil.hasWildcard(patternPartialPath.getTailNode());
   }
 
+  public static void setDevicePathGetter(final DevicePathGetter 
devicePathGetter) {
+    IoTDBPipePattern.devicePathGetter = devicePathGetter;
+  }
+
+  public static void setMeasurementPathGetter(final MeasurementPathGetter 
measurementPathGetter) {
+    IoTDBPipePattern.measurementPathGetter = measurementPathGetter;
+  }
+
   @Override
   public String toString() {
     return "IoTDBPipePattern" + super.toString();
   }
+
+  public interface DevicePathGetter {
+    PartialPath apply(final String deviceId) throws IllegalPathException;
+  }
+
+  public interface MeasurementPathGetter {
+    PartialPath apply(final String deviceId, final String measurement) throws 
IllegalPathException;
+  }
 }

Reply via email to