This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 3b96b715bb6 [To dev/1.3] Pipe: Reset tablet
pipeDataStructureTabletSizeInBytes to 16MB & Enable stopping exception restart
by manual stop pipe (#17588) (#17604)
3b96b715bb6 is described below
commit 3b96b715bb6ac81474e203e5c1737c08029213b7
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 7 09:34:34 2026 +0800
[To dev/1.3] Pipe: Reset tablet pipeDataStructureTabletSizeInBytes to 16MB
& Enable stopping exception restart by manual stop pipe (#17588) (#17604)
* Fixed the bug that create attribute does not support attribute.None &
Pipe: Reset tablet pipeDataStructureTabletSizeInBytes to 16MB & Enable stopping
exception restart by manual stop pipe (#17588)
* stop pipe
* 16
* fix
* legacy
* Update PipeTaskCoordinator.java
* fix-access
---
.../consensus/request/ConfigPhysicalPlan.java | 4 +
.../consensus/request/ConfigPhysicalPlanType.java | 1 +
...eStatusWithStoppedByRuntimeExceptionPlanV2.java | 109 +++++++++++++++++++++
.../pipe/coordinator/task/PipeTaskCoordinator.java | 13 +--
.../persistence/executor/ConfigPlanExecutor.java | 4 +
.../confignode/persistence/pipe/PipeInfo.java | 20 ++++
.../confignode/persistence/pipe/PipeTaskInfo.java | 20 ++++
.../impl/pipe/task/StopPipeProcedureV2.java | 26 ++++-
.../request/ConfigPhysicalPlanSerDeTest.java | 25 ++++-
.../impl/pipe/task/StopPipeProcedureV2Test.java | 105 ++++++++++++++++++++
.../apache/iotdb/commons/conf/CommonConfig.java | 4 +-
11 files changed, 311 insertions(+), 20 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index b7a0936ca45..06c10c39723 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -73,6 +73,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipeP
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -354,6 +355,9 @@ public abstract class ConfigPhysicalPlan implements
IConsensusRequest {
case SetPipeStatusV2:
plan = new SetPipeStatusPlanV2();
break;
+ case SetPipeStatusWithStoppedByRuntimeExceptionV2:
+ plan = new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2();
+ break;
case DropPipeV2:
plan = new DropPipePlanV2();
break;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 24e5b9ba9c6..b9862995966 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -217,6 +217,7 @@ public enum ConfigPhysicalPlanType {
ShowPipeV2((short) 1503),
AlterPipeV2((short) 1504),
OperateMultiplePipesV2((short) 1505),
+ SetPipeStatusWithStoppedByRuntimeExceptionV2((short) 1506),
/** Pipe Runtime. */
PipeHandleLeaderChange((short) 1600),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusWithStoppedByRuntimeExceptionPlanV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusWithStoppedByRuntimeExceptionPlanV2.java
new file mode 100644
index 00000000000..35ee503d5c5
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusWithStoppedByRuntimeExceptionPlanV2.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.pipe.task;
+
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Consensus write plan that carries a pipe name, a {@link PipeStatus} and a
+ * stopped-by-runtime-exception flag in one request.
+ *
+ * <p>Serialized form: the plan-type short, then the pipe name, the status type code and the
+ * boolean flag, in that order. {@link #deserializeImpl(ByteBuffer)} reads the last three back in
+ * the same order.
+ */
+public class SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 extends ConfigPhysicalPlan {
+
+  private String pipeName;
+  private PipeStatus status;
+  private boolean stoppedByRuntimeException;
+
+  /** Creates an empty plan whose fields are expected to be filled by deserialization. */
+  public SetPipeStatusWithStoppedByRuntimeExceptionPlanV2() {
+    super(ConfigPhysicalPlanType.SetPipeStatusWithStoppedByRuntimeExceptionV2);
+  }
+
+  /**
+   * Creates a fully populated plan.
+   *
+   * @param pipeName name of the pipe the plan refers to
+   * @param status status to record for that pipe
+   * @param stoppedByRuntimeException value of the stopped-by-runtime-exception flag to record
+   */
+  public SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(
+      final String pipeName, final PipeStatus status, final boolean stoppedByRuntimeException) {
+    super(ConfigPhysicalPlanType.SetPipeStatusWithStoppedByRuntimeExceptionV2);
+    this.pipeName = pipeName;
+    this.status = status;
+    this.stoppedByRuntimeException = stoppedByRuntimeException;
+  }
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public PipeStatus getPipeStatus() {
+    return status;
+  }
+
+  public boolean isStoppedByRuntimeException() {
+    return stoppedByRuntimeException;
+  }
+
+  @Override
+  protected void serializeImpl(final DataOutputStream stream) throws IOException {
+    stream.writeShort(getType().getPlanType());
+    ReadWriteIOUtils.write(pipeName, stream);
+    ReadWriteIOUtils.write(status.getType(), stream);
+    ReadWriteIOUtils.write(stoppedByRuntimeException, stream);
+  }
+
+  @Override
+  protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
+    pipeName = ReadWriteIOUtils.readString(buffer);
+    status = PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(buffer));
+    stoppedByRuntimeException = ReadWriteIOUtils.readBool(buffer);
+  }
+
+  /**
+   * Compares all three fields. The comparison is null-tolerant because the no-arg constructor
+   * leaves {@code pipeName} and {@code status} unset, and {@link #hashCode()} already accepts
+   * nulls through {@link Objects#hash(Object...)}.
+   */
+  @Override
+  public boolean equals(final Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 that =
+        (SetPipeStatusWithStoppedByRuntimeExceptionPlanV2) obj;
+    return stoppedByRuntimeException == that.stoppedByRuntimeException
+        && Objects.equals(pipeName, that.pipeName)
+        && Objects.equals(status, that.status);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(pipeName, status, stoppedByRuntimeException);
+  }
+
+  @Override
+  public String toString() {
+    return "SetPipeStatusWithStoppedByRuntimeExceptionPlanV2{"
+        + "pipeName='"
+        + pipeName
+        + "', status='"
+        + status
+        + "', stoppedByRuntimeException='"
+        + stoppedByRuntimeException
+        + "'}";
+  }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index ee1ccd2a8fb..f3430db3932 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -120,19 +120,8 @@ public class PipeTaskCoordinator {
/** Caller should ensure that the method is called in the lock {@link
#lock()}. */
public TSStatus stopPipe(String pipeName) {
- final boolean isStoppedByRuntimeException =
pipeTaskInfo.isStoppedByRuntimeException(pipeName);
final TSStatus status =
configManager.getProcedureManager().stopPipe(pipeName);
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- if (isStoppedByRuntimeException) {
- // Even if the return status is success, it doesn't imply the success
of the
- // `executeFromOperateOnDataNodes` phase of stopping pipe. However, we
still need to set
- // `isStoppedByRuntimeException` to false to avoid auto-restart.
Meanwhile,
- // `isStoppedByRuntimeException` does not need to be synchronized with
DNs.
- LOGGER.info("Pipe {} has stopped manually, stop its auto restart
process.", pipeName);
- pipeTaskInfo.setIsStoppedByRuntimeExceptionToFalse(pipeName);
- configManager.getProcedureManager().pipeHandleMetaChange(true, false);
- }
- } else {
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn("Failed to stop pipe {}. Result status: {}.", pipeName,
status);
}
return status;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 4facb308d8a..09017a841c7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -98,6 +98,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipeP
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -497,6 +498,9 @@ public class ConfigPlanExecutor {
return pipeInfo.createPipe((CreatePipePlanV2) physicalPlan);
case SetPipeStatusV2:
return pipeInfo.setPipeStatus((SetPipeStatusPlanV2) physicalPlan);
+ case SetPipeStatusWithStoppedByRuntimeExceptionV2:
+ return pipeInfo.setPipeStatusWithStoppedByRuntimeException(
+ (SetPipeStatusWithStoppedByRuntimeExceptionPlanV2) physicalPlan);
case DropPipeV2:
return pipeInfo.dropPipe((DropPipePlanV2) physicalPlan);
case AlterPipeV2:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
index d09e5c82845..032534f9161 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipeP
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import
org.apache.iotdb.confignode.manager.pipe.agent.runtime.PipeConfigRegionListener;
import
org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeSubtask;
@@ -124,6 +125,25 @@ public class PipeInfo implements SnapshotProcessor {
}
}
+  /**
+   * Applies the given plan to the pipe task info, then passes the named pipe's meta to the
+   * config-node pipe agent and the full meta list to the temporary-meta coordinator metrics.
+   *
+   * @param plan plan carrying the pipe name, the status and the stopped-by-runtime-exception flag
+   * @return a {@code SUCCESS_STATUS} result, or a {@code PIPE_ERROR} result whose message embeds
+   *     the exception message when any of the steps throws
+   */
+  public TSStatus setPipeStatusWithStoppedByRuntimeException(
+      final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 plan) {
+    try {
+      pipeTaskInfo.setPipeStatusWithStoppedByRuntimeException(plan);
+
+      // Both notifications read the meta only after the update above has been applied.
+      PipeConfigNodeAgent.task()
+          .handleSinglePipeMetaChanges(pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeName()));
+      PipeTemporaryMetaInCoordinatorMetrics.getInstance()
+          .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (final Exception e) {
+      LOGGER.error("Failed to set pipe status with stopped-by-runtime-exception flag", e);
+      final TSStatus failure = new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode());
+      failure.setMessage(
+          "Failed to set pipe status with stopped-by-runtime-exception flag, because "
+              + e.getMessage());
+      return failure;
+    }
+  }
+
public TSStatus dropPipe(final DropPipePlanV2 plan) {
try {
final Optional<PipeMeta> pipeMetaBeforeDrop =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 199b835c839..022144f1a91 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -48,6 +48,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipeP
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
import
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
@@ -523,6 +524,25 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
}
+  /**
+   * Writes the plan's status and stopped-by-runtime-exception flag into the runtime meta of the
+   * pipe named by the plan. The write lock is held across both updates.
+   *
+   * @param plan plan carrying the pipe name, the status and the flag to store
+   * @return a {@code SUCCESS_STATUS} result once both values have been written
+   */
+  public TSStatus setPipeStatusWithStoppedByRuntimeException(
+      final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 plan) {
+    acquireWriteLock();
+    try {
+      final String pipeName = plan.getPipeName();
+      // Status first, then the flag.
+      pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta().getStatus().set(plan.getPipeStatus());
+      pipeMetaKeeper
+          .getPipeMeta(pipeName)
+          .getRuntimeMeta()
+          .setIsStoppedByRuntimeException(plan.isStoppedByRuntimeException());
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } finally {
+      releaseWriteLock();
+    }
+  }
+
public TSStatus dropPipe(final DropPipePlanV2 plan) {
acquireWriteLock();
try {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index 5349cc65640..b2e1a584ec5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
-import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
@@ -44,6 +44,7 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
private static final Logger LOGGER =
LoggerFactory.getLogger(StopPipeProcedureV2.class);
private String pipeName;
+ private boolean isStoppedByRuntimeExceptionBeforeStop;
public StopPipeProcedureV2() {
super();
@@ -71,7 +72,8 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
@Override
public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env)
throws PipeException {
LOGGER.info("StopPipeProcedureV2: executeFromCalculateInfoForTask({})",
pipeName);
- // Do nothing
+ isStoppedByRuntimeExceptionBeforeStop =
+ pipeTaskInfo.get().isStoppedByRuntimeException(pipeName);
}
@Override
@@ -83,7 +85,9 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
response =
env.getConfigManager()
.getConsensusManager()
- .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED));
+ .write(
+ new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(
+ pipeName, PipeStatus.STOPPED, false));
} catch (ConsensusException e) {
LOGGER.warn("Failed in the write API executing the consensus layer due
to: ", e);
response = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -128,7 +132,9 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
response =
env.getConfigManager()
.getConsensusManager()
- .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING));
+ .write(
+ new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(
+ pipeName, PipeStatus.RUNNING,
isStoppedByRuntimeExceptionBeforeStop));
} catch (ConsensusException e) {
LOGGER.warn("Failed in the write API executing the consensus layer due
to: ", e);
response = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -159,12 +165,16 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
stream.writeShort(ProcedureType.STOP_PIPE_PROCEDURE_V2.getTypeCode());
super.serialize(stream);
ReadWriteIOUtils.write(pipeName, stream);
+ ReadWriteIOUtils.write(isStoppedByRuntimeExceptionBeforeStop, stream);
}
@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
pipeName = ReadWriteIOUtils.readString(byteBuffer);
+ // Legacy persisted procedures do not carry this field.
+ isStoppedByRuntimeExceptionBeforeStop =
+ byteBuffer.hasRemaining() && ReadWriteIOUtils.readBool(byteBuffer);
}
@Override
@@ -179,11 +189,17 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
return getProcId() == that.getProcId()
&& getCurrentState().equals(that.getCurrentState())
&& getCycles() == that.getCycles()
+ && isStoppedByRuntimeExceptionBeforeStop ==
that.isStoppedByRuntimeExceptionBeforeStop
&& pipeName.equals(that.pipeName);
}
@Override
public int hashCode() {
- return Objects.hash(getProcId(), getCurrentState(), getCycles(), pipeName);
+ return Objects.hash(
+ getProcId(),
+ getCurrentState(),
+ getCycles(),
+ pipeName,
+ isStoppedByRuntimeExceptionBeforeStop);
}
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index be2ba4b6960..3b0554baa11 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -95,6 +95,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipeP
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import
org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan;
@@ -902,6 +903,28 @@ public class ConfigPhysicalPlanSerDeTest {
Assert.assertEquals(setPipeStatusPlanV2.getPipeStatus(),
setPipeStatusPlanV21.getPipeStatus());
}
+  /** Round-trips the plan through its byte-buffer form and compares each getter's value. */
+  @Test
+  public void SetPipeStatusWithStoppedByRuntimeExceptionPlanV2Test() throws IOException {
+    final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 original =
+        new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(
+            "pipe", org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus.STOPPED, true);
+    final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 restored =
+        (SetPipeStatusWithStoppedByRuntimeExceptionPlanV2)
+            ConfigPhysicalPlan.Factory.create(original.serializeToByteBuffer());
+
+    Assert.assertEquals(original.getPipeName(), restored.getPipeName());
+    Assert.assertEquals(original.getPipeStatus(), restored.getPipeStatus());
+    Assert.assertEquals(
+        original.isStoppedByRuntimeException(), restored.isStoppedByRuntimeException());
+  }
+
@Test
public void DropPipePlanV2Test() throws IOException {
DropPipePlanV2 dropPipePlanV2 = new DropPipePlanV2("demo");
@@ -944,7 +967,7 @@ public class ConfigPhysicalPlanSerDeTest {
new SetPipeStatusPlanV2(
"testSet",
org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus.RUNNING);
- List<ConfigPhysicalPlan> subPlans = new ArrayList<>();
+ final List<ConfigPhysicalPlan> subPlans = new ArrayList<>();
subPlans.add(createPipePlanV2);
subPlans.add(alterPipePlanV2);
subPlans.add(dropPipePlanV2);
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java
index e6d676ea27b..e3da356059b 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java
@@ -19,18 +19,73 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.PublicBAOS;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class StopPipeProcedureV2Test {
+
+  /** Subclass that lets a test inject the {@link PipeTaskInfo} the procedure reads from. */
+  private static class TestStopPipeProcedureV2 extends StopPipeProcedureV2 {
+
+    private TestStopPipeProcedureV2(final String pipeName) throws PipeException {
+      super(pipeName);
+    }
+
+    /** Replaces the inherited reference with a new one wrapping {@code info}. */
+    private void setPipeTaskInfo(final PipeTaskInfo info) {
+      pipeTaskInfo = new AtomicReference<>(info);
+    }
+  }
+
+  /**
+   * Builds a {@link PipeTaskInfo} containing a single pipe whose runtime meta is {@code STOPPED}
+   * with the stopped-by-runtime-exception flag set to {@code true}.
+   *
+   * @param pipeName name given to the created pipe
+   */
+  private static PipeTaskInfo createExceptionStoppedPipeTaskInfo(final String pipeName) {
+    final ConcurrentMap<Integer, PipeTaskMeta> taskMetaById = new ConcurrentHashMap<>();
+    taskMetaById.put(1, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1));
+
+    final PipeRuntimeMeta runtimeMeta = new PipeRuntimeMeta(taskMetaById);
+    runtimeMeta.getStatus().set(PipeStatus.STOPPED);
+    runtimeMeta.setIsStoppedByRuntimeException(true);
+
+    final PipeStaticMeta staticMeta =
+        new PipeStaticMeta(
+            pipeName,
+            System.currentTimeMillis(),
+            Collections.singletonMap("extractor", "iotdb-source"),
+            Collections.singletonMap("processor", "do-nothing-processor"),
+            Collections.singletonMap("connector", "iotdb-thrift-connector"));
+
+    final PipeTaskInfo taskInfo = new PipeTaskInfo();
+    taskInfo.createPipe(new CreatePipePlanV2(staticMeta, runtimeMeta));
+    return taskInfo;
+  }
+
@Test
public void serializeDeserializeTest() {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
@@ -50,4 +105,54 @@ public class StopPipeProcedureV2Test {
fail();
}
}
+
+  /**
+   * Deserializes a buffer that stops one byte short of what {@code serialize} wrote and checks
+   * that the resulting procedure still equals the original.
+   */
+  @Test
+  public void serializeDeserializeLegacyFormatTest() {
+    final PublicBAOS serialized = new PublicBAOS();
+    final DataOutputStream sink = new DataOutputStream(serialized);
+    final StopPipeProcedureV2 expected = new StopPipeProcedureV2("testPipe");
+
+    try {
+      expected.serialize(sink);
+      // Trim the tail of the payload; presumably this is exactly the trailing boolean flag.
+      final int legacyLength = serialized.size() - 1;
+      final StopPipeProcedureV2 actual =
+          (StopPipeProcedureV2)
+              ProcedureFactory.getInstance()
+                  .create(ByteBuffer.wrap(serialized.getBuf(), 0, legacyLength));
+
+      assertEquals(expected, actual);
+    } catch (Exception e) {
+      fail();
+    }
+  }
+
+  /**
+   * Runs the consensus-write step and its rollback against a mocked consensus manager and checks
+   * the two captured plans: {@code STOPPED} with the flag cleared, then {@code RUNNING} with the
+   * flag value that was read from the task info before the stop.
+   */
+  @Test
+  public void testStopPipeWritesStatusAndRuntimeExceptionFlagAtomically() throws Exception {
+    final String pipeName = "testPipe";
+    final TestStopPipeProcedureV2 procedure = new TestStopPipeProcedureV2(pipeName);
+    procedure.setPipeTaskInfo(createExceptionStoppedPipeTaskInfo(pipeName));
+    procedure.executeFromCalculateInfoForTask(Mockito.mock(ConfigNodeProcedureEnv.class));
+
+    // env -> configManager -> consensusManager, with every write reported as successful.
+    final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
+    final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    final ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
+    Mockito.when(env.getConfigManager()).thenReturn(configManager);
+    Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
+    Mockito.when(consensusManager.write(Mockito.any(ConfigPhysicalPlan.class)))
+        .thenReturn(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+
+    procedure.executeFromWriteConfigNodeConsensus(env);
+    procedure.rollbackFromWriteConfigNodeConsensus(env);
+
+    final ArgumentCaptor<ConfigPhysicalPlan> writtenPlans =
+        ArgumentCaptor.forClass(ConfigPhysicalPlan.class);
+    Mockito.verify(consensusManager, Mockito.times(2)).write(writtenPlans.capture());
+
+    final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 expectedStop =
+        new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(pipeName, PipeStatus.STOPPED, false);
+    final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 expectedRollback =
+        new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(pipeName, PipeStatus.RUNNING, true);
+    assertEquals(expectedStop, writtenPlans.getAllValues().get(0));
+    assertEquals(expectedRollback, writtenPlans.getAllValues().get(1));
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index ec9ce06fd1d..22e282d85a8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -215,8 +215,8 @@ public class CommonConfig {
private int pipeDataStructureTabletRowSize = 2048;
- // 128MB
- private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024;
+ // 16MB
+ private int pipeDataStructureTabletSizeInBytes = 16 * 1024 * 1024;
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold =
0.3;
private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold =
0.3;
private volatile double pipeTotalFloatingMemoryProportion = 0.5;