This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e8bf6ef1a8 [IOTDB-5810] Result handle in memory pool is not be cleaned
properly (#9685)
e8bf6ef1a8 is described below
commit e8bf6ef1a8e1227d3f9085808e410f6dec7c9984
Author: Xiangwei Wei <[email protected]>
AuthorDate: Tue Apr 25 00:38:33 2023 +0800
[IOTDB-5810] Result handle in memory pool is not be cleaned properly (#9685)
---------
Co-authored-by: JackieTien97 <[email protected]>
---
.../iotdb/db/mpp/plan/execution/QueryExecution.java | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 94fb69067e..08b0674b1e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -30,12 +30,14 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.KilledByOthersException;
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.QueryState;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle;
+import org.apache.iotdb.db.mpp.execution.exchange.source.SourceHandle;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
@@ -62,6 +64,7 @@ import
org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -391,6 +394,23 @@ public class QueryExecution implements IQueryExecution {
// waiting it to be finished.
if (resultHandle != null) {
resultHandle.close();
+ cleanUpResultHandle();
+ }
+ }
+
+ private void cleanUpResultHandle() {
+ // Result handle belongs to special fragment instance, so we need to
deregister it alone
+ // We don't need to deal with MemorySourceHandle because it doesn't
register to memory pool
+ // We don't need to deal with LocalSourceHandle because the
SharedTsBlockQueue uses the upstream
+ // FragmentInstanceId to register
+ if (resultHandle instanceof SourceHandle) {
+ TFragmentInstanceId fragmentInstanceId =
resultHandle.getLocalFragmentInstanceId();
+ MPPDataExchangeService.getInstance()
+ .getMPPDataExchangeManager()
+ .deRegisterFragmentInstanceFromMemoryPool(
+ fragmentInstanceId.queryId,
+
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ fragmentInstanceId));
}
}
@@ -415,6 +435,7 @@ public class QueryExecution implements IQueryExecution {
} else {
resultHandle.close();
}
+ cleanUpResultHandle();
}
}