Repository: spark
Updated Branches:
  refs/heads/master 1002bd6b2 -> 021947020


[SPARK-21996][SQL] read files with space in name for streaming

## What changes were proposed in this pull request?

Structured Streaming is now able to read files with a space in the file name
(previously such files were skipped and a warning was logged).
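
As the one-line fix below suggests, the paths recorded by the file source are URI strings, so a space ends up percent-encoded as `%20`; handing that encoded string straight to `DataSource` made it look for a file that does not exist. A minimal sketch of the decoding round-trip the fix relies on (the `SpacePathDemo` wrapper and the example path are illustrative, not part of the patch; it assumes Hadoop's `org.apache.hadoop.fs.Path` on the classpath):

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// Illustrative only: not part of the patch; the path below is made up.
object SpacePathDemo extends App {
  // The source records paths as URI strings, so a space shows up percent-encoded.
  val logged = "file:/tmp/input/text%20text.txt"

  // Handing the encoded string to DataSource made it resolve a file literally
  // named "text%20text.txt", which does not exist, so the file was skipped.
  println(logged)                              // file:/tmp/input/text%20text.txt

  // The fix round-trips through URI -> Path, which decodes %20 back to a space.
  val decoded = new Path(new URI(logged)).toString
  println(decoded)                             // file:/tmp/input/text text.txt
}
```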

## How was this patch tested?

Added new unit tests.
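
Nothing changes on the user side; a hypothetical end-to-end sketch (directory name is made up, and an active `SparkSession` named `spark` is assumed) where the input directory may now contain files such as `text text.txt`:

```scala
// Hypothetical usage sketch: with this fix, files whose names contain spaces
// are picked up by the file source instead of being skipped with a warning.
import spark.implicits._

val lines = spark.readStream
  .format("text")
  .load("/tmp/input")          // may contain e.g. "text text.txt"

val query = lines.filter($"value".contains("keep"))
  .writeStream
  .format("console")
  .start()

query.awaitTermination()
```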

Author: Xiayun Sun <xiayun...@gmail.com>

Closes #19247 from xysun/SPARK-21996.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02194702
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02194702
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02194702

Branch: refs/heads/master
Commit: 02194702068291b3af77486d01029fb848c36d7b
Parents: 1002bd6
Author: Xiayun Sun <xiayun...@gmail.com>
Authored: Wed Jan 17 16:42:38 2018 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Wed Jan 17 16:42:38 2018 -0800

----------------------------------------------------------------------
 .../execution/streaming/FileStreamSource.scala  |  2 +-
 .../sql/streaming/FileStreamSourceSuite.scala   | 50 +++++++++++++++++++-
 2 files changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/02194702/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 0debd7d..8c016ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -166,7 +166,7 @@ class FileStreamSource(
     val newDataSource =
       DataSource(
         sparkSession,
-        paths = files.map(_.path),
+        paths = files.map(f => new Path(new URI(f.path)).toString),
         userSpecifiedSchema = Some(schema),
         partitionColumns = partitionColumns,
         className = fileFormatClassName,

http://git-wip-us.apache.org/repos/asf/spark/blob/02194702/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 39bb572..5bb0f4d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -74,11 +74,11 @@ abstract class FileStreamSourceTest
     protected def addData(source: FileStreamSource): Unit
   }
 
-  case class AddTextFileData(content: String, src: File, tmp: File)
+  case class AddTextFileData(content: String, src: File, tmp: File, tmpFilePrefix: String = "text")
     extends AddFileData {
 
     override def addData(source: FileStreamSource): Unit = {
-      val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+      val tempFile = Utils.tempFileWith(new File(tmp, tmpFilePrefix))
       val finalFile = new File(src, tempFile.getName)
       src.mkdirs()
       require(stringToFile(tempFile, content).renameTo(finalFile))
@@ -408,6 +408,52 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-21996 read from text files -- file name has space") {
+    withTempDirs { case (src, tmp) =>
+      val textStream = createFileStream("text", src.getCanonicalPath)
+      val filtered = textStream.filter($"value" contains "keep")
+
+      testStream(filtered)(
+        AddTextFileData("drop1\nkeep2\nkeep3", src, tmp, "text text"),
+        CheckAnswer("keep2", "keep3")
+      )
+    }
+  }
+
+  test("SPARK-21996 read from text files generated by file sink -- file name 
has space") {
+    val testTableName = "FileStreamSourceTest"
+    withTable(testTableName) {
+      withTempDirs { case (src, checkpoint) =>
+        val output = new File(src, "text text")
+        val inputData = MemoryStream[String]
+        val ds = inputData.toDS()
+
+        val query = ds.writeStream
+          .option("checkpointLocation", checkpoint.getCanonicalPath)
+          .format("text")
+          .start(output.getCanonicalPath)
+
+        try {
+          inputData.addData("foo")
+          failAfter(streamingTimeout) {
+            query.processAllAvailable()
+          }
+        } finally {
+          query.stop()
+        }
+
+        val df2 = spark.readStream.format("text").load(output.getCanonicalPath)
+        val query2 = df2.writeStream.format("memory").queryName(testTableName).start()
+        try {
+          query2.processAllAvailable()
+          checkDatasetUnorderly(spark.table(testTableName).as[String], "foo")
+        } finally {
+          query2.stop()
+        }
+      }
+    }
+  }
+
   test("read from textfile") {
     withTempDirs { case (src, tmp) =>
       val textStream = spark.readStream.textFile(src.getCanonicalPath)

