Repository: spark
Updated Branches:
  refs/heads/master eabc7b8ee -> cd81fc9e8


[SPARK-12400][SHUFFLE] Avoid generating temp shuffle files for empty partitions

The problem lies in `BypassMergeSortShuffleWriter`: an empty partition still 
generates a temp shuffle file of several bytes. This change creates the file 
only when a partition is non-empty (a minimal sketch of the idea follows below).

The problem exists only here; there is no such issue in `HashShuffleWriter`.

Please help to review, thanks a lot.
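
For illustration, a minimal sketch of the deferred-creation idea (hypothetical
class names, not Spark's actual DiskBlockObjectWriter): the per-partition
writer creates its file only when the first record arrives, so a partition
that stays empty never touches the disk.

    import java.io.BufferedOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    // Hypothetical sketch: the file is created lazily on the first write,
    // so a partition that receives no records produces no file at all.
    final class LazyPartitionWriter {
      private final File file;
      private OutputStream out;  // stays null until the first record arrives

      LazyPartitionWriter(File file) { this.file = file; }

      void write(byte[] record) throws IOException {
        if (out == null) {
          // First record for this partition: only now create the file.
          out = new BufferedOutputStream(new FileOutputStream(file));
        }
        out.write(record);
      }

      void close() throws IOException {
        if (out != null) {
          out.close();
        }
      }
    }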

Author: jerryshao <ss...@hortonworks.com>

Closes #10376 from jerryshao/SPARK-12400.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd81fc9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd81fc9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd81fc9e

Branch: refs/heads/master
Commit: cd81fc9e8652c07b84f0887a24d67381b4e605fa
Parents: eabc7b8
Author: jerryshao <ss...@hortonworks.com>
Authored: Wed Jan 13 16:34:23 2016 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Wed Jan 13 16:34:23 2016 -0800

----------------------------------------------------------------------
 .../sort/BypassMergeSortShuffleWriter.java      | 25 +++++++------
 .../BypassMergeSortShuffleWriterSuite.scala     | 38 +++++++++++++++++++-
 2 files changed, 51 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cd81fc9e/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index a1a1fb0..56cdc22 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -138,7 +138,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       final File file = tempShuffleBlockIdPlusFile._2();
       final BlockId blockId = tempShuffleBlockIdPlusFile._1();
       partitionWriters[i] =
-        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
+        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
     }
     // Creating the file to write to and creating a disk writer both involve interacting with
     // the disk, and can take a long time in aggregate when we open many files, so should be
@@ -185,16 +185,19 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     boolean threwException = true;
     try {
       for (int i = 0; i < numPartitions; i++) {
-        final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file());
-        boolean copyThrewException = true;
-        try {
-          lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
-          copyThrewException = false;
-        } finally {
-          Closeables.close(in, copyThrewException);
-        }
-        if (!partitionWriters[i].fileSegment().file().delete()) {
-          logger.error("Unable to delete file for partition {}", i);
+        final File file = partitionWriters[i].fileSegment().file();
+        if (file.exists()) {
+          final FileInputStream in = new FileInputStream(file);
+          boolean copyThrewException = true;
+          try {
+            lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
+            copyThrewException = false;
+          } finally {
+            Closeables.close(in, copyThrewException);
+          }
+          if (!file.delete()) {
+            logger.error("Unable to delete file for partition {}", i);
+          }
         }
       }
       threwException = false;
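
The patched loop above copies and deletes a partition's file only if it
exists. A standalone sketch of the same skip-missing-files pattern, using a
hypothetical helper and plain java.io in place of Spark's Utils.copyStream:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    final class PartitionConcat {
      // Hypothetical helper mirroring the patched loop: concatenate each
      // partition's temp file into the combined output, skip partitions
      // whose file was never created, and record per-partition lengths.
      static long[] concat(File[] partitionFiles, OutputStream out) throws IOException {
        final long[] lengths = new long[partitionFiles.length];
        final byte[] buf = new byte[8192];
        for (int i = 0; i < partitionFiles.length; i++) {
          final File file = partitionFiles[i];
          if (!file.exists()) {
            continue;  // empty partition: no temp file was ever created
          }
          try (FileInputStream in = new FileInputStream(file)) {
            int n;
            while ((n = in.read(buf)) != -1) {
              out.write(buf, 0, n);
              lengths[i] += n;
            }
          }
          if (!file.delete()) {
            System.err.println("Unable to delete file for partition " + i);
          }
        }
        return lengths;
      }
    }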

http://git-wip-us.apache.org/repos/asf/spark/blob/cd81fc9e/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index e33408b..ef6ce04 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -105,7 +105,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
       new Answer[(TempShuffleBlockId, File)] {
         override def answer(invocation: InvocationOnMock): (TempShuffleBlockId, File) = {
           val blockId = new TempShuffleBlockId(UUID.randomUUID)
-          val file = File.createTempFile(blockId.toString, null, tempDir)
+          val file = new File(tempDir, blockId.name)
           blockIdToFileMap.put(blockId, file)
           temporaryFilesCreated.append(file)
           (blockId, file)
@@ -166,6 +166,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
     writer.stop( /* success = */ true)
     assert(temporaryFilesCreated.nonEmpty)
     assert(writer.getPartitionLengths.sum === outputFile.length())
+    assert(writer.getPartitionLengths.filter(_ == 0L).size === 4) // should be 4 zero length files
     assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
     val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
     assert(shuffleWriteMetrics.shuffleBytesWritten === outputFile.length())
@@ -174,6 +175,41 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
     assert(taskMetrics.memoryBytesSpilled === 0)
   }
 
+  test("only generate temp shuffle file for non-empty partition") {
+    // Using exception to test whether only non-empty partition creates temp shuffle file,
+    // because temp shuffle file will only be cleaned after calling stop(false) in the failure
+    // case, so we could use it to validate the temp shuffle files.
+    def records: Iterator[(Int, Int)] =
+      Iterator((1, 1), (5, 5)) ++
+        (0 until 100000).iterator.map { i =>
+          if (i == 99990) {
+            throw new SparkException("intentional failure")
+          } else {
+            (2, 2)
+          }
+        }
+
+    val writer = new BypassMergeSortShuffleWriter[Int, Int](
+      blockManager,
+      blockResolver,
+      shuffleHandle,
+      0, // MapId
+      taskContext,
+      conf
+    )
+
+    intercept[SparkException] {
+      writer.write(records)
+    }
+
+    assert(temporaryFilesCreated.nonEmpty)
+    // Only 3 temp shuffle files will be created
+    assert(temporaryFilesCreated.count(_.exists()) === 3)
+
+    writer.stop( /* success = */ false)
+    assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
+  }
+
   test("cleanup of intermediate files after errors") {
     val writer = new BypassMergeSortShuffleWriter[Int, Int](
       blockManager,
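
A note on the new test above, with a minimal sketch: because a failed write
leaves temp shuffle files on disk until stop(false) cleans them up, throwing
from the record iterator freezes the on-disk state so the test can count which
partitions actually created files. The same failure-injection idiom in Java
(hypothetical names, simplified record type):

    import java.util.Iterator;

    // Hypothetical sketch: yield records for three distinct keys, then throw
    // partway through, so the writer fails after creating temp files only
    // for the partitions that actually received records.
    final class FailingRecords implements Iterator<int[]> {
      private int i = 0;

      @Override
      public boolean hasNext() {
        return true;  // the throw below ends iteration before exhaustion
      }

      @Override
      public int[] next() {
        if (i == 99990) {
          throw new RuntimeException("intentional failure");
        }
        final int key = (i == 0) ? 1 : (i == 1) ? 5 : 2;
        i++;
        return new int[] { key, key };
      }
    }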

