Repository: spark Updated Branches: refs/heads/branch-2.2 7b9807754 -> 48bacd36c
[SPARK-21696][SS] Fix a potential issue that may generate partial snapshot files ## What changes were proposed in this pull request? Directly writing a snapshot file may generate a partial file. This PR changes it to write to a temp file and then rename it to the target file. ## How was this patch tested? Jenkins. Author: Shixiong Zhu <shixi...@databricks.com> Closes #18928 from zsxwing/SPARK-21696. (cherry picked from commit 282f00b410fdc4dc69b9d1f3cb3e2ba53cd85b8b) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48bacd36 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48bacd36 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48bacd36 Branch: refs/heads/branch-2.2 Commit: 48bacd36c673bcbe20dc2e119cddb2a61261a394 Parents: 7b98077 Author: Shixiong Zhu <shixi...@databricks.com> Authored: Mon Aug 14 15:06:55 2017 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Mon Aug 14 15:07:44 2017 -0700 ---------------------------------------------------------------------- .../streaming/state/HDFSBackedStateStoreProvider.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/48bacd36/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index fb2bf47..ef48fff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -448,9 +448,11 @@ private[state] class HDFSBackedStateStoreProvider( private def writeSnapshotFile(version: Long, map: MapType): Unit = { val fileToWrite = snapshotFile(version) + val tempFile = + new Path(fileToWrite.getParent, s"${fileToWrite.getName}.temp-${Random.nextLong}") var output: DataOutputStream = null Utils.tryWithSafeFinally { - output = compressStream(fs.create(fileToWrite, false)) + output = compressStream(fs.create(tempFile, false)) val iter = map.entrySet().iterator() while(iter.hasNext) { val entry = iter.next() @@ -465,6 +467,12 @@ private[state] class HDFSBackedStateStoreProvider( } { if (output != null) output.close() } + if (fs.exists(fileToWrite)) { + // Skip rename if the file is alreayd created. + fs.delete(tempFile, true) + } else if (!fs.rename(tempFile, fileToWrite)) { + throw new IOException(s"Failed to rename $tempFile to $fileToWrite") + } logInfo(s"Written snapshot file for version $version of $this at $fileToWrite") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org