Repository: spark
Updated Branches:
  refs/heads/branch-2.2 198e3a036 -> 6ef7a5bd3


[SPARK-21167][SS] Decode the path generated by File sink to handle special characters

## What changes were proposed in this pull request?

Decode the path stored in the File sink metadata log when it is read back: `SinkFileStatus.toFileStatus` now builds the `Path` via `new Path(new URI(path))` instead of `new Path(path)`, so paths containing special characters (for example spaces, which are percent-encoded in the stored URI string) resolve to the actual files on disk.
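
For context, a minimal sketch (not part of the patch; the path value below is assumed for illustration) of why the two `Path` constructors behave differently. The sink log stores each file path as a URI string, so a space in a partition value is percent-encoded; `Path(String)` keeps that encoding literally, while `Path(java.net.URI)` decodes it back to the on-disk name:

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// Illustrative value only: a path as it might appear in the sink metadata log,
// with the space in the partition value percent-encoded as "%20".
val stored = "file:/tmp/out/value=hello%20world/part-00000.parquet"

// Path(String) keeps "%20" as literal characters, so this Path refers to a
// directory literally named "value=hello%20world", which does not exist.
val undecoded = new Path(stored)

// Path(URI) decodes the percent-encoding, recovering "value=hello world",
// the directory that actually exists on disk.
val decoded = new Path(new URI(stored))

println(undecoded.toUri.getPath) // /tmp/out/value=hello%20world/part-00000.parquet
println(decoded.toUri.getPath)   // /tmp/out/value=hello world/part-00000.parquet
```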

## How was this patch tested?

The unit test added to `FileStreamSinkSuite` ("SPARK-21167: encode and decode path correctly").

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #18381 from zsxwing/SPARK-21167.

(cherry picked from commit d66b143eec7f604595089f72d8786edbdcd74282)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ef7a5bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ef7a5bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ef7a5bd

Branch: refs/heads/branch-2.2
Commit: 6ef7a5bd32a483ea1bdac22fbd2403cdefd71bff
Parents: 198e3a0
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Jun 21 23:43:21 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Jun 21 23:43:30 2017 -0700

----------------------------------------------------------------------
 .../execution/streaming/FileStreamSinkLog.scala |  5 +++-
 .../sql/streaming/FileStreamSinkSuite.scala     | 29 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6ef7a5bd/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 8d718b2..c9939ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.net.URI
+
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
@@ -47,7 +49,8 @@ case class SinkFileStatus(
     action: String) {
 
   def toFileStatus: FileStatus = {
-    new FileStatus(size, isDir, blockReplication, blockSize, modificationTime, new Path(path))
+    new FileStatus(
+      size, isDir, blockReplication, blockSize, modificationTime, new Path(new URI(path)))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6ef7a5bd/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 1a2d3a1..bb6a278 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -64,6 +64,35 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
+  test("SPARK-21167: encode and decode path correctly") {
+    val inputData = MemoryStream[String]
+    val ds = inputData.toDS()
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+    val query = ds.map(s => (s, s.length))
+      .toDF("value", "len")
+      .writeStream
+      .partitionBy("value")
+      .option("checkpointLocation", checkpointDir)
+      .format("parquet")
+      .start(outputDir)
+
+    try {
+      // The output is partitioned by "value", so the value will appear in the file path.
+      // This tests whether we handle spaces in the path correctly.
+      inputData.addData("hello world")
+      failAfter(streamingTimeout) {
+        query.processAllAvailable()
+      }
+      val outputDf = spark.read.parquet(outputDir)
+      checkDatasetUnorderly(outputDf.as[(Int, String)], ("hello world".length, "hello world"))
+    } finally {
+      query.stop()
+    }
+  }
+
   test("partitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val ds = inputData.toDS()

