This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch fix-sit
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1e594bb376ba86c1b462a231dca0035c01fd72c4
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 30 16:15:45 2026 +0800

    fix
---
 .../confignode/persistence/pipe/PipeTaskInfo.java  |   8 ++
 .../pipe/PipeTaskInfoAutoRestartTest.java          | 124 +++++++++++++++++++++
 2 files changed, 132 insertions(+)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index a3278d29fdd..6b76b702379 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -823,6 +823,14 @@ public class PipeTaskInfo implements SnapshotProcessor {
                   final PipeRuntimeMeta runtimeMeta =
                       
pipeMetaKeeper.getPipeMeta(message.getPipeName()).getRuntimeMeta();
 
+                  // Keep user-stopped pipes out of the auto-restart flow. 
Otherwise, a failed
+                  // STOPPED meta sync can turn a manually stopped pipe into a 
runtime-stopped one
+                  // and the next PipeMetaSyncer round will restart it 
automatically.
+                  if (PipeStatus.STOPPED.equals(runtimeMeta.getStatus().get())
+                      && !runtimeMeta.getIsStoppedByRuntimeException()) {
+                    return;
+                  }
+
                   // Mark the status of the pipe with exception as stopped
                   runtimeMeta.getStatus().set(PipeStatus.STOPPED);
                   runtimeMeta.setIsStoppedByRuntimeException(true);
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
new file mode 100644
index 00000000000..986522ec78a
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence.pipe;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
+import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class PipeTaskInfoAutoRestartTest {
+
+  private static final int DATA_NODE_ID = 1;
+
+  private PipeTaskInfo pipeTaskInfo;
+
+  @Before
+  public void setUp() {
+    pipeTaskInfo = new PipeTaskInfo();
+  }
+
+  @Test
+  public void 
testRecordDataNodePushPipeMetaExceptionsMarksRunningPipeForAutoRestart() {
+    final String pipeName = "runningPipe";
+    createPipe(pipeName, PipeStatus.RUNNING);
+
+    
Assert.assertTrue(pipeTaskInfo.recordDataNodePushPipeMetaExceptions(createErrorRespMap(pipeName)));
+
+    final PipeRuntimeMeta runtimeMeta = 
pipeTaskInfo.getPipeMetaByPipeName(pipeName).getRuntimeMeta();
+    Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get());
+    Assert.assertTrue(runtimeMeta.getIsStoppedByRuntimeException());
+
+    Assert.assertTrue(pipeTaskInfo.autoRestart());
+    Assert.assertEquals(PipeStatus.RUNNING, runtimeMeta.getStatus().get());
+  }
+
+  @Test
+  public void 
testRecordDataNodePushPipeMetaExceptionsKeepsUserStoppedPipeOutOfAutoRestart() {
+    final String pipeName = "stoppedPipe";
+    createPipe(pipeName, PipeStatus.STOPPED);
+
+    Assert.assertTrue(pipeTaskInfo.isPipeStoppedByUser(pipeName));
+    
Assert.assertTrue(pipeTaskInfo.recordDataNodePushPipeMetaExceptions(createErrorRespMap(pipeName)));
+
+    final PipeRuntimeMeta runtimeMeta = 
pipeTaskInfo.getPipeMetaByPipeName(pipeName).getRuntimeMeta();
+    Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get());
+    Assert.assertFalse(runtimeMeta.getIsStoppedByRuntimeException());
+    Assert.assertTrue(pipeTaskInfo.isPipeStoppedByUser(pipeName));
+
+    Assert.assertFalse(pipeTaskInfo.autoRestart());
+    Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get());
+  }
+
+  private Map<Integer, TPushPipeMetaResp> createErrorRespMap(final String 
pipeName) {
+    final TPushPipeMetaRespExceptionMessage exceptionMessage =
+        new TPushPipeMetaRespExceptionMessage(
+            pipeName, "failed to push pipe meta", System.currentTimeMillis());
+    final TPushPipeMetaResp resp =
+        new TPushPipeMetaResp()
+            .setStatus(new 
TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
+            .setExceptionMessages(Collections.singletonList(exceptionMessage));
+    return Collections.singletonMap(DATA_NODE_ID, resp);
+  }
+
+  private void createPipe(final String pipeName, final PipeStatus 
initialStatus) {
+    final Map<String, String> extractorAttributes = new HashMap<>();
+    final Map<String, String> processorAttributes = new HashMap<>();
+    final Map<String, String> connectorAttributes = new HashMap<>();
+    extractorAttributes.put("extractor", "iotdb-source");
+    processorAttributes.put("processor", "do-nothing-processor");
+    connectorAttributes.put("connector", "iotdb-thrift-sink");
+
+    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID);
+    final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new 
ConcurrentHashMap<>();
+    pipeTasks.put(DATA_NODE_ID, pipeTaskMeta);
+
+    final PipeStaticMeta pipeStaticMeta =
+        new PipeStaticMeta(
+            pipeName,
+            System.currentTimeMillis(),
+            extractorAttributes,
+            processorAttributes,
+            connectorAttributes);
+    final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
+    pipeTaskInfo.createPipe(new CreatePipePlanV2(pipeStaticMeta, 
pipeRuntimeMeta));
+
+    if (PipeStatus.RUNNING.equals(initialStatus)) {
+      pipeTaskInfo.setPipeStatus(new SetPipeStatusPlanV2(pipeName, 
PipeStatus.RUNNING));
+    }
+  }
+}

Reply via email to