This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 3b96b715bb6 [To dev/1.3] Pipe: Reset tablet 
pipeDataStructureTabletSizeInBytes to 16MB & Enable stopping exception restart 
by manual stop pipe (#17588) (#17604)
3b96b715bb6 is described below

commit 3b96b715bb6ac81474e203e5c1737c08029213b7
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 7 09:34:34 2026 +0800

    [To dev/1.3] Pipe: Reset tablet pipeDataStructureTabletSizeInBytes to 16MB 
& Enable stopping exception restart by manual stop pipe (#17588) (#17604)
    
    * Fixed the bug that create attribute does not support attribute.None & 
Pipe: Reset tablet pipeDataStructureTabletSizeInBytes to 16MB & Enable stopping 
exception restart by manual stop pipe (#17588)
    
    * stop pipe
    
    * 16
    
    * fix
    
    * legacy
    
    * Update PipeTaskCoordinator.java
    
    * fix-access
---
 .../consensus/request/ConfigPhysicalPlan.java      |   4 +
 .../consensus/request/ConfigPhysicalPlanType.java  |   1 +
 ...eStatusWithStoppedByRuntimeExceptionPlanV2.java | 109 +++++++++++++++++++++
 .../pipe/coordinator/task/PipeTaskCoordinator.java |  13 +--
 .../persistence/executor/ConfigPlanExecutor.java   |   4 +
 .../confignode/persistence/pipe/PipeInfo.java      |  20 ++++
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  20 ++++
 .../impl/pipe/task/StopPipeProcedureV2.java        |  26 ++++-
 .../request/ConfigPhysicalPlanSerDeTest.java       |  25 ++++-
 .../impl/pipe/task/StopPipeProcedureV2Test.java    | 105 ++++++++++++++++++++
 .../apache/iotdb/commons/conf/CommonConfig.java    |   4 +-
 11 files changed, 311 insertions(+), 20 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index b7a0936ca45..06c10c39723 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -73,6 +73,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipeP
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -354,6 +355,9 @@ public abstract class ConfigPhysicalPlan implements 
IConsensusRequest {
         case SetPipeStatusV2:
           plan = new SetPipeStatusPlanV2();
           break;
+        case SetPipeStatusWithStoppedByRuntimeExceptionV2:
+          plan = new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2();
+          break;
         case DropPipeV2:
           plan = new DropPipePlanV2();
           break;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 24e5b9ba9c6..b9862995966 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -217,6 +217,7 @@ public enum ConfigPhysicalPlanType {
   ShowPipeV2((short) 1503),
   AlterPipeV2((short) 1504),
   OperateMultiplePipesV2((short) 1505),
+  SetPipeStatusWithStoppedByRuntimeExceptionV2((short) 1506),
 
   /** Pipe Runtime. */
   PipeHandleLeaderChange((short) 1600),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusWithStoppedByRuntimeExceptionPlanV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusWithStoppedByRuntimeExceptionPlanV2.java
new file mode 100644
index 00000000000..35ee503d5c5
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusWithStoppedByRuntimeExceptionPlanV2.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.pipe.task;
+
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 extends 
ConfigPhysicalPlan {
+
+  private String pipeName;
+  private PipeStatus status;
+  private boolean stoppedByRuntimeException;
+
+  public SetPipeStatusWithStoppedByRuntimeExceptionPlanV2() {
+    super(ConfigPhysicalPlanType.SetPipeStatusWithStoppedByRuntimeExceptionV2);
+  }
+
+  public SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(
+      final String pipeName, final PipeStatus status, final boolean 
stoppedByRuntimeException) {
+    super(ConfigPhysicalPlanType.SetPipeStatusWithStoppedByRuntimeExceptionV2);
+    this.pipeName = pipeName;
+    this.status = status;
+    this.stoppedByRuntimeException = stoppedByRuntimeException;
+  }
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public PipeStatus getPipeStatus() {
+    return status;
+  }
+
+  public boolean isStoppedByRuntimeException() {
+    return stoppedByRuntimeException;
+  }
+
+  @Override
+  protected void serializeImpl(final DataOutputStream stream) throws 
IOException {
+    stream.writeShort(getType().getPlanType());
+    ReadWriteIOUtils.write(pipeName, stream);
+    ReadWriteIOUtils.write(status.getType(), stream);
+    ReadWriteIOUtils.write(stoppedByRuntimeException, stream);
+  }
+
+  @Override
+  protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
+    pipeName = ReadWriteIOUtils.readString(buffer);
+    status = PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(buffer));
+    stoppedByRuntimeException = ReadWriteIOUtils.readBool(buffer);
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 that =
+        (SetPipeStatusWithStoppedByRuntimeExceptionPlanV2) obj;
+    return stoppedByRuntimeException == that.stoppedByRuntimeException
+        && pipeName.equals(that.pipeName)
+        && status.equals(that.status);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(pipeName, status, stoppedByRuntimeException);
+  }
+
+  @Override
+  public String toString() {
+    return "SetPipeStatusWithStoppedByRuntimeExceptionPlanV2{"
+        + "pipeName='"
+        + pipeName
+        + "', status='"
+        + status
+        + "', stoppedByRuntimeException='"
+        + stoppedByRuntimeException
+        + "'}";
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index ee1ccd2a8fb..f3430db3932 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -120,19 +120,8 @@ public class PipeTaskCoordinator {
 
   /** Caller should ensure that the method is called in the lock {@link 
#lock()}. */
   public TSStatus stopPipe(String pipeName) {
-    final boolean isStoppedByRuntimeException = 
pipeTaskInfo.isStoppedByRuntimeException(pipeName);
     final TSStatus status = 
configManager.getProcedureManager().stopPipe(pipeName);
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      if (isStoppedByRuntimeException) {
-        // Even if the return status is success, it doesn't imply the success 
of the
-        // `executeFromOperateOnDataNodes` phase of stopping pipe. However, we 
still need to set
-        // `isStoppedByRuntimeException` to false to avoid auto-restart. 
Meanwhile,
-        // `isStoppedByRuntimeException` does not need to be synchronized with 
DNs.
-        LOGGER.info("Pipe {} has stopped manually, stop its auto restart 
process.", pipeName);
-        pipeTaskInfo.setIsStoppedByRuntimeExceptionToFalse(pipeName);
-        configManager.getProcedureManager().pipeHandleMetaChange(true, false);
-      }
-    } else {
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.warn("Failed to stop pipe {}. Result status: {}.", pipeName, 
status);
     }
     return status;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 4facb308d8a..09017a841c7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -98,6 +98,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipeP
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -497,6 +498,9 @@ public class ConfigPlanExecutor {
         return pipeInfo.createPipe((CreatePipePlanV2) physicalPlan);
       case SetPipeStatusV2:
         return pipeInfo.setPipeStatus((SetPipeStatusPlanV2) physicalPlan);
+      case SetPipeStatusWithStoppedByRuntimeExceptionV2:
+        return pipeInfo.setPipeStatusWithStoppedByRuntimeException(
+            (SetPipeStatusWithStoppedByRuntimeExceptionPlanV2) physicalPlan);
       case DropPipeV2:
         return pipeInfo.dropPipe((DropPipePlanV2) physicalPlan);
       case AlterPipeV2:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
index d09e5c82845..032534f9161 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipeP
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
 import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import 
org.apache.iotdb.confignode.manager.pipe.agent.runtime.PipeConfigRegionListener;
 import 
org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeSubtask;
@@ -124,6 +125,25 @@ public class PipeInfo implements SnapshotProcessor {
     }
   }
 
+  public TSStatus setPipeStatusWithStoppedByRuntimeException(
+      final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 plan) {
+    try {
+      pipeTaskInfo.setPipeStatusWithStoppedByRuntimeException(plan);
+
+      PipeConfigNodeAgent.task()
+          
.handleSinglePipeMetaChanges(pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeName()));
+      PipeTemporaryMetaInCoordinatorMetrics.getInstance()
+          .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (final Exception e) {
+      LOGGER.error("Failed to set pipe status with 
stopped-by-runtime-exception flag", e);
+      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
+          .setMessage(
+              "Failed to set pipe status with stopped-by-runtime-exception 
flag, because "
+                  + e.getMessage());
+    }
+  }
+
   public TSStatus dropPipe(final DropPipePlanV2 plan) {
     try {
       final Optional<PipeMeta> pipeMetaBeforeDrop =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 199b835c839..022144f1a91 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -48,6 +48,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipeP
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
 import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
 import 
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
@@ -523,6 +524,25 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
+  public TSStatus setPipeStatusWithStoppedByRuntimeException(
+      final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 plan) {
+    acquireWriteLock();
+    try {
+      pipeMetaKeeper
+          .getPipeMeta(plan.getPipeName())
+          .getRuntimeMeta()
+          .getStatus()
+          .set(plan.getPipeStatus());
+      pipeMetaKeeper
+          .getPipeMeta(plan.getPipeName())
+          .getRuntimeMeta()
+          .setIsStoppedByRuntimeException(plan.isStoppedByRuntimeException());
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } finally {
+      releaseWriteLock();
+    }
+  }
+
   public TSStatus dropPipe(final DropPipePlanV2 plan) {
     acquireWriteLock();
     try {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index 5349cc65640..b2e1a584ec5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
-import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
@@ -44,6 +44,7 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(StopPipeProcedureV2.class);
 
   private String pipeName;
+  private boolean isStoppedByRuntimeExceptionBeforeStop;
 
   public StopPipeProcedureV2() {
     super();
@@ -71,7 +72,8 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   @Override
   public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) 
throws PipeException {
     LOGGER.info("StopPipeProcedureV2: executeFromCalculateInfoForTask({})", 
pipeName);
-    // Do nothing
+    isStoppedByRuntimeExceptionBeforeStop =
+        pipeTaskInfo.get().isStoppedByRuntimeException(pipeName);
   }
 
   @Override
@@ -83,7 +85,9 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
       response =
           env.getConfigManager()
               .getConsensusManager()
-              .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED));
+              .write(
+                  new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(
+                      pipeName, PipeStatus.STOPPED, false));
     } catch (ConsensusException e) {
       LOGGER.warn("Failed in the write API executing the consensus layer due 
to: ", e);
       response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -128,7 +132,9 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
       response =
           env.getConfigManager()
               .getConsensusManager()
-              .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING));
+              .write(
+                  new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(
+                      pipeName, PipeStatus.RUNNING, 
isStoppedByRuntimeExceptionBeforeStop));
     } catch (ConsensusException e) {
       LOGGER.warn("Failed in the write API executing the consensus layer due 
to: ", e);
       response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -159,12 +165,16 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     stream.writeShort(ProcedureType.STOP_PIPE_PROCEDURE_V2.getTypeCode());
     super.serialize(stream);
     ReadWriteIOUtils.write(pipeName, stream);
+    ReadWriteIOUtils.write(isStoppedByRuntimeExceptionBeforeStop, stream);
   }
 
   @Override
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     pipeName = ReadWriteIOUtils.readString(byteBuffer);
+    // Legacy persisted procedures do not carry this field.
+    isStoppedByRuntimeExceptionBeforeStop =
+        byteBuffer.hasRemaining() && ReadWriteIOUtils.readBool(byteBuffer);
   }
 
   @Override
@@ -179,11 +189,17 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     return getProcId() == that.getProcId()
         && getCurrentState().equals(that.getCurrentState())
         && getCycles() == that.getCycles()
+        && isStoppedByRuntimeExceptionBeforeStop == 
that.isStoppedByRuntimeExceptionBeforeStop
         && pipeName.equals(that.pipeName);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(getProcId(), getCurrentState(), getCycles(), pipeName);
+    return Objects.hash(
+        getProcId(),
+        getCurrentState(),
+        getCycles(),
+        pipeName,
+        isStoppedByRuntimeExceptionBeforeStop);
   }
 }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index be2ba4b6960..3b0554baa11 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -95,6 +95,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipeP
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -902,6 +903,28 @@ public class ConfigPhysicalPlanSerDeTest {
     Assert.assertEquals(setPipeStatusPlanV2.getPipeStatus(), 
setPipeStatusPlanV21.getPipeStatus());
   }
 
+  @Test
+  public void SetPipeStatusWithStoppedByRuntimeExceptionPlanV2Test() throws 
IOException {
+    final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2
+        setPipeStatusWithStoppedByRuntimeExceptionPlanV2 =
+            new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(
+                "pipe", 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus.STOPPED, true);
+    final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2
+        setPipeStatusWithStoppedByRuntimeExceptionPlanV21 =
+            (SetPipeStatusWithStoppedByRuntimeExceptionPlanV2)
+                ConfigPhysicalPlan.Factory.create(
+                    
setPipeStatusWithStoppedByRuntimeExceptionPlanV2.serializeToByteBuffer());
+    Assert.assertEquals(
+        setPipeStatusWithStoppedByRuntimeExceptionPlanV2.getPipeName(),
+        setPipeStatusWithStoppedByRuntimeExceptionPlanV21.getPipeName());
+    Assert.assertEquals(
+        setPipeStatusWithStoppedByRuntimeExceptionPlanV2.getPipeStatus(),
+        setPipeStatusWithStoppedByRuntimeExceptionPlanV21.getPipeStatus());
+    Assert.assertEquals(
+        
setPipeStatusWithStoppedByRuntimeExceptionPlanV2.isStoppedByRuntimeException(),
+        
setPipeStatusWithStoppedByRuntimeExceptionPlanV21.isStoppedByRuntimeException());
+  }
+
   @Test
   public void DropPipePlanV2Test() throws IOException {
     DropPipePlanV2 dropPipePlanV2 = new DropPipePlanV2("demo");
@@ -944,7 +967,7 @@ public class ConfigPhysicalPlanSerDeTest {
         new SetPipeStatusPlanV2(
             "testSet", 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus.RUNNING);
 
-    List<ConfigPhysicalPlan> subPlans = new ArrayList<>();
+    final List<ConfigPhysicalPlan> subPlans = new ArrayList<>();
     subPlans.add(createPipePlanV2);
     subPlans.add(alterPipePlanV2);
     subPlans.add(dropPipePlanV2);
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java
index e6d676ea27b..e3da356059b 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java
@@ -19,18 +19,73 @@
 
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.utils.PublicBAOS;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 public class StopPipeProcedureV2Test {
+
+  private static class TestStopPipeProcedureV2 extends StopPipeProcedureV2 {
+
+    private TestStopPipeProcedureV2(final String pipeName) throws 
PipeException {
+      super(pipeName);
+    }
+
+    private void setPipeTaskInfo(final PipeTaskInfo pipeTaskInfo) {
+      this.pipeTaskInfo = new AtomicReference<>(pipeTaskInfo);
+    }
+  }
+
+  private static PipeTaskInfo createExceptionStoppedPipeTaskInfo(final String 
pipeName) {
+    final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo();
+
+    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
+    final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new 
ConcurrentHashMap<>();
+    pipeTasks.put(1, pipeTaskMeta);
+
+    final PipeStaticMeta pipeStaticMeta =
+        new PipeStaticMeta(
+            pipeName,
+            System.currentTimeMillis(),
+            Collections.singletonMap("extractor", "iotdb-source"),
+            Collections.singletonMap("processor", "do-nothing-processor"),
+            Collections.singletonMap("connector", "iotdb-thrift-connector"));
+    final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
+    pipeRuntimeMeta.getStatus().set(PipeStatus.STOPPED);
+    pipeRuntimeMeta.setIsStoppedByRuntimeException(true);
+
+    pipeTaskInfo.createPipe(new CreatePipePlanV2(pipeStaticMeta, 
pipeRuntimeMeta));
+    return pipeTaskInfo;
+  }
+
   @Test
   public void serializeDeserializeTest() {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
@@ -50,4 +105,54 @@ public class StopPipeProcedureV2Test {
       fail();
     }
   }
+
+  @Test
+  public void serializeDeserializeLegacyFormatTest() {
+    final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
+
+    final StopPipeProcedureV2 proc = new StopPipeProcedureV2("testPipe");
+
+    try {
+      proc.serialize(outputStream);
+      final ByteBuffer buffer =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size() - 1);
+      final StopPipeProcedureV2 proc2 =
+          (StopPipeProcedureV2) ProcedureFactory.getInstance().create(buffer);
+
+      assertEquals(proc, proc2);
+    } catch (Exception e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testStopPipeWritesStatusAndRuntimeExceptionFlagAtomically() 
throws Exception {
+    final String pipeName = "testPipe";
+    final TestStopPipeProcedureV2 proc = new TestStopPipeProcedureV2(pipeName);
+    proc.setPipeTaskInfo(createExceptionStoppedPipeTaskInfo(pipeName));
+    
proc.executeFromCalculateInfoForTask(Mockito.mock(ConfigNodeProcedureEnv.class));
+
+    final ConfigNodeProcedureEnv env = 
Mockito.mock(ConfigNodeProcedureEnv.class);
+    final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    final ConsensusManager consensusManager = 
Mockito.mock(ConsensusManager.class);
+    Mockito.when(env.getConfigManager()).thenReturn(configManager);
+    
Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
+    Mockito.when(consensusManager.write(Mockito.any(ConfigPhysicalPlan.class)))
+        .thenReturn(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+
+    proc.executeFromWriteConfigNodeConsensus(env);
+    proc.rollbackFromWriteConfigNodeConsensus(env);
+
+    final ArgumentCaptor<ConfigPhysicalPlan> planCaptor =
+        ArgumentCaptor.forClass(ConfigPhysicalPlan.class);
+    Mockito.verify(consensusManager, 
Mockito.times(2)).write(planCaptor.capture());
+
+    assertEquals(
+        new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(pipeName, 
PipeStatus.STOPPED, false),
+        planCaptor.getAllValues().get(0));
+    assertEquals(
+        new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(pipeName, 
PipeStatus.RUNNING, true),
+        planCaptor.getAllValues().get(1));
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index ec9ce06fd1d..22e282d85a8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -215,8 +215,8 @@ public class CommonConfig {
 
   private int pipeDataStructureTabletRowSize = 2048;
 
-  // 128MB
-  private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024;
+  // 16MB
+  private int pipeDataStructureTabletSizeInBytes = 16 * 1024 * 1024;
   private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 
0.3;
   private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 
0.3;
   private volatile double pipeTotalFloatingMemoryProportion = 0.5;

Reply via email to