This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 504810cd406 Pipe: Fixed the bug that schema region and config region cannot report progress (#12528)
504810cd406 is described below

commit 504810cd40600ec043cd9c42d91b43f840a02b5e
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 15 10:53:19 2024 +0800

    Pipe: Fixed the bug that schema region and config region cannot report progress (#12528)
---
 .../pipe/execution/PipeConfigNodeSubtask.java      |  9 +++++++++
 .../pipe/extractor/IoTDBConfigRegionExtractor.java | 23 ++++++++++++++++++++++
 .../commons/pipe/event/PipeWritePlanEvent.java     | 15 ++------------
 3 files changed, 34 insertions(+), 13 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
index 31c058aa058..fc7281410a2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeE
 import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.task.subtask.PipeAbstractConnectorSubtask;
 import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
@@ -48,6 +49,7 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfigNodeSubtask.class);
 
+  private final String pipeName;
   private final PipeTaskMeta pipeTaskMeta;
 
   // Pipe plugins for this subtask
@@ -67,11 +69,15 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask {
       throws Exception {
     // We initialize outputPipeConnector by initConnector()
     super(pipeName, creationTime, null);
+    this.pipeName = pipeName;
     this.pipeTaskMeta = pipeTaskMeta;
 
     initExtractor(extractorAttributes);
     initProcessor(processorAttributes);
     initConnector(connectorAttributes);
+
+    PipeEventCommitManager.getInstance()
+        .register(pipeName, creationTime, CONFIG_REGION_ID.getId(), pipeName + "_" + creationTime);
   }
 
   private void initExtractor(Map<String, String> extractorAttributes) throws Exception {
@@ -205,6 +211,9 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask {
   public void close() {
     isClosed.set(true);
 
+    PipeEventCommitManager.getInstance()
+        .deregister(pipeName, creationTime, CONFIG_REGION_ID.getId());
+
     try {
       extractor.close();
     } catch (Exception e) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
index 70b665cc93f..3e996ca7dc2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
@@ -22,8 +22,10 @@ package org.apache.iotdb.confignode.manager.pipe.extractor;
 import org.apache.iotdb.commons.consensus.ConfigRegionId;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.queue.listening.AbstractPipeListeningQueue;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
 import org.apache.iotdb.commons.pipe.extractor.IoTDBNonDataRegionExtractor;
+import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
 import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
@@ -86,6 +88,14 @@ public class IoTDBConfigRegionExtractor extends IoTDBNonDataRegionExtractor {
     }
   }
 
+  @Override
+  public synchronized EnrichedEvent supply() throws Exception {
+    final EnrichedEvent event = super.supply();
+    PipeEventCommitManager.getInstance()
+        .enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
+    return event;
+  }
+
   @Override
   protected long getMaxBlockingTimeMs() {
     // The connector continues to submit and relies on the queue to sleep if empty
@@ -103,4 +113,17 @@ public class IoTDBConfigRegionExtractor extends IoTDBNonDataRegionExtractor {
   protected void confineHistoricalEventTransferTypes(final PipeSnapshotEvent event) {
     ((PipeConfigRegionSnapshotEvent) event).confineTransferredTypes(listenedTypeSet);
   }
+
+  @Override
+  public synchronized void close() throws Exception {
+    if (hasBeenClosed.get()) {
+      return;
+    }
+    hasBeenClosed.set(true);
+
+    if (!hasBeenStarted.get()) {
+      return;
+    }
+    super.close();
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
index 2e9619d65fa..8cf1a7d3154 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/PipeWritePlanEvent.java
@@ -43,26 +43,15 @@ public abstract class PipeWritePlanEvent extends EnrichedEvent implements Serial
     this.isGeneratedByPipe = isGeneratedByPipe;
   }
 
-  /**
-   * This event doesn't share resources with other events, so no need to maintain reference count.
-   * We just use a counter to prevent the reference count from being less than 0.
-   */
+  /** This event doesn't share resources with other events. */
   @Override
   public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
-    referenceCount.incrementAndGet();
     return true;
   }
 
-  /**
-   * This event doesn't share resources with other events, so no need to maintain reference count.
-   * We just use a counter to prevent the reference count from being less than 0.
-   */
+  /** This event doesn't share resources with other events. */
   @Override
   public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
-    final long count = referenceCount.decrementAndGet();
-    if (count < 0) {
-      LOGGER.warn("The reference count is less than 0, may need to check the implementation.");
-    }
     return true;
   }
 

Reply via email to