This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e973e6725f8 Pipe: improve db-level pipe performance by removing useless pattern check (#11386)
e973e6725f8 is described below
commit e973e6725f80f7c02adf5413773ed2c54e0341db
Author: Zikun Ma <[email protected]>
AuthorDate: Thu Oct 26 14:45:22 2023 +0800
Pipe: improve db-level pipe performance by removing useless pattern check (#11386)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../apache/iotdb/db/pipe/event/EnrichedEvent.java | 17 +++++++---
.../common/tsfile/PipeTsFileInsertionEvent.java | 2 +-
.../db/pipe/event/realtime/PipeRealtimeEvent.java | 9 +++++
.../PipeHistoricalDataRegionTsFileExtractor.java | 16 +++++++++
.../realtime/PipeRealtimeDataRegionExtractor.java | 39 +++++++++++++++++-----
5 files changed, 69 insertions(+), 14 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 9817331b62b..9be4867afa5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -45,14 +45,15 @@ public abstract class EnrichedEvent implements Event {
protected final PipeTaskMeta pipeTaskMeta;
private final String pattern;
- protected boolean isPatternAndTimeParsed;
+
+ protected boolean isPatternParsed;
+ protected boolean isTimeParsed = true;
protected EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) {
referenceCount = new AtomicInteger(0);
this.pipeTaskMeta = pipeTaskMeta;
this.pattern = pattern;
- isPatternAndTimeParsed =
-
getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
+ isPatternParsed =
getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
}
/**
@@ -162,8 +163,16 @@ public abstract class EnrichedEvent implements Event {
return pattern == null ?
PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE : pattern;
}
+ /**
+ * If pipe's pattern is database-level, then no need to parse event by pattern cause pipes are
+ * data-region-level.
+ */
+ public void skipParsingPattern() {
+ isPatternParsed = true;
+ }
+
public boolean shouldParsePatternOrTime() {
- return !isPatternAndTimeParsed;
+ return !isPatternParsed || !isTimeParsed;
}
public abstract EnrichedEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 8914e56cbfa..43f439a45b4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -76,7 +76,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
this.needParseTime = needParseTime;
if (needParseTime) {
- this.isPatternAndTimeParsed = false;
+ isTimeParsed = false;
}
this.resource = resource;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 79994a3dd32..600df727f2c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -125,6 +125,15 @@ public class PipeRealtimeEvent extends EnrichedEvent {
return event.getProgressIndex();
}
+ /**
+ * If pipe's pattern is database-level, then no need to parse event by pattern cause pipes are
+ * data-region-level.
+ */
+ @Override
+ public void skipParsingPattern() {
+ event.skipParsingPattern();
+ }
+
@Override
public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
PipeTaskMeta pipeTaskMeta, String pattern) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 1a704b44722..37da7db751b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -69,6 +69,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
private int dataRegionId;
private String pattern;
+ private boolean isDbNameCoveredByPattern = false;
private long historicalDataExtractionStartTime; // Event time
private long historicalDataExtractionEndTime; // Event time
@@ -99,6 +100,16 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
}
pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY,
EXTRACTOR_PATTERN_DEFAULT_VALUE);
+ final DataRegion dataRegion =
+ StorageEngine.getInstance().getDataRegion(new
DataRegionId(environment.getRegionId()));
+ if (dataRegion != null) {
+ final String databaseName = dataRegion.getDatabaseName();
+ if (databaseName != null
+ && pattern.length() <= databaseName.length()
+ && databaseName.startsWith(pattern)) {
+ isDbNameCoveredByPattern = true;
+ }
+ }
// User may set the EXTRACTOR_HISTORY_START_TIME and EXTRACTOR_HISTORY_END_TIME without
// enabling the historical data extraction, which may affect the realtime data extraction.
@@ -295,6 +306,10 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
historicalDataExtractionStartTime,
historicalDataExtractionEndTime,
!sloppyTimeRange && !isTsFileResourceCoveredByTimeRange(resource));
+ if (isDbNameCoveredByPattern) {
+ event.skipParsingPattern();
+ }
+
event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());
try {
PipeResourceManager.tsfile().unpinTsFileResource(resource);
@@ -303,6 +318,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
"Pipe: failed to unpin TsFileResource after creating event, original
path: {}",
resource.getTsFilePath());
}
+
return event;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index 07d7a1c4183..2634424ed0d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.extractor.realtime;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
import
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
@@ -26,6 +27,8 @@ import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.pipe.api.PipeExtractor;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -38,13 +41,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor
{
- protected String pattern;
- protected boolean isForwardingPipeRequests;
-
protected String pipeName;
protected String dataRegionId;
protected PipeTaskMeta pipeTaskMeta;
+ protected String pattern;
+ private boolean isDbNameCoveredByPattern = false;
+
+ protected boolean isForwardingPipeRequests;
+
// This queue is used to store pending events extracted by the method extract(). The method
// supply() will poll events from this queue and send them to the next pipe plugin.
protected final UnboundedBlockingPendingQueue<Event> pendingQueue =
@@ -64,20 +69,32 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
@Override
public void customize(PipeParameters parameters,
PipeExtractorRuntimeConfiguration configuration)
throws Exception {
+ final PipeTaskExtractorRuntimeEnvironment environment =
+ (PipeTaskExtractorRuntimeEnvironment)
configuration.getRuntimeEnvironment();
+
+ pipeName = environment.getPipeName();
+ dataRegionId = String.valueOf(environment.getRegionId());
+ pipeTaskMeta = environment.getPipeTaskMeta();
+
pattern =
parameters.getStringOrDefault(
PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
+ final DataRegion dataRegion =
+ StorageEngine.getInstance().getDataRegion(new
DataRegionId(environment.getRegionId()));
+ if (dataRegion != null) {
+ final String databaseName = dataRegion.getDatabaseName();
+ if (databaseName != null
+ && pattern.length() <= databaseName.length()
+ && databaseName.startsWith(pattern)) {
+ isDbNameCoveredByPattern = true;
+ }
+ }
+
isForwardingPipeRequests =
parameters.getBooleanOrDefault(
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
-
- final PipeTaskExtractorRuntimeEnvironment environment =
- (PipeTaskExtractorRuntimeEnvironment)
configuration.getRuntimeEnvironment();
- pipeName = environment.getPipeName();
- dataRegionId = String.valueOf(environment.getRegionId());
- pipeTaskMeta = environment.getPipeTaskMeta();
}
@Override
@@ -115,6 +132,10 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
/** @param event the event from the storage engine */
public final void extract(PipeRealtimeEvent event) {
+ if (isDbNameCoveredByPattern) {
+ event.skipParsingPattern();
+ }
+
doExtract(event);
synchronized (isClosed) {