This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 78d546f  [SPARK-27210][SS] Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted
78d546f is described below

commit 78d546fe15aebcbf4b671c44383ddcf82b05b8a7
Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
AuthorDate: Fri Mar 22 11:26:53 2019 -0700

    [SPARK-27210][SS] Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted
    
    ## What changes were proposed in this pull request?
    
    This patch makes ManifestFileCommitProtocol clean up incomplete output files at the task level when a task aborts. Please note that this cleanup is 'best-effort', not a guarantee, as is also the case in HadoopMapReduceCommitProtocol.
    
    ## How was this patch tested?
    
    Added UT.
    
    Closes #24154 from HeartSaVioR/SPARK-27210.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../streaming/ManifestFileCommitProtocol.scala     |  7 ++++--
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 29 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 92191c8..916bd2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -114,7 +114,10 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   }
 
   override def abortTask(taskContext: TaskAttemptContext): Unit = {
-    // Do nothing
-    // TODO: we can also try delete the addedFiles as a best-effort cleanup.
+    // best effort cleanup of incomplete files
+    if (addedFiles.nonEmpty) {
+      val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
+      addedFiles.foreach { file => fs.delete(new Path(file), false) }
+    }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 619d118..020ab23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -18,8 +18,11 @@
 package org.apache.spark.sql.streaming
 
 import java.io.File
+import java.nio.file.Files
 import java.util.Locale
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
@@ -478,4 +481,30 @@ class FileStreamSinkSuite extends StreamTest {
       checkDatasetUnorderly(outputDf, 1, 2, 3)
     }
   }
+
+  testQuietly("cleanup incomplete output for aborted task") {
+    withTempDir { tempDir =>
+      val checkpointDir = new File(tempDir, "chk")
+      val outputDir = new File(tempDir, "output")
+      val inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3)
+      val q = inputData.toDS().map(_ / 0)
+        .writeStream
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .format("parquet")
+        .start(outputDir.getCanonicalPath)
+
+      intercept[StreamingQueryException] {
+        try {
+          q.processAllAvailable()
+        } finally {
+          q.stop()
+        }
+      }
+
+      val outputFiles = Files.walk(outputDir.toPath).iterator().asScala
+        .filter(_.toString.endsWith(".parquet"))
+      assert(outputFiles.toList.isEmpty, "Incomplete files should be cleaned up.")
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to