This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0cc841b7639 Fix close method of TopOperator and make close method of
Operator an interface
0cc841b7639 is described below
commit 0cc841b7639be022c54f6e335744a7528f8c025e
Author: Liao Lanyu <[email protected]>
AuthorDate: Thu Nov 30 18:29:59 2023 +0800
Fix close method of TopOperator and make close method of Operator an
interface
---
.../db/queryengine/execution/driver/Driver.java | 4 --
.../execution/exchange/sink/LocalSinkChannel.java | 39 ++++++++----
.../execution/exchange/sink/ShuffleSinkHandle.java | 27 +++++---
.../execution/exchange/sink/SinkChannel.java | 73 ++++++++++++++--------
.../exchange/source/LocalSourceHandle.java | 15 +++--
.../execution/exchange/source/SourceHandle.java | 20 +++---
.../fragment/FragmentInstanceExecution.java | 3 +-
.../queryengine/execution/memory/MemoryPool.java | 6 +-
.../queryengine/execution/operator/Operator.java | 12 +++-
.../execution/operator/process/TopKOperator.java | 10 +++
.../operator/schema/SchemaFetchScanOperator.java | 5 ++
.../source/AbstractDataSourceOperator.java | 5 ++
.../operator/source/LastCacheScanOperator.java | 5 ++
.../operator/source/ShowQueriesOperator.java | 5 ++
.../schedule/DriverTaskTimeoutSentinelThread.java | 2 +-
.../execution/operator/FillOperatorTest.java | 15 +++++
.../operator/LastQueryMergeOperatorTest.java | 20 ++++++
.../execution/operator/LinearFillOperatorTest.java | 35 +++++++++++
18 files changed, 230 insertions(+), 71 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
index 45b897d9c60..61b3a634e82 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
@@ -115,10 +115,6 @@ public abstract class Driver implements IDriver {
/** release resource this driver used. */
protected abstract void releaseResource();
- public int getDependencyDriverIndex() {
- return driverContext.getDependencyDriverIndex();
- }
-
@Override
public ListenableFuture<?> processFor(Duration duration) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
index d23f7c04450..2fe94bbeb51 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
@@ -57,8 +57,8 @@ public class LocalSinkChannel implements ISinkChannel {
DataExchangeCostMetricSet.getInstance();
public LocalSinkChannel(SharedTsBlockQueue queue, SinkListener sinkListener)
{
- this.sinkListener = Validate.notNull(sinkListener);
- this.queue = Validate.notNull(queue);
+ this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not
be null.");
+ this.queue = Validate.notNull(queue, "queue can not be null.");
this.queue.setSinkChannel(this);
blocked = queue.getCanAddTsBlock();
}
@@ -67,9 +67,10 @@ public class LocalSinkChannel implements ISinkChannel {
TFragmentInstanceId localFragmentInstanceId,
SharedTsBlockQueue queue,
SinkListener sinkListener) {
- this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
- this.sinkListener = Validate.notNull(sinkListener);
- this.queue = Validate.notNull(queue);
+ this.localFragmentInstanceId =
+ Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can
not be null.");
+ this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not
be null.");
+ this.queue = Validate.notNull(queue, "queue can not be null.");
this.queue.setSinkChannel(this);
// SinkChannel can send data after SourceHandle asks it to
blocked = queue.getCanAddTsBlock();
@@ -140,7 +141,9 @@ public class LocalSinkChannel implements ISinkChannel {
if (queue.hasNoMoreTsBlocks()) {
return;
}
- LOGGER.debug("[StartSendTsBlockOnLocal]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[StartSendTsBlockOnLocal]");
+ }
synchronized (this) {
blocked = queue.add(tsBlock);
}
@@ -155,7 +158,9 @@ public class LocalSinkChannel implements ISinkChannel {
public void setNoMoreTsBlocks() {
synchronized (queue) {
synchronized (this) {
- LOGGER.debug("[StartSetNoMoreTsBlocksOnLocal]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[StartSetNoMoreTsBlocksOnLocal]");
+ }
if (aborted || closed) {
return;
}
@@ -164,12 +169,16 @@ public class LocalSinkChannel implements ISinkChannel {
}
}
checkAndInvokeOnFinished();
- LOGGER.debug("[EndSetNoMoreTsBlocksOnLocal]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[EndSetNoMoreTsBlocksOnLocal]");
+ }
}
@Override
public void abort() {
- LOGGER.debug("[StartAbortLocalSinkChannel]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[StartAbortLocalSinkChannel]");
+ }
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
@@ -184,12 +193,16 @@ public class LocalSinkChannel implements ISinkChannel {
}
}
}
- LOGGER.debug("[EndAbortLocalSinkChannel]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[EndAbortLocalSinkChannel]");
+ }
}
@Override
public void close() {
- LOGGER.debug("[StartCloseLocalSinkChannel]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[StartCloseLocalSinkChannel]");
+ }
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
@@ -203,7 +216,9 @@ public class LocalSinkChannel implements ISinkChannel {
}
}
}
- LOGGER.debug("[EndCloseLocalSinkChannel]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[EndCloseLocalSinkChannel]");
+ }
}
public SharedTsBlockQueue getSharedTsBlockQueue() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
index 234fe2dbecb..7c31ab67e04 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
@@ -75,10 +75,13 @@ public class ShuffleSinkHandle implements ISinkHandle {
DownStreamChannelIndex downStreamChannelIndex,
ShuffleStrategyEnum shuffleStrategyEnum,
MPPDataExchangeManager.SinkListener sinkListener) {
- this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
- this.downStreamChannelList = Validate.notNull(downStreamChannelList);
- this.downStreamChannelIndex = Validate.notNull(downStreamChannelIndex);
- this.sinkListener = Validate.notNull(sinkListener);
+ this.localFragmentInstanceId =
+ Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can
not be null.");
+ this.downStreamChannelList =
+ Validate.notNull(downStreamChannelList, "downStreamChannelList can not
be null.");
+ this.downStreamChannelIndex =
+ Validate.notNull(downStreamChannelIndex, "downStreamChannelIndex can
not be null.");
+ this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not
be null.");
this.channelNum = downStreamChannelList.size();
this.shuffleStrategy = getShuffleStrategy(shuffleStrategyEnum);
this.hasSetNoMoreTsBlocks = new boolean[channelNum];
@@ -186,7 +189,9 @@ public class ShuffleSinkHandle implements ISinkHandle {
return;
}
aborted = true;
- LOGGER.debug("[StartAbortShuffleSinkHandle]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[StartAbortShuffleSinkHandle]");
+ }
boolean meetError = false;
Exception firstException = null;
for (ISink channel : downStreamChannelList) {
@@ -203,7 +208,9 @@ public class ShuffleSinkHandle implements ISinkHandle {
LOGGER.warn("Error occurred when try to abort channel.", firstException);
}
sinkListener.onAborted(this);
- LOGGER.debug("[EndAbortShuffleSinkHandle]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[EndAbortShuffleSinkHandle]");
+ }
}
// Add synchronized on this method may lead to Dead Lock
@@ -216,7 +223,9 @@ public class ShuffleSinkHandle implements ISinkHandle {
return;
}
closed = true;
- LOGGER.debug("[StartCloseShuffleSinkHandle]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[StartCloseShuffleSinkHandle]");
+ }
boolean meetError = false;
Exception firstException = null;
for (ISink channel : downStreamChannelList) {
@@ -233,7 +242,9 @@ public class ShuffleSinkHandle implements ISinkHandle {
LOGGER.warn("Error occurred when try to close channel.", firstException);
}
sinkListener.onFinish(this);
- LOGGER.debug("[EndCloseShuffleSinkHandle]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[EndCloseShuffleSinkHandle]");
+ }
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
index 0230ee9d0aa..4290f74ba85 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
@@ -135,17 +135,20 @@ public class SinkChannel implements ISinkChannel {
SinkListener sinkListener,
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager) {
- this.remoteEndpoint = Validate.notNull(remoteEndpoint);
- this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
- this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
- this.localPlanNodeId = Validate.notNull(localPlanNodeId);
- this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+ this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndPoint can
not be null.");
+ this.remoteFragmentInstanceId =
+ Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId
can not be null.");
+ this.remotePlanNodeId = Validate.notNull(remotePlanNodeId,
"remotePlanNodeId can not be null.");
+ this.localPlanNodeId = Validate.notNull(localPlanNodeId, "localPlanNodeId
can not be null.");
+ this.localFragmentInstanceId =
+ Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can
not be null.");
this.fullFragmentInstanceId =
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
- this.localMemoryManager = Validate.notNull(localMemoryManager);
- this.executorService = Validate.notNull(executorService);
- this.serde = Validate.notNull(serde);
- this.sinkListener = Validate.notNull(sinkListener);
+ this.localMemoryManager =
+ Validate.notNull(localMemoryManager, "localMemoryManager can not be
null.");
+ this.executorService = Validate.notNull(executorService, "executorService
can not be null.");
+ this.serde = Validate.notNull(serde, "serde can not be null.");
+ this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not
be null.");
this.mppDataExchangeServiceClientManager =
mppDataExchangeServiceClientManager;
this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
this.threadName =
@@ -221,7 +224,9 @@ public class SinkChannel implements ISinkChannel {
@Override
public synchronized void setNoMoreTsBlocks() {
- LOGGER.debug("[StartSetNoMoreTsBlocks]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[StartSetNoMoreTsBlocks]");
+ }
if (aborted || closed) {
return;
}
@@ -230,7 +235,9 @@ public class SinkChannel implements ISinkChannel {
@Override
public synchronized void abort() {
- LOGGER.debug("[StartAbortSinkChannel]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[StartAbortSinkChannel]");
+ }
if (aborted || closed) {
return;
}
@@ -250,12 +257,16 @@ public class SinkChannel implements ISinkChannel {
}
sinkListener.onAborted(this);
aborted = true;
- LOGGER.debug("[EndAbortSinkChannel]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[EndAbortSinkChannel]");
+ }
}
@Override
public synchronized void close() {
- LOGGER.debug("[StartCloseSinkChannel]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[StartCloseSinkChannel]");
+ }
if (closed || aborted) {
return;
}
@@ -275,7 +286,9 @@ public class SinkChannel implements ISinkChannel {
}
invokeOnFinished();
closed = true;
- LOGGER.debug("[EndCloseSinkChannel]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[EndCloseSinkChannel]");
+ }
}
private void invokeOnFinished() {
@@ -310,10 +323,12 @@ public class SinkChannel implements ISinkChannel {
public synchronized ByteBuffer getSerializedTsBlock(int sequenceId) throws
IOException {
if (aborted || closed) {
- LOGGER.debug(
- "SinkChannel still receive getting TsBlock request after being
aborted={} or closed={}",
- aborted,
- closed);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "SinkChannel still receive getting TsBlock request after being
aborted={} or closed={}",
+ aborted,
+ closed);
+ }
throw new GetTsBlockFromClosedOrAbortedChannelException("SinkChannel is
aborted or closed. ");
}
Pair<TsBlock, Long> pair = sequenceIdToTsBlock.get(sequenceId);
@@ -347,7 +362,9 @@ public class SinkChannel implements ISinkChannel {
freedBytes += entry.getValue().right;
bufferRetainedSizeInBytes -= entry.getValue().right;
iterator.remove();
- LOGGER.debug("[ACKTsBlock] {}.", entry.getKey());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[ACKTsBlock] {}.", entry.getKey());
+ }
}
// there may exist duplicate ack message in network caused by caller
retrying, if so duplicate
@@ -457,12 +474,14 @@ public class SinkChannel implements ISinkChannel {
@Override
public void run() {
try (SetThreadName sinkChannelName = new SetThreadName(threadName)) {
- LOGGER.debug(
- "[NotifyNewTsBlock] [{}, {}) to {}.{}",
- startSequenceId,
- startSequenceId + blockSizes.size(),
- remoteFragmentInstanceId,
- remotePlanNodeId);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "[NotifyNewTsBlock] [{}, {}) to {}.{}",
+ startSequenceId,
+ startSequenceId + blockSizes.size(),
+ remoteFragmentInstanceId,
+ remotePlanNodeId);
+ }
int attempt = 0;
TNewDataBlockEvent newDataBlockEvent =
new TNewDataBlockEvent(
@@ -509,7 +528,9 @@ public class SinkChannel implements ISinkChannel {
@Override
public void run() {
try (SetThreadName sinkChannelName = new SetThreadName(threadName)) {
- LOGGER.debug("[NotifyNoMoreTsBlock]");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("[NotifyNoMoreTsBlock]");
+ }
int attempt = 0;
TEndOfDataBlockEvent endOfDataBlockEvent =
new TEndOfDataBlockEvent(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
index 51ac04d5fea..de37c64e460 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
@@ -64,9 +64,10 @@ public class LocalSourceHandle implements ISourceHandle {
public LocalSourceHandle(
SharedTsBlockQueue queue, SourceHandleListener sourceHandleListener,
String threadName) {
- this.queue = Validate.notNull(queue);
+ this.queue = Validate.notNull(queue, "queue can not be null.");
this.queue.setSourceHandle(this);
- this.sourceHandleListener = Validate.notNull(sourceHandleListener);
+ this.sourceHandleListener =
+ Validate.notNull(sourceHandleListener, "sourceHandleListener can not
be null.");
this.threadName = threadName;
}
@@ -76,11 +77,13 @@ public class LocalSourceHandle implements ISourceHandle {
String localPlanNodeId,
SharedTsBlockQueue queue,
SourceHandleListener sourceHandleListener) {
- this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
- this.localPlanNodeId = Validate.notNull(localPlanNodeId);
- this.queue = Validate.notNull(queue);
+ this.localFragmentInstanceId =
+ Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can
not be null.");
+ this.localPlanNodeId = Validate.notNull(localPlanNodeId, "localPlanNodeId
can not be null.");
+ this.queue = Validate.notNull(queue, "queue can not be null.");
this.queue.setSourceHandle(this);
- this.sourceHandleListener = Validate.notNull(sourceHandleListener);
+ this.sourceHandleListener =
+ Validate.notNull(sourceHandleListener, "sourceHandleListener can not
be null.");
this.threadName = createFullIdFrom(localFragmentInstanceId,
localPlanNodeId);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
index 500ac1ad417..e7c25d14c04 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
@@ -132,17 +132,21 @@ public class SourceHandle implements ISourceHandle {
SourceHandleListener sourceHandleListener,
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager) {
- this.remoteEndpoint = Validate.notNull(remoteEndpoint);
- this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
- this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+ this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndpoint can
not be null.");
+ this.remoteFragmentInstanceId =
+ Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId
can not be null.");
+ this.localFragmentInstanceId =
+ Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can
not be null.");
this.fullFragmentInstanceId =
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
- this.localPlanNodeId = Validate.notNull(localPlanNodeId);
+ this.localPlanNodeId = Validate.notNull(localPlanNodeId, "localPlanNodeId
can not be null.");
this.indexOfUpstreamSinkHandle = indexOfUpstreamSinkHandle;
- this.localMemoryManager = Validate.notNull(localMemoryManager);
- this.executorService = Validate.notNull(executorService);
- this.serde = Validate.notNull(serde);
- this.sourceHandleListener = Validate.notNull(sourceHandleListener);
+ this.localMemoryManager =
+ Validate.notNull(localMemoryManager, "localMemoryManager can not be
null.");
+ this.executorService = Validate.notNull(executorService, "executorService
can not be null.");
+ this.serde = Validate.notNull(serde, "serde can not be null.");
+ this.sourceHandleListener =
+ Validate.notNull(sourceHandleListener, "sourceHandleListener can not
be null.");
this.bufferRetainedSizeInBytes = 0L;
this.mppDataExchangeServiceClientManager =
mppDataExchangeServiceClientManager;
this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index 840617a512f..890d1098353 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -165,7 +165,8 @@ public class FragmentInstanceExecution {
} catch (Throwable t) {
try (SetThreadName threadName = new
SetThreadName(instanceId.getFullId())) {
LOGGER.error(
- "Errors happened while trying to finish FI, resource may
already leak!", t);
+ "Errors occurred while attempting to finish the FI process,
potentially leading to resource leakage.",
+ t);
}
}
});
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
index 936a4cbcb27..3b8adc70d44 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
@@ -123,7 +123,7 @@ public class MemoryPool {
new ConcurrentLinkedQueue<>();
public MemoryPool(String id, long maxBytes, long
maxBytesPerFragmentInstance) {
- this.id = Validate.notNull(id);
+ this.id = Validate.notNull(id, "id can not be null.");
Validate.isTrue(maxBytes > 0L, "max bytes should be greater than zero:
%d", maxBytes);
this.maxBytes = maxBytes;
Validate.isTrue(
@@ -291,7 +291,7 @@ public class MemoryPool {
// add synchronized on the future to avoid that the future is concurrently
completed by
// MemoryPool.free() which may lead to memory leak.
synchronized (future) {
- Validate.notNull(future);
+ Validate.notNull(future, "The future to be cancelled can not be null.");
// If the future is not a MemoryReservationFuture, it must have been
completed.
if (future.isDone()) {
return 0L;
@@ -305,7 +305,7 @@ public class MemoryPool {
}
public void free(String queryId, String fragmentInstanceId, String
planNodeId, long bytes) {
- Validate.notNull(queryId);
+ Validate.notNull(queryId, "queryId can not be null.");
Validate.isTrue(bytes > 0L);
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java
index 6b3c71d3c51..e7a2d3d08ee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java
@@ -79,9 +79,17 @@ public interface Operator extends AutoCloseable {
@SuppressWarnings("squid:S112")
boolean hasNext() throws Exception;
- /** This method will always be called before releasing the Operator
reference. */
+ /**
+ * The close method cleans up the resources occupied by this Operator. This
method will always be
+ * called before releasing the Operator reference.
+ *
+ * <p><b>Note:</b> If this Operator has child Operators, please ensure that
child operators are
+ * also closed.
+ *
+ * @throws Exception An exception occurred during the close operation.
+ */
@Override
- default void close() throws Exception {}
+ void close() throws Exception;
/**
* Is this operator completely finished processing and no more output
TsBlock will be produced.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
index d59067463f9..9459b5c5749 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
@@ -221,6 +221,16 @@ public class TopKOperator implements ProcessOperator {
return null;
}
+ @Override
+ public void close() throws Exception {
+ for (int i = deviceIndex; i < deviceOperators.size(); i++) {
+ final Operator operator = deviceOperators.get(i);
+ if (operator != null) {
+ operator.close();
+ }
+ }
+ }
+
@Override
public long calculateMaxPeekMemory() {
// traverse each child serial,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
index 2079e371149..630a084b080 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
@@ -100,6 +100,11 @@ public class SchemaFetchScanOperator implements
SourceOperator {
return isFinished;
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public PlanNodeId getSourceId() {
return sourceId;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java
index 41fc28f4a9c..c68ed736e4d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java
@@ -38,5 +38,10 @@ public abstract class AbstractDataSourceOperator extends
AbstractSourceOperator
resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
protected abstract List<TSDataType> getResultDataTypes();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/LastCacheScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/LastCacheScanOperator.java
index 495a948ce27..e649fee1116 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/LastCacheScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/LastCacheScanOperator.java
@@ -58,6 +58,11 @@ public class LastCacheScanOperator implements SourceOperator
{
return !hasNextWithTimer();
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return tsBlock.getRetainedSizeInBytes();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowQueriesOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowQueriesOperator.java
index 265a3410b8e..66cd7a81db7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowQueriesOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowQueriesOperator.java
@@ -86,6 +86,11 @@ public class ShowQueriesOperator implements SourceOperator {
return hasConsumed;
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
index f7bbfab725d..9fb4e59dda2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
@@ -31,7 +31,7 @@ public class DriverTaskTimeoutSentinelThread extends
AbstractDriverThread {
private static final Logger LOGGER =
LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class);
- private final long SLEEP_BOUND = 5 * 1000L;
+ private static final long SLEEP_BOUND = 5 * 1000L;
public DriverTaskTimeoutSentinelThread(
String workerId,
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
index aa6df804baa..bf5bf6f2c7f 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
@@ -133,6 +133,11 @@ public class FillOperatorTest {
return index >= 3;
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;
@@ -306,6 +311,11 @@ public class FillOperatorTest {
return index >= 3;
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;
@@ -473,6 +483,11 @@ public class FillOperatorTest {
return index >= 3;
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryMergeOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryMergeOperatorTest.java
index 477f4e3c556..cb8c30041d6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryMergeOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryMergeOperatorTest.java
@@ -128,6 +128,11 @@ public class LastQueryMergeOperatorTest {
return !hasNext();
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;
@@ -192,6 +197,11 @@ public class LastQueryMergeOperatorTest {
return !hasNext();
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;
@@ -333,6 +343,11 @@ public class LastQueryMergeOperatorTest {
return !hasNext();
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;
@@ -398,6 +413,11 @@ public class LastQueryMergeOperatorTest {
return !hasNext();
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
index 07e9da9ca9a..5e59ac9dba7 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
@@ -164,6 +164,11 @@ public class LinearFillOperatorTest {
return index >= 3;
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;
@@ -377,6 +382,11 @@ public class LinearFillOperatorTest {
return index >= 3;
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;
@@ -590,6 +600,11 @@ public class LinearFillOperatorTest {
return index >= 3;
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;
@@ -803,6 +818,11 @@ public class LinearFillOperatorTest {
return index >= 3;
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;
@@ -964,6 +984,11 @@ public class LinearFillOperatorTest {
return index >= 7;
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;
@@ -1084,6 +1109,11 @@ public class LinearFillOperatorTest {
return index >= 7;
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;
@@ -1204,6 +1234,11 @@ public class LinearFillOperatorTest {
return index >= 7;
}
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+
@Override
public long calculateMaxPeekMemory() {
return 0;