This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fad286945eb Fix recovered consensus pipes staying stopped after snapshot load (#17438)
fad286945eb is described below
commit fad286945eb25182851015f5a643ee7bbc7be7a0
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Apr 8 16:11:31 2026 +0800
Fix recovered consensus pipes staying stopped after snapshot load (#17438)
* Fix recovered consensus pipes staying stopped after snapshot load
* Apply spotless formatting for consensus pipe recovery fix
---
.../confignode/persistence/pipe/PipeTaskInfo.java | 25 ++++++++++
.../pipe/PipeTaskInfoConsensusPipeTest.java | 57 ++++++++++++++++++++++
2 files changed, 82 insertions(+)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 78cf9a10eee..a3278d29fdd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -957,11 +957,36 @@ public class PipeTaskInfo implements SnapshotProcessor {
try (final FileInputStream fileInputStream = new
FileInputStream(snapshotFile)) {
pipeMetaKeeper.processLoadSnapshot(fileInputStream);
}
+ normalizeRecoveredConsensusPipeStatus();
} finally {
releaseWriteLock();
}
}
+ private void normalizeRecoveredConsensusPipeStatus() {
+ final List<String> restartedConsensusPipes = new ArrayList<>();
+
+ pipeMetaKeeper
+ .getPipeMetaList()
+ .forEach(
+ pipeMeta -> {
+ final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
+ if
(!PipeType.CONSENSUS.equals(pipeMeta.getStaticMeta().getPipeType())
+ || !PipeStatus.STOPPED.equals(runtimeMeta.getStatus().get())
+ || runtimeMeta.getIsStoppedByRuntimeException()) {
+ return;
+ }
+
+ runtimeMeta.getStatus().set(PipeStatus.RUNNING);
+
restartedConsensusPipes.add(pipeMeta.getStaticMeta().getPipeName());
+ });
+
+ if (!restartedConsensusPipes.isEmpty()) {
+ LOGGER.info(
+ "Recovered consensus pipes {} as RUNNING during snapshot load.",
restartedConsensusPipes);
+ }
+ }
+
/////////////////////////////// hashCode & equals ///////////////////////////////
@Override
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java
index 094dcebe82f..22cd3988277 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoConsensusPipeTest.java
@@ -33,6 +33,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -149,4 +150,60 @@ public class PipeTaskInfoConsensusPipeTest {
Assert.assertTrue(result.containsKey(consensusPipeName));
Assert.assertFalse(result.containsKey(subscriptionPipeName));
}
+
+ /**
+  * Takes a snapshot of three STOPPED pipes and loads it into a fresh {@code PipeTaskInfo}.
+  *
+  * <p>Two pipes are named via {@code ConsensusPipeName} and one is named "userPipe"; one of the
+  * consensus-named pipes is additionally flagged as stopped by a runtime exception. After the
+  * load, the unflagged consensus pipe must be RUNNING, the flagged one must still be STOPPED with
+  * its flag intact, and the "userPipe" pipe must still be STOPPED.
+  */
+ @Test
+ public void testProcessLoadSnapshotRestartsOnlyHealthyStoppedConsensusPipes() throws Exception {
+   final DataRegionId dataRegionId = new DataRegionId(100);
+   final String healthyConsensusPipe = new ConsensusPipeName(dataRegionId, 1, 2).toString();
+   final String faultedConsensusPipe = new ConsensusPipeName(dataRegionId, 2, 1).toString();
+   final String plainUserPipe = "userPipe";
+
+   createPipe(healthyConsensusPipe, PipeStatus.STOPPED);
+   createPipe(faultedConsensusPipe, PipeStatus.STOPPED);
+   createPipe(plainUserPipe, PipeStatus.STOPPED);
+
+   // Flag one consensus pipe as stopped by a runtime exception before the snapshot is taken.
+   pipeTaskInfo
+       .getPipeMetaByPipeName(faultedConsensusPipe)
+       .getRuntimeMeta()
+       .setIsStoppedByRuntimeException(true);
+
+   final File snapshotDir =
+       java.nio.file.Files.createTempDirectory("pipe-task-info-consensus-test").toFile();
+   try {
+     Assert.assertTrue(pipeTaskInfo.processTakeSnapshot(snapshotDir));
+
+     final PipeTaskInfo reloaded = new PipeTaskInfo();
+     reloaded.processLoadSnapshot(snapshotDir);
+
+     Assert.assertEquals(PipeStatus.RUNNING, loadedStatusOf(reloaded, healthyConsensusPipe));
+     Assert.assertEquals(PipeStatus.STOPPED, loadedStatusOf(reloaded, faultedConsensusPipe));
+     Assert.assertTrue(
+         reloaded
+             .getPipeMetaByPipeName(faultedConsensusPipe)
+             .getRuntimeMeta()
+             .getIsStoppedByRuntimeException());
+     Assert.assertEquals(PipeStatus.STOPPED, loadedStatusOf(reloaded, plainUserPipe));
+   } finally {
+     // Best-effort cleanup: remove the snapshot file first so the directory can be deleted.
+     new File(snapshotDir, "pipe_task_info.bin").delete();
+     snapshotDir.delete();
+   }
+ }
+
+ /** Returns the current runtime status recorded for {@code pipeName} in {@code info}. */
+ private static PipeStatus loadedStatusOf(final PipeTaskInfo info, final String pipeName) {
+   return info.getPipeMetaByPipeName(pipeName).getRuntimeMeta().getStatus().get();
+ }
}