spark git commit: [SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file

2018-11-16 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 77c0629cb -> c23b801d3


[SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file

## What changes were proposed in this pull request?

Use CheckpointFileManager to write the streaming `metadata` file so that the 
`metadata` file will never be a partial file.
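For context, here is a minimal sketch of the write pattern this change adopts. It assumes the `CheckpointFileManager.create(path, hadoopConf)` factory and the `cancel()` method on `CancellableFSDataOutputStream`, neither of which appears in the excerpt below; it is an illustration of the idea, not the exact code in this patch.

```scala
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.execution.streaming.CheckpointFileManager

// Hypothetical helper: write a small payload so readers see either the whole
// file or no file at all, never a partially written `metadata` file.
def writeMetadataAtomically(metadataFile: Path, json: String, hadoopConf: Configuration): Unit = {
  val fileManager = CheckpointFileManager.create(metadataFile.getParent, hadoopConf)
  // overwriteIfPossible = false: fail instead of clobbering an existing file.
  val out = fileManager.createAtomic(metadataFile, overwriteIfPossible = false)
  try {
    out.write(json.getBytes(StandardCharsets.UTF_8))
    out.close() // the content becomes visible only on a successful close
  } catch {
    case e: Throwable =>
      out.cancel() // discard whatever was buffered; no partial file is left behind
      throw e
  }
}
```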

## How was this patch tested?

Jenkins

Closes #23060 from zsxwing/SPARK-26092.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 058c4602b000b24deb764a810ef8b43c41fe63ae)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c23b801d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c23b801d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c23b801d

Branch: refs/heads/branch-2.4
Commit: c23b801d3c87b12e729b98910833b441db05bd45
Parents: 77c0629
Author: Shixiong Zhu 
Authored: Fri Nov 16 15:43:27 2018 -0800
Committer: Shixiong Zhu 
Committed: Fri Nov 16 15:43:44 2018 -0800

--
 .../streaming/CheckpointFileManager.scala   |  2 +-
 .../execution/streaming/StreamExecution.scala   |  1 +
 .../execution/streaming/StreamMetadata.scala| 23 ++--
 3 files changed, 18 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 606ba25..b3e4240 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -56,7 +56,7 @@ trait CheckpointFileManager {
    * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
    *                            overwrite the file if it already exists. It should not throw
    *                            any exception if the file exists. However, if false, then the
-   *                            implementation must not overwrite if the file alraedy exists and
+   *                            implementation must not overwrite if the file already exists and
    *                            must throw `FileAlreadyExistsException` in that case.
    */
   def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
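As an aside on the contract documented above: with `overwriteIfPossible = false`, an existing file must surface as `FileAlreadyExistsException`, which a caller can report as a concurrent writer. The imports added to `StreamMetadata` further down suggest exactly that, so here is a hedged sketch of such a caller; the exact wording and placement in the patch may differ.

```scala
import java.util.ConcurrentModificationException

import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}

import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream

// Sketch: open the metadata file exclusively and translate "file already
// exists" into a concurrent-modification error for the caller.
def createExclusively(fm: CheckpointFileManager, path: Path): CancellableFSDataOutputStream = {
  try {
    fm.createAtomic(path, overwriteIfPossible = false)
  } catch {
    case _: FileAlreadyExistsException =>
      throw new ConcurrentModificationException(
        s"Another query appears to be writing $path concurrently")
  }
}
```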

http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index f6c60c1..de33844 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -87,6 +87,7 @@ abstract class StreamExecution(
   val resolvedCheckpointRoot = {
     val checkpointPath = new Path(checkpointRoot)
     val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    fs.mkdirs(checkpointPath)
     checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index 0bc54ea..516afbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -19,16 +19,18 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io.{InputStreamReader, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
+import java.util.ConcurrentModificationException
 
 import scala.util.control.NonFatal
 
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization

spark git commit: [SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file

2018-11-16 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 99cbc51b3 -> 058c4602b


[SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file

## What changes were proposed in this pull request?

Use CheckpointFileManager to write the streaming `metadata` file so that the 
`metadata` file will never be a partial file.

## How was this patch tested?

Jenkins

Closes #23060 from zsxwing/SPARK-26092.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/058c4602
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/058c4602
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/058c4602

Branch: refs/heads/master
Commit: 058c4602b000b24deb764a810ef8b43c41fe63ae
Parents: 99cbc51
Author: Shixiong Zhu 
Authored: Fri Nov 16 15:43:27 2018 -0800
Committer: Shixiong Zhu 
Committed: Fri Nov 16 15:43:27 2018 -0800

--
 .../streaming/CheckpointFileManager.scala   |  2 +-
 .../execution/streaming/StreamExecution.scala   |  1 +
 .../execution/streaming/StreamMetadata.scala| 23 ++--
 3 files changed, 18 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/058c4602/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 606ba25..b3e4240 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -56,7 +56,7 @@ trait CheckpointFileManager {
    * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
    *                            overwrite the file if it already exists. It should not throw
    *                            any exception if the file exists. However, if false, then the
-   *                            implementation must not overwrite if the file alraedy exists and
+   *                            implementation must not overwrite if the file already exists and
    *                            must throw `FileAlreadyExistsException` in that case.
    */
   def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream

http://git-wip-us.apache.org/repos/asf/spark/blob/058c4602/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 631a6eb..89b4f40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -88,6 +88,7 @@ abstract class StreamExecution(
   val resolvedCheckpointRoot = {
     val checkpointPath = new Path(checkpointRoot)
     val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    fs.mkdirs(checkpointPath)
     checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/058c4602/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index 0bc54ea..516afbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -19,16 +19,18 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io.{InputStreamReader, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
+import java.util.ConcurrentModificationException
 
 import scala.util.control.NonFatal
 
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
+import