This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch RecordBlockedQuery-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 51dd4452a0716b1d14a1bf0ebc88d223210adb0d Author: JackieTien97 <[email protected]> AuthorDate: Thu Jun 26 16:06:32 2025 +0800 Record release resource blocked too long by driver --- .../queryengine/execution/QueryStateMachine.java | 55 +++++++++++++++------- .../fragment/FragmentInstanceContext.java | 7 +++ 2 files changed, 44 insertions(+), 18 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java index 581b015eeb6..cc0f787b014 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java @@ -21,12 +21,15 @@ package org.apache.iotdb.db.queryengine.execution; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.execution.QueryExecution; +import org.apache.iotdb.rpc.RpcUtils; import com.google.common.util.concurrent.ListenableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -39,6 +42,7 @@ import static org.apache.iotdb.db.queryengine.execution.QueryState.PENDING_RETRY import static org.apache.iotdb.db.queryengine.execution.QueryState.PLANNED; import static org.apache.iotdb.db.queryengine.execution.QueryState.QUEUED; import static org.apache.iotdb.db.queryengine.execution.QueryState.RUNNING; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause; /** * State machine for a {@link QueryExecution}. It stores the states for the {@link QueryExecution}. @@ -47,8 +51,8 @@ import static org.apache.iotdb.db.queryengine.execution.QueryState.RUNNING; public class QueryStateMachine { private final StateMachine<QueryState> queryState; - private Throwable failureException; - private TSStatus failureStatus; + private final AtomicReference<Throwable> failureException = new AtomicReference<>(); + private final AtomicReference<TSStatus> failureStatus = new AtomicReference<>(); public QueryStateMachine(QueryId queryId, ExecutorService executor) { this.queryState = @@ -71,6 +75,8 @@ public class QueryStateMachine { public void transitionToQueued() { queryState.set(QUEUED); + failureException.set(null); + failureStatus.set(null); } public void transitionToPlanned() { @@ -82,7 +88,7 @@ public class QueryStateMachine { } public void transitionToPendingRetry(TSStatus failureStatus) { - this.failureStatus = failureStatus; + this.failureStatus.compareAndSet(null, failureStatus); queryState.setIf(PENDING_RETRY, currentState -> currentState == DISPATCHING); } @@ -102,10 +108,9 @@ public class QueryStateMachine { } public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) { - if (transitionToDoneState(CANCELED)) { - this.failureException = throwable; - this.failureStatus = failureStatus; - } + this.failureStatus.compareAndSet(null, failureStatus); + this.failureException.compareAndSet(null, throwable); + transitionToDoneState(CANCELED); } public void transitionToAborted() { @@ -117,15 +122,13 @@ public class QueryStateMachine { } public void transitionToFailed(Throwable throwable) { - if (transitionToDoneState(FAILED)) { - this.failureException = throwable; - } + this.failureException.compareAndSet(null, throwable); + transitionToDoneState(FAILED); } public void transitionToFailed(TSStatus failureStatus) { - if (transitionToDoneState(FAILED)) { - this.failureStatus = failureStatus; - } + this.failureStatus.compareAndSet(null, failureStatus); + transitionToDoneState(FAILED); } private boolean transitionToDoneState(QueryState doneState) { @@ -136,21 +139,37 @@ public class QueryStateMachine { } public String getFailureMessage() { - if (failureException != null) { - return failureException.getMessage(); + Throwable throwable = failureException.get(); + if (throwable != null) { + return throwable.getMessage(); } return "no detailed failure reason in QueryStateMachine"; } public Throwable getFailureException() { - if (failureException == null) { + Throwable throwable = failureException.get(); + if (throwable == null) { return new IoTDBException(getFailureStatus().getMessage(), getFailureStatus().code); } else { - return failureException; + return throwable; } } public TSStatus getFailureStatus() { - return failureStatus; + TSStatus status = failureStatus.get(); + if (status != null) { + return status; + } else { + Throwable throwable = failureException.get(); + if (throwable != null) { + Throwable t = getRootCause(throwable); + if (t instanceof IoTDBRuntimeException) { + return RpcUtils.getStatus(((IoTDBRuntimeException) t).getErrorCode(), t.getMessage()); + } else if (t instanceof IoTDBException) { + return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(), t.getMessage()); + } + } + return failureStatus.get(); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 917e378fd66..f6afb5b71b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -71,6 +71,8 @@ public class FragmentInstanceContext extends QueryContext { private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class); private static final long END_TIME_INITIAL_VALUE = -1L; + // wait over 5s for driver to close is abnormal + private static final long LONG_WAIT_DURATION = 5_000_000_000L; private final FragmentInstanceId id; private final FragmentInstanceStateMachine stateMachine; @@ -675,6 +677,7 @@ public class FragmentInstanceContext extends QueryContext { @SuppressWarnings("squid:S2142") public void releaseResourceWhenAllDriversAreClosed() { + long startTime = System.nanoTime(); while (true) { try { allDriversClosed.await(); @@ -685,6 +688,10 @@ public class FragmentInstanceContext extends QueryContext { "Interrupted when await on allDriversClosed, FragmentInstance Id is {}", this.getId()); } } + long duration = System.nanoTime() - startTime; + if (duration >= LONG_WAIT_DURATION) { + LOGGER.warn("Wait {}ms for all Drivers closed", duration / 1_000_000); + } releaseResource(); }
