This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 504810cd406 Pipe: Fixed the bug that schema region and config region
cannot report progress (#12528)
504810cd406 is described below
commit 504810cd40600ec043cd9c42d91b43f840a02b5e
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 15 10:53:19 2024 +0800
Pipe: Fixed the bug that schema region and config region cannot report
progress (#12528)
---
.../pipe/execution/PipeConfigNodeSubtask.java | 9 +++++++++
.../pipe/extractor/IoTDBConfigRegionExtractor.java | 23 ++++++++++++++++++++++
.../commons/pipe/event/PipeWritePlanEvent.java | 15 ++------------
3 files changed, 34 insertions(+), 13 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
index 31c058aa058..fc7281410a2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeE
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.task.subtask.PipeAbstractConnectorSubtask;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
@@ -48,6 +49,7 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConfigNodeSubtask.class);
+ private final String pipeName;
private final PipeTaskMeta pipeTaskMeta;
// Pipe plugins for this subtask
@@ -67,11 +69,15 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask {
throws Exception {
// We initialize outputPipeConnector by initConnector()
super(pipeName, creationTime, null);
+ this.pipeName = pipeName;
this.pipeTaskMeta = pipeTaskMeta;
initExtractor(extractorAttributes);
initProcessor(processorAttributes);
initConnector(connectorAttributes);
+
+ PipeEventCommitManager.getInstance()
+ .register(pipeName, creationTime, CONFIG_REGION_ID.getId(), pipeName + "_" + creationTime);
}
private void initExtractor(Map<String, String> extractorAttributes) throws Exception {
@@ -205,6 +211,9 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask {
public void close() {
isClosed.set(true);
+ PipeEventCommitManager.getInstance()
+ .deregister(pipeName, creationTime, CONFIG_REGION_ID.getId());
+
try {
extractor.close();
} catch (Exception e) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
index 70b665cc93f..3e996ca7dc2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
@@ -22,8 +22,10 @@ package org.apache.iotdb.confignode.manager.pipe.extractor;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.commons.pipe.datastructure.queue.listening.AbstractPipeListeningQueue;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
import org.apache.iotdb.commons.pipe.extractor.IoTDBNonDataRegionExtractor;
+import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
@@ -86,6 +88,14 @@ public class IoTDBConfigRegionExtractor extends IoTDBNonDataRegionExtractor {
}
}
+ @Override
+ public synchronized EnrichedEvent supply() throws Exception {
+ final EnrichedEvent event = super.supply();
+ PipeEventCommitManager.getInstance()
+ .enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
+ return event;
+ }
+
@Override
protected long getMaxBlockingTimeMs() {
// The connector continues to submit and relies on the queue to sleep if empty
@@ -103,4 +113,17 @@ public class IoTDBConfigRegionExtractor extends IoTDBNonDataRegionExtractor {
protected void confineHistoricalEventTransferTypes(final PipeSnapshotEvent event) {
((PipeConfigRegionSnapshotEvent) event).confineTransferredTypes(listenedTypeSet);
}
+
+ @Override
+ public synchronized void close() throws Exception {
+ if (hasBeenClosed.get()) {
+ return;
+ }
+ hasBeenClosed.set(true);
+
+ if (!hasBeenStarted.get()) {
+ return;
+ }
+ super.close();
+ }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
index 2e9619d65fa..8cf1a7d3154 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
@@ -43,26 +43,15 @@ public abstract class PipeWritePlanEvent extends EnrichedEvent implements Serial
this.isGeneratedByPipe = isGeneratedByPipe;
}
- /**
- * This event doesn't share resources with other events, so no need to maintain reference count.
- * We just use a counter to prevent the reference count from being less than 0.
- */
+ /** This event doesn't share resources with other events. */
@Override
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
- referenceCount.incrementAndGet();
return true;
}
- /**
- * This event doesn't share resources with other events, so no need to maintain reference count.
- * We just use a counter to prevent the reference count from being less than 0.
- */
+ /** This event doesn't share resources with other events. */
@Override
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
- final long count = referenceCount.decrementAndGet();
- if (count < 0) {
- LOGGER.warn("The reference count is less than 0, may need to check the implementation.");
- }
return true;
}