This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-realtime-loose-range
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 161583cbea96888f65dd8487aea03b20a4ba91be
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jun 17 18:35:13 2024 +0800

    Pipe: Support "source.realtime.loose-range" = "path" in iotdb-source
---
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  4 ++
 .../realtime/PipeRealtimeDataRegionExtractor.java  | 57 ++++++++++++++++++++++
 2 files changed, 61 insertions(+)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index a7c0cc5c255..8744eb6cb69 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -159,6 +159,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
     return !resource.isEmpty();
   }
 
+  public TsFileResource getResource() {
+    return resource;
+  }
+
   public File getTsFile() {
     return tsFile;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index 262f2d824aa..62b665441d8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -30,13 +30,18 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
 import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeTimePartitionListener;
 import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.pipe.api.PipeExtractor;
 import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
@@ -45,14 +50,18 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.utils.Pair;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -307,6 +316,11 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
         // only skip parsing time for events whose data timestamps may intersect with the time range
         event.skipParsingTime();
       }
+      if (sloppyPattern && mayEventPathsOverlappedWithPattern(event)) {
+        // only skip parsing pattern for events whose data paths may intersect with the pattern
+        event.skipParsingPattern();
+      }
+
       doExtract(event);
     } else {
       event.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(), false);
@@ -319,6 +333,49 @@ public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
     }
   }
 
+  private boolean mayEventPathsOverlappedWithPattern(final PipeRealtimeEvent realtimeEvent) {
+    final EnrichedEvent event = realtimeEvent.getEvent();
+
+    if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+      final String deviceId = ((PipeInsertNodeTabletInsertionEvent) event).getDeviceId();
+      return Objects.isNull(deviceId) || pipePattern.mayOverlapWithDevice(deviceId);
+    }
+
+    if (event instanceof PipeTsFileInsertionEvent) {
+      final TsFileResource resource = ((PipeTsFileInsertionEvent) event).getResource();
+      if (!resource.isClosed()) {
+        return true;
+      }
+
+      try {
+        final Map<IDeviceID, Boolean> deviceIsAlignedMap =
+            PipeResourceManager.tsfile()
+                .getDeviceIsAlignedMapFromCache(
+                    PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(
+                        resource.getTsFile()));
+        final Set<IDeviceID> deviceSet =
+            Objects.nonNull(deviceIsAlignedMap)
+                ? deviceIsAlignedMap.keySet()
+                : resource.getDevices();
+        return deviceSet.stream()
+            .anyMatch(
+                // TODO: use IDeviceID
+                deviceID ->
+                    pipePattern.mayOverlapWithDevice(((PlainDeviceID) deviceID).toStringID()));
+      } catch (final IOException e) {
+        LOGGER.warn(
+            "Pipe {}@{}: failed to get devices from TsFile {}, extract it anyway",
+            pipeName,
+            dataRegionId,
+            resource.getTsFilePath(),
+            e);
+        return true;
+      }
+    }
+
+    return false;
+  }
+
   protected abstract void doExtract(final PipeRealtimeEvent event);
 
   protected void extractHeartbeat(final PipeRealtimeEvent event) {

Reply via email to