This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cde3c96f93 Pipe: Optimized the procedure waiting timeout for some 
time-consuming procedure & Construct the historical pipe when realtime pipe 
creation times out (#17404)
7cde3c96f93 is described below

commit 7cde3c96f93f3b9f2449fc500e1c095ce2f3c1a8
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 1 09:37:58 2026 +0800

    Pipe: Optimized the procedure waiting timeout for some time-consuming 
procedure & Construct the historical pipe when realtime pipe creation times out 
(#17404)
    
    * fix
    
    * add
---
 .../iotdb/confignode/manager/ProcedureManager.java | 25 +++++++++++++---------
 .../config/executor/ClusterConfigTaskExecutor.java |  7 +++++-
 2 files changed, 21 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 1a69044d37d..499c2fe30d3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -388,7 +388,7 @@ public class ProcedureManager {
         this.executor.submitProcedure(procedure);
       }
     }
-    return waitingProcedureFinished(procedure);
+    return waitingProcedureFinished(procedure, PROCEDURE_WAIT_TIME_OUT << 1);
   }
 
   public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) {
@@ -1515,16 +1515,16 @@ public class ProcedureManager {
 
   public TSStatus createPipe(TCreatePipeReq req) {
     try {
-      CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
+      final CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req);
       executor.submitProcedure(procedure);
-      TSStatus status = waitingProcedureFinished(procedure);
+      final TSStatus status = waitingProcedureFinished(procedure, 
PROCEDURE_WAIT_TIME_OUT << 1);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       } else {
         return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
             
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       return new 
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
     }
   }
@@ -1533,14 +1533,14 @@ public class ProcedureManager {
     try {
       final AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
       executor.submitProcedure(procedure);
-      TSStatus status = waitingProcedureFinished(procedure);
+      final TSStatus status = waitingProcedureFinished(procedure, 
PROCEDURE_WAIT_TIME_OUT << 1);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       } else {
         return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
             
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       return new 
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
     }
   }
@@ -1624,9 +1624,9 @@ public class ProcedureManager {
 
   public TSStatus dropPipe(String pipeName) {
     try {
-      DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
+      final DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName);
       executor.submitProcedure(procedure);
-      TSStatus status = waitingProcedureFinished(procedure);
+      final TSStatus status = waitingProcedureFinished(procedure, 
PROCEDURE_WAIT_TIME_OUT << 1);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       } else {
@@ -1881,13 +1881,18 @@ public class ProcedureManager {
     return waitingProcedureFinished(executor.getProcedures().get(procedureId));
   }
 
+  protected TSStatus waitingProcedureFinished(final Procedure<?> procedure) {
+    return waitingProcedureFinished(procedure, PROCEDURE_WAIT_TIME_OUT);
+  }
+
   /**
    * Waiting until the specific procedure finished.
    *
    * @param procedure The specific procedure
    * @return TSStatus the running result of this procedure
    */
-  protected TSStatus waitingProcedureFinished(Procedure<?> procedure) {
+  protected TSStatus waitingProcedureFinished(
+      Procedure<?> procedure, final long procedureWaitRetryTimeout) {
     if (procedure == null) {
       LOGGER.error("Unexpected null procedure parameters for 
waitingProcedureFinished");
       return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR);
@@ -1896,7 +1901,7 @@ public class ProcedureManager {
     final long startTimeForCurrentProcedure = System.currentTimeMillis();
     while (executor.isRunning()
         && !executor.isFinished(procedure.getProcId())
-        && System.currentTimeMillis() - startTimeForCurrentProcedure < 
PROCEDURE_WAIT_TIME_OUT) {
+        && System.currentTimeMillis() - startTimeForCurrentProcedure < 
procedureWaitRetryTimeout) {
       sleepWithoutInterrupt(PROCEDURE_WAIT_RETRY_TIMEOUT);
     }
     if (!procedure.isFinished()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index de3261b0484..b0ebee30ecb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2232,7 +2232,12 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
         final TSStatus realtimeTsStatus = 
configNodeClient.createPipe(realtimeReq);
         // If creation fails, immediately return with exception
-        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
realtimeTsStatus.getCode()) {
+        // If the procedure is still running, it is probably stuck on a DataNode.
+        // The pipe creation can tolerate this situation and still succeed, so there is no
+        // need to return early in this case.
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
realtimeTsStatus.getCode()
+            && TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode()
+                != realtimeTsStatus.getCode()) {
           future.setException(new IoTDBException(realtimeTsStatus));
           return future;
         }

Reply via email to