This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 938f8149430 Pipe: add inclusion/exclusion pattern keys for pipe source
filtering (#17091)
938f8149430 is described below
commit 938f81494304e14df8a5359e4227e8b11ac412a6
Author: VGalaxies <[email protected]>
AuthorDate: Fri Jan 30 11:24:24 2026 +0800
Pipe: add inclusion/exclusion pattern keys for pipe source filtering
(#17091)
* feat(pipe): add inclusion/exclusion pattern keys for pipe source
BREAKING CHANGE: legacy source.pattern/source.path accept only one rule;
use source.pattern.inclusion for multi-value wildcard patterns.
* review
---
.../auto/basic/IoTDBTreePatternFormatIT.java | 258 +--------------------
.../treemodel/manual/IoTDBPipeInclusionIT.java | 13 +-
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 23 +-
.../source/dataregion/IoTDBDataRegionSource.java | 9 +-
.../db/pipe/pattern/TreePatternPruningTest.java | 59 +++--
.../db/pipe/source/IoTDBDataRegionSourceTest.java | 4 -
.../pipe/config/constant/PipeSourceConstant.java | 2 +
.../pipe/datastructure/pattern/TreePattern.java | 198 ++++++++++++----
8 files changed, 228 insertions(+), 338 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java
index b61a13693dc..9079daeab71 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -271,68 +270,9 @@ public class IoTDBTreePatternFormatIT extends
AbstractPipeDualTreeModelAutoIT {
}
@Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
- public void testMultiplePrefixPatternHistoricalData() throws Exception {
- // Define source attributes
- final Map<String, String> sourceAttributes = new HashMap<>();
- sourceAttributes.put("source.pattern", "root.db.d1.s, root.db2.d1.s");
- sourceAttributes.put("source.inclusion", "data.insert");
- sourceAttributes.put("user", "root");
-
- // Define data to be inserted
- final List<String> insertQueries =
- Arrays.asList(
- "insert into root.db.d1(time, s, s1) values (1, 1, 1)",
- "insert into root.db.d2(time, s) values (2, 2)",
- "insert into root.db2.d1(time, s) values (3, 3)");
-
- // Define expected results on receiver
- final Set<String> expectedResSet = new HashSet<>();
- expectedResSet.add("1,null,1.0,1.0,");
- expectedResSet.add("3,3.0,null,null,");
-
- // Execute the common test logic
- testPipeWithMultiplePatterns(
- sourceAttributes,
- insertQueries,
- true, // isHistorical = true
- "select * from root.db2.**,root.db.**",
- "Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
- expectedResSet);
- }
-
- @Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
- public void testMultiplePrefixPatternRealtimeData() throws Exception {
- final Map<String, String> sourceAttributes = new HashMap<>();
- sourceAttributes.put("source.pattern", "root.db.d1.s, root.db2.d1.s");
- sourceAttributes.put("source.inclusion", "data.insert");
- sourceAttributes.put("user", "root");
-
- final List<String> insertQueries =
- Arrays.asList(
- "insert into root.db.d1(time, s, s1) values (1, 1, 1)",
- "insert into root.db.d2(time, s) values (2, 2)",
- "insert into root.db2.d1(time, s) values (3, 3)");
-
- final Set<String> expectedResSet = new HashSet<>();
- expectedResSet.add("1,null,1.0,1.0,");
- expectedResSet.add("3,3.0,null,null,");
-
- testPipeWithMultiplePatterns(
- sourceAttributes,
- insertQueries,
- false, // isHistorical = false
- "select * from root.db2.**,root.db.**",
- "Time,root.db2.d1.s,root.db.d1.s,root.db.d1.s1,",
- expectedResSet);
- }
-
- @Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
public void testMultipleIoTDBPatternHistoricalData() throws Exception {
final Map<String, String> sourceAttributes = new HashMap<>();
- sourceAttributes.put("source.path", "root.db.**, root.db2.d1.*");
+ sourceAttributes.put("source.pattern.inclusion", "root.db.**,
root.db2.d1.*");
sourceAttributes.put("source.inclusion", "data.insert");
sourceAttributes.put("user", "root");
@@ -358,10 +298,9 @@ public class IoTDBTreePatternFormatIT extends
AbstractPipeDualTreeModelAutoIT {
}
@Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
public void testMultipleIoTDBPatternRealtimeData() throws Exception {
final Map<String, String> sourceAttributes = new HashMap<>();
- sourceAttributes.put("source.path", "root.db.**, root.db2.d1.*");
+ sourceAttributes.put("source.pattern.inclusion", "root.db.**,
root.db2.d1.*");
sourceAttributes.put("source.inclusion", "data.insert");
sourceAttributes.put("user", "root");
@@ -387,129 +326,12 @@ public class IoTDBTreePatternFormatIT extends
AbstractPipeDualTreeModelAutoIT {
}
@Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
- public void testMultipleHybridPatternHistoricalData() throws Exception {
- final Map<String, String> sourceAttributes = new HashMap<>();
- sourceAttributes.put("source.path", "root.db.d1.*");
- sourceAttributes.put("source.pattern", "root.db2.d1.s");
- sourceAttributes.put("source.inclusion", "data.insert");
- sourceAttributes.put("user", "root");
-
- final List<String> insertQueries =
- Arrays.asList(
- "insert into root.db.d1(time, s, s1) values (1, 1, 1)",
- "insert into root.db2.d1(time, s) values (2, 2)",
- "insert into root.db3.d1(time, s) values (3, 3)");
-
- final Set<String> expectedResSet = new HashSet<>();
- expectedResSet.add("1,1.0,1.0,null,");
- expectedResSet.add("2,null,null,2.0,");
-
- testPipeWithMultiplePatterns(
- sourceAttributes,
- insertQueries,
- true, // isHistorical = true
- "select * from root.db.**,root.db2.**",
- "Time,root.db.d1.s,root.db.d1.s1,root.db2.d1.s,",
- expectedResSet);
- }
-
- @Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
- public void testMultipleHybridPatternRealtimeData() throws Exception {
- final Map<String, String> sourceAttributes = new HashMap<>();
- sourceAttributes.put("source.path", "root.db.d1.*");
- sourceAttributes.put("source.pattern", "root.db2.d1.s");
- sourceAttributes.put("source.inclusion", "data.insert");
- sourceAttributes.put("user", "root");
-
- final List<String> insertQueries =
- Arrays.asList(
- "insert into root.db.d1(time, s, s1) values (1, 1, 1)",
- "insert into root.db2.d1(time, s) values (2, 2)",
- "insert into root.db3.d1(time, s) values (3, 3)");
-
- final Set<String> expectedResSet = new HashSet<>();
- expectedResSet.add("1,1.0,1.0,null,");
- expectedResSet.add("2,null,null,2.0,");
-
- testPipeWithMultiplePatterns(
- sourceAttributes,
- insertQueries,
- false, // isHistorical = false
- "select * from root.db.**,root.db2.**",
- "Time,root.db.d1.s,root.db.d1.s1,root.db2.d1.s,",
- expectedResSet);
- }
-
- @Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
- public void testPrefixPatternWithExclusionHistoricalData() throws Exception {
- final Map<String, String> sourceAttributes = new HashMap<>();
- // Inclusion: Match everything under root.db.d1 and root.db.d2
- sourceAttributes.put("source.pattern", "root.db.d1, root.db.d2");
- // Exclusion: Exclude anything with the prefix root.db.d1.s1
- sourceAttributes.put("source.pattern.exclusion", "root.db.d1.s1");
- sourceAttributes.put("source.inclusion", "data.insert");
- sourceAttributes.put("user", "root");
-
- final List<String> insertQueries =
- Arrays.asList(
- // s matches, s1 is excluded
- "insert into root.db.d1(time, s, s1) values (1, 1, 1)",
- // s matches
- "insert into root.db.d2(time, s) values (2, 2)",
- "insert into root.db1.d1(time, s) values (3, 3)");
-
- final Set<String> expectedResSet = new HashSet<>();
- expectedResSet.add("1,1.0,null,");
- expectedResSet.add("2,null,2.0,");
-
- testPipeWithMultiplePatterns(
- sourceAttributes,
- insertQueries,
- true, // isHistorical = true
- "select * from root.db.**",
- "Time,root.db.d1.s,root.db.d2.s,",
- expectedResSet);
- }
-
- @Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
- public void testPrefixPatternWithExclusionRealtimeData() throws Exception {
- final Map<String, String> sourceAttributes = new HashMap<>();
- sourceAttributes.put("source.pattern", "root.db.d1, root.db.d2");
- sourceAttributes.put("source.pattern.exclusion", "root.db.d1.s1");
- sourceAttributes.put("source.inclusion", "data.insert");
- sourceAttributes.put("user", "root");
-
- final List<String> insertQueries =
- Arrays.asList(
- "insert into root.db.d1(time, s, s1) values (1, 1, 1)",
- "insert into root.db.d2(time, s) values (2, 2)",
- "insert into root.db1.d1(time, s) values (3, 3)");
-
- final Set<String> expectedResSet = new HashSet<>();
- expectedResSet.add("1,1.0,null,");
- expectedResSet.add("2,null,2.0,");
-
- testPipeWithMultiplePatterns(
- sourceAttributes,
- insertQueries,
- false, // isHistorical = false
- "select * from root.db.**",
- "Time,root.db.d1.s,root.db.d2.s,",
- expectedResSet);
- }
-
- @Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
public void testIoTDBPatternWithExclusionHistoricalData() throws Exception {
final Map<String, String> sourceAttributes = new HashMap<>();
// Inclusion: Match everything under root.db
- sourceAttributes.put("source.path", "root.db.**");
+ sourceAttributes.put("source.pattern.inclusion", "root.db.**");
// Exclusion: Exclude root.db.d1.s* and root.db.d3.*
- sourceAttributes.put("source.path.exclusion", "root.db.d1.s*,
root.db.d3.*");
+ sourceAttributes.put("source.pattern.exclusion", "root.db.d1.s*,
root.db.d3.*");
sourceAttributes.put("source.inclusion", "data.insert");
sourceAttributes.put("user", "root");
@@ -537,11 +359,10 @@ public class IoTDBTreePatternFormatIT extends
AbstractPipeDualTreeModelAutoIT {
}
@Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
public void testIoTDBPatternWithExclusionRealtimeData() throws Exception {
final Map<String, String> sourceAttributes = new HashMap<>();
- sourceAttributes.put("source.path", "root.db.**");
- sourceAttributes.put("source.path.exclusion", "root.db.d1.s*,
root.db.d3.*");
+ sourceAttributes.put("source.pattern.inclusion", "root.db.**");
+ sourceAttributes.put("source.pattern.exclusion", "root.db.d1.s*,
root.db.d3.*");
sourceAttributes.put("source.inclusion", "data.insert");
sourceAttributes.put("user", "root");
@@ -564,71 +385,4 @@ public class IoTDBTreePatternFormatIT extends
AbstractPipeDualTreeModelAutoIT {
"Time,root.db.d1.t,root.db.d2.s,",
expectedResSet);
}
-
- @Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
- public void testHybridPatternWithHybridExclusionHistoricalData() throws
Exception {
- final Map<String, String> sourceAttributes = new HashMap<>();
- // Inclusion: Match root.db.** (IoTDB) AND root.db2.d1 (Prefix)
- sourceAttributes.put("source.path", "root.db.**");
- sourceAttributes.put("source.pattern", "root.db2.d1");
- // Exclusion: Exclude root.db.d1.* (IoTDB) AND root.db2.d1.s (Prefix)
- sourceAttributes.put("source.path.exclusion", "root.db.d1.*");
- sourceAttributes.put("source.pattern.exclusion", "root.db2.d1.s");
- sourceAttributes.put("source.inclusion", "data.insert");
- sourceAttributes.put("user", "root");
-
- final List<String> insertQueries =
- Arrays.asList(
- // s, s1 excluded by path.exclusion
- "insert into root.db.d1(time, s, s1) values (1, 1, 1)",
- // s matches
- "insert into root.db.d2(time, s) values (2, 2)",
- // s excluded by pattern.exclusion, t matches
- "insert into root.db2.d1(time, s, t) values (3, 3, 3)",
- "insert into root.db3.d1(time, s) values (4, 4)");
-
- final Set<String> expectedResSet = new HashSet<>();
- expectedResSet.add("2,2.0,null,");
- expectedResSet.add("3,null,3.0,");
-
- testPipeWithMultiplePatterns(
- sourceAttributes,
- insertQueries,
- true, // isHistorical = true
- "select * from root.db.**,root.db2.**",
- "Time,root.db.d2.s,root.db2.d1.t,",
- expectedResSet);
- }
-
- @Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
- public void testHybridPatternWithHybridExclusionRealtimeData() throws
Exception {
- final Map<String, String> sourceAttributes = new HashMap<>();
- sourceAttributes.put("source.path", "root.db.**");
- sourceAttributes.put("source.pattern", "root.db2.d1");
- sourceAttributes.put("source.path.exclusion", "root.db.d1.*");
- sourceAttributes.put("source.pattern.exclusion", "root.db2.d1.s");
- sourceAttributes.put("source.inclusion", "data.insert");
- sourceAttributes.put("user", "root");
-
- final List<String> insertQueries =
- Arrays.asList(
- "insert into root.db.d1(time, s, s1) values (1, 1, 1)",
- "insert into root.db.d2(time, s) values (2, 2)",
- "insert into root.db2.d1(time, s, t) values (3, 3, 3)",
- "insert into root.db3.d1(time, s) values (4, 4)");
-
- final Set<String> expectedResSet = new HashSet<>();
- expectedResSet.add("2,2.0,null,");
- expectedResSet.add("3,null,3.0,");
-
- testPipeWithMultiplePatterns(
- sourceAttributes,
- insertQueries,
- false, // isHistorical = false
- "select * from root.db.**,root.db2.**",
- "Time,root.db.d2.s,root.db2.d1.t,",
- expectedResSet);
- }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeInclusionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeInclusionIT.java
index 5bc4f468062..a95e8535d42 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeInclusionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeInclusionIT.java
@@ -29,7 +29,6 @@ import
org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeManual;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -114,7 +113,6 @@ public class IoTDBPipeInclusionIT extends
AbstractPipeDualTreeModelManualIT {
}
@Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
public void testPureSchemaInclusionWithMultiplePattern() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
@@ -127,7 +125,7 @@ public class IoTDBPipeInclusionIT extends
AbstractPipeDualTreeModelManualIT {
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> sinkAttributes = new HashMap<>();
- sourceAttributes.put("path", "root.ln.wf01.wt01.status,root.ln.wf02.**");
+ sourceAttributes.put("source.pattern.inclusion",
"root.ln.wf01.wt01.status,root.ln.wf02.**");
sourceAttributes.put("source.inclusion", "schema");
sourceAttributes.put("user", "root");
@@ -187,7 +185,6 @@ public class IoTDBPipeInclusionIT extends
AbstractPipeDualTreeModelManualIT {
}
@Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
public void testPureSchemaInclusionWithExclusionPattern() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
@@ -200,13 +197,13 @@ public class IoTDBPipeInclusionIT extends
AbstractPipeDualTreeModelManualIT {
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> sinkAttributes = new HashMap<>();
- sourceAttributes.put("extractor.inclusion", "schema");
+ sourceAttributes.put("source.inclusion", "schema");
sourceAttributes.put("user", "root");
// Include root.ln.**
- sourceAttributes.put("path", "root.ln.**");
+ sourceAttributes.put("source.pattern.inclusion", "root.ln.**");
// Exclude root.ln.wf02.* and root.ln.wf03.wt01.status
- sourceAttributes.put("path.exclusion", "root.ln.wf02.**,
root.ln.wf03.wt01.status");
+ sourceAttributes.put("source.pattern.exclusion", "root.ln.wf02.**,
root.ln.wf03.wt01.status");
sinkAttributes.put("connector", "iotdb-thrift-connector");
sinkAttributes.put("connector.ip", receiverIp);
@@ -314,7 +311,7 @@ public class IoTDBPipeInclusionIT extends
AbstractPipeDualTreeModelManualIT {
final Map<String, String> sinkAttributes = new HashMap<>();
sourceAttributes.put("source.inclusion", "auth");
- sourceAttributes.put("path", "root.ln.**");
+ sourceAttributes.put("source.pattern.inclusion", "root.ln.**");
sourceAttributes.put("user", "root");
sinkAttributes.put("sink", "iotdb-thrift-sink");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 44f02bbd4bb..2f1be77fe3e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -109,7 +109,10 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_EXCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_EXCLUSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
@@ -118,7 +121,10 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.S
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_START_TIME_KEY;
@@ -808,10 +814,23 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
// If the source has pattern or path, we need to allocate memory
isTSFileParser =
isTSFileParser
- || sourceParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY);
+ || sourceParameters.hasAnyAttributes(
+ EXTRACTOR_PATTERN_KEY,
+ SOURCE_PATTERN_KEY,
+ EXTRACTOR_PATTERN_INCLUSION_KEY,
+ SOURCE_PATTERN_INCLUSION_KEY,
+ EXTRACTOR_PATTERN_EXCLUSION_KEY,
+ SOURCE_PATTERN_EXCLUSION_KEY);
isTSFileParser =
- isTSFileParser ||
sourceParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY);
+ isTSFileParser
+ || sourceParameters.hasAnyAttributes(
+ EXTRACTOR_PATH_KEY,
+ SOURCE_PATH_KEY,
+ EXTRACTOR_PATTERN_INCLUSION_KEY,
+ SOURCE_PATTERN_INCLUSION_KEY,
+ EXTRACTOR_PATH_EXCLUSION_KEY,
+ SOURCE_PATH_EXCLUSION_KEY);
// If the source is not hybrid, we do need to allocate memory
isTSFileParser =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
index 4e69b47cc8d..d1ceb48e8aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java
@@ -78,6 +78,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
@@ -109,6 +110,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.S
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_FORMAT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_LOOSE_RANGE_KEY;
@@ -180,7 +182,12 @@ public class IoTDBDataRegionSource extends IoTDBSource {
&& validator
.getParameters()
.hasAnyAttributes(
- EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY, EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY)) {
+ EXTRACTOR_PATH_KEY,
+ SOURCE_PATH_KEY,
+ EXTRACTOR_PATTERN_KEY,
+ SOURCE_PATTERN_KEY,
+ EXTRACTOR_PATTERN_INCLUSION_KEY,
+ SOURCE_PATTERN_INCLUSION_KEY)) {
throw new PipeException(
"The pipe cannot extract tree model data when sql dialect is set to
table.");
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java
index 62868f0492e..d5a209e5d87 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java
@@ -21,19 +21,16 @@ package org.apache.iotdb.db.pipe.pattern;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
-import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import
org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBTreePattern;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import java.util.HashMap;
-@Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
public class TreePatternPruningTest {
@Test
@@ -42,7 +39,7 @@ public class TreePatternPruningTest {
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeSourceConstant.SOURCE_PATH_KEY,
"root.db.d1.*,root.db.d1.s1");
+ put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY,
"root.db.d1.*,root.db.d1.s1");
}
});
@@ -58,7 +55,7 @@ public class TreePatternPruningTest {
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeSourceConstant.SOURCE_PATH_KEY,
"root.db.d1,root.db.d1");
+ put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY,
"root.db.d1,root.db.d1");
}
});
@@ -74,8 +71,8 @@ public class TreePatternPruningTest {
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeSourceConstant.SOURCE_PATH_KEY,
"root.sg.d1,root.sg.d2");
- put(PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY,
"root.sg.d1");
+ put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY,
"root.sg.d1,root.sg.d2");
+ put(PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY,
"root.sg.d1");
}
});
@@ -91,8 +88,8 @@ public class TreePatternPruningTest {
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeSourceConstant.SOURCE_PATH_KEY, "root.sg.d1");
- put(PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY,
"root.sg.**");
+ put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY,
"root.sg.d1");
+ put(PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY,
"root.sg.**");
}
});
@@ -110,8 +107,10 @@ public class TreePatternPruningTest {
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeSourceConstant.SOURCE_PATH_KEY,
"root.sg.A,root.sg.B,root.sg.A.sub");
- put(PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY,
"root.sg.A,root.sg.A.**");
+ put(
+ PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY,
+ "root.sg.A,root.sg.B,root.sg.A.sub");
+ put(PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY,
"root.sg.A,root.sg.A.**");
}
});
@@ -122,21 +121,21 @@ public class TreePatternPruningTest {
}
@Test
- public void testComplexPruning_Prefix() {
+ public void testLegacyPatternMultipleRulesRejected() {
final PipeParameters params =
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeSourceConstant.SOURCE_PATTERN_KEY,
"root.sg.A,root.sg.B,root.sg.A.sub");
- put(PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY,
"root.sg.A");
- put(PipeSourceConstant.SOURCE_PATTERN_FORMAT_KEY, "prefix");
+ put(PipeSourceConstant.SOURCE_PATTERN_KEY,
"root.sg.A,root.sg.B");
}
});
- final TreePattern result =
TreePattern.parsePipePatternFromSourceParameters(params);
-
- Assert.assertTrue(result instanceof PrefixTreePattern);
- Assert.assertEquals("root.sg.B", result.getPattern());
+ try {
+ TreePattern.parsePipePatternFromSourceParameters(params);
+ Assert.fail("Should throw PipeException for legacy multi-pattern
parameters");
+ } catch (final PipeException ignored) {
+ // Expected exception
+ }
}
@Test
@@ -145,8 +144,8 @@ public class TreePatternPruningTest {
new PipeParameters(
new HashMap<String, String>() {
{
- put(PipeSourceConstant.SOURCE_PATH_KEY,
"root.sg.d1,root.sg.d2");
- put(PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY,
"root.other");
+ put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY,
"root.sg.d1,root.sg.d2");
+ put(PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY,
"root.other");
}
});
@@ -155,4 +154,22 @@ public class TreePatternPruningTest {
Assert.assertTrue(result instanceof UnionIoTDBTreePattern);
Assert.assertEquals("root.sg.d1,root.sg.d2", result.getPattern());
}
+
+ @Test
+ public void testLegacyPathMultipleRulesRejected() {
+ final PipeParameters params =
+ new PipeParameters(
+ new HashMap<String, String>() {
+ {
+ put(PipeSourceConstant.SOURCE_PATH_KEY,
"root.sg.d1,root.sg.d2");
+ }
+ });
+
+ try {
+ TreePattern.parsePipePatternFromSourceParameters(params);
+ Assert.fail("Should throw PipeException for legacy multi-path
parameters");
+ } catch (final PipeException ignored) {
+ // Expected exception
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java
index feca8d54fed..dd0558bc238 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/IoTDBDataRegionSourceTest.java
@@ -25,7 +25,6 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import java.util.HashMap;
@@ -54,7 +53,6 @@ public class IoTDBDataRegionSourceTest {
}
@Test
- @Ignore("Disabled: multi/exclusion tree patterns are blocked in this branch")
public void testIoTDBDataRegionExtractorWithPattern() {
Assert.assertEquals(
IllegalArgumentException.class,
@@ -84,8 +82,6 @@ public class IoTDBDataRegionSourceTest {
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root"));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.`a-b`"));
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.1"));
-
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.a,root.b"));
-
Assert.assertNull(testIoTDBDataRegionExtractorWithPattern("root.a,root.b,root.db1.`a,b`.**"));
}
public Exception testIoTDBDataRegionExtractorWithPattern(final String
pattern) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
index d13bdf7d046..7107614ebd5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java
@@ -52,6 +52,8 @@ public class PipeSourceConstant {
public static final String EXTRACTOR_PATTERN_KEY = "extractor.pattern";
public static final String SOURCE_PATTERN_KEY = "source.pattern";
+ public static final String EXTRACTOR_PATTERN_INCLUSION_KEY =
"extractor.pattern.inclusion";
+ public static final String SOURCE_PATTERN_INCLUSION_KEY =
"source.pattern.inclusion";
public static final String EXTRACTOR_PATH_KEY = "extractor.path";
public static final String SOURCE_PATH_KEY = "source.path";
public static final String EXTRACTOR_PATTERN_FORMAT_KEY =
"extractor.pattern.format";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
index 9a9360a0abf..0cea3b34e48 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
@@ -47,11 +47,13 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.E
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_FORMAT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_KEY;
public abstract class TreePattern {
@@ -143,18 +145,7 @@ public abstract class TreePattern {
*/
public static TreePattern parsePipePatternFromSourceParameters(
final PipeParameters sourceParameters) {
- final TreePattern treePattern =
parsePipePatternFromSourceParametersInternal(sourceParameters);
- if (!treePattern.isSingle()) {
- final String msg =
- String.format(
- "Pipe: The provided pattern should be single now. " +
"Inclusion: %s, Exclusion: %s",
- sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY),
- sourceParameters.getStringByKeys(
- EXTRACTOR_PATTERN_EXCLUSION_KEY,
SOURCE_PATTERN_EXCLUSION_KEY));
- LOGGER.warn(msg);
- throw new PipeException(msg);
- }
- return treePattern;
+ return parsePipePatternFromSourceParametersInternal(sourceParameters);
}
public static TreePattern parsePipePatternFromSourceParametersInternal(
@@ -162,15 +153,41 @@ public abstract class TreePattern {
final boolean isTreeModelDataAllowedToBeCaptured =
isTreeModelDataAllowToBeCaptured(sourceParameters);
+ final boolean hasPatternInclusionKey =
+ sourceParameters.hasAnyAttributes(
+ EXTRACTOR_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_INCLUSION_KEY);
+ final boolean hasLegacyPathKey =
+ sourceParameters.hasAnyAttributes(EXTRACTOR_PATH_KEY, SOURCE_PATH_KEY);
+ final boolean hasLegacyPatternKey =
+ sourceParameters.hasAnyAttributes(EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY);
+
+ if (hasPatternInclusionKey && (hasLegacyPathKey || hasLegacyPatternKey)) {
+ final String msg =
+ String.format(
+ "Pipe: %s cannot be used together with %s or %s.",
+ SOURCE_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_KEY,
SOURCE_PATH_KEY);
+ LOGGER.warn(msg);
+ throw new PipeException(msg);
+ }
+
// 1. Parse INCLUSION patterns into a list
List<TreePattern> inclusionPatterns =
- parsePatternList(
- sourceParameters,
- isTreeModelDataAllowedToBeCaptured,
- EXTRACTOR_PATH_KEY,
- SOURCE_PATH_KEY,
- EXTRACTOR_PATTERN_KEY,
- SOURCE_PATTERN_KEY);
+ hasPatternInclusionKey
+ ? parseIoTDBPatternList(
+ sourceParameters.getStringByKeys(
+ EXTRACTOR_PATTERN_INCLUSION_KEY,
SOURCE_PATTERN_INCLUSION_KEY),
+ isTreeModelDataAllowedToBeCaptured,
+ true,
+ SOURCE_PATTERN_INCLUSION_KEY)
+ : parseLegacyPatternList(
+ sourceParameters,
+ isTreeModelDataAllowedToBeCaptured,
+ EXTRACTOR_PATH_KEY,
+ SOURCE_PATH_KEY,
+ EXTRACTOR_PATTERN_KEY,
+ SOURCE_PATTERN_KEY,
+ SOURCE_PATH_KEY,
+ SOURCE_PATTERN_KEY);
// If no inclusion patterns are specified, use default "root.**"
if (inclusionPatterns.isEmpty()) {
@@ -181,14 +198,34 @@ public abstract class TreePattern {
}
// 2. Parse EXCLUSION patterns into a list
+ if (hasPatternInclusionKey
+ && sourceParameters.hasAnyAttributes(
+ EXTRACTOR_PATH_EXCLUSION_KEY, SOURCE_PATH_EXCLUSION_KEY)) {
+ final String msg =
+ String.format(
+ "Pipe: %s cannot be used together with %s.",
+ SOURCE_PATTERN_INCLUSION_KEY, SOURCE_PATH_EXCLUSION_KEY);
+ LOGGER.warn(msg);
+ throw new PipeException(msg);
+ }
+
List<TreePattern> exclusionPatterns =
- parsePatternList(
- sourceParameters,
- isTreeModelDataAllowedToBeCaptured,
- EXTRACTOR_PATH_EXCLUSION_KEY,
- SOURCE_PATH_EXCLUSION_KEY,
- EXTRACTOR_PATTERN_EXCLUSION_KEY,
- SOURCE_PATTERN_EXCLUSION_KEY);
+ hasPatternInclusionKey
+ ? parseIoTDBPatternList(
+ sourceParameters.getStringByKeys(
+ EXTRACTOR_PATTERN_EXCLUSION_KEY,
SOURCE_PATTERN_EXCLUSION_KEY),
+ isTreeModelDataAllowedToBeCaptured,
+ true,
+ SOURCE_PATTERN_EXCLUSION_KEY)
+ : parseLegacyPatternList(
+ sourceParameters,
+ isTreeModelDataAllowedToBeCaptured,
+ EXTRACTOR_PATH_EXCLUSION_KEY,
+ SOURCE_PATH_EXCLUSION_KEY,
+ EXTRACTOR_PATTERN_EXCLUSION_KEY,
+ SOURCE_PATTERN_EXCLUSION_KEY,
+ SOURCE_PATH_EXCLUSION_KEY,
+ SOURCE_PATTERN_EXCLUSION_KEY);
// 3. Optimize the lists: remove redundant patterns (e.g., if "root.**"
exists, "root.db" is
// redundant)
@@ -206,9 +243,18 @@ public abstract class TreePattern {
"Pipe: The provided exclusion pattern fully covers the inclusion
pattern. "
+ "This pipe pattern will match nothing. "
+ "Inclusion: %s, Exclusion: %s",
- sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY),
sourceParameters.getStringByKeys(
- EXTRACTOR_PATTERN_EXCLUSION_KEY,
SOURCE_PATTERN_EXCLUSION_KEY));
+ EXTRACTOR_PATTERN_INCLUSION_KEY,
+ SOURCE_PATTERN_INCLUSION_KEY,
+ EXTRACTOR_PATH_KEY,
+ SOURCE_PATH_KEY,
+ EXTRACTOR_PATTERN_KEY,
+ SOURCE_PATTERN_KEY),
+ sourceParameters.getStringByKeys(
+ EXTRACTOR_PATTERN_EXCLUSION_KEY,
+ SOURCE_PATTERN_EXCLUSION_KEY,
+ EXTRACTOR_PATH_EXCLUSION_KEY,
+ SOURCE_PATH_EXCLUSION_KEY));
LOGGER.warn(msg);
throw new PipeException(msg);
}
@@ -319,37 +365,71 @@ public abstract class TreePattern {
}
/**
- * Helper method to parse pattern parameters into a list of patterns without
creating the Union
- * object immediately.
+ * Helper method to parse legacy pattern parameters into a list of patterns
without creating the
+ * Union object immediately.
*/
- private static List<TreePattern> parsePatternList(
+ private static List<TreePattern> parseLegacyPatternList(
final PipeParameters sourceParameters,
final boolean isTreeModelDataAllowedToBeCaptured,
final String extractorPathKey,
final String sourcePathKey,
final String extractorPatternKey,
- final String sourcePatternKey) {
+ final String sourcePatternKey,
+ final String pathKeyName,
+ final String patternKeyName) {
final String path = sourceParameters.getStringByKeys(extractorPathKey,
sourcePathKey);
final String pattern =
sourceParameters.getStringByKeys(extractorPatternKey, sourcePatternKey);
+ if (path != null && pattern != null) {
+ final String msg =
+ String.format("Pipe: %s and %s cannot be used together.",
pathKeyName, patternKeyName);
+ LOGGER.warn(msg);
+ throw new PipeException(msg);
+ }
+
final List<TreePattern> result = new ArrayList<>();
if (path != null) {
result.addAll(
- parseMultiplePatterns(
- path, p -> new
IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p)));
+ parseIoTDBPatternList(path, isTreeModelDataAllowedToBeCaptured,
false, pathKeyName));
}
if (pattern != null) {
result.addAll(
parsePatternsFromPatternParameter(
- pattern, sourceParameters, isTreeModelDataAllowedToBeCaptured));
+ pattern,
+ sourceParameters,
+ isTreeModelDataAllowedToBeCaptured,
+ false,
+ patternKeyName));
}
return result;
}
+ private static List<TreePattern> parseIoTDBPatternList(
+ final String pattern,
+ final boolean isTreeModelDataAllowedToBeCaptured,
+ final boolean allowMultiple,
+ final String parameterKey) {
+ if (pattern == null) {
+ return new ArrayList<>();
+ }
+
+ final List<TreePattern> patterns =
+ parseMultiplePatterns(
+ pattern, p -> new
IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p));
+
+ if (!allowMultiple && patterns.size() > 1) {
+ final String msg =
+ String.format("Pipe: The parameter %s only supports a single pattern
now.", parameterKey);
+ LOGGER.warn(msg);
+ throw new PipeException(msg);
+ }
+ return patterns;
+ }
+
/**
* Removes patterns from the list that are covered by other patterns in the
same list. For
* example, if "root.**" and "root.db.**" are present, "root.db.**" is
removed.
@@ -554,29 +634,47 @@ public abstract class TreePattern {
private static List<TreePattern> parsePatternsFromPatternParameter(
final String pattern,
final PipeParameters sourceParameters,
- final boolean isTreeModelDataAllowedToBeCaptured) {
+ final boolean isTreeModelDataAllowedToBeCaptured,
+ final boolean allowMultiple,
+ final String parameterKey) {
final String patternFormat =
sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_FORMAT_KEY,
SOURCE_PATTERN_FORMAT_KEY);
+ final List<TreePattern> patterns;
// If "source.pattern.format" is not specified, use prefix format by
default.
if (patternFormat == null) {
- return parseMultiplePatterns(
- pattern, p -> new
PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p));
+ patterns =
+ parseMultiplePatterns(
+ pattern, p -> new
PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p));
+ } else {
+ switch (patternFormat.toLowerCase()) {
+ case EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE:
+ patterns =
+ parseMultiplePatterns(
+ pattern, p -> new
IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p));
+ break;
+ case EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE:
+ patterns =
+ parseMultiplePatterns(
+ pattern, p -> new
PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p));
+ break;
+ default:
+ LOGGER.info(
+ "Unknown pattern format: {}, use prefix matching format by
default.", patternFormat);
+ patterns =
+ parseMultiplePatterns(
+ pattern, p -> new
PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p));
+ }
}
- switch (patternFormat.toLowerCase()) {
- case EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE:
- return parseMultiplePatterns(
- pattern, p -> new
IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p));
- case EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE:
- return parseMultiplePatterns(
- pattern, p -> new
PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p));
- default:
- LOGGER.info(
- "Unknown pattern format: {}, use prefix matching format by
default.", patternFormat);
- return parseMultiplePatterns(
- pattern, p -> new
PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p));
+ if (!allowMultiple && patterns.size() > 1) {
+ final String msg =
+ String.format("Pipe: The parameter %s only supports a single pattern
now.", parameterKey);
+ LOGGER.warn(msg);
+ throw new PipeException(msg);
}
+
+ return patterns;
}
private static List<TreePattern> parseMultiplePatterns(