Repository: spark Updated Branches: refs/heads/branch-1.4 4b5bbc589 -> 6dde38026
[SPARK-9801] [STREAMING] Check if file exists before deleting temporary files. Spark streaming deletes the temp file and backup files without checking if they exist or not Author: Hao Zhu <viadea...@gmail.com> Closes #8082 from viadea/master and squashes the following commits: 242d05f [Hao Zhu] [SPARK-9801][Streaming]No need to check the existence of those files fd143f2 [Hao Zhu] [SPARK-9801][Streaming]Check if backupFile exists before deleting backupFile files. 087daf0 [Hao Zhu] SPARK-9801 (cherry picked from commit 3c9802d9400bea802984456683b2736a450ee17e) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6dde3802 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6dde3802 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6dde3802 Branch: refs/heads/branch-1.4 Commit: 6dde38026113d8f83190e801a0f889c53bbc316d Parents: 4b5bbc5 Author: Hao Zhu <viadea...@gmail.com> Authored: Mon Aug 10 17:17:22 2015 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Mon Aug 10 17:17:47 2015 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6dde3802/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 5279331..bd117ed 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -189,7 +189,9 @@ class CheckpointWriter( + "'") // Write checkpoint to temp file - fs.delete(tempFile, true) // just in case it exists + if (fs.exists(tempFile)) { + fs.delete(tempFile, true) // just in case it exists + } val fos = fs.create(tempFile) Utils.tryWithSafeFinally { fos.write(bytes) @@ -200,7 +202,9 @@ class CheckpointWriter( // If the checkpoint file exists, back it up // If the backup exists as well, just delete it, otherwise rename will fail if (fs.exists(checkpointFile)) { - fs.delete(backupFile, true) // just in case it exists + if (fs.exists(backupFile)){ + fs.delete(backupFile, true) // just in case it exists + } if (!fs.rename(checkpointFile, backupFile)) { logWarning("Could not rename " + checkpointFile + " to " + backupFile) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org