This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch pipe-timeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-timeout by this push:
     new f1044e28aa6 fix
f1044e28aa6 is described below

commit f1044e28aa65d5004b01462f7753fb53aa6fb1f2
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 31 15:47:04 2026 +0800

    fix
---
 .../iotdb/confignode/manager/ProcedureManager.java | 23 +++++++++++++---------
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 1a69044d37d..abba01a9983 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1515,16 +1515,16 @@ public class ProcedureManager {
 
   public TSStatus createPipe(TCreatePipeReq req) {
     try {
-      CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
+      final CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
       executor.submitProcedure(procedure);
-      TSStatus status = waitingProcedureFinished(procedure);
+      final TSStatus status = waitingProcedureFinished(procedure, 
PROCEDURE_WAIT_TIME_OUT << 1);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       } else {
         return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
             
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       return new 
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
     }
   }
@@ -1533,14 +1533,14 @@ public class ProcedureManager {
     try {
       final AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
       executor.submitProcedure(procedure);
-      TSStatus status = waitingProcedureFinished(procedure);
+      final TSStatus status = waitingProcedureFinished(procedure, 
PROCEDURE_WAIT_TIME_OUT << 1);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       } else {
         return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
             
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       return new 
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
     }
   }
@@ -1624,9 +1624,9 @@ public class ProcedureManager {
 
   public TSStatus dropPipe(String pipeName) {
     try {
-      DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
+      final DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
       executor.submitProcedure(procedure);
-      TSStatus status = waitingProcedureFinished(procedure);
+      final TSStatus status = waitingProcedureFinished(procedure, 
PROCEDURE_WAIT_TIME_OUT << 1);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       } else {
@@ -1881,13 +1881,18 @@ public class ProcedureManager {
     return waitingProcedureFinished(executor.getProcedures().get(procedureId));
   }
 
+  protected TSStatus waitingProcedureFinished(final Procedure<?> procedure) {
+    return waitingProcedureFinished(procedure, PROCEDURE_WAIT_TIME_OUT);
+  }
+
   /**
    * Waiting until the specific procedure finished.
    *
    * @param procedure The specific procedure
    * @return TSStatus the running result of this procedure
    */
-  protected TSStatus waitingProcedureFinished(Procedure<?> procedure) {
+  protected TSStatus waitingProcedureFinished(
+      Procedure<?> procedure, final long procedureWaitRetryTimeout) {
     if (procedure == null) {
       LOGGER.error("Unexpected null procedure parameters for 
waitingProcedureFinished");
       return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR);
@@ -1896,7 +1901,7 @@ public class ProcedureManager {
     final long startTimeForCurrentProcedure = System.currentTimeMillis();
     while (executor.isRunning()
         && !executor.isFinished(procedure.getProcId())
-        && System.currentTimeMillis() - startTimeForCurrentProcedure < 
PROCEDURE_WAIT_TIME_OUT) {
+        && System.currentTimeMillis() - startTimeForCurrentProcedure < 
procedureWaitRetryTimeout) {
       sleepWithoutInterrupt(PROCEDURE_WAIT_RETRY_TIMEOUT);
     }
     if (!procedure.isFinished()) {

Reply via email to