This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f00763dc5cd Pipe: support IoTDB-style pattern (#12085)
f00763dc5cd is described below

commit f00763dc5cd24fb835d827f61fb0b76514c0f7ac
Author: Zikun Ma <[email protected]>
AuthorDate: Tue Mar 12 21:54:18 2024 +0800

    Pipe: support IoTDB-style pattern (#12085)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/it/autocreate/PipePatternFormatIT.java    | 197 ++++++++
 .../pipe/event/PipeConfigRegionSnapshotEvent.java  |   9 +-
 .../pipe/event/PipeConfigRegionWritePlanEvent.java |   9 +-
 .../db/pipe/event/UserDefinedEnrichedEvent.java    |   9 +-
 .../event/common/heartbeat/PipeHeartbeatEvent.java |   7 +-
 .../schema/PipeSchemaRegionSnapshotEvent.java      |   9 +-
 .../schema/PipeSchemaRegionWritePlanEvent.java     |   9 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  18 +-
 .../common/tablet/PipeRawTabletInsertionEvent.java |  17 +-
 .../tablet/TabletInsertionDataContainer.java       |  28 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  11 +-
 .../tsfile/TsFileInsertionDataContainer.java       |  21 +-
 .../db/pipe/event/realtime/PipeRealtimeEvent.java  |  11 +-
 .../dataregion/IoTDBDataRegionExtractor.java       |  68 ++-
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  20 +-
 .../realtime/PipeRealtimeDataRegionExtractor.java  |  29 +-
 .../realtime/assigner/PipeDataRegionAssigner.java  |   6 +-
 .../realtime/epoch/TsFileEpochManager.java         |   4 +-
 .../matcher/CachedSchemaPatternMatcher.java        |  67 +--
 .../matcher/PipeDataRegionMatcher.java             |   2 +-
 .../pipe/event/PipeTabletInsertionEventTest.java   |  29 +-
 .../event/TsFileInsertionDataContainerTest.java    | 493 +++++++++++++--------
 .../CachedSchemaPatternMatcherTest.java            |  20 +-
 .../db/pipe/pattern/IoTDBPipePatternTest.java      | 110 +++++
 .../db/pipe/pattern/PrefixPipePatternTest.java     | 105 +++++
 .../config/constant/PipeExtractorConstant.java     |   9 +-
 .../iotdb/commons/pipe/event/EnrichedEvent.java    |  40 +-
 .../commons/pipe/event/PipeSnapshotEvent.java      |   3 +-
 .../commons/pipe/event/PipeWritePlanEvent.java     |   3 +-
 .../commons/pipe/pattern/IoTDBPipePattern.java     | 113 +++++
 .../iotdb/commons/pipe/pattern/PipePattern.java    | 133 ++++++
 .../commons/pipe/pattern/PrefixPipePattern.java    | 121 +++++
 32 files changed, 1341 insertions(+), 389 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/PipePatternFormatIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/PipePatternFormatIT.java
new file mode 100644
index 00000000000..e93b969d532
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/PipePatternFormatIT.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2.class})
+public class PipePatternFormatIT extends AbstractPipeDualAutoIT {
+  @Test
+  public void testPrefixPattern() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1(time, s, s1, t) values (1, 1, 1, 1)",
+              "insert into root.db.d2(time, s) values (1, 1)",
+              "insert into root.db2.d1(time, s) values (1, 1)"))) {
+        return;
+      }
+
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor.pattern", "root.db.d1.s");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
+
+      Set<String> expectedResSet = new HashSet<>();
+      expectedResSet.add("1,1.0,1.0,");
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv, "select * from root.**", 
"Time,root.db.d1.s,root.db.d1.s1,", expectedResSet);
+    }
+  }
+
+  @Test
+  public void testIotdbPattern() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1(time, s, s1, t) values (1, 1, 1, 1)",
+              "insert into root.db.d2(time, s) values (1, 1)",
+              "insert into root.db2.d1(time, s) values (1, 1)"))) {
+        return;
+      }
+
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor.path", "root.**.d1.s*");
+      // When path is set, pattern should be ignored
+      extractorAttributes.put("extractor.pattern", "root");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
+
+      Set<String> expectedResSet = new HashSet<>();
+      expectedResSet.add("1,1.0,1.0,1.0,");
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select * from root.**",
+          "Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
+          expectedResSet);
+    }
+  }
+
+  @Test
+  public void testIotdbPatternWithLegacySyntax() throws Exception {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    String receiverIp = receiverDataNode.getIp();
+    int receiverPort = receiverDataNode.getPort();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1(time, s, s1, t) values (1, 1, 1, 1)",
+              "insert into root.db.d2(time, s) values (1, 1)",
+              "insert into root.db2.d1(time, s) values (1, 1)"))) {
+        return;
+      }
+
+      Map<String, String> extractorAttributes = new HashMap<>();
+      Map<String, String> processorAttributes = new HashMap<>();
+      Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("extractor.pattern", "root.**.d1.s*");
+      extractorAttributes.put("extractor.pattern.format", "iotdb");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
+
+      Set<String> expectedResSet = new HashSet<>();
+      expectedResSet.add("1,1.0,1.0,1.0,");
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select * from root.**",
+          "Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
+          expectedResSet);
+    }
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
index a13008b4caf..48fe334e6df 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionSnapshotEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.manager.pipe.event;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.confignode.manager.pipe.resource.snapshot.PipeConfigNodeSnapshotResourceManager;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -39,7 +40,7 @@ public class PipeConfigRegionSnapshotEvent extends 
PipeSnapshotEvent {
   }
 
   public PipeConfigRegionSnapshotEvent(
-      String snapshotPath, String pipeName, PipeTaskMeta pipeTaskMeta, String 
pattern) {
+      String snapshotPath, String pipeName, PipeTaskMeta pipeTaskMeta, 
PipePattern pattern) {
     super(
         snapshotPath,
         pipeName,
@@ -50,7 +51,11 @@ public class PipeConfigRegionSnapshotEvent extends 
PipeSnapshotEvent {
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long 
startTime, long endTime) {
+      String pipeName,
+      PipeTaskMeta pipeTaskMeta,
+      PipePattern pattern,
+      long startTime,
+      long endTime) {
     return new PipeConfigRegionSnapshotEvent(snapshotPath, pipeName, 
pipeTaskMeta, pattern);
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
index 980bb5308c0..d94b58ea892 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/event/PipeConfigRegionWritePlanEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.manager.pipe.event;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.PipeWritePlanEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -46,7 +47,7 @@ public class PipeConfigRegionWritePlanEvent extends 
PipeWritePlanEvent {
       ConfigPhysicalPlan configPhysicalPlan,
       String pipeName,
       PipeTaskMeta pipeTaskMeta,
-      String pattern,
+      PipePattern pattern,
       boolean isGeneratedByPipe) {
     super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
     this.configPhysicalPlan = configPhysicalPlan;
@@ -58,7 +59,11 @@ public class PipeConfigRegionWritePlanEvent extends 
PipeWritePlanEvent {
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long 
startTime, long endTime) {
+      String pipeName,
+      PipeTaskMeta pipeTaskMeta,
+      PipePattern pattern,
+      long startTime,
+      long endTime) {
     return new PipeConfigRegionWritePlanEvent(
         configPhysicalPlan, pipeName, pipeTaskMeta, pattern, false);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
index 2a2eb25721f..ff6e34fffcf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/UserDefinedEnrichedEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.UserDefinedEvent;
@@ -42,7 +43,7 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
     super(
         enrichedEvent.getPipeName(),
         enrichedEvent.getPipeTaskMeta(),
-        enrichedEvent.getPattern(),
+        enrichedEvent.getPipePattern(),
         enrichedEvent.getStartTime(),
         enrichedEvent.getEndTime());
     this.userDefinedEvent = userDefinedEvent;
@@ -70,7 +71,11 @@ public class UserDefinedEnrichedEvent extends EnrichedEvent {
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long 
startTime, long endTime) {
+      String pipeName,
+      PipeTaskMeta pipeTaskMeta,
+      PipePattern pattern,
+      long startTime,
+      long endTime) {
     return enrichedEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
         pipeName, pipeTaskMeta, pattern, startTime, endTime);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index a5c0e267488..c675ff5542c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.heartbeat;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
 import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -107,7 +108,11 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long 
startTime, long endTime) {
+      String pipeName,
+      PipeTaskMeta pipeTaskMeta,
+      PipePattern pattern,
+      long startTime,
+      long endTime) {
     // Should record PipeTaskMeta, for sometimes HeartbeatEvents should report 
exceptions.
     // Here we ignore parameters `pattern`, `startTime`, and `endTime`.
     return new PipeHeartbeatEvent(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
index b4a8b05b2e5..dc66d4f006f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionSnapshotEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.common.schema;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -38,13 +39,17 @@ public class PipeSchemaRegionSnapshotEvent extends 
PipeSnapshotEvent {
   }
 
   public PipeSchemaRegionSnapshotEvent(
-      String snapshotPath, String pipeName, PipeTaskMeta pipeTaskMeta, String 
pattern) {
+      String snapshotPath, String pipeName, PipeTaskMeta pipeTaskMeta, 
PipePattern pattern) {
     super(snapshotPath, pipeName, pipeTaskMeta, pattern, 
PipeResourceManager.snapshot());
   }
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long 
startTime, long endTime) {
+      String pipeName,
+      PipeTaskMeta pipeTaskMeta,
+      PipePattern pattern,
+      long startTime,
+      long endTime) {
     return new PipeSchemaRegionSnapshotEvent(snapshotPath, pipeName, 
pipeTaskMeta, pattern);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
index dacfddfdda9..96892c7004b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/schema/PipeSchemaRegionWritePlanEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.common.schema;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.PipeWritePlanEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -45,7 +46,7 @@ public class PipeSchemaRegionWritePlanEvent extends 
PipeWritePlanEvent {
       PlanNode planNode,
       String pipeName,
       PipeTaskMeta pipeTaskMeta,
-      String pattern,
+      PipePattern pattern,
       boolean isGeneratedByPipe) {
     super(pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
     this.planNode = planNode;
@@ -57,7 +58,11 @@ public class PipeSchemaRegionWritePlanEvent extends 
PipeWritePlanEvent {
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long 
startTime, long endTime) {
+      String pipeName,
+      PipeTaskMeta pipeTaskMeta,
+      PipePattern pattern,
+      long startTime,
+      long endTime) {
     return new PipeSchemaRegionWritePlanEvent(
         planNode, pipeName, pipeTaskMeta, pattern, isGeneratedByPipe);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 3a09e865b09..19368be2412 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.tablet;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -80,7 +81,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       boolean isGeneratedByPipe,
       String pipeName,
       PipeTaskMeta pipeTaskMeta,
-      String pattern,
+      PipePattern pattern,
       long startTime,
       long endTime) {
     super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
@@ -147,7 +148,11 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   @Override
   public PipeInsertNodeTabletInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long 
startTime, long endTime) {
+      String pipeName,
+      PipeTaskMeta pipeTaskMeta,
+      PipePattern pattern,
+      long startTime,
+      long endTime) {
     return new PipeInsertNodeTabletInsertionEvent(
         walEntryHandler,
         progressIndex,
@@ -201,7 +206,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
     try {
       if (dataContainer == null) {
         dataContainer =
-            new TabletInsertionDataContainer(pipeTaskMeta, this, 
getInsertNode(), getPattern());
+            new TabletInsertionDataContainer(pipeTaskMeta, this, 
getInsertNode(), pipePattern);
       }
       return dataContainer.processRowByRow(consumer);
     } catch (Exception e) {
@@ -214,7 +219,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
     try {
       if (dataContainer == null) {
         dataContainer =
-            new TabletInsertionDataContainer(pipeTaskMeta, this, 
getInsertNode(), getPattern());
+            new TabletInsertionDataContainer(pipeTaskMeta, this, 
getInsertNode(), pipePattern);
       }
       return dataContainer.processTablet(consumer);
     } catch (Exception e) {
@@ -232,7 +237,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
     try {
       if (dataContainer == null) {
         dataContainer =
-            new TabletInsertionDataContainer(pipeTaskMeta, this, 
getInsertNode(), getPattern());
+            new TabletInsertionDataContainer(pipeTaskMeta, this, 
getInsertNode(), pipePattern);
       }
       return dataContainer.convertToTablet();
     } catch (Exception e) {
@@ -246,7 +251,8 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   public boolean shouldParsePattern() {
     final InsertNode node = getInsertNodeViaCacheIfPossible();
     return super.shouldParsePattern()
-        && (Objects.isNull(node) || 
!node.getDevicePath().getFullPath().startsWith(pattern));
+        && Objects.nonNull(pipePattern)
+        && (Objects.isNull(node) || 
!pipePattern.coversDevice(node.getDevicePath().getFullPath()));
   }
 
   public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 02386f1e375..beb6c3ea9e9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.tablet;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
@@ -53,7 +54,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
       boolean needToReport,
       String pipeName,
       PipeTaskMeta pipeTaskMeta,
-      String pattern,
+      PipePattern pattern,
       long startTime,
       long endTime) {
     super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
@@ -88,7 +89,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
   }
 
   @TestOnly
-  public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned, String 
pattern) {
+  public PipeRawTabletInsertionEvent(Tablet tablet, boolean isAligned, 
PipePattern pattern) {
     this(tablet, isAligned, null, false, null, null, pattern, Long.MIN_VALUE, 
Long.MAX_VALUE);
   }
 
@@ -127,7 +128,11 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
 
   @Override
   public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long 
startTime, long endTime) {
+      String pipeName,
+      PipeTaskMeta pipeTaskMeta,
+      PipePattern pattern,
+      long startTime,
+      long endTime) {
     return new PipeRawTabletInsertionEvent(
         tablet,
         isAligned,
@@ -169,7 +174,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
   public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, 
RowCollector> consumer) {
     if (dataContainer == null) {
       dataContainer =
-          new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, 
isAligned, getPattern());
+          new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, 
isAligned, pipePattern);
     }
     return dataContainer.processRowByRow(consumer);
   }
@@ -178,7 +183,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
   public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer) {
     if (dataContainer == null) {
       dataContainer =
-          new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, 
isAligned, getPattern());
+          new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, 
isAligned, pipePattern);
     }
     return dataContainer.processTablet(consumer);
   }
@@ -197,7 +202,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
     // if notNullPattern is not "root", we need to convert the tablet
     if (dataContainer == null) {
       dataContainer =
-          new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, 
isAligned, getPattern());
+          new TabletInsertionDataContainer(pipeTaskMeta, this, tablet, 
isAligned, pipePattern);
     }
     return dataContainer.convertToTablet();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 9c57245cd1a..158b8b35401 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.event.common.tablet;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
@@ -30,7 +31,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTablet
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -86,7 +86,10 @@ public class TabletInsertionDataContainer {
   }
 
   public TabletInsertionDataContainer(
-      PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent, InsertNode 
insertNode, String pattern) {
+      PipeTaskMeta pipeTaskMeta,
+      EnrichedEvent sourceEvent,
+      InsertNode insertNode,
+      PipePattern pattern) {
     this.pipeTaskMeta = pipeTaskMeta;
     this.sourceEvent = sourceEvent;
 
@@ -105,7 +108,7 @@ public class TabletInsertionDataContainer {
       EnrichedEvent sourceEvent,
       Tablet tablet,
       boolean isAligned,
-      String pattern) {
+      PipePattern pattern) {
     this.pipeTaskMeta = pipeTaskMeta;
     this.sourceEvent = sourceEvent;
 
@@ -113,7 +116,7 @@ public class TabletInsertionDataContainer {
   }
 
   @TestOnly
-  public TabletInsertionDataContainer(InsertNode insertNode, String pattern) {
+  public TabletInsertionDataContainer(InsertNode insertNode, PipePattern 
pattern) {
     this(null, null, insertNode, pattern);
   }
 
@@ -123,7 +126,7 @@ public class TabletInsertionDataContainer {
 
   //////////////////////////// parse ////////////////////////////
 
-  private void parse(InsertRowNode insertRowNode, String pattern) {
+  private void parse(InsertRowNode insertRowNode, PipePattern pattern) {
     final int originColumnSize = insertRowNode.getMeasurements().length;
     final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new 
Integer[originColumnSize];
 
@@ -190,7 +193,7 @@ public class TabletInsertionDataContainer {
     }
   }
 
-  private void parse(InsertTabletNode insertTabletNode, String pattern) {
+  private void parse(InsertTabletNode insertTabletNode, PipePattern pattern) {
     final int originColumnSize = insertTabletNode.getMeasurements().length;
     final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new 
Integer[originColumnSize];
 
@@ -273,7 +276,7 @@ public class TabletInsertionDataContainer {
     }
   }
 
-  private void parse(Tablet tablet, boolean isAligned, String pattern) {
+  private void parse(Tablet tablet, boolean isAligned, PipePattern pattern) {
     final int originColumnSize = tablet.getSchemas().size();
     final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new 
Integer[originColumnSize];
 
@@ -366,13 +369,13 @@ public class TabletInsertionDataContainer {
 
   private void generateColumnIndexMapper(
       String[] originMeasurementList,
-      String pattern,
+      PipePattern pattern,
       Integer[] originColumnIndex2FilteredColumnIndexMapperList) {
     final int originColumnSize = originMeasurementList.length;
 
     // case 1: for example, pattern is root.a.b or pattern is null and device 
is root.a.b.c
     // in this case, all data can be matched without checking the measurements
-    if (pattern == null || pattern.length() <= deviceId.length() && 
deviceId.startsWith(pattern)) {
+    if (Objects.isNull(pattern) || pattern.isRoot() || 
pattern.coversDevice(deviceId)) {
       for (int i = 0; i < originColumnSize; i++) {
         originColumnIndex2FilteredColumnIndexMapperList[i] = i;
       }
@@ -380,7 +383,7 @@ public class TabletInsertionDataContainer {
 
     // case 2: for example, pattern is root.a.b.c and device is root.a.b
     // in this case, we need to check the full path
-    else if (pattern.length() > deviceId.length() && 
pattern.startsWith(deviceId)) {
+    else if (pattern.mayOverlapWithDevice(deviceId)) {
       int filteredCount = 0;
 
       for (int i = 0; i < originColumnSize; i++) {
@@ -391,10 +394,7 @@ public class TabletInsertionDataContainer {
           continue;
         }
 
-        // low cost check comes first
-        if (pattern.length() == deviceId.length() + measurement.length() + 1
-            // high cost check comes later
-            && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) {
+        if (pattern.matchesMeasurement(deviceId, measurement)) {
           originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++;
         }
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index dfa229bf108..110414a64ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
@@ -63,7 +64,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
       boolean isGeneratedByPipe,
       String pipeName,
       PipeTaskMeta pipeTaskMeta,
-      String pattern,
+      PipePattern pattern,
       long startTime,
       long endTime) {
     super(pipeName, pipeTaskMeta, pattern, startTime, endTime);
@@ -168,7 +169,11 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
 
   @Override
   public PipeTsFileInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long 
startTime, long endTime) {
+      String pipeName,
+      PipeTaskMeta pipeTaskMeta,
+      PipePattern pattern,
+      long startTime,
+      long endTime) {
     return new PipeTsFileInsertionEvent(
         resource, isLoaded, isGeneratedByPipe, pipeName, pipeTaskMeta, 
pattern, startTime, endTime);
   }
@@ -221,7 +226,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent 
implements TsFileIns
         waitForTsFileClose();
         dataContainer =
             new TsFileInsertionDataContainer(
-                tsFile, getPattern(), startTime, endTime, pipeTaskMeta, this);
+                tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this);
       }
       return dataContainer;
     } catch (InterruptedException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index e91d9c4b264..0bb2b8be2be 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
@@ -29,7 +30,6 @@ import 
org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeighUtil;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
 import org.apache.iotdb.tsfile.read.TsFileReader;
@@ -52,12 +52,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 
 public class TsFileInsertionDataContainer implements AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
 
-  private final String pattern; // used to filter data
+  private final PipePattern pattern; // used to filter data
   private final IExpression timeFilterExpression; // used to filter data
 
   private final PipeTaskMeta pipeTaskMeta; // used to report progress
@@ -74,14 +75,14 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
 
   private boolean shouldParsePattern = false;
 
-  public TsFileInsertionDataContainer(File tsFile, String pattern, long 
startTime, long endTime)
-      throws IOException {
+  public TsFileInsertionDataContainer(
+      File tsFile, PipePattern pattern, long startTime, long endTime) throws 
IOException {
     this(tsFile, pattern, startTime, endTime, null, null);
   }
 
   public TsFileInsertionDataContainer(
       File tsFile,
-      String pattern,
+      PipePattern pattern,
       long startTime,
       long endTime,
       PipeTaskMeta pipeTaskMeta,
@@ -145,8 +146,7 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
 
       // case 1: for example, pattern is root.a.b or pattern is null and 
device is root.a.b.c
       // in this case, all data can be matched without checking the 
measurements
-      if (pattern == null
-          || pattern.length() <= deviceId.length() && 
deviceId.startsWith(pattern)) {
+      if (Objects.isNull(pattern) || pattern.isRoot() || 
pattern.coversDevice(deviceId)) {
         if (!entry.getValue().isEmpty()) {
           filteredDeviceMeasurementsMap.put(deviceId, entry.getValue());
         }
@@ -154,14 +154,11 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
 
       // case 2: for example, pattern is root.a.b.c and device is root.a.b
       // in this case, we need to check the full path
-      else if (pattern.length() > deviceId.length() && 
pattern.startsWith(deviceId)) {
+      else if (pattern.mayOverlapWithDevice(deviceId)) {
         final List<String> filteredMeasurements = new ArrayList<>();
 
         for (final String measurement : entry.getValue()) {
-          // low cost check comes first
-          if (pattern.length() == deviceId.length() + measurement.length() + 1
-              // high cost check comes later
-              && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + 
measurement)) {
+          if (pattern.matchesMeasurement(deviceId, measurement)) {
             filteredMeasurements.add(measurement);
           } else {
             // Parse pattern iff there are measurements filtered out
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index e693f6dae4b..f291afdefb9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.realtime;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
 
@@ -42,7 +43,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
       EnrichedEvent event,
       TsFileEpoch tsFileEpoch,
       Map<String, String[]> device2Measurements,
-      String pattern) {
+      PipePattern pattern) {
     this(event, tsFileEpoch, device2Measurements, null, pattern, 
Long.MIN_VALUE, Long.MAX_VALUE);
   }
 
@@ -51,7 +52,7 @@ public class PipeRealtimeEvent extends EnrichedEvent {
       TsFileEpoch tsFileEpoch,
       Map<String, String[]> device2Measurements,
       PipeTaskMeta pipeTaskMeta,
-      String pattern,
+      PipePattern pattern,
       long startTime,
       long endTime) {
     // pipeTaskMeta is used to report the progress of the event, the 
PipeRealtimeEvent
@@ -137,7 +138,11 @@ public class PipeRealtimeEvent extends EnrichedEvent {
 
   @Override
   public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long 
startTime, long endTime) {
+      String pipeName,
+      PipeTaskMeta pipeTaskMeta,
+      PipePattern pattern,
+      long startTime,
+      long endTime) {
     return new PipeRealtimeEvent(
         event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
             pipeName, pipeTaskMeta, pattern, startTime, endTime),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index e4a3edb4ef6..68023a73cdf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -20,9 +20,8 @@
 package org.apache.iotdb.db.pipe.extractor.dataregion;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.pipe.extractor.IoTDBExtractor;
-import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionTsFileExtractor;
@@ -42,7 +41,6 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,8 +52,9 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE;
@@ -69,7 +68,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
@@ -96,13 +95,25 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
     }
     hasNoExtractionNeed = false;
 
+    // Validate extractor.pattern.format is within valid range
+    validator
+        .validateAttributeValueRange(
+            EXTRACTOR_PATTERN_FORMAT_KEY,
+            true,
+            EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE,
+            EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE)
+        .validateAttributeValueRange(
+            SOURCE_PATTERN_FORMAT_KEY,
+            true,
+            EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE,
+            EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE);
+
+    // Get the pattern format to check whether the pattern is legal
+    final PipePattern pattern =
+        
PipePattern.parsePipePatternFromSourceParameters(validator.getParameters());
+
     // Check whether the pattern is legal
-    validatePattern(
-        validator
-            .getParameters()
-            .getStringOrDefault(
-                Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY),
-                EXTRACTOR_PATTERN_DEFAULT_VALUE));
+    validatePattern(pattern);
 
     // Validate extractor.history.enable and extractor.realtime.enable
     validator
@@ -174,36 +185,9 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
     realtimeExtractor.validate(validator);
   }
 
-  private void validatePattern(String pattern) {
-    if (!pattern.startsWith("root")) {
-      throw new IllegalArgumentException(
-          "The argument `extractor.pattern` or `source.pattern` is an illegal 
path.");
-    }
-
-    try {
-      PathUtils.isLegalPath(pattern);
-    } catch (IllegalPathException e) {
-      try {
-        if ("root".equals(pattern) || "root.".equals(pattern)) {
-          return;
-        }
-
-        // Split the pattern to nodes.
-        String[] pathNodes = StringUtils.splitPreserveAllTokens(pattern, 
"\\.");
-
-        // Check whether the pattern without last node is legal.
-        PathUtils.splitPathToDetachedNodes(
-            String.join(".", Arrays.copyOfRange(pathNodes, 0, pathNodes.length 
- 1)));
-        String lastNode = pathNodes[pathNodes.length - 1];
-
-        // Check whether the last node is legal.
-        if (!"".equals(lastNode)) {
-          Double.parseDouble(lastNode);
-        }
-      } catch (Exception ignored) {
-        throw new IllegalArgumentException(
-            "The argument `extractor.pattern` or `source.pattern` is an 
illegal path.");
-      }
+  private void validatePattern(PipePattern pattern) {
+    if (!pattern.isLegal()) {
+      throw new IllegalArgumentException(String.format("Pattern \"%s\" is 
illegal.", pattern));
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 9133b38caca..f5efe198ecf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
@@ -60,14 +61,11 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
 
 public class PipeHistoricalDataRegionTsFileExtractor implements 
PipeHistoricalDataRegionExtractor {
@@ -84,7 +82,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
 
   private int dataRegionId;
 
-  private String pattern;
+  private PipePattern pipePattern;
   private boolean isDbNameCoveredByPattern = false;
 
   private boolean isHistoricalExtractorEnabled = false;
@@ -197,18 +195,14 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.putIfAbsent(dataRegionId, 0L);
     }
 
-    pattern =
-        parameters.getStringOrDefault(
-            Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY),
-            EXTRACTOR_PATTERN_DEFAULT_VALUE);
+    pipePattern = PipePattern.parsePipePatternFromSourceParameters(parameters);
+
     final DataRegion dataRegion =
         StorageEngine.getInstance().getDataRegion(new 
DataRegionId(environment.getRegionId()));
     if (Objects.nonNull(dataRegion)) {
       final String databaseName = dataRegion.getDatabaseName();
-      if (Objects.nonNull(databaseName)
-          && pattern.length() <= databaseName.length()
-          && databaseName.startsWith(pattern)) {
-        isDbNameCoveredByPattern = true;
+      if (Objects.nonNull(databaseName)) {
+        isDbNameCoveredByPattern = pipePattern.coversDb(databaseName);
       }
     }
 
@@ -458,7 +452,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
             false,
             pipeName,
             pipeTaskMeta,
-            pattern,
+            pipePattern,
             historicalDataExtractionStartTime,
             historicalDataExtractionEndTime);
     if (isDbNameCoveredByPattern) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index f564f737539..85ac5dd2f54 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
@@ -56,9 +57,7 @@ import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
 
 public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor 
{
@@ -73,7 +72,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
   protected boolean shouldExtractInsertion;
   protected boolean shouldExtractDeletion;
 
-  protected String pattern;
+  protected PipePattern pipePattern;
   private boolean isDbNameCoveredByPattern = false;
 
   protected long realtimeDataExtractionStartTime = Long.MIN_VALUE; // Event 
time
@@ -153,18 +152,14 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
     long creationTime = environment.getCreationTime();
     taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
 
-    pattern =
-        parameters.getStringOrDefault(
-            Arrays.asList(EXTRACTOR_PATTERN_KEY, SOURCE_PATTERN_KEY),
-            PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
+    pipePattern = PipePattern.parsePipePatternFromSourceParameters(parameters);
+
     final DataRegion dataRegion =
         StorageEngine.getInstance().getDataRegion(new 
DataRegionId(environment.getRegionId()));
     if (dataRegion != null) {
       final String databaseName = dataRegion.getDatabaseName();
-      if (databaseName != null
-          && pattern.length() <= databaseName.length()
-          && databaseName.startsWith(pattern)) {
-        isDbNameCoveredByPattern = true;
+      if (databaseName != null) {
+        isDbNameCoveredByPattern = pipePattern.coversDb(databaseName);
       }
     }
 
@@ -365,8 +360,12 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
     return shouldExtractDeletion;
   }
 
-  public final String getPattern() {
-    return pattern;
+  public final String getPatternString() {
+    return pipePattern != null ? pipePattern.getPattern() : null;
+  }
+
+  public final PipePattern getPipePattern() {
+    return pipePattern;
   }
 
   public final long getRealtimeDataExtractionStartTime() {
@@ -403,8 +402,8 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
   @Override
   public String toString() {
     return "PipeRealtimeDataRegionExtractor{"
-        + "pattern='"
-        + pattern
+        + "pipePattern='"
+        + pipePattern
         + '\''
         + ", dataRegionId='"
         + dataRegionId
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index d12f265fa26..729546b0286 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
-import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.CachedSchemaPatternMatcher;
-import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.PipeDataRegionMatcher;
 import org.apache.iotdb.db.pipe.metric.PipeAssignerMetrics;
+import org.apache.iotdb.db.pipe.pattern.matcher.CachedSchemaPatternMatcher;
+import org.apache.iotdb.db.pipe.pattern.matcher.PipeDataRegionMatcher;
 
 import java.io.Closeable;
 
@@ -76,7 +76,7 @@ public class PipeDataRegionAssigner implements Closeable {
                   event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
                       extractor.getPipeName(),
                       extractor.getPipeTaskMeta(),
-                      extractor.getPattern(),
+                      extractor.getPipePattern(),
                       extractor.getRealtimeDataExtractionStartTime(),
                       extractor.getRealtimeDataExtractionEndTime());
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
index 4eba880cd36..9c841ceccb9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpochManager.java
@@ -64,7 +64,7 @@ public class TsFileEpochManager {
         epoch,
         resource.getDevices().stream()
             .collect(Collectors.toMap(device -> device, device -> 
EMPTY_MEASUREMENT_ARRAY)),
-        event.getPattern());
+        event.getPipePattern());
   }
 
   public PipeRealtimeEvent bindPipeInsertNodeTabletInsertionEvent(
@@ -76,6 +76,6 @@ public class TsFileEpochManager {
         event,
         epoch,
         Collections.singletonMap(node.getDevicePath().getFullPath(), 
node.getMeasurements()),
-        event.getPattern());
+        event.getPipePattern());
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/CachedSchemaPatternMatcher.java
similarity index 66%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/CachedSchemaPatternMatcher.java
index 7643c6340f0..470c7ba110f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/CachedSchemaPatternMatcher.java
@@ -17,14 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher;
+package org.apache.iotdb.db.pipe.pattern.matcher;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -40,12 +41,12 @@ import java.util.stream.Collectors;
 
 public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(CachedSchemaPatternMatcher.class);
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(CachedSchemaPatternMatcher.class);
 
-  private final ReentrantReadWriteLock lock;
+  protected final ReentrantReadWriteLock lock;
 
-  private final Set<PipeRealtimeDataRegionExtractor> extractors;
-  private final Cache<String, Set<PipeRealtimeDataRegionExtractor>> 
deviceToExtractorsCache;
+  protected final Set<PipeRealtimeDataRegionExtractor> extractors;
+  protected final Cache<String, Set<PipeRealtimeDataRegionExtractor>> 
deviceToExtractorsCache;
 
   public CachedSchemaPatternMatcher() {
     this.lock = new ReentrantReadWriteLock();
@@ -130,47 +131,32 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
         if (measurements.length == 0) {
           // `measurements` is empty (only in case of tsfile event). match all 
extractors.
           //
-          // case 1: for example, pattern is root.a.b, device is root.a.b.c, 
measurement can be any.
+          // case 1: the pattern can match all measurements of the device.
           // in this case, the extractor can be matched without checking the 
measurements.
           //
-          // case 2: for example, pattern is root.a.b.c, device is root.a.b.
-          // in this situation, the extractor can not be matched in some 
cases, but we can not know
-          // all the measurements of the device in an efficient way, so we 
ASSUME that the extractor
-          // can be matched. this is a trade-off between efficiency and 
accuracy. for most user's
-          // usage, this is acceptable, which may result in some unnecessary 
data processing and
-          // transmission, but will not result in data loss.
+          // case 2: the pattern may match some measurements of the device.
+          // in this case, we can't get all measurements efficiently here,
+          // so we just ASSUME the extractor matches and do more checks later.
           matchedExtractors.addAll(extractorsFilteredByDevice);
         } else {
-          // `measurements` is not empty (only in case of tablet event). match 
extractors by
-          // measurements.
+          // `measurements` is not empty (only in case of tablet event).
+          // Match extractors by measurements.
           extractorsFilteredByDevice.forEach(
               extractor -> {
-                final String pattern = extractor.getPattern();
-
-                // case 1: for example, pattern is root.a.b and device is 
root.a.b.c
-                // in this case, the extractor can be matched without checking 
the measurements
-                if (pattern.length() <= device.length()) {
+                final PipePattern pattern = extractor.getPipePattern();
+                if (Objects.isNull(pattern) || pattern.isRoot() || 
pattern.coversDevice(device)) {
+                  // The pattern can match all measurements of the device.
                   matchedExtractors.add(extractor);
-                }
-                // case 2: for example, pattern is root.a.b.c and device is 
root.a.b
-                // in this case, we need to check the full path
-                else {
+                } else {
                   for (final String measurement : measurements) {
-                    // ignore null measurement for partial insert
+                    // Ignore null measurement for partial insert
                     if (measurement == null) {
                       continue;
                     }
 
-                    // for example, pattern is root.a.b.c, device is root.a.b 
and measurement is c
-                    // in this case, the extractor can be matched. other cases 
are not matched.
-                    // please note that there should be a . between device and 
measurement.
-                    if (
-                    // low cost check comes first
-                    pattern.length() == device.length() + measurement.length() 
+ 1
-                        // high cost check comes later
-                        && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + 
measurement)) {
+                    if (pattern.matchesMeasurement(device, measurement)) {
                       matchedExtractors.add(extractor);
-                      // there would be no more matched extractors because the 
measurements are
+                      // There would be no more matched extractors because the 
measurements are
                       // unique
                       break;
                     }
@@ -190,7 +176,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     return matchedExtractors;
   }
 
-  private Set<PipeRealtimeDataRegionExtractor> filterExtractorsByDevice(String 
device) {
+  protected Set<PipeRealtimeDataRegionExtractor> 
filterExtractorsByDevice(String device) {
     final Set<PipeRealtimeDataRegionExtractor> filteredExtractors = new 
HashSet<>();
 
     for (PipeRealtimeDataRegionExtractor extractor : extractors) {
@@ -199,15 +185,8 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
         continue;
       }
 
-      final String pattern = extractor.getPattern();
-      if (
-      // for example, pattern is root.a.b and device is root.a.b.c
-      // in this case, the extractor can be matched without checking the 
measurements
-      (pattern.length() <= device.length() && device.startsWith(pattern))
-          // for example, pattern is root.a.b.c and device is root.a.b
-          // in this case, the extractor can be selected as candidate, but the 
measurements should
-          // be checked further
-          || (pattern.length() > device.length() && 
pattern.startsWith(device))) {
+      final PipePattern pipePattern = extractor.getPipePattern();
+      if (Objects.isNull(pipePattern) || 
pipePattern.mayOverlapWithDevice(device)) {
         filteredExtractors.add(extractor);
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/PipeDataRegionMatcher.java
similarity index 96%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/PipeDataRegionMatcher.java
index 4e102a1f7cf..4be05cc3a81 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/pattern/matcher/PipeDataRegionMatcher.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher;
+package org.apache.iotdb.db.pipe.pattern.matcher;
 
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 6a29a403739..07b1e5d6541 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.TabletInsertionDataContainer;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -224,26 +225,28 @@ public class PipeTabletInsertionEventTest {
   @Test
   public void convertToTabletForTest() {
     TabletInsertionDataContainer container1 =
-        new TabletInsertionDataContainer(insertRowNode, pattern);
+        new TabletInsertionDataContainer(insertRowNode, new 
PrefixPipePattern(pattern));
     Tablet tablet1 = container1.convertToTablet();
     boolean isAligned1 = container1.isAligned();
     Assert.assertEquals(tablet1, tabletForInsertRowNode);
     Assert.assertFalse(isAligned1);
 
     TabletInsertionDataContainer container2 =
-        new TabletInsertionDataContainer(insertTabletNode, pattern);
+        new TabletInsertionDataContainer(insertTabletNode, new 
PrefixPipePattern(pattern));
     Tablet tablet2 = container2.convertToTablet();
     boolean isAligned2 = container2.isAligned();
     Assert.assertEquals(tablet2, tabletForInsertTabletNode);
     Assert.assertFalse(isAligned2);
 
-    PipeRawTabletInsertionEvent event3 = new 
PipeRawTabletInsertionEvent(tablet1, false, pattern);
+    PipeRawTabletInsertionEvent event3 =
+        new PipeRawTabletInsertionEvent(tablet1, false, new 
PrefixPipePattern(pattern));
     Tablet tablet3 = event3.convertToTablet();
     boolean isAligned3 = event3.isAligned();
     Assert.assertEquals(tablet1, tablet3);
     Assert.assertFalse(isAligned3);
 
-    PipeRawTabletInsertionEvent event4 = new 
PipeRawTabletInsertionEvent(tablet2, false, pattern);
+    PipeRawTabletInsertionEvent event4 =
+        new PipeRawTabletInsertionEvent(tablet2, false, new 
PrefixPipePattern(pattern));
     Tablet tablet4 = event4.convertToTablet();
     boolean isAligned4 = event4.isAligned();
     Assert.assertEquals(tablet2, tablet4);
@@ -253,26 +256,28 @@ public class PipeTabletInsertionEventTest {
   @Test
   public void convertToAlignedTabletForTest() {
     TabletInsertionDataContainer container1 =
-        new TabletInsertionDataContainer(insertRowNodeAligned, pattern);
+        new TabletInsertionDataContainer(insertRowNodeAligned, new 
PrefixPipePattern(pattern));
     Tablet tablet1 = container1.convertToTablet();
     boolean isAligned1 = container1.isAligned();
     Assert.assertEquals(tablet1, tabletForInsertRowNode);
     Assert.assertTrue(isAligned1);
 
     TabletInsertionDataContainer container2 =
-        new TabletInsertionDataContainer(insertTabletNodeAligned, pattern);
+        new TabletInsertionDataContainer(insertTabletNodeAligned, new 
PrefixPipePattern(pattern));
     Tablet tablet2 = container2.convertToTablet();
     boolean isAligned2 = container2.isAligned();
     Assert.assertEquals(tablet2, tabletForInsertTabletNode);
     Assert.assertTrue(isAligned2);
 
-    PipeRawTabletInsertionEvent event3 = new 
PipeRawTabletInsertionEvent(tablet1, true, pattern);
+    PipeRawTabletInsertionEvent event3 =
+        new PipeRawTabletInsertionEvent(tablet1, true, new 
PrefixPipePattern(pattern));
     Tablet tablet3 = event3.convertToTablet();
     boolean isAligned3 = event3.isAligned();
     Assert.assertEquals(tablet1, tablet3);
     Assert.assertTrue(isAligned3);
 
-    PipeRawTabletInsertionEvent event4 = new 
PipeRawTabletInsertionEvent(tablet2, true, pattern);
+    PipeRawTabletInsertionEvent event4 =
+        new PipeRawTabletInsertionEvent(tablet2, true, new 
PrefixPipePattern(pattern));
     Tablet tablet4 = event4.convertToTablet();
     boolean isAligned4 = event4.isAligned();
     Assert.assertEquals(tablet2, tablet4);
@@ -286,7 +291,7 @@ public class PipeTabletInsertionEventTest {
             null,
             new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 111L, 
113L),
             insertRowNode,
-            pattern);
+            new PrefixPipePattern(pattern));
     Tablet tablet1 = container1.convertToTablet();
     Assert.assertEquals(0, tablet1.rowSize);
     boolean isAligned1 = container1.isAligned();
@@ -297,7 +302,7 @@ public class PipeTabletInsertionEventTest {
             null,
             new PipeRawTabletInsertionEvent(tabletForInsertRowNode, 110L, 
110L),
             insertRowNode,
-            pattern);
+            new PrefixPipePattern(pattern));
     Tablet tablet2 = container2.convertToTablet();
     Assert.assertEquals(1, tablet2.rowSize);
     boolean isAligned2 = container2.isAligned();
@@ -308,7 +313,7 @@ public class PipeTabletInsertionEventTest {
             null,
             new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 111L, 
113L),
             insertTabletNode,
-            pattern);
+            new PrefixPipePattern(pattern));
     Tablet tablet3 = container3.convertToTablet();
     Assert.assertEquals(3, tablet3.rowSize);
     boolean isAligned3 = container3.isAligned();
@@ -319,7 +324,7 @@ public class PipeTabletInsertionEventTest {
             null,
             new PipeRawTabletInsertionEvent(tabletForInsertTabletNode, 
Long.MIN_VALUE, 109L),
             insertTabletNode,
-            pattern);
+            new PrefixPipePattern(pattern));
     Tablet tablet4 = container4.convertToTablet();
     Assert.assertEquals(0, tablet4.rowSize);
     boolean isAligned4 = container4.isAligned();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index da2a7a00667..b26286c4180 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.pipe.event;
 
+import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.TsFileInsertionDataContainer;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -48,6 +51,9 @@ public class TsFileInsertionDataContainerTest {
 
   private static final long TSFILE_START_TIME = 300L;
 
+  private static final String PREFIX_FORMAT = "prefix";
+  private static final String IOTDB_FORMAT = "iotdb";
+
   private File alignedTsFile;
   private File nonalignedTsFile;
 
@@ -71,6 +77,10 @@ public class TsFileInsertionDataContainerTest {
     measurementNumbers.add(1);
     measurementNumbers.add(2);
 
+    Set<String> patternFormats = new HashSet<>();
+    patternFormats.add(PREFIX_FORMAT);
+    patternFormats.add(IOTDB_FORMAT);
+
     Set<Pair<Long, Long>> startEndTimes = new HashSet<>();
     startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME - 1));
     startEndTimes.add(new Pair<>(100L, TSFILE_START_TIME));
@@ -91,44 +101,126 @@ public class TsFileInsertionDataContainerTest {
 
     for (int deviceNumber : deviceNumbers) {
       for (int measurementNumber : measurementNumbers) {
-        for (Pair<Long, Long> startEndTime : startEndTimes) {
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 0, startEndTime.left, 
startEndTime.right);
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 1, startEndTime.left, 
startEndTime.right);
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 2, startEndTime.left, 
startEndTime.right);
-
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 999, startEndTime.left, 
startEndTime.right);
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 1000, startEndTime.left, 
startEndTime.right);
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 1001, startEndTime.left, 
startEndTime.right);
-
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 999 * 2 + 1, startEndTime.left, 
startEndTime.right);
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 1000, startEndTime.left, 
startEndTime.right);
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 1001 * 2 - 1, 
startEndTime.left, startEndTime.right);
-
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 1023, startEndTime.left, 
startEndTime.right);
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 1024, startEndTime.left, 
startEndTime.right);
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 1025, startEndTime.left, 
startEndTime.right);
-
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 1023 * 2 + 1, 
startEndTime.left, startEndTime.right);
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 1024 * 2, startEndTime.left, 
startEndTime.right);
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 1025 * 2 - 1, 
startEndTime.left, startEndTime.right);
-
-          testToTabletInsertionEvents(
-              deviceNumber, measurementNumber, 10001, startEndTime.left, 
startEndTime.right);
+        for (String patternFormat : patternFormats) {
+          for (Pair<Long, Long> startEndTime : startEndTimes) {
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                0,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                1,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                2,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                999,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                1000,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                1001,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                999 * 2 + 1,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                1000,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                1001 * 2 - 1,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                1023,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                1024,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                1025,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                1023 * 2 + 1,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                1024 * 2,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                1025 * 2 - 1,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+
+            testToTabletInsertionEvents(
+                deviceNumber,
+                measurementNumber,
+                10001,
+                patternFormat,
+                startEndTime.left,
+                startEndTime.right);
+          }
         }
       }
     }
@@ -138,14 +230,16 @@ public class TsFileInsertionDataContainerTest {
       int deviceNumber,
       int measurementNumber,
       int rowNumberInOneDevice,
+      String patternFormat,
       long startTime,
       long endTime)
       throws Exception {
     LOGGER.info(
-        "testToTabletInsertionEvents: deviceNumber: {}, measurementNumber: {}, 
rowNumberInOneDevice: {}, startTime: {}, endTime: {}",
+        "testToTabletInsertionEvents: deviceNumber: {}, measurementNumber: {}, 
rowNumberInOneDevice: {}, patternFormat: {}, startTime: {}, endTime: {}",
         deviceNumber,
         measurementNumber,
         rowNumberInOneDevice,
+        patternFormat,
         startTime,
         endTime);
 
@@ -191,10 +285,21 @@ public class TsFileInsertionDataContainerTest {
       }
     }
 
+    final PipePattern rootPattern;
+    switch (patternFormat) {
+      case PREFIX_FORMAT:
+        rootPattern = new PrefixPipePattern("root");
+        break;
+      case IOTDB_FORMAT:
+      default:
+        rootPattern = new IoTDBPipePattern("root.**");
+        break;
+    }
+
     try (final TsFileInsertionDataContainer alignedContainer =
-            new TsFileInsertionDataContainer(alignedTsFile, "root", startTime, 
endTime);
+            new TsFileInsertionDataContainer(alignedTsFile, rootPattern, 
startTime, endTime);
         final TsFileInsertionDataContainer nonalignedContainer =
-            new TsFileInsertionDataContainer(nonalignedTsFile, "root", 
startTime, endTime)) {
+            new TsFileInsertionDataContainer(nonalignedTsFile, rootPattern, 
startTime, endTime)) {
       AtomicInteger count1 = new AtomicInteger(0);
       AtomicInteger count2 = new AtomicInteger(0);
       AtomicInteger count3 = new AtomicInteger(0);
@@ -230,20 +335,19 @@ public class TsFileInsertionDataContainerTest {
                                   .forEach(
                                       tabletInsertionEvent2 ->
                                           tabletInsertionEvent2.processTablet(
-                                              (tablet, rowCollector) -> {
-                                                new 
PipeRawTabletInsertionEvent(tablet, false)
-                                                    .processRowByRow(
-                                                        (row, collector) -> {
-                                                          try {
-                                                            
rowCollector.collectRow(row);
-                                                            
Assert.assertEquals(
-                                                                
measurementNumber, row.size());
-                                                            
count3.incrementAndGet();
-                                                          } catch (IOException 
e) {
-                                                            throw new 
RuntimeException(e);
-                                                          }
-                                                        });
-                                              }))));
+                                              (tablet, rowCollector) ->
+                                                  new 
PipeRawTabletInsertionEvent(tablet, false)
+                                                      .processRowByRow(
+                                                          (row, collector) -> {
+                                                            try {
+                                                              
rowCollector.collectRow(row);
+                                                              
Assert.assertEquals(
+                                                                  
measurementNumber, row.size());
+                                                              
count3.incrementAndGet();
+                                                            } catch 
(IOException e) {
+                                                              throw new 
RuntimeException(e);
+                                                            }
+                                                          })))));
 
       Assert.assertEquals(count1.getAndSet(0), deviceNumber * 
expectedRowNumber);
       Assert.assertEquals(count2.getAndSet(0), deviceNumber * 
expectedRowNumber);
@@ -255,19 +359,18 @@ public class TsFileInsertionDataContainerTest {
               event ->
                   event
                       .processTablet(
-                          (tablet, rowCollector) -> {
-                            new PipeRawTabletInsertionEvent(tablet, false)
-                                .processRowByRow(
-                                    (row, collector) -> {
-                                      try {
-                                        rowCollector.collectRow(row);
-                                        Assert.assertEquals(measurementNumber, 
row.size());
-                                        count1.incrementAndGet();
-                                      } catch (IOException e) {
-                                        throw new RuntimeException(e);
-                                      }
-                                    });
-                          })
+                          (tablet, rowCollector) ->
+                              new PipeRawTabletInsertionEvent(tablet, false)
+                                  .processRowByRow(
+                                      (row, collector) -> {
+                                        try {
+                                          rowCollector.collectRow(row);
+                                          
Assert.assertEquals(measurementNumber, row.size());
+                                          count1.incrementAndGet();
+                                        } catch (IOException e) {
+                                          throw new RuntimeException(e);
+                                        }
+                                      }))
                       .forEach(
                           tabletInsertionEvent1 ->
                               tabletInsertionEvent1
@@ -295,9 +398,9 @@ public class TsFileInsertionDataContainerTest {
                                                 }
                                               }))));
 
-      Assert.assertEquals(count1.get(), deviceNumber * expectedRowNumber);
-      Assert.assertEquals(count2.get(), deviceNumber * expectedRowNumber);
-      Assert.assertEquals(count3.get(), deviceNumber * expectedRowNumber);
+      Assert.assertEquals(deviceNumber * expectedRowNumber, count1.get());
+      Assert.assertEquals(deviceNumber * expectedRowNumber, count2.get());
+      Assert.assertEquals(deviceNumber * expectedRowNumber, count3.get());
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
@@ -338,12 +441,26 @@ public class TsFileInsertionDataContainerTest {
                           }));
     }
 
+    final PipePattern oneAlignedDevicePattern;
+    final PipePattern oneNonAlignedDevicePattern;
+    switch (patternFormat) {
+      case PREFIX_FORMAT:
+        oneAlignedDevicePattern = new 
PrefixPipePattern(oneDeviceInAlignedTsFile.get());
+        oneNonAlignedDevicePattern = new 
PrefixPipePattern(oneDeviceInUnalignedTsFile.get());
+        break;
+      case IOTDB_FORMAT:
+      default:
+        oneAlignedDevicePattern = new 
IoTDBPipePattern(oneDeviceInAlignedTsFile.get() + ".**");
+        oneNonAlignedDevicePattern = new 
IoTDBPipePattern(oneDeviceInUnalignedTsFile.get() + ".**");
+        break;
+    }
+
     try (final TsFileInsertionDataContainer alignedContainer =
             new TsFileInsertionDataContainer(
-                alignedTsFile, oneDeviceInAlignedTsFile.get(), startTime, 
endTime);
+                alignedTsFile, oneAlignedDevicePattern, startTime, endTime);
         final TsFileInsertionDataContainer nonalignedContainer =
             new TsFileInsertionDataContainer(
-                nonalignedTsFile, oneDeviceInUnalignedTsFile.get(), startTime, 
endTime); ) {
+                nonalignedTsFile, oneNonAlignedDevicePattern, startTime, 
endTime)) {
       AtomicInteger count1 = new AtomicInteger(0);
       AtomicInteger count2 = new AtomicInteger(0);
       AtomicInteger count3 = new AtomicInteger(0);
@@ -379,24 +496,23 @@ public class TsFileInsertionDataContainerTest {
                                   .forEach(
                                       tabletInsertionEvent2 ->
                                           tabletInsertionEvent2.processTablet(
-                                              (tablet, rowCollector) -> {
-                                                new 
PipeRawTabletInsertionEvent(tablet, false)
-                                                    .processRowByRow(
-                                                        (row, collector) -> {
-                                                          try {
-                                                            
rowCollector.collectRow(row);
-                                                            
Assert.assertEquals(
-                                                                
measurementNumber, row.size());
-                                                            
count3.incrementAndGet();
-                                                          } catch (IOException 
e) {
-                                                            throw new 
RuntimeException(e);
-                                                          }
-                                                        });
-                                              }))));
-
-      Assert.assertEquals(count1.getAndSet(0), expectedRowNumber);
-      Assert.assertEquals(count2.getAndSet(0), expectedRowNumber);
-      Assert.assertEquals(count3.getAndSet(0), expectedRowNumber);
+                                              (tablet, rowCollector) ->
+                                                  new 
PipeRawTabletInsertionEvent(tablet, false)
+                                                      .processRowByRow(
+                                                          (row, collector) -> {
+                                                            try {
+                                                              
rowCollector.collectRow(row);
+                                                              
Assert.assertEquals(
+                                                                  
measurementNumber, row.size());
+                                                              
count3.incrementAndGet();
+                                                            } catch 
(IOException e) {
+                                                              throw new 
RuntimeException(e);
+                                                            }
+                                                          })))));
+
+      Assert.assertEquals(expectedRowNumber, count1.getAndSet(0));
+      Assert.assertEquals(expectedRowNumber, count2.getAndSet(0));
+      Assert.assertEquals(expectedRowNumber, count3.getAndSet(0));
 
       nonalignedContainer
           .toTabletInsertionEvents()
@@ -404,19 +520,18 @@ public class TsFileInsertionDataContainerTest {
               event ->
                   event
                       .processTablet(
-                          (tablet, rowCollector) -> {
-                            new PipeRawTabletInsertionEvent(tablet, false)
-                                .processRowByRow(
-                                    (row, collector) -> {
-                                      try {
-                                        rowCollector.collectRow(row);
-                                        Assert.assertEquals(measurementNumber, 
row.size());
-                                        count1.incrementAndGet();
-                                      } catch (IOException e) {
-                                        throw new RuntimeException(e);
-                                      }
-                                    });
-                          })
+                          (tablet, rowCollector) ->
+                              new PipeRawTabletInsertionEvent(tablet, false)
+                                  .processRowByRow(
+                                      (row, collector) -> {
+                                        try {
+                                          rowCollector.collectRow(row);
+                                          
Assert.assertEquals(measurementNumber, row.size());
+                                          count1.incrementAndGet();
+                                        } catch (IOException e) {
+                                          throw new RuntimeException(e);
+                                        }
+                                      }))
                       .forEach(
                           tabletInsertionEvent1 ->
                               tabletInsertionEvent1
@@ -444,20 +559,36 @@ public class TsFileInsertionDataContainerTest {
                                                 }
                                               }))));
 
-      Assert.assertEquals(count1.get(), expectedRowNumber);
-      Assert.assertEquals(count2.get(), expectedRowNumber);
-      Assert.assertEquals(count3.get(), expectedRowNumber);
+      Assert.assertEquals(expectedRowNumber, count1.get());
+      Assert.assertEquals(expectedRowNumber, count2.get());
+      Assert.assertEquals(expectedRowNumber, count3.get());
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
+    final PipePattern oneAlignedMeasurementPattern;
+    final PipePattern oneNonAlignedMeasurementPattern;
+    switch (patternFormat) {
+      case PREFIX_FORMAT:
+        oneAlignedMeasurementPattern = new 
PrefixPipePattern(oneMeasurementInAlignedTsFile.get());
+        oneNonAlignedMeasurementPattern =
+            new PrefixPipePattern(oneMeasurementInUnalignedTsFile.get());
+        break;
+      case IOTDB_FORMAT:
+      default:
+        oneAlignedMeasurementPattern = new 
IoTDBPipePattern(oneMeasurementInAlignedTsFile.get());
+        oneNonAlignedMeasurementPattern =
+            new IoTDBPipePattern(oneMeasurementInUnalignedTsFile.get());
+        break;
+    }
+
     try (final TsFileInsertionDataContainer alignedContainer =
             new TsFileInsertionDataContainer(
-                alignedTsFile, oneMeasurementInAlignedTsFile.get(), startTime, 
endTime);
+                alignedTsFile, oneAlignedMeasurementPattern, startTime, 
endTime);
         final TsFileInsertionDataContainer nonalignedContainer =
             new TsFileInsertionDataContainer(
-                nonalignedTsFile, oneMeasurementInUnalignedTsFile.get(), 
startTime, endTime); ) {
+                nonalignedTsFile, oneNonAlignedMeasurementPattern, startTime, 
endTime)) {
       AtomicInteger count1 = new AtomicInteger(0);
       AtomicInteger count2 = new AtomicInteger(0);
       AtomicInteger count3 = new AtomicInteger(0);
@@ -493,23 +624,22 @@ public class TsFileInsertionDataContainerTest {
                                   .forEach(
                                       tabletInsertionEvent2 ->
                                           tabletInsertionEvent2.processTablet(
-                                              (tablet, rowCollector) -> {
-                                                new 
PipeRawTabletInsertionEvent(tablet, false)
-                                                    .processRowByRow(
-                                                        (row, collector) -> {
-                                                          try {
-                                                            
rowCollector.collectRow(row);
-                                                            
Assert.assertEquals(1, row.size());
-                                                            
count3.incrementAndGet();
-                                                          } catch (IOException 
e) {
-                                                            throw new 
RuntimeException(e);
-                                                          }
-                                                        });
-                                              }))));
-
-      Assert.assertEquals(count1.getAndSet(0), expectedRowNumber);
-      Assert.assertEquals(count2.getAndSet(0), expectedRowNumber);
-      Assert.assertEquals(count3.getAndSet(0), expectedRowNumber);
+                                              (tablet, rowCollector) ->
+                                                  new 
PipeRawTabletInsertionEvent(tablet, false)
+                                                      .processRowByRow(
+                                                          (row, collector) -> {
+                                                            try {
+                                                              
rowCollector.collectRow(row);
+                                                              
Assert.assertEquals(1, row.size());
+                                                              
count3.incrementAndGet();
+                                                            } catch 
(IOException e) {
+                                                              throw new 
RuntimeException(e);
+                                                            }
+                                                          })))));
+
+      Assert.assertEquals(expectedRowNumber, count1.getAndSet(0));
+      Assert.assertEquals(expectedRowNumber, count2.getAndSet(0));
+      Assert.assertEquals(expectedRowNumber, count3.getAndSet(0));
 
       nonalignedContainer
           .toTabletInsertionEvents()
@@ -517,19 +647,18 @@ public class TsFileInsertionDataContainerTest {
               event ->
                   event
                       .processTablet(
-                          (tablet, rowCollector) -> {
-                            new PipeRawTabletInsertionEvent(tablet, false)
-                                .processRowByRow(
-                                    (row, collector) -> {
-                                      try {
-                                        rowCollector.collectRow(row);
-                                        Assert.assertEquals(1, row.size());
-                                        count1.incrementAndGet();
-                                      } catch (IOException e) {
-                                        throw new RuntimeException(e);
-                                      }
-                                    });
-                          })
+                          (tablet, rowCollector) ->
+                              new PipeRawTabletInsertionEvent(tablet, false)
+                                  .processRowByRow(
+                                      (row, collector) -> {
+                                        try {
+                                          rowCollector.collectRow(row);
+                                          Assert.assertEquals(1, row.size());
+                                          count1.incrementAndGet();
+                                        } catch (IOException e) {
+                                          throw new RuntimeException(e);
+                                        }
+                                      }))
                       .forEach(
                           tabletInsertionEvent1 ->
                               tabletInsertionEvent1
@@ -556,20 +685,30 @@ public class TsFileInsertionDataContainerTest {
                                                 }
                                               }))));
 
-      Assert.assertEquals(count1.get(), expectedRowNumber);
-      Assert.assertEquals(count2.get(), expectedRowNumber);
-      Assert.assertEquals(count3.get(), expectedRowNumber);
+      Assert.assertEquals(expectedRowNumber, count1.get());
+      Assert.assertEquals(expectedRowNumber, count2.get());
+      Assert.assertEquals(expectedRowNumber, count3.get());
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
+    final PipePattern notExistPattern;
+    switch (patternFormat) {
+      case PREFIX_FORMAT:
+        notExistPattern = new PrefixPipePattern("root.`not-exist-pattern`");
+        break;
+      case IOTDB_FORMAT:
+      default:
+        notExistPattern = new IoTDBPipePattern("root.`not-exist-pattern`");
+        break;
+    }
+
     try (final TsFileInsertionDataContainer alignedContainer =
-            new TsFileInsertionDataContainer(
-                alignedTsFile, "not-exist-pattern", startTime, endTime);
+            new TsFileInsertionDataContainer(alignedTsFile, notExistPattern, 
startTime, endTime);
         final TsFileInsertionDataContainer nonalignedContainer =
             new TsFileInsertionDataContainer(
-                nonalignedTsFile, "not-exist-pattern", startTime, endTime); ) {
+                nonalignedTsFile, notExistPattern, startTime, endTime)) {
       AtomicInteger count1 = new AtomicInteger(0);
       AtomicInteger count2 = new AtomicInteger(0);
       AtomicInteger count3 = new AtomicInteger(0);
@@ -605,23 +744,22 @@ public class TsFileInsertionDataContainerTest {
                                   .forEach(
                                       tabletInsertionEvent2 ->
                                           tabletInsertionEvent2.processTablet(
-                                              (tablet, rowCollector) -> {
-                                                new 
PipeRawTabletInsertionEvent(tablet, false)
-                                                    .processRowByRow(
-                                                        (row, collector) -> {
-                                                          try {
-                                                            
rowCollector.collectRow(row);
-                                                            
Assert.assertEquals(0, row.size());
-                                                            
count3.incrementAndGet();
-                                                          } catch (IOException 
e) {
-                                                            throw new 
RuntimeException(e);
-                                                          }
-                                                        });
-                                              }))));
-
-      Assert.assertEquals(count1.getAndSet(0), 0);
-      Assert.assertEquals(count2.getAndSet(0), 0);
-      Assert.assertEquals(count3.getAndSet(0), 0);
+                                              (tablet, rowCollector) ->
+                                                  new 
PipeRawTabletInsertionEvent(tablet, false)
+                                                      .processRowByRow(
+                                                          (row, collector) -> {
+                                                            try {
+                                                              
rowCollector.collectRow(row);
+                                                              
Assert.assertEquals(0, row.size());
+                                                              
count3.incrementAndGet();
+                                                            } catch 
(IOException e) {
+                                                              throw new 
RuntimeException(e);
+                                                            }
+                                                          })))));
+
+      Assert.assertEquals(0, count1.getAndSet(0));
+      Assert.assertEquals(0, count2.getAndSet(0));
+      Assert.assertEquals(0, count3.getAndSet(0));
 
       nonalignedContainer
           .toTabletInsertionEvents()
@@ -629,19 +767,18 @@ public class TsFileInsertionDataContainerTest {
               event ->
                   event
                       .processTablet(
-                          (tablet, rowCollector) -> {
-                            new PipeRawTabletInsertionEvent(tablet, false)
-                                .processRowByRow(
-                                    (row, collector) -> {
-                                      try {
-                                        rowCollector.collectRow(row);
-                                        Assert.assertEquals(0, row.size());
-                                        count1.incrementAndGet();
-                                      } catch (IOException e) {
-                                        throw new RuntimeException(e);
-                                      }
-                                    });
-                          })
+                          (tablet, rowCollector) ->
+                              new PipeRawTabletInsertionEvent(tablet, false)
+                                  .processRowByRow(
+                                      (row, collector) -> {
+                                        try {
+                                          rowCollector.collectRow(row);
+                                          Assert.assertEquals(0, row.size());
+                                          count1.incrementAndGet();
+                                        } catch (IOException e) {
+                                          throw new RuntimeException(e);
+                                        }
+                                      }))
                       .forEach(
                           tabletInsertionEvent1 ->
                               tabletInsertionEvent1
@@ -668,9 +805,9 @@ public class TsFileInsertionDataContainerTest {
                                                 }
                                               }))));
 
-      Assert.assertEquals(count1.get(), 0);
-      Assert.assertEquals(count2.get(), 0);
-      Assert.assertEquals(count3.get(), 0);
+      Assert.assertEquals(0, count1.get());
+      Assert.assertEquals(0, count2.get());
+      Assert.assertEquals(0, count3.get());
     } catch (Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
similarity index 92%
rename from 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java
rename to 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
index 6f429b9dcd9..609dd9b3f44 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/CachedSchemaPatternMatcherTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
@@ -17,14 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.extractor;
+package org.apache.iotdb.db.pipe.pattern;
 
 import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
-import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.CachedSchemaPatternMatcher;
+import org.apache.iotdb.db.pipe.pattern.matcher.CachedSchemaPatternMatcher;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -127,12 +128,12 @@ public class CachedSchemaPatternMatcherTest {
       for (int j = 0; j < deviceNum; j++) {
         PipeRealtimeEvent event =
             new PipeRealtimeEvent(
-                null, null, Collections.singletonMap("root." + i, 
measurements), "root");
+                null, null, Collections.singletonMap("root." + i, 
measurements), null);
         long startTime = System.currentTimeMillis();
         matcher.match(event).forEach(extractor -> extractor.extract(event));
         totalTime += (System.currentTimeMillis() - startTime);
       }
-      PipeRealtimeEvent event = new PipeRealtimeEvent(null, null, deviceMap, 
"root");
+      PipeRealtimeEvent event = new PipeRealtimeEvent(null, null, deviceMap, 
null);
       long startTime = System.currentTimeMillis();
       matcher.match(event).forEach(extractor -> extractor.extract(event));
       totalTime += (System.currentTimeMillis() - startTime);
@@ -148,7 +149,9 @@ public class CachedSchemaPatternMatcherTest {
 
   public static class PipeRealtimeDataRegionFakeExtractor extends 
PipeRealtimeDataRegionExtractor {
 
-    public PipeRealtimeDataRegionFakeExtractor() {}
+    public PipeRealtimeDataRegionFakeExtractor() {
+      pipePattern = new PrefixPipePattern(null);
+    }
 
     @Override
     public Event supply() {
@@ -166,10 +169,13 @@ public class CachedSchemaPatternMatcherTest {
                   for (String s : v) {
                     match[0] =
                         match[0]
-                            || (k + TsFileConstant.PATH_SEPARATOR + 
s).startsWith(getPattern());
+                            || (k + TsFileConstant.PATH_SEPARATOR + s)
+                                .startsWith(getPatternString());
                   }
                 } else {
-                  match[0] = match[0] || (getPattern().startsWith(k) || 
k.startsWith(getPattern()));
+                  match[0] =
+                      match[0]
+                          || (getPatternString().startsWith(k) || 
k.startsWith(getPatternString()));
                 }
               });
       Assert.assertTrue(match[0]);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
new file mode 100644
index 00000000000..5c74f3ea21c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/IoTDBPipePatternTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.pattern;
+
+import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IoTDBPipePatternTest {
+
+  @Test
+  public void testIotdbPipePattern() {
+    // Test legal and illegal pattern
+    String[] legalPatterns = {
+      "root", "root.db", "root.db.d1.s", "root.db.`1`", "root.*.d.*s.s",
+    };
+    String[] illegalPatterns = {
+      "root.", "roo", "", "root..", "root./",
+    };
+    for (String s : legalPatterns) {
+      Assert.assertTrue(new IoTDBPipePattern(s).isLegal());
+    }
+    for (String t : illegalPatterns) {
+      try {
+        Assert.assertFalse(new IoTDBPipePattern(t).isLegal());
+      } catch (Exception e) {
+        Assert.assertTrue(e instanceof PipeException);
+      }
+    }
+
+    // Test pattern cover db
+    String db = "root.db";
+    String[] patternsCoverDb = {
+      "root.**", "root.db.**", "root.*db*.**",
+    };
+    String[] patternsNotCoverDb = {
+      "root.db", "root.*", "root.*.*", "root.db.*.**", "root.db.d1", 
"root.**.db.**",
+    };
+    for (String s : patternsCoverDb) {
+      Assert.assertTrue(new IoTDBPipePattern(s).coversDb(db));
+    }
+    for (String t : patternsNotCoverDb) {
+      Assert.assertFalse(new IoTDBPipePattern(t).coversDb(db));
+    }
+
+    String device = "root.db.d1";
+
+    // Test pattern cover device
+    String[] patternsCoverDevice = {
+      "root.**", "root.db.**", "root.*.*.*", "root.db.d1.*", 
"root.*db*.*d*.*", "root.**.*1.*",
+    };
+    String[] patternsNotCoverDevice = {
+      "root.*", "root.*.*", "root.db.d1", "root.db.d2.*", "root.**.d2.**",
+    };
+    for (String s : patternsCoverDevice) {
+      Assert.assertTrue(new IoTDBPipePattern(s).coversDevice(device));
+    }
+    for (String t : patternsNotCoverDevice) {
+      Assert.assertFalse(new IoTDBPipePattern(t).coversDevice(device));
+    }
+
+    // Test pattern may overlap with device
+    String[] patternsOverlapWithDevice = {
+      "root.db.**", "root.db.d1", "root.db.d1.*", "root.db.d1.s1", 
"root.**.d2.**", "root.*.d*.**",
+    };
+    String[] patternsNotOverlapWithDevice = {
+      "root.db.d2.**", "root.db2.d1.**", "root.db.db.d1.**",
+    };
+    for (String s : patternsOverlapWithDevice) {
+      Assert.assertTrue(new IoTDBPipePattern(s).mayOverlapWithDevice(device));
+    }
+    for (String t : patternsNotOverlapWithDevice) {
+      Assert.assertFalse(new IoTDBPipePattern(t).mayOverlapWithDevice(device));
+    }
+
+    // Test pattern match measurement
+    String measurement = "s1";
+    String[] patternsMatchMeasurement = {
+      "root.db.d1.s1", "root.db.d1.*",
+    };
+    String[] patternsNotMatchMeasurement = {
+      "root.db.d1", "root.db.d1", "root.db.d1.*.*",
+    };
+    for (String s : patternsMatchMeasurement) {
+      Assert.assertTrue(new IoTDBPipePattern(s).matchesMeasurement(device, 
measurement));
+    }
+    for (String t : patternsNotMatchMeasurement) {
+      Assert.assertFalse(new IoTDBPipePattern(t).matchesMeasurement(device, 
measurement));
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java
new file mode 100644
index 00000000000..cb327d2dac0
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/PrefixPipePatternTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.pattern;
+
+import org.apache.iotdb.commons.pipe.pattern.PrefixPipePattern;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PrefixPipePatternTest {
+
+  @Test
+  public void testPrefixPipePattern() {
+    // Test legal and illegal pattern
+    String[] legalPatterns = {
+      "root", "root.", "root.db", "root.db.d1.s", "root.db.`1`",
+    };
+    String[] illegalPatterns = {
+      "roo", "", "root..", "root./",
+    };
+    for (String s : legalPatterns) {
+      Assert.assertTrue(new PrefixPipePattern(s).isLegal());
+    }
+    for (String t : illegalPatterns) {
+      Assert.assertFalse(new PrefixPipePattern(t).isLegal());
+    }
+
+    // Test pattern cover db
+    String db = "root.db";
+    String[] patternsCoverDb = {
+      "root", "root.", "root.d", "root.db",
+    };
+    String[] patternsNotCoverDb = {
+      "root.**", "root.db.",
+    };
+    for (String s : patternsCoverDb) {
+      Assert.assertTrue(new PrefixPipePattern(s).coversDb(db));
+    }
+    for (String t : patternsNotCoverDb) {
+      Assert.assertFalse(new PrefixPipePattern(t).coversDb(db));
+    }
+
+    String device = "root.db.d1";
+
+    // Test pattern cover device
+    String[] patternsCoverDevice = {
+      "root", "root.", "root.d", "root.db", "root.db.", "root.db.d", 
"root.db.d1",
+    };
+    String[] patternsNotCoverDevice = {
+      "root.db.d1.", "root.db.d1.s1", "root.**", "root.db.d2",
+    };
+    for (String s : patternsCoverDevice) {
+      Assert.assertTrue(new PrefixPipePattern(s).coversDevice(device));
+    }
+    for (String t : patternsNotCoverDevice) {
+      Assert.assertFalse(new PrefixPipePattern(t).coversDevice(device));
+    }
+
+    // Test pattern may overlap with device
+    String[] patternsOverlapWithDevice = {
+      "root", "root.db.d1", "root.db.d1.", "root.db.d1.s1",
+    };
+    String[] patternsNotOverlapWithDevice = {
+      "root.db.d2", "root.**",
+    };
+    for (String s : patternsOverlapWithDevice) {
+      Assert.assertTrue(new PrefixPipePattern(s).mayOverlapWithDevice(device));
+    }
+    for (String t : patternsNotOverlapWithDevice) {
+      Assert.assertFalse(new 
PrefixPipePattern(t).mayOverlapWithDevice(device));
+    }
+
+    // Test pattern match measurement
+    String measurement = "s1";
+    String[] patternsMatchMeasurement = {
+      "root.db.d1", "root.db.d1.", "root.db.d1.s", "root.db.d1.s1",
+    };
+    String[] patternsNotMatchMeasurement = {
+      "root.db.d1.s11", "root.db.d1.s2",
+    };
+    for (String s : patternsMatchMeasurement) {
+      Assert.assertTrue(new PrefixPipePattern(s).matchesMeasurement(device, 
measurement));
+    }
+    for (String t : patternsNotMatchMeasurement) {
+      Assert.assertFalse(new PrefixPipePattern(t).matchesMeasurement(device, 
measurement));
+    }
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index c67f9fd4dc2..8c913fa31f5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -34,7 +34,14 @@ public class PipeExtractorConstant {
 
   public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern";
   public static final String SOURCE_PATTERN_KEY = "source.pattern";
-  public static final String EXTRACTOR_PATTERN_DEFAULT_VALUE = "root";
+  public static final String EXTRACTOR_PATH_KEY = "extractor.path";
+  public static final String SOURCE_PATH_KEY = "source.path";
+  public static final String EXTRACTOR_PATTERN_FORMAT_KEY = 
"extractor.pattern.format";
+  public static final String SOURCE_PATTERN_FORMAT_KEY = 
"source.pattern.format";
+  public static final String EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE = "prefix";
+  public static final String EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE = "iotdb";
+  public static final String EXTRACTOR_PATTERN_PREFIX_DEFAULT_VALUE = "root";
+  public static final String EXTRACTOR_PATTERN_IOTDB_DEFAULT_VALUE = "root.**";
 
   public static final String EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY =
       "extractor.forwarding-pipe-requests";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index e9e50f958cc..fb9c1444b70 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.commons.pipe.event;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
-import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.progress.committer.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -49,7 +49,7 @@ public abstract class EnrichedEvent implements Event {
   public static final long NO_COMMIT_ID = -1;
   protected long commitId = NO_COMMIT_ID;
 
-  protected final String pattern;
+  protected final PipePattern pipePattern;
 
   protected final long startTime;
   protected final long endTime;
@@ -60,14 +60,18 @@ public abstract class EnrichedEvent implements Event {
   protected boolean shouldReportOnCommit = false;
 
   protected EnrichedEvent(
-      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long 
startTime, long endTime) {
+      String pipeName,
+      PipeTaskMeta pipeTaskMeta,
+      PipePattern pipePattern,
+      long startTime,
+      long endTime) {
     referenceCount = new AtomicInteger(0);
     this.pipeName = pipeName;
     this.pipeTaskMeta = pipeTaskMeta;
-    this.pattern = pattern;
+    this.pipePattern = pipePattern;
     this.startTime = startTime;
     this.endTime = endTime;
-    isPatternParsed = 
getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
+    isPatternParsed = this.pipePattern == null || this.pipePattern.isRoot();
     isTimeParsed = Long.MIN_VALUE == startTime && Long.MAX_VALUE == endTime;
   }
 
@@ -189,12 +193,16 @@ public abstract class EnrichedEvent implements Event {
   }
 
   /**
-   * Get the {@link EnrichedEvent#pattern} of this {@link EnrichedEvent}.
+   * Get the pattern string of this {@link EnrichedEvent}.
    *
-   * @return the {@link EnrichedEvent#pattern}
+   * @return the pattern string
    */
-  public final String getPattern() {
-    return pattern == null ? 
PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE : pattern;
+  public final String getPatternString() {
+    return pipePattern != null ? pipePattern.getPattern() : null;
+  }
+
+  public final PipePattern getPipePattern() {
+    return pipePattern;
   }
 
   public final long getStartTime() {
@@ -206,8 +214,8 @@ public abstract class EnrichedEvent implements Event {
   }
 
   /**
-   * If pipe's {@link EnrichedEvent#pattern} is database-level, then no need 
to parse {@link
-   * EnrichedEvent} by {@link EnrichedEvent#pattern} cause pipes are 
data-region-level.
+   * If pipe's pattern is database-level, then no need to parse {@link 
EnrichedEvent} by pattern
+   * cause pipes are data-region-level.
    */
   public void skipParsingPattern() {
     isPatternParsed = true;
@@ -230,7 +238,11 @@ public abstract class EnrichedEvent implements Event {
   }
 
   public abstract EnrichedEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
-      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long 
startTime, long endTime);
+      String pipeName,
+      PipeTaskMeta pipeTaskMeta,
+      PipePattern pattern,
+      long startTime,
+      long endTime);
 
   public PipeTaskMeta getPipeTaskMeta() {
     return pipeTaskMeta;
@@ -278,7 +290,7 @@ public abstract class EnrichedEvent implements Event {
         + "', commitId="
         + commitId
         + ", pattern='"
-        + pattern
+        + pipePattern
         + "', startTime="
         + startTime
         + ", endTime="
@@ -303,7 +315,7 @@ public abstract class EnrichedEvent implements Event {
         + "', commitId="
         + commitId
         + ", pattern='"
-        + pattern
+        + pipePattern
         + "', startTime="
         + startTime
         + ", endTime="
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
index 96da71b05a0..afc5da84cf5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeSnapshotEvent.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.event;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.resource.PipeSnapshotResourceManager;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -44,7 +45,7 @@ public abstract class PipeSnapshotEvent extends EnrichedEvent 
implements Seriali
       String snapshotPath,
       String pipeName,
       PipeTaskMeta pipeTaskMeta,
-      String pattern,
+      PipePattern pattern,
       PipeSnapshotResourceManager resourceManager) {
     super(pipeName, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
     this.snapshotPath = snapshotPath;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
index c888e97ee6f..e31e9c85efc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.event;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 
 import org.slf4j.Logger;
@@ -38,7 +39,7 @@ public abstract class PipeWritePlanEvent extends 
EnrichedEvent implements Serial
   protected ProgressIndex progressIndex;
 
   protected PipeWritePlanEvent(
-      String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, boolean 
isGeneratedByPipe) {
+      String pipeName, PipeTaskMeta pipeTaskMeta, PipePattern pattern, boolean 
isGeneratedByPipe) {
     super(pipeName, pipeTaskMeta, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
     this.isGeneratedByPipe = isGeneratedByPipe;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
new file mode 100644
index 00000000000..ec4192adff3
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/IoTDBPipePattern.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.pattern;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import java.util.Objects;
+
+public class IoTDBPipePattern extends PipePattern {
+
+  private final PartialPath patternPartialPath;
+
+  public IoTDBPipePattern(String pattern) {
+    super(pattern);
+
+    try {
+      patternPartialPath = new PartialPath(getPattern());
+    } catch (IllegalPathException e) {
+      throw new PipeException("Illegal IoTDBPipePattern: " + getPattern(), e);
+    }
+  }
+
+  @Override
+  public String getDefaultPattern() {
+    return PipeExtractorConstant.EXTRACTOR_PATTERN_IOTDB_DEFAULT_VALUE;
+  }
+
+  @Override
+  public boolean isLegal() {
+    if (!pattern.startsWith("root")) {
+      return false;
+    }
+
+    try {
+      PathUtils.isLegalPath(pattern);
+      return true;
+    } catch (IllegalPathException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean coversDb(String db) {
+    try {
+      return patternPartialPath.include(
+          new PartialPath(db, IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD));
+    } catch (IllegalPathException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean coversDevice(String device) {
+    try {
+      return patternPartialPath.include(
+          new PartialPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+    } catch (IllegalPathException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean mayOverlapWithDevice(String device) {
+    try {
+      // Another way is to use patternPath.overlapWith("device.*"),
+      // there will be no false positives but time cost may be higher.
+      return patternPartialPath.matchPrefixPath(new PartialPath(device));
+    } catch (IllegalPathException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean matchesMeasurement(String device, String measurement) {
+    // For aligned timeseries, empty measurement is an alias of the time 
column.
+    if (Objects.isNull(measurement) || measurement.isEmpty()) {
+      return false;
+    }
+
+    try {
+      return patternPartialPath.matchFullPath(new PartialPath(device, 
measurement));
+    } catch (IllegalPathException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "IoTDBPipePattern" + super.toString();
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java
new file mode 100644
index 00000000000..04be541f625
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PipePattern.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.pattern;
+
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATH_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATH_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_KEY;
+
+public abstract class PipePattern {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipePattern.class);
+
+  protected final String pattern;
+
+  protected PipePattern(String pattern) {
+    this.pattern = pattern != null ? pattern : getDefaultPattern();
+  }
+
+  public String getPattern() {
+    return pattern;
+  }
+
+  public boolean isRoot() {
+    return Objects.isNull(pattern) || 
this.pattern.equals(this.getDefaultPattern());
+  }
+
+  /**
+   * Interpret from source parameters and get a pipe pattern.
+   *
+   * @return The interpreted {@link PipePattern} which is not null.
+   */
+  public static PipePattern 
parsePipePatternFromSourceParameters(PipeParameters sourceParameters) {
+    final String path = sourceParameters.getStringByKeys(EXTRACTOR_PATH_KEY, 
SOURCE_PATH_KEY);
+
+    // 1. If "source.path" is specified, it will be interpreted as an 
IoTDB-style path,
+    // ignoring the other 2 parameters.
+    if (path != null) {
+      return new IoTDBPipePattern(path);
+    }
+
+    final String pattern =
+        sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY, 
SOURCE_PATTERN_KEY);
+
+    // 2. Otherwise, If "source.pattern" is specified, it will be interpreted
+    // according to "source.pattern.format".
+    if (pattern != null) {
+      final String patternFormat =
+          sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_FORMAT_KEY, 
SOURCE_PATTERN_FORMAT_KEY);
+
+      // If "source.pattern.format" is not specified, use prefix format by 
default.
+      if (patternFormat == null) {
+        return new PrefixPipePattern(pattern);
+      }
+
+      switch (patternFormat.toLowerCase()) {
+        case EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE:
+          return new IoTDBPipePattern(pattern);
+        case EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE:
+          return new PrefixPipePattern(pattern);
+        default:
+          LOGGER.info(
+              "Unknown pattern format: {}, use prefix matching format by 
default.", patternFormat);
+          return new PrefixPipePattern(pattern);
+      }
+    }
+
+    // 3. If neither "source.path" nor "source.pattern" is specified,
+    // this pipe source will match all data.
+    return new PrefixPipePattern(null);
+  }
+
+  public abstract String getDefaultPattern();
+
+  /** Check if this pattern is legal. Different pattern type may have 
different rules. */
+  public abstract boolean isLegal();
+
+  /** Check if this pattern matches all time-series under a database. */
+  public abstract boolean coversDb(String db);
+
+  /** Check if a device's all measurements are covered by this pattern. */
+  public abstract boolean coversDevice(String device);
+
+  /**
+   * Check if a device may have some measurements matched by the pattern.
+   *
+   * <p>NOTE1: this is only called when {@link PipePattern#coversDevice} is 
false.
+   *
+   * <p>NOTE2: this is just a loose check and may have false positives. To 
further check if a
+   * measurement matches the pattern, please use {@link 
PipePattern#matchesMeasurement} after this.
+   */
+  public abstract boolean mayOverlapWithDevice(String device);
+
+  /**
+   * Check if a full path with device and measurement can be matched by 
pattern.
+   *
+   * <p>NOTE: this is only called when {@link 
PipePattern#mayOverlapWithDevice} is true.
+   */
+  public abstract boolean matchesMeasurement(String device, String 
measurement);
+
+  @Override
+  public String toString() {
+    return "{pattern='" + pattern + "'}";
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java
new file mode 100644
index 00000000000..1373b015df1
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/pattern/PrefixPipePattern.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.pattern;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Arrays;
+
+public class PrefixPipePattern extends PipePattern {
+
+  public PrefixPipePattern(String pattern) {
+    super(pattern);
+  }
+
+  @Override
+  public String getDefaultPattern() {
+    return PipeExtractorConstant.EXTRACTOR_PATTERN_PREFIX_DEFAULT_VALUE;
+  }
+
+  @Override
+  public boolean isLegal() {
+    if (!pattern.startsWith("root")) {
+      return false;
+    }
+
+    try {
+      PathUtils.isLegalPath(pattern);
+    } catch (IllegalPathException e) {
+      try {
+        if ("root".equals(pattern) || "root.".equals(pattern)) {
+          return true;
+        }
+
+        // Split the pattern to nodes.
+        String[] pathNodes = StringUtils.splitPreserveAllTokens(pattern, 
"\\.");
+
+        // Check whether the pattern without last node is legal.
+        PathUtils.splitPathToDetachedNodes(
+            String.join(".", Arrays.copyOfRange(pathNodes, 0, pathNodes.length 
- 1)));
+        String lastNode = pathNodes[pathNodes.length - 1];
+
+        // Check whether the last node is legal.
+        if (!"".equals(lastNode)) {
+          Double.parseDouble(lastNode);
+        }
+      } catch (Exception ignored) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean coversDb(String db) {
+    return pattern.length() <= db.length() && db.startsWith(pattern);
+  }
+
+  @Override
+  public boolean coversDevice(String device) {
+    // for example, pattern is root.a.b and device is root.a.b.c
+    // in this case, the extractor can be matched without checking the 
measurements
+    return pattern.length() <= device.length() && device.startsWith(pattern);
+  }
+
+  @Override
+  public boolean mayOverlapWithDevice(String device) {
+    return (
+        // for example, pattern is root.a.b and device is root.a.b.c
+        // in this case, the extractor can be matched without checking the 
measurements
+        pattern.length() <= device.length() && device.startsWith(pattern))
+        // for example, pattern is root.a.b.c and device is root.a.b
+        // in this case, the extractor can be selected as candidate, but the 
measurements should
+        // be checked further
+        || (pattern.length() > device.length() && pattern.startsWith(device));
+  }
+
+  @Override
+  public boolean matchesMeasurement(String device, String measurement) {
+    // We assume that the device is already matched.
+    if (pattern.length() <= device.length()) {
+      return true;
+    }
+
+    // For example, pattern is "root.a.b.c", device is "root.a.b",
+    // then measurements "c" and "cc" can be matched,
+    // measurements "d" or "dc" can't be matched.
+    String dotAndMeasurement = TsFileConstant.PATH_SEPARATOR + measurement;
+    return
+    // low cost check comes first
+    pattern.length() <= device.length() + dotAndMeasurement.length()
+        // high cost check comes later
+        && dotAndMeasurement.startsWith(pattern.substring(device.length()));
+  }
+
+  @Override
+  public String toString() {
+    return "PrefixPipePattern" + super.toString();
+  }
+}


Reply via email to