This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch RecordBlockedQuery-1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 51dd4452a0716b1d14a1bf0ebc88d223210adb0d
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Jun 26 16:06:32 2025 +0800

    Record release resource blocked too long by driver
---
 .../queryengine/execution/QueryStateMachine.java   | 55 +++++++++++++++-------
 .../fragment/FragmentInstanceContext.java          |  7 +++
 2 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
index 581b015eeb6..cc0f787b014 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
@@ -21,12 +21,15 @@ package org.apache.iotdb.db.queryengine.execution;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.plan.execution.QueryExecution;
+import org.apache.iotdb.rpc.RpcUtils;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
@@ -39,6 +42,7 @@ import static org.apache.iotdb.db.queryengine.execution.QueryState.PENDING_RETRY
 import static org.apache.iotdb.db.queryengine.execution.QueryState.PLANNED;
 import static org.apache.iotdb.db.queryengine.execution.QueryState.QUEUED;
 import static org.apache.iotdb.db.queryengine.execution.QueryState.RUNNING;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;
 
 /**
  * State machine for a {@link QueryExecution}. It stores the states for the {@link QueryExecution}.
@@ -47,8 +51,8 @@ import static org.apache.iotdb.db.queryengine.execution.QueryState.RUNNING;
 public class QueryStateMachine {
   private final StateMachine<QueryState> queryState;
 
-  private Throwable failureException;
-  private TSStatus failureStatus;
+  private final AtomicReference<Throwable> failureException = new AtomicReference<>();
+  private final AtomicReference<TSStatus> failureStatus = new AtomicReference<>();
 
   public QueryStateMachine(QueryId queryId, ExecutorService executor) {
     this.queryState =
@@ -71,6 +75,8 @@ public class QueryStateMachine {
 
   public void transitionToQueued() {
     queryState.set(QUEUED);
+    failureException.set(null);
+    failureStatus.set(null);
   }
 
   public void transitionToPlanned() {
@@ -82,7 +88,7 @@ public class QueryStateMachine {
   }
 
   public void transitionToPendingRetry(TSStatus failureStatus) {
-    this.failureStatus = failureStatus;
+    this.failureStatus.compareAndSet(null, failureStatus);
     queryState.setIf(PENDING_RETRY, currentState -> currentState == DISPATCHING);
   }
 
@@ -102,10 +108,9 @@ public class QueryStateMachine {
   }
 
   public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) {
-    if (transitionToDoneState(CANCELED)) {
-      this.failureException = throwable;
-      this.failureStatus = failureStatus;
-    }
+    this.failureStatus.compareAndSet(null, failureStatus);
+    this.failureException.compareAndSet(null, throwable);
+    transitionToDoneState(CANCELED);
   }
 
   public void transitionToAborted() {
@@ -117,15 +122,13 @@ public class QueryStateMachine {
   }
 
   public void transitionToFailed(Throwable throwable) {
-    if (transitionToDoneState(FAILED)) {
-      this.failureException = throwable;
-    }
+    this.failureException.compareAndSet(null, throwable);
+    transitionToDoneState(FAILED);
   }
 
   public void transitionToFailed(TSStatus failureStatus) {
-    if (transitionToDoneState(FAILED)) {
-      this.failureStatus = failureStatus;
-    }
+    this.failureStatus.compareAndSet(null, failureStatus);
+    transitionToDoneState(FAILED);
   }
 
   private boolean transitionToDoneState(QueryState doneState) {
@@ -136,21 +139,37 @@ public class QueryStateMachine {
   }
 
   public String getFailureMessage() {
-    if (failureException != null) {
-      return failureException.getMessage();
+    Throwable throwable = failureException.get();
+    if (throwable != null) {
+      return throwable.getMessage();
     }
     return "no detailed failure reason in QueryStateMachine";
   }
 
   public Throwable getFailureException() {
-    if (failureException == null) {
+    Throwable throwable = failureException.get();
+    if (throwable == null) {
       return new IoTDBException(getFailureStatus().getMessage(), getFailureStatus().code);
     } else {
-      return failureException;
+      return throwable;
     }
   }
 
   public TSStatus getFailureStatus() {
-    return failureStatus;
+    TSStatus status = failureStatus.get();
+    if (status != null) {
+      return status;
+    } else {
+      Throwable throwable = failureException.get();
+      if (throwable != null) {
+        Throwable t = getRootCause(throwable);
+        if (t instanceof IoTDBRuntimeException) {
+          return RpcUtils.getStatus(((IoTDBRuntimeException) t).getErrorCode(), t.getMessage());
+        } else if (t instanceof IoTDBException) {
+          return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(), t.getMessage());
+        }
+      }
+      return failureStatus.get();
+    }
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 917e378fd66..f6afb5b71b6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -71,6 +71,8 @@ public class FragmentInstanceContext extends QueryContext {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class);
   private static final long END_TIME_INITIAL_VALUE = -1L;
+  // wait over 5s for driver to close is abnormal
+  private static final long LONG_WAIT_DURATION = 5_000_000_000L;
   private final FragmentInstanceId id;
 
   private final FragmentInstanceStateMachine stateMachine;
@@ -675,6 +677,7 @@ public class FragmentInstanceContext extends QueryContext {
 
   @SuppressWarnings("squid:S2142")
   public void releaseResourceWhenAllDriversAreClosed() {
+    long startTime = System.nanoTime();
     while (true) {
       try {
         allDriversClosed.await();
@@ -685,6 +688,10 @@ public class FragmentInstanceContext extends QueryContext {
             "Interrupted when await on allDriversClosed, FragmentInstance Id is {}", this.getId());
       }
     }
+    long duration = System.nanoTime() - startTime;
+    if (duration >= LONG_WAIT_DURATION) {
+      LOGGER.warn("Wait {}ms for all Drivers closed", duration / 1_000_000);
+    }
     releaseResource();
   }
 

Reply via email to