This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e973e6725f8 Pipe: improve db-level pipe performance by removing useless pattern check (#11386)
e973e6725f8 is described below

commit e973e6725f80f7c02adf5413773ed2c54e0341db
Author: Zikun Ma <[email protected]>
AuthorDate: Thu Oct 26 14:45:22 2023 +0800

    Pipe: improve db-level pipe performance by removing useless pattern check (#11386)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  | 17 +++++++---
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  2 +-
 .../db/pipe/event/realtime/PipeRealtimeEvent.java  |  9 +++++
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 16 +++++++++
 .../realtime/PipeRealtimeDataRegionExtractor.java  | 39 +++++++++++++++++-----
 5 files changed, 69 insertions(+), 14 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index 9817331b62b..9be4867afa5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -45,14 +45,15 @@ public abstract class EnrichedEvent implements Event {
   protected final PipeTaskMeta pipeTaskMeta;
 
   private final String pattern;
-  protected boolean isPatternAndTimeParsed;
+
+  protected boolean isPatternParsed;
+  protected boolean isTimeParsed = true;
 
   protected EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) {
     referenceCount = new AtomicInteger(0);
     this.pipeTaskMeta = pipeTaskMeta;
     this.pattern = pattern;
-    isPatternAndTimeParsed =
-        getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
+    isPatternParsed = getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
   }
 
   /**
@@ -162,8 +163,16 @@ public abstract class EnrichedEvent implements Event {
     return pattern == null ? PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE : pattern;
   }
 
+  /**
+   * If pipe's pattern is database-level, then no need to parse event by pattern cause pipes are
+   * data-region-level.
+   */
+  public void skipParsingPattern() {
+    isPatternParsed = true;
+  }
+
   public boolean shouldParsePatternOrTime() {
-    return !isPatternAndTimeParsed;
+    return !isPatternParsed || !isTimeParsed;
   }
 
   public abstract EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 8914e56cbfa..43f439a45b4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -76,7 +76,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
     this.needParseTime = needParseTime;
 
     if (needParseTime) {
-      this.isPatternAndTimeParsed = false;
+      isTimeParsed = false;
     }
 
     this.resource = resource;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 79994a3dd32..600df727f2c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -125,6 +125,15 @@ public class PipeRealtimeEvent extends EnrichedEvent {
     return event.getProgressIndex();
   }
 
+  /**
+   * If pipe's pattern is database-level, then no need to parse event by pattern cause pipes are
+   * data-region-level.
+   */
+  @Override
+  public void skipParsingPattern() {
+    event.skipParsingPattern();
+  }
+
   @Override
   public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       PipeTaskMeta pipeTaskMeta, String pattern) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 1a704b44722..37da7db751b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -69,6 +69,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
   private int dataRegionId;
 
   private String pattern;
+  private boolean isDbNameCoveredByPattern = false;
 
   private long historicalDataExtractionStartTime; // Event time
   private long historicalDataExtractionEndTime; // Event time
@@ -99,6 +100,16 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
     }
 
     pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY, EXTRACTOR_PATTERN_DEFAULT_VALUE);
+    final DataRegion dataRegion =
+        StorageEngine.getInstance().getDataRegion(new DataRegionId(environment.getRegionId()));
+    if (dataRegion != null) {
+      final String databaseName = dataRegion.getDatabaseName();
+      if (databaseName != null
+          && pattern.length() <= databaseName.length()
+          && databaseName.startsWith(pattern)) {
+        isDbNameCoveredByPattern = true;
+      }
+    }
 
     // User may set the EXTRACTOR_HISTORY_START_TIME and EXTRACTOR_HISTORY_END_TIME without
     // enabling the historical data extraction, which may affect the realtime data extraction.
@@ -295,6 +306,10 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
             historicalDataExtractionStartTime,
             historicalDataExtractionEndTime,
             !sloppyTimeRange && !isTsFileResourceCoveredByTimeRange(resource));
+    if (isDbNameCoveredByPattern) {
+      event.skipParsingPattern();
+    }
+
     event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());
     try {
       PipeResourceManager.tsfile().unpinTsFileResource(resource);
@@ -303,6 +318,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
           "Pipe: failed to unpin TsFileResource after creating event, original path: {}",
           resource.getTsFilePath());
     }
+
     return event;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index 07d7a1c4183..2634424ed0d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.extractor.realtime;
 
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
 import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
@@ -26,6 +27,8 @@ import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.pipe.api.PipeExtractor;
 import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -38,13 +41,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
 
-  protected String pattern;
-  protected boolean isForwardingPipeRequests;
-
   protected String pipeName;
   protected String dataRegionId;
   protected PipeTaskMeta pipeTaskMeta;
 
+  protected String pattern;
+  private boolean isDbNameCoveredByPattern = false;
+
+  protected boolean isForwardingPipeRequests;
+
   // This queue is used to store pending events extracted by the method extract(). The method
   // supply() will poll events from this queue and send them to the next pipe plugin.
   protected final UnboundedBlockingPendingQueue<Event> pendingQueue =
@@ -64,20 +69,32 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
   @Override
   public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration)
       throws Exception {
+    final PipeTaskExtractorRuntimeEnvironment environment =
+        (PipeTaskExtractorRuntimeEnvironment) configuration.getRuntimeEnvironment();
+
+    pipeName = environment.getPipeName();
+    dataRegionId = String.valueOf(environment.getRegionId());
+    pipeTaskMeta = environment.getPipeTaskMeta();
+
     pattern =
         parameters.getStringOrDefault(
             PipeExtractorConstant.EXTRACTOR_PATTERN_KEY,
             PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE);
+    final DataRegion dataRegion =
+        StorageEngine.getInstance().getDataRegion(new DataRegionId(environment.getRegionId()));
+    if (dataRegion != null) {
+      final String databaseName = dataRegion.getDatabaseName();
+      if (databaseName != null
+          && pattern.length() <= databaseName.length()
+          && databaseName.startsWith(pattern)) {
+        isDbNameCoveredByPattern = true;
+      }
+    }
+
     isForwardingPipeRequests =
         parameters.getBooleanOrDefault(
             PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
             PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
-
-    final PipeTaskExtractorRuntimeEnvironment environment =
-        (PipeTaskExtractorRuntimeEnvironment) configuration.getRuntimeEnvironment();
-    pipeName = environment.getPipeName();
-    dataRegionId = String.valueOf(environment.getRegionId());
-    pipeTaskMeta = environment.getPipeTaskMeta();
   }
 
   @Override
@@ -115,6 +132,10 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
 
   /** @param event the event from the storage engine */
   public final void extract(PipeRealtimeEvent event) {
+    if (isDbNameCoveredByPattern) {
+      event.skipParsingPattern();
+    }
+
     doExtract(event);
 
     synchronized (isClosed) {

Reply via email to