This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cc841b7639 Fix close method of TopOperator and make close method of 
Operator an interface
0cc841b7639 is described below

commit 0cc841b7639be022c54f6e335744a7528f8c025e
Author: Liao Lanyu <[email protected]>
AuthorDate: Thu Nov 30 18:29:59 2023 +0800

    Fix close method of TopOperator and make close method of Operator an 
interface
---
 .../db/queryengine/execution/driver/Driver.java    |  4 --
 .../execution/exchange/sink/LocalSinkChannel.java  | 39 ++++++++----
 .../execution/exchange/sink/ShuffleSinkHandle.java | 27 +++++---
 .../execution/exchange/sink/SinkChannel.java       | 73 ++++++++++++++--------
 .../exchange/source/LocalSourceHandle.java         | 15 +++--
 .../execution/exchange/source/SourceHandle.java    | 20 +++---
 .../fragment/FragmentInstanceExecution.java        |  3 +-
 .../queryengine/execution/memory/MemoryPool.java   |  6 +-
 .../queryengine/execution/operator/Operator.java   | 12 +++-
 .../execution/operator/process/TopKOperator.java   | 10 +++
 .../operator/schema/SchemaFetchScanOperator.java   |  5 ++
 .../source/AbstractDataSourceOperator.java         |  5 ++
 .../operator/source/LastCacheScanOperator.java     |  5 ++
 .../operator/source/ShowQueriesOperator.java       |  5 ++
 .../schedule/DriverTaskTimeoutSentinelThread.java  |  2 +-
 .../execution/operator/FillOperatorTest.java       | 15 +++++
 .../operator/LastQueryMergeOperatorTest.java       | 20 ++++++
 .../execution/operator/LinearFillOperatorTest.java | 35 +++++++++++
 18 files changed, 230 insertions(+), 71 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
index 45b897d9c60..61b3a634e82 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
@@ -115,10 +115,6 @@ public abstract class Driver implements IDriver {
   /** release resource this driver used. */
   protected abstract void releaseResource();
 
-  public int getDependencyDriverIndex() {
-    return driverContext.getDependencyDriverIndex();
-  }
-
   @Override
   public ListenableFuture<?> processFor(Duration duration) {
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
index d23f7c04450..2fe94bbeb51 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/LocalSinkChannel.java
@@ -57,8 +57,8 @@ public class LocalSinkChannel implements ISinkChannel {
       DataExchangeCostMetricSet.getInstance();
 
   public LocalSinkChannel(SharedTsBlockQueue queue, SinkListener sinkListener) 
{
-    this.sinkListener = Validate.notNull(sinkListener);
-    this.queue = Validate.notNull(queue);
+    this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not 
be null.");
+    this.queue = Validate.notNull(queue, "queue can not be null.");
     this.queue.setSinkChannel(this);
     blocked = queue.getCanAddTsBlock();
   }
@@ -67,9 +67,10 @@ public class LocalSinkChannel implements ISinkChannel {
       TFragmentInstanceId localFragmentInstanceId,
       SharedTsBlockQueue queue,
       SinkListener sinkListener) {
-    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
-    this.sinkListener = Validate.notNull(sinkListener);
-    this.queue = Validate.notNull(queue);
+    this.localFragmentInstanceId =
+        Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can 
not be null.");
+    this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not 
be null.");
+    this.queue = Validate.notNull(queue, "queue can not be null.");
     this.queue.setSinkChannel(this);
     // SinkChannel can send data after SourceHandle asks it to
     blocked = queue.getCanAddTsBlock();
@@ -140,7 +141,9 @@ public class LocalSinkChannel implements ISinkChannel {
         if (queue.hasNoMoreTsBlocks()) {
           return;
         }
-        LOGGER.debug("[StartSendTsBlockOnLocal]");
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("[StartSendTsBlockOnLocal]");
+        }
         synchronized (this) {
           blocked = queue.add(tsBlock);
         }
@@ -155,7 +158,9 @@ public class LocalSinkChannel implements ISinkChannel {
   public void setNoMoreTsBlocks() {
     synchronized (queue) {
       synchronized (this) {
-        LOGGER.debug("[StartSetNoMoreTsBlocksOnLocal]");
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("[StartSetNoMoreTsBlocksOnLocal]");
+        }
         if (aborted || closed) {
           return;
         }
@@ -164,12 +169,16 @@ public class LocalSinkChannel implements ISinkChannel {
       }
     }
     checkAndInvokeOnFinished();
-    LOGGER.debug("[EndSetNoMoreTsBlocksOnLocal]");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("[EndSetNoMoreTsBlocksOnLocal]");
+    }
   }
 
   @Override
   public void abort() {
-    LOGGER.debug("[StartAbortLocalSinkChannel]");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("[StartAbortLocalSinkChannel]");
+    }
     synchronized (queue) {
       synchronized (this) {
         if (aborted || closed) {
@@ -184,12 +193,16 @@ public class LocalSinkChannel implements ISinkChannel {
         }
       }
     }
-    LOGGER.debug("[EndAbortLocalSinkChannel]");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("[EndAbortLocalSinkChannel]");
+    }
   }
 
   @Override
   public void close() {
-    LOGGER.debug("[StartCloseLocalSinkChannel]");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("[StartCloseLocalSinkChannel]");
+    }
     synchronized (queue) {
       synchronized (this) {
         if (aborted || closed) {
@@ -203,7 +216,9 @@ public class LocalSinkChannel implements ISinkChannel {
         }
       }
     }
-    LOGGER.debug("[EndCloseLocalSinkChannel]");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("[EndCloseLocalSinkChannel]");
+    }
   }
 
   public SharedTsBlockQueue getSharedTsBlockQueue() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
index 234fe2dbecb..7c31ab67e04 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/ShuffleSinkHandle.java
@@ -75,10 +75,13 @@ public class ShuffleSinkHandle implements ISinkHandle {
       DownStreamChannelIndex downStreamChannelIndex,
       ShuffleStrategyEnum shuffleStrategyEnum,
       MPPDataExchangeManager.SinkListener sinkListener) {
-    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
-    this.downStreamChannelList = Validate.notNull(downStreamChannelList);
-    this.downStreamChannelIndex = Validate.notNull(downStreamChannelIndex);
-    this.sinkListener = Validate.notNull(sinkListener);
+    this.localFragmentInstanceId =
+        Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can 
not be null.");
+    this.downStreamChannelList =
+        Validate.notNull(downStreamChannelList, "downStreamChannelList can not 
be null.");
+    this.downStreamChannelIndex =
+        Validate.notNull(downStreamChannelIndex, "downStreamChannelIndex can 
not be null.");
+    this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not 
be null.");
     this.channelNum = downStreamChannelList.size();
     this.shuffleStrategy = getShuffleStrategy(shuffleStrategyEnum);
     this.hasSetNoMoreTsBlocks = new boolean[channelNum];
@@ -186,7 +189,9 @@ public class ShuffleSinkHandle implements ISinkHandle {
       return;
     }
     aborted = true;
-    LOGGER.debug("[StartAbortShuffleSinkHandle]");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("[StartAbortShuffleSinkHandle]");
+    }
     boolean meetError = false;
     Exception firstException = null;
     for (ISink channel : downStreamChannelList) {
@@ -203,7 +208,9 @@ public class ShuffleSinkHandle implements ISinkHandle {
       LOGGER.warn("Error occurred when try to abort channel.", firstException);
     }
     sinkListener.onAborted(this);
-    LOGGER.debug("[EndAbortShuffleSinkHandle]");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("[EndAbortShuffleSinkHandle]");
+    }
   }
 
   // Add synchronized on this method may lead to Dead Lock
@@ -216,7 +223,9 @@ public class ShuffleSinkHandle implements ISinkHandle {
       return;
     }
     closed = true;
-    LOGGER.debug("[StartCloseShuffleSinkHandle]");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("[StartCloseShuffleSinkHandle]");
+    }
     boolean meetError = false;
     Exception firstException = null;
     for (ISink channel : downStreamChannelList) {
@@ -233,7 +242,9 @@ public class ShuffleSinkHandle implements ISinkHandle {
       LOGGER.warn("Error occurred when try to close channel.", firstException);
     }
     sinkListener.onFinish(this);
-    LOGGER.debug("[EndCloseShuffleSinkHandle]");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("[EndCloseShuffleSinkHandle]");
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
index 0230ee9d0aa..4290f74ba85 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
@@ -135,17 +135,20 @@ public class SinkChannel implements ISinkChannel {
       SinkListener sinkListener,
       IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
           mppDataExchangeServiceClientManager) {
-    this.remoteEndpoint = Validate.notNull(remoteEndpoint);
-    this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
-    this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
-    this.localPlanNodeId = Validate.notNull(localPlanNodeId);
-    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndPoint can 
not be null.");
+    this.remoteFragmentInstanceId =
+        Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId 
can not be null.");
+    this.remotePlanNodeId = Validate.notNull(remotePlanNodeId, 
"remotePlanNodeId can not be null.");
+    this.localPlanNodeId = Validate.notNull(localPlanNodeId, "localPlanNodeId 
can not be null.");
+    this.localFragmentInstanceId =
+        Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can 
not be null.");
     this.fullFragmentInstanceId =
         
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
-    this.localMemoryManager = Validate.notNull(localMemoryManager);
-    this.executorService = Validate.notNull(executorService);
-    this.serde = Validate.notNull(serde);
-    this.sinkListener = Validate.notNull(sinkListener);
+    this.localMemoryManager =
+        Validate.notNull(localMemoryManager, "localMemoryManager can not be 
null.");
+    this.executorService = Validate.notNull(executorService, "executorService 
can not be null.");
+    this.serde = Validate.notNull(serde, "serde can not be null.");
+    this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not 
be null.");
     this.mppDataExchangeServiceClientManager = 
mppDataExchangeServiceClientManager;
     this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
     this.threadName =
@@ -221,7 +224,9 @@ public class SinkChannel implements ISinkChannel {
 
   @Override
   public synchronized void setNoMoreTsBlocks() {
-    LOGGER.debug("[StartSetNoMoreTsBlocks]");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("[StartSetNoMoreTsBlocks]");
+    }
     if (aborted || closed) {
       return;
     }
@@ -230,7 +235,9 @@ public class SinkChannel implements ISinkChannel {
 
   @Override
   public synchronized void abort() {
-    LOGGER.debug("[StartAbortSinkChannel]");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("[StartAbortSinkChannel]");
+    }
     if (aborted || closed) {
       return;
     }
@@ -250,12 +257,16 @@ public class SinkChannel implements ISinkChannel {
     }
     sinkListener.onAborted(this);
     aborted = true;
-    LOGGER.debug("[EndAbortSinkChannel]");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("[EndAbortSinkChannel]");
+    }
   }
 
   @Override
   public synchronized void close() {
-    LOGGER.debug("[StartCloseSinkChannel]");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("[StartCloseSinkChannel]");
+    }
     if (closed || aborted) {
       return;
     }
@@ -275,7 +286,9 @@ public class SinkChannel implements ISinkChannel {
     }
     invokeOnFinished();
     closed = true;
-    LOGGER.debug("[EndCloseSinkChannel]");
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("[EndCloseSinkChannel]");
+    }
   }
 
   private void invokeOnFinished() {
@@ -310,10 +323,12 @@ public class SinkChannel implements ISinkChannel {
 
   public synchronized ByteBuffer getSerializedTsBlock(int sequenceId) throws 
IOException {
     if (aborted || closed) {
-      LOGGER.debug(
-          "SinkChannel still receive getting TsBlock request after being 
aborted={} or closed={}",
-          aborted,
-          closed);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "SinkChannel still receive getting TsBlock request after being 
aborted={} or closed={}",
+            aborted,
+            closed);
+      }
       throw new GetTsBlockFromClosedOrAbortedChannelException("SinkChannel is 
aborted or closed. ");
     }
     Pair<TsBlock, Long> pair = sequenceIdToTsBlock.get(sequenceId);
@@ -347,7 +362,9 @@ public class SinkChannel implements ISinkChannel {
         freedBytes += entry.getValue().right;
         bufferRetainedSizeInBytes -= entry.getValue().right;
         iterator.remove();
-        LOGGER.debug("[ACKTsBlock] {}.", entry.getKey());
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("[ACKTsBlock] {}.", entry.getKey());
+        }
       }
 
       // there may exist duplicate ack message in network caused by caller 
retrying, if so duplicate
@@ -457,12 +474,14 @@ public class SinkChannel implements ISinkChannel {
     @Override
     public void run() {
       try (SetThreadName sinkChannelName = new SetThreadName(threadName)) {
-        LOGGER.debug(
-            "[NotifyNewTsBlock] [{}, {}) to {}.{}",
-            startSequenceId,
-            startSequenceId + blockSizes.size(),
-            remoteFragmentInstanceId,
-            remotePlanNodeId);
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug(
+              "[NotifyNewTsBlock] [{}, {}) to {}.{}",
+              startSequenceId,
+              startSequenceId + blockSizes.size(),
+              remoteFragmentInstanceId,
+              remotePlanNodeId);
+        }
         int attempt = 0;
         TNewDataBlockEvent newDataBlockEvent =
             new TNewDataBlockEvent(
@@ -509,7 +528,9 @@ public class SinkChannel implements ISinkChannel {
     @Override
     public void run() {
       try (SetThreadName sinkChannelName = new SetThreadName(threadName)) {
-        LOGGER.debug("[NotifyNoMoreTsBlock]");
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("[NotifyNoMoreTsBlock]");
+        }
         int attempt = 0;
         TEndOfDataBlockEvent endOfDataBlockEvent =
             new TEndOfDataBlockEvent(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
index 51ac04d5fea..de37c64e460 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
@@ -64,9 +64,10 @@ public class LocalSourceHandle implements ISourceHandle {
 
   public LocalSourceHandle(
       SharedTsBlockQueue queue, SourceHandleListener sourceHandleListener, 
String threadName) {
-    this.queue = Validate.notNull(queue);
+    this.queue = Validate.notNull(queue, "queue can not be null.");
     this.queue.setSourceHandle(this);
-    this.sourceHandleListener = Validate.notNull(sourceHandleListener);
+    this.sourceHandleListener =
+        Validate.notNull(sourceHandleListener, "sourceHandleListener can not 
be null.");
     this.threadName = threadName;
   }
 
@@ -76,11 +77,13 @@ public class LocalSourceHandle implements ISourceHandle {
       String localPlanNodeId,
       SharedTsBlockQueue queue,
       SourceHandleListener sourceHandleListener) {
-    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
-    this.localPlanNodeId = Validate.notNull(localPlanNodeId);
-    this.queue = Validate.notNull(queue);
+    this.localFragmentInstanceId =
+        Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can 
not be null.");
+    this.localPlanNodeId = Validate.notNull(localPlanNodeId, "localPlanNodeId 
can not be null.");
+    this.queue = Validate.notNull(queue, "queue can not be null.");
     this.queue.setSourceHandle(this);
-    this.sourceHandleListener = Validate.notNull(sourceHandleListener);
+    this.sourceHandleListener =
+        Validate.notNull(sourceHandleListener, "sourceHandleListener can not 
be null.");
     this.threadName = createFullIdFrom(localFragmentInstanceId, 
localPlanNodeId);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
index 500ac1ad417..e7c25d14c04 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java
@@ -132,17 +132,21 @@ public class SourceHandle implements ISourceHandle {
       SourceHandleListener sourceHandleListener,
       IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
           mppDataExchangeServiceClientManager) {
-    this.remoteEndpoint = Validate.notNull(remoteEndpoint);
-    this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
-    this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndpoint can 
not be null.");
+    this.remoteFragmentInstanceId =
+        Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId 
can not be null.");
+    this.localFragmentInstanceId =
+        Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can 
not be null.");
     this.fullFragmentInstanceId =
         
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
-    this.localPlanNodeId = Validate.notNull(localPlanNodeId);
+    this.localPlanNodeId = Validate.notNull(localPlanNodeId, "localPlanNodeId 
can not be null.");
     this.indexOfUpstreamSinkHandle = indexOfUpstreamSinkHandle;
-    this.localMemoryManager = Validate.notNull(localMemoryManager);
-    this.executorService = Validate.notNull(executorService);
-    this.serde = Validate.notNull(serde);
-    this.sourceHandleListener = Validate.notNull(sourceHandleListener);
+    this.localMemoryManager =
+        Validate.notNull(localMemoryManager, "localMemoryManager can not be 
null.");
+    this.executorService = Validate.notNull(executorService, "executorService 
can not be null.");
+    this.serde = Validate.notNull(serde, "serde can not be null.");
+    this.sourceHandleListener =
+        Validate.notNull(sourceHandleListener, "sourceHandleListener can not 
be null.");
     this.bufferRetainedSizeInBytes = 0L;
     this.mppDataExchangeServiceClientManager = 
mppDataExchangeServiceClientManager;
     this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index 840617a512f..890d1098353 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -165,7 +165,8 @@ public class FragmentInstanceExecution {
           } catch (Throwable t) {
             try (SetThreadName threadName = new 
SetThreadName(instanceId.getFullId())) {
               LOGGER.error(
-                  "Errors happened while trying to finish FI, resource may 
already leak!", t);
+                  "Errors occurred while attempting to finish the FI process, 
potentially leading to resource leakage.",
+                  t);
             }
           }
         });
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
index 936a4cbcb27..3b8adc70d44 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
@@ -123,7 +123,7 @@ public class MemoryPool {
       new ConcurrentLinkedQueue<>();
 
   public MemoryPool(String id, long maxBytes, long 
maxBytesPerFragmentInstance) {
-    this.id = Validate.notNull(id);
+    this.id = Validate.notNull(id, "id can not be null.");
     Validate.isTrue(maxBytes > 0L, "max bytes should be greater than zero: 
%d", maxBytes);
     this.maxBytes = maxBytes;
     Validate.isTrue(
@@ -291,7 +291,7 @@ public class MemoryPool {
     // add synchronized on the future to avoid that the future is concurrently 
completed by
     // MemoryPool.free() which may lead to memory leak.
     synchronized (future) {
-      Validate.notNull(future);
+      Validate.notNull(future, "The future to be cancelled can not be null.");
       // If the future is not a MemoryReservationFuture, it must have been 
completed.
       if (future.isDone()) {
         return 0L;
@@ -305,7 +305,7 @@ public class MemoryPool {
   }
 
   public void free(String queryId, String fragmentInstanceId, String 
planNodeId, long bytes) {
-    Validate.notNull(queryId);
+    Validate.notNull(queryId, "queryId can not be null.");
     Validate.isTrue(bytes > 0L);
 
     try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java
index 6b3c71d3c51..e7a2d3d08ee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java
@@ -79,9 +79,17 @@ public interface Operator extends AutoCloseable {
   @SuppressWarnings("squid:S112")
   boolean hasNext() throws Exception;
 
-  /** This method will always be called before releasing the Operator 
reference. */
+  /**
+   * The close method cleans up the resources occupied by this Operator. This 
method will always be
+   * called before releasing the Operator reference.
+   *
+   * <p><b>Note:</b> If this Operator has child Operators, please ensure that 
child operators are
+   * also closed.
+   *
+   * @throws Exception An exception occurred during the close operation.
+   */
   @Override
-  default void close() throws Exception {}
+  void close() throws Exception;
 
   /**
    * Is this operator completely finished processing and no more output 
TsBlock will be produced.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
index d59067463f9..9459b5c5749 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
@@ -221,6 +221,16 @@ public class TopKOperator implements ProcessOperator {
     return null;
   }
 
+  @Override
+  public void close() throws Exception {
+    for (int i = deviceIndex; i < deviceOperators.size(); i++) {
+      final Operator operator = deviceOperators.get(i);
+      if (operator != null) {
+        operator.close();
+      }
+    }
+  }
+
   @Override
   public long calculateMaxPeekMemory() {
     // traverse each child serial,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
index 2079e371149..630a084b080 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/schema/SchemaFetchScanOperator.java
@@ -100,6 +100,11 @@ public class SchemaFetchScanOperator implements 
SourceOperator {
     return isFinished;
   }
 
+  @Override
+  public void close() throws Exception {
+    // do nothing
+  }
+
   @Override
   public PlanNodeId getSourceId() {
     return sourceId;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java
index 41fc28f4a9c..c68ed736e4d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java
@@ -38,5 +38,10 @@ public abstract class AbstractDataSourceOperator extends 
AbstractSourceOperator
     resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
   }
 
+  @Override
+  public void close() throws Exception {
+    // do nothing
+  }
+
   protected abstract List<TSDataType> getResultDataTypes();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/LastCacheScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/LastCacheScanOperator.java
index 495a948ce27..e649fee1116 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/LastCacheScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/LastCacheScanOperator.java
@@ -58,6 +58,11 @@ public class LastCacheScanOperator implements SourceOperator 
{
     return !hasNextWithTimer();
   }
 
+  @Override
+  public void close() throws Exception {
+    // do nothing
+  }
+
   @Override
   public long calculateMaxPeekMemory() {
     return tsBlock.getRetainedSizeInBytes();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowQueriesOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowQueriesOperator.java
index 265a3410b8e..66cd7a81db7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowQueriesOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ShowQueriesOperator.java
@@ -86,6 +86,11 @@ public class ShowQueriesOperator implements SourceOperator {
     return hasConsumed;
   }
 
+  @Override
+  public void close() throws Exception {
+    // do nothing
+  }
+
   @Override
   public long calculateMaxPeekMemory() {
     return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
index f7bbfab725d..9fb4e59dda2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java
@@ -31,7 +31,7 @@ public class DriverTaskTimeoutSentinelThread extends 
AbstractDriverThread {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(DriverTaskTimeoutSentinelThread.class);
 
-  private final long SLEEP_BOUND = 5 * 1000L;
+  private static final long SLEEP_BOUND = 5 * 1000L;
 
   public DriverTaskTimeoutSentinelThread(
       String workerId,
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
index aa6df804baa..bf5bf6f2c7f 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
@@ -133,6 +133,11 @@ public class FillOperatorTest {
                   return index >= 3;
                 }
 
+                @Override
+                public void close() throws Exception {
+                  // do nothing
+                }
+
                 @Override
                 public long calculateMaxPeekMemory() {
                   return 0;
@@ -306,6 +311,11 @@ public class FillOperatorTest {
                   return index >= 3;
                 }
 
+                @Override
+                public void close() throws Exception {
+                  // do nothing
+                }
+
                 @Override
                 public long calculateMaxPeekMemory() {
                   return 0;
@@ -473,6 +483,11 @@ public class FillOperatorTest {
                   return index >= 3;
                 }
 
+                @Override
+                public void close() throws Exception {
+                  // do nothing
+                }
+
                 @Override
                 public long calculateMaxPeekMemory() {
                   return 0;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryMergeOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryMergeOperatorTest.java
index 477f4e3c556..cb8c30041d6 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryMergeOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryMergeOperatorTest.java
@@ -128,6 +128,11 @@ public class LastQueryMergeOperatorTest {
             return !hasNext();
           }
 
+          @Override
+          public void close() throws Exception {
+            // do nothing
+          }
+
           @Override
           public long calculateMaxPeekMemory() {
             return 0;
@@ -192,6 +197,11 @@ public class LastQueryMergeOperatorTest {
             return !hasNext();
           }
 
+          @Override
+          public void close() throws Exception {
+            // do nothing
+          }
+
           @Override
           public long calculateMaxPeekMemory() {
             return 0;
@@ -333,6 +343,11 @@ public class LastQueryMergeOperatorTest {
             return !hasNext();
           }
 
+          @Override
+          public void close() throws Exception {
+            // do nothing
+          }
+
           @Override
           public long calculateMaxPeekMemory() {
             return 0;
@@ -398,6 +413,11 @@ public class LastQueryMergeOperatorTest {
             return !hasNext();
           }
 
+          @Override
+          public void close() throws Exception {
+            // do nothing
+          }
+
           @Override
           public long calculateMaxPeekMemory() {
             return 0;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
index 07e9da9ca9a..5e59ac9dba7 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
@@ -164,6 +164,11 @@ public class LinearFillOperatorTest {
                   return index >= 3;
                 }
 
+                @Override
+                public void close() throws Exception {
+                  // do nothing
+                }
+
                 @Override
                 public long calculateMaxPeekMemory() {
                   return 0;
@@ -377,6 +382,11 @@ public class LinearFillOperatorTest {
                   return index >= 3;
                 }
 
+                @Override
+                public void close() throws Exception {
+                  // do nothing
+                }
+
                 @Override
                 public long calculateMaxPeekMemory() {
                   return 0;
@@ -590,6 +600,11 @@ public class LinearFillOperatorTest {
                   return index >= 3;
                 }
 
+                @Override
+                public void close() throws Exception {
+                  // do nothing
+                }
+
                 @Override
                 public long calculateMaxPeekMemory() {
                   return 0;
@@ -803,6 +818,11 @@ public class LinearFillOperatorTest {
                   return index >= 3;
                 }
 
+                @Override
+                public void close() throws Exception {
+                  // do nothing
+                }
+
                 @Override
                 public long calculateMaxPeekMemory() {
                   return 0;
@@ -964,6 +984,11 @@ public class LinearFillOperatorTest {
                   return index >= 7;
                 }
 
+                @Override
+                public void close() throws Exception {
+                  // do nothing
+                }
+
                 @Override
                 public long calculateMaxPeekMemory() {
                   return 0;
@@ -1084,6 +1109,11 @@ public class LinearFillOperatorTest {
                   return index >= 7;
                 }
 
+                @Override
+                public void close() throws Exception {
+                  // do nothing
+                }
+
                 @Override
                 public long calculateMaxPeekMemory() {
                   return 0;
@@ -1204,6 +1234,11 @@ public class LinearFillOperatorTest {
                   return index >= 7;
                 }
 
+                @Override
+                public void close() throws Exception {
+                  // do nothing
+                }
+
                 @Override
                 public long calculateMaxPeekMemory() {
                   return 0;


Reply via email to