This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-report-realtime-filtered-evemts in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit dd605568286cfc4ea75a73ee56d2450890bfef96 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Jul 18 12:11:23 2025 +0800 fix --- .../dataregion/realtime/assigner/PipeDataRegionAssigner.java | 1 + .../extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java | 5 ++++- .../apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java | 4 ++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 6238117ed89..6b92f4bccdc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -141,6 +141,7 @@ public class PipeDataRegionAssigner implements Closeable { matcher .match(event) + .getLeft() .forEach( extractor -> { if (disruptor.isClosed()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java index 08df1b9f432..6efb3681cc9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/matcher/PipeDataRegionMatcher.java @@ -22,6 +22,8 @@ package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor; +import org.apache.tsfile.utils.Pair; + import java.util.Set; public interface PipeDataRegionMatcher { @@ -48,7 +50,8 @@ public interface PipeDataRegionMatcher { * @param event the event to be matched * @return the matched extractors */ - Set<PipeRealtimeDataRegionExtractor> match(PipeRealtimeEvent event); + Pair<Set<PipeRealtimeDataRegionExtractor>, Set<PipeRealtimeDataRegionExtractor>> match( + PipeRealtimeEvent event); /** Clear all the registered extractors and internal data structures. */ void clear(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java index 61f91fea8a3..37c57a3e8ef 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/CachedSchemaPatternMatcherTest.java @@ -165,12 +165,12 @@ public class CachedSchemaPatternMatcherTest { null, Collections.singletonMap(new StringArrayDeviceID("root.db" + i), measurements)); final long startTime = System.currentTimeMillis(); - matcher.match(event).forEach(extractor -> extractor.extract(event)); + matcher.match(event).getLeft().forEach(extractor -> extractor.extract(event)); totalTime += (System.currentTimeMillis() - startTime); } final MockedPipeRealtimeEvent event = new MockedPipeRealtimeEvent(null, null, deviceMap); final long startTime = System.currentTimeMillis(); - matcher.match(event).forEach(extractor -> extractor.extract(event)); + matcher.match(event).getLeft().forEach(extractor -> extractor.extract(event)); totalTime += (System.currentTimeMillis() - startTime); } System.out.println("matcher.getRegisterCount() = " + matcher.getRegisterCount());
