This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 722e8959493 [To dev/1.3] Pipe: Optimized the path construction
efficiency in pattern match (#16265) (#16271)
722e8959493 is described below
commit 722e89594937f0fc8893ecd9332ea124829d3764
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 26 19:01:08 2025 +0800
[To dev/1.3] Pipe: Optimized the path construction efficiency in pattern
match (#16265) (#16271)
* cp
* some
---
.../agent/runtime/PipeDataNodeRuntimeAgent.java | 7 +-
.../matcher/CachedSchemaPatternMatcher.java | 118 ++++++++++-----------
.../realtime/matcher/PipeDataRegionMatcher.java | 6 +-
.../execute/utils/CompactionPathUtils.java | 22 ++--
.../apache/iotdb/commons/conf/CommonConfig.java | 14 +--
.../iotdb/commons/pipe/config/PipeConfig.java | 6 +-
.../iotdb/commons/pipe/config/PipeDescriptor.java | 6 +-
.../datastructure/pattern/IoTDBPipePattern.java | 22 +++-
8 files changed, 116 insertions(+), 85 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 70af01b288e..81702645102 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
import
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
@@ -39,6 +40,7 @@ import
org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStar
import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.service.ResourcesInformationHolder;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.slf4j.Logger;
@@ -69,7 +71,7 @@ public class PipeDataNodeRuntimeAgent implements IService {
//////////////////////////// System Service Interface
////////////////////////////
public synchronized void preparePipeResources(
- ResourcesInformationHolder resourcesInformationHolder) throws
StartupException {
+ final ResourcesInformationHolder resourcesInformationHolder) throws
StartupException {
// Clean sender (connector) hardlink file dir and snapshot dir
PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean();
@@ -78,6 +80,9 @@ public class PipeDataNodeRuntimeAgent implements IService {
PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
simpleProgressIndexAssigner.start();
+
+ IoTDBPipePattern.setDevicePathGetter(CompactionPathUtils::getPath);
+ IoTDBPipePattern.setMeasurementPathGetter(CompactionPathUtils::getPath);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
index 2c68331705f..a01b50e7e68 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -46,38 +46,38 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
protected final ReentrantReadWriteLock lock;
- protected final Set<PipeRealtimeDataRegionSource> extractors;
- protected final Cache<String, Set<PipeRealtimeDataRegionSource>>
deviceToExtractorsCache;
+ protected final Set<PipeRealtimeDataRegionSource> sources;
+ protected final Cache<String, Set<PipeRealtimeDataRegionSource>>
deviceToSourcesCache;
public CachedSchemaPatternMatcher() {
this.lock = new ReentrantReadWriteLock();
- // Should be thread-safe because the extractors will be returned by {@link
#match} and
- // iterated by {@link #assignToExtractor}, at the same time the extractors
may be added or
+ // Should be thread-safe because the sources will be returned by {@link
#match} and
+ // iterated by {@link #assignToSource}, at the same time the sources may
be added or
// removed by {@link #register} and {@link #deregister}.
- this.extractors = new CopyOnWriteArraySet<>();
- this.deviceToExtractorsCache =
+ this.sources = new CopyOnWriteArraySet<>();
+ this.deviceToSourcesCache =
Caffeine.newBuilder()
-
.maximumSize(PipeConfig.getInstance().getPipeExtractorMatcherCacheSize())
+
.maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
.build();
}
@Override
- public void register(final PipeRealtimeDataRegionSource extractor) {
+ public void register(final PipeRealtimeDataRegionSource source) {
lock.writeLock().lock();
try {
- extractors.add(extractor);
- deviceToExtractorsCache.invalidateAll();
+ sources.add(source);
+ deviceToSourcesCache.invalidateAll();
} finally {
lock.writeLock().unlock();
}
}
@Override
- public void deregister(final PipeRealtimeDataRegionSource extractor) {
+ public void deregister(final PipeRealtimeDataRegionSource source) {
lock.writeLock().lock();
try {
- extractors.remove(extractor);
- deviceToExtractorsCache.invalidateAll();
+ sources.remove(source);
+ deviceToSourcesCache.invalidateAll();
} finally {
lock.writeLock().unlock();
}
@@ -87,7 +87,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
public int getRegisterCount() {
lock.readLock().lock();
try {
- return extractors.size();
+ return sources.size();
} finally {
lock.readLock().unlock();
}
@@ -96,60 +96,60 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
@Override
public Pair<Set<PipeRealtimeDataRegionSource>,
Set<PipeRealtimeDataRegionSource>> match(
final PipeRealtimeEvent event) {
- final Set<PipeRealtimeDataRegionSource> matchedExtractors = new
HashSet<>();
+ final Set<PipeRealtimeDataRegionSource> matchedSources = new HashSet<>();
lock.readLock().lock();
try {
- if (extractors.isEmpty()) {
- return new Pair<>(matchedExtractors, extractors);
+ if (sources.isEmpty()) {
+ return new Pair<>(matchedSources, sources);
}
- // HeartbeatEvent will be assigned to all extractors
+ // HeartbeatEvent will be assigned to all sources
if (event.getEvent() instanceof PipeHeartbeatEvent) {
- return new Pair<>(extractors, Collections.EMPTY_SET);
+ return new Pair<>(sources, Collections.EMPTY_SET);
}
- // Deletion event will be assigned to extractors listened to it
+ // Deletion event will be assigned to sources listened to it
if (event.getEvent() instanceof PipeSchemaRegionWritePlanEvent) {
- extractors.stream()
+ sources.stream()
.filter(PipeRealtimeDataRegionSource::shouldExtractDeletion)
- .forEach(matchedExtractors::add);
- return new Pair<>(matchedExtractors,
findUnmatchedExtractors(matchedExtractors));
+ .forEach(matchedSources::add);
+ return new Pair<>(matchedSources,
findUnmatchedSources(matchedSources));
}
for (final Map.Entry<String, String[]> entry :
event.getSchemaInfo().entrySet()) {
final String device = entry.getKey();
final String[] measurements = entry.getValue();
- // 1. try to get matched extractors from cache, if not success, match
them by device
- final Set<PipeRealtimeDataRegionSource> extractorsFilteredByDevice =
- deviceToExtractorsCache.get(device,
this::filterExtractorsByDevice);
+ // 1. try to get matched sources from cache, if not success, match
them by device
+ final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDevice =
+ deviceToSourcesCache.get(device, this::filterSourcesByDevice);
// this would not happen
- if (extractorsFilteredByDevice == null) {
+ if (sourcesFilteredByDevice == null) {
LOGGER.warn("Match result NPE when handle device {}", device);
continue;
}
- // 2. filter matched candidate extractors by measurements
+ // 2. filter matched candidate sources by measurements
if (measurements.length == 0) {
- // `measurements` is empty (only in case of tsfile event). match all
extractors.
+ // `measurements` is empty (only in case of tsfile event). match all
sources.
//
// case 1: the pattern can match all measurements of the device.
- // in this case, the extractor can be matched without checking the
measurements.
+ // in this case, the source can be matched without checking the
measurements.
//
// case 2: the pattern may match some measurements of the device.
// in this case, we can't get all measurements efficiently here,
- // so we just ASSUME the extractor matches and do more checks later.
- matchedExtractors.addAll(extractorsFilteredByDevice);
+ // so we just ASSUME the source matches and do more checks later.
+ matchedSources.addAll(sourcesFilteredByDevice);
} else {
// `measurements` is not empty (only in case of tablet event).
- // Match extractors by measurements.
- extractorsFilteredByDevice.forEach(
- extractor -> {
- final PipePattern pattern = extractor.getPipePattern();
+ // Match sources by measurements.
+ sourcesFilteredByDevice.forEach(
+ source -> {
+ final PipePattern pattern = source.getPipePattern();
if (Objects.isNull(pattern) || pattern.isRoot() ||
pattern.coversDevice(device)) {
// The pattern can match all measurements of the device.
- matchedExtractors.add(extractor);
+ matchedSources.add(source);
} else {
for (final String measurement : measurements) {
// Ignore null measurement for partial insert
@@ -158,8 +158,8 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
}
if (pattern.matchesMeasurement(device, measurement)) {
- matchedExtractors.add(extractor);
- // There would be no more matched extractors because the
measurements are
+ matchedSources.add(source);
+ // There would be no more matched sources because the
measurements are
// unique
break;
}
@@ -168,53 +168,53 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
});
}
- if (matchedExtractors.size() == extractors.size()) {
+ if (matchedSources.size() == sources.size()) {
break;
}
}
- return new Pair<>(matchedExtractors,
findUnmatchedExtractors(matchedExtractors));
+ return new Pair<>(matchedSources, findUnmatchedSources(matchedSources));
} finally {
lock.readLock().unlock();
}
}
- private Set<PipeRealtimeDataRegionSource> findUnmatchedExtractors(
- final Set<PipeRealtimeDataRegionSource> matchedExtractors) {
- final Set<PipeRealtimeDataRegionSource> unmatchedExtractors = new
HashSet<>();
- for (final PipeRealtimeDataRegionSource extractor : extractors) {
- if (!matchedExtractors.contains(extractor)) {
- unmatchedExtractors.add(extractor);
+ private Set<PipeRealtimeDataRegionSource> findUnmatchedSources(
+ final Set<PipeRealtimeDataRegionSource> matchedSources) {
+ final Set<PipeRealtimeDataRegionSource> unmatchedSources = new HashSet<>();
+ for (final PipeRealtimeDataRegionSource source : sources) {
+ if (!matchedSources.contains(source)) {
+ unmatchedSources.add(source);
}
}
- return unmatchedExtractors;
+ return unmatchedSources;
}
- protected Set<PipeRealtimeDataRegionSource> filterExtractorsByDevice(final
String device) {
- final Set<PipeRealtimeDataRegionSource> filteredExtractors = new
HashSet<>();
+ protected Set<PipeRealtimeDataRegionSource> filterSourcesByDevice(final
String device) {
+ final Set<PipeRealtimeDataRegionSource> filteredSources = new HashSet<>();
- for (final PipeRealtimeDataRegionSource extractor : extractors) {
- // Return if the extractor only extract deletion
- if (!extractor.shouldExtractInsertion()) {
+ for (final PipeRealtimeDataRegionSource source : sources) {
+ // Return if the source only extract deletion
+ if (!source.shouldExtractInsertion()) {
continue;
}
- final PipePattern pipePattern = extractor.getPipePattern();
+ final PipePattern pipePattern = source.getPipePattern();
if (Objects.isNull(pipePattern) ||
pipePattern.mayOverlapWithDevice(device)) {
- filteredExtractors.add(extractor);
+ filteredSources.add(source);
}
}
- return filteredExtractors;
+ return filteredSources;
}
@Override
public void clear() {
lock.writeLock().lock();
try {
- extractors.clear();
- deviceToExtractorsCache.invalidateAll();
- deviceToExtractorsCache.cleanUp();
+ sources.clear();
+ deviceToSourcesCache.invalidateAll();
+ deviceToSourcesCache.cleanUp();
} finally {
lock.writeLock().unlock();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/PipeDataRegionMatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/PipeDataRegionMatcher.java
index 810e42a92b2..da5e9318f5a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/PipeDataRegionMatcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/PipeDataRegionMatcher.java
@@ -32,10 +32,10 @@ public interface PipeDataRegionMatcher {
* Register a extractor. If the extractor's pattern matches the event's
schema info, the event
* will be assigned to the extractor.
*/
- void register(PipeRealtimeDataRegionSource extractor);
+ void register(final PipeRealtimeDataRegionSource extractor);
/** Deregister a extractor. */
- void deregister(PipeRealtimeDataRegionSource extractor);
+ void deregister(final PipeRealtimeDataRegionSource extractor);
/** Get the number of registered extractors in this matcher. */
int getRegisterCount();
@@ -48,7 +48,7 @@ public interface PipeDataRegionMatcher {
* @return pair of matched extractors and unmatched extractors.
*/
Pair<Set<PipeRealtimeDataRegionSource>, Set<PipeRealtimeDataRegionSource>>
match(
- PipeRealtimeEvent event);
+ final PipeRealtimeEvent event);
/** Clear all the registered extractors and internal data structures. */
void clear();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
index 52c4ec4554b..63140f733a1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
@@ -31,18 +31,26 @@ public class CompactionPathUtils {
private CompactionPathUtils() {}
- public static PartialPath getPath(IDeviceID device, String measurement)
+ public static PartialPath getPath(final IDeviceID device, String measurement)
throws IllegalPathException {
return getPath(device).concatNode(measurement);
}
- public static PartialPath getPath(IDeviceID device) throws
IllegalPathException {
- PartialPath path;
- String plainDeviceId = ((PlainDeviceID) device).toStringID();
- if (plainDeviceId.contains(TsFileConstant.BACK_QUOTE_STRING)) {
- path =
DataNodeDevicePathCache.getInstance().getPartialPath(plainDeviceId);
+ public static PartialPath getPath(final IDeviceID device) throws
IllegalPathException {
+ return getPath(((PlainDeviceID) device).toStringID());
+ }
+
+ public static PartialPath getPath(final String device, String measurement)
+ throws IllegalPathException {
+ return getPath(device).concatNode(measurement);
+ }
+
+ public static PartialPath getPath(final String device) throws
IllegalPathException {
+ final PartialPath path;
+ if (device.contains(TsFileConstant.BACK_QUOTE_STRING)) {
+ path = DataNodeDevicePathCache.getInstance().getPartialPath(device);
} else {
- path = new
PartialPath(plainDeviceId.split(TsFileConstant.PATH_SEPARATER_NO_REGEX));
+ path = new
PartialPath(device.split(TsFileConstant.PATH_SEPARATER_NO_REGEX));
}
return path;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 1608303e46f..29c06a55589 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -257,7 +257,7 @@ public class CommonConfig {
private int pipeExtractorAssignerDisruptorRingBufferSize = 128;
private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 72 *
KB;
- private long pipeExtractorMatcherCacheSize = 1024;
+ private long pipeSourceMatcherCacheSize = 1024;
private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
@@ -985,16 +985,16 @@ public class CommonConfig {
pipeExtractorAssignerDisruptorRingBufferEntrySize);
}
- public long getPipeExtractorMatcherCacheSize() {
- return pipeExtractorMatcherCacheSize;
+ public long getPipeSourceMatcherCacheSize() {
+ return pipeSourceMatcherCacheSize;
}
- public void setPipeExtractorMatcherCacheSize(long
pipeExtractorMatcherCacheSize) {
- if (this.pipeExtractorMatcherCacheSize == pipeExtractorMatcherCacheSize) {
+ public void setPipeSourceMatcherCacheSize(long pipeSourceMatcherCacheSize) {
+ if (this.pipeSourceMatcherCacheSize == pipeSourceMatcherCacheSize) {
return;
}
- this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize;
- logger.info("pipeExtractorMatcherCacheSize is set to {}.",
pipeExtractorMatcherCacheSize);
+ this.pipeSourceMatcherCacheSize = pipeSourceMatcherCacheSize;
+ logger.info("pipeExtractorMatcherCacheSize is set to {}.",
pipeSourceMatcherCacheSize);
}
public int getPipeConnectorHandshakeTimeoutMs() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 00423730877..1dcc849f86c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -159,8 +159,8 @@ public class PipeConfig {
return
COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
}
- public long getPipeExtractorMatcherCacheSize() {
- return COMMON_CONFIG.getPipeExtractorMatcherCacheSize();
+ public long getPipeSourceMatcherCacheSize() {
+ return COMMON_CONFIG.getPipeSourceMatcherCacheSize();
}
/////////////////////////////// Connector ///////////////////////////////
@@ -478,7 +478,7 @@ public class PipeConfig {
LOGGER.info(
"PipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes: {}",
getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes());
- LOGGER.info("PipeExtractorMatcherCacheSize: {}",
getPipeExtractorMatcherCacheSize());
+ LOGGER.info("PipeSourceMatcherCacheSize: {}",
getPipeSourceMatcherCacheSize());
LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}",
getPipeConnectorHandshakeTimeoutMs());
LOGGER.info("PipeConnectorTransferTimeoutMs: {}",
getPipeConnectorTransferTimeoutMs());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 640652345a1..a60ac57da02 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -322,13 +322,13 @@ public class PipeDescriptor {
config
.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes())))));
- config.setPipeExtractorMatcherCacheSize(
+ config.setPipeSourceMatcherCacheSize(
Integer.parseInt(
-
Optional.ofNullable(properties.getProperty("pipe_extractor_matcher_cache_size"))
+
Optional.ofNullable(properties.getProperty("pipe_source_matcher_cache_size"))
.orElse(
properties.getProperty(
"pipe_extractor_matcher_cache_size",
-
String.valueOf(config.getPipeExtractorMatcherCacheSize())))));
+
String.valueOf(config.getPipeSourceMatcherCacheSize())))));
config.setPipeConnectorHandshakeTimeoutMs(
Long.parseLong(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
index e9129ec23e1..9ed5837e625 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBPipePattern.java
@@ -37,6 +37,8 @@ import java.util.stream.Collectors;
public class IoTDBPipePattern extends PipePattern {
private final PartialPath patternPartialPath;
+ private static volatile DevicePathGetter devicePathGetter = PartialPath::new;
+ private static volatile MeasurementPathGetter measurementPathGetter =
PartialPath::new;
public IoTDBPipePattern(final String pattern) {
super(pattern);
@@ -99,7 +101,7 @@ public class IoTDBPipePattern extends PipePattern {
try {
// Another way is to use patternPath.overlapWith("device.*"),
// there will be no false positives but time cost may be higher.
- return patternPartialPath.matchPrefixPath(new PartialPath(device));
+ return
patternPartialPath.matchPrefixPath(devicePathGetter.apply(device));
} catch (final IllegalPathException e) {
return false;
}
@@ -122,7 +124,7 @@ public class IoTDBPipePattern extends PipePattern {
}
try {
- return patternPartialPath.matchFullPath(new PartialPath(device,
measurement));
+ return
patternPartialPath.matchFullPath(measurementPathGetter.apply(device,
measurement));
} catch (final IllegalPathException e) {
return false;
}
@@ -198,8 +200,24 @@ public class IoTDBPipePattern extends PipePattern {
return PathPatternUtil.hasWildcard(patternPartialPath.getTailNode());
}
+ public static void setDevicePathGetter(final DevicePathGetter
devicePathGetter) {
+ IoTDBPipePattern.devicePathGetter = devicePathGetter;
+ }
+
+ public static void setMeasurementPathGetter(final MeasurementPathGetter
measurementPathGetter) {
+ IoTDBPipePattern.measurementPathGetter = measurementPathGetter;
+ }
+
@Override
public String toString() {
return "IoTDBPipePattern" + super.toString();
}
+
+ public interface DevicePathGetter {
+ PartialPath apply(final String deviceId) throws IllegalPathException;
+ }
+
+ public interface MeasurementPathGetter {
+ PartialPath apply(final String deviceId, final String measurement) throws
IllegalPathException;
+ }
}