This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch record-progress-when-shutdown
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 766c8c1afa1b780ba9d3dfced8f17075fb506a0b
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jun 18 19:02:00 2025 +0800

    Pipe: Persist progress index before shutdown for accurate recovery after restart
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java     | 19 +++++++++++++++++++
 .../apache/iotdb/db/service/DataNodeShutdownHook.java |  2 ++
 .../commons/pipe/agent/task/meta/PipeRuntimeMeta.java |  9 +++++++++
 .../commons/pipe/agent/task/meta/PipeTaskMeta.java    |  2 +-
 4 files changed, 31 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 8fd099baa06..15d99f2cff9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -840,6 +840,25 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     return isSnapshotMode;
   }
 
+  ///////////////////////// Shutdown Logic /////////////////////////
+
+  public void persistAllProgressIndexLocally() {
+    if (!tryReadLockWithTimeOut(10)) {
+      LOGGER.info("Failed to persist all progress index locally because of 
timeout.");
+      return;
+    }
+    try {
+      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+        pipeMeta.getRuntimeMeta().persistProgressIndex();
+      }
+      LOGGER.info("Persist all progress index locally successfully.");
+    } catch (final Exception e) {
+      LOGGER.warn("Failed to record all progress index locally, because {}.", 
e.getMessage(), e);
+    } finally {
+      releaseReadLock();
+    }
+  }
+
   ///////////////////////// Pipe Consensus /////////////////////////
 
   public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final 
int consensusGroupId) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 731c4b09da1..efa628bbcbf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -118,6 +118,8 @@ public class DataNodeShutdownHook extends Thread {
       triggerSnapshotForAllDataRegion();
     }
 
+    // Persist progress index before shutdown for accurate recovery after restart
+    PipeDataNodeAgent.task().persistAllProgressIndexLocally();
     // Shutdown all consensus pipe's receiver
     PipeDataNodeAgent.receiver().pipeConsensus().closeReceiverExecutor();
     // Shutdown pipe progressIndex background service
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
index 6f3ddd9e0ad..ff77564bd2f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
@@ -140,6 +140,15 @@ public class PipeRuntimeMeta {
     this.isStoppedByRuntimeException.set(isStoppedByRuntimeException);
   }
 
+  public void persistProgressIndex() {
+    // Iterate through all the task metas and persist their progress index
+    for (final PipeTaskMeta taskMeta : consensusGroupId2TaskMetaMap.values()) {
+      if (taskMeta.getProgressIndex() != null) {
+        taskMeta.persistProgressIndex();
+      }
+    }
+  }
+
   /**
    * We use negative regionId to identify the external pipe source, which is 
not a consensus group
    * id. Then we can reuse the regionId to schedule the external pipe source 
and store the progress
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
index 6a4ab25db7e..2bd40510dc4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
@@ -138,7 +138,7 @@ public class PipeTaskMeta {
     return progressIndex.get();
   }
 
-  private synchronized void persistProgressIndex() {
+  public synchronized void persistProgressIndex() {
     if (lastPersistCount.get() == updateCount.get()) {
       // in case of multiple threads calling updateProgressIndex at the same 
time
       return;

Reply via email to