This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch deregisterSourceHandle1.1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4cf0ad663a629fa92b74ffd23504659d17095393 Author: JackieTien97 <[email protected]> AuthorDate: Mon Apr 24 20:18:28 2023 +0800 Fix CI --- .../db/mpp/plan/execution/QueryExecution.java | 27 +++++++++++++++------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index 8e18cbb6d5..08b0674b1e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.execution.QueryState; import org.apache.iotdb.db.mpp.execution.QueryStateMachine; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService; import org.apache.iotdb.db.mpp.execution.exchange.source.ISourceHandle; +import org.apache.iotdb.db.mpp.execution.exchange.source.SourceHandle; import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.db.mpp.plan.analyze.Analysis; import org.apache.iotdb.db.mpp.plan.analyze.Analyzer; @@ -393,6 +394,23 @@ public class QueryExecution implements IQueryExecution { // waiting it to be finished. if (resultHandle != null) { resultHandle.close(); + cleanUpResultHandle(); + } + } + + private void cleanUpResultHandle() { + // Result handle belongs to special fragment instance, so we need to deregister it alone + // We don't need to deal with MemorySourceHandle because it doesn't register to memory pool + // We don't need to deal with LocalSourceHandle because the SharedTsBlockQueue uses the upstream + // FragmentInstanceId to register + if (resultHandle instanceof SourceHandle) { + TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId(); + MPPDataExchangeService.getInstance() + .getMPPDataExchangeManager() + .deRegisterFragmentInstanceFromMemoryPool( + fragmentInstanceId.queryId, + FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( + fragmentInstanceId)); } } @@ -417,14 +435,7 @@ public class QueryExecution implements IQueryExecution { } else { resultHandle.close(); } - // Result handle belongs to special fragment instance, so we need to deregister it alone - TFragmentInstanceId fragmentInstanceId = resultHandle.getLocalFragmentInstanceId(); - MPPDataExchangeService.getInstance() - .getMPPDataExchangeManager() - .deRegisterFragmentInstanceFromMemoryPool( - fragmentInstanceId.queryId, - FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( - fragmentInstanceId)); + cleanUpResultHandle(); } }
