This is an automated email from the ASF dual-hosted git repository.

weihao pushed a commit to branch lwh/SQ-mp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit be98df9ad39432a11bcf307dfaef64202a1b05da
Author: Weihao Li <[email protected]>
AuthorDate: Thu Apr 16 15:20:43 2026 +0800

    draft
    
    Signed-off-by: Weihao Li <[email protected]>
---
 .../db/queryengine/common/MPPQueryContext.java     | 14 ++++
 .../execution/exchange/MPPDataExchangeManager.java | 53 ++++++++++++-
 .../execution/exchange/SharedTsBlockQueue.java     | 55 ++++++++-----
 .../execution/exchange/sink/SinkChannel.java       | 61 +++++++++++----
 .../execution/exchange/source/SourceHandle.java    | 89 ++++++++++++++++------
 .../fragment/FragmentInstanceContext.java          | 13 ++++
 .../fragment/FragmentInstanceManager.java          |  2 +
 .../queryengine/execution/memory/MemoryPool.java   | 58 ++++++++++++--
 .../queryengine/plan/execution/QueryExecution.java |  7 +-
 .../plan/planner/OperatorTreeGenerator.java        |  6 +-
 .../plan/planner/TableOperatorGenerator.java       |  6 +-
 11 files changed, 293 insertions(+), 71 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 88bd1998f68..4da4810f76a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -129,6 +129,12 @@ public class MPPQueryContext implements IAuditEntity {
 
   private boolean userQuery = false;
 
+  /**
+   * When true (e.g. SHOW QUERIES), operator and exchange memory may use a 
fallback when the pool is
+   * insufficient. Set from analysis via {@link 
#setNeedSetHighestPriority(boolean)}.
+   */
+  private boolean needSetHighestPriority = false;
+
   private boolean debug = false;
 
   private Map<NodeRef<Table>, Query> cteQueries = new HashMap<>();
@@ -507,6 +513,14 @@ public class MPPQueryContext implements IAuditEntity {
     this.userQuery = userQuery;
   }
 
+  public boolean needSetHighestPriority() {
+    return needSetHighestPriority;
+  }
+
+  public void setNeedSetHighestPriority(boolean needSetHighestPriority) {
+    this.needSetHighestPriority = needSetHighestPriority;
+  }
+
   public boolean isDebug() {
     return debug;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
index d2caf330d66..f72bb12f7f0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
@@ -656,7 +656,11 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
       }
       queue =
           new SharedTsBlockQueue(
-              localFragmentInstanceId, localPlanNodeId, localMemoryManager, 
executorService);
+              localFragmentInstanceId,
+              localPlanNodeId,
+              localMemoryManager,
+              executorService,
+              instanceContext.isHighestPriority());
     }
 
     return new LocalSinkChannel(
@@ -680,7 +684,8 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
             driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
             planNodeId,
             localMemoryManager,
-            executorService);
+            executorService,
+            driverContext.getFragmentInstanceContext().isHighestPriority());
     queue.allowAddingTsBlock();
     return new LocalSinkChannel(
         queue,
@@ -718,6 +723,7 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
         tsBlockSerdeFactory.get(),
         new ISinkChannelListenerImpl(
             localFragmentInstanceId, instanceContext, instanceContext::failed, 
cnt),
+        instanceContext.isHighestPriority(),
         mppDataExchangeServiceClientManager);
   }
 
@@ -809,6 +815,24 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
       TFragmentInstanceId remoteFragmentInstanceId,
       int index,
       IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
+    return createLocalSourceHandleForFragment(
+        localFragmentInstanceId,
+        localPlanNodeId,
+        remotePlanNodeId,
+        remoteFragmentInstanceId,
+        index,
+        onFailureCallback,
+        false);
+  }
+
+  public synchronized ISourceHandle createLocalSourceHandleForFragment(
+      TFragmentInstanceId localFragmentInstanceId,
+      String localPlanNodeId,
+      String remotePlanNodeId,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      int index,
+      IMPPDataExchangeManagerCallback<Throwable> onFailureCallback,
+      boolean isHighestPriority) {
     if (sourceHandles.containsKey(localFragmentInstanceId)
         && 
sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
       throw new IllegalStateException(
@@ -840,7 +864,11 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
       }
       queue =
           new SharedTsBlockQueue(
-              remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager, 
executorService);
+              remoteFragmentInstanceId,
+              remotePlanNodeId,
+              localMemoryManager,
+              executorService,
+              isHighestPriority);
     }
     LocalSourceHandle localSourceHandle =
         new LocalSourceHandle(
@@ -862,6 +890,24 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
       TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
       IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
+    return createSourceHandle(
+        localFragmentInstanceId,
+        localPlanNodeId,
+        indexOfUpstreamSinkHandle,
+        remoteEndpoint,
+        remoteFragmentInstanceId,
+        onFailureCallback,
+        false);
+  }
+
+  public ISourceHandle createSourceHandle(
+      TFragmentInstanceId localFragmentInstanceId,
+      String localPlanNodeId,
+      int indexOfUpstreamSinkHandle,
+      TEndPoint remoteEndpoint,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      IMPPDataExchangeManagerCallback<Throwable> onFailureCallback,
+      boolean isHighestPriority) {
     Map<String, ISourceHandle> sourceHandleMap = 
sourceHandles.get(localFragmentInstanceId);
     if (sourceHandleMap != null && 
sourceHandleMap.containsKey(localPlanNodeId)) {
       throw new IllegalStateException(
@@ -891,6 +937,7 @@ public class MPPDataExchangeManager implements 
IMPPDataExchangeManager {
             executorService,
             tsBlockSerdeFactory.get(),
             new SourceHandleListenerImpl(onFailureCallback),
+            isHighestPriority,
             mppDataExchangeServiceClientManager);
     sourceHandles
         .computeIfAbsent(localFragmentInstanceId, key -> new 
ConcurrentHashMap<>())
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index e49efabb986..ad68956a98e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.LocalSinkChannel;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle;
 import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
+import 
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
 import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
@@ -63,7 +64,7 @@ public class SharedTsBlockQueue {
 
   private long bufferRetainedSizeInBytes = 0L;
 
-  private final Queue<TsBlock> queue = new LinkedList<>();
+  private final Queue<Pair<TsBlock, Long>> queue = new LinkedList<>();
 
   private SettableFuture<Void> blocked = SettableFuture.create();
 
@@ -83,6 +84,7 @@ public class SharedTsBlockQueue {
 
   private long maxBytesCanReserve =
       
IoTDBDescriptor.getInstance().getMemoryConfig().getMaxBytesPerFragmentInstance();
+  private final boolean isHighestPriority;
 
   private volatile Throwable abortedCause = null;
 
@@ -94,6 +96,15 @@ public class SharedTsBlockQueue {
       String planNodeId,
       LocalMemoryManager localMemoryManager,
       ExecutorService executorService) {
+    this(fragmentInstanceId, planNodeId, localMemoryManager, executorService, 
false);
+  }
+
+  public SharedTsBlockQueue(
+      TFragmentInstanceId fragmentInstanceId,
+      String planNodeId,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      boolean isHighestPriority) {
     this.localFragmentInstanceId =
         Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be 
null");
     this.fullFragmentInstanceId =
@@ -102,6 +113,7 @@ public class SharedTsBlockQueue {
     this.localMemoryManager =
         Validate.notNull(localMemoryManager, "local memory manager cannot be 
null");
     this.executorService = Validate.notNull(executorService, "ExecutorService 
can not be null.");
+    this.isHighestPriority = isHighestPriority;
   }
 
   public boolean hasNoMoreTsBlocks() {
@@ -196,15 +208,18 @@ public class SharedTsBlockQueue {
       }
       throw new IllegalStateException("queue has been destroyed");
     }
-    TsBlock tsBlock = queue.remove();
-    localMemoryManager
-        .getQueryPool()
-        .free(
-            localFragmentInstanceId.getQueryId(),
-            fullFragmentInstanceId,
-            localPlanNodeId,
-            tsBlock.getSizeInBytes());
-    bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes();
+    Pair<TsBlock, Long> tsBlockWithReservedBytes = queue.remove();
+    long reservedBytes = tsBlockWithReservedBytes.right;
+    if (reservedBytes > 0) {
+      localMemoryManager
+          .getQueryPool()
+          .free(
+              localFragmentInstanceId.getQueryId(),
+              fullFragmentInstanceId,
+              localPlanNodeId,
+              reservedBytes);
+      bufferRetainedSizeInBytes -= reservedBytes;
+    }
     // Every time LocalSourceHandle consumes a TsBlock, it needs to send the 
event
     // to
     // corresponding LocalSinkChannel.
@@ -214,7 +229,7 @@ public class SharedTsBlockQueue {
     if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
       blocked = SettableFuture.create();
     }
-    return tsBlock;
+    return tsBlockWithReservedBytes.left;
   }
 
   /**
@@ -240,20 +255,22 @@ public class SharedTsBlockQueue {
               localFragmentInstanceId.queryId, fullFragmentInstanceId, 
localPlanNodeId);
       alreadyRegistered = true;
     }
-    Pair<ListenableFuture<Void>, Boolean> pair =
+    MemoryReservationResult reserveResult =
         localMemoryManager
             .getQueryPool()
-            .reserve(
+            .reserveWithPriority(
                 localFragmentInstanceId.getQueryId(),
                 fullFragmentInstanceId,
                 localPlanNodeId,
                 tsBlock.getSizeInBytes(),
-                maxBytesCanReserve);
-    blockedOnMemory = pair.left;
-    bufferRetainedSizeInBytes += tsBlock.getSizeInBytes();
+                maxBytesCanReserve,
+                isHighestPriority);
+    blockedOnMemory = reserveResult.getFuture();
+    long reservedBytes = reserveResult.getReservedBytes();
+    bufferRetainedSizeInBytes += reservedBytes;
 
     // reserve memory failed, we should wait until there is enough memory
-    if (!Boolean.TRUE.equals(pair.right)) {
+    if (!reserveResult.isReserveSuccess()) {
       SettableFuture<Void> channelBlocked = SettableFuture.create();
       blockedOnMemory.addListener(
           () -> {
@@ -268,7 +285,7 @@ public class SharedTsBlockQueue {
                 channelBlocked.set(null);
                 return;
               }
-              queue.add(tsBlock);
+              queue.add(new Pair<>(tsBlock, reservedBytes));
               if (!blocked.isDone()) {
                 blocked.set(null);
               }
@@ -285,7 +302,7 @@ public class SharedTsBlockQueue {
           executorService);
       return channelBlocked;
     } else { // reserve memory succeeded, add the TsBlock directly
-      queue.add(tsBlock);
+      queue.add(new Pair<>(tsBlock, reservedBytes));
       if (!blocked.isDone()) {
         blocked.set(null);
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
index 9cb624e4d96..4e124c2c478 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import 
org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener;
 import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
+import 
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
 import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
 import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -119,6 +120,8 @@ public class SinkChannel implements ISinkChannel {
   private long maxBytesCanReserve =
       
IoTDBDescriptor.getInstance().getMemoryConfig().getMaxBytesPerFragmentInstance();
 
+  private final boolean isHighestPriority;
+
   private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET 
=
       DataExchangeCostMetricSet.getInstance();
   private static final DataExchangeCountMetricSet 
DATA_EXCHANGE_COUNT_METRIC_SET =
@@ -141,6 +144,34 @@ public class SinkChannel implements ISinkChannel {
       SinkListener sinkListener,
       IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
           mppDataExchangeServiceClientManager) {
+    this(
+        remoteEndpoint,
+        remoteFragmentInstanceId,
+        remotePlanNodeId,
+        localPlanNodeId,
+        localFragmentInstanceId,
+        localMemoryManager,
+        executorService,
+        serde,
+        sinkListener,
+        false,
+        mppDataExchangeServiceClientManager);
+  }
+
+  @SuppressWarnings("squid:S107")
+  public SinkChannel(
+      TEndPoint remoteEndpoint,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      String remotePlanNodeId,
+      String localPlanNodeId,
+      TFragmentInstanceId localFragmentInstanceId,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      TsBlockSerde serde,
+      SinkListener sinkListener,
+      boolean isHighestPriority,
+      IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+          mppDataExchangeServiceClientManager) {
     this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndPoint can 
not be null.");
     this.remoteFragmentInstanceId =
         Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId 
can not be null.");
@@ -155,6 +186,7 @@ public class SinkChannel implements ISinkChannel {
     this.executorService = Validate.notNull(executorService, "executorService 
can not be null.");
     this.serde = Validate.notNull(serde, "serde can not be null.");
     this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not 
be null.");
+    this.isHighestPriority = isHighestPriority;
     this.mppDataExchangeServiceClientManager = 
mppDataExchangeServiceClientManager;
     this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
     this.threadName =
@@ -204,21 +236,22 @@ public class SinkChannel implements ISinkChannel {
       long sizeInBytes = tsBlock.getSizeInBytes();
       int startSequenceId;
       startSequenceId = nextSequenceId;
-      blocked =
+      MemoryReservationResult reserveResult =
           localMemoryManager
               .getQueryPool()
-              .reserve(
+              .reserveWithPriority(
                   localFragmentInstanceId.getQueryId(),
                   fullFragmentInstanceId,
                   localPlanNodeId,
                   sizeInBytes,
-                  maxBytesCanReserve)
-              .left;
-      bufferRetainedSizeInBytes += sizeInBytes;
+                  maxBytesCanReserve,
+                  isHighestPriority);
+      blocked = reserveResult.getFuture();
+      bufferRetainedSizeInBytes += reserveResult.getReservedBytes();
 
       sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, 
currentTsBlockSize));
       nextSequenceId += 1;
-      currentTsBlockSize = sizeInBytes;
+      currentTsBlockSize = reserveResult.getReservedBytes();
 
       submitSendNewDataBlockEventTask(startSequenceId, 
ImmutableList.of(sizeInBytes));
     } finally {
@@ -434,19 +467,21 @@ public class SinkChannel implements ISinkChannel {
       return;
     }
     // SinkChannel is opened when ShuffleSinkHandle choose it as the next 
channel
-    this.blocked =
+    MemoryReservationResult reserveResult =
         localMemoryManager
             .getQueryPool()
-            .reserve(
+            .reserveWithPriority(
                 localFragmentInstanceId.getQueryId(),
                 fullFragmentInstanceId,
                 localPlanNodeId,
                 DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
-                maxBytesCanReserve) // actually we only know 
maxBytesCanReserve after
-            // the handle is created, so we use DEFAULT here. It is ok to use 
DEFAULT here because
-            // at first this SinkChannel has not reserved memory.
-            .left;
-    this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+                maxBytesCanReserve,
+                isHighestPriority); // actually we only know 
maxBytesCanReserve after
+    // the handle is created, so we use DEFAULT here. It is ok to use DEFAULT 
here because
+    // at first this SinkChannel has not reserved memory.
+    this.blocked = reserveResult.getFuture();
+    this.bufferRetainedSizeInBytes = reserveResult.getReservedBytes();
+    this.currentTsBlockSize = maxBytesCanReserve;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
index 1c6406a2ed9..ec29131af22 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
 import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
+import 
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
 import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
 import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -116,6 +117,8 @@ public class SourceHandle implements ISourceHandle {
    */
   private boolean canGetTsBlockFromRemote = false;
 
+  private final boolean isHighestPriority;
+
   private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET 
=
       DataExchangeCostMetricSet.getInstance();
   private static final DataExchangeCountMetricSet 
DATA_EXCHANGE_COUNT_METRIC_SET =
@@ -138,6 +141,34 @@ public class SourceHandle implements ISourceHandle {
       SourceHandleListener sourceHandleListener,
       IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
           mppDataExchangeServiceClientManager) {
+    this(
+        remoteEndpoint,
+        remoteFragmentInstanceId,
+        localFragmentInstanceId,
+        localPlanNodeId,
+        indexOfUpstreamSinkHandle,
+        localMemoryManager,
+        executorService,
+        serde,
+        sourceHandleListener,
+        false,
+        mppDataExchangeServiceClientManager);
+  }
+
+  @SuppressWarnings("squid:S107")
+  public SourceHandle(
+      TEndPoint remoteEndpoint,
+      TFragmentInstanceId remoteFragmentInstanceId,
+      TFragmentInstanceId localFragmentInstanceId,
+      String localPlanNodeId,
+      int indexOfUpstreamSinkHandle,
+      LocalMemoryManager localMemoryManager,
+      ExecutorService executorService,
+      TsBlockSerde serde,
+      SourceHandleListener sourceHandleListener,
+      boolean isHighestPriority,
+      IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+          mppDataExchangeServiceClientManager) {
     this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndpoint can 
not be null.");
     this.remoteFragmentInstanceId =
         Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId 
can not be null.");
@@ -153,6 +184,7 @@ public class SourceHandle implements ISourceHandle {
     this.serde = Validate.notNull(serde, "serde can not be null.");
     this.sourceHandleListener =
         Validate.notNull(sourceHandleListener, "sourceHandleListener can not 
be null.");
+    this.isHighestPriority = isHighestPriority;
     this.bufferRetainedSizeInBytes = 0L;
     this.mppDataExchangeServiceClientManager = 
mppDataExchangeServiceClientManager;
     this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
@@ -193,19 +225,24 @@ public class SourceHandle implements ISourceHandle {
       if (tsBlock == null) {
         return null;
       }
-      long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId);
+      Long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId);
+      if (retainedSize == null) {
+        throw new IllegalStateException("Reserved data block size is null.");
+      }
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("[GetTsBlockFromBuffer] sequenceId:{}, size:{}", 
currSequenceId, retainedSize);
       }
       currSequenceId += 1;
-      bufferRetainedSizeInBytes -= retainedSize;
-      localMemoryManager
-          .getQueryPool()
-          .free(
-              localFragmentInstanceId.getQueryId(),
-              fullFragmentInstanceId,
-              localPlanNodeId,
-              retainedSize);
+      if (retainedSize > 0) {
+        bufferRetainedSizeInBytes -= retainedSize;
+        localMemoryManager
+            .getQueryPool()
+            .free(
+                localFragmentInstanceId.getQueryId(),
+                fullFragmentInstanceId,
+                localPlanNodeId,
+                retainedSize);
+      }
 
       if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
         if (LOGGER.isDebugEnabled()) {
@@ -242,18 +279,24 @@ public class SourceHandle implements ISourceHandle {
       if (bytesToReserve == null) {
         throw new IllegalStateException("Data block size is null.");
       }
-      pair =
+      MemoryReservationResult reserveResult =
           localMemoryManager
               .getQueryPool()
-              .reserve(
+              .reserveWithPriority(
                   localFragmentInstanceId.getQueryId(),
                   fullFragmentInstanceId,
                   localPlanNodeId,
                   bytesToReserve,
-                  maxBytesCanReserve);
-      bufferRetainedSizeInBytes += bytesToReserve;
+                  maxBytesCanReserve,
+                  isHighestPriority);
+      pair = new Pair<>(reserveResult.getFuture(), 
reserveResult.isReserveSuccess());
+      // the actually reserved size differs from the raw size, so store the 
reserved size in the map
+      if (reserveResult.getReservedBytes() != bytesToReserve) {
+        sequenceIdToDataBlockSize.put(endSequenceId, 
reserveResult.getReservedBytes());
+      }
+      bufferRetainedSizeInBytes += reserveResult.getReservedBytes();
       endSequenceId += 1;
-      reservedBytes += bytesToReserve;
+      reservedBytes += reserveResult.getReservedBytes();
       if (!Boolean.TRUE.equals(pair.right)) {
         blockedSize = bytesToReserve;
         break;
@@ -631,14 +674,16 @@ public class SourceHandle implements ISourceHandle {
         if (aborted || closed) {
           return;
         }
-        bufferRetainedSizeInBytes -= reservedBytes;
-        localMemoryManager
-            .getQueryPool()
-            .free(
-                localFragmentInstanceId.getQueryId(),
-                fullFragmentInstanceId,
-                localPlanNodeId,
-                reservedBytes);
+        if (reservedBytes > 0) {
+          bufferRetainedSizeInBytes -= reservedBytes;
+          localMemoryManager
+              .getQueryPool()
+              .free(
+                  localFragmentInstanceId.getQueryId(),
+                  fullFragmentInstanceId,
+                  localPlanNodeId,
+                  reservedBytes);
+        }
         sourceHandleListener.onFailure(SourceHandle.this, t);
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index df9ac2d2f93..4cfd8dc63ec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -161,6 +161,7 @@ public class FragmentInstanceContext extends QueryContext {
   private long unclosedUnseqFileNum = 0;
   private long closedSeqFileNum = 0;
   private long closedUnseqFileNum = 0;
+  private boolean highestPriority = false;
 
   public static FragmentInstanceContext createFragmentInstanceContext(
       FragmentInstanceId id,
@@ -1190,6 +1191,18 @@ public class FragmentInstanceContext extends 
QueryContext {
     return ignoreNotExistsDevice;
   }
 
+  /**
+   * Same flag as {@link
+   * 
org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis#needSetHighestPriority()}.
+   */
+  public boolean isHighestPriority() {
+    return highestPriority;
+  }
+
+  public void setHighestPriority(boolean highestPriority) {
+    this.highestPriority = highestPriority;
+  }
+
   public boolean isSingleSourcePath() {
     return singleSourcePath;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index 1898cbfe53c..9dfb2ffa684 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -163,6 +163,7 @@ public class FragmentInstanceManager {
                                 dataNodeQueryContextMap,
                                 instance.isDebug(),
                                 instance.isVerbose()));
+                context.setHighestPriority(instance.isHighestPriority());
 
                 try {
                   List<PipelineDriverFactory> driverFactories =
@@ -277,6 +278,7 @@ public class FragmentInstanceManager {
                               instance.getSessionInfo(),
                               instance.isDebug(),
                               instance.isVerbose()));
+              context.setHighestPriority(instance.isHighestPriority());
 
               try {
                 List<PipelineDriverFactory> driverFactories =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
index 3e00c845dab..6afa7f692ef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.exception.runtime.MemoryLeakException;
 
 import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.tsfile.external.commons.lang3.Validate;
 import org.apache.tsfile.utils.Pair;
@@ -43,6 +42,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.stream.Collectors;
 
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+
 /** A thread-safe memory pool. */
 public class MemoryPool {
 
@@ -113,6 +114,31 @@ public class MemoryPool {
     }
   }
 
+  public static class MemoryReservationResult {
+    private final ListenableFuture<Void> future;
+    private final boolean reserveSuccess;
+    private final long reservedBytes;
+
+    public MemoryReservationResult(
+        ListenableFuture<Void> future, boolean reserveSuccess, long 
reservedBytes) {
+      this.future = future;
+      this.reserveSuccess = reserveSuccess;
+      this.reservedBytes = reservedBytes;
+    }
+
+    public ListenableFuture<Void> getFuture() {
+      return future;
+    }
+
+    public boolean isReserveSuccess() {
+      return reserveSuccess;
+    }
+
+    public long getReservedBytes() {
+      return reservedBytes;
+    }
+  }
+
   private final String id;
   private final IMemoryBlock memoryBlock;
   private final long maxBytesPerFragmentInstance;
@@ -236,6 +262,19 @@ public class MemoryPool {
       String planNodeId,
       long bytesToReserve,
       long maxBytesCanReserve) {
+    MemoryReservationResult result =
+        reserveWithPriority(
+            queryId, fragmentInstanceId, planNodeId, bytesToReserve, 
maxBytesCanReserve, false);
+    return new Pair<>(result.getFuture(), result.isReserveSuccess());
+  }
+
+  public MemoryReservationResult reserveWithPriority(
+      String queryId,
+      String fragmentInstanceId,
+      String planNodeId,
+      long bytesToReserve,
+      long maxBytesCanReserve,
+      boolean needSetHighestPriority) {
     Validate.notNull(queryId, "queryId can not be null.");
     Validate.notNull(fragmentInstanceId, "fragmentInstanceId can not be 
null.");
     Validate.notNull(planNodeId, "planNodeId can not be null.");
@@ -256,19 +295,21 @@ public class MemoryPool {
               bytesToReserve, maxBytesCanReserve));
     }
 
-    ListenableFuture<Void> result;
     if (tryReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve, 
maxBytesCanReserve)) {
-      result = Futures.immediateFuture(null);
-      return new Pair<>(result, Boolean.TRUE);
+      return new MemoryReservationResult(immediateVoidFuture(), true, 
bytesToReserve);
     } else {
+      rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve);
+      if (needSetHighestPriority) {
+        // SHOW QUERIES etc.: treat as success with zero bytes reserved from 
pool when insufficient.
+        return new MemoryReservationResult(immediateVoidFuture(), true, 0L);
+      }
       LOGGER.debug(
           "Blocked reserve request: {} bytes memory for planNodeId{}", 
bytesToReserve, planNodeId);
-      rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve);
-      result =
+      ListenableFuture<Void> result =
           MemoryReservationFuture.create(
               queryId, fragmentInstanceId, planNodeId, bytesToReserve, 
maxBytesCanReserve);
       memoryReservationFutures.add((MemoryReservationFuture<Void>) result);
-      return new Pair<>(result, Boolean.FALSE);
+      return new MemoryReservationResult(result, false, bytesToReserve);
     }
   }
 
@@ -299,7 +340,8 @@ public class MemoryPool {
   /**
    * Cancel the specified memory reservation. If the reservation has finished, 
do nothing.
    *
-   * @param future The future returned from {@link #reserve(String, String, 
String, long, long)}
+   * @param future The future returned from {@link 
#reserveWithPriority(String, String, String,
+   *     long, long, boolean)}
    * @return If the future has not complete, return the number of bytes being 
reserved. Otherwise,
    *     return 0.
    */
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 7500fbb9c34..788b1543046 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -129,6 +129,7 @@ public class QueryExecution implements IQueryExecution {
     this.context = context;
     this.planner = planner;
     this.analysis = analyze(context);
+    context.setNeedSetHighestPriority(analysis.needSetHighestPriority());
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
 
     // We add the abort logic inside the QueryExecution.
@@ -610,7 +611,8 @@ public class QueryExecution implements IQueryExecution {
                     
context.getResultNodeContext().getUpStreamPlanNodeId().getId(),
                     
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
                     0, // Upstream of result ExchangeNode will only have one 
child.
-                    stateMachine::transitionToFailed)
+                    stateMachine::transitionToFailed,
+                    context.needSetHighestPriority())
             : MPPDataExchangeService.getInstance()
                 .getMPPDataExchangeManager()
                 .createSourceHandle(
@@ -619,7 +621,8 @@ public class QueryExecution implements IQueryExecution {
                     0,
                     upstreamEndPoint,
                     
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
-                    stateMachine::transitionToFailed);
+                    stateMachine::transitionToFailed,
+                    context.needSetHighestPriority());
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 422454c11b6..0e32dfd097e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -2694,14 +2694,16 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 node.getUpstreamPlanNodeId().getId(),
                 remoteInstanceId.toThrift(),
                 node.getIndexOfUpstreamSinkHandle(),
-                context.getInstanceContext()::failed)
+                context.getInstanceContext()::failed,
+                context.getInstanceContext().isHighestPriority())
             : MPP_DATA_EXCHANGE_MANAGER.createSourceHandle(
                 localInstanceId.toThrift(),
                 node.getPlanNodeId().getId(),
                 node.getIndexOfUpstreamSinkHandle(),
                 upstreamEndPoint,
                 remoteInstanceId.toThrift(),
-                context.getInstanceContext()::failed);
+                context.getInstanceContext()::failed,
+                context.getInstanceContext().isHighestPriority());
     if (!isSameNode) {
       context.addExchangeSumNum(1);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index c31a2061f49..f1bb43abed4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -454,14 +454,16 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
                 node.getUpstreamPlanNodeId().getId(),
                 remoteInstanceId.toThrift(),
                 node.getIndexOfUpstreamSinkHandle(),
-                context.getInstanceContext()::failed)
+                context.getInstanceContext()::failed,
+                context.getInstanceContext().isHighestPriority())
             : MPP_DATA_EXCHANGE_MANAGER.createSourceHandle(
                 localInstanceId.toThrift(),
                 node.getPlanNodeId().getId(),
                 node.getIndexOfUpstreamSinkHandle(),
                 upstreamEndPoint,
                 remoteInstanceId.toThrift(),
-                context.getInstanceContext()::failed);
+                context.getInstanceContext()::failed,
+                context.getInstanceContext().isHighestPriority());
     if (!isSameNode) {
       context.addExchangeSumNum(1);
     }


Reply via email to