This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch pipe-timeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-timeout by this push:
new f1044e28aa6 fix
f1044e28aa6 is described below
commit f1044e28aa65d5004b01462f7753fb53aa6fb1f2
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 31 15:47:04 2026 +0800
fix
---
.../iotdb/confignode/manager/ProcedureManager.java | 23 +++++++++++++---------
1 file changed, 14 insertions(+), 9 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 1a69044d37d..abba01a9983 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1515,16 +1515,16 @@ public class ProcedureManager {
public TSStatus createPipe(TCreatePipeReq req) {
try {
- CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
+ final CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
executor.submitProcedure(procedure);
- TSStatus status = waitingProcedureFinished(procedure);
+ final TSStatus status = waitingProcedureFinished(procedure,
PROCEDURE_WAIT_TIME_OUT << 1);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
return new
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
}
}
@@ -1533,14 +1533,14 @@ public class ProcedureManager {
try {
final AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
executor.submitProcedure(procedure);
- TSStatus status = waitingProcedureFinished(procedure);
+ final TSStatus status = waitingProcedureFinished(procedure,
PROCEDURE_WAIT_TIME_OUT << 1);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
return new
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
}
}
@@ -1624,9 +1624,9 @@ public class ProcedureManager {
public TSStatus dropPipe(String pipeName) {
try {
- DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
+ final DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
executor.submitProcedure(procedure);
- TSStatus status = waitingProcedureFinished(procedure);
+ final TSStatus status = waitingProcedureFinished(procedure,
PROCEDURE_WAIT_TIME_OUT << 1);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
} else {
@@ -1881,13 +1881,18 @@ public class ProcedureManager {
return waitingProcedureFinished(executor.getProcedures().get(procedureId));
}
+ protected TSStatus waitingProcedureFinished(final Procedure<?> procedure) {
+ return waitingProcedureFinished(procedure, PROCEDURE_WAIT_TIME_OUT);
+ }
+
/**
* Waiting until the specific procedure finished.
*
* @param procedure The specific procedure
* @return TSStatus the running result of this procedure
*/
- protected TSStatus waitingProcedureFinished(Procedure<?> procedure) {
+ protected TSStatus waitingProcedureFinished(
+ Procedure<?> procedure, final long procedureWaitRetryTimeout) {
if (procedure == null) {
LOGGER.error("Unexpected null procedure parameters for
waitingProcedureFinished");
return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR);
@@ -1896,7 +1901,7 @@ public class ProcedureManager {
final long startTimeForCurrentProcedure = System.currentTimeMillis();
while (executor.isRunning()
&& !executor.isFinished(procedure.getProcId())
- && System.currentTimeMillis() - startTimeForCurrentProcedure <
PROCEDURE_WAIT_TIME_OUT) {
+ && System.currentTimeMillis() - startTimeForCurrentProcedure <
procedureWaitRetryTimeout) {
sleepWithoutInterrupt(PROCEDURE_WAIT_RETRY_TIMEOUT);
}
if (!procedure.isFinished()) {