This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 85dfa900e38 Pipe: Fix PipeTerminateEvent cannot report and mark the
pipe as completed & events with no generated events in pipe processor subtask
may not be reported (#12817)
85dfa900e38 is described below
commit 85dfa900e38df149d43452f2810fac4eea9ee100
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 27 22:50:52 2024 +0800
Pipe: Fix PipeTerminateEvent cannot report and mark the pipe as completed &
events with no generated events in pipe processor subtask may not be reported
(#12817)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/it/autocreate/IoTDBPipeAutoDropIT.java | 6 ++++++
.../pipe/task/connection/PipeEventCollector.java | 17 ++++++++++++++--
.../subtask/processor/PipeProcessorSubtask.java | 23 +++++++++++++++++++---
3 files changed, 41 insertions(+), 5 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
index 60c0ba37d4b..d09db4ec61f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
@@ -24,15 +24,21 @@ import
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeAutoDropIT extends AbstractPipeDualAutoIT {
@Test
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index f760d6cb4fc..fb88c8983e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -55,6 +55,7 @@ public class PipeEventCollector implements EventCollector {
private final boolean forceTabletFormat;
private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
+ private boolean hasNoGeneratedEvent = true;
public PipeEventCollector(
final UnboundedBlockingPendingQueue<Event> pendingQueue,
@@ -96,6 +97,7 @@ public class PipeEventCollector implements EventCollector {
if (sourceEvent.shouldParseTimeOrPattern()) {
for (final PipeRawTabletInsertionEvent parsedEvent :
sourceEvent.toRawTabletInsertionEvents()) {
+ hasNoGeneratedEvent = false;
collectEvent(parsedEvent);
}
} else {
@@ -107,6 +109,7 @@ public class PipeEventCollector implements EventCollector {
if (sourceEvent.shouldParseTimeOrPattern()) {
final PipeRawTabletInsertionEvent parsedEvent =
sourceEvent.parseEventWithPatternOrTime();
if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
+ hasNoGeneratedEvent = false;
collectEvent(parsedEvent);
}
} else {
@@ -129,6 +132,7 @@ public class PipeEventCollector implements EventCollector {
try {
for (final TabletInsertionEvent parsedEvent :
sourceEvent.toTabletInsertionEvents()) {
+ hasNoGeneratedEvent = false;
collectEvent(parsedEvent);
}
} finally {
@@ -150,7 +154,11 @@ public class PipeEventCollector implements EventCollector {
deleteDataEvent.getPipeTaskMeta(),
deleteDataEvent.getPipePattern(),
deleteDataEvent.isGeneratedByPipe()))
- .ifPresent(this::collectEvent);
+ .ifPresent(
+ event -> {
+ hasNoGeneratedEvent = false;
+ collectEvent(event);
+ });
}
private void collectEvent(final Event event) {
@@ -174,8 +182,9 @@ public class PipeEventCollector implements EventCollector {
pendingQueue.directOffer(event);
}
- public void resetCollectInvocationCount() {
+ public void resetCollectInvocationCountAndGenerateFlag() {
collectInvocationCount.set(0);
+ hasNoGeneratedEvent = true;
}
public long getCollectInvocationCount() {
@@ -185,4 +194,8 @@ public class PipeEventCollector implements EventCollector {
public boolean hasNoCollectInvocationAfterReset() {
return collectInvocationCount.get() == 0;
}
+
+ public boolean hasNoGeneratedEvent() {
+ return hasNoGeneratedEvent;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 96e0911af3a..8532c29c202 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -129,7 +129,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
return false;
}
- outputEventCollector.resetCollectInvocationCount();
+ outputEventCollector.resetCollectInvocationCountAndGenerateFlag();
try {
// event can be supplied after the subtask is closed, so we need to check isClosed here
if (!isClosed.get()) {
@@ -158,10 +158,27 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
final boolean shouldReport =
!isClosed.get()
- && outputEventCollector.hasNoCollectInvocationAfterReset()
+ // If an event does not generate any events except itself at this stage, it falls into
+ // one of two categories:
+ // 1. If the event is collected and passed to the connector, the reference count of
+ // the event may eventually reach zero in the processor (the connector reduces the
+ // reference count first, and then the processor reduces the reference count). At that
+ // point, the progress of the event needs to be reported.
+ // 2. If the event is not collected (not passed to the connector), the reference count
+ // of the event must reach zero in the processor stage. At that point, the progress of
+ // the event needs to be reported.
+ && outputEventCollector.hasNoGeneratedEvent()
// Events generated from consensusPipe's transferred data should never be reported.
&& !(pipeProcessor instanceof PipeConsensusProcessor);
- if (shouldReport && event instanceof EnrichedEvent) {
+ if (shouldReport
+ && event instanceof EnrichedEvent
+ && outputEventCollector.hasNoCollectInvocationAfterReset()) {
+ // An event should be reported here when it is not passed to the connector stage and it
+ // does not generate any new events to be passed to the connector. In our system, before
+ // reporting an event, we need to enrich it with a commitKey and commitId, which is done in
+ // the collector stage. But for an event that is not passed to the connector and does not
+ // generate any new events, the collector stage is not triggered, so we need to enrich the
+ // commitKey and commitId here.
PipeEventCommitManager.getInstance()
.enrichWithCommitterKeyAndCommitId((EnrichedEvent) event,
creationTime, regionId);
}