This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch deregisterSourceHandle
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5a5b48b1b6e4b204c428e67afa00c1a72f5b72d0
Author: Alima777 <[email protected]>
AuthorDate: Mon Apr 24 16:22:06 2023 +0800

    deregister result handle
---
 .../org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 94fb69067e..8e18cbb6d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.KilledByOthersException;
 import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.QueryState;
@@ -62,6 +63,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -415,6 +417,14 @@ public class QueryExecution implements IQueryExecution {
       } else {
         resultHandle.close();
       }
+      // Result handle belongs to special fragment instance, so we need to deregister it alone
+      TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId();
+      MPPDataExchangeService.getInstance()
+          .getMPPDataExchangeManager()
+          .deRegisterFragmentInstanceFromMemoryPool(
+              fragmentInstanceId.queryId,
+              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                  fragmentInstanceId));
     }
   }
