This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 63ea91a83f8 [To dev/1.3] Enable show queries to be executed 
immediately when the available memory in the memoryPool is insufficient (#17529)
63ea91a83f8 is described below

commit 63ea91a83f870d8fc9fae4386273096c1f1b98eb
Author: Weihao Li <[email protected]>
AuthorDate: Wed Apr 22 09:25:36 2026 +0800

    [To dev/1.3] Enable show queries to be executed immediately when the 
available memory in the memoryPool is insufficient (#17529)
---
 .../db/queryengine/common/MPPQueryContext.java     |  14 +++
 .../execution/exchange/MPPDataExchangeManager.java |  56 +++++++++-
 .../execution/exchange/SharedTsBlockQueue.java     |  57 ++++++----
 .../execution/exchange/sink/SinkChannel.java       |  62 ++++++++---
 .../execution/exchange/source/SourceHandle.java    |  90 ++++++++++++----
 .../fragment/FragmentInstanceContext.java          |  13 +++
 .../fragment/FragmentInstanceManager.java          |   2 +
 .../queryengine/execution/memory/MemoryPool.java   |  56 +++++++---
 .../db/queryengine/plan/analyze/Analysis.java      |   2 +-
 .../queryengine/plan/execution/QueryExecution.java |   7 +-
 .../plan/planner/OperatorTreeGenerator.java        |   6 +-
 .../plan/planner/plan/FragmentInstance.java        |   2 +
 .../execution/exchange/LocalSinkChannelTest.java   |  10 +-
 .../execution/exchange/SharedTsBlockQueueTest.java |  11 +-
 .../execution/exchange/ShuffleSinkHandleTest.java  |   5 +-
 .../execution/exchange/SourceHandleTest.java       |   5 +-
 .../db/queryengine/execution/exchange/Utils.java   |  26 +++--
 .../execution/memory/MemoryPoolTest.java           | 116 ++++++++++++++++++---
 18 files changed, 427 insertions(+), 113 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 7479e832a90..13773c15090 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -100,6 +100,12 @@ public class MPPQueryContext {
 
   private boolean userQuery = false;
 
+  /**
+   * When true (e.g. SHOW QUERIES), operator and exchange memory may use 
fallback when pool is
+   * insufficient. Set from analysis via {@link 
#setNeedSetHighestPriority(boolean)}.
+   */
+  private boolean needSetHighestPriority = false;
+
   @TestOnly
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
@@ -406,6 +412,14 @@ public class MPPQueryContext {
     this.userQuery = userQuery;
   }
 
+  public boolean needSetHighestPriority() {
+    return needSetHighestPriority;
+  }
+
+  public void setNeedSetHighestPriority(boolean needSetHighestPriority) {
+    this.needSetHighestPriority = needSetHighestPriority;
+  }
+
   public String getClientHostName() {
     if (session == null || session.getCliHostname() == null) {
       return "UNKNOWN";
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
index ac0b4111090..61dc05d4bfd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
 import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex;
@@ -656,7 +657,11 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
       }
       queue =
           new SharedTsBlockQueue(
-              localFragmentInstanceId, localPlanNodeId, localMemoryManager, 
executorService);
+              localFragmentInstanceId,
+              localPlanNodeId,
+              localMemoryManager,
+              executorService,
+              instanceContext.isHighestPriority());
     }
 
     return new LocalSinkChannel(
@@ -680,7 +685,8 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
             driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
             planNodeId,
             localMemoryManager,
-            executorService);
+            executorService,
+            driverContext.getFragmentInstanceContext().isHighestPriority());
     queue.allowAddingTsBlock();
     return new LocalSinkChannel(
         queue,
@@ -718,6 +724,7 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
         tsBlockSerdeFactory.get(),
         new ISinkChannelListenerImpl(
             localFragmentInstanceId, instanceContext, instanceContext::failed, 
cnt),
+        instanceContext.isHighestPriority(),
         mppDataExchangeServiceClientManager);
   }
 
@@ -802,6 +809,7 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
         context.getDriverTaskID().toString());
   }
 
+  @TestOnly
   public synchronized ISourceHandle createLocalSourceHandleForFragment(
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
@@ -809,6 +817,24 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
       TFragmentInstanceId remoteFragmentInstanceId,
       int index,
       IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
+    return createLocalSourceHandleForFragment(
+        localFragmentInstanceId,
+        localPlanNodeId,
+        remotePlanNodeId,
+        remoteFragmentInstanceId,
+        index,
+        onFailureCallback,
+        false);
+  }
+
+  public synchronized ISourceHandle createLocalSourceHandleForFragment(
+      TFragmentInstanceId localFragmentInstanceId,
+      String localPlanNodeId,
+      String remotePlanNodeId,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      int index,
+      IMPPDataExchangeManagerCallback<Throwable> onFailureCallback,
+      boolean isHighestPriority) {
     if (sourceHandles.containsKey(localFragmentInstanceId)
         && 
sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
       throw new IllegalStateException(
@@ -840,7 +866,11 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
       }
       queue =
           new SharedTsBlockQueue(
-              remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager, 
executorService);
+              remoteFragmentInstanceId,
+              remotePlanNodeId,
+              localMemoryManager,
+              executorService,
+              isHighestPriority);
     }
     LocalSourceHandle localSourceHandle =
         new LocalSourceHandle(
@@ -854,6 +884,7 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
     return localSourceHandle;
   }
 
+  @TestOnly
   @Override
   public ISourceHandle createSourceHandle(
       TFragmentInstanceId localFragmentInstanceId,
@@ -862,6 +893,24 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
       TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
       IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
+    return createSourceHandle(
+        localFragmentInstanceId,
+        localPlanNodeId,
+        indexOfUpstreamSinkHandle,
+        remoteEndpoint,
+        remoteFragmentInstanceId,
+        onFailureCallback,
+        false);
+  }
+
+  public ISourceHandle createSourceHandle(
+      TFragmentInstanceId localFragmentInstanceId,
+      String localPlanNodeId,
+      int indexOfUpstreamSinkHandle,
+      TEndPoint remoteEndpoint,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      IMPPDataExchangeManagerCallback<Throwable> onFailureCallback,
+      boolean isHighestPriority) {
     Map<String, ISourceHandle> sourceHandleMap = 
sourceHandles.get(localFragmentInstanceId);
     if (sourceHandleMap != null && 
sourceHandleMap.containsKey(localPlanNodeId)) {
       throw new IllegalStateException(
@@ -891,6 +940,7 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
             executorService,
             tsBlockSerdeFactory.get(),
             new SourceHandleListenerImpl(onFailureCallback),
+            isHighestPriority,
             mppDataExchangeServiceClientManager);
     sourceHandles
         .computeIfAbsent(localFragmentInstanceId, key -> new 
ConcurrentHashMap<>())
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index 555cf9efe5a..f4c21e2fdfa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.db.queryengine.execution.exchange;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.LocalSinkChannel;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle;
 import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
+import 
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -62,7 +64,7 @@ public class SharedTsBlockQueue {
 
   private long bufferRetainedSizeInBytes = 0L;
 
-  private final Queue<TsBlock> queue = new LinkedList<>();
+  private final Queue<Pair<TsBlock, Long>> queue = new LinkedList<>();
 
   private SettableFuture<Void> blocked = SettableFuture.create();
 
@@ -82,17 +84,28 @@ public class SharedTsBlockQueue {
 
   private long maxBytesCanReserve =
       
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
+  private final boolean isHighestPriority;
 
   private volatile Throwable abortedCause = null;
 
   // used for SharedTsBlockQueue listener
   private final ExecutorService executorService;
 
+  @TestOnly
   public SharedTsBlockQueue(
       TFragmentInstanceId fragmentInstanceId,
       String planNodeId,
       LocalMemoryManager localMemoryManager,
       ExecutorService executorService) {
+    this(fragmentInstanceId, planNodeId, localMemoryManager, executorService, 
false);
+  }
+
+  public SharedTsBlockQueue(
+      TFragmentInstanceId fragmentInstanceId,
+      String planNodeId,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      boolean isHighestPriority) {
     this.localFragmentInstanceId =
         Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be 
null");
     this.fullFragmentInstanceId =
@@ -101,6 +114,7 @@ public class SharedTsBlockQueue {
     this.localMemoryManager =
         Validate.notNull(localMemoryManager, "local memory manager cannot be 
null");
     this.executorService = Validate.notNull(executorService, "ExecutorService 
can not be null.");
+    this.isHighestPriority = isHighestPriority;
   }
 
   public boolean hasNoMoreTsBlocks() {
@@ -195,15 +209,18 @@ public class SharedTsBlockQueue {
       }
       throw new IllegalStateException("queue has been destroyed");
     }
-    TsBlock tsBlock = queue.remove();
-    localMemoryManager
-        .getQueryPool()
-        .free(
-            localFragmentInstanceId.getQueryId(),
-            fullFragmentInstanceId,
-            localPlanNodeId,
-            tsBlock.getSizeInBytes());
-    bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes();
+    Pair<TsBlock, Long> tsBlockWithReservedBytes = queue.remove();
+    long reservedBytes = tsBlockWithReservedBytes.right;
+    if (reservedBytes > 0) {
+      localMemoryManager
+          .getQueryPool()
+          .free(
+              localFragmentInstanceId.getQueryId(),
+              fullFragmentInstanceId,
+              localPlanNodeId,
+              reservedBytes);
+      bufferRetainedSizeInBytes -= reservedBytes;
+    }
     // Every time LocalSourceHandle consumes a TsBlock, it needs to send the 
event
     // to
     // corresponding LocalSinkChannel.
@@ -213,7 +230,7 @@ public class SharedTsBlockQueue {
     if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
       blocked = SettableFuture.create();
     }
-    return tsBlock;
+    return tsBlockWithReservedBytes.left;
   }
 
   /**
@@ -236,20 +253,22 @@ public class SharedTsBlockQueue {
               localFragmentInstanceId.queryId, fullFragmentInstanceId, 
localPlanNodeId);
       alreadyRegistered = true;
     }
-    Pair<ListenableFuture<Void>, Boolean> pair =
+    MemoryReservationResult reserveResult =
         localMemoryManager
             .getQueryPool()
-            .reserve(
+            .reserveWithPriority(
                 localFragmentInstanceId.getQueryId(),
                 fullFragmentInstanceId,
                 localPlanNodeId,
                 tsBlock.getSizeInBytes(),
-                maxBytesCanReserve);
-    blockedOnMemory = pair.left;
-    bufferRetainedSizeInBytes += tsBlock.getSizeInBytes();
+                maxBytesCanReserve,
+                isHighestPriority);
+    blockedOnMemory = reserveResult.getFuture();
+    long reservedBytes = reserveResult.getReservedBytes();
+    bufferRetainedSizeInBytes += reservedBytes;
 
     // reserve memory failed, we should wait until there is enough memory
-    if (!Boolean.TRUE.equals(pair.right)) {
+    if (!reserveResult.isReserveSuccess()) {
       SettableFuture<Void> channelBlocked = SettableFuture.create();
       blockedOnMemory.addListener(
           () -> {
@@ -264,7 +283,7 @@ public class SharedTsBlockQueue {
                 channelBlocked.set(null);
                 return;
               }
-              queue.add(tsBlock);
+              queue.add(new Pair<>(tsBlock, reservedBytes));
               if (!blocked.isDone()) {
                 blocked.set(null);
               }
@@ -281,7 +300,7 @@ public class SharedTsBlockQueue {
           executorService);
       return channelBlocked;
     } else { // reserve memory succeeded, add the TsBlock directly
-      queue.add(tsBlock);
+      queue.add(new Pair<>(tsBlock, reservedBytes));
       if (!blocked.isDone()) {
         blocked.set(null);
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
index ca6fdadc993..47ce6128fb7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import 
org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener;
 import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
+import 
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
 import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
 import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -119,6 +120,8 @@ public class SinkChannel implements ISinkChannel {
   private long maxBytesCanReserve =
       
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
 
+  private final boolean isHighestPriority;
+
   private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET 
=
       DataExchangeCostMetricSet.getInstance();
   private static final DataExchangeCountMetricSet 
DATA_EXCHANGE_COUNT_METRIC_SET =
@@ -128,6 +131,34 @@ public class SinkChannel implements ISinkChannel {
       RamUsageEstimator.shallowSizeOfInstance(SinkChannel.class)
           + RamUsageEstimator.shallowSizeOfInstance(TFragmentInstanceId.class) 
* 2;
 
+  @TestOnly
+  @SuppressWarnings("squid:S107")
+  public SinkChannel(
+      TEndPoint remoteEndpoint,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      String remotePlanNodeId,
+      String localPlanNodeId,
+      TFragmentInstanceId localFragmentInstanceId,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      TsBlockSerde serde,
+      SinkListener sinkListener,
+      IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+          mppDataExchangeServiceClientManager) {
+    this(
+        remoteEndpoint,
+        remoteFragmentInstanceId,
+        remotePlanNodeId,
+        localPlanNodeId,
+        localFragmentInstanceId,
+        localMemoryManager,
+        executorService,
+        serde,
+        sinkListener,
+        false,
+        mppDataExchangeServiceClientManager);
+  }
+
   @SuppressWarnings("squid:S107")
   public SinkChannel(
       TEndPoint remoteEndpoint,
@@ -139,6 +170,7 @@ public class SinkChannel implements ISinkChannel {
       ExecutorService executorService,
       TsBlockSerde serde,
       SinkListener sinkListener,
+      boolean isHighestPriority,
       IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
           mppDataExchangeServiceClientManager) {
     this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndPoint can 
not be null.");
@@ -155,6 +187,7 @@ public class SinkChannel implements ISinkChannel {
     this.executorService = Validate.notNull(executorService, "executorService 
can not be null.");
     this.serde = Validate.notNull(serde, "serde can not be null.");
     this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not 
be null.");
+    this.isHighestPriority = isHighestPriority;
     this.mppDataExchangeServiceClientManager = 
mppDataExchangeServiceClientManager;
     this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
     this.threadName =
@@ -204,21 +237,22 @@ public class SinkChannel implements ISinkChannel {
       long sizeInBytes = tsBlock.getSizeInBytes();
       int startSequenceId;
       startSequenceId = nextSequenceId;
-      blocked =
+      MemoryReservationResult reserveResult =
           localMemoryManager
               .getQueryPool()
-              .reserve(
+              .reserveWithPriority(
                   localFragmentInstanceId.getQueryId(),
                   fullFragmentInstanceId,
                   localPlanNodeId,
                   sizeInBytes,
-                  maxBytesCanReserve)
-              .left;
-      bufferRetainedSizeInBytes += sizeInBytes;
+                  maxBytesCanReserve,
+                  isHighestPriority);
+      blocked = reserveResult.getFuture();
+      bufferRetainedSizeInBytes += reserveResult.getReservedBytes();
 
       sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, 
currentTsBlockSize));
       nextSequenceId += 1;
-      currentTsBlockSize = sizeInBytes;
+      currentTsBlockSize = reserveResult.getReservedBytes();
 
       submitSendNewDataBlockEventTask(startSequenceId, 
ImmutableList.of(sizeInBytes));
     } finally {
@@ -433,19 +467,21 @@ public class SinkChannel implements ISinkChannel {
       return;
     }
     // SinkChannel is opened when ShuffleSinkHandle choose it as the next 
channel
-    this.blocked =
+    MemoryReservationResult reserveResult =
         localMemoryManager
             .getQueryPool()
-            .reserve(
+            .reserveWithPriority(
                 localFragmentInstanceId.getQueryId(),
                 fullFragmentInstanceId,
                 localPlanNodeId,
                 DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
-                maxBytesCanReserve) // actually we only know 
maxBytesCanReserve after
-            // the handle is created, so we use DEFAULT here. It is ok to use 
DEFAULT here because
-            // at first this SinkChannel has not reserved memory.
-            .left;
-    this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+                maxBytesCanReserve,
+                isHighestPriority); // actually we only know 
maxBytesCanReserve after
+    // the handle is created, so we use DEFAULT here. It is ok to use DEFAULT 
here because
+    // at first this SinkChannel has not reserved memory.
+    this.blocked = reserveResult.getFuture();
+    this.bufferRetainedSizeInBytes = reserveResult.getReservedBytes();
+    this.currentTsBlockSize = reserveResult.getReservedBytes();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
index 2d1a06fcd32..0e0b94094b5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
 import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
+import 
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
 import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
 import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -115,6 +116,8 @@ public class SourceHandle implements ISourceHandle {
    */
   private boolean canGetTsBlockFromRemote = false;
 
+  private final boolean isHighestPriority;
+
   private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET 
=
       DataExchangeCostMetricSet.getInstance();
   private static final DataExchangeCountMetricSet 
DATA_EXCHANGE_COUNT_METRIC_SET =
@@ -124,6 +127,7 @@ public class SourceHandle implements ISourceHandle {
       RamUsageEstimator.shallowSizeOfInstance(SourceHandle.class)
           + RamUsageEstimator.shallowSizeOfInstance(TFragmentInstanceId.class) 
* 2;
 
+  @TestOnly
   @SuppressWarnings("squid:S107")
   public SourceHandle(
       TEndPoint remoteEndpoint,
@@ -137,6 +141,34 @@ public class SourceHandle implements ISourceHandle {
       SourceHandleListener sourceHandleListener,
       IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
           mppDataExchangeServiceClientManager) {
+    this(
+        remoteEndpoint,
+        remoteFragmentInstanceId,
+        localFragmentInstanceId,
+        localPlanNodeId,
+        indexOfUpstreamSinkHandle,
+        localMemoryManager,
+        executorService,
+        serde,
+        sourceHandleListener,
+        false,
+        mppDataExchangeServiceClientManager);
+  }
+
+  @SuppressWarnings("squid:S107")
+  public SourceHandle(
+      TEndPoint remoteEndpoint,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      TFragmentInstanceId localFragmentInstanceId,
+      String localPlanNodeId,
+      int indexOfUpstreamSinkHandle,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      TsBlockSerde serde,
+      SourceHandleListener sourceHandleListener,
+      boolean isHighestPriority,
+      IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+          mppDataExchangeServiceClientManager) {
     this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndpoint can 
not be null.");
     this.remoteFragmentInstanceId =
         Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId 
can not be null.");
@@ -152,6 +184,7 @@ public class SourceHandle implements ISourceHandle {
     this.serde = Validate.notNull(serde, "serde can not be null.");
     this.sourceHandleListener =
         Validate.notNull(sourceHandleListener, "sourceHandleListener can not 
be null.");
+    this.isHighestPriority = isHighestPriority;
     this.bufferRetainedSizeInBytes = 0L;
     this.mppDataExchangeServiceClientManager = 
mppDataExchangeServiceClientManager;
     this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
@@ -192,19 +225,24 @@ public class SourceHandle implements ISourceHandle {
       if (tsBlock == null) {
         return null;
       }
-      long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId);
+      Long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId);
+      if (retainedSize == null) {
+        throw new IllegalStateException("Reserved data block size is null.");
+      }
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("[GetTsBlockFromBuffer] sequenceId:{}, size:{}", 
currSequenceId, retainedSize);
       }
       currSequenceId += 1;
-      bufferRetainedSizeInBytes -= retainedSize;
-      localMemoryManager
-          .getQueryPool()
-          .free(
-              localFragmentInstanceId.getQueryId(),
-              fullFragmentInstanceId,
-              localPlanNodeId,
-              retainedSize);
+      if (retainedSize > 0) {
+        bufferRetainedSizeInBytes -= retainedSize;
+        localMemoryManager
+            .getQueryPool()
+            .free(
+                localFragmentInstanceId.getQueryId(),
+                fullFragmentInstanceId,
+                localPlanNodeId,
+                retainedSize);
+      }
 
       if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
         if (LOGGER.isDebugEnabled()) {
@@ -241,18 +279,24 @@ public class SourceHandle implements ISourceHandle {
       if (bytesToReserve == null) {
         throw new IllegalStateException("Data block size is null.");
       }
-      pair =
+      MemoryReservationResult reserveResult =
           localMemoryManager
               .getQueryPool()
-              .reserve(
+              .reserveWithPriority(
                   localFragmentInstanceId.getQueryId(),
                   fullFragmentInstanceId,
                   localPlanNodeId,
                   bytesToReserve,
-                  maxBytesCanReserve);
-      bufferRetainedSizeInBytes += bytesToReserve;
+                  maxBytesCanReserve,
+                  isHighestPriority);
+      pair = new Pair<>(reserveResult.getFuture(), 
reserveResult.isReserveSuccess());
+      // actually reserve size is not equals raw size, update the actually 
reserve size to the map
+      if (reserveResult.getReservedBytes() != bytesToReserve) {
+        sequenceIdToDataBlockSize.put(endSequenceId, 
reserveResult.getReservedBytes());
+      }
+      bufferRetainedSizeInBytes += reserveResult.getReservedBytes();
       endSequenceId += 1;
-      reservedBytes += bytesToReserve;
+      reservedBytes += reserveResult.getReservedBytes();
       if (!Boolean.TRUE.equals(pair.right)) {
         blockedSize = bytesToReserve;
         break;
@@ -619,14 +663,16 @@ public class SourceHandle implements ISourceHandle {
         if (aborted || closed) {
           return;
         }
-        bufferRetainedSizeInBytes -= reservedBytes;
-        localMemoryManager
-            .getQueryPool()
-            .free(
-                localFragmentInstanceId.getQueryId(),
-                fullFragmentInstanceId,
-                localPlanNodeId,
-                reservedBytes);
+        if (reservedBytes > 0) {
+          bufferRetainedSizeInBytes -= reservedBytes;
+          localMemoryManager
+              .getQueryPool()
+              .free(
+                  localFragmentInstanceId.getQueryId(),
+                  fullFragmentInstanceId,
+                  localPlanNodeId,
+                  reservedBytes);
+        }
         sourceHandleListener.onFailure(SourceHandle.this, t);
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index f655df432bf..f02b86579b4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -152,6 +152,7 @@ public class FragmentInstanceContext extends QueryContext {
   private long unclosedUnseqFileNum = 0;
   private long closedSeqFileNum = 0;
   private long closedUnseqFileNum = 0;
+  private boolean highestPriority = false;
 
   public static FragmentInstanceContext createFragmentInstanceContext(
       FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, 
SessionInfo sessionInfo) {
@@ -1128,6 +1129,18 @@ public class FragmentInstanceContext extends 
QueryContext {
     return ignoreNotExistsDevice;
   }
 
+  /**
+   * Same flag as {@link
+   * 
org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis#needSetHighestPriority()}.
+   */
+  public boolean isHighestPriority() {
+    return highestPriority;
+  }
+
+  public void setHighestPriority(boolean highestPriority) {
+    this.highestPriority = highestPriority;
+  }
+
   public boolean isSingleSourcePath() {
     return singleSourcePath;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index e8d0fd82432..9dacdf44d88 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -158,6 +158,7 @@ public class FragmentInstanceManager {
                                 dataRegion,
                                 instance.getGlobalTimePredicate(),
                                 dataNodeQueryContextMap));
+                context.setHighestPriority(instance.isHighestPriority());
 
                 try {
                   List<PipelineDriverFactory> driverFactories =
@@ -259,6 +260,7 @@ public class FragmentInstanceManager {
                       fragmentInstanceId ->
                           createFragmentInstanceContext(
                               fragmentInstanceId, stateMachine, 
instance.getSessionInfo()));
+              context.setHighestPriority(instance.isHighestPriority());
 
               try {
                 List<PipelineDriverFactory> driverFactories =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
index a9ab6ed5d81..9a00d5a87de 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
@@ -23,10 +23,8 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.exception.runtime.MemoryLeakException;
 
 import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.lang3.Validate;
-import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +39,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+
 /** A thread-safe memory pool. */
 public class MemoryPool {
 
@@ -111,6 +111,31 @@ public class MemoryPool {
     }
   }
 
+  public static class MemoryReservationResult {
+    private final ListenableFuture<Void> future;
+    private final boolean reserveSuccess;
+    private final long reservedBytes;
+
+    public MemoryReservationResult(
+        ListenableFuture<Void> future, boolean reserveSuccess, long 
reservedBytes) {
+      this.future = future;
+      this.reserveSuccess = reserveSuccess;
+      this.reservedBytes = reservedBytes;
+    }
+
+    public ListenableFuture<Void> getFuture() {
+      return future;
+    }
+
+    public boolean isReserveSuccess() {
+      return reserveSuccess;
+    }
+
+    public long getReservedBytes() {
+      return reservedBytes;
+    }
+  }
+
   private final String id;
   private final long maxBytes;
   private final long maxBytesPerFragmentInstance;
@@ -220,18 +245,20 @@ public class MemoryPool {
   }
 
   /**
-   * Reserve memory with bytesToReserve.
+   * Reserve memory with bytesToReserve respect priority.
    *
-   * @return if reserve succeed, pair.right will be true, otherwise false
+   * @return if reserve succeed, reservedBytes may be zero or equals with 
bytesToReserve; if reserve
+   *     failed, reservedBytes must be equals with bytesToReserve
    * @throws IllegalArgumentException throw exception if current query 
requests more memory than can
    *     be allocated.
    */
-  public Pair<ListenableFuture<Void>, Boolean> reserve(
+  public MemoryReservationResult reserveWithPriority(
       String queryId,
       String fragmentInstanceId,
       String planNodeId,
       long bytesToReserve,
-      long maxBytesCanReserve) {
+      long maxBytesCanReserve,
+      boolean isHighestPriority) {
     Validate.notNull(queryId, "queryId can not be null.");
     Validate.notNull(fragmentInstanceId, "fragmentInstanceId can not be 
null.");
     Validate.notNull(planNodeId, "planNodeId can not be null.");
@@ -250,19 +277,21 @@ public class MemoryPool {
           "Query is aborted since it requests more memory than can be 
allocated.");
     }
 
-    ListenableFuture<Void> result;
     if (tryReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve, 
maxBytesCanReserve)) {
-      result = Futures.immediateFuture(null);
-      return new Pair<>(result, Boolean.TRUE);
+      return new MemoryReservationResult(immediateVoidFuture(), true, 
bytesToReserve);
     } else {
+      rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve);
+      if (isHighestPriority) {
+        // SHOW QUERIES: treat as success with zero bytes reserved from pool 
when insufficient.
+        return new MemoryReservationResult(immediateVoidFuture(), true, 0L);
+      }
       LOGGER.debug(
           "Blocked reserve request: {} bytes memory for planNodeId{}", 
bytesToReserve, planNodeId);
-      rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve);
-      result =
+      ListenableFuture<Void> result =
           MemoryReservationFuture.create(
               queryId, fragmentInstanceId, planNodeId, bytesToReserve, 
maxBytesCanReserve);
       memoryReservationFutures.add((MemoryReservationFuture<Void>) result);
-      return new Pair<>(result, Boolean.FALSE);
+      return new MemoryReservationResult(result, false, bytesToReserve);
     }
   }
 
@@ -293,7 +322,8 @@ public class MemoryPool {
   /**
    * Cancel the specified memory reservation. If the reservation has finished, 
do nothing.
    *
-   * @param future The future returned from {@link #reserve(String, String, 
String, long, long)}
+   * @param future The future returned from {@link 
#reserveWithPriority(String, String, String,
+   *     long, long, boolean)}
    * @return If the future has not complete, return the number of bytes being 
reserved. Otherwise,
    *     return 0.
    */
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index 01d1020934d..63b2d379ddd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -503,7 +503,7 @@ public class Analysis implements IAnalysis {
   public boolean needSetHighestPriority() {
     // if is this Statement is ShowQueryStatement, set its instances to the 
highest priority, so
     // that the sub-tasks of the ShowQueries instances could be executed first.
-    return StatementType.SHOW_QUERIES.equals(statement.getType());
+    return statement != null && 
StatementType.SHOW_QUERIES.equals(statement.getType());
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 8de6165525e..5af143bfca8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -125,6 +125,7 @@ public class QueryExecution implements IQueryExecution {
     this.context = context;
     this.planner = planner;
     this.analysis = analyze(context);
+    context.setNeedSetHighestPriority(analysis.needSetHighestPriority());
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
 
     // We add the abort logic inside the QueryExecution.
@@ -587,7 +588,8 @@ public class QueryExecution implements IQueryExecution {
                     
context.getResultNodeContext().getUpStreamPlanNodeId().getId(),
                     
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
                     0, // Upstream of result ExchangeNode will only have one 
child.
-                    stateMachine::transitionToFailed)
+                    stateMachine::transitionToFailed,
+                    context.needSetHighestPriority())
             : MPPDataExchangeService.getInstance()
                 .getMPPDataExchangeManager()
                 .createSourceHandle(
@@ -596,7 +598,8 @@ public class QueryExecution implements IQueryExecution {
                     0,
                     upstreamEndPoint,
                     
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
-                    stateMachine::transitionToFailed);
+                    stateMachine::transitionToFailed,
+                    context.needSetHighestPriority());
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index e0aeacb7f56..0649f808c4f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -2624,14 +2624,16 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 node.getUpstreamPlanNodeId().getId(),
                 remoteInstanceId.toThrift(),
                 node.getIndexOfUpstreamSinkHandle(),
-                context.getInstanceContext()::failed)
+                context.getInstanceContext()::failed,
+                context.getInstanceContext().isHighestPriority())
             : MPP_DATA_EXCHANGE_MANAGER.createSourceHandle(
                 localInstanceId.toThrift(),
                 node.getPlanNodeId().getId(),
                 node.getIndexOfUpstreamSinkHandle(),
                 upstreamEndPoint,
                 remoteInstanceId.toThrift(),
-                context.getInstanceContext()::failed);
+                context.getInstanceContext()::failed,
+                context.getInstanceContext().isHighestPriority());
     if (!isSameNode) {
       context.addExchangeSumNum(1);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
index d212f4ca725..b96485199ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
@@ -223,6 +223,7 @@ public class FragmentInstance implements IConsensusRequest {
     fragmentInstance.hostDataNode =
         hasHostDataNode ? 
ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
     fragmentInstance.isExplainAnalyze = ReadWriteIOUtils.readBool(buffer);
+    fragmentInstance.setHighestPriority(ReadWriteIOUtils.readBool(buffer));
     return fragmentInstance;
   }
 
@@ -247,6 +248,7 @@ public class FragmentInstance implements IConsensusRequest {
         ThriftCommonsSerDeUtils.serializeTDataNodeLocation(hostDataNode, 
outputStream);
       }
       ReadWriteIOUtils.write(isExplainAnalyze, outputStream);
+      ReadWriteIOUtils.write(isHighestPriority, outputStream);
       return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
     } catch (IOException e) {
       LOGGER.error("Unexpected error occurs when serializing this 
FragmentInstance.", e);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java
index 8d9c4b9cd65..b3c50aaa517 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java
@@ -92,13 +92,14 @@ public class LocalSinkChannelTest {
     Assert.assertFalse(localSinkChannel.isFinished());
     Assert.assertEquals(11 * mockTsBlockSize, 
localSinkChannel.getBufferRetainedSizeInBytes());
     Mockito.verify(spyMemoryPool, Mockito.times(11))
-        .reserve(
+        .reserveWithPriority(
             queryId,
             FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
                 remoteFragmentInstanceId),
             remotePlanNodeId,
             mockTsBlockSize,
-            Long.MAX_VALUE);
+            Long.MAX_VALUE,
+            false);
 
     // Receive TsBlocks.
     int numOfReceivedTsblocks = 0;
@@ -184,13 +185,14 @@ public class LocalSinkChannelTest {
     Assert.assertFalse(localSinkChannel.isFinished());
     Assert.assertEquals(11 * mockTsBlockSize, 
localSinkChannel.getBufferRetainedSizeInBytes());
     Mockito.verify(spyMemoryPool, Mockito.times(11))
-        .reserve(
+        .reserveWithPriority(
             queryId,
             FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
                 remoteFragmentInstanceId),
             remotePlanNodeId,
             mockTsBlockSize,
-            Long.MAX_VALUE);
+            Long.MAX_VALUE,
+            false);
 
     // Abort.
     localSinkChannel.abort();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
index 46196d1c990..a95fd35ba76 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
@@ -21,13 +21,13 @@ package org.apache.iotdb.db.queryengine.execution.exchange;
 
 import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
+import 
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.lang3.Validate;
 import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.utils.Pair;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -62,15 +62,16 @@ public class SharedTsBlockQueueTest {
     MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);
     
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
 
-    // reserve() returns (manualFuture, false) — simulating memory blocked
+    // reserveWithPriority() returns blocked future and reserve failure.
     Mockito.when(
-            mockMemoryPool.reserve(
+            mockMemoryPool.reserveWithPriority(
                 Mockito.anyString(),
                 Mockito.anyString(),
                 Mockito.anyString(),
                 Mockito.anyLong(),
-                Mockito.anyLong()))
-        .thenReturn(new Pair<>(manualFuture, Boolean.FALSE));
+                Mockito.anyLong(),
+                Mockito.anyBoolean()))
+        .thenReturn(new MemoryReservationResult(manualFuture, false, 1024L));
     // tryCancel returns 0 — simulating future already completed (can't cancel)
     Mockito.when(mockMemoryPool.tryCancel(Mockito.any())).thenReturn(0L);
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java
index 489ec5c9ed6..4190c2fa61a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java
@@ -104,13 +104,14 @@ public class ShuffleSinkHandleTest {
     Assert.assertFalse(localSinkChannel.isFinished());
     Assert.assertEquals(11 * mockTsBlockSize, 
localSinkChannel.getBufferRetainedSizeInBytes());
     Mockito.verify(spyMemoryPool, Mockito.times(11))
-        .reserve(
+        .reserveWithPriority(
             queryId,
             FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
                 remoteFragmentInstanceId),
             remotePlanNodeId,
             mockTsBlockSize,
-            Long.MAX_VALUE);
+            Long.MAX_VALUE,
+            false);
 
     // Abort.
     shuffleSinkHandle.abort();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java
index ee849337206..40721c93db7 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java
@@ -254,13 +254,14 @@ public class SourceHandleTest {
             .collect(Collectors.toList()));
     try {
       Mockito.verify(spyMemoryPool, Mockito.timeout(10_000).times(6))
-          .reserve(
+          .reserveWithPriority(
               queryId,
               
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
                   localFragmentInstanceId),
               localPlanNodeId,
               MOCK_TSBLOCK_SIZE,
-              maxBytesCanReserve);
+              maxBytesCanReserve,
+              false);
       Mockito.verify(mockClient, Mockito.timeout(10_0000).times(1))
           .getDataBlock(
               Mockito.argThat(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
index 327d4a34c39..b09498ad949 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
@@ -20,11 +20,11 @@
 package org.apache.iotdb.db.queryengine.execution.exchange;
 
 import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
+import 
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
 
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.common.block.column.TsBlockSerde;
-import org.apache.tsfile.utils.Pair;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 
@@ -68,21 +68,25 @@ public class Utils {
     settableFuture.get().set(null);
     AtomicReference<Long> reservedBytes = new AtomicReference<>(0L);
     Mockito.when(
-            mockMemoryPool.reserve(
+            mockMemoryPool.reserveWithPriority(
                 Mockito.eq(queryId),
                 Mockito.eq(fragmentInstanceId),
                 Mockito.eq(planNodeId),
                 Mockito.anyLong(),
-                Mockito.anyLong()))
+                Mockito.anyLong(),
+                Mockito.anyBoolean()))
         .thenAnswer(
             invocation -> {
               long bytesToReserve = invocation.getArgument(3);
               if (reservedBytes.get() + bytesToReserve <= capacityInBytes) {
-                reservedBytes.updateAndGet(v -> v + (long) 
invocation.getArgument(3));
-                return new Pair<>(settableFuture.get(), true);
+                reservedBytes.updateAndGet(v -> v + bytesToReserve);
+                return new MemoryReservationResult(settableFuture.get(), true, 
bytesToReserve);
               } else {
+                if (invocation.getArgument(5)) {
+                  return new MemoryReservationResult(settableFuture.get(), 
true, 0L);
+                }
                 settableFuture.set(SettableFuture.create());
-                return new Pair<>(settableFuture.get(), false);
+                return new MemoryReservationResult(settableFuture.get(), 
false, bytesToReserve);
               }
             });
     Mockito.doAnswer(
@@ -124,13 +128,17 @@ public class Utils {
   public static MemoryPool createMockNonBlockedMemoryPool() {
     MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);
     Mockito.when(
-            mockMemoryPool.reserve(
+            mockMemoryPool.reserveWithPriority(
                 Mockito.anyString(),
                 Mockito.anyString(),
                 Mockito.anyString(),
                 Mockito.anyLong(),
-                Mockito.anyLong()))
-        .thenReturn(new Pair<>(immediateFuture(null), true));
+                Mockito.anyLong(),
+                Mockito.anyBoolean()))
+        .thenAnswer(
+            invocation ->
+                new MemoryReservationResult(
+                    immediateFuture(null), true, invocation.getArgument(3)));
     Mockito.when(
             mockMemoryPool.tryReserve(
                 Mockito.anyString(),
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java
index bae3c0643f4..cf7e15dc71f 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.queryengine.execution.memory;
 
+import 
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
+
 import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Assert;
 import org.junit.Before;
@@ -94,7 +96,9 @@ public class MemoryPoolTest {
   public void testReserve() {
 
     ListenableFuture<Void> future =
-        pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 
Long.MAX_VALUE).left;
+        pool.reserveWithPriority(
+                QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 
Long.MAX_VALUE, false)
+            .getFuture();
     Assert.assertTrue(future.isDone());
     Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID));
     Assert.assertEquals(256L, pool.getReservedBytes());
@@ -104,7 +108,8 @@ public class MemoryPoolTest {
   public void tesReserveZero() {
 
     try {
-      pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 0L, 
Long.MAX_VALUE);
+      pool.reserveWithPriority(
+          QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 0L, Long.MAX_VALUE, 
false);
       Assert.fail("Expect IllegalArgumentException");
     } catch (IllegalArgumentException ignore) {
     }
@@ -114,7 +119,8 @@ public class MemoryPoolTest {
   public void testReserveNegative() {
 
     try {
-      pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, -1L, 
Long.MAX_VALUE);
+      pool.reserveWithPriority(
+          QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, -1L, Long.MAX_VALUE, 
false);
       Assert.fail("Expect IllegalArgumentException");
     } catch (IllegalArgumentException ignore) {
     }
@@ -124,7 +130,9 @@ public class MemoryPoolTest {
   public void testReserveAll() {
 
     ListenableFuture<Void> future =
-        pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 
Long.MAX_VALUE).left;
+        pool.reserveWithPriority(
+                QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 
Long.MAX_VALUE, false)
+            .getFuture();
     Assert.assertTrue(future.isDone());
     Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(QUERY_ID));
     Assert.assertEquals(512L, pool.getReservedBytes());
@@ -134,11 +142,15 @@ public class MemoryPoolTest {
   public void testOverReserve() {
 
     ListenableFuture<Void> future =
-        pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 
Long.MAX_VALUE).left;
+        pool.reserveWithPriority(
+                QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 
Long.MAX_VALUE, false)
+            .getFuture();
     Assert.assertTrue(future.isDone());
     Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID));
     Assert.assertEquals(256L, pool.getReservedBytes());
-    future = pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 
513L).left;
+    future =
+        pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 
512L, 513L, false)
+            .getFuture();
     Assert.assertFalse(future.isDone());
     Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID));
     Assert.assertEquals(256L, pool.getReservedBytes());
@@ -148,11 +160,13 @@ public class MemoryPoolTest {
   public void testReserveAndFree() {
 
     Assert.assertTrue(
-        pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 
Long.MAX_VALUE)
-            .left
+        pool.reserveWithPriority(
+                QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 
Long.MAX_VALUE, false)
+            .getFuture()
             .isDone());
     ListenableFuture<Void> future =
-        pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 
513L).left;
+        pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 
512L, 513L, false)
+            .getFuture();
     Assert.assertFalse(future.isDone());
     Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(QUERY_ID));
     Assert.assertEquals(512L, pool.getReservedBytes());
@@ -166,18 +180,22 @@ public class MemoryPoolTest {
   public void testMultiReserveAndFree() {
 
     Assert.assertTrue(
-        pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 
Long.MAX_VALUE)
-            .left
+        pool.reserveWithPriority(
+                QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 
Long.MAX_VALUE, false)
+            .getFuture()
             .isDone());
     Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID));
     Assert.assertEquals(256L, pool.getReservedBytes());
 
     ListenableFuture<Void> future1 =
-        pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 
513L).left;
+        pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 
512L, 513L, false)
+            .getFuture();
     ListenableFuture<Void> future2 =
-        pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 
513L).left;
+        pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 
512L, 513L, false)
+            .getFuture();
     ListenableFuture<Void> future3 =
-        pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 
513L).left;
+        pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 
512L, 513L, false)
+            .getFuture();
     Assert.assertFalse(future1.isDone());
     Assert.assertFalse(future2.isDone());
     Assert.assertFalse(future3.isDone());
@@ -284,7 +302,8 @@ public class MemoryPoolTest {
         pool.tryReserveForTest(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 
512L, Long.MAX_VALUE));
 
     ListenableFuture<Void> f =
-        pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 
512L).left;
+        pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 
256L, 512L, false)
+            .getFuture();
     Assert.assertFalse(f.isDone());
     // Cancel the reservation.
     Assert.assertEquals(256L, pool.tryCancel(f));
@@ -296,11 +315,76 @@ public class MemoryPoolTest {
   public void testTryCancelCompletedReservation() {
 
     ListenableFuture<Void> f =
-        pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 
Long.MAX_VALUE).left;
+        pool.reserveWithPriority(
+                QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 
Long.MAX_VALUE, false)
+            .getFuture();
     Assert.assertTrue(f.isDone());
     // Cancel the reservation.
     Assert.assertEquals(0L, pool.tryCancel(f));
     Assert.assertTrue(f.isDone());
     Assert.assertFalse(f.isCancelled());
   }
+
+  /**
+   * Normal query: requested bytes exceed what the pool can still provide — 
reserve fails (blocked
+   * future, not immediate success).
+   */
+  @Test
+  public void testReserveWithPriorityNormalQueryExceedsAvailable() {
+    MemoryReservationResult r1 =
+        pool.reserveWithPriority(
+            QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 
Long.MAX_VALUE, false);
+    Assert.assertTrue(r1.isReserveSuccess());
+    Assert.assertEquals(512L, r1.getReservedBytes());
+    Assert.assertTrue(r1.getFuture().isDone());
+
+    MemoryReservationResult r2 =
+        pool.reserveWithPriority(
+            QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 
Long.MAX_VALUE, false);
+    Assert.assertTrue(r2.isReserveSuccess());
+    Assert.assertEquals(512L, r2.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getReservedBytes());
+
+    MemoryReservationResult r3 =
+        pool.reserveWithPriority(
+            QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 
Long.MAX_VALUE, false);
+    Assert.assertFalse(r3.isReserveSuccess());
+    Assert.assertEquals(256L, r3.getReservedBytes());
+    Assert.assertFalse(r3.getFuture().isDone());
+    Assert.assertEquals(1024L, pool.getReservedBytes());
+  }
+
+  /** SHOW QUERIES path: exceeds pool capacity — treated as success with zero 
bytes from pool. */
+  @Test
+  public void testReserveWithPriorityShowQueriesExceedsAvailable() {
+    Assert.assertTrue(
+        pool.reserveWithPriority(
+                QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 
Long.MAX_VALUE, false)
+            .isReserveSuccess());
+    Assert.assertTrue(
+        pool.reserveWithPriority(
+                QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 
Long.MAX_VALUE, false)
+            .isReserveSuccess());
+    Assert.assertEquals(1024L, pool.getReservedBytes());
+
+    MemoryReservationResult r =
+        pool.reserveWithPriority(
+            QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 
Long.MAX_VALUE, true);
+    Assert.assertTrue(r.isReserveSuccess());
+    Assert.assertEquals(0L, r.getReservedBytes());
+    Assert.assertTrue(r.getFuture().isDone());
+    Assert.assertEquals(1024L, pool.getReservedBytes());
+  }
+
+  /** SHOW QUERIES path: pool has room — same as normal successful reserve. */
+  @Test
+  public void testReserveWithPriorityShowQueriesWithinAvailable() {
+    MemoryReservationResult r =
+        pool.reserveWithPriority(
+            QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 
Long.MAX_VALUE, true);
+    Assert.assertTrue(r.isReserveSuccess());
+    Assert.assertEquals(256L, r.getReservedBytes());
+    Assert.assertTrue(r.getFuture().isDone());
+    Assert.assertEquals(256L, pool.getReservedBytes());
+  }
 }

Reply via email to