This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3446a313388 Pipe: Optimized the path construction efficiency in
pattern match (#16265)
3446a313388 is described below
commit 3446a313388c6075e58ce5eb3c58d3fd66e480cd
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 26 16:41:54 2025 +0800
Pipe: Optimized the path construction efficiency in pattern match (#16265)
* refactor
* fix-optimize
* optimize
* fix
---
.../agent/runtime/PipeDataNodeRuntimeAgent.java | 7 +-
.../matcher/CachedSchemaPatternMatcher.java | 180 ++++++++++-----------
.../cache/schema/DataNodeDevicePathCache.java | 3 -
.../execute/utils/CompactionPathUtils.java | 23 +--
.../apache/iotdb/commons/conf/CommonConfig.java | 14 +-
.../iotdb/commons/pipe/config/PipeConfig.java | 6 +-
.../iotdb/commons/pipe/config/PipeDescriptor.java | 6 +-
.../datastructure/pattern/IoTDBTreePattern.java | 23 ++-
8 files changed, 143 insertions(+), 119 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 593ba6a297b..67621e3033a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
import
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenceCleaner;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
@@ -39,6 +40,7 @@ import
org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStar
import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.service.ResourcesInformationHolder;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.slf4j.Logger;
@@ -69,7 +71,7 @@ public class PipeDataNodeRuntimeAgent implements IService {
//////////////////////////// System Service Interface
////////////////////////////
public synchronized void preparePipeResources(
- ResourcesInformationHolder resourcesInformationHolder) throws
StartupException {
+ final ResourcesInformationHolder resourcesInformationHolder) throws
StartupException {
// Clean sender (connector) hardlink file dir and snapshot dir
PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean();
@@ -78,6 +80,9 @@ public class PipeDataNodeRuntimeAgent implements IService {
PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
simpleProgressIndexAssigner.start();
+
+ IoTDBTreePattern.setDevicePathGetter(CompactionPathUtils::getPath);
+ IoTDBTreePattern.setMeasurementPathGetter(CompactionPathUtils::getPath);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
index 884bad39bb5..4411432e1da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -58,47 +58,47 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
protected final ReentrantReadWriteLock lock;
private final AccessControl accessControl =
Coordinator.getInstance().getAccessControl();
- protected final Set<PipeRealtimeDataRegionSource> extractors;
+ protected final Set<PipeRealtimeDataRegionSource> sources;
- protected final Cache<IDeviceID, Set<PipeRealtimeDataRegionSource>>
deviceToExtractorsCache;
+ protected final Cache<IDeviceID, Set<PipeRealtimeDataRegionSource>>
deviceToSourcesCache;
protected final Cache<Pair<String, IDeviceID>,
Set<PipeRealtimeDataRegionSource>>
- databaseAndTableToExtractorsCache;
+ databaseAndTableToSourcesCache;
public CachedSchemaPatternMatcher() {
this.lock = new ReentrantReadWriteLock();
- // Should be thread-safe because the extractors will be returned by {@link
#match} and
- // iterated by {@link #assignToExtractor}, at the same time the extractors
may be added or
+ // Should be thread-safe because the sources will be returned by {@link
#match} and
+ // iterated by {@link #assignToSource}, at the same time the sources may
be added or
// removed by {@link #register} and {@link #deregister}.
- this.extractors = new CopyOnWriteArraySet<>();
- this.deviceToExtractorsCache =
+ this.sources = new CopyOnWriteArraySet<>();
+ this.deviceToSourcesCache =
Caffeine.newBuilder()
-
.maximumSize(PipeConfig.getInstance().getPipeExtractorMatcherCacheSize())
+
.maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
.build();
- this.databaseAndTableToExtractorsCache =
+ this.databaseAndTableToSourcesCache =
Caffeine.newBuilder()
-
.maximumSize(PipeConfig.getInstance().getPipeExtractorMatcherCacheSize())
+
.maximumSize(PipeConfig.getInstance().getPipeSourceMatcherCacheSize())
.build();
}
@Override
- public void register(final PipeRealtimeDataRegionSource extractor) {
+ public void register(final PipeRealtimeDataRegionSource source) {
lock.writeLock().lock();
try {
- extractors.add(extractor);
- deviceToExtractorsCache.invalidateAll();
- databaseAndTableToExtractorsCache.invalidateAll();
+ sources.add(source);
+ deviceToSourcesCache.invalidateAll();
+ databaseAndTableToSourcesCache.invalidateAll();
} finally {
lock.writeLock().unlock();
}
}
@Override
- public void deregister(final PipeRealtimeDataRegionSource extractor) {
+ public void deregister(final PipeRealtimeDataRegionSource source) {
lock.writeLock().lock();
try {
- extractors.remove(extractor);
- deviceToExtractorsCache.invalidateAll();
- databaseAndTableToExtractorsCache.invalidateAll();
+ sources.remove(source);
+ deviceToSourcesCache.invalidateAll();
+ databaseAndTableToSourcesCache.invalidateAll();
} finally {
lock.writeLock().unlock();
}
@@ -109,7 +109,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
lock.writeLock().lock();
try {
// Will invalidate device cache
- databaseAndTableToExtractorsCache.invalidateAll();
+ databaseAndTableToSourcesCache.invalidateAll();
} finally {
lock.writeLock().unlock();
}
@@ -119,7 +119,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
public int getRegisterCount() {
lock.readLock().lock();
try {
- return extractors.size();
+ return sources.size();
} finally {
lock.readLock().unlock();
}
@@ -128,26 +128,26 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
@Override
public Pair<Set<PipeRealtimeDataRegionSource>,
Set<PipeRealtimeDataRegionSource>> match(
final PipeRealtimeEvent event) {
- final Set<PipeRealtimeDataRegionSource> matchedExtractors = new
HashSet<>();
+ final Set<PipeRealtimeDataRegionSource> matchedSources = new HashSet<>();
lock.readLock().lock();
try {
- if (extractors.isEmpty()) {
- return new Pair<>(matchedExtractors, extractors);
+ if (sources.isEmpty()) {
+ return new Pair<>(matchedSources, sources);
}
- // HeartbeatEvent will be assigned to all extractors
+ // HeartbeatEvent will be assigned to all sources
if (event.getEvent() instanceof PipeHeartbeatEvent) {
- return new Pair<>(extractors, Collections.EMPTY_SET);
+ return new Pair<>(sources, Collections.EMPTY_SET);
}
// TODO: consider table pattern?
- // Deletion event will be assigned to extractors listened to it
+ // Deletion event will be assigned to sources listened to it
if (event.getEvent() instanceof PipeDeleteDataNodeEvent) {
- extractors.stream()
+ sources.stream()
.filter(PipeRealtimeDataRegionSource::shouldExtractDeletion)
- .forEach(matchedExtractors::add);
- return new Pair<>(matchedExtractors,
findUnmatchedExtractors(matchedExtractors));
+ .forEach(matchedSources::add);
+ return new Pair<>(matchedSources,
findUnmatchedSources(matchedSources));
}
for (final Map.Entry<IDeviceID, String[]> entry :
event.getSchemaInfo().entrySet()) {
@@ -158,7 +158,7 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
||
deviceID.getTableName().startsWith(TREE_MODEL_EVENT_TABLE_NAME_PREFIX)
|| deviceID.getTableName().equals(PATH_ROOT)) {
event.markAsTreeModelEvent();
- matchTreeModelEvent(deviceID, entry.getValue(), matchedExtractors);
+ matchTreeModelEvent(deviceID, entry.getValue(), matchedSources);
} else {
event.markAsTableModelEvent();
matchTableModelEvent(
@@ -166,70 +166,70 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
? ((PipeInsertionEvent)
event.getEvent()).getTableModelDatabaseName()
: null,
deviceID,
- matchedExtractors);
+ matchedSources);
}
- if (matchedExtractors.size() == extractors.size()) {
+ if (matchedSources.size() == sources.size()) {
break;
}
}
- return new Pair<>(matchedExtractors,
findUnmatchedExtractors(matchedExtractors));
+ return new Pair<>(matchedSources, findUnmatchedSources(matchedSources));
} finally {
lock.readLock().unlock();
}
}
- private Set<PipeRealtimeDataRegionSource> findUnmatchedExtractors(
- final Set<PipeRealtimeDataRegionSource> matchedExtractors) {
- final Set<PipeRealtimeDataRegionSource> unmatchedExtractors = new
HashSet<>();
- for (final PipeRealtimeDataRegionSource extractor : extractors) {
- if (!matchedExtractors.contains(extractor)) {
- unmatchedExtractors.add(extractor);
+ private Set<PipeRealtimeDataRegionSource> findUnmatchedSources(
+ final Set<PipeRealtimeDataRegionSource> matchedSources) {
+ final Set<PipeRealtimeDataRegionSource> unmatchedSources = new HashSet<>();
+ for (final PipeRealtimeDataRegionSource source : sources) {
+ if (!matchedSources.contains(source)) {
+ unmatchedSources.add(source);
}
}
- return unmatchedExtractors;
+ return unmatchedSources;
}
protected void matchTreeModelEvent(
final IDeviceID device,
final String[] measurements,
- final Set<PipeRealtimeDataRegionSource> matchedExtractors) {
- // 1. try to get matched extractors from cache, if not success, match them
by device
- final Set<PipeRealtimeDataRegionSource> extractorsFilteredByDevice =
- deviceToExtractorsCache.get(device, this::filterExtractorsByDevice);
+ final Set<PipeRealtimeDataRegionSource> matchedSources) {
+ // 1. try to get matched sources from cache, if not success, match them by
device
+ final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDevice =
+ deviceToSourcesCache.get(device, this::filterSourcesByDevice);
// this would not happen
- if (extractorsFilteredByDevice == null) {
+ if (sourcesFilteredByDevice == null) {
LOGGER.warn(
- "Extractors filtered by device is null when matching extractors for
tree model event.",
+ "Sources filtered by device is null when matching sources for tree
model event.",
new Exception());
return;
}
- // 2. filter matched candidate extractors by measurements
+ // 2. filter matched candidate sources by measurements
if (measurements.length == 0) {
- // `measurements` is empty (only in case of tsfile event). match all
extractors.
+ // `measurements` is empty (only in case of tsfile event). match all
sources.
//
// case 1: the pattern can match all measurements of the device.
- // in this case, the extractor can be matched without checking the
measurements.
+ // in this case, the source can be matched without checking the
measurements.
//
// case 2: the pattern may match some measurements of the device.
// in this case, we can't get all measurements efficiently here,
- // so we just ASSUME the extractor matches and do more checks later.
- matchedExtractors.addAll(extractorsFilteredByDevice);
+ // so we just ASSUME the source matches and do more checks later.
+ matchedSources.addAll(sourcesFilteredByDevice);
} else {
// `measurements` is not empty (only in case of tablet event).
- // Match extractors by measurements.
- extractorsFilteredByDevice.forEach(
- extractor -> {
- if (matchedExtractors.size() == extractors.size()) {
+ // Match sources by measurements.
+ sourcesFilteredByDevice.forEach(
+ source -> {
+ if (matchedSources.size() == sources.size()) {
return;
}
- final TreePattern pattern = extractor.getTreePattern();
+ final TreePattern pattern = source.getTreePattern();
if (Objects.isNull(pattern) || pattern.isRoot() ||
pattern.coversDevice(device)) {
// The pattern can match all measurements of the device.
- matchedExtractors.add(extractor);
+ matchedSources.add(source);
} else {
for (final String measurement : measurements) {
// Ignore null measurement for partial insert
@@ -238,8 +238,8 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
}
if (pattern.matchesMeasurement(device, measurement)) {
- matchedExtractors.add(extractor);
- // There would be no more matched extractors because the
measurements are
+ matchedSources.add(source);
+ // There would be no more matched sources because the
measurements are
// unique
break;
}
@@ -249,69 +249,69 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
}
}
- protected Set<PipeRealtimeDataRegionSource> filterExtractorsByDevice(final
IDeviceID device) {
- final Set<PipeRealtimeDataRegionSource> filteredExtractors = new
HashSet<>();
+ protected Set<PipeRealtimeDataRegionSource> filterSourcesByDevice(final
IDeviceID device) {
+ final Set<PipeRealtimeDataRegionSource> filteredSources = new HashSet<>();
- for (final PipeRealtimeDataRegionSource extractor : extractors) {
- // Return if the extractor only extract deletion
- if (!extractor.shouldExtractInsertion()) {
+ for (final PipeRealtimeDataRegionSource source : sources) {
+ // Return if the source only extract deletion
+ if (!source.shouldExtractInsertion()) {
continue;
}
- final TreePattern treePattern = extractor.getTreePattern();
+ final TreePattern treePattern = source.getTreePattern();
if (Objects.isNull(treePattern)
|| (treePattern.isTreeModelDataAllowedToBeCaptured()
&& treePattern.mayOverlapWithDevice(device))) {
- filteredExtractors.add(extractor);
+ filteredSources.add(source);
}
}
- return filteredExtractors;
+ return filteredSources;
}
protected void matchTableModelEvent(
final String databaseName,
final IDeviceID tableName,
- final Set<PipeRealtimeDataRegionSource> matchedExtractors) {
+ final Set<PipeRealtimeDataRegionSource> matchedSources) {
// this would not happen
if (databaseName == null) {
LOGGER.warn(
- "Database name is null when matching extractors for table model
event.", new Exception());
+ "Database name is null when matching sources for table model
event.", new Exception());
return;
}
- final Set<PipeRealtimeDataRegionSource>
extractorsFilteredByDatabaseAndTable =
- databaseAndTableToExtractorsCache.get(
- new Pair<>(databaseName, tableName),
this::filterExtractorsByDatabaseAndTable);
+ final Set<PipeRealtimeDataRegionSource> sourcesFilteredByDatabaseAndTable =
+ databaseAndTableToSourcesCache.get(
+ new Pair<>(databaseName, tableName),
this::filterSourcesByDatabaseAndTable);
// this would not happen
- if (extractorsFilteredByDatabaseAndTable == null) {
+ if (sourcesFilteredByDatabaseAndTable == null) {
LOGGER.warn(
- "Extractors filtered by database and table is null when matching
extractors for table model event.",
+ "Sources filtered by database and table is null when matching
sources for table model event.",
new Exception());
return;
}
- matchedExtractors.addAll(extractorsFilteredByDatabaseAndTable);
+ matchedSources.addAll(sourcesFilteredByDatabaseAndTable);
}
- protected Set<PipeRealtimeDataRegionSource>
filterExtractorsByDatabaseAndTable(
+ protected Set<PipeRealtimeDataRegionSource> filterSourcesByDatabaseAndTable(
final Pair<String, IDeviceID> databaseNameAndTableName) {
- final Set<PipeRealtimeDataRegionSource> filteredExtractors = new
HashSet<>();
+ final Set<PipeRealtimeDataRegionSource> filteredSources = new HashSet<>();
- for (final PipeRealtimeDataRegionSource extractor : extractors) {
- // Return if the extractor only extract deletion
- if (!extractor.shouldExtractInsertion()) {
+ for (final PipeRealtimeDataRegionSource source : sources) {
+ // Return if the source only extract deletion
+ if (!source.shouldExtractInsertion()) {
continue;
}
- final TablePattern tablePattern = extractor.getTablePattern();
+ final TablePattern tablePattern = source.getTablePattern();
if (matchesTablePattern(tablePattern, databaseNameAndTableName)
- && (!extractor.isSkipIfNoPrivileges()
- || notFilteredByAccess(extractor.getUserName(),
databaseNameAndTableName))) {
- filteredExtractors.add(extractor);
+ && (!source.isSkipIfNoPrivileges()
+ || notFilteredByAccess(source.getUserName(),
databaseNameAndTableName))) {
+ filteredSources.add(source);
}
}
- return filteredExtractors;
+ return filteredSources;
}
private boolean matchesTablePattern(
@@ -335,11 +335,11 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
public void clear() {
lock.writeLock().lock();
try {
- extractors.clear();
- deviceToExtractorsCache.invalidateAll();
- deviceToExtractorsCache.cleanUp();
- databaseAndTableToExtractorsCache.invalidateAll();
- databaseAndTableToExtractorsCache.cleanUp();
+ sources.clear();
+ deviceToSourcesCache.invalidateAll();
+ deviceToSourcesCache.cleanUp();
+ databaseAndTableToSourcesCache.invalidateAll();
+ databaseAndTableToSourcesCache.cleanUp();
} finally {
lock.writeLock().unlock();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
index 07293bc4ca3..a9458e45f77 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java
@@ -29,12 +29,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/** This cache is for reducing duplicated DeviceId PartialPath initialization
in write process. */
public class DataNodeDevicePathCache {
- private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeDevicePathCache.class);
private static final DataNodeMemoryConfig memoryConfig =
IoTDBDescriptor.getInstance().getMemoryConfig();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
index 8b7a29fbf3b..5e452708bca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionPathUtils.java
@@ -24,31 +24,34 @@ import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
+import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.file.metadata.IDeviceID;
public class CompactionPathUtils {
private CompactionPathUtils() {}
- public static PartialPath getPath(IDeviceID device, String measurement)
+ public static MeasurementPath getPath(final IDeviceID device, final String
measurement)
throws IllegalPathException {
+ return getPath(device).concatAsMeasurementPath(measurement);
+ }
+
+ public static PartialPath getPath(final IDeviceID device) throws
IllegalPathException {
if (device.isTableModel()) {
- String[] tableNameSegments =
+ final String[] tableNameSegments =
DataNodeDevicePathCache.getInstance().getPartialPath(device.getTableName()).getNodes();
- String[] nodes = new String[device.segmentNum() +
tableNameSegments.length];
+ final String[] nodes = new String[device.segmentNum() +
tableNameSegments.length - 1];
System.arraycopy(tableNameSegments, 0, nodes, 0,
tableNameSegments.length);
for (int i = 0; i < device.segmentNum() - 1; i++) {
nodes[i + tableNameSegments.length] =
device.segment(i + 1) == null ? null : device.segment(i +
1).toString();
}
- nodes[device.segmentNum() + tableNameSegments.length - 1] = measurement;
- MeasurementPath path = new MeasurementPath(nodes);
- path.setDevice(device);
- return path;
+ return new PartialPath(nodes);
} else {
- return DataNodeDevicePathCache.getInstance()
- .getPartialPath(device.toString())
- .concatAsMeasurementPath(measurement);
+ final String deviceId = device.toString();
+ return deviceId.contains(TsFileConstant.BACK_QUOTE_STRING)
+ ? DataNodeDevicePathCache.getInstance().getPartialPath(deviceId)
+ : new
PartialPath(deviceId.split(TsFileConstant.PATH_SEPARATER_NO_REGEX));
}
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b7b8bade89d..989d663e0ba 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -255,7 +255,7 @@ public class CommonConfig {
private int pipeExtractorAssignerDisruptorRingBufferSize = 128;
private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 72 *
KB;
- private long pipeExtractorMatcherCacheSize = 1024;
+ private long pipeSourceMatcherCacheSize = 1024;
private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
@@ -988,16 +988,16 @@ public class CommonConfig {
pipeExtractorAssignerDisruptorRingBufferEntrySize);
}
- public long getPipeExtractorMatcherCacheSize() {
- return pipeExtractorMatcherCacheSize;
+ public long getPipeSourceMatcherCacheSize() {
+ return pipeSourceMatcherCacheSize;
}
- public void setPipeExtractorMatcherCacheSize(long
pipeExtractorMatcherCacheSize) {
- if (this.pipeExtractorMatcherCacheSize == pipeExtractorMatcherCacheSize) {
+ public void setPipeSourceMatcherCacheSize(long pipeSourceMatcherCacheSize) {
+ if (this.pipeSourceMatcherCacheSize == pipeSourceMatcherCacheSize) {
return;
}
- this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize;
- logger.info("pipeExtractorMatcherCacheSize is set to {}.",
pipeExtractorMatcherCacheSize);
+ this.pipeSourceMatcherCacheSize = pipeSourceMatcherCacheSize;
+ logger.info("pipeSourceMatcherCacheSize is set to {}.",
pipeSourceMatcherCacheSize);
}
public int getPipeConnectorHandshakeTimeoutMs() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 2b47d4b525d..03aa5342753 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -159,8 +159,8 @@ public class PipeConfig {
return
COMMON_CONFIG.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
}
- public long getPipeExtractorMatcherCacheSize() {
- return COMMON_CONFIG.getPipeExtractorMatcherCacheSize();
+ public long getPipeSourceMatcherCacheSize() {
+ return COMMON_CONFIG.getPipeSourceMatcherCacheSize();
}
/////////////////////////////// Connector ///////////////////////////////
@@ -474,7 +474,7 @@ public class PipeConfig {
LOGGER.info(
"PipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes: {}",
getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes());
- LOGGER.info("PipeExtractorMatcherCacheSize: {}",
getPipeExtractorMatcherCacheSize());
+ LOGGER.info("PipeExtractorMatcherCacheSize: {}",
getPipeSourceMatcherCacheSize());
LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}",
getPipeConnectorHandshakeTimeoutMs());
LOGGER.info("PipeConnectorTransferTimeoutMs: {}",
getPipeConnectorTransferTimeoutMs());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 45a3678ec9f..970d920313d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -322,13 +322,13 @@ public class PipeDescriptor {
config
.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes())))));
- config.setPipeExtractorMatcherCacheSize(
+ config.setPipeSourceMatcherCacheSize(
Integer.parseInt(
-
Optional.ofNullable(properties.getProperty("pipe_extractor_matcher_cache_size"))
+
Optional.ofNullable(properties.getProperty("pipe_source_matcher_cache_size"))
.orElse(
properties.getProperty(
"pipe_extractor_matcher_cache_size",
-
String.valueOf(config.getPipeExtractorMatcherCacheSize())))));
+
String.valueOf(config.getPipeSourceMatcherCacheSize())))));
config.setPipeConnectorHandshakeTimeoutMs(
Long.parseLong(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
index 10557f02ae8..9a1d817dc60 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java
@@ -40,6 +40,8 @@ import java.util.stream.Collectors;
public class IoTDBTreePattern extends TreePattern {
private final PartialPath patternPartialPath;
+ private static volatile DevicePathGetter devicePathGetter = PartialPath::new;
+ private static volatile MeasurementPathGetter measurementPathGetter =
MeasurementPath::new;
public IoTDBTreePattern(final boolean isTreeModelDataAllowedToBeCaptured,
final String pattern) {
super(isTreeModelDataAllowedToBeCaptured, pattern);
@@ -115,7 +117,7 @@ public class IoTDBTreePattern extends TreePattern {
try {
// Another way is to use patternPath.overlapWith("device.*"),
// there will be no false positives but time cost may be higher.
- return patternPartialPath.matchPrefixPath(new PartialPath(device));
+ return
patternPartialPath.matchPrefixPath(devicePathGetter.apply(device));
} catch (final IllegalPathException e) {
return false;
}
@@ -129,7 +131,7 @@ public class IoTDBTreePattern extends TreePattern {
}
try {
- return patternPartialPath.matchFullPath(new MeasurementPath(device,
measurement));
+ return
patternPartialPath.matchFullPath(measurementPathGetter.apply(device,
measurement));
} catch (final IllegalPathException e) {
return false;
}
@@ -208,8 +210,25 @@ public class IoTDBTreePattern extends TreePattern {
return PathPatternUtil.hasWildcard(patternPartialPath.getTailNode());
}
+ public static void setDevicePathGetter(final DevicePathGetter
devicePathGetter) {
+ IoTDBTreePattern.devicePathGetter = devicePathGetter;
+ }
+
+ public static void setMeasurementPathGetter(final MeasurementPathGetter
measurementPathGetter) {
+ IoTDBTreePattern.measurementPathGetter = measurementPathGetter;
+ }
+
@Override
public String toString() {
return "IoTDBPipePattern" + super.toString();
}
+
+ public interface DevicePathGetter {
+ PartialPath apply(final IDeviceID deviceId) throws IllegalPathException;
+ }
+
+ public interface MeasurementPathGetter {
+ MeasurementPath apply(final IDeviceID deviceId, final String measurement)
+ throws IllegalPathException;
+ }
}