This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f687dcb7f76 Fix potential memory leak in MemoryPool
f687dcb7f76 is described below

commit f687dcb7f76067176ee549b3b2407c64436ce51a
Author: Liao Lanyu <[email protected]>
AuthorDate: Wed Jul 26 20:05:48 2023 +0800

    Fix potential memory leak in MemoryPool
---
 .../execution/exchange/MPPDataExchangeManager.java |  2 +-
 .../execution/exchange/sink/SinkChannel.java       |  2 +-
 .../queryengine/execution/memory/MemoryPool.java   | 91 ++++++----------------
 3 files changed, 25 insertions(+), 70 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
index b92e45eeb2e..886befdf4e9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java
@@ -551,7 +551,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
   public void deRegisterFragmentInstanceFromMemoryPool(String queryId, String fragmentInstanceId) {
     localMemoryManager
         .getQueryPool()
-        .deRegisterFragmentInstanceToQueryMemoryMap(queryId, fragmentInstanceId);
+        .deRegisterFragmentInstanceFromQueryMemoryMap(queryId, fragmentInstanceId);
   }
 
   public LocalMemoryManager getLocalMemoryManager() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
index f1a31847237..cc5b7299466 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
@@ -258,7 +258,7 @@ public class SinkChannel implements ISinkChannel {
     }
     sequenceIdToTsBlock.clear();
     if (blocked != null) {
-      bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
+      bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
     }
     if (bufferRetainedSizeInBytes > 0) {
       localMemoryManager
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
index 6fd1cbd5b1f..211b614e99b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java
@@ -32,9 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -58,8 +56,6 @@ public class MemoryPool {
      */
     private final long maxBytesCanReserve;
 
-    private boolean isMarked = false;
-
     private MemoryReservationFuture(
         String queryId,
         String fragmentInstanceId,
@@ -80,14 +76,6 @@ public class MemoryPool {
       return queryId;
     }
 
-    public boolean isMarked() {
-      return isMarked;
-    }
-
-    public void setMarked(boolean marked) {
-      isMarked = marked;
-    }
-
     public String getFragmentInstanceId() {
       return fragmentInstanceId;
     }
@@ -188,20 +176,17 @@ public class MemoryPool {
    *
    * @throws MemoryLeakException throw {@link MemoryLeakException}
    */
-  public void deRegisterFragmentInstanceToQueryMemoryMap(
+  public void deRegisterFragmentInstanceFromQueryMemoryMap(
       String queryId, String fragmentInstanceId) {
     Map<String, Map<String, Long>> queryRelatedMemory = queryMemoryReservations.get(queryId);
     if (queryRelatedMemory != null) {
       Map<String, Long> fragmentRelatedMemory = queryRelatedMemory.get(fragmentInstanceId);
+      boolean hasPotentialMemoryLeak = false;
       // fragmentRelatedMemory could be null if the FI has not reserved any memory(For example,
       // next() of root operator returns no data)
       if (fragmentRelatedMemory != null) {
-        for (Long memoryReserved : fragmentRelatedMemory.values()) {
-          if (memoryReserved != 0) {
-            throw new MemoryLeakException(
-                "PlanNode related memory is not zero when deregister FI from query memory pool.");
-          }
-        }
+        hasPotentialMemoryLeak =
+            fragmentRelatedMemory.values().stream().anyMatch(value -> value != 0);
       }
       synchronized (queryMemoryReservations) {
         queryRelatedMemory.remove(fragmentInstanceId);
@@ -209,6 +194,10 @@ public class MemoryPool {
           queryMemoryReservations.remove(queryId);
         }
       }
+      if (hasPotentialMemoryLeak) {
+        throw new MemoryLeakException(
+            "PlanNode related memory is not zero when trying to deregister FI from query memory pool. QueryId is : {}, FragmentInstanceId is : {}, PlanNode related memory is : {}.");
+      }
     }
   }
 
@@ -288,36 +277,21 @@ public class MemoryPool {
    * @return If the future has not complete, return the number of bytes being reserved. Otherwise,
    *     return 0.
    */
+  @SuppressWarnings("squid:S2445")
   public synchronized long tryCancel(ListenableFuture<Void> future) {
-    Validate.notNull(future);
-    // If the future is not a MemoryReservationFuture, it must have been completed.
-    if (future.isDone()) {
-      return 0L;
-    }
-    Validate.isTrue(
-        future instanceof MemoryReservationFuture,
-        "invalid future type " + future.getClass().getSimpleName());
-    future.cancel(true);
-    return ((MemoryReservationFuture<Void>) future).getBytesToReserve();
-  }
-
-  /**
-   * Complete the specified memory reservation. If the reservation has finished, do nothing.
-   *
-   * @param future The future returned from {@link #reserve(String, String, String, long, long)}
-   * @return If the future has not complete, return the number of bytes being reserved. Otherwise,
-   *     return 0.
-   */
-  public synchronized long tryComplete(ListenableFuture<Void> future) {
-    Validate.notNull(future);
-    // If the future is not a MemoryReservationFuture, it must have been completed.
-    if (future.isDone()) {
-      return 0L;
+    // add synchronized on the future to avoid that the future is concurrently completed by
+    // MemoryPool.free() which may lead to memory leak.
+    synchronized (future) {
+      Validate.notNull(future);
+      // If the future is not a MemoryReservationFuture, it must have been completed.
+      if (future.isDone()) {
+        return 0L;
+      }
+      Validate.isTrue(
+          future instanceof MemoryReservationFuture,
+          "invalid future type " + future.getClass().getSimpleName());
+      future.cancel(true);
     }
-    Validate.isTrue(
-        future instanceof MemoryReservationFuture,
-        "invalid future type " + future.getClass().getSimpleName());
-    ((MemoryReservationFuture<Void>) future).set(null);
     return ((MemoryReservationFuture<Void>) future).getBytesToReserve();
   }
 
@@ -343,7 +317,6 @@ public class MemoryPool {
 
     remainingBytes.addAndGet(bytes);
 
-    List<MemoryReservationFuture<Void>> futureList = new ArrayList<>();
     if (memoryReservationFutures.isEmpty()) {
       return;
     }
@@ -351,7 +324,7 @@ public class MemoryPool {
     while (iterator.hasNext()) {
       MemoryReservationFuture<Void> future = iterator.next();
       synchronized (future) {
-        if (future.isCancelled() || future.isDone() || future.isMarked()) {
+        if (future.isCancelled() || future.isDone()) {
           continue;
         }
         long bytesToReserve = future.getBytesToReserve();
@@ -361,31 +334,13 @@ public class MemoryPool {
         long maxBytesCanReserve = future.getMaxBytesCanReserve();
         if (tryReserve(
             curQueryId, curFragmentInstanceId, curPlanNodeId, bytesToReserve, maxBytesCanReserve)) {
-          futureList.add(future);
-          future.setMarked(true);
+          future.set(null);
           iterator.remove();
         } else {
           rollbackReserve(curQueryId, curFragmentInstanceId, curPlanNodeId, bytesToReserve);
         }
       }
     }
-
-    // why we need to put this outside MemoryPool's lock?
-    // If we put this block inside the MemoryPool's lock, we will get deadlock case like the
-    // following:
-    // Assuming that thread-A: LocalSourceHandle.receive() -> A-SharedTsBlockQueue.remove() ->
-    // MemoryPool.free() (hold future's lock) -> future.set(null) -> try to get
-    // B-SharedTsBlockQueue's lock
-    // thread-B: LocalSourceHandle.receive() -> B-SharedTsBlockQueue.remove() (hold
-    // B-SharedTsBlockQueue's lock) -> try to get future's lock
-    for (MemoryReservationFuture<Void> future : futureList) {
-      try {
-        future.set(null);
-      } catch (Throwable t) {
-        // ignore it, because we still need to notify other future
-        LOGGER.warn("error happened while trying to free memory: ", t);
-      }
-    }
   }
 
   public long getQueryMemoryReservedBytes(String queryId) {

Reply via email to