Repository: spark
Updated Branches:
  refs/heads/branch-1.5 2fce5d880 -> b846a9dc3
[SPARK-10379] preserve first page in UnsafeShuffleExternalSorter

Author: Davies Liu <dav...@databricks.com>

Closes #8543 from davies/preserve_page.

(cherry picked from commit 62b4690d6b3016f41292b640ac28644ef31e299d)
Signed-off-by: Andrew Or <and...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b846a9dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b846a9dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b846a9dc

Branch: refs/heads/branch-1.5
Commit: b846a9dc3f74af235111b6313900016c6ccac1b9
Parents: 2fce5d8
Author: Davies Liu <dav...@databricks.com>
Authored: Wed Sep 2 22:15:54 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Sep 2 22:16:05 2015 -0700

----------------------------------------------------------------------
 .../spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java | 4 ++++
 .../org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala | 2 +-
 .../apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java | 5 +++--
 3 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/b846a9dc/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
index 3d1ef0c..e73ba39 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
@@ -122,6 +122,10 @@ final class UnsafeShuffleExternalSorter {
     this.maxRecordSizeBytes = pageSizeBytes - 4;
     this.writeMetrics = writeMetrics;
     initializeForWriting();
+
+    // preserve first page to ensure that we have at least one page to work with. Otherwise,
+    // other operators in the same task may starve this sorter (SPARK-9709).
+    acquireNewPageIfNecessary(pageSizeBytes);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b846a9dc/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
index 1f2213d..417ff52 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
@@ -41,7 +41,7 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M
 
   // In certain join operations, prepare can be called on the same partition multiple times.
   // In this case, we need to ensure that each call to compute gets a separate prepare argument.
-  private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
+  private[this] val preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
 
   /**
    * Prepare a partition for a single call to compute.

http://git-wip-us.apache.org/repos/asf/spark/blob/b846a9dc/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 94650be..a266b0c 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -530,8 +530,9 @@ public class UnsafeShuffleWriterSuite {
     for (int i = 0; i < numRecordsPerPage * 10; i++) {
       writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
       newPeakMemory = writer.getPeakMemoryUsedBytes();
-      if (i % numRecordsPerPage == 0) {
-        // We allocated a new page for this record, so peak memory should change
+      if (i % numRecordsPerPage == 0 && i != 0) {
+        // The first page is allocated in constructor, another page will be allocated after
+        // every numRecordsPerPage records (peak memory should change).
         assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory);
       } else {
         assertEquals(previousPeakMemory, newPeakMemory);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org