Repository: spark Updated Branches: refs/heads/master 57c2d0880 -> 3bc552872
[SPARK-9946] [SPARK-9589] [SQL] fix NPE and thread-safety in TaskMemoryManager Currently, we access the `page.pageNumer` after it's freed, that could be modified by other thread, cause NPE. The same TaskMemoryManager could be used by multiple threads (for example, Python UDF and TransportScript), so it should be thread safe to allocate/free memory/page. The underlying Bitset and HashSet are not thread safe, we should put them inside a synchronized block. cc JoshRosen Author: Davies Liu <dav...@databricks.com> Closes #8177 from davies/memory_manager. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3bc55287 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3bc55287 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3bc55287 Branch: refs/heads/master Commit: 3bc55287220b1248e935bf817d880ff176ad4d3b Parents: 57c2d08 Author: Davies Liu <dav...@databricks.com> Authored: Fri Aug 14 12:32:35 2015 -0700 Committer: Davies Liu <davies....@gmail.com> Committed: Fri Aug 14 12:32:35 2015 -0700 ---------------------------------------------------------------------- .../spark/unsafe/memory/TaskMemoryManager.java | 44 +++++++++++++------- 1 file changed, 28 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3bc55287/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java ---------------------------------------------------------------------- diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java index 358bb37..ca70d7f 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java @@ -144,14 +144,16 @@ public class TaskMemoryManager { public void freePage(MemoryBlock page) { assert (page.pageNumber != -1) : "Called freePage() on memory that wasn't allocated with allocatePage()"; - executorMemoryManager.free(page); + assert(allocatedPages.get(page.pageNumber)); + pageTable[page.pageNumber] = null; synchronized (this) { allocatedPages.clear(page.pageNumber); } - pageTable[page.pageNumber] = null; if (logger.isTraceEnabled()) { logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size()); } + // Cannot access a page once it's freed. + executorMemoryManager.free(page); } /** @@ -166,7 +168,9 @@ public class TaskMemoryManager { public MemoryBlock allocate(long size) throws OutOfMemoryError { assert(size > 0) : "Size must be positive, but got " + size; final MemoryBlock memory = executorMemoryManager.allocate(size); - allocatedNonPageMemory.add(memory); + synchronized(allocatedNonPageMemory) { + allocatedNonPageMemory.add(memory); + } return memory; } @@ -176,8 +180,10 @@ public class TaskMemoryManager { public void free(MemoryBlock memory) { assert (memory.pageNumber == -1) : "Should call freePage() for pages, not free()"; executorMemoryManager.free(memory); - final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory); - assert (!wasAlreadyRemoved) : "Called free() on memory that was already freed!"; + synchronized(allocatedNonPageMemory) { + final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory); + assert (!wasAlreadyRemoved) : "Called free() on memory that was already freed!"; + } } /** @@ -223,9 +229,10 @@ public class TaskMemoryManager { if (inHeap) { final int pageNumber = decodePageNumber(pagePlusOffsetAddress); assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE); - final Object page = pageTable[pageNumber].getBaseObject(); + final MemoryBlock page = pageTable[pageNumber]; assert (page != null); - return page; + assert (page.getBaseObject() != null); + return page.getBaseObject(); } else { return null; } @@ -244,7 +251,9 @@ public class TaskMemoryManager { // converted the absolute address into a relative address. Here, we invert that operation: final int pageNumber = decodePageNumber(pagePlusOffsetAddress); assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE); - return pageTable[pageNumber].getBaseOffset() + offsetInPage; + final MemoryBlock page = pageTable[pageNumber]; + assert (page != null); + return page.getBaseOffset() + offsetInPage; } } @@ -260,14 +269,17 @@ public class TaskMemoryManager { freePage(page); } } - final Iterator<MemoryBlock> iter = allocatedNonPageMemory.iterator(); - while (iter.hasNext()) { - final MemoryBlock memory = iter.next(); - freedBytes += memory.size(); - // We don't call free() here because that calls Set.remove, which would lead to a - // ConcurrentModificationException here. - executorMemoryManager.free(memory); - iter.remove(); + + synchronized (allocatedNonPageMemory) { + final Iterator<MemoryBlock> iter = allocatedNonPageMemory.iterator(); + while (iter.hasNext()) { + final MemoryBlock memory = iter.next(); + freedBytes += memory.size(); + // We don't call free() here because that calls Set.remove, which would lead to a + // ConcurrentModificationException here. + executorMemoryManager.free(memory); + iter.remove(); + } } return freedBytes; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org