This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cff2ff15304 Pipe: Fixed the NPE when an emit is triggered by a generic 
event in aggregate processor (#12298)
cff2ff15304 is described below

commit cff2ff15304a7a1dd8eeead156bea7d6f1c7d2af
Author: Caideyipi <[email protected]>
AuthorDate: Sun Apr 7 20:39:45 2024 +0800

    Pipe: Fixed the NPE when an emit is triggered by a generic event in 
aggregate processor (#12298)
---
 .../common/tablet/PipeRawTabletInsertionEvent.java |  4 ++--
 .../processor/aggregate/AggregateProcessor.java    | 24 +++++++++-------------
 2 files changed, 12 insertions(+), 16 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index c1102188682..9ccf707f4b4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -124,7 +124,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
 
   @Override
   public void bindProgressIndex(ProgressIndex overridingProgressIndex) {
-    // Normally not all events need to report progress, but if the 
overriddenProgressIndex
+    // Normally not all events need to report progress, but if the 
overridingProgressIndex
     // is given, indicating that the progress needs to be reported.
     if (Objects.nonNull(overridingProgressIndex)) {
       markAsNeedToReport();
@@ -135,7 +135,7 @@ public class PipeRawTabletInsertionEvent extends 
EnrichedEvent implements Tablet
 
   @Override
   public ProgressIndex getProgressIndex() {
-    // If the overriddenProgressIndex is given, ignore the sourceEvent's 
progressIndex.
+    // If the overridingProgressIndex is given, ignore the sourceEvent's 
progressIndex.
     if (Objects.nonNull(overridingProgressIndex)) {
       return overridingProgressIndex;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 472c8a6c8d2..7b855df1b43 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.pipe.agent.plugin.dataregion.PipeDataRegionPluginAgent;
 import org.apache.iotdb.db.pipe.event.common.row.PipeResetTabletRow;
 import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -469,20 +470,15 @@ public class AggregateProcessor implements PipeProcessor {
                 final AtomicReference<TimeSeriesRuntimeState> stateReference =
                     
pipeName2timeSeries2TimeSeriesRuntimeStateMap.get(pipeName).get(timeSeries);
                 synchronized (stateReference) {
-                  // This is only a formal tablet insertion event to collect 
all the results
-                  final PipeRawTabletInsertionEvent tabletInsertionEvent =
-                      new PipeRawTabletInsertionEvent(
-                          null, false, pipeName, pipeTaskMeta, null, false);
-                  tabletInsertionEvent
-                      .processRowByRow(
-                          (row, rowCollector) -> {
-                            try {
-                              collectWindowOutputs(
-                                  stateReference.get().forceOutput(), 
timeSeries, rowCollector);
-                            } catch (Exception e) {
-                              exception.set(e);
-                            }
-                          })
+                  PipeRowCollector rowCollector = new 
PipeRowCollector(pipeTaskMeta, null);
+                  try {
+                    collectWindowOutputs(
+                        stateReference.get().forceOutput(), timeSeries, 
rowCollector);
+                  } catch (IOException e) {
+                    exception.set(e);
+                  }
+                  rowCollector
+                      .convertToTabletInsertionEvents()
                       .forEach(
                           tabletEvent -> {
                             try {

Reply via email to