This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 63ea91a83f8 [To dev/1.3] Enable show queries to be executed
immediately when the available memory in the memoryPool is insufficient (#17529)
63ea91a83f8 is described below
commit 63ea91a83f870d8fc9fae4386273096c1f1b98eb
Author: Weihao Li <[email protected]>
AuthorDate: Wed Apr 22 09:25:36 2026 +0800
[To dev/1.3] Enable show queries to be executed immediately when the
available memory in the memoryPool is insufficient (#17529)
---
.../db/queryengine/common/MPPQueryContext.java | 14 +++
.../execution/exchange/MPPDataExchangeManager.java | 56 +++++++++-
.../execution/exchange/SharedTsBlockQueue.java | 57 ++++++----
.../execution/exchange/sink/SinkChannel.java | 62 ++++++++---
.../execution/exchange/source/SourceHandle.java | 90 ++++++++++++----
.../fragment/FragmentInstanceContext.java | 13 +++
.../fragment/FragmentInstanceManager.java | 2 +
.../queryengine/execution/memory/MemoryPool.java | 56 +++++++---
.../db/queryengine/plan/analyze/Analysis.java | 2 +-
.../queryengine/plan/execution/QueryExecution.java | 7 +-
.../plan/planner/OperatorTreeGenerator.java | 6 +-
.../plan/planner/plan/FragmentInstance.java | 2 +
.../execution/exchange/LocalSinkChannelTest.java | 10 +-
.../execution/exchange/SharedTsBlockQueueTest.java | 11 +-
.../execution/exchange/ShuffleSinkHandleTest.java | 5 +-
.../execution/exchange/SourceHandleTest.java | 5 +-
.../db/queryengine/execution/exchange/Utils.java | 26 +++--
.../execution/memory/MemoryPoolTest.java | 116 ++++++++++++++++++---
18 files changed, 427 insertions(+), 113 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 7479e832a90..13773c15090 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -100,6 +100,12 @@ public class MPPQueryContext {
private boolean userQuery = false;
+ /**
+ * When true (e.g. SHOW QUERIES), operator and exchange memory may use
fallback when pool is
+ * insufficient. Set from analysis via {@link
#setNeedSetHighestPriority(boolean)}.
+ */
+ private boolean needSetHighestPriority = false;
+
@TestOnly
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
@@ -406,6 +412,14 @@ public class MPPQueryContext {
this.userQuery = userQuery;
}
+ public boolean needSetHighestPriority() {
+ return needSetHighestPriority;
+ }
+
+ public void setNeedSetHighestPriority(boolean needSetHighestPriority) {
+ this.needSetHighestPriority = needSetHighestPriority;
+ }
+
public String getClientHostName() {
if (session == null || session.getCliHostname() == null) {
return "UNKNOWN";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
index ac0b4111090..61dc05d4bfd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
import
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex;
@@ -656,7 +657,11 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
}
queue =
new SharedTsBlockQueue(
- localFragmentInstanceId, localPlanNodeId, localMemoryManager,
executorService);
+ localFragmentInstanceId,
+ localPlanNodeId,
+ localMemoryManager,
+ executorService,
+ instanceContext.isHighestPriority());
}
return new LocalSinkChannel(
@@ -680,7 +685,8 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
planNodeId,
localMemoryManager,
- executorService);
+ executorService,
+ driverContext.getFragmentInstanceContext().isHighestPriority());
queue.allowAddingTsBlock();
return new LocalSinkChannel(
queue,
@@ -718,6 +724,7 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
tsBlockSerdeFactory.get(),
new ISinkChannelListenerImpl(
localFragmentInstanceId, instanceContext, instanceContext::failed,
cnt),
+ instanceContext.isHighestPriority(),
mppDataExchangeServiceClientManager);
}
@@ -802,6 +809,7 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
context.getDriverTaskID().toString());
}
+ @TestOnly
public synchronized ISourceHandle createLocalSourceHandleForFragment(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
@@ -809,6 +817,24 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
TFragmentInstanceId remoteFragmentInstanceId,
int index,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
+ return createLocalSourceHandleForFragment(
+ localFragmentInstanceId,
+ localPlanNodeId,
+ remotePlanNodeId,
+ remoteFragmentInstanceId,
+ index,
+ onFailureCallback,
+ false);
+ }
+
+ public synchronized ISourceHandle createLocalSourceHandleForFragment(
+ TFragmentInstanceId localFragmentInstanceId,
+ String localPlanNodeId,
+ String remotePlanNodeId,
+ TFragmentInstanceId remoteFragmentInstanceId,
+ int index,
+ IMPPDataExchangeManagerCallback<Throwable> onFailureCallback,
+ boolean isHighestPriority) {
if (sourceHandles.containsKey(localFragmentInstanceId)
&&
sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
throw new IllegalStateException(
@@ -840,7 +866,11 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
}
queue =
new SharedTsBlockQueue(
- remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager,
executorService);
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ localMemoryManager,
+ executorService,
+ isHighestPriority);
}
LocalSourceHandle localSourceHandle =
new LocalSourceHandle(
@@ -854,6 +884,7 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
return localSourceHandle;
}
+ @TestOnly
@Override
public ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
@@ -862,6 +893,24 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
+ return createSourceHandle(
+ localFragmentInstanceId,
+ localPlanNodeId,
+ indexOfUpstreamSinkHandle,
+ remoteEndpoint,
+ remoteFragmentInstanceId,
+ onFailureCallback,
+ false);
+ }
+
+ public ISourceHandle createSourceHandle(
+ TFragmentInstanceId localFragmentInstanceId,
+ String localPlanNodeId,
+ int indexOfUpstreamSinkHandle,
+ TEndPoint remoteEndpoint,
+ TFragmentInstanceId remoteFragmentInstanceId,
+ IMPPDataExchangeManagerCallback<Throwable> onFailureCallback,
+ boolean isHighestPriority) {
Map<String, ISourceHandle> sourceHandleMap =
sourceHandles.get(localFragmentInstanceId);
if (sourceHandleMap != null &&
sourceHandleMap.containsKey(localPlanNodeId)) {
throw new IllegalStateException(
@@ -891,6 +940,7 @@ public class MPPDataExchangeManager implements
IMPPDataExchangeManager {
executorService,
tsBlockSerdeFactory.get(),
new SourceHandleListenerImpl(onFailureCallback),
+ isHighestPriority,
mppDataExchangeServiceClientManager);
sourceHandles
.computeIfAbsent(localFragmentInstanceId, key -> new
ConcurrentHashMap<>())
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index 555cf9efe5a..f4c21e2fdfa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.db.queryengine.execution.exchange;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import
org.apache.iotdb.db.queryengine.execution.exchange.sink.LocalSinkChannel;
import
org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
+import
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import com.google.common.util.concurrent.ListenableFuture;
@@ -62,7 +64,7 @@ public class SharedTsBlockQueue {
private long bufferRetainedSizeInBytes = 0L;
- private final Queue<TsBlock> queue = new LinkedList<>();
+ private final Queue<Pair<TsBlock, Long>> queue = new LinkedList<>();
private SettableFuture<Void> blocked = SettableFuture.create();
@@ -82,17 +84,28 @@ public class SharedTsBlockQueue {
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
+ private final boolean isHighestPriority;
private volatile Throwable abortedCause = null;
// used for SharedTsBlockQueue listener
private final ExecutorService executorService;
+ @TestOnly
public SharedTsBlockQueue(
TFragmentInstanceId fragmentInstanceId,
String planNodeId,
LocalMemoryManager localMemoryManager,
ExecutorService executorService) {
+ this(fragmentInstanceId, planNodeId, localMemoryManager, executorService,
false);
+ }
+
+ public SharedTsBlockQueue(
+ TFragmentInstanceId fragmentInstanceId,
+ String planNodeId,
+ LocalMemoryManager localMemoryManager,
+ ExecutorService executorService,
+ boolean isHighestPriority) {
this.localFragmentInstanceId =
Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be
null");
this.fullFragmentInstanceId =
@@ -101,6 +114,7 @@ public class SharedTsBlockQueue {
this.localMemoryManager =
Validate.notNull(localMemoryManager, "local memory manager cannot be
null");
this.executorService = Validate.notNull(executorService, "ExecutorService
can not be null.");
+ this.isHighestPriority = isHighestPriority;
}
public boolean hasNoMoreTsBlocks() {
@@ -195,15 +209,18 @@ public class SharedTsBlockQueue {
}
throw new IllegalStateException("queue has been destroyed");
}
- TsBlock tsBlock = queue.remove();
- localMemoryManager
- .getQueryPool()
- .free(
- localFragmentInstanceId.getQueryId(),
- fullFragmentInstanceId,
- localPlanNodeId,
- tsBlock.getSizeInBytes());
- bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes();
+ Pair<TsBlock, Long> tsBlockWithReservedBytes = queue.remove();
+ long reservedBytes = tsBlockWithReservedBytes.right;
+ if (reservedBytes > 0) {
+ localMemoryManager
+ .getQueryPool()
+ .free(
+ localFragmentInstanceId.getQueryId(),
+ fullFragmentInstanceId,
+ localPlanNodeId,
+ reservedBytes);
+ bufferRetainedSizeInBytes -= reservedBytes;
+ }
// Every time LocalSourceHandle consumes a TsBlock, it needs to send the
event
// to
// corresponding LocalSinkChannel.
@@ -213,7 +230,7 @@ public class SharedTsBlockQueue {
if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
blocked = SettableFuture.create();
}
- return tsBlock;
+ return tsBlockWithReservedBytes.left;
}
/**
@@ -236,20 +253,22 @@ public class SharedTsBlockQueue {
localFragmentInstanceId.queryId, fullFragmentInstanceId,
localPlanNodeId);
alreadyRegistered = true;
}
- Pair<ListenableFuture<Void>, Boolean> pair =
+ MemoryReservationResult reserveResult =
localMemoryManager
.getQueryPool()
- .reserve(
+ .reserveWithPriority(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getSizeInBytes(),
- maxBytesCanReserve);
- blockedOnMemory = pair.left;
- bufferRetainedSizeInBytes += tsBlock.getSizeInBytes();
+ maxBytesCanReserve,
+ isHighestPriority);
+ blockedOnMemory = reserveResult.getFuture();
+ long reservedBytes = reserveResult.getReservedBytes();
+ bufferRetainedSizeInBytes += reservedBytes;
// reserve memory failed, we should wait until there is enough memory
- if (!Boolean.TRUE.equals(pair.right)) {
+ if (!reserveResult.isReserveSuccess()) {
SettableFuture<Void> channelBlocked = SettableFuture.create();
blockedOnMemory.addListener(
() -> {
@@ -264,7 +283,7 @@ public class SharedTsBlockQueue {
channelBlocked.set(null);
return;
}
- queue.add(tsBlock);
+ queue.add(new Pair<>(tsBlock, reservedBytes));
if (!blocked.isDone()) {
blocked.set(null);
}
@@ -281,7 +300,7 @@ public class SharedTsBlockQueue {
executorService);
return channelBlocked;
} else { // reserve memory succeeded, add the TsBlock directly
- queue.add(tsBlock);
+ queue.add(new Pair<>(tsBlock, reservedBytes));
if (!blocked.isDone()) {
blocked.set(null);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
index ca6fdadc993..47ce6128fb7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import
org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
import
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
+import
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.utils.SetThreadName;
@@ -119,6 +120,8 @@ public class SinkChannel implements ISinkChannel {
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
+ private final boolean isHighestPriority;
+
private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET
=
DataExchangeCostMetricSet.getInstance();
private static final DataExchangeCountMetricSet
DATA_EXCHANGE_COUNT_METRIC_SET =
@@ -128,6 +131,34 @@ public class SinkChannel implements ISinkChannel {
RamUsageEstimator.shallowSizeOfInstance(SinkChannel.class)
+ RamUsageEstimator.shallowSizeOfInstance(TFragmentInstanceId.class)
* 2;
+ @TestOnly
+ @SuppressWarnings("squid:S107")
+ public SinkChannel(
+ TEndPoint remoteEndpoint,
+ TFragmentInstanceId remoteFragmentInstanceId,
+ String remotePlanNodeId,
+ String localPlanNodeId,
+ TFragmentInstanceId localFragmentInstanceId,
+ LocalMemoryManager localMemoryManager,
+ ExecutorService executorService,
+ TsBlockSerde serde,
+ SinkListener sinkListener,
+ IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+ mppDataExchangeServiceClientManager) {
+ this(
+ remoteEndpoint,
+ remoteFragmentInstanceId,
+ remotePlanNodeId,
+ localPlanNodeId,
+ localFragmentInstanceId,
+ localMemoryManager,
+ executorService,
+ serde,
+ sinkListener,
+ false,
+ mppDataExchangeServiceClientManager);
+ }
+
@SuppressWarnings("squid:S107")
public SinkChannel(
TEndPoint remoteEndpoint,
@@ -139,6 +170,7 @@ public class SinkChannel implements ISinkChannel {
ExecutorService executorService,
TsBlockSerde serde,
SinkListener sinkListener,
+ boolean isHighestPriority,
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager) {
this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndPoint can
not be null.");
@@ -155,6 +187,7 @@ public class SinkChannel implements ISinkChannel {
this.executorService = Validate.notNull(executorService, "executorService
can not be null.");
this.serde = Validate.notNull(serde, "serde can not be null.");
this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not
be null.");
+ this.isHighestPriority = isHighestPriority;
this.mppDataExchangeServiceClientManager =
mppDataExchangeServiceClientManager;
this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
this.threadName =
@@ -204,21 +237,22 @@ public class SinkChannel implements ISinkChannel {
long sizeInBytes = tsBlock.getSizeInBytes();
int startSequenceId;
startSequenceId = nextSequenceId;
- blocked =
+ MemoryReservationResult reserveResult =
localMemoryManager
.getQueryPool()
- .reserve(
+ .reserveWithPriority(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
sizeInBytes,
- maxBytesCanReserve)
- .left;
- bufferRetainedSizeInBytes += sizeInBytes;
+ maxBytesCanReserve,
+ isHighestPriority);
+ blocked = reserveResult.getFuture();
+ bufferRetainedSizeInBytes += reserveResult.getReservedBytes();
sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock,
currentTsBlockSize));
nextSequenceId += 1;
- currentTsBlockSize = sizeInBytes;
+ currentTsBlockSize = reserveResult.getReservedBytes();
submitSendNewDataBlockEventTask(startSequenceId,
ImmutableList.of(sizeInBytes));
} finally {
@@ -433,19 +467,21 @@ public class SinkChannel implements ISinkChannel {
return;
}
// SinkChannel is opened when ShuffleSinkHandle choose it as the next
channel
- this.blocked =
+ MemoryReservationResult reserveResult =
localMemoryManager
.getQueryPool()
- .reserve(
+ .reserveWithPriority(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
- maxBytesCanReserve) // actually we only know
maxBytesCanReserve after
- // the handle is created, so we use DEFAULT here. It is ok to use
DEFAULT here because
- // at first this SinkChannel has not reserved memory.
- .left;
- this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+ maxBytesCanReserve,
+ isHighestPriority); // actually we only know
maxBytesCanReserve after
+ // the handle is created, so we use DEFAULT here. It is ok to use DEFAULT
here because
+ // at first this SinkChannel has not reserved memory.
+ this.blocked = reserveResult.getFuture();
+ this.bufferRetainedSizeInBytes = reserveResult.getReservedBytes();
+ this.currentTsBlockSize = reserveResult.getReservedBytes();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
index 2d1a06fcd32..0e0b94094b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
+import
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.utils.SetThreadName;
@@ -115,6 +116,8 @@ public class SourceHandle implements ISourceHandle {
*/
private boolean canGetTsBlockFromRemote = false;
+ private final boolean isHighestPriority;
+
private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET
=
DataExchangeCostMetricSet.getInstance();
private static final DataExchangeCountMetricSet
DATA_EXCHANGE_COUNT_METRIC_SET =
@@ -124,6 +127,7 @@ public class SourceHandle implements ISourceHandle {
RamUsageEstimator.shallowSizeOfInstance(SourceHandle.class)
+ RamUsageEstimator.shallowSizeOfInstance(TFragmentInstanceId.class)
* 2;
+ @TestOnly
@SuppressWarnings("squid:S107")
public SourceHandle(
TEndPoint remoteEndpoint,
@@ -137,6 +141,34 @@ public class SourceHandle implements ISourceHandle {
SourceHandleListener sourceHandleListener,
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager) {
+ this(
+ remoteEndpoint,
+ remoteFragmentInstanceId,
+ localFragmentInstanceId,
+ localPlanNodeId,
+ indexOfUpstreamSinkHandle,
+ localMemoryManager,
+ executorService,
+ serde,
+ sourceHandleListener,
+ false,
+ mppDataExchangeServiceClientManager);
+ }
+
+ @SuppressWarnings("squid:S107")
+ public SourceHandle(
+ TEndPoint remoteEndpoint,
+ TFragmentInstanceId remoteFragmentInstanceId,
+ TFragmentInstanceId localFragmentInstanceId,
+ String localPlanNodeId,
+ int indexOfUpstreamSinkHandle,
+ LocalMemoryManager localMemoryManager,
+ ExecutorService executorService,
+ TsBlockSerde serde,
+ SourceHandleListener sourceHandleListener,
+ boolean isHighestPriority,
+ IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+ mppDataExchangeServiceClientManager) {
this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndpoint can
not be null.");
this.remoteFragmentInstanceId =
Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId
can not be null.");
@@ -152,6 +184,7 @@ public class SourceHandle implements ISourceHandle {
this.serde = Validate.notNull(serde, "serde can not be null.");
this.sourceHandleListener =
Validate.notNull(sourceHandleListener, "sourceHandleListener can not
be null.");
+ this.isHighestPriority = isHighestPriority;
this.bufferRetainedSizeInBytes = 0L;
this.mppDataExchangeServiceClientManager =
mppDataExchangeServiceClientManager;
this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
@@ -192,19 +225,24 @@ public class SourceHandle implements ISourceHandle {
if (tsBlock == null) {
return null;
}
- long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId);
+ Long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId);
+ if (retainedSize == null) {
+ throw new IllegalStateException("Reserved data block size is null.");
+ }
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[GetTsBlockFromBuffer] sequenceId:{}, size:{}",
currSequenceId, retainedSize);
}
currSequenceId += 1;
- bufferRetainedSizeInBytes -= retainedSize;
- localMemoryManager
- .getQueryPool()
- .free(
- localFragmentInstanceId.getQueryId(),
- fullFragmentInstanceId,
- localPlanNodeId,
- retainedSize);
+ if (retainedSize > 0) {
+ bufferRetainedSizeInBytes -= retainedSize;
+ localMemoryManager
+ .getQueryPool()
+ .free(
+ localFragmentInstanceId.getQueryId(),
+ fullFragmentInstanceId,
+ localPlanNodeId,
+ retainedSize);
+ }
if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
if (LOGGER.isDebugEnabled()) {
@@ -241,18 +279,24 @@ public class SourceHandle implements ISourceHandle {
if (bytesToReserve == null) {
throw new IllegalStateException("Data block size is null.");
}
- pair =
+ MemoryReservationResult reserveResult =
localMemoryManager
.getQueryPool()
- .reserve(
+ .reserveWithPriority(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
bytesToReserve,
- maxBytesCanReserve);
- bufferRetainedSizeInBytes += bytesToReserve;
+ maxBytesCanReserve,
+ isHighestPriority);
+ pair = new Pair<>(reserveResult.getFuture(),
reserveResult.isReserveSuccess());
+ // the actually reserved size may differ from the raw size (e.g. zero for highest-priority
queries), so record the actually reserved size in the map
+ if (reserveResult.getReservedBytes() != bytesToReserve) {
+ sequenceIdToDataBlockSize.put(endSequenceId,
reserveResult.getReservedBytes());
+ }
+ bufferRetainedSizeInBytes += reserveResult.getReservedBytes();
endSequenceId += 1;
- reservedBytes += bytesToReserve;
+ reservedBytes += reserveResult.getReservedBytes();
if (!Boolean.TRUE.equals(pair.right)) {
blockedSize = bytesToReserve;
break;
@@ -619,14 +663,16 @@ public class SourceHandle implements ISourceHandle {
if (aborted || closed) {
return;
}
- bufferRetainedSizeInBytes -= reservedBytes;
- localMemoryManager
- .getQueryPool()
- .free(
- localFragmentInstanceId.getQueryId(),
- fullFragmentInstanceId,
- localPlanNodeId,
- reservedBytes);
+ if (reservedBytes > 0) {
+ bufferRetainedSizeInBytes -= reservedBytes;
+ localMemoryManager
+ .getQueryPool()
+ .free(
+ localFragmentInstanceId.getQueryId(),
+ fullFragmentInstanceId,
+ localPlanNodeId,
+ reservedBytes);
+ }
sourceHandleListener.onFailure(SourceHandle.this, t);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index f655df432bf..f02b86579b4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -152,6 +152,7 @@ public class FragmentInstanceContext extends QueryContext {
private long unclosedUnseqFileNum = 0;
private long closedSeqFileNum = 0;
private long closedUnseqFileNum = 0;
+ private boolean highestPriority = false;
public static FragmentInstanceContext createFragmentInstanceContext(
FragmentInstanceId id, FragmentInstanceStateMachine stateMachine,
SessionInfo sessionInfo) {
@@ -1128,6 +1129,18 @@ public class FragmentInstanceContext extends
QueryContext {
return ignoreNotExistsDevice;
}
+ /**
+ * Same flag as {@link
+ *
org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis#needSetHighestPriority()}.
+ */
+ public boolean isHighestPriority() {
+ return highestPriority;
+ }
+
+ public void setHighestPriority(boolean highestPriority) {
+ this.highestPriority = highestPriority;
+ }
+
public boolean isSingleSourcePath() {
return singleSourcePath;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index e8d0fd82432..9dacdf44d88 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -158,6 +158,7 @@ public class FragmentInstanceManager {
dataRegion,
instance.getGlobalTimePredicate(),
dataNodeQueryContextMap));
+ context.setHighestPriority(instance.isHighestPriority());
try {
List<PipelineDriverFactory> driverFactories =
@@ -259,6 +260,7 @@ public class FragmentInstanceManager {
fragmentInstanceId ->
createFragmentInstanceContext(
fragmentInstanceId, stateMachine,
instance.getSessionInfo()));
+ context.setHighestPriority(instance.isHighestPriority());
try {
List<PipelineDriverFactory> driverFactories =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
index a9ab6ed5d81..9a00d5a87de 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
@@ -23,10 +23,8 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.runtime.MemoryLeakException;
import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.Validate;
-import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +39,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+
/** A thread-safe memory pool. */
public class MemoryPool {
@@ -111,6 +111,31 @@ public class MemoryPool {
}
}
+ public static class MemoryReservationResult {
+ private final ListenableFuture<Void> future;
+ private final boolean reserveSuccess;
+ private final long reservedBytes;
+
+ public MemoryReservationResult(
+ ListenableFuture<Void> future, boolean reserveSuccess, long
reservedBytes) {
+ this.future = future;
+ this.reserveSuccess = reserveSuccess;
+ this.reservedBytes = reservedBytes;
+ }
+
+ public ListenableFuture<Void> getFuture() {
+ return future;
+ }
+
+ public boolean isReserveSuccess() {
+ return reserveSuccess;
+ }
+
+ public long getReservedBytes() {
+ return reservedBytes;
+ }
+ }
+
private final String id;
private final long maxBytes;
private final long maxBytesPerFragmentInstance;
@@ -220,18 +245,20 @@ public class MemoryPool {
}
/**
- * Reserve memory with bytesToReserve.
+ * Reserve bytesToReserve bytes of memory, taking the query's priority into account.
*
- * @return if reserve succeed, pair.right will be true, otherwise false
+ * @return if the reservation succeeds, reservedBytes is either zero (highest-priority fallback) or
equal to bytesToReserve; if the reservation
+ * fails, reservedBytes is always equal to bytesToReserve
* @throws IllegalArgumentException throw exception if current query
requests more memory than can
* be allocated.
*/
- public Pair<ListenableFuture<Void>, Boolean> reserve(
+ public MemoryReservationResult reserveWithPriority(
String queryId,
String fragmentInstanceId,
String planNodeId,
long bytesToReserve,
- long maxBytesCanReserve) {
+ long maxBytesCanReserve,
+ boolean isHighestPriority) {
Validate.notNull(queryId, "queryId can not be null.");
Validate.notNull(fragmentInstanceId, "fragmentInstanceId can not be
null.");
Validate.notNull(planNodeId, "planNodeId can not be null.");
@@ -250,19 +277,21 @@ public class MemoryPool {
"Query is aborted since it requests more memory than can be
allocated.");
}
- ListenableFuture<Void> result;
if (tryReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve,
maxBytesCanReserve)) {
- result = Futures.immediateFuture(null);
- return new Pair<>(result, Boolean.TRUE);
+ return new MemoryReservationResult(immediateVoidFuture(), true,
bytesToReserve);
} else {
+ rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve);
+ if (isHighestPriority) {
+ // SHOW QUERIES: treat as success with zero bytes reserved from pool
when insufficient.
+ return new MemoryReservationResult(immediateVoidFuture(), true, 0L);
+ }
LOGGER.debug(
"Blocked reserve request: {} bytes memory for planNodeId{}",
bytesToReserve, planNodeId);
- rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve);
- result =
+ ListenableFuture<Void> result =
MemoryReservationFuture.create(
queryId, fragmentInstanceId, planNodeId, bytesToReserve,
maxBytesCanReserve);
memoryReservationFutures.add((MemoryReservationFuture<Void>) result);
- return new Pair<>(result, Boolean.FALSE);
+ return new MemoryReservationResult(result, false, bytesToReserve);
}
}
@@ -293,7 +322,8 @@ public class MemoryPool {
/**
* Cancel the specified memory reservation. If the reservation has finished,
do nothing.
*
- * @param future The future returned from {@link #reserve(String, String,
String, long, long)}
+ * @param future The future returned from {@link
#reserveWithPriority(String, String, String,
+ * long, long, boolean)}
* @return If the future has not complete, return the number of bytes being
reserved. Otherwise,
* return 0.
*/
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index 01d1020934d..63b2d379ddd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -503,7 +503,7 @@ public class Analysis implements IAnalysis {
public boolean needSetHighestPriority() {
// if is this Statement is ShowQueryStatement, set its instances to the
highest priority, so
// that the sub-tasks of the ShowQueries instances could be executed first.
- return StatementType.SHOW_QUERIES.equals(statement.getType());
+ return statement != null &&
StatementType.SHOW_QUERIES.equals(statement.getType());
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 8de6165525e..5af143bfca8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -125,6 +125,7 @@ public class QueryExecution implements IQueryExecution {
this.context = context;
this.planner = planner;
this.analysis = analyze(context);
+ context.setNeedSetHighestPriority(analysis.needSetHighestPriority());
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
// We add the abort logic inside the QueryExecution.
@@ -587,7 +588,8 @@ public class QueryExecution implements IQueryExecution {
context.getResultNodeContext().getUpStreamPlanNodeId().getId(),
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
0, // Upstream of result ExchangeNode will only have one
child.
- stateMachine::transitionToFailed)
+ stateMachine::transitionToFailed,
+ context.needSetHighestPriority())
: MPPDataExchangeService.getInstance()
.getMPPDataExchangeManager()
.createSourceHandle(
@@ -596,7 +598,8 @@ public class QueryExecution implements IQueryExecution {
0,
upstreamEndPoint,
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
- stateMachine::transitionToFailed);
+ stateMachine::transitionToFailed,
+ context.needSetHighestPriority());
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index e0aeacb7f56..0649f808c4f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -2624,14 +2624,16 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getUpstreamPlanNodeId().getId(),
remoteInstanceId.toThrift(),
node.getIndexOfUpstreamSinkHandle(),
- context.getInstanceContext()::failed)
+ context.getInstanceContext()::failed,
+ context.getInstanceContext().isHighestPriority())
: MPP_DATA_EXCHANGE_MANAGER.createSourceHandle(
localInstanceId.toThrift(),
node.getPlanNodeId().getId(),
node.getIndexOfUpstreamSinkHandle(),
upstreamEndPoint,
remoteInstanceId.toThrift(),
- context.getInstanceContext()::failed);
+ context.getInstanceContext()::failed,
+ context.getInstanceContext().isHighestPriority());
if (!isSameNode) {
context.addExchangeSumNum(1);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
index d212f4ca725..b96485199ad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
@@ -223,6 +223,7 @@ public class FragmentInstance implements IConsensusRequest {
fragmentInstance.hostDataNode =
hasHostDataNode ?
ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
fragmentInstance.isExplainAnalyze = ReadWriteIOUtils.readBool(buffer);
+ fragmentInstance.setHighestPriority(ReadWriteIOUtils.readBool(buffer));
return fragmentInstance;
}
@@ -247,6 +248,7 @@ public class FragmentInstance implements IConsensusRequest {
ThriftCommonsSerDeUtils.serializeTDataNodeLocation(hostDataNode,
outputStream);
}
ReadWriteIOUtils.write(isExplainAnalyze, outputStream);
+ ReadWriteIOUtils.write(isHighestPriority, outputStream);
return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
} catch (IOException e) {
LOGGER.error("Unexpected error occurs when serializing this
FragmentInstance.", e);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java
index 8d9c4b9cd65..b3c50aaa517 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java
@@ -92,13 +92,14 @@ public class LocalSinkChannelTest {
Assert.assertFalse(localSinkChannel.isFinished());
Assert.assertEquals(11 * mockTsBlockSize,
localSinkChannel.getBufferRetainedSizeInBytes());
Mockito.verify(spyMemoryPool, Mockito.times(11))
- .reserve(
+ .reserveWithPriority(
queryId,
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
remoteFragmentInstanceId),
remotePlanNodeId,
mockTsBlockSize,
- Long.MAX_VALUE);
+ Long.MAX_VALUE,
+ false);
// Receive TsBlocks.
int numOfReceivedTsblocks = 0;
@@ -184,13 +185,14 @@ public class LocalSinkChannelTest {
Assert.assertFalse(localSinkChannel.isFinished());
Assert.assertEquals(11 * mockTsBlockSize,
localSinkChannel.getBufferRetainedSizeInBytes());
Mockito.verify(spyMemoryPool, Mockito.times(11))
- .reserve(
+ .reserveWithPriority(
queryId,
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
remoteFragmentInstanceId),
remotePlanNodeId,
mockTsBlockSize,
- Long.MAX_VALUE);
+ Long.MAX_VALUE,
+ false);
// Abort.
localSinkChannel.abort();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
index 46196d1c990..a95fd35ba76 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
@@ -21,13 +21,13 @@ package org.apache.iotdb.db.queryengine.execution.exchange;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
+import
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.lang3.Validate;
import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.utils.Pair;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -62,15 +62,16 @@ public class SharedTsBlockQueueTest {
MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
- // reserve() returns (manualFuture, false) — simulating memory blocked
+ // reserveWithPriority() returns blocked future and reserve failure.
Mockito.when(
- mockMemoryPool.reserve(
+ mockMemoryPool.reserveWithPriority(
Mockito.anyString(),
Mockito.anyString(),
Mockito.anyString(),
Mockito.anyLong(),
- Mockito.anyLong()))
- .thenReturn(new Pair<>(manualFuture, Boolean.FALSE));
+ Mockito.anyLong(),
+ Mockito.anyBoolean()))
+ .thenReturn(new MemoryReservationResult(manualFuture, false, 1024L));
// tryCancel returns 0 — simulating future already completed (can't cancel)
Mockito.when(mockMemoryPool.tryCancel(Mockito.any())).thenReturn(0L);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java
index 489ec5c9ed6..4190c2fa61a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java
@@ -104,13 +104,14 @@ public class ShuffleSinkHandleTest {
Assert.assertFalse(localSinkChannel.isFinished());
Assert.assertEquals(11 * mockTsBlockSize,
localSinkChannel.getBufferRetainedSizeInBytes());
Mockito.verify(spyMemoryPool, Mockito.times(11))
- .reserve(
+ .reserveWithPriority(
queryId,
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
remoteFragmentInstanceId),
remotePlanNodeId,
mockTsBlockSize,
- Long.MAX_VALUE);
+ Long.MAX_VALUE,
+ false);
// Abort.
shuffleSinkHandle.abort();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java
index ee849337206..40721c93db7 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java
@@ -254,13 +254,14 @@ public class SourceHandleTest {
.collect(Collectors.toList()));
try {
Mockito.verify(spyMemoryPool, Mockito.timeout(10_000).times(6))
- .reserve(
+ .reserveWithPriority(
queryId,
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
localFragmentInstanceId),
localPlanNodeId,
MOCK_TSBLOCK_SIZE,
- maxBytesCanReserve);
+ maxBytesCanReserve,
+ false);
Mockito.verify(mockClient, Mockito.timeout(10_0000).times(1))
.getDataBlock(
Mockito.argThat(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
index 327d4a34c39..b09498ad949 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
@@ -20,11 +20,11 @@
package org.apache.iotdb.db.queryengine.execution.exchange;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
+import
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.column.TsBlockSerde;
-import org.apache.tsfile.utils.Pair;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
@@ -68,21 +68,25 @@ public class Utils {
settableFuture.get().set(null);
AtomicReference<Long> reservedBytes = new AtomicReference<>(0L);
Mockito.when(
- mockMemoryPool.reserve(
+ mockMemoryPool.reserveWithPriority(
Mockito.eq(queryId),
Mockito.eq(fragmentInstanceId),
Mockito.eq(planNodeId),
Mockito.anyLong(),
- Mockito.anyLong()))
+ Mockito.anyLong(),
+ Mockito.anyBoolean()))
.thenAnswer(
invocation -> {
long bytesToReserve = invocation.getArgument(3);
if (reservedBytes.get() + bytesToReserve <= capacityInBytes) {
- reservedBytes.updateAndGet(v -> v + (long)
invocation.getArgument(3));
- return new Pair<>(settableFuture.get(), true);
+ reservedBytes.updateAndGet(v -> v + bytesToReserve);
+ return new MemoryReservationResult(settableFuture.get(), true,
bytesToReserve);
} else {
+ if (invocation.getArgument(5)) {
+ return new MemoryReservationResult(settableFuture.get(),
true, 0L);
+ }
settableFuture.set(SettableFuture.create());
- return new Pair<>(settableFuture.get(), false);
+ return new MemoryReservationResult(settableFuture.get(),
false, bytesToReserve);
}
});
Mockito.doAnswer(
@@ -124,13 +128,17 @@ public class Utils {
public static MemoryPool createMockNonBlockedMemoryPool() {
MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);
Mockito.when(
- mockMemoryPool.reserve(
+ mockMemoryPool.reserveWithPriority(
Mockito.anyString(),
Mockito.anyString(),
Mockito.anyString(),
Mockito.anyLong(),
- Mockito.anyLong()))
- .thenReturn(new Pair<>(immediateFuture(null), true));
+ Mockito.anyLong(),
+ Mockito.anyBoolean()))
+ .thenAnswer(
+ invocation ->
+ new MemoryReservationResult(
+ immediateFuture(null), true, invocation.getArgument(3)));
Mockito.when(
mockMemoryPool.tryReserve(
Mockito.anyString(),
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java
index bae3c0643f4..cf7e15dc71f 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.queryengine.execution.memory;
+import
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
+
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Assert;
import org.junit.Before;
@@ -94,7 +96,9 @@ public class MemoryPoolTest {
public void testReserve() {
ListenableFuture<Void> future =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE).left;
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE, false)
+ .getFuture();
Assert.assertTrue(future.isDone());
Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID));
Assert.assertEquals(256L, pool.getReservedBytes());
@@ -104,7 +108,8 @@ public class MemoryPoolTest {
public void tesReserveZero() {
try {
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 0L,
Long.MAX_VALUE);
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 0L, Long.MAX_VALUE,
false);
Assert.fail("Expect IllegalArgumentException");
} catch (IllegalArgumentException ignore) {
}
@@ -114,7 +119,8 @@ public class MemoryPoolTest {
public void testReserveNegative() {
try {
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, -1L,
Long.MAX_VALUE);
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, -1L, Long.MAX_VALUE,
false);
Assert.fail("Expect IllegalArgumentException");
} catch (IllegalArgumentException ignore) {
}
@@ -124,7 +130,9 @@ public class MemoryPoolTest {
public void testReserveAll() {
ListenableFuture<Void> future =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE).left;
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE, false)
+ .getFuture();
Assert.assertTrue(future.isDone());
Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(QUERY_ID));
Assert.assertEquals(512L, pool.getReservedBytes());
@@ -134,11 +142,15 @@ public class MemoryPoolTest {
public void testOverReserve() {
ListenableFuture<Void> future =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE).left;
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE, false)
+ .getFuture();
Assert.assertTrue(future.isDone());
Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID));
Assert.assertEquals(256L, pool.getReservedBytes());
- future = pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
513L).left;
+ future =
+ pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID,
512L, 513L, false)
+ .getFuture();
Assert.assertFalse(future.isDone());
Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID));
Assert.assertEquals(256L, pool.getReservedBytes());
@@ -148,11 +160,13 @@ public class MemoryPoolTest {
public void testReserveAndFree() {
Assert.assertTrue(
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE)
- .left
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE, false)
+ .getFuture()
.isDone());
ListenableFuture<Void> future =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
513L).left;
+ pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID,
512L, 513L, false)
+ .getFuture();
Assert.assertFalse(future.isDone());
Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(QUERY_ID));
Assert.assertEquals(512L, pool.getReservedBytes());
@@ -166,18 +180,22 @@ public class MemoryPoolTest {
public void testMultiReserveAndFree() {
Assert.assertTrue(
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE)
- .left
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE, false)
+ .getFuture()
.isDone());
Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID));
Assert.assertEquals(256L, pool.getReservedBytes());
ListenableFuture<Void> future1 =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
513L).left;
+ pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID,
512L, 513L, false)
+ .getFuture();
ListenableFuture<Void> future2 =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
513L).left;
+ pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID,
512L, 513L, false)
+ .getFuture();
ListenableFuture<Void> future3 =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
513L).left;
+ pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID,
512L, 513L, false)
+ .getFuture();
Assert.assertFalse(future1.isDone());
Assert.assertFalse(future2.isDone());
Assert.assertFalse(future3.isDone());
@@ -284,7 +302,8 @@ public class MemoryPoolTest {
pool.tryReserveForTest(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID,
512L, Long.MAX_VALUE));
ListenableFuture<Void> f =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
512L).left;
+ pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID,
256L, 512L, false)
+ .getFuture();
Assert.assertFalse(f.isDone());
// Cancel the reservation.
Assert.assertEquals(256L, pool.tryCancel(f));
@@ -296,11 +315,76 @@ public class MemoryPoolTest {
public void testTryCancelCompletedReservation() {
ListenableFuture<Void> f =
- pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE).left;
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE, false)
+ .getFuture();
Assert.assertTrue(f.isDone());
// Cancel the reservation.
Assert.assertEquals(0L, pool.tryCancel(f));
Assert.assertTrue(f.isDone());
Assert.assertFalse(f.isCancelled());
}
+
+ /**
+ * Normal query: requested bytes exceed what the pool can still provide —
reserve fails (blocked
+ * future, not immediate success).
+ */
+ @Test
+ public void testReserveWithPriorityNormalQueryExceedsAvailable() {
+ MemoryReservationResult r1 =
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE, false);
+ Assert.assertTrue(r1.isReserveSuccess());
+ Assert.assertEquals(512L, r1.getReservedBytes());
+ Assert.assertTrue(r1.getFuture().isDone());
+
+ MemoryReservationResult r2 =
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE, false);
+ Assert.assertTrue(r2.isReserveSuccess());
+ Assert.assertEquals(512L, r2.getReservedBytes());
+ Assert.assertEquals(1024L, pool.getReservedBytes());
+
+ MemoryReservationResult r3 =
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE, false);
+ Assert.assertFalse(r3.isReserveSuccess());
+ Assert.assertEquals(256L, r3.getReservedBytes());
+ Assert.assertFalse(r3.getFuture().isDone());
+ Assert.assertEquals(1024L, pool.getReservedBytes());
+ }
+
+ /** SHOW QUERIES path: exceeds pool capacity — treated as success with zero
bytes from pool. */
+ @Test
+ public void testReserveWithPriorityShowQueriesExceedsAvailable() {
+ Assert.assertTrue(
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE, false)
+ .isReserveSuccess());
+ Assert.assertTrue(
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L,
Long.MAX_VALUE, false)
+ .isReserveSuccess());
+ Assert.assertEquals(1024L, pool.getReservedBytes());
+
+ MemoryReservationResult r =
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE, true);
+ Assert.assertTrue(r.isReserveSuccess());
+ Assert.assertEquals(0L, r.getReservedBytes());
+ Assert.assertTrue(r.getFuture().isDone());
+ Assert.assertEquals(1024L, pool.getReservedBytes());
+ }
+
+ /** SHOW QUERIES path: pool has room — same as normal successful reserve. */
+ @Test
+ public void testReserveWithPriorityShowQueriesWithinAvailable() {
+ MemoryReservationResult r =
+ pool.reserveWithPriority(
+ QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L,
Long.MAX_VALUE, true);
+ Assert.assertTrue(r.isReserveSuccess());
+ Assert.assertEquals(256L, r.getReservedBytes());
+ Assert.assertTrue(r.getFuture().isDone());
+ Assert.assertEquals(256L, pool.getReservedBytes());
+ }
}