This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new be1fdde9891 Pipe: Fix ProgressReportEvent may be transferred / marked 
as rate in config subtask (#13161)
be1fdde9891 is described below

commit be1fdde9891b46f5973501e6c6045514bb564dca
Author: Caideyipi <[email protected]>
AuthorDate: Wed Aug 14 15:59:46 2024 +0800

    Pipe: Fix ProgressReportEvent may be transferred / marked as rate in config 
subtask (#13161)
---
 .../confignode/manager/pipe/execution/PipeConfigNodeSubtask.java   | 7 +++++--
 .../org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java   | 5 +++++
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
index 02cb5373359..64c164705e2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeC
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -181,10 +182,12 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractConnectorSubtask {
         return false;
       }
 
-      outputPipeConnector.transfer(event);
+      if (!(event instanceof ProgressReportEvent)) {
+        outputPipeConnector.transfer(event);
+        PipeConfigRegionConnectorMetrics.getInstance().markConfigEvent(taskID);
+      }
       decreaseReferenceCountAndReleaseLastEvent(true);
 
-      PipeConfigRegionConnectorMetrics.getInstance().markConfigEvent(taskID);
     } catch (final PipeException e) {
       setLastExceptionEvent(event);
       if (!isClosed.get()) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
index 700f8f16387..e3e580ba1dd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/ProgressReportEvent.java
@@ -88,4 +88,9 @@ public class ProgressReportEvent extends EnrichedEvent {
   public boolean mayEventPathsOverlappedWithPattern() {
     return true;
   }
+
+  @Override
+  public String toString() {
+    return "ProgressReportEvent - " + super.toString();
+  }
 }

Reply via email to