Repository: spark
Updated Branches:
  refs/heads/branch-1.6 582ed8a6e -> 413d0600e


[SPARK-14363] Fix executor OOM due to memory leak in the Sorter

Fix a memory leak in the sorters. When UnsafeExternalSorter (and likewise
ShuffleExternalSorter) spills its data to disk, it does not free the
underlying pointer array. As a result, we see many executor OOMs as well as
memory underutilization.
This is a regression partially introduced in PR
https://github.com/apache/spark/pull/9241

Tested by running a job; observed a speedup of around 30% after this change.

Author: Sital Kedia <ske...@fb.com>

Closes #12285 from sitalkedia/executor_oom.

(cherry picked from commit d187e7dea9540d26b7800de4eb79863ef5f574bf)
Signed-off-by: Davies Liu <davies....@gmail.com>

Conflicts:
        core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
        core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/413d0600
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/413d0600
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/413d0600

Branch: refs/heads/branch-1.6
Commit: 413d0600ed61990e657c97d50d1431a8cd1ab0ed
Parents: 582ed8a
Author: Sital Kedia <ske...@fb.com>
Authored: Tue Apr 12 16:10:07 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Tue Apr 12 16:12:17 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/shuffle/sort/ShuffleExternalSorter.java  | 6 ++++--
 .../org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java  | 7 +++++++
 .../util/collection/unsafe/sort/UnsafeExternalSorter.java     | 7 +++++--
 .../util/collection/unsafe/sort/UnsafeInMemorySorter.java     | 7 +++++++
 4 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/413d0600/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 52032cf..22348c0 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -215,8 +215,6 @@ final class ShuffleExternalSorter extends MemoryConsumer {
       }
     }
 
-    inMemSorter.reset();
-
     if (!isLastFile) {  // i.e. this is a spill file
       // The current semantics of `shuffleRecordsWritten` seem to be that it's 
updated when records
       // are written to disk, not when they enter the shuffle sorting code. 
DiskBlockObjectWriter
@@ -255,6 +253,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
 
     writeSortedFile(false);
     final long spillSize = freeMemory();
+    inMemSorter.reset();
+    // Reset the in-memory sorter's pointer array only after freeing up the 
memory pages holding the
+    // records. Otherwise, if the task is over allocated memory, then without 
freeing the memory pages,
+    // we might not be able to get memory for the pointer array.
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
     return spillSize;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/413d0600/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index d74602c..1afa719 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -49,9 +49,12 @@ final class ShuffleInMemorySorter {
    */
   private int pos = 0;
 
+  private int initialSize;
+
   public ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize) {
     this.consumer = consumer;
     assert (initialSize > 0);
+    this.initialSize = initialSize;
     this.array = consumer.allocateArray(initialSize);
     this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE);
   }
@@ -68,6 +71,10 @@ final class ShuffleInMemorySorter {
   }
 
   public void reset() {
+    if (consumer != null) {
+      consumer.freeArray(array);
+      this.array = consumer.allocateArray(initialSize);
+    }
     pos = 0;
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/413d0600/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 1b84e98..de38c2d 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -192,14 +192,17 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
         spillWriter.write(baseObject, baseOffset, recordLength, 
sortedRecords.getKeyPrefix());
       }
       spillWriter.close();
-
-      inMemSorter.reset();
     }
 
     final long spillSize = freeMemory();
     // Note that this is more-or-less going to be a multiple of the page size, 
so wasted space in
     // pages will currently be counted as memory spilled even though that 
space isn't actually
     // written to disk. This also counts the space needed to store the 
sorter's pointer array.
+    inMemSorter.reset();
+    // Reset the in-memory sorter's pointer array only after freeing up the 
memory pages holding the
+    // records. Otherwise, if the task is over allocated memory, then without 
freeing the memory pages,
+    // we might not be able to get memory for the pointer array.
+
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
 
     return spillSize;

http://git-wip-us.apache.org/repos/asf/spark/blob/413d0600/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 308db22..05390c8 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -80,6 +80,8 @@ public final class UnsafeInMemorySorter {
    */
   private int pos = 0;
 
+  private long initialSize;
+
   public UnsafeInMemorySorter(
     final MemoryConsumer consumer,
     final TaskMemoryManager memoryManager,
@@ -98,6 +100,7 @@ public final class UnsafeInMemorySorter {
       LongArray array) {
     this.consumer = consumer;
     this.memoryManager = memoryManager;
+    this.initialSize = array.size();
     this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
     this.sortComparator = new SortComparator(recordComparator, 
prefixComparator, memoryManager);
     this.array = array;
@@ -114,6 +117,10 @@ public final class UnsafeInMemorySorter {
   }
 
   public void reset() {
+    if (consumer != null) {
+      consumer.freeArray(array);
+      this.array = consumer.allocateArray(initialSize);
+    }
     pos = 0;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to