This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f34bc03129 Specify error msg when DriverTask is aborted by MemoryNotEnoughException
7f34bc03129 is described below

commit 7f34bc03129f8a9e477211e66bcbafb8bdab8c1a
Author: Liao Lanyu <[email protected]>
AuthorDate: Tue Jun 25 13:59:09 2024 +0800

    Specify error msg when DriverTask is aborted by MemoryNotEnoughException
---
 .../queryengine/execution/schedule/AbstractDriverThread.java | 12 +++++++++++-
 .../execution/schedule/DriverTaskAbortedException.java       |  3 +++
 .../dataregion/read/reader/common/PriorityMergeReader.java   |  3 +++
 3 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
index c433f0d2925..381c9b9378a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java
@@ -19,8 +19,10 @@
 
 package org.apache.iotdb.db.queryengine.execution.schedule;
 
+import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
 import org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask;
+import org.apache.iotdb.db.utils.ErrorHandlingUtils;
 import org.apache.iotdb.db.utils.SetThreadName;
 
 import org.slf4j.Logger;
@@ -77,7 +79,7 @@ public abstract class AbstractDriverThread extends Thread implements Closeable {
           try (SetThreadName driverTaskName =
               new SetThreadName(next.getDriver().getDriverTaskId().getFullId())) {
             logger.warn("[ExecuteFailed]", e);
-            next.setAbortCause(DriverTaskAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
+            next.setAbortCause(getAbortCause(e));
             scheduler.toAborted(next);
           }
         } finally {
@@ -113,4 +115,12 @@ public abstract class AbstractDriverThread extends Thread implements Closeable {
   public void close() throws IOException {
     closed = true;
   }
+
+  private String getAbortCause(final Exception e) {
+    Throwable rootCause = ErrorHandlingUtils.getRootCause(e);
+    if (rootCause instanceof MemoryNotEnoughException) {
+      return DriverTaskAbortedException.BY_MEMORY_NOT_ENOUGH;
+    }
+    return DriverTaskAbortedException.BY_INTERNAL_ERROR_SCHEDULED;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskAbortedException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskAbortedException.java
index 1f066e736d5..aac6d9fdebe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskAbortedException.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskAbortedException.java
@@ -30,6 +30,9 @@ public class DriverTaskAbortedException extends Exception {
   public static final String BY_ALREADY_BEING_CANCELLED = "already being cancelled";
   public static final String BY_INTERNAL_ERROR_SCHEDULED = "internal error scheduled";
 
+  public static final String BY_MEMORY_NOT_ENOUGH =
+      "Memory is not enough to execute the query task.";
+
   public DriverTaskAbortedException(String driverTaskName, String causeMsg) {
     super(String.format("DriverTask %s is aborted by %s", driverTaskName, causeMsg));
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
index a7c96131acf..7fc89dce439 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
@@ -182,6 +182,9 @@ public class PriorityMergeReader implements IPointReader {
       Element e = heap.poll();
       e.close();
     }
+    if (memoryReservationManager != null) {
+      memoryReservationManager.releaseMemoryCumulatively(usedMemorySize);
+    }
     usedMemorySize = 0;
   }
 

Reply via email to