Repository: spark
Updated Branches:
  refs/heads/branch-1.6 a447cd888 -> 8646b84fb


[SPARK-17547] Ensure temp shuffle data file is cleaned up after error

SPARK-8029 (#9610) modified the shuffle writers to first stage their data in a 
temporary file in the same directory as the final destination file and then to 
atomically rename this temporary file at the end of the write job. However, 
this change made it possible to leak the temporary output file: if an 
exception occurs during the write, the shuffle writers' existing error-cleanup 
code does not delete the temp file.
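
In sketch form, the staging pattern from SPARK-8029 looks roughly like the 
following (a minimal illustration with a hypothetical writeOutput() helper, 
not the actual writer code). If writeOutput() throws, the method exits before 
the rename and the temp file is orphaned on disk:

    import java.io.File;
    import java.io.IOException;

    void writeAndCommit(File output) throws IOException {
      // Stage in the same directory so the final rename can be atomic.
      File tmp = File.createTempFile(output.getName(), ".tmp", output.getParentFile());
      writeOutput(tmp);             // an exception thrown here orphans tmp on disk
      if (!tmp.renameTo(output)) {  // publish the finished file
        throw new IOException("fail to rename file " + tmp + " to " + output);
      }
    }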

This patch avoids this potential cause of disk-space leaks by adding `finally` 
blocks to ensure that temp files are always deleted if they haven't been 
renamed.
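
The shape of the fix, as a minimal sketch with the same hypothetical 
writeOutput() helper as above (logger stands in for the writers' SLF4J 
logger):

    import java.io.File;
    import java.io.IOException;

    void writeAndCommit(File output) throws IOException {
      File tmp = File.createTempFile(output.getName(), ".tmp", output.getParentFile());
      try {
        writeOutput(tmp);
        if (!tmp.renameTo(output)) {
          throw new IOException("fail to rename file " + tmp + " to " + output);
        }
      } finally {
        // After a successful rename tmp no longer exists, so this branch only
        // fires when the write or the rename failed; log rather than throw so
        // the original exception is not masked.
        if (tmp.exists() && !tmp.delete()) {
          logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
        }
      }
    }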

Author: Josh Rosen <joshro...@databricks.com>

Closes #15104 from JoshRosen/cleanup-tmp-data-file-in-shuffle-writer.

(cherry picked from commit 5b8f7377d54f83b93ef2bfc2a01ca65fae6d3032)
Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8646b84f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8646b84f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8646b84f

Branch: refs/heads/branch-1.6
Commit: 8646b84fb8ed319e3a998f93de4821c723f7d419
Parents: a447cd8
Author: Josh Rosen <joshro...@databricks.com>
Authored: Thu Sep 15 11:22:58 2016 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Thu Sep 15 11:24:00 2016 -0700

----------------------------------------------------------------------
 .../sort/BypassMergeSortShuffleWriter.java      | 10 ++-
 .../spark/shuffle/sort/UnsafeShuffleWriter.java | 18 +++--
 .../shuffle/IndexShuffleBlockResolver.scala     | 80 +++++++++++---------
 .../spark/shuffle/sort/SortShuffleWriter.scala  | 14 +++-
 4 files changed, 73 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8646b84f/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index a1a1fb0..80d24b9 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -157,8 +157,14 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
 
     File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
     File tmp = Utils.tempFileWith(output);
-    partitionLengths = writePartitionedFile(tmp);
-    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    try {
+      partitionLengths = writePartitionedFile(tmp);
+      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    } finally {
+      if (tmp.exists() && !tmp.delete()) {
+        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
+      }
+    }
     mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8646b84f/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 744c300..d5e16fc 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -209,15 +209,21 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
     final File tmp = Utils.tempFileWith(output);
     try {
-      partitionLengths = mergeSpills(spills, tmp);
-    } finally {
-      for (SpillInfo spill : spills) {
-        if (spill.file.exists() && ! spill.file.delete()) {
-          logger.error("Error while deleting spill file {}", spill.file.getPath());
+      try {
+        partitionLengths = mergeSpills(spills, tmp);
+      } finally {
+        for (SpillInfo spill : spills) {
+          if (spill.file.exists() && ! spill.file.delete()) {
+            logger.error("Error while deleting spill file {}", spill.file.getPath());
+          }
         }
       }
+      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+    } finally {
+      if (tmp.exists() && !tmp.delete()) {
+        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
+      }
     }
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
     mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   }
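
The nesting above matters: the inner `finally` deletes the spill files as soon 
as mergeSpills() has run, whether or not it succeeded, while the outer 
`finally` deletes the merged temp file only after the commit has been 
attempted. Reduced to a sketch (merge() and commit() are hypothetical 
stand-ins for mergeSpills() and writeIndexFileAndCommit(), logger for the 
writer's SLF4J logger):

    import java.io.File;
    import java.io.IOException;
    import java.util.List;

    void mergeAndCommit(List<File> spills, File tmp) throws IOException {
      try {
        try {
          merge(spills, tmp);           // may throw mid-merge
        } finally {
          for (File spill : spills) {   // always reclaim spill space first
            if (spill.exists() && !spill.delete()) {
              logger.error("Error while deleting spill file {}", spill.getPath());
            }
          }
        }
        commit(tmp);                    // renames tmp into place on success
      } finally {
        // Only true when commit() did not rename tmp, i.e. merge or commit failed.
        if (tmp.exists() && !tmp.delete()) {
          logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
        }
      }
    }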
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8646b84f/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index fadb8fe..cae97ad 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -138,48 +138,54 @@ private[spark] class IndexShuffleBlockResolver(
       dataTmp: File): Unit = {
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
-    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
-    Utils.tryWithSafeFinally {
-      // We take in lengths of each block, need to convert it to offsets.
-      var offset = 0L
-      out.writeLong(offset)
-      for (length <- lengths) {
-        offset += length
+    try {
+      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
+      Utils.tryWithSafeFinally {
+        // We take in lengths of each block, need to convert it to offsets.
+        var offset = 0L
         out.writeLong(offset)
+        for (length <- lengths) {
+          offset += length
+          out.writeLong(offset)
+        }
+      } {
+        out.close()
       }
-    } {
-      out.close()
-    }
 
-    val dataFile = getDataFile(shuffleId, mapId)
-    // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
-    // the following check and rename are atomic.
-    synchronized {
-      val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
-      if (existingLengths != null) {
-        // Another attempt for the same task has already written our map outputs successfully,
-        // so just use the existing partition lengths and delete our temporary map outputs.
-        System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
-        if (dataTmp != null && dataTmp.exists()) {
-          dataTmp.delete()
-        }
-        indexTmp.delete()
-      } else {
-        // This is the first successful attempt in writing the map outputs for this task,
-        // so override any existing index and data files with the ones we wrote.
-        if (indexFile.exists()) {
-          indexFile.delete()
-        }
-        if (dataFile.exists()) {
-          dataFile.delete()
-        }
-        if (!indexTmp.renameTo(indexFile)) {
-          throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
-        }
-        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
-          throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
+      val dataFile = getDataFile(shuffleId, mapId)
+      // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
+      // the following check and rename are atomic.
+      synchronized {
+        val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
+        if (existingLengths != null) {
+          // Another attempt for the same task has already written our map outputs successfully,
+          // so just use the existing partition lengths and delete our temporary map outputs.
+          System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
+          if (dataTmp != null && dataTmp.exists()) {
+            dataTmp.delete()
+          }
+          indexTmp.delete()
+        } else {
+          // This is the first successful attempt in writing the map outputs for this task,
+          // so override any existing index and data files with the ones we wrote.
+          if (indexFile.exists()) {
+            indexFile.delete()
+          }
+          if (dataFile.exists()) {
+            dataFile.delete()
+          }
+          if (!indexTmp.renameTo(indexFile)) {
+            throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
+          }
+          if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
+            throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
+          }
         }
       }
+    } finally {
+      if (indexTmp.exists() && !indexTmp.delete()) {
+        logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8646b84f/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index f83cf88..a78cf43 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -68,10 +68,16 @@ private[spark] class SortShuffleWriter[K, V, C](
     // (see SPARK-3570).
     val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
     val tmp = Utils.tempFileWith(output)
-    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
-    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
-    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
-    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
+    try {
+      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
+      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
+      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
+      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
+    } finally {
+      if (tmp.exists() && !tmp.delete()) {
+        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
+      }
+    }
   }
 
   /** Close this writer, passing along whether the map completed */

