This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7cde3c96f93 Pipe: Optimized the procedure waiting timeout for some
time-consuming procedures & Construct the historical pipe when realtime pipe
creation times out (#17404)
7cde3c96f93 is described below
commit 7cde3c96f93f3b9f2449fc500e1c095ce2f3c1a8
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 1 09:37:58 2026 +0800
Pipe: Optimized the procedure waiting timeout for some time-consuming
procedures & Construct the historical pipe when realtime pipe creation times out
(#17404)
* fix
* add
---
.../iotdb/confignode/manager/ProcedureManager.java | 25 +++++++++++++---------
.../config/executor/ClusterConfigTaskExecutor.java | 7 +++++-
2 files changed, 21 insertions(+), 11 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 1a69044d37d..499c2fe30d3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -388,7 +388,7 @@ public class ProcedureManager {
this.executor.submitProcedure(procedure);
}
}
- return waitingProcedureFinished(procedure);
+ return waitingProcedureFinished(procedure, PROCEDURE_WAIT_TIME_OUT << 1);
}
public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) {
@@ -1515,16 +1515,16 @@ public class ProcedureManager {
public TSStatus createPipe(TCreatePipeReq req) {
try {
- CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
+ final CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
executor.submitProcedure(procedure);
- TSStatus status = waitingProcedureFinished(procedure);
+ final TSStatus status = waitingProcedureFinished(procedure, PROCEDURE_WAIT_TIME_OUT << 1);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
}
}
@@ -1533,14 +1533,14 @@ public class ProcedureManager {
try {
final AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
executor.submitProcedure(procedure);
- TSStatus status = waitingProcedureFinished(procedure);
+ final TSStatus status = waitingProcedureFinished(procedure, PROCEDURE_WAIT_TIME_OUT << 1);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
}
}
@@ -1624,9 +1624,9 @@ public class ProcedureManager {
public TSStatus dropPipe(String pipeName) {
try {
- DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
+ final DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
executor.submitProcedure(procedure);
- TSStatus status = waitingProcedureFinished(procedure);
+ final TSStatus status = waitingProcedureFinished(procedure, PROCEDURE_WAIT_TIME_OUT << 1);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
} else {
@@ -1881,13 +1881,18 @@ public class ProcedureManager {
return waitingProcedureFinished(executor.getProcedures().get(procedureId));
}
+ protected TSStatus waitingProcedureFinished(final Procedure<?> procedure) {
+ return waitingProcedureFinished(procedure, PROCEDURE_WAIT_TIME_OUT);
+ }
+
/**
* Waiting until the specific procedure finished.
*
* @param procedure The specific procedure
* @return TSStatus the running result of this procedure
*/
- protected TSStatus waitingProcedureFinished(Procedure<?> procedure) {
+ protected TSStatus waitingProcedureFinished(
+ Procedure<?> procedure, final long procedureWaitRetryTimeout) {
if (procedure == null) {
LOGGER.error("Unexpected null procedure parameters for waitingProcedureFinished");
return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR);
@@ -1896,7 +1901,7 @@ public class ProcedureManager {
final long startTimeForCurrentProcedure = System.currentTimeMillis();
while (executor.isRunning()
&& !executor.isFinished(procedure.getProcId())
- && System.currentTimeMillis() - startTimeForCurrentProcedure < PROCEDURE_WAIT_TIME_OUT) {
+ && System.currentTimeMillis() - startTimeForCurrentProcedure < procedureWaitRetryTimeout) {
sleepWithoutInterrupt(PROCEDURE_WAIT_RETRY_TIMEOUT);
}
if (!procedure.isFinished()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index de3261b0484..b0ebee30ecb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2232,7 +2232,12 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
final TSStatus realtimeTsStatus =
configNodeClient.createPipe(realtimeReq);
// If creation fails, immediately return with exception
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != realtimeTsStatus.getCode()) {
+ // If the procedure is still running, it is probably stuck on a DataNode.
+ // Pipe creation can tolerate this situation and still succeed, so we do not
+ // need to return early in this case.
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != realtimeTsStatus.getCode()
+ && TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode()
+ != realtimeTsStatus.getCode()) {
future.setException(new IoTDBException(realtimeTsStatus));
return future;
}