Repository: spark Updated Branches: refs/heads/master fab0d62a7 -> 7a0a630e0
[SPARK-19407][SS] defaultFS is used FileSystem.get instead of getting it from uri scheme

## What changes were proposed in this pull request?

```
Caused by: java.lang.IllegalArgumentException: Wrong FS: s3a://**************/checkpoint/7b2231a3-d845-4740-bfa3-681850e5987f/metadata, expected: file:///
	at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:649)
	at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82)
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
	at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51)
	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
```

This is easy to reproduce on a Spark standalone cluster: pass a checkpoint location whose URI scheme is anything other than "file://" and do not override fs.defaultFS in the configuration.

Workaround: pass `--conf spark.hadoop.fs.defaultFS=s3a://somebucket`, or set it in SparkConf or spark-defaults.conf.

## How was this patch tested?

Existing unit tests.

Author: uncleGen <husty...@gmail.com>

Closes #16815 from uncleGen/SPARK-19407.
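To illustrate the underlying bug: `FileSystem.get(hadoopConf)` always resolves the filesystem from `fs.defaultFS` (often `file:///`), ignoring the scheme carried by the checkpoint path itself, whereas `Path.getFileSystem(hadoopConf)` dispatches on the path's own scheme. The sketch below is not Spark or Hadoop code; it only mimics the scheme-resolution step with `java.net.URI`, and the helper name `resolveScheme` is hypothetical.

```scala
import java.net.URI

// Mimics how a path's own URI scheme should win over the configured
// default (fs.defaultFS), falling back to the default only when the
// path carries no scheme of its own.
def resolveScheme(path: String, defaultFS: String): String =
  Option(new URI(path).getScheme).getOrElse(new URI(defaultFS).getScheme)

// An s3a:// checkpoint path keeps its own scheme regardless of the default...
println(resolveScheme("s3a://somebucket/checkpoint/metadata", "file:///")) // s3a

// ...while a scheme-less path falls back to the configured default.
println(resolveScheme("/tmp/checkpoint/metadata", "file:///")) // file
```

This is the behavior `metadataFile.getFileSystem(hadoopConf)` gives in the fix below, and what the original `FileSystem.get(hadoopConf)` call skipped.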
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a0a630e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a0a630e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a0a630e

Branch: refs/heads/master
Commit: 7a0a630e0f699017c7d0214923cd4aa0227e62ff
Parents: fab0d62
Author: uncleGen <husty...@gmail.com>
Authored: Mon Feb 6 21:03:20 2017 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon Feb 6 21:03:20 2017 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/execution/streaming/StreamMetadata.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/7a0a630e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index 7807c9f..0bc54ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -47,7 +47,7 @@ object StreamMetadata extends Logging {

   /** Read the metadata from file if it exists */
   def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = {
-    val fs = FileSystem.get(hadoopConf)
+    val fs = metadataFile.getFileSystem(hadoopConf)
     if (fs.exists(metadataFile)) {
       var input: FSDataInputStream = null
       try {
@@ -72,7 +72,7 @@ object StreamMetadata extends Logging {
       hadoopConf: Configuration): Unit = {
     var output: FSDataOutputStream = null
     try {
-      val fs = FileSystem.get(hadoopConf)
+      val fs = metadataFile.getFileSystem(hadoopConf)
       output =
fs.create(metadataFile)
       val writer = new OutputStreamWriter(output)
       Serialization.write(metadata, writer)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org