Repository: spark Updated Branches: refs/heads/master eabc7b8ee -> cd81fc9e8
[SPARK-12400][SHUFFLE] Avoid generating temp shuffle files for empty partitions This problem lies in `BypassMergeSortShuffleWriter`: an empty partition still generates a temp shuffle file containing several bytes. This change creates the temp file only when the partition is not empty. The problem exists only here; `HashShuffleWriter` has no such issue. Please help review. Thanks a lot. Author: jerryshao <ss...@hortonworks.com> Closes #10376 from jerryshao/SPARK-12400. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd81fc9e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd81fc9e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd81fc9e Branch: refs/heads/master Commit: cd81fc9e8652c07b84f0887a24d67381b4e605fa Parents: eabc7b8 Author: jerryshao <ss...@hortonworks.com> Authored: Wed Jan 13 16:34:23 2016 -0800 Committer: Josh Rosen <joshro...@databricks.com> Committed: Wed Jan 13 16:34:23 2016 -0800 ---------------------------------------------------------------------- .../sort/BypassMergeSortShuffleWriter.java | 25 +++++++------ .../BypassMergeSortShuffleWriterSuite.scala | 38 +++++++++++++++++++- 2 files changed, 51 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/cd81fc9e/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index a1a1fb0..56cdc22 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -138,7 +138,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends 
ShuffleWriter<K, V> { final File file = tempShuffleBlockIdPlusFile._2(); final BlockId blockId = tempShuffleBlockIdPlusFile._1(); partitionWriters[i] = - blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open(); + blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); } // Creating the file to write to and creating a disk writer both involve interacting with // the disk, and can take a long time in aggregate when we open many files, so should be @@ -185,16 +185,19 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> { boolean threwException = true; try { for (int i = 0; i < numPartitions; i++) { - final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file()); - boolean copyThrewException = true; - try { - lengths[i] = Utils.copyStream(in, out, false, transferToEnabled); - copyThrewException = false; - } finally { - Closeables.close(in, copyThrewException); - } - if (!partitionWriters[i].fileSegment().file().delete()) { - logger.error("Unable to delete file for partition {}", i); + final File file = partitionWriters[i].fileSegment().file(); + if (file.exists()) { + final FileInputStream in = new FileInputStream(file); + boolean copyThrewException = true; + try { + lengths[i] = Utils.copyStream(in, out, false, transferToEnabled); + copyThrewException = false; + } finally { + Closeables.close(in, copyThrewException); + } + if (!file.delete()) { + logger.error("Unable to delete file for partition {}", i); + } } } threwException = false; http://git-wip-us.apache.org/repos/asf/spark/blob/cd81fc9e/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index 
e33408b..ef6ce04 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -105,7 +105,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte new Answer[(TempShuffleBlockId, File)] { override def answer(invocation: InvocationOnMock): (TempShuffleBlockId, File) = { val blockId = new TempShuffleBlockId(UUID.randomUUID) - val file = File.createTempFile(blockId.toString, null, tempDir) + val file = new File(tempDir, blockId.name) blockIdToFileMap.put(blockId, file) temporaryFilesCreated.append(file) (blockId, file) @@ -166,6 +166,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte writer.stop( /* success = */ true) assert(temporaryFilesCreated.nonEmpty) assert(writer.getPartitionLengths.sum === outputFile.length()) + assert(writer.getPartitionLengths.filter(_ == 0L).size === 4) // should be 4 zero length files assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get assert(shuffleWriteMetrics.shuffleBytesWritten === outputFile.length()) @@ -174,6 +175,41 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte assert(taskMetrics.memoryBytesSpilled === 0) } + test("only generate temp shuffle file for non-empty partition") { + // Using exception to test whether only non-empty partition creates temp shuffle file, + // because temp shuffle file will only be cleaned after calling stop(false) in the failure + // case, so we could use it to validate the temp shuffle files. 
+ def records: Iterator[(Int, Int)] = + Iterator((1, 1), (5, 5)) ++ + (0 until 100000).iterator.map { i => + if (i == 99990) { + throw new SparkException("intentional failure") + } else { + (2, 2) + } + } + + val writer = new BypassMergeSortShuffleWriter[Int, Int]( + blockManager, + blockResolver, + shuffleHandle, + 0, // MapId + taskContext, + conf + ) + + intercept[SparkException] { + writer.write(records) + } + + assert(temporaryFilesCreated.nonEmpty) + // Only 3 temp shuffle files will be created + assert(temporaryFilesCreated.count(_.exists()) === 3) + + writer.stop( /* success = */ false) + assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted + } + test("cleanup of intermediate files after errors") { val writer = new BypassMergeSortShuffleWriter[Int, Int]( blockManager, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org