This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 85dfa900e38 Pipe: Fix PipeTerminateEvent cannot report and mark the 
pipe as completed & events with no generated events in pipe processor subtask 
may not be reported (#12817)
85dfa900e38 is described below

commit 85dfa900e38df149d43452f2810fac4eea9ee100
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 27 22:50:52 2024 +0800

    Pipe: Fix PipeTerminateEvent cannot report and mark the pipe as completed & 
events with no generated events in pipe processor subtask may not be reported 
(#12817)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/it/autocreate/IoTDBPipeAutoDropIT.java    |  6 ++++++
 .../pipe/task/connection/PipeEventCollector.java   | 17 ++++++++++++++--
 .../subtask/processor/PipeProcessorSubtask.java    | 23 +++++++++++++++++++---
 3 files changed, 41 insertions(+), 5 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
index 60c0ba37d4b..d09db4ec61f 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
@@ -24,15 +24,21 @@ import 
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
 public class IoTDBPipeAutoDropIT extends AbstractPipeDualAutoIT {
 
   @Test
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index f760d6cb4fc..fb88c8983e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -55,6 +55,7 @@ public class PipeEventCollector implements EventCollector {
   private final boolean forceTabletFormat;
 
   private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
+  private boolean hasNoGeneratedEvent = true;
 
   public PipeEventCollector(
       final UnboundedBlockingPendingQueue<Event> pendingQueue,
@@ -96,6 +97,7 @@ public class PipeEventCollector implements EventCollector {
     if (sourceEvent.shouldParseTimeOrPattern()) {
       for (final PipeRawTabletInsertionEvent parsedEvent :
           sourceEvent.toRawTabletInsertionEvents()) {
+        hasNoGeneratedEvent = false;
         collectEvent(parsedEvent);
       }
     } else {
@@ -107,6 +109,7 @@ public class PipeEventCollector implements EventCollector {
     if (sourceEvent.shouldParseTimeOrPattern()) {
       final PipeRawTabletInsertionEvent parsedEvent = 
sourceEvent.parseEventWithPatternOrTime();
       if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
+        hasNoGeneratedEvent = false;
         collectEvent(parsedEvent);
       }
     } else {
@@ -129,6 +132,7 @@ public class PipeEventCollector implements EventCollector {
 
     try {
       for (final TabletInsertionEvent parsedEvent : 
sourceEvent.toTabletInsertionEvents()) {
+        hasNoGeneratedEvent = false;
         collectEvent(parsedEvent);
       }
     } finally {
@@ -150,7 +154,11 @@ public class PipeEventCollector implements EventCollector {
                     deleteDataEvent.getPipeTaskMeta(),
                     deleteDataEvent.getPipePattern(),
                     deleteDataEvent.isGeneratedByPipe()))
-        .ifPresent(this::collectEvent);
+        .ifPresent(
+            event -> {
+              hasNoGeneratedEvent = false;
+              collectEvent(event);
+            });
   }
 
   private void collectEvent(final Event event) {
@@ -174,8 +182,9 @@ public class PipeEventCollector implements EventCollector {
     pendingQueue.directOffer(event);
   }
 
-  public void resetCollectInvocationCount() {
+  public void resetCollectInvocationCountAndGenerateFlag() {
     collectInvocationCount.set(0);
+    hasNoGeneratedEvent = true;
   }
 
   public long getCollectInvocationCount() {
@@ -185,4 +194,8 @@ public class PipeEventCollector implements EventCollector {
   public boolean hasNoCollectInvocationAfterReset() {
     return collectInvocationCount.get() == 0;
   }
+
+  public boolean hasNoGeneratedEvent() {
+    return hasNoGeneratedEvent;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 96e0911af3a..8532c29c202 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -129,7 +129,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
       return false;
     }
 
-    outputEventCollector.resetCollectInvocationCount();
+    outputEventCollector.resetCollectInvocationCountAndGenerateFlag();
     try {
       // event can be supplied after the subtask is closed, so we need to 
check isClosed here
       if (!isClosed.get()) {
@@ -158,10 +158,27 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
 
       final boolean shouldReport =
           !isClosed.get()
-              && outputEventCollector.hasNoCollectInvocationAfterReset()
+              // If an event does not generate any events except itself at 
this stage, it is divided
+              // into two categories:
+              // 1. If the event is collected and passed to the connector, the 
reference count of
+              // the event may eventually be zero in the processor (the 
connector reduces the
+              // reference count first, and then the processor reduces the 
reference count), at this
+              // time, the progress of the event needs to be reported.
+              // 2. If the event is not collected (not passed to the 
connector), the reference count
+              // of the event must be zero in the processor stage, at this 
time, the progress of the
+              // event needs to be reported.
+              && outputEventCollector.hasNoGeneratedEvent()
               // Events generated from consensusPipe's transferred data should 
never be reported.
               && !(pipeProcessor instanceof PipeConsensusProcessor);
-      if (shouldReport && event instanceof EnrichedEvent) {
+      if (shouldReport
+          && event instanceof EnrichedEvent
+          && outputEventCollector.hasNoCollectInvocationAfterReset()) {
+        // An event should be reported here when it is not passed to the 
connector stage, and it
+        // does not generate any new events to be passed to the connector. In 
our system, before
+        // reporting an event, we need to enrich a commitKey and commitId, 
which is done in the
+        // collector stage. But for the event that not passed to the connector 
and not generate any
+        // new events, the collector stage is not triggered, so we need to 
enrich the commitKey and
+        // commitId here.
         PipeEventCommitManager.getInstance()
             .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, 
creationTime, regionId);
       }

Reply via email to