Repository: spark
Updated Branches:
  refs/heads/branch-2.4 77c0629cb -> c23b801d3


[SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file

## What changes were proposed in this pull request?

Use CheckpointFileManager to write the streaming `metadata` file atomically, so
that the `metadata` file is never left partially written.

## How was this patch tested?

Verified by the Jenkins build; this commit adds no new tests.

Closes #23060 from zsxwing/SPARK-26092.

Authored-by: Shixiong Zhu <zsxw...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
(cherry picked from commit 058c4602b000b24deb764a810ef8b43c41fe63ae)
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c23b801d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c23b801d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c23b801d

Branch: refs/heads/branch-2.4
Commit: c23b801d3c87b12e729b98910833b441db05bd45
Parents: 77c0629
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Fri Nov 16 15:43:27 2018 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri Nov 16 15:43:44 2018 -0800

----------------------------------------------------------------------
 .../streaming/CheckpointFileManager.scala       |  2 +-
 .../execution/streaming/StreamExecution.scala   |  1 +
 .../execution/streaming/StreamMetadata.scala    | 23 ++++++++++++++------
 3 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 606ba25..b3e4240 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -56,7 +56,7 @@ trait CheckpointFileManager {
    * @param overwriteIfPossible If true, then the implementations must do a 
best-effort attempt to
    *                            overwrite the file if it already exists. It 
should not throw
    *                            any exception if the file exists. However, if 
false, then the
-   *                            implementation must not overwrite if the file 
alraedy exists and
+   *                            implementation must not overwrite if the file 
already exists and
    *                            must throw `FileAlreadyExistsException` in 
that case.
    */
   def createAtomic(path: Path, overwriteIfPossible: Boolean): 
CancellableFSDataOutputStream

http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index f6c60c1..de33844 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -87,6 +87,7 @@ abstract class StreamExecution(
   val resolvedCheckpointRoot = {
     val checkpointPath = new Path(checkpointRoot)
     val fs = 
checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    fs.mkdirs(checkpointPath)
     checkpointPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory).toUri.toString
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index 0bc54ea..516afbe 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -19,16 +19,18 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io.{InputStreamReader, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
+import java.util.ConcurrentModificationException
 
 import scala.util.control.NonFatal
 
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, 
FSDataOutputStream, Path}
+import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, 
Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
 import org.apache.spark.sql.streaming.StreamingQuery
 
 /**
@@ -70,19 +72,26 @@ object StreamMetadata extends Logging {
       metadata: StreamMetadata,
       metadataFile: Path,
       hadoopConf: Configuration): Unit = {
-    var output: FSDataOutputStream = null
+    var output: CancellableFSDataOutputStream = null
     try {
-      val fs = metadataFile.getFileSystem(hadoopConf)
-      output = fs.create(metadataFile)
+      val fileManager = CheckpointFileManager.create(metadataFile.getParent, 
hadoopConf)
+      output = fileManager.createAtomic(metadataFile, overwriteIfPossible = 
false)
       val writer = new OutputStreamWriter(output)
       Serialization.write(metadata, writer)
       writer.close()
     } catch {
-      case NonFatal(e) =>
+      case e: FileAlreadyExistsException =>
+        if (output != null) {
+          output.cancel()
+        }
+        throw new ConcurrentModificationException(
+          s"Multiple streaming queries are concurrently using $metadataFile", 
e)
+      case e: Throwable =>
+        if (output != null) {
+          output.cancel()
+        }
         logError(s"Error writing stream metadata $metadata to $metadataFile", 
e)
         throw e
-    } finally {
-      IOUtils.closeQuietly(output)
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to