Repository: spark
Updated Branches:
  refs/heads/branch-2.2 7b9807754 -> 48bacd36c


[SPARK-21696][SS] Fix a potential issue that may generate partial snapshot files

## What changes were proposed in this pull request?

Writing a snapshot file directly to its final path can leave a partial file behind if the write is interrupted part-way. This PR instead writes the snapshot to a temporary file and then renames it to the target file.

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #18928 from zsxwing/SPARK-21696.

(cherry picked from commit 282f00b410fdc4dc69b9d1f3cb3e2ba53cd85b8b)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48bacd36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48bacd36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48bacd36

Branch: refs/heads/branch-2.2
Commit: 48bacd36c673bcbe20dc2e119cddb2a61261a394
Parents: 7b98077
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Mon Aug 14 15:06:55 2017 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Aug 14 15:07:44 2017 -0700

----------------------------------------------------------------------
 .../streaming/state/HDFSBackedStateStoreProvider.scala    | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/48bacd36/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index fb2bf47..ef48fff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -448,9 +448,11 @@ private[state] class HDFSBackedStateStoreProvider(
 
   private def writeSnapshotFile(version: Long, map: MapType): Unit = {
     val fileToWrite = snapshotFile(version)
+    val tempFile =
+      new Path(fileToWrite.getParent, 
s"${fileToWrite.getName}.temp-${Random.nextLong}")
     var output: DataOutputStream = null
     Utils.tryWithSafeFinally {
-      output = compressStream(fs.create(fileToWrite, false))
+      output = compressStream(fs.create(tempFile, false))
       val iter = map.entrySet().iterator()
       while(iter.hasNext) {
         val entry = iter.next()
@@ -465,6 +467,12 @@ private[state] class HDFSBackedStateStoreProvider(
     } {
       if (output != null) output.close()
     }
+    if (fs.exists(fileToWrite)) {
+      // Skip rename if the file is alreayd created.
+      fs.delete(tempFile, true)
+    } else if (!fs.rename(tempFile, fileToWrite)) {
+      throw new IOException(s"Failed to rename $tempFile to $fileToWrite")
+    }
     logInfo(s"Written snapshot file for version $version of $this at 
$fileToWrite")
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to