This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-report-realtime-filtered-evemts
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit dd605568286cfc4ea75a73ee56d2450890bfef96
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Jul 18 12:11:23 2025 +0800

    fix
---
 .../dataregion/realtime/assigner/PipeDataRegionAssigner.java         | 1 +
 .../extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java | 5 ++++-
 .../apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java | 4 ++--
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 6238117ed89..6b92f4bccdc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -141,6 +141,7 @@ public class PipeDataRegionAssigner implements Closeable {
 
     matcher
         .match(event)
+        .getLeft()
         .forEach(
             extractor -> {
               if (disruptor.isClosed()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
index 08df1b9f432..6efb3681cc9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
 
+import org.apache.tsfile.utils.Pair;
+
 import java.util.Set;
 
 public interface PipeDataRegionMatcher {
@@ -48,7 +50,8 @@ public interface PipeDataRegionMatcher {
    * @param event the event to be matched
    * @return the matched extractors
    */
-  Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeEvent event);
+  Pair<Set<PipeRealtimeDataRegionExtractor>, Set<PipeRealtimeDataRegionExtractor>> match(
+      PipeRealtimeEvent event);
 
   /** Clear all the registered extractors and internal data structures. */
   void clear();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
index 61f91fea8a3..37c57a3e8ef 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java
@@ -165,12 +165,12 @@ public class CachedSchemaPatternMatcherTest {
                 null,
                 Collections.singletonMap(new StringArrayDeviceID("root.db" + i), measurements));
         final long startTime = System.currentTimeMillis();
-        matcher.match(event).forEach(extractor -> extractor.extract(event));
+        matcher.match(event).getLeft().forEach(extractor -> extractor.extract(event));
         totalTime += (System.currentTimeMillis() - startTime);
       }
       final MockedPipeRealtimeEvent event = new MockedPipeRealtimeEvent(null, null, deviceMap);
       final long startTime = System.currentTimeMillis();
-      matcher.match(event).forEach(extractor -> extractor.extract(event));
+      matcher.match(event).getLeft().forEach(extractor -> extractor.extract(event));
       totalTime += (System.currentTimeMillis() - startTime);
     }
     System.out.println("matcher.getRegisterCount() = " + matcher.getRegisterCount());

Reply via email to