This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0ffa4b67deb Correct status code from 301 to 719 while memory not
enough & Record release resource blocked too long by driver (#15828)
0ffa4b67deb is described below
commit 0ffa4b67deb7ede43a93d3250a0acbf7306b654a
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Jun 26 17:44:08 2025 +0800
Correct status code from 301 to 719 while memory not enough & Record
release resource blocked too long by driver (#15828)
---
.../queryengine/execution/QueryStateMachine.java | 58 ++++++++++++----------
.../fragment/FragmentInstanceContext.java | 7 +++
.../scheduler/FragmentInstanceDispatcherImpl.java | 7 +--
3 files changed, 41 insertions(+), 31 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
index a6e3acfb44d..cc0f787b014 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.rpc.RpcUtils;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
@@ -50,8 +51,8 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;
public class QueryStateMachine {
private final StateMachine<QueryState> queryState;
- private Throwable failureException;
- private TSStatus failureStatus;
+ private final AtomicReference<Throwable> failureException = new
AtomicReference<>();
+ private final AtomicReference<TSStatus> failureStatus = new
AtomicReference<>();
public QueryStateMachine(QueryId queryId, ExecutorService executor) {
this.queryState =
@@ -74,6 +75,8 @@ public class QueryStateMachine {
public void transitionToQueued() {
queryState.set(QUEUED);
+ failureException.set(null);
+ failureStatus.set(null);
}
public void transitionToPlanned() {
@@ -85,7 +88,7 @@ public class QueryStateMachine {
}
public void transitionToPendingRetry(TSStatus failureStatus) {
- this.failureStatus = failureStatus;
+ this.failureStatus.compareAndSet(null, failureStatus);
queryState.setIf(PENDING_RETRY, currentState -> currentState ==
DISPATCHING);
}
@@ -105,10 +108,9 @@ public class QueryStateMachine {
}
public void transitionToCanceled(Throwable throwable, TSStatus
failureStatus) {
- if (transitionToDoneState(CANCELED)) {
- this.failureException = throwable;
- this.failureStatus = failureStatus;
- }
+ this.failureStatus.compareAndSet(null, failureStatus);
+ this.failureException.compareAndSet(null, throwable);
+ transitionToDoneState(CANCELED);
}
public void transitionToAborted() {
@@ -120,15 +122,13 @@ public class QueryStateMachine {
}
public void transitionToFailed(Throwable throwable) {
- if (transitionToDoneState(FAILED)) {
- this.failureException = throwable;
- }
+ this.failureException.compareAndSet(null, throwable);
+ transitionToDoneState(FAILED);
}
public void transitionToFailed(TSStatus failureStatus) {
- if (transitionToDoneState(FAILED)) {
- this.failureStatus = failureStatus;
- }
+ this.failureStatus.compareAndSet(null, failureStatus);
+ transitionToDoneState(FAILED);
}
private boolean transitionToDoneState(QueryState doneState) {
@@ -139,31 +139,37 @@ public class QueryStateMachine {
}
public String getFailureMessage() {
- if (failureException != null) {
- return failureException.getMessage();
+ Throwable throwable = failureException.get();
+ if (throwable != null) {
+ return throwable.getMessage();
}
return "no detailed failure reason in QueryStateMachine";
}
public Throwable getFailureException() {
- if (failureException == null) {
+ Throwable throwable = failureException.get();
+ if (throwable == null) {
return new IoTDBException(getFailureStatus().getMessage(),
getFailureStatus().code);
} else {
- return failureException;
+ return throwable;
}
}
public TSStatus getFailureStatus() {
- if (failureStatus != null) {
- return failureStatus;
- } else if (failureException != null) {
- Throwable t = getRootCause(failureException);
- if (t instanceof IoTDBRuntimeException) {
- return RpcUtils.getStatus(((IoTDBRuntimeException) t).getErrorCode(),
t.getMessage());
- } else if (t instanceof IoTDBException) {
- return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(),
t.getMessage());
+ TSStatus status = failureStatus.get();
+ if (status != null) {
+ return status;
+ } else {
+ Throwable throwable = failureException.get();
+ if (throwable != null) {
+ Throwable t = getRootCause(throwable);
+ if (t instanceof IoTDBRuntimeException) {
+ return RpcUtils.getStatus(((IoTDBRuntimeException)
t).getErrorCode(), t.getMessage());
+ } else if (t instanceof IoTDBException) {
+ return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(),
t.getMessage());
+ }
}
+ return failureStatus.get();
}
- return failureStatus;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 55df223db17..8b9a2a274a1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -78,6 +78,8 @@ public class FragmentInstanceContext extends QueryContext {
private static final Logger LOGGER =
LoggerFactory.getLogger(FragmentInstanceContext.class);
private static final long END_TIME_INITIAL_VALUE = -1L;
+ // wait over 5s for driver to close is abnormal
+ private static final long LONG_WAIT_DURATION = 5_000_000_000L;
private final FragmentInstanceId id;
private final FragmentInstanceStateMachine stateMachine;
@@ -708,6 +710,7 @@ public class FragmentInstanceContext extends QueryContext {
@SuppressWarnings("squid:S2142")
public void releaseResourceWhenAllDriversAreClosed() {
+ long startTime = System.nanoTime();
while (true) {
try {
allDriversClosed.await();
@@ -718,6 +721,10 @@ public class FragmentInstanceContext extends QueryContext {
"Interrupted when await on allDriversClosed, FragmentInstance Id
is {}", this.getId());
}
}
+ long duration = System.nanoTime() - startTime;
+ if (duration >= LONG_WAIT_DURATION) {
+ LOGGER.warn("Wait {}ms for all Drivers closed", duration / 1_000_000);
+ }
releaseResource();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a756a26df7f..a4f21d2f20f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -471,8 +471,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
} else if (sendFragmentInstanceResp.status.getCode()
== TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()) {
throw new
ConsensusGroupNotExistException(sendFragmentInstanceResp.message);
- } else if (sendFragmentInstanceResp.status.getCode()
- ==
TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) {
+ } else {
throw new
FragmentInstanceDispatchException(sendFragmentInstanceResp.status);
}
} else if (sendFragmentInstanceResp.status != null) {
@@ -610,9 +609,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
if (!executionResult.isAccepted()) {
LOGGER.warn(executionResult.getMessage());
if (executionResult.isReadNeedRetry()) {
- if (executionResult.getStatus() != null
- && executionResult.getStatus().getCode()
- ==
TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) {
+ if (executionResult.getStatus() != null) {
throw new
FragmentInstanceDispatchException(executionResult.getStatus());
}
throw new FragmentInstanceDispatchException(