Repository: spark
Updated Branches:
  refs/heads/branch-2.1 3a7591ad5 -> 1237aaea2


[SPARK-19779][SS] Delete needless tmp file after restart structured streaming job

## What changes were proposed in this pull request?

[SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779)

The earlier PR (https://github.com/apache/spark/pull/17012) fixed restarting a
Structured Streaming application that uses HDFS as the state file system, but
one problem remains: the temp delta file written before the restart is left
behind in HDFS, and Structured Streaming never deletes it. This patch removes
that needless temp file when the final delta file for the same version already
exists.
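
In essence, the fix makes the commit path clean up the stale temp file when a
final delta file for that version already exists, instead of skipping the
rename and leaking the temp file. Below is a minimal sketch of the new
behavior, assuming a Hadoop `FileSystem` handle; `finalizeDeltaFile` is a
hypothetical helper name, and the real logic lives inside
`HDFSBackedStateStoreProvider` (see the diff below):

```scala
import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

object DeltaFileCommit {
  // Hypothetical helper mirroring the patched commit logic in
  // HDFSBackedStateStoreProvider (a sketch, not the actual Spark API).
  def finalizeDeltaFile(fs: FileSystem, tempDeltaFile: Path, finalDeltaFile: Path): Unit = {
    if (fs.exists(finalDeltaFile)) {
      // A pre-restart run already committed this version, so the rename is
      // skipped; delete the temp file instead of leaving it behind in HDFS.
      fs.delete(tempDeltaFile, true)
    } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
      throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
    }
  }
}
```

Deleting rather than re-renaming keeps the SPARK-19677 semantics noted in the
diff: rerunning a batch must generate the same output, so an existing final
delta file is treated as authoritative.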

## How was this patch tested?
Unit tests (an added check in `StateStoreSuite` verifies that no `temp-` files
remain after committing the same batch twice).

Author: guifeng <guifengl...@gmail.com>

Closes #17124 from gf53520/SPARK-19779.

(cherry picked from commit e24f21b5f8365ed25346e986748b393e0b4be25c)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1237aaea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1237aaea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1237aaea

Branch: refs/heads/branch-2.1
Commit: 1237aaea279d6aac504ae1e3265c0b53779b5303
Parents: 3a7591a
Author: guifeng <guifengl...@gmail.com>
Authored: Thu Mar 2 21:19:29 2017 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Mar 2 21:19:40 2017 -0800

----------------------------------------------------------------------
 .../streaming/state/HDFSBackedStateStoreProvider.scala        | 4 +++-
 .../spark/sql/execution/streaming/state/StateStoreSuite.scala | 7 +++++++
 2 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1237aaea/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 2d29940..ab1204a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -283,7 +283,9 @@ private[state] class HDFSBackedStateStoreProvider(
       // semantically correct because Structured Streaming requires rerunning a batch should
       // generate the same output. (SPARK-19677)
       // scalastyle:on
-      if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
+      if (fs.exists(finalDeltaFile)) {
+        fs.delete(tempDeltaFile, true)
+      } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
         throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
       }
       loadedMaps.put(newVersion, map)

http://git-wip-us.apache.org/repos/asf/spark/blob/1237aaea/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 21a0a10..255378c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io.{File, IOException}
 import java.net.URI
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
 
+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
@@ -293,6 +295,11 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = newStoreProvider(hadoopConf = conf)
     provider.getStore(0).commit()
     provider.getStore(0).commit()
+
+    // Verify we don't leak temp files
+    val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation),
+      null, true).asScala.filter(_.getName.startsWith("temp-"))
+    assert(tempFiles.isEmpty)
   }
 
   test("corrupted file handling") {

