Repository: spark
Updated Branches:
  refs/heads/master 57c2d0880 -> 3bc552872


[SPARK-9946] [SPARK-9589] [SQL] fix NPE and thread-safety in TaskMemoryManager

Currently, we access `page.pageNumber` after the page is freed; by then it 
could have been modified by another thread, causing an NPE.

The same TaskMemoryManager could be used by multiple threads (for example, 
Python UDF and TransportScript), so allocating and freeing memory/pages 
should be thread-safe. The underlying BitSet and HashSet are not thread-safe, 
so we should access them inside a synchronized block.

cc JoshRosen

Author: Davies Liu <dav...@databricks.com>

Closes #8177 from davies/memory_manager.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3bc55287
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3bc55287
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3bc55287

Branch: refs/heads/master
Commit: 3bc55287220b1248e935bf817d880ff176ad4d3b
Parents: 57c2d08
Author: Davies Liu <dav...@databricks.com>
Authored: Fri Aug 14 12:32:35 2015 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Fri Aug 14 12:32:35 2015 -0700

----------------------------------------------------------------------
 .../spark/unsafe/memory/TaskMemoryManager.java  | 44 +++++++++++++-------
 1 file changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3bc55287/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java 
b/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
index 358bb37..ca70d7f 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
@@ -144,14 +144,16 @@ public class TaskMemoryManager {
   public void freePage(MemoryBlock page) {
     assert (page.pageNumber != -1) :
       "Called freePage() on memory that wasn't allocated with allocatePage()";
-    executorMemoryManager.free(page);
+    assert(allocatedPages.get(page.pageNumber));
+    pageTable[page.pageNumber] = null;
     synchronized (this) {
       allocatedPages.clear(page.pageNumber);
     }
-    pageTable[page.pageNumber] = null;
     if (logger.isTraceEnabled()) {
       logger.trace("Freed page number {} ({} bytes)", page.pageNumber, 
page.size());
     }
+    // Cannot access a page once it's freed.
+    executorMemoryManager.free(page);
   }
 
   /**
@@ -166,7 +168,9 @@ public class TaskMemoryManager {
   public MemoryBlock allocate(long size) throws OutOfMemoryError {
     assert(size > 0) : "Size must be positive, but got " + size;
     final MemoryBlock memory = executorMemoryManager.allocate(size);
-    allocatedNonPageMemory.add(memory);
+    synchronized(allocatedNonPageMemory) {
+      allocatedNonPageMemory.add(memory);
+    }
     return memory;
   }
 
@@ -176,8 +180,10 @@ public class TaskMemoryManager {
   public void free(MemoryBlock memory) {
     assert (memory.pageNumber == -1) : "Should call freePage() for pages, not 
free()";
     executorMemoryManager.free(memory);
-    final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory);
-    assert (!wasAlreadyRemoved) : "Called free() on memory that was already 
freed!";
+    synchronized(allocatedNonPageMemory) {
+      final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory);
+      assert (!wasAlreadyRemoved) : "Called free() on memory that was already 
freed!";
+    }
   }
 
   /**
@@ -223,9 +229,10 @@ public class TaskMemoryManager {
     if (inHeap) {
       final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
       assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
-      final Object page = pageTable[pageNumber].getBaseObject();
+      final MemoryBlock page = pageTable[pageNumber];
       assert (page != null);
-      return page;
+      assert (page.getBaseObject() != null);
+      return page.getBaseObject();
     } else {
       return null;
     }
@@ -244,7 +251,9 @@ public class TaskMemoryManager {
       // converted the absolute address into a relative address. Here, we 
invert that operation:
       final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
       assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
-      return pageTable[pageNumber].getBaseOffset() + offsetInPage;
+      final MemoryBlock page = pageTable[pageNumber];
+      assert (page != null);
+      return page.getBaseOffset() + offsetInPage;
     }
   }
 
@@ -260,14 +269,17 @@ public class TaskMemoryManager {
         freePage(page);
       }
     }
-    final Iterator<MemoryBlock> iter = allocatedNonPageMemory.iterator();
-    while (iter.hasNext()) {
-      final MemoryBlock memory = iter.next();
-      freedBytes += memory.size();
-      // We don't call free() here because that calls Set.remove, which would 
lead to a
-      // ConcurrentModificationException here.
-      executorMemoryManager.free(memory);
-      iter.remove();
+
+    synchronized (allocatedNonPageMemory) {
+      final Iterator<MemoryBlock> iter = allocatedNonPageMemory.iterator();
+      while (iter.hasNext()) {
+        final MemoryBlock memory = iter.next();
+        freedBytes += memory.size();
+        // We don't call free() here because that calls Set.remove, which 
would lead to a
+        // ConcurrentModificationException here.
+        executorMemoryManager.free(memory);
+        iter.remove();
+      }
     }
     return freedBytes;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to