This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 78d546f [SPARK-27210][SS] Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted 78d546f is described below commit 78d546fe15aebcbf4b671c44383ddcf82b05b8a7 Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com> AuthorDate: Fri Mar 22 11:26:53 2019 -0700 [SPARK-27210][SS] Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted ## What changes were proposed in this pull request? This patch proposes that ManifestFileCommitProtocol clean up incomplete output files at the task level if a task aborts. Please note that this works on a 'best-effort' basis, not as a guarantee, just as in HadoopMapReduceCommitProtocol. ## How was this patch tested? Added UT. Closes #24154 from HeartSaVioR/SPARK-27210. Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> --- .../streaming/ManifestFileCommitProtocol.scala | 7 ++++-- .../spark/sql/streaming/FileStreamSinkSuite.scala | 29 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala index 92191c8..916bd2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala @@ -114,7 +114,10 @@ class ManifestFileCommitProtocol(jobId: String, path: String) } override def abortTask(taskContext: TaskAttemptContext): Unit = { - // Do nothing - // TODO: we can also try delete the addedFiles as a best-effort cleanup. 
+ // best effort cleanup of incomplete files + if (addedFiles.nonEmpty) { + val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration) + addedFiles.foreach { file => fs.delete(new Path(file), false) } + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 619d118..020ab23 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -18,8 +18,11 @@ package org.apache.spark.sql.streaming import java.io.File +import java.nio.file.Files import java.util.Locale +import scala.collection.JavaConverters._ + import org.apache.hadoop.fs.Path import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} @@ -478,4 +481,30 @@ class FileStreamSinkSuite extends StreamTest { checkDatasetUnorderly(outputDf, 1, 2, 3) } } + + testQuietly("cleanup incomplete output for aborted task") { + withTempDir { tempDir => + val checkpointDir = new File(tempDir, "chk") + val outputDir = new File(tempDir, "output") + val inputData = MemoryStream[Int] + inputData.addData(1, 2, 3) + val q = inputData.toDS().map(_ / 0) + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .format("parquet") + .start(outputDir.getCanonicalPath) + + intercept[StreamingQueryException] { + try { + q.processAllAvailable() + } finally { + q.stop() + } + } + + val outputFiles = Files.walk(outputDir.toPath).iterator().asScala + .filter(_.toString.endsWith(".parquet")) + assert(outputFiles.toList.isEmpty, "Incomplete files should be cleaned up.") + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org