Repository: spark Updated Branches: refs/heads/branch-2.1 3a7591ad5 -> 1237aaea2
[SPARK-19779][SS] Delete needless tmp file after restart structured streaming job ## What changes were proposed in this pull request? [SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779) The PR (https://github.com/apache/spark/pull/17012) fixed restarting a Structured Streaming application that uses HDFS as its file system, but a problem remains: a temporary delta file is still left behind in HDFS, and Structured Streaming never deletes this temporary file generated when the streaming job is restarted. ## How was this patch tested? unit tests Author: guifeng <guifengl...@gmail.com> Closes #17124 from gf53520/SPARK-19779. (cherry picked from commit e24f21b5f8365ed25346e986748b393e0b4be25c) Signed-off-by: Shixiong Zhu <shixi...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1237aaea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1237aaea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1237aaea Branch: refs/heads/branch-2.1 Commit: 1237aaea279d6aac504ae1e3265c0b53779b5303 Parents: 3a7591a Author: guifeng <guifengl...@gmail.com> Authored: Thu Mar 2 21:19:29 2017 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Thu Mar 2 21:19:40 2017 -0800 ---------------------------------------------------------------------- .../streaming/state/HDFSBackedStateStoreProvider.scala | 4 +++- .../spark/sql/execution/streaming/state/StateStoreSuite.scala | 7 +++++++ 2 files changed, 10 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1237aaea/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 2d29940..ab1204a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -283,7 +283,9 @@ private[state] class HDFSBackedStateStoreProvider( // semantically correct because Structured Streaming requires rerunning a batch should // generate the same output. (SPARK-19677) // scalastyle:on - if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) { + if (fs.exists(finalDeltaFile)) { + fs.delete(tempDeltaFile, true) + } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) { throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile") } loadedMaps.put(newVersion, map) http://git-wip-us.apache.org/repos/asf/spark/blob/1237aaea/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 21a0a10..255378c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming.state import java.io.{File, IOException} import java.net.URI +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random +import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} import org.scalatest.{BeforeAndAfter, PrivateMethodTester} @@ -293,6 
+295,11 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth val provider = newStoreProvider(hadoopConf = conf) provider.getStore(0).commit() provider.getStore(0).commit() + + // Verify we don't leak temp files + val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation), + null, true).asScala.filter(_.getName.startsWith("temp-")) + assert(tempFiles.isEmpty) } test("corrupted file handling") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org