Repository: spark
Updated Branches:
  refs/heads/branch-2.0 f5c5a07bd -> e05ad8830


[SPARK-18208][SHUFFLE] Executor OOM due to a growing LongArray in 
BytesToBytesMap

## What changes were proposed in this pull request?

BytesToBytesMap currently does not release the in-memory storage (the longArray 
variable) after it spills to disk. This is typically not a problem during 
aggregation because the longArray should be much smaller than the pages, and 
because we grow the longArray at a conservative rate.

However, this can lead to an OOM when an already running task is allocated more 
than its fair share of memory, which can happen because of a scheduling delay. 
In this case the longArray can grow beyond the task's fair share of memory. 
This becomes problematic when the task spills and the longArray is not freed: 
subsequent memory allocation requests are denied by the memory manager, 
resulting in an OOM.

This PR fixes this issue by freeing the longArray when the BytesToBytesMap 
spills.
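
The accounting behind this failure can be illustrated with a small, self-contained sketch. It is not Spark's memory manager: `ToyMemoryManager`, the 1000-byte pool, and the request sizes are all hypothetical, and the fair-share rule (pool size divided by the number of registered tasks) is a deliberate simplification. It only shows why a retained array that exceeds the fair share causes later requests to be denied, and why releasing it on spill avoids that.

```java
import java.util.HashMap;
import java.util.Map;

final class FairShareSketch {

  /** Hypothetical, heavily simplified stand-in for a per-task memory manager. */
  static final class ToyMemoryManager {
    private final long poolBytes;
    private final Map<Integer, Long> usedByTask = new HashMap<>();

    ToyMemoryManager(long poolBytes) {
      this.poolBytes = poolBytes;
    }

    void registerTask(int taskId) {
      usedByTask.putIfAbsent(taskId, 0L);
    }

    /** Grants at most `bytes`, capped by the task's remaining fair share and by free memory. */
    long acquire(int taskId, long bytes) {
      registerTask(taskId);
      long fairShare = poolBytes / usedByTask.size();  // assumption: equal split across tasks
      long used = usedByTask.get(taskId);
      long totalUsed = usedByTask.values().stream().mapToLong(Long::longValue).sum();
      long free = poolBytes - totalUsed;
      long granted = Math.max(0L, Math.min(bytes, Math.min(fairShare - used, free)));
      usedByTask.put(taskId, used + granted);
      return granted;
    }

    void release(int taskId, long bytes) {
      usedByTask.merge(taskId, -bytes, Long::sum);
    }
  }

  /** Returns the bytes granted to task 1 for a 100-byte request made right after it spills. */
  static long grantAfterSpill(boolean freeArrayOnSpill) {
    ToyMemoryManager mm = new ToyMemoryManager(1000L);
    long arrayBytes = mm.acquire(1, 600L);  // task 1 runs alone, so its array may grow to 600
    long pageBytes = mm.acquire(1, 300L);   // data pages
    mm.registerTask(2);                     // task 2 starts late; the fair share drops to 500
    mm.release(1, pageBytes);               // spilling always frees the data pages
    if (freeArrayOnSpill) {
      mm.release(1, arrayBytes);            // the behavior this change introduces
    }
    return mm.acquire(1, 100L);             // next allocation request after the spill
  }

  public static void main(String[] args) {
    System.out.println("array retained: granted " + grantAfterSpill(false) + " bytes");
    System.out.println("array freed:    granted " + grantAfterSpill(true) + " bytes");
  }
}
```

In this toy model the request is denied (0 bytes granted) when the array is retained, because task 1 still holds 600 bytes against a fair share of 500, and it is granted in full once the array is released along with the pages.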

## How was this patch tested?

Existing tests, plus testing on real-world workloads.

Author: Jie Xiong <jiexi...@fb.com>
Author: jiexiong <jiexi...@gmail.com>

Closes #15722 from jiexiong/jie_oom_fix.

(cherry picked from commit c496d03b5289f7c604661a12af86f6accddcf125)
Signed-off-by: Herman van Hovell <hvanhov...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e05ad883
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e05ad883
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e05ad883

Branch: refs/heads/branch-2.0
Commit: e05ad8830e204acaf7cee4daef0ed44db9a158f3
Parents: f5c5a07
Author: Jie Xiong <jiexi...@fb.com>
Authored: Wed Dec 7 04:33:30 2016 -0800
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Wed Dec 7 04:34:04 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java     | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e05ad883/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index dc04025..947db7f 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -169,6 +169,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
 
   private long peakMemoryUsedBytes = 0L;
 
+  private final int initialCapacity;
+
   private final BlockManager blockManager;
   private final SerializerManager serializerManager;
   private volatile MapIterator destructiveIterator = null;
@@ -201,6 +203,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
      throw new IllegalArgumentException("Page size " + pageSizeBytes + " cannot exceed " +
         TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES);
     }
+    this.initialCapacity = initialCapacity;
     allocate(initialCapacity);
   }
 
@@ -897,12 +900,12 @@ public final class BytesToBytesMap extends MemoryConsumer {
   public void reset() {
     numKeys = 0;
     numValues = 0;
-    longArray.zeroOut();
-
+    freeArray(longArray);
     while (dataPages.size() > 0) {
       MemoryBlock dataPage = dataPages.removeLast();
       freePage(dataPage);
     }
+    allocate(initialCapacity);
     currentPage = null;
     pageCursor = 0;
   }

