This is an automated email from the ASF dual-hosted git repository. weihao pushed a commit to branch lwh/SQ-mp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 55f7c5119a7a7e5d1371ea5314af3b9b667075ce Author: Weihao Li <[email protected]> AuthorDate: Fri Apr 17 11:44:51 2026 +0800 modify some Signed-off-by: Weihao Li <[email protected]> --- .../iotdb/db/queryengine/execution/memory/MemoryPool.java | 15 ++++++++++++--- .../queryengine/plan/planner/plan/FragmentInstance.java | 2 ++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java index 6afa7f692ef..6ff862e5d0e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java @@ -256,6 +256,7 @@ public class MemoryPool { * @throws IllegalArgumentException throw exception if current query requests more memory than can * be allocated. */ + @TestOnly public Pair<ListenableFuture<Void>, Boolean> reserve( String queryId, String fragmentInstanceId, @@ -268,13 +269,21 @@ public class MemoryPool { return new Pair<>(result.getFuture(), result.isReserveSuccess()); } + /** + * Reserves bytesToReserve bytes of memory, respecting priority. + * + * @return if the reservation succeeds, reservedBytes may be zero or equal to bytesToReserve; if it + * fails, reservedBytes must be equal to bytesToReserve + * @throws IllegalArgumentException if the current query requests more memory than can + * be allocated. 
+ */ public MemoryReservationResult reserveWithPriority( String queryId, String fragmentInstanceId, String planNodeId, long bytesToReserve, long maxBytesCanReserve, - boolean needSetHighestPriority) { + boolean isHighestPriority) { Validate.notNull(queryId, "queryId can not be null."); Validate.notNull(fragmentInstanceId, "fragmentInstanceId can not be null."); Validate.notNull(planNodeId, "planNodeId can not be null."); @@ -299,8 +308,8 @@ public class MemoryPool { return new MemoryReservationResult(immediateVoidFuture(), true, bytesToReserve); } else { rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve); - if (needSetHighestPriority) { - // SHOW QUERIES etc.: treat as success with zero bytes reserved from pool when insufficient. + if (isHighestPriority) { + // SHOW QUERIES: treat as success with zero bytes reserved from pool when insufficient. return new MemoryReservationResult(immediateVoidFuture(), true, 0L); } LOGGER.debug( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java index d53e29e1692..ae99aac6fba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java @@ -267,6 +267,7 @@ public class FragmentInstance implements IConsensusRequest { fragmentInstance.hostDataNode = hasHostDataNode ? 
ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null; fragmentInstance.isExplainAnalyze = ReadWriteIOUtils.readBool(buffer); + fragmentInstance.setHighestPriority(ReadWriteIOUtils.readBool(buffer)); return fragmentInstance; } @@ -293,6 +294,7 @@ public class FragmentInstance implements IConsensusRequest { ThriftCommonsSerDeUtils.serializeTDataNodeLocation(hostDataNode, outputStream); } ReadWriteIOUtils.write(isExplainAnalyze, outputStream); + ReadWriteIOUtils.write(isHighestPriority, outputStream); return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); } catch (IOException e) { LOGGER.error("Unexpected error occurs when serializing this FragmentInstance.", e);
