This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cff2ff15304 Pipe: Fixed the NPE when an emit is triggered by a generic event in aggregate processor (#12298)
cff2ff15304 is described below
commit cff2ff15304a7a1dd8eeead156bea7d6f1c7d2af
Author: Caideyipi <[email protected]>
AuthorDate: Sun Apr 7 20:39:45 2024 +0800
Pipe: Fixed the NPE when an emit is triggered by a generic event in aggregate processor (#12298)
---
.../common/tablet/PipeRawTabletInsertionEvent.java | 4 ++--
.../processor/aggregate/AggregateProcessor.java | 24 +++++++++-------------
2 files changed, 12 insertions(+), 16 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index c1102188682..9ccf707f4b4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -124,7 +124,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
@Override
public void bindProgressIndex(ProgressIndex overridingProgressIndex) {
- // Normally not all events need to report progress, but if the overriddenProgressIndex
+ // Normally not all events need to report progress, but if the overridingProgressIndex
// is given, indicating that the progress needs to be reported.
if (Objects.nonNull(overridingProgressIndex)) {
markAsNeedToReport();
@@ -135,7 +135,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
@Override
public ProgressIndex getProgressIndex() {
- // If the overriddenProgressIndex is given, ignore the sourceEvent's progressIndex.
+ // If the overridingProgressIndex is given, ignore the sourceEvent's progressIndex.
if (Objects.nonNull(overridingProgressIndex)) {
return overridingProgressIndex;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 472c8a6c8d2..7b855df1b43 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.agent.plugin.dataregion.PipeDataRegionPluginAgent;
import org.apache.iotdb.db.pipe.event.common.row.PipeResetTabletRow;
import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -469,20 +470,15 @@ public class AggregateProcessor implements PipeProcessor {
final AtomicReference<TimeSeriesRuntimeState> stateReference =
pipeName2timeSeries2TimeSeriesRuntimeStateMap.get(pipeName).get(timeSeries);
synchronized (stateReference) {
- // This is only a formal tablet insertion event to collect all the results
- final PipeRawTabletInsertionEvent tabletInsertionEvent =
- new PipeRawTabletInsertionEvent(
- null, false, pipeName, pipeTaskMeta, null, false);
- tabletInsertionEvent
- .processRowByRow(
- (row, rowCollector) -> {
- try {
- collectWindowOutputs(
- stateReference.get().forceOutput(), timeSeries, rowCollector);
- } catch (Exception e) {
- exception.set(e);
- }
- })
+ PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, null);
+ try {
+ collectWindowOutputs(
+ stateReference.get().forceOutput(), timeSeries, rowCollector);
+ } catch (IOException e) {
+ exception.set(e);
+ }
+ rowCollector
+ .convertToTabletInsertionEvents()
.forEach(
tabletEvent -> {
try {