This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bb68d508629 Fix kill query doesn't take effect bug (#17358)
bb68d508629 is described below
commit bb68d508629df7e19f0b8782b90cbc127d159792
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Mar 27 17:16:45 2026 +0800
Fix kill query doesn't take effect bug (#17358)
---
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 27 ++++--
.../db/queryengine/common/MPPQueryContext.java | 14 +--
.../queryengine/execution/QueryStateMachine.java | 4 +-
.../fragment/FragmentInstanceManager.java | 2 +
.../iotdb/db/queryengine/plan/Coordinator.java | 34 ++++++-
.../analyze/schema/ClusterSchemaFetchExecutor.java | 43 +++++----
.../plan/execution/IQueryExecution.java | 25 ++++-
.../queryengine/plan/execution/QueryExecution.java | 101 ++++++++++++++-------
.../plan/execution/config/ConfigExecution.java | 27 ++++--
.../operator/MergeTreeSortOperatorTest.java | 11 ++-
10 files changed, 206 insertions(+), 82 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index d3f6b7d91dd..ed754c418c6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -245,6 +245,7 @@ import static
org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
import static
org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;
import static org.apache.iotdb.rpc.RpcUtils.TIME_PRECISION;
+import static org.apache.iotdb.rpc.TSStatusCode.QUERY_WAS_KILLED;
public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@@ -286,6 +287,9 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+ private static final String NO_QUERY_EXECUTION_ERR_MSG =
+ "Query is not found, it may be killed by others, timeout or some other
runtime errors, you can see more details in server log.";
+
@FunctionalInterface
public interface SelectResult {
@@ -1526,15 +1530,18 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
finished = true;
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
}
- TSFetchResultsResp resp =
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
queryExecution = COORDINATOR.getQueryExecution(req.queryId);
if (queryExecution == null) {
- resp.setHasResultSet(false);
- resp.setMoreData(false);
- return resp;
+ TSStatus noQueryExecutionStatus = new
TSStatus(QUERY_WAS_KILLED.getStatusCode());
+ noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG);
+ return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus);
}
+
+ TSFetchResultsResp resp =
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
+
+ queryExecution.updateCurrentRpcStartTime(startTime);
statementType = queryExecution.getStatementType();
try (SetThreadName queryName = new
SetThreadName(queryExecution.getQueryId())) {
@@ -2272,16 +2279,16 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
}
- TSFetchResultsResp resp =
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
-
queryExecution = COORDINATOR.getQueryExecution(req.queryId);
if (queryExecution == null) {
- resp.setHasResultSet(false);
- resp.setMoreData(true);
- return resp;
+ TSStatus noQueryExecutionStatus = new
TSStatus(QUERY_WAS_KILLED.getStatusCode());
+ noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG);
+ return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus);
}
+ queryExecution.updateCurrentRpcStartTime(startTime);
statementType = queryExecution.getStatementType();
+ TSFetchResultsResp resp =
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
try (SetThreadName queryName = new
SetThreadName(queryExecution.getQueryId())) {
Pair<TSQueryDataSet, Boolean> pair =
convertTsBlockByFetchSize(queryExecution, req.fetchSize);
@@ -2291,7 +2298,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
resp.setHasResultSet(hasResultSet);
resp.setQueryDataSet(result);
resp.setIsAlign(true);
- resp.setMoreData(finished);
+ resp.setMoreData(!finished);
return resp;
}
} catch (Exception e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 0294a14af25..88bd1998f68 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -76,7 +76,11 @@ public class MPPQueryContext implements IAuditEntity {
private long localQueryId;
private SessionInfo session;
private QueryType queryType = QueryType.READ;
+
+ /** The maximum execution time of the query, in milliseconds. */
private long timeOut;
+
+ // time unit is ms
private long startTime;
private TEndPoint localDataBlockEndpoint;
@@ -147,6 +151,7 @@ public class MPPQueryContext implements IAuditEntity {
// Tables in the subquery
private final Map<NodeRef<Query>, List<Identifier>> subQueryTables = new
HashMap<>();
+ @TestOnly
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
this.endPointBlackList = ConcurrentHashMap.newKeySet();
@@ -161,12 +166,7 @@ public class MPPQueryContext implements IAuditEntity {
SessionInfo session,
TEndPoint localDataBlockEndpoint,
TEndPoint localInternalEndpoint) {
- this(queryId);
- this.sql = sql;
- this.session = session;
- this.localDataBlockEndpoint = localDataBlockEndpoint;
- this.localInternalEndpoint = localInternalEndpoint;
- this.initResultNodeContext();
+ this(sql, queryId, -1, session, localDataBlockEndpoint,
localInternalEndpoint);
}
public MPPQueryContext(
@@ -244,10 +244,12 @@ public class MPPQueryContext implements IAuditEntity {
return queryType;
}
+ /** Returns the maximum execution time of the query, in milliseconds. */
public long getTimeOut() {
return timeOut;
}
+ /** Sets the maximum execution time of the query, in milliseconds. */
public void setTimeOut(long timeOut) {
this.timeOut = timeOut;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
index 146359bc215..19d0fb38749 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
@@ -107,10 +107,10 @@ public class QueryStateMachine {
transitionToDoneState(CANCELED);
}
- public void transitionToCanceled(Throwable throwable, TSStatus
failureStatus) {
+ public boolean transitionToCanceled(Throwable throwable, TSStatus
failureStatus) {
this.failureStatus.compareAndSet(null, failureStatus);
this.failureException.compareAndSet(null, throwable);
- transitionToDoneState(CANCELED);
+ return transitionToDoneState(CANCELED);
}
public void transitionToAborted() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index 3f6812943d6..1898cbfe53c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.db.queryengine.execution.schedule.DriverScheduler;
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.PipelineDriverFactory;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
@@ -435,6 +436,7 @@ public class FragmentInstanceManager {
+ "ms, and now is in flushing state"));
}
});
+ Coordinator.getInstance().cleanUpStaleQueries();
}
public ExecutorService getIntoOperationExecutor() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index b0d28f03c02..9342b3a4674 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.commons.memory.MemoryBlockType;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.PreparedStatementInfo;
@@ -805,7 +806,7 @@ public class Coordinator {
}
public void cleanupQueryExecution(Long queryId, Supplier<String>
contentSupplier, Throwable t) {
- IQueryExecution queryExecution = getQueryExecution(queryId);
+ IQueryExecution queryExecution = queryExecutionMap.remove(queryId);
if (queryExecution != null) {
cleanupQueryExecutionInternal(queryId, queryExecution, contentSupplier,
t);
}
@@ -813,7 +814,7 @@ public class Coordinator {
public void cleanupQueryExecution(
Long queryId, org.apache.thrift.TBase<?, ?> nativeApiRequest, Throwable
t) {
- IQueryExecution queryExecution = getQueryExecution(queryId);
+ IQueryExecution queryExecution = queryExecutionMap.remove(queryId);
if (queryExecution != null) {
Supplier<String> contentSupplier =
new ContentOfQuerySupplier(nativeApiRequest, queryExecution);
@@ -898,6 +899,35 @@ public class Coordinator {
}
}
+ /**
+ * We need to reclaim resources from queries that have exceeded their
timeout by more than one
+ * minute. This indicates that the associated clients have failed to perform
proper resource
+ * cleanup.
+ */
+ public void cleanUpStaleQueries() {
+ long currentTime = System.currentTimeMillis();
+ queryExecutionMap.forEach(
+ (queryId, queryExecution) -> {
+ if (queryExecution.isActive()) {
+ return;
+ }
+ long timeout = queryExecution.getTimeout();
+ long queryStartTime = queryExecution.getStartExecutionTime();
+ long executeTime = currentTime - queryStartTime;
+ if (timeout > 0 && executeTime - 60_000L > timeout) {
+ LOGGER.warn(
+ "Cleaning up stale query with id {}, which has been running
for {} ms, timeout duration is: {}ms",
+ queryId,
+ executeTime,
+ timeout);
+ cleanupQueryExecution(
+ queryId,
+ (org.apache.thrift.TBase<?, ?>) null,
+ new QueryTimeoutRuntimeException(queryStartTime, currentTime,
timeout));
+ }
+ });
+ }
+
public void cleanupQueryExecution(Long queryId) {
cleanupQueryExecution(queryId, (org.apache.thrift.TBase<?, ?>) null, null);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index aad9f50aca0..d575f6420eb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -36,6 +36,7 @@ import
org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.DeviceSchemaFetchStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.internal.SeriesSchemaFetchStatement;
@@ -265,30 +266,38 @@ class ClusterSchemaFetchExecutor {
String.format("Fetch Schema failed, because %s",
executionResult.status.getMessage()),
executionResult.status.getCode());
}
+ IQueryExecution queryExecution = coordinator.getQueryExecution(queryId);
try (SetThreadName ignored = new
SetThreadName(executionResult.queryId.getId())) {
ClusterSchemaTree result = new ClusterSchemaTree();
ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer =
new ClusterSchemaTree.SchemaNodeBatchDeserializer();
Set<String> databaseSet = new HashSet<>();
- while (coordinator.getQueryExecution(queryId).hasNextResult()) {
- // The query will be transited to FINISHED when invoking
getBatchResult() at the last time
- // So we don't need to clean up it manually
- Optional<TsBlock> tsBlock;
- try {
- tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
- } catch (IoTDBException e) {
- t = e;
- throw new QuerySchemaFetchFailedException(
- String.format("Fetch Schema failed: %s", e.getMessage()),
e.getErrorCode());
- }
- if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
- break;
- }
- Column column = tsBlock.get().getColumn(0);
- for (int i = 0; i < column.getPositionCount(); i++) {
- parseFetchedData(column.getBinary(i), result, deserializer,
databaseSet, context);
+ if (queryExecution != null) {
+ while (queryExecution.hasNextResult()) {
+ // The query will be transitioned to FINISHED when getBatchResult() is
+ // invoked for the last time,
+ // so we don't need to clean it up manually.
+ Optional<TsBlock> tsBlock;
+ try {
+ tsBlock = queryExecution.getBatchResult();
+ } catch (IoTDBException e) {
+ t = e;
+ throw new QuerySchemaFetchFailedException(
+ String.format("Fetch Schema failed: %s", e.getMessage()),
e.getErrorCode());
+ }
+ if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
+ break;
+ }
+ Column column = tsBlock.get().getColumn(0);
+ for (int i = 0; i < column.getPositionCount(); i++) {
+ parseFetchedData(column.getBinary(i), result, deserializer,
databaseSet, context);
+ }
}
+ } else {
+ throw new RuntimeException(
+ String.format("Fetch Schema failed, because queryExecution is
null for %s", queryId));
}
+
result.setDatabases(databaseSet);
return result;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
index f754bc4b10e..9b5a183d98a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
@@ -35,8 +35,6 @@ public interface IQueryExecution {
void stop(Throwable t);
- void stopAndCleanup();
-
void stopAndCleanup(Throwable t);
void cancel();
@@ -61,15 +59,38 @@ public interface IQueryExecution {
String getQueryId();
+ // time unit is ms
long getStartExecutionTime();
+ /**
+ * @param executionTime time unit should be ns
+ */
void recordExecutionTime(long executionTime);
+ /**
+ * update current rpc start time, which is used to calculate rpc execution
time and update total
+ * execution time
+ *
+ * @param startTime start time of current rpc, time unit is ns
+ */
+ void updateCurrentRpcStartTime(long startTime);
+
+ /**
+ * Check if there is an active RPC for this query. If {@code
startTimeOfCurrentRpc == -1}, it
+ * means there is no active RPC, otherwise there is an active RPC. An active
RPC means that the
+ * client is still fetching results and the QueryExecution should not be
cleaned up until the RPC
+ * finishes. On the other hand, if there is no active RPC, it means that the
client has finished
+ * fetching results or has not started fetching results yet, and the
QueryExecution can be safely
+ * cleaned up.
+ */
+ boolean isActive();
+
/**
* @return cost time in ns
*/
long getTotalExecutionTime();
+ /** The maximum execution time of the query, in milliseconds. */
long getTimeout();
Optional<String> getExecuteSQL();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 6e940e9816e..7500fbb9c34 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -39,6 +39,7 @@ import
org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import
org.apache.iotdb.db.queryengine.plan.execution.memory.MemorySourceHandle;
@@ -108,9 +109,14 @@ public class QueryExecution implements IQueryExecution {
private final AtomicBoolean stopped;
- // cost time in ns
+ // cost time in ns of finished rpc
private long totalExecutionTime = 0;
+ // -1 if previous rpc is finished and next client req hasn't come yet, unit
is ns
+ // it will be updated in fetchResult rpc
+ // protected by synchronized(this)
+ private volatile long startTimeOfCurrentRpc = System.nanoTime();
+
private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET =
QueryExecutionMetricSet.getInstance();
private static final QueryPlanCostMetricSet QUERY_PLAN_COST_METRIC_SET =
@@ -133,14 +139,19 @@ public class QueryExecution implements IQueryExecution {
if (!state.isDone()) {
return;
}
+ Throwable cause = null;
if (state == QueryState.FAILED
|| state == QueryState.ABORTED
|| state == QueryState.CANCELED) {
- LOGGER.debug("[ReleaseQueryResource] state is: {}", state);
- Throwable cause = stateMachine.getFailureException();
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[ReleaseQueryResource] state is: {}", state);
+ }
+ cause = stateMachine.getFailureException();
releaseResource(cause);
+
+ this.cleanUpCoordinatorContextMapIfNeeded(cause);
}
- this.stop(null);
+ this.stop(cause);
}
});
this.stopped = new AtomicBoolean(false);
@@ -321,33 +332,20 @@ public class QueryExecution implements IQueryExecution {
}
}
- // Stop the query and clean up all the resources this query occupied
- @Override
- public void stopAndCleanup() {
- stop(null);
- releaseResource();
- }
-
@Override
public void cancel() {
- stateMachine.transitionToCanceled(
- new KilledByOthersException(),
- new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode())
- .setMessage(KilledByOthersException.MESSAGE));
- }
-
- /** Release the resources that current QueryExecution hold. */
- private void releaseResource() {
- // close ResultHandle to unblock client's getResult request
- // Actually, we should not close the ResultHandle when the QueryExecution
is Finished.
- // There are only two scenarios where the ResultHandle should be closed:
- // 1. The client fetch all the result and the ResultHandle is finished.
- // 2. The client's connection is closed that all owned QueryExecution
should be cleaned up
- // If the QueryExecution's state is abnormal, we should also abort the
resultHandle without
- // waiting it to be finished.
- if (resultHandle != null) {
- resultHandle.close();
- cleanUpResultHandle();
+ Throwable cause = new KilledByOthersException();
+ boolean cancelled =
+ stateMachine.transitionToCanceled(
+ cause,
+ new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode())
+ .setMessage(KilledByOthersException.MESSAGE));
+ if (!cancelled) {
+ // Cancel failed, which means this query is already in a done state. We cannot change the
+ // state any more, but we still clean up the resources if needed.
+ // We don't need to call cleanUpCoordinatorContextMapIfNeeded when cancel succeeds, because
+ // it will be called by the callback registered on this QueryExecution's QueryStateMachine.
+ this.cleanUpCoordinatorContextMapIfNeeded(cause);
}
}
@@ -356,7 +354,7 @@ public class QueryExecution implements IQueryExecution {
// We don't need to deal with MemorySourceHandle because it doesn't
register to memory pool
// We don't need to deal with LocalSourceHandle because the
SharedTsBlockQueue uses the upstream
// FragmentInstanceId to register
- if (resultHandleCleanUp.compareAndSet(false, true) && resultHandle
instanceof SourceHandle) {
+ if (resultHandle instanceof SourceHandle) {
TFragmentInstanceId fragmentInstanceId =
resultHandle.getLocalFragmentInstanceId();
MPPDataExchangeService.getInstance()
.getMPPDataExchangeManager()
@@ -384,7 +382,7 @@ public class QueryExecution implements IQueryExecution {
// 2. The client's connection is closed that all owned QueryExecution
should be cleaned up
// If the QueryExecution's state is abnormal, we should also abort the
resultHandle without
// waiting it to be finished.
- if (resultHandle != null) {
+ if (resultHandle != null && resultHandleCleanUp.compareAndSet(false,
true)) {
if (t != null) {
resultHandle.abort(t);
} else {
@@ -394,6 +392,32 @@ public class QueryExecution implements IQueryExecution {
}
}
+ /**
+ * Clear up Coordinator.queryExecutionMap by calling
Coordinator.cleanupQueryExecution if there is
+ * no RPC in progress for this query (that is, the current RPC has finished
and {@code
+ * startTimeOfCurrentRpc == -1}). In cases where an RPC is still active, the
finally block in
+ * ClientRPCServiceImpl is responsible for performing the cleanup.
+ */
+ private synchronized void cleanUpCoordinatorContextMapIfNeeded(Throwable t) {
+ if (!isActive()) {
+ Coordinator.getInstance()
+ .cleanupQueryExecution(
+ context.getLocalQueryId(), (org.apache.thrift.TBase<?, ?>) null,
t);
+ }
+ }
+
+ /**
+ * Check if there is an active RPC for this query. If {@code
startTimeOfCurrentRpc == -1}, it
+ * means there is no active RPC, otherwise there is an active RPC. An active
RPC means that the
+ * client is still fetching results and the QueryExecution should not be
cleaned up until the RPC
+ * finishes. On the other hand, if there is no active RPC, it means that the
client has finished
+ * fetching results or has not started fetching results yet, and the
QueryExecution can be safely
+ * cleaned up.
+ */
+ public synchronized boolean isActive() {
+ return startTimeOfCurrentRpc != -1;
+ }
+
/**
* This method will be called by the request thread from client connection.
This method will block
* until one of these conditions occurs: 1. There is a batch of result 2.
There is no more result
@@ -671,13 +695,22 @@ public class QueryExecution implements IQueryExecution {
}
@Override
- public void recordExecutionTime(long executionTime) {
+ public synchronized void recordExecutionTime(long executionTime) {
totalExecutionTime += executionTime;
+ // recordExecutionTime is called after current rpc finished, so we need to
set
+ // startTimeOfCurrentRpc to -1
+ this.startTimeOfCurrentRpc = -1;
+ }
+
+ @Override
+ public synchronized void updateCurrentRpcStartTime(long startTime) {
+ this.startTimeOfCurrentRpc = startTime;
}
@Override
- public long getTotalExecutionTime() {
- return totalExecutionTime;
+ public synchronized long getTotalExecutionTime() {
+ return totalExecutionTime
+ + (startTimeOfCurrentRpc == -1 ? 0 : System.nanoTime() -
startTimeOfCurrentRpc);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
index f431bb2e51f..2b70a667be9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
@@ -113,6 +113,12 @@ public class ConfigExecution implements IQueryExecution {
private final StatementType statementType;
private long totalExecutionTime;
+ // -1 if the previous rpc has finished and the next client request hasn't come yet; unit is ns.
+ // It will be updated in the fetchResult rpc.
+ // Currently, ConfigExecution returns its result in just one call, so this field matters little
+ // for now, but we keep it for future use when ConfigExecution may return results in multiple calls.
+ private volatile long startTimeOfCurrentRpc = System.nanoTime();
+
public ConfigExecution(
MPPQueryContext context,
StatementType statementType,
@@ -222,11 +228,6 @@ public class ConfigExecution implements IQueryExecution {
// do nothing
}
- @Override
- public void stopAndCleanup() {
- // do nothing
- }
-
@Override
public void stopAndCleanup(Throwable t) {
// do nothing
@@ -327,11 +328,25 @@ public class ConfigExecution implements IQueryExecution {
@Override
public void recordExecutionTime(long executionTime) {
totalExecutionTime += executionTime;
+ // recordExecutionTime is called after current rpc finished, so we need to
set
+ // startTimeOfCurrentRpc to -1
+ this.startTimeOfCurrentRpc = -1;
+ }
+
+ @Override
+ public void updateCurrentRpcStartTime(long startTime) {
+ this.startTimeOfCurrentRpc = startTime;
+ }
+
+ @Override
+ public boolean isActive() {
+ return startTimeOfCurrentRpc != -1;
}
@Override
public long getTotalExecutionTime() {
- return totalExecutionTime;
+ return totalExecutionTime
+ + (startTimeOfCurrentRpc == -1 ? 0 : System.nanoTime() -
startTimeOfCurrentRpc);
}
@Override
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
index 1739944be7c..2d0c1bf543c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
@@ -1826,6 +1826,14 @@ public class MergeTreeSortOperatorTest {
@Override
public void recordExecutionTime(long executionTime) {}
+ @Override
+ public void updateCurrentRpcStartTime(long startTime) {}
+
+ @Override
+ public boolean isActive() {
+ return true;
+ }
+
@Override
public long getTotalExecutionTime() {
return 0;
@@ -1857,9 +1865,6 @@ public class MergeTreeSortOperatorTest {
@Override
public void stop(Throwable t) {}
- @Override
- public void stopAndCleanup() {}
-
@Override
public void stopAndCleanup(Throwable t) {}