This is an automated email from the ASF dual-hosted git repository. weihao pushed a commit to branch lwh/SQ-mp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit be98df9ad39432a11bcf307dfaef64202a1b05da Author: Weihao Li <[email protected]> AuthorDate: Thu Apr 16 15:20:43 2026 +0800 draft Signed-off-by: Weihao Li <[email protected]> --- .../db/queryengine/common/MPPQueryContext.java | 14 ++++ .../execution/exchange/MPPDataExchangeManager.java | 53 ++++++++++++- .../execution/exchange/SharedTsBlockQueue.java | 55 ++++++++----- .../execution/exchange/sink/SinkChannel.java | 61 +++++++++++---- .../execution/exchange/source/SourceHandle.java | 89 ++++++++++++++++------ .../fragment/FragmentInstanceContext.java | 13 ++++ .../fragment/FragmentInstanceManager.java | 2 + .../queryengine/execution/memory/MemoryPool.java | 58 ++++++++++++-- .../queryengine/plan/execution/QueryExecution.java | 7 +- .../plan/planner/OperatorTreeGenerator.java | 6 +- .../plan/planner/TableOperatorGenerator.java | 6 +- 11 files changed, 293 insertions(+), 71 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 88bd1998f68..4da4810f76a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -129,6 +129,12 @@ public class MPPQueryContext implements IAuditEntity { private boolean userQuery = false; + /** + * When true (e.g. SHOW QUERIES), operator and exchange memory may use fallback when pool is + * insufficient. Set from analysis via {@link #setNeedSetHighestPriority(boolean)}. + */ + private boolean needSetHighestPriority = false; + private boolean debug = false; private Map<NodeRef<Table>, Query> cteQueries = new HashMap<>(); @@ -507,6 +513,14 @@ public class MPPQueryContext implements IAuditEntity { this.userQuery = userQuery; } + public boolean needSetHighestPriority() { + return needSetHighestPriority; + } + + public void setNeedSetHighestPriority(boolean needSetHighestPriority) { + this.needSetHighestPriority = needSetHighestPriority; + } + public boolean isDebug() { return debug; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java index d2caf330d66..f72bb12f7f0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java @@ -656,7 +656,11 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { } queue = new SharedTsBlockQueue( - localFragmentInstanceId, localPlanNodeId, localMemoryManager, executorService); + localFragmentInstanceId, + localPlanNodeId, + localMemoryManager, + executorService, + instanceContext.isHighestPriority()); } return new LocalSinkChannel( @@ -680,7 +684,8 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(), planNodeId, localMemoryManager, - executorService); + executorService, + driverContext.getFragmentInstanceContext().isHighestPriority()); queue.allowAddingTsBlock(); return new LocalSinkChannel( queue, @@ -718,6 +723,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { tsBlockSerdeFactory.get(), new ISinkChannelListenerImpl( localFragmentInstanceId, instanceContext, instanceContext::failed, cnt), + instanceContext.isHighestPriority(), mppDataExchangeServiceClientManager); } @@ -809,6 +815,24 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { TFragmentInstanceId remoteFragmentInstanceId, int index, IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) { + return createLocalSourceHandleForFragment( + localFragmentInstanceId, + localPlanNodeId, + remotePlanNodeId, + remoteFragmentInstanceId, + index, + onFailureCallback, + false); + } + + public synchronized ISourceHandle createLocalSourceHandleForFragment( + TFragmentInstanceId localFragmentInstanceId, + String localPlanNodeId, + String remotePlanNodeId, + TFragmentInstanceId remoteFragmentInstanceId, + int index, + IMPPDataExchangeManagerCallback<Throwable> onFailureCallback, + boolean isHighestPriority) { if (sourceHandles.containsKey(localFragmentInstanceId) && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) { throw new IllegalStateException( @@ -840,7 +864,11 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { } queue = new SharedTsBlockQueue( - remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager, executorService); + remoteFragmentInstanceId, + remotePlanNodeId, + localMemoryManager, + executorService, + isHighestPriority); } LocalSourceHandle localSourceHandle = new LocalSourceHandle( @@ -862,6 +890,24 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { TEndPoint remoteEndpoint, TFragmentInstanceId remoteFragmentInstanceId, IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) { + return createSourceHandle( + localFragmentInstanceId, + localPlanNodeId, + indexOfUpstreamSinkHandle, + remoteEndpoint, + remoteFragmentInstanceId, + onFailureCallback, + false); + } + + public ISourceHandle createSourceHandle( + TFragmentInstanceId localFragmentInstanceId, + String localPlanNodeId, + int indexOfUpstreamSinkHandle, + TEndPoint remoteEndpoint, + TFragmentInstanceId remoteFragmentInstanceId, + IMPPDataExchangeManagerCallback<Throwable> onFailureCallback, + boolean isHighestPriority) { Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(localFragmentInstanceId); if (sourceHandleMap != null && sourceHandleMap.containsKey(localPlanNodeId)) { throw new IllegalStateException( @@ -891,6 +937,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { executorService, tsBlockSerdeFactory.get(), new SourceHandleListenerImpl(onFailureCallback), + isHighestPriority, mppDataExchangeServiceClientManager); sourceHandles .computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap<>()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java index e49efabb986..ad68956a98e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.execution.exchange.sink.LocalSinkChannel; import org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle; import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import org.apache.iotdb.db.utils.CommonUtils; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; @@ -63,7 +64,7 @@ public class SharedTsBlockQueue { private long bufferRetainedSizeInBytes = 0L; - private final Queue<TsBlock> queue = new LinkedList<>(); + private final Queue<Pair<TsBlock, Long>> queue = new LinkedList<>(); private SettableFuture<Void> blocked = SettableFuture.create(); @@ -83,6 +84,7 @@ public class SharedTsBlockQueue { private long maxBytesCanReserve = IoTDBDescriptor.getInstance().getMemoryConfig().getMaxBytesPerFragmentInstance(); + private final boolean isHighestPriority; private volatile Throwable abortedCause = null; @@ -94,6 +96,15 @@ public class SharedTsBlockQueue { String planNodeId, LocalMemoryManager localMemoryManager, ExecutorService executorService) { + this(fragmentInstanceId, planNodeId, localMemoryManager, executorService, false); + } + + public SharedTsBlockQueue( + TFragmentInstanceId fragmentInstanceId, + String planNodeId, + LocalMemoryManager localMemoryManager, + ExecutorService executorService, + boolean isHighestPriority) { this.localFragmentInstanceId = Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null"); this.fullFragmentInstanceId = @@ -102,6 +113,7 @@ public class SharedTsBlockQueue { this.localMemoryManager = Validate.notNull(localMemoryManager, "local memory manager cannot be null"); this.executorService = Validate.notNull(executorService, "ExecutorService can not be null."); + this.isHighestPriority = isHighestPriority; } public boolean hasNoMoreTsBlocks() { @@ -196,15 +208,18 @@ public class SharedTsBlockQueue { } throw new IllegalStateException("queue has been destroyed"); } - TsBlock tsBlock = queue.remove(); - localMemoryManager - .getQueryPool() - .free( - localFragmentInstanceId.getQueryId(), - fullFragmentInstanceId, - localPlanNodeId, - tsBlock.getSizeInBytes()); - bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes(); + Pair<TsBlock, Long> tsBlockWithReservedBytes = queue.remove(); + long reservedBytes = tsBlockWithReservedBytes.right; + if (reservedBytes > 0) { + localMemoryManager + .getQueryPool() + .free( + localFragmentInstanceId.getQueryId(), + fullFragmentInstanceId, + localPlanNodeId, + reservedBytes); + bufferRetainedSizeInBytes -= reservedBytes; + } // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event // to // corresponding LocalSinkChannel. @@ -214,7 +229,7 @@ public class SharedTsBlockQueue { if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) { blocked = SettableFuture.create(); } - return tsBlock; + return tsBlockWithReservedBytes.left; } /** @@ -240,20 +255,22 @@ public class SharedTsBlockQueue { localFragmentInstanceId.queryId, fullFragmentInstanceId, localPlanNodeId); alreadyRegistered = true; } - Pair<ListenableFuture<Void>, Boolean> pair = + MemoryReservationResult reserveResult = localMemoryManager .getQueryPool() - .reserve( + .reserveWithPriority( localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, tsBlock.getSizeInBytes(), - maxBytesCanReserve); - blockedOnMemory = pair.left; - bufferRetainedSizeInBytes += tsBlock.getSizeInBytes(); + maxBytesCanReserve, + isHighestPriority); + blockedOnMemory = reserveResult.getFuture(); + long reservedBytes = reserveResult.getReservedBytes(); + bufferRetainedSizeInBytes += reservedBytes; // reserve memory failed, we should wait until there is enough memory - if (!Boolean.TRUE.equals(pair.right)) { + if (!reserveResult.isReserveSuccess()) { SettableFuture<Void> channelBlocked = SettableFuture.create(); blockedOnMemory.addListener( () -> { @@ -268,7 +285,7 @@ public class SharedTsBlockQueue { channelBlocked.set(null); return; } - queue.add(tsBlock); + queue.add(new Pair<>(tsBlock, reservedBytes)); if (!blocked.isDone()) { blocked.set(null); } @@ -285,7 +302,7 @@ public class SharedTsBlockQueue { executorService); return channelBlocked; } else { // reserve memory succeeded, add the TsBlock directly - queue.add(tsBlock); + queue.add(new Pair<>(tsBlock, reservedBytes)); if (!blocked.isDone()) { blocked.set(null); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java index 9cb624e4d96..4e124c2c478 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException; import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener; import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet; import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet; import org.apache.iotdb.db.utils.SetThreadName; @@ -119,6 +120,8 @@ public class SinkChannel implements ISinkChannel { private long maxBytesCanReserve = IoTDBDescriptor.getInstance().getMemoryConfig().getMaxBytesPerFragmentInstance(); + private final boolean isHighestPriority; + private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET = DataExchangeCostMetricSet.getInstance(); private static final DataExchangeCountMetricSet DATA_EXCHANGE_COUNT_METRIC_SET = @@ -141,6 +144,34 @@ public class SinkChannel implements ISinkChannel { SinkListener sinkListener, IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mppDataExchangeServiceClientManager) { + this( + remoteEndpoint, + remoteFragmentInstanceId, + remotePlanNodeId, + localPlanNodeId, + localFragmentInstanceId, + localMemoryManager, + executorService, + serde, + sinkListener, + false, + mppDataExchangeServiceClientManager); + } + + @SuppressWarnings("squid:S107") + public SinkChannel( + TEndPoint remoteEndpoint, + TFragmentInstanceId remoteFragmentInstanceId, + String remotePlanNodeId, + String localPlanNodeId, + TFragmentInstanceId localFragmentInstanceId, + LocalMemoryManager localMemoryManager, + ExecutorService executorService, + TsBlockSerde serde, + SinkListener sinkListener, + boolean isHighestPriority, + IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> + mppDataExchangeServiceClientManager) { this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndPoint can not be null."); this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId can not be null."); @@ -155,6 +186,7 @@ public class SinkChannel implements ISinkChannel { this.executorService = Validate.notNull(executorService, "executorService can not be null."); this.serde = Validate.notNull(serde, "serde can not be null."); this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not be null."); + this.isHighestPriority = isHighestPriority; this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager; this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS; this.threadName = @@ -204,21 +236,22 @@ public class SinkChannel implements ISinkChannel { long sizeInBytes = tsBlock.getSizeInBytes(); int startSequenceId; startSequenceId = nextSequenceId; - blocked = + MemoryReservationResult reserveResult = localMemoryManager .getQueryPool() - .reserve( + .reserveWithPriority( localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, sizeInBytes, - maxBytesCanReserve) - .left; - bufferRetainedSizeInBytes += sizeInBytes; + maxBytesCanReserve, + isHighestPriority); + blocked = reserveResult.getFuture(); + bufferRetainedSizeInBytes += reserveResult.getReservedBytes(); sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize)); nextSequenceId += 1; - currentTsBlockSize = sizeInBytes; + currentTsBlockSize = reserveResult.getReservedBytes(); submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(sizeInBytes)); } finally { @@ -434,19 +467,21 @@ public class SinkChannel implements ISinkChannel { return; } // SinkChannel is opened when ShuffleSinkHandle choose it as the next channel - this.blocked = + MemoryReservationResult reserveResult = localMemoryManager .getQueryPool() - .reserve( + .reserveWithPriority( localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, - maxBytesCanReserve) // actually we only know maxBytesCanReserve after - // the handle is created, so we use DEFAULT here. It is ok to use DEFAULT here because - // at first this SinkChannel has not reserved memory. - .left; - this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + maxBytesCanReserve, + isHighestPriority); // actually we only know maxBytesCanReserve after + // the handle is created, so we use DEFAULT here. It is ok to use DEFAULT here because + // at first this SinkChannel has not reserved memory. + this.blocked = reserveResult.getFuture(); + this.bufferRetainedSizeInBytes = reserveResult.getReservedBytes(); + this.currentTsBlockSize = maxBytesCanReserve; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java index 1c6406a2ed9..ec29131af22 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener; import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet; import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet; import org.apache.iotdb.db.utils.SetThreadName; @@ -116,6 +117,8 @@ public class SourceHandle implements ISourceHandle { */ private boolean canGetTsBlockFromRemote = false; + private final boolean isHighestPriority; + private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET = DataExchangeCostMetricSet.getInstance(); private static final DataExchangeCountMetricSet DATA_EXCHANGE_COUNT_METRIC_SET = @@ -138,6 +141,34 @@ public class SourceHandle implements ISourceHandle { SourceHandleListener sourceHandleListener, IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mppDataExchangeServiceClientManager) { + this( + remoteEndpoint, + remoteFragmentInstanceId, + localFragmentInstanceId, + localPlanNodeId, + indexOfUpstreamSinkHandle, + localMemoryManager, + executorService, + serde, + sourceHandleListener, + false, + mppDataExchangeServiceClientManager); + } + + @SuppressWarnings("squid:S107") + public SourceHandle( + TEndPoint remoteEndpoint, + TFragmentInstanceId remoteFragmentInstanceId, + TFragmentInstanceId localFragmentInstanceId, + String localPlanNodeId, + int indexOfUpstreamSinkHandle, + LocalMemoryManager localMemoryManager, + ExecutorService executorService, + TsBlockSerde serde, + SourceHandleListener sourceHandleListener, + boolean isHighestPriority, + IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> + mppDataExchangeServiceClientManager) { this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndpoint can not be null."); this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId can not be null."); @@ -153,6 +184,7 @@ public class SourceHandle implements ISourceHandle { this.serde = Validate.notNull(serde, "serde can not be null."); this.sourceHandleListener = Validate.notNull(sourceHandleListener, "sourceHandleListener can not be null."); + this.isHighestPriority = isHighestPriority; this.bufferRetainedSizeInBytes = 0L; this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager; this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS; @@ -193,19 +225,24 @@ public class SourceHandle implements ISourceHandle { if (tsBlock == null) { return null; } - long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId); + Long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId); + if (retainedSize == null) { + throw new IllegalStateException("Reserved data block size is null."); + } if (LOGGER.isDebugEnabled()) { LOGGER.debug("[GetTsBlockFromBuffer] sequenceId:{}, size:{}", currSequenceId, retainedSize); } currSequenceId += 1; - bufferRetainedSizeInBytes -= retainedSize; - localMemoryManager - .getQueryPool() - .free( - localFragmentInstanceId.getQueryId(), - fullFragmentInstanceId, - localPlanNodeId, - retainedSize); + if (retainedSize > 0) { + bufferRetainedSizeInBytes -= retainedSize; + localMemoryManager + .getQueryPool() + .free( + localFragmentInstanceId.getQueryId(), + fullFragmentInstanceId, + localPlanNodeId, + retainedSize); + } if (sequenceIdToTsBlock.isEmpty() && !isFinished()) { if (LOGGER.isDebugEnabled()) { @@ -242,18 +279,24 @@ public class SourceHandle implements ISourceHandle { if (bytesToReserve == null) { throw new IllegalStateException("Data block size is null."); } - pair = + MemoryReservationResult reserveResult = localMemoryManager .getQueryPool() - .reserve( + .reserveWithPriority( localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, bytesToReserve, - maxBytesCanReserve); - bufferRetainedSizeInBytes += bytesToReserve; + maxBytesCanReserve, + isHighestPriority); + pair = new Pair<>(reserveResult.getFuture(), reserveResult.isReserveSuccess()); + // actually reserve size is not equals raw size, update the actually reserve size to the map + if (reserveResult.getReservedBytes() != bytesToReserve) { + sequenceIdToDataBlockSize.put(endSequenceId, reserveResult.getReservedBytes()); + } + bufferRetainedSizeInBytes += reserveResult.getReservedBytes(); endSequenceId += 1; - reservedBytes += bytesToReserve; + reservedBytes += reserveResult.getReservedBytes(); if (!Boolean.TRUE.equals(pair.right)) { blockedSize = bytesToReserve; break; @@ -631,14 +674,16 @@ public class SourceHandle implements ISourceHandle { if (aborted || closed) { return; } - bufferRetainedSizeInBytes -= reservedBytes; - localMemoryManager - .getQueryPool() - .free( - localFragmentInstanceId.getQueryId(), - fullFragmentInstanceId, - localPlanNodeId, - reservedBytes); + if (reservedBytes > 0) { + bufferRetainedSizeInBytes -= reservedBytes; + localMemoryManager + .getQueryPool() + .free( + localFragmentInstanceId.getQueryId(), + fullFragmentInstanceId, + localPlanNodeId, + reservedBytes); + } sourceHandleListener.onFailure(SourceHandle.this, t); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index df9ac2d2f93..4cfd8dc63ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -161,6 +161,7 @@ public class FragmentInstanceContext extends QueryContext { private long unclosedUnseqFileNum = 0; private long closedSeqFileNum = 0; private long closedUnseqFileNum = 0; + private boolean highestPriority = false; public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, @@ -1190,6 +1191,18 @@ public class FragmentInstanceContext extends QueryContext { return ignoreNotExistsDevice; } + /** + * Same flag as {@link + * org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis#needSetHighestPriority()}. + */ + public boolean isHighestPriority() { + return highestPriority; + } + + public void setHighestPriority(boolean highestPriority) { + this.highestPriority = highestPriority; + } + public boolean isSingleSourcePath() { return singleSourcePath; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index 1898cbfe53c..9dfb2ffa684 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -163,6 +163,7 @@ public class FragmentInstanceManager { dataNodeQueryContextMap, instance.isDebug(), instance.isVerbose())); + context.setHighestPriority(instance.isHighestPriority()); try { List<PipelineDriverFactory> driverFactories = @@ -277,6 +278,7 @@ public class FragmentInstanceManager { instance.getSessionInfo(), instance.isDebug(), instance.isVerbose())); + context.setHighestPriority(instance.isHighestPriority()); try { List<PipelineDriverFactory> driverFactories = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java index 3e00c845dab..6afa7f692ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java @@ -26,7 +26,6 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.exception.runtime.MemoryLeakException; import com.google.common.util.concurrent.AbstractFuture; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.tsfile.external.commons.lang3.Validate; import org.apache.tsfile.utils.Pair; @@ -43,6 +42,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; +import static com.google.common.util.concurrent.Futures.immediateVoidFuture; + /** A thread-safe memory pool. */ public class MemoryPool { @@ -113,6 +114,31 @@ public class MemoryPool { } } + public static class MemoryReservationResult { + private final ListenableFuture<Void> future; + private final boolean reserveSuccess; + private final long reservedBytes; + + public MemoryReservationResult( + ListenableFuture<Void> future, boolean reserveSuccess, long reservedBytes) { + this.future = future; + this.reserveSuccess = reserveSuccess; + this.reservedBytes = reservedBytes; + } + + public ListenableFuture<Void> getFuture() { + return future; + } + + public boolean isReserveSuccess() { + return reserveSuccess; + } + + public long getReservedBytes() { + return reservedBytes; + } + } + private final String id; private final IMemoryBlock memoryBlock; private final long maxBytesPerFragmentInstance; @@ -236,6 +262,19 @@ public class MemoryPool { String planNodeId, long bytesToReserve, long maxBytesCanReserve) { + MemoryReservationResult result = + reserveWithPriority( + queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve, false); + return new Pair<>(result.getFuture(), result.isReserveSuccess()); + } + + public MemoryReservationResult reserveWithPriority( + String queryId, + String fragmentInstanceId, + String planNodeId, + long bytesToReserve, + long maxBytesCanReserve, + boolean needSetHighestPriority) { Validate.notNull(queryId, "queryId can not be null."); Validate.notNull(fragmentInstanceId, "fragmentInstanceId can not be null."); Validate.notNull(planNodeId, "planNodeId can not be null."); @@ -256,19 +295,21 @@ public class MemoryPool { bytesToReserve, maxBytesCanReserve)); } - ListenableFuture<Void> result; if (tryReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve)) { - result = Futures.immediateFuture(null); - return new Pair<>(result, Boolean.TRUE); + return new MemoryReservationResult(immediateVoidFuture(), true, bytesToReserve); } else { + rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve); + if (needSetHighestPriority) { + // SHOW QUERIES etc.: treat as success with zero bytes reserved from pool when insufficient. + return new MemoryReservationResult(immediateVoidFuture(), true, 0L); + } LOGGER.debug( "Blocked reserve request: {} bytes memory for planNodeId{}", bytesToReserve, planNodeId); - rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve); - result = + ListenableFuture<Void> result = MemoryReservationFuture.create( queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve); memoryReservationFutures.add((MemoryReservationFuture<Void>) result); - return new Pair<>(result, Boolean.FALSE); + return new MemoryReservationResult(result, false, bytesToReserve); } } @@ -299,7 +340,8 @@ public class MemoryPool { /** * Cancel the specified memory reservation. If the reservation has finished, do nothing. * - * @param future The future returned from {@link #reserve(String, String, String, long, long)} + * @param future The future returned from {@link #reserveWithPriority(String, String, String, + * long, long, boolean)} * @return If the future has not complete, return the number of bytes being reserved. Otherwise, * return 0. */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 7500fbb9c34..788b1543046 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -129,6 +129,7 @@ public class QueryExecution implements IQueryExecution { this.context = context; this.planner = planner; this.analysis = analyze(context); + context.setNeedSetHighestPriority(analysis.needSetHighestPriority()); this.stateMachine = new QueryStateMachine(context.getQueryId(), executor); // We add the abort logic inside the QueryExecution. @@ -610,7 +611,8 @@ public class QueryExecution implements IQueryExecution { context.getResultNodeContext().getUpStreamPlanNodeId().getId(), context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(), 0, // Upstream of result ExchangeNode will only have one child. - stateMachine::transitionToFailed) + stateMachine::transitionToFailed, + context.needSetHighestPriority()) : MPPDataExchangeService.getInstance() .getMPPDataExchangeManager() .createSourceHandle( @@ -619,7 +621,8 @@ public class QueryExecution implements IQueryExecution { 0, upstreamEndPoint, context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(), - stateMachine::transitionToFailed); + stateMachine::transitionToFailed, + context.needSetHighestPriority()); } @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 422454c11b6..0e32dfd097e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -2694,14 +2694,16 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getUpstreamPlanNodeId().getId(), remoteInstanceId.toThrift(), node.getIndexOfUpstreamSinkHandle(), - context.getInstanceContext()::failed) + context.getInstanceContext()::failed, + context.getInstanceContext().isHighestPriority()) : MPP_DATA_EXCHANGE_MANAGER.createSourceHandle( localInstanceId.toThrift(), node.getPlanNodeId().getId(), node.getIndexOfUpstreamSinkHandle(), upstreamEndPoint, remoteInstanceId.toThrift(), - context.getInstanceContext()::failed); + context.getInstanceContext()::failed, + context.getInstanceContext().isHighestPriority()); if (!isSameNode) { context.addExchangeSumNum(1); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index c31a2061f49..f1bb43abed4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -454,14 +454,16 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution node.getUpstreamPlanNodeId().getId(), remoteInstanceId.toThrift(), node.getIndexOfUpstreamSinkHandle(), - context.getInstanceContext()::failed) + context.getInstanceContext()::failed, + context.getInstanceContext().isHighestPriority()) : MPP_DATA_EXCHANGE_MANAGER.createSourceHandle( localInstanceId.toThrift(), node.getPlanNodeId().getId(), node.getIndexOfUpstreamSinkHandle(), upstreamEndPoint, remoteInstanceId.toThrift(), - context.getInstanceContext()::failed); + context.getInstanceContext()::failed, + context.getInstanceContext().isHighestPriority()); if (!isSameNode) { context.addExchangeSumNum(1); }
