Repository: spark
Updated Branches:
  refs/heads/master 084e4e126 -> 83f6f54d1


[SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array 
(round 2)

This patch reverts most of the changes in a previous fix #8827.

The real cause of the issue is that in `TungstenAggregate`'s prepare method we 
only reserve 1 page, but later when we switch to sort-based aggregation we try 
to acquire 1 page AND a pointer array. The longer-term fix should be to also 
reserve the pointer array, but for now ***we will simply not track the pointer 
array***. (Note that elsewhere we already don't track the pointer array, e.g. 
[here](https://github.com/apache/spark/blob/a18208047f06a4244703c17023bb20cbe1f59d73/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java#L88))

Note: This patch reuses the unit test added in #8827 so it doesn't show up in 
the diff.

Author: Andrew Or <and...@databricks.com>

Closes #8888 from andrewor14/dont-track-pointer-array.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83f6f54d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83f6f54d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83f6f54d

Branch: refs/heads/master
Commit: 83f6f54d12a418f5158ee7ee985b54eef8cc1cf0
Parents: 084e4e1
Author: Andrew Or <and...@databricks.com>
Authored: Wed Sep 23 19:34:31 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Sep 23 19:34:31 2015 -0700

----------------------------------------------------------------------
 .../unsafe/sort/UnsafeExternalSorter.java       | 51 +++++---------------
 .../sql/execution/UnsafeKVExternalSorter.java   |  9 +---
 .../UnsafeFixedWidthAggregationMapSuite.scala   |  8 +--
 3 files changed, 16 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/83f6f54d/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 14b6aaf..0a311d2 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -159,16 +159,15 @@ public final class UnsafeExternalSorter {
   /**
    * Allocates new sort data structures. Called when creating the sorter and 
after each spill.
    */
-  public void initializeForWriting() throws IOException {
+  private void initializeForWriting() throws IOException {
+    // Note: Do not track memory for the pointer array for now because of 
SPARK-10474.
+    // In more detail, in TungstenAggregate we only reserve a page, but when 
we fall back to
+    // sort-based aggregation we try to acquire a page AND a pointer array, 
which inevitably
+    // fails if all other memory is already occupied. It should be safe to not 
track the array
+    // because its memory footprint is frequently much smaller than that of a 
page. This is a
+    // temporary hack that we should address in 1.6.0.
+    // TODO: track the pointer array memory!
     this.writeMetrics = new ShuffleWriteMetrics();
-    final long pointerArrayMemory =
-      UnsafeInMemorySorter.getMemoryRequirementsForPointerArray(initialSize);
-    final long memoryAcquired = 
shuffleMemoryManager.tryToAcquire(pointerArrayMemory);
-    if (memoryAcquired != pointerArrayMemory) {
-      shuffleMemoryManager.release(memoryAcquired);
-      throw new IOException("Could not acquire " + pointerArrayMemory + " 
bytes of memory");
-    }
-
     this.inMemSorter =
       new UnsafeInMemorySorter(taskMemoryManager, recordComparator, 
prefixComparator, initialSize);
     this.isInMemSorterExternal = false;
@@ -187,14 +186,6 @@ public final class UnsafeExternalSorter {
    * Sort and spill the current records in response to memory pressure.
    */
   public void spill() throws IOException {
-    spill(true);
-  }
-
-  /**
-   * Sort and spill the current records in response to memory pressure.
-   * @param shouldInitializeForWriting whether to allocate memory for writing 
after the spill
-   */
-  public void spill(boolean shouldInitializeForWriting) throws IOException {
     assert(inMemSorter != null);
     logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
       Thread.currentThread().getId(),
@@ -225,9 +216,7 @@ public final class UnsafeExternalSorter {
     // written to disk. This also counts the space needed to store the 
sorter's pointer array.
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
 
-    if (shouldInitializeForWriting) {
-      initializeForWriting();
-    }
+    initializeForWriting();
   }
 
   /**
@@ -275,14 +264,7 @@ public final class UnsafeExternalSorter {
       shuffleMemoryManager.release(block.size());
       memoryFreed += block.size();
     }
-    if (inMemSorter != null) {
-      if (!isInMemSorterExternal) {
-        long sorterMemoryUsage = inMemSorter.getMemoryUsage();
-        memoryFreed += sorterMemoryUsage;
-        shuffleMemoryManager.release(sorterMemoryUsage);
-      }
-      inMemSorter = null;
-    }
+    // TODO: track in-memory sorter memory usage (SPARK-10474)
     allocatedPages.clear();
     currentPage = null;
     currentPagePosition = -1;
@@ -320,17 +302,8 @@ public final class UnsafeExternalSorter {
   private void growPointerArrayIfNecessary() throws IOException {
     assert(inMemSorter != null);
     if (!inMemSorter.hasSpaceForAnotherRecord()) {
-      logger.debug("Attempting to expand sort pointer array");
-      final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
-      final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2;
-      final long memoryAcquired = 
shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray);
-      if (memoryAcquired < memoryToGrowPointerArray) {
-        shuffleMemoryManager.release(memoryAcquired);
-        spill();
-      } else {
-        inMemSorter.expandPointerArray();
-        shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
-      }
+      // TODO: track the pointer array memory! (SPARK-10474)
+      inMemSorter.expandPointerArray();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/83f6f54d/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index b81f67a..9df5780 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -85,7 +85,7 @@ public final class UnsafeKVExternalSorter {
       // We will use the number of elements in the map as the initialSize of 
the
       // UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0 
as the initialSize,
       // we will use 1 as its initial size if the map is empty.
-      // TODO: track pointer array memory used by this in-memory sorter!
+      // TODO: track pointer array memory used by this in-memory sorter! 
(SPARK-10474)
       final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
         taskMemoryManager, recordComparator, prefixComparator, Math.max(1, 
map.numElements()));
 
@@ -124,13 +124,8 @@ public final class UnsafeKVExternalSorter {
         pageSizeBytes,
         inMemSorter);
 
-      // Note: This spill doesn't actually release any memory, so if we try to 
allocate a new
-      // pointer array immediately after the spill then we may fail to acquire 
sufficient space
-      // for it (SPARK-10474). For this reason, we must initialize for writing 
explicitly *after*
-      // we have actually freed memory from our map.
-      sorter.spill(false /* initialize for writing */);
+      sorter.spill();
       map.free();
-      sorter.initializeForWriting();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/83f6f54d/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index ada4d42..1739798 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -200,9 +200,7 @@ class UnsafeFixedWidthAggregationMapSuite
     val sorter = map.destructAndCreateExternalSorter()
 
     withClue(s"destructAndCreateExternalSorter should release memory used by 
the map") {
-      // 4096 * 16 is the initial size allocated for the pointer/prefix array 
in the in-mem sorter.
-      assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() ===
-        initialMemoryConsumption + 4096 * 16)
+      assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === 
initialMemoryConsumption)
     }
 
     // Add more keys to the sorter and make sure the results come out sorted.
@@ -305,9 +303,7 @@ class UnsafeFixedWidthAggregationMapSuite
     val sorter = map.destructAndCreateExternalSorter()
 
     withClue(s"destructAndCreateExternalSorter should release memory used by 
the map") {
-      // 4096 * 16 is the initial size allocated for the pointer/prefix array 
in the in-mem sorter.
-      assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() ===
-        initialMemoryConsumption + 4096 * 16)
+      assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === 
initialMemoryConsumption)
     }
 
     // Add more keys to the sorter and make sure the results come out sorted.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to