spark git commit: [SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file
Repository: spark Updated Branches: refs/heads/branch-2.4 77c0629cb -> c23b801d3 [SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file ## What changes were proposed in this pull request? Use CheckpointFileManager to write the streaming `metadata` file so that the `metadata` file will never be a partial file. ## How was this patch tested? Jenkins Closes #23060 from zsxwing/SPARK-26092. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu (cherry picked from commit 058c4602b000b24deb764a810ef8b43c41fe63ae) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c23b801d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c23b801d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c23b801d Branch: refs/heads/branch-2.4 Commit: c23b801d3c87b12e729b98910833b441db05bd45 Parents: 77c0629 Author: Shixiong Zhu Authored: Fri Nov 16 15:43:27 2018 -0800 Committer: Shixiong Zhu Committed: Fri Nov 16 15:43:44 2018 -0800 -- .../streaming/CheckpointFileManager.scala | 2 +- .../execution/streaming/StreamExecution.scala | 1 + .../execution/streaming/StreamMetadata.scala| 23 ++-- 3 files changed, 18 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index 606ba25..b3e4240 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -56,7 +56,7 @@ trait CheckpointFileManager { * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to *overwrite the file if it already exists. It should not throw *any exception if the file exists. However, if false, then the - *implementation must not overwrite if the file alraedy exists and + *implementation must not overwrite if the file already exists and *must throw `FileAlreadyExistsException` in that case. */ def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index f6c60c1..de33844 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -87,6 +87,7 @@ abstract class StreamExecution( val resolvedCheckpointRoot = { val checkpointPath = new Path(checkpointRoot) val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) +fs.mkdirs(checkpointPath) checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString } http://git-wip-us.apache.org/repos/asf/spark/blob/c23b801d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala index 0bc54ea..516afbe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala @@ -19,16 +19,18 @@ package org.apache.spark.sql.execution.streaming import java.io.{InputStreamReader, OutputStreamWriter} import java.nio.charset.StandardCharsets +import java.util.ConcurrentModificationException import scala.util.control.NonFatal import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, FSDataOutputStream, Path} +import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path} import org.json4s.NoTypeHints import
spark git commit: [SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file
Repository: spark Updated Branches: refs/heads/master 99cbc51b3 -> 058c4602b [SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file ## What changes were proposed in this pull request? Use CheckpointFileManager to write the streaming `metadata` file so that the `metadata` file will never be a partial file. ## How was this patch tested? Jenkins Closes #23060 from zsxwing/SPARK-26092. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/058c4602 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/058c4602 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/058c4602 Branch: refs/heads/master Commit: 058c4602b000b24deb764a810ef8b43c41fe63ae Parents: 99cbc51 Author: Shixiong Zhu Authored: Fri Nov 16 15:43:27 2018 -0800 Committer: Shixiong Zhu Committed: Fri Nov 16 15:43:27 2018 -0800 -- .../streaming/CheckpointFileManager.scala | 2 +- .../execution/streaming/StreamExecution.scala | 1 + .../execution/streaming/StreamMetadata.scala| 23 ++-- 3 files changed, 18 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/058c4602/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index 606ba25..b3e4240 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -56,7 +56,7 @@ trait CheckpointFileManager { * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to *overwrite the file if it already exists. It should not throw *any exception if the file exists. However, if false, then the - *implementation must not overwrite if the file alraedy exists and + *implementation must not overwrite if the file already exists and *must throw `FileAlreadyExistsException` in that case. */ def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream http://git-wip-us.apache.org/repos/asf/spark/blob/058c4602/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 631a6eb..89b4f40 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -88,6 +88,7 @@ abstract class StreamExecution( val resolvedCheckpointRoot = { val checkpointPath = new Path(checkpointRoot) val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) +fs.mkdirs(checkpointPath) checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString } http://git-wip-us.apache.org/repos/asf/spark/blob/058c4602/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala index 0bc54ea..516afbe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala @@ -19,16 +19,18 @@ package org.apache.spark.sql.execution.streaming import java.io.{InputStreamReader, OutputStreamWriter} import java.nio.charset.StandardCharsets +import java.util.ConcurrentModificationException import scala.util.control.NonFatal import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, FSDataOutputStream, Path} +import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path} import org.json4s.NoTypeHints import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging +import