This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

commit e35d287dd9fd5b7bd7e06025f535772b482b443c
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Tue Dec 11 12:22:58 2018 -0800

    [SPARK-26265][CORE][BRANCH-2.4] Fix deadlock in BytesToBytesMap.MapIterator 
when locking both BytesToBytesMap.MapIterator and TaskMemoryManager
    
    ## What changes were proposed in this pull request?
    
    In `BytesToBytesMap.MapIterator.advanceToNextPage`, we first lock this 
`MapIterator` and then `TaskMemoryManager` when freeing a memory page by 
calling `freePage`. At the same time, it is possible that another memory 
consumer first locks `TaskMemoryManager` and then this `MapIterator` when it 
acquires memory and causes spilling on this `MapIterator`.
    
    As a result, the thread advancing the iterator holds the lock on the 
`MapIterator` object and waits for the lock on `TaskMemoryManager`, while the 
other consumer holds the lock on `TaskMemoryManager` and waits for the lock on 
the `MapIterator` object.
    
    To avoid this deadlock, this patch keeps a reference to the page to free 
and frees it after releasing the lock on the `MapIterator`.
    
    This backports the fix to branch-2.4.
    
    ## How was this patch tested?
    
    Added a test and manually ran it 100 times to make sure there is no 
deadlock.
    
    Closes #23289 from viirya/SPARK-26265-2.4.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java   | 12 +++++-
 .../apache/spark/memory/TestMemoryConsumer.java    |  4 +-
 .../unsafe/map/AbstractBytesToBytesMapSuite.java   | 47 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 9b6cbab..6465033 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -267,11 +267,18 @@ public final class BytesToBytesMap extends MemoryConsumer 
{
     }
 
     private void advanceToNextPage() {
+      // SPARK-26265: We will first lock this `MapIterator` and then 
`TaskMemoryManager` when going
+      // to free a memory page by calling `freePage`. At the same time, it is 
possibly that another
+      // memory consumer first locks `TaskMemoryManager` and then this 
`MapIterator` when it
+      // acquires memory and causes spilling on this `MapIterator`. To avoid 
deadlock here, we keep
+      // reference to the page to free and free it after releasing the lock of 
`MapIterator`.
+      MemoryBlock pageToFree = null;
+
       synchronized (this) {
         int nextIdx = dataPages.indexOf(currentPage) + 1;
         if (destructive && currentPage != null) {
           dataPages.remove(currentPage);
-          freePage(currentPage);
+          pageToFree = currentPage;
           nextIdx --;
         }
         if (dataPages.size() > nextIdx) {
@@ -295,6 +302,9 @@ public final class BytesToBytesMap extends MemoryConsumer {
           }
         }
       }
+      if (pageToFree != null) {
+        freePage(pageToFree);
+      }
     }
 
     @Override
diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java 
b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
index 0bbaea6..6aa577d 100644
--- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
+++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
@@ -38,12 +38,12 @@ public class TestMemoryConsumer extends MemoryConsumer {
     return used;
   }
 
-  void use(long size) {
+  public void use(long size) {
     long got = taskMemoryManager.acquireExecutionMemory(size, this);
     used += got;
   }
 
-  void free(long size) {
+  public void free(long size) {
     used -= size;
     taskMemoryManager.releaseExecutionMemory(size, this);
   }
diff --git 
a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
 
b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 53a233f..278d28f 100644
--- 
a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ 
b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -33,6 +33,8 @@ import org.mockito.MockitoAnnotations;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.memory.TestMemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.memory.TestMemoryManager;
 import org.apache.spark.network.util.JavaUtils;
@@ -667,4 +669,49 @@ public abstract class AbstractBytesToBytesMapSuite {
     }
   }
 
+  @Test
+  public void avoidDeadlock() throws InterruptedException {
+    memoryManager.limit(PAGE_SIZE_BYTES);
+    MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP: 
MemoryMode.ON_HEAP;
+    TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, mode);
+    BytesToBytesMap map =
+      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 
1, 0.5, 1024, false);
+
+    Thread thread = new Thread(() -> {
+      int i = 0;
+      long used = 0;
+      while (i < 10) {
+        c1.use(10000000);
+        used += 10000000;
+        i++;
+      }
+      c1.free(used);
+    });
+
+    try {
+      int i;
+      for (i = 0; i < 1024; i++) {
+        final long[] arr = new long[]{i};
+        final BytesToBytesMap.Location loc = map.lookup(arr, 
Platform.LONG_ARRAY_OFFSET, 8);
+        loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, 
Platform.LONG_ARRAY_OFFSET, 8);
+      }
+
+      // Starts to require memory at another memory consumer.
+      thread.start();
+
+      BytesToBytesMap.MapIterator iter = map.destructiveIterator();
+      for (i = 0; i < 1024; i++) {
+        iter.next();
+      }
+      assertFalse(iter.hasNext());
+    } finally {
+      map.free();
+      thread.join();
+      for (File spillFile : spillFilesCreated) {
+        assertFalse("Spill file " + spillFile.getPath() + " was not cleaned 
up",
+          spillFile.exists());
+      }
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to