Repository: spark
Updated Branches:
  refs/heads/branch-1.6 3f40af574 -> 015569341


[SPARK-11805] free the array in UnsafeExternalSorter during spilling

After spill() is called on the SortedIterator, the array inside the in-memory
sorter (UnsafeInMemorySorter) is no longer needed, so it should be freed during
spilling. This helps when joining multiple tables with limited memory.

Author: Davies Liu <dav...@databricks.com>

Closes #9793 from davies/free_array.

(cherry picked from commit 58d9b260556a89a3d0832d583acafba1df7c6751)
Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01556934
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01556934
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01556934

Branch: refs/heads/branch-1.6
Commit: 015569341dc8031b62ea94a2d194bbd35567f727
Parents: 3f40af5
Author: Davies Liu <dav...@databricks.com>
Authored: Tue Nov 24 14:33:28 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Tue Nov 24 14:33:40 2015 -0800

----------------------------------------------------------------------
 .../unsafe/sort/UnsafeExternalSorter.java       | 10 ++++---
 .../unsafe/sort/UnsafeInMemorySorter.java       | 31 ++++++++------------
 2 files changed, 19 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01556934/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 9a7b2ad..2e40312 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -468,6 +468,12 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
           }
           allocatedPages.clear();
         }
+
+        // in-memory sorter will not be used after spilling
+        assert(inMemSorter != null);
+        released += inMemSorter.getMemoryUsage();
+        inMemSorter.free();
+        inMemSorter = null;
         return released;
       }
     }
@@ -489,10 +495,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
           }
           upstream = nextUpstream;
           nextUpstream = null;
-
-          assert(inMemSorter != null);
-          inMemSorter.free();
-          inMemSorter = null;
         }
         numRecords--;
         upstream.loadNext();

http://git-wip-us.apache.org/repos/asf/spark/blob/01556934/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index a218ad4..dce1f15 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -108,6 +108,7 @@ public final class UnsafeInMemorySorter {
    */
   public void free() {
     consumer.freeArray(array);
+    array = null;
   }
 
   public void reset() {
@@ -160,28 +161,22 @@ public final class UnsafeInMemorySorter {
     pos++;
   }
 
-  public static final class SortedIterator extends UnsafeSorterIterator {
+  public final class SortedIterator extends UnsafeSorterIterator {
 
-    private final TaskMemoryManager memoryManager;
-    private final int sortBufferInsertPosition;
-    private final LongArray sortBuffer;
-    private int position = 0;
+    private final int numRecords;
+    private int position;
     private Object baseObject;
     private long baseOffset;
     private long keyPrefix;
     private int recordLength;
 
-    private SortedIterator(
-        TaskMemoryManager memoryManager,
-        int sortBufferInsertPosition,
-        LongArray sortBuffer) {
-      this.memoryManager = memoryManager;
-      this.sortBufferInsertPosition = sortBufferInsertPosition;
-      this.sortBuffer = sortBuffer;
+    private SortedIterator(int numRecords) {
+      this.numRecords = numRecords;
+      this.position = 0;
     }
 
     public SortedIterator clone () {
-      SortedIterator iter = new SortedIterator(memoryManager, 
sortBufferInsertPosition, sortBuffer);
+      SortedIterator iter = new SortedIterator(numRecords);
       iter.position = position;
       iter.baseObject = baseObject;
       iter.baseOffset = baseOffset;
@@ -192,21 +187,21 @@ public final class UnsafeInMemorySorter {
 
     @Override
     public boolean hasNext() {
-      return position < sortBufferInsertPosition;
+      return position / 2 < numRecords;
     }
 
     public int numRecordsLeft() {
-      return (sortBufferInsertPosition - position) / 2;
+      return numRecords - position / 2;
     }
 
     @Override
     public void loadNext() {
       // This pointer points to a 4-byte record length, followed by the 
record's bytes
-      final long recordPointer = sortBuffer.get(position);
+      final long recordPointer = array.get(position);
       baseObject = memoryManager.getPage(recordPointer);
       baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4;  // Skip 
over record length
       recordLength = Platform.getInt(baseObject, baseOffset - 4);
-      keyPrefix = sortBuffer.get(position + 1);
+      keyPrefix = array.get(position + 1);
       position += 2;
     }
 
@@ -229,6 +224,6 @@ public final class UnsafeInMemorySorter {
    */
   public SortedIterator getSortedIterator() {
     sorter.sort(array, 0, pos / 2, sortComparator);
-    return new SortedIterator(memoryManager, pos, array);
+    return new SortedIterator(pos / 2);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to