This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fad286945eb Fix recovered consensus pipes staying stopped after 
snapshot load (#17438)
fad286945eb is described below

commit fad286945eb25182851015f5a643ee7bbc7be7a0
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Apr 8 16:11:31 2026 +0800

    Fix recovered consensus pipes staying stopped after snapshot load (#17438)
    
    * Fix recovered consensus pipes staying stopped after snapshot load
    
    * Apply spotless formatting for consensus pipe recovery fix
---
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 25 ++++++++++
 .../pipe/PipeTaskInfoConsensusPipeTest.java        | 57 ++++++++++++++++++++++
 2 files changed, 82 insertions(+)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 78cf9a10eee..a3278d29fdd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -957,11 +957,36 @@ public class PipeTaskInfo implements SnapshotProcessor {
       try (final FileInputStream fileInputStream = new 
FileInputStream(snapshotFile)) {
         pipeMetaKeeper.processLoadSnapshot(fileInputStream);
       }
+      normalizeRecoveredConsensusPipeStatus();
     } finally {
       releaseWriteLock();
     }
   }
 
+  private void normalizeRecoveredConsensusPipeStatus() {
+    final List<String> restartedConsensusPipes = new ArrayList<>();
+
+    pipeMetaKeeper
+        .getPipeMetaList()
+        .forEach(
+            pipeMeta -> {
+              final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
+              if 
(!PipeType.CONSENSUS.equals(pipeMeta.getStaticMeta().getPipeType())
+                  || !PipeStatus.STOPPED.equals(runtimeMeta.getStatus().get())
+                  || runtimeMeta.getIsStoppedByRuntimeException()) {
+                return;
+              }
+
+              runtimeMeta.getStatus().set(PipeStatus.RUNNING);
+              
restartedConsensusPipes.add(pipeMeta.getStaticMeta().getPipeName());
+            });
+
+    if (!restartedConsensusPipes.isEmpty()) {
+      LOGGER.info(
+          "Recovered consensus pipes {} as RUNNING during snapshot load.", 
restartedConsensusPipes);
+    }
+  }
+
   /////////////////////////////// hashCode & equals 
///////////////////////////////
 
   @Override
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java
index 094dcebe82f..22cd3988277 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java
@@ -33,6 +33,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -149,4 +150,60 @@ public class PipeTaskInfoConsensusPipeTest {
     Assert.assertTrue(result.containsKey(consensusPipeName));
     Assert.assertFalse(result.containsKey(subscriptionPipeName));
   }
+
+  @Test
+  public void 
testProcessLoadSnapshotRestartsOnlyHealthyStoppedConsensusPipes() throws 
Exception {
+    DataRegionId regionId = new DataRegionId(100);
+    String consensusPipeToRestart = new ConsensusPipeName(regionId, 1, 
2).toString();
+    String consensusPipeStoppedByException = new ConsensusPipeName(regionId, 
2, 1).toString();
+    String userPipeName = "userPipe";
+
+    createPipe(consensusPipeToRestart, PipeStatus.STOPPED);
+    createPipe(consensusPipeStoppedByException, PipeStatus.STOPPED);
+    createPipe(userPipeName, PipeStatus.STOPPED);
+
+    pipeTaskInfo
+        .getPipeMetaByPipeName(consensusPipeStoppedByException)
+        .getRuntimeMeta()
+        .setIsStoppedByRuntimeException(true);
+
+    final File snapshotDir =
+        
java.nio.file.Files.createTempDirectory("pipe-task-info-consensus-test").toFile();
+    try {
+      Assert.assertTrue(pipeTaskInfo.processTakeSnapshot(snapshotDir));
+
+      PipeTaskInfo recoveredPipeTaskInfo = new PipeTaskInfo();
+      recoveredPipeTaskInfo.processLoadSnapshot(snapshotDir);
+
+      Assert.assertEquals(
+          PipeStatus.RUNNING,
+          recoveredPipeTaskInfo
+              .getPipeMetaByPipeName(consensusPipeToRestart)
+              .getRuntimeMeta()
+              .getStatus()
+              .get());
+      Assert.assertEquals(
+          PipeStatus.STOPPED,
+          recoveredPipeTaskInfo
+              .getPipeMetaByPipeName(consensusPipeStoppedByException)
+              .getRuntimeMeta()
+              .getStatus()
+              .get());
+      Assert.assertTrue(
+          recoveredPipeTaskInfo
+              .getPipeMetaByPipeName(consensusPipeStoppedByException)
+              .getRuntimeMeta()
+              .getIsStoppedByRuntimeException());
+      Assert.assertEquals(
+          PipeStatus.STOPPED,
+          recoveredPipeTaskInfo
+              .getPipeMetaByPipeName(userPipeName)
+              .getRuntimeMeta()
+              .getStatus()
+              .get());
+    } finally {
+      new File(snapshotDir, "pipe_task_info.bin").delete();
+      snapshotDir.delete();
+    }
+  }
 }

Reply via email to