Repository: spark
Updated Branches:
  refs/heads/branch-1.5 2fce5d880 -> b846a9dc3


[SPARK-10379] preserve first page in UnsafeShuffleExternalSorter

Author: Davies Liu <dav...@databricks.com>

Closes #8543 from davies/preserve_page.

(cherry picked from commit 62b4690d6b3016f41292b640ac28644ef31e299d)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b846a9dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b846a9dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b846a9dc

Branch: refs/heads/branch-1.5
Commit: b846a9dc3f74af235111b6313900016c6ccac1b9
Parents: 2fce5d8
Author: Davies Liu <dav...@databricks.com>
Authored: Wed Sep 2 22:15:54 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Sep 2 22:16:05 2015 -0700

----------------------------------------------------------------------
 .../spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java       | 4 ++++
 .../org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala  | 2 +-
 .../apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java   | 5 +++--
 3 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b846a9dc/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
index 3d1ef0c..e73ba39 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
@@ -122,6 +122,10 @@ final class UnsafeShuffleExternalSorter {
     this.maxRecordSizeBytes = pageSizeBytes - 4;
     this.writeMetrics = writeMetrics;
     initializeForWriting();
+
+    // preserve first page to ensure that we have at least one page to work with. Otherwise,
+    // other operators in the same task may starve this sorter (SPARK-9709).
+    acquireNewPageIfNecessary(pageSizeBytes);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b846a9dc/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
index 1f2213d..417ff52 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
@@ -41,7 +41,7 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M
 
   // In certain join operations, prepare can be called on the same partition multiple times.
   // In this case, we need to ensure that each call to compute gets a separate prepare argument.
-  private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
+  private[this] val preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
 
   /**
    * Prepare a partition for a single call to compute.

http://git-wip-us.apache.org/repos/asf/spark/blob/b846a9dc/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 94650be..a266b0c 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -530,8 +530,9 @@ public class UnsafeShuffleWriterSuite {
       for (int i = 0; i < numRecordsPerPage * 10; i++) {
         writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
         newPeakMemory = writer.getPeakMemoryUsedBytes();
-        if (i % numRecordsPerPage == 0) {
-          // We allocated a new page for this record, so peak memory should change
+        if (i % numRecordsPerPage == 0 && i != 0) {
+          // The first page is allocated in constructor, another page will be allocated after
+          // every numRecordsPerPage records (peak memory should change).
           assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory);
         } else {
           assertEquals(previousPeakMemory, newPeakMemory);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to