Repository: spark
Updated Branches:
  refs/heads/master 13268a58f -> 9962390af


[SPARK-22781][SS] Support creating streaming dataset with ORC files

## What changes were proposed in this pull request?

Like `Parquet`, users can use `ORC` with Apache Spark structured streaming. 
This PR adds `orc()` to `DataStreamReader`(Scala/Python) in order to support 
creating streaming dataset with ORC file format more easily like the other file 
formats. Also, this adds a test coverage for ORC data source and updates the 
document.

**BEFORE**

```scala
scala> spark.readStream.schema("a 
int").orc("/tmp/orc_ss").writeStream.format("console").start()
<console>:24: error: value orc is not a member of 
org.apache.spark.sql.streaming.DataStreamReader
       spark.readStream.schema("a 
int").orc("/tmp/orc_ss").writeStream.format("console").start()
```

**AFTER**
```scala
scala> spark.readStream.schema("a 
int").orc("/tmp/orc_ss").writeStream.format("console").start()
res0: org.apache.spark.sql.streaming.StreamingQuery = 
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper678b3746

scala>
-------------------------------------------
Batch: 0
-------------------------------------------
+---+
|  a|
+---+
|  1|
+---+
```

## How was this patch tested?

Pass the newly added test cases.

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #19975 from dongjoon-hyun/SPARK-22781.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9962390a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9962390a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9962390a

Branch: refs/heads/master
Commit: 9962390af77d8085ce2a475b5983766c9c307031
Parents: 13268a5
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Tue Dec 19 23:50:06 2017 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Tue Dec 19 23:50:06 2017 -0800

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md  |   2 +-
 python/pyspark/sql/streaming.py                 |  17 +++
 .../spark/sql/streaming/DataStreamReader.scala  |  15 +++
 .../sql/streaming/FileStreamSinkSuite.scala     |   4 +
 .../sql/streaming/FileStreamSourceSuite.scala   | 111 +++++++++++++++++++
 5 files changed, 148 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9962390a/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 93bef8d..31fcfab 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -493,7 +493,7 @@ returned by `SparkSession.readStream()`. In 
[R](api/R/read.stream.html), with th
 #### Input Sources
 There are a few built-in sources.
 
-  - **File source** - Reads files written in a directory as a stream of data. 
Supported file formats are text, csv, json, parquet. See the docs of the 
DataStreamReader interface for a more up-to-date list, and supported options 
for each file format. Note that the files must be atomically placed in the 
given directory, which in most file systems, can be achieved by file move 
operations.
+  - **File source** - Reads files written in a directory as a stream of data. 
Supported file formats are text, csv, json, orc, parquet. See the docs of the 
DataStreamReader interface for a more up-to-date list, and supported options 
for each file format. Note that the files must be atomically placed in the 
given directory, which in most file systems, can be achieved by file move 
operations.
 
   - **Kafka source** - Reads data from Kafka. It's compatible with Kafka 
broker versions 0.10.0 or higher. See the [Kafka Integration 
Guide](structured-streaming-kafka-integration.html) for more details.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9962390a/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 0cf7021..d0aba28 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -490,6 +490,23 @@ class DataStreamReader(OptionUtils):
         else:
             raise TypeError("path can be only a single string")
 
+    @since(2.3)
+    def orc(self, path):
+        """Loads a ORC file stream, returning the result as a 
:class:`DataFrame`.
+
+        .. note:: Evolving.
+
+        >>> orc_sdf = 
spark.readStream.schema(sdf_schema).orc(tempfile.mkdtemp())
+        >>> orc_sdf.isStreaming
+        True
+        >>> orc_sdf.schema == sdf_schema
+        True
+        """
+        if isinstance(path, basestring):
+            return self._df(self._jreader.orc(path))
+        else:
+            raise TypeError("path can be only a single string")
+
     @since(2.0)
     def parquet(self, path):
         """Loads a Parquet file stream, returning the result as a 
:class:`DataFrame`.

http://git-wip-us.apache.org/repos/asf/spark/blob/9962390a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index a42e280..41aa02c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -299,6 +299,21 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
   def csv(path: String): DataFrame = format("csv").load(path)
 
   /**
+   * Loads a ORC file stream, returning the result as a `DataFrame`.
+   *
+   * You can set the following ORC-specific option(s) for reading ORC files:
+   * <ul>
+   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number 
of new files to be
+   * considered in every trigger.</li>
+   * </ul>
+   *
+   * @since 2.3.0
+   */
+  def orc(path: String): DataFrame = {
+    format("orc").load(path)
+  }
+
+  /**
    * Loads a Parquet file stream, returning the result as a `DataFrame`.
    *
    * You can set the following Parquet-specific option(s) for reading Parquet 
files:

http://git-wip-us.apache.org/repos/asf/spark/blob/9962390a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 08db06b..2a25522 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -305,6 +305,10 @@ class FileStreamSinkSuite extends StreamTest {
     testFormat(Some("parquet"))
   }
 
+  test("orc") {
+    testFormat(Some("orc"))
+  }
+
   test("text") {
     testFormat(Some("text"))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9962390a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index c5b57bc..f4fa7fa 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -87,6 +87,28 @@ abstract class FileStreamSourceTest
     }
   }
 
+  case class AddOrcFileData(data: DataFrame, src: File, tmp: File) extends 
AddFileData {
+    override def addData(source: FileStreamSource): Unit = {
+      AddOrcFileData.writeToFile(data, src, tmp)
+    }
+  }
+
+  object AddOrcFileData {
+    def apply(seq: Seq[String], src: File, tmp: File): AddOrcFileData = {
+      AddOrcFileData(seq.toDS().toDF(), src, tmp)
+    }
+
+    /** Write orc files in a temp dir, and move the individual files to the 
'src' dir */
+    def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
+      val tmpDir = Utils.tempFileWith(new File(tmp, "orc"))
+      df.write.orc(tmpDir.getCanonicalPath)
+      src.mkdirs()
+      tmpDir.listFiles().foreach { f =>
+        f.renameTo(new File(src, s"${f.getName}"))
+      }
+    }
+  }
+
   case class AddParquetFileData(data: DataFrame, src: File, tmp: File) extends 
AddFileData {
     override def addData(source: FileStreamSource): Unit = {
       AddParquetFileData.writeToFile(data, src, tmp)
@@ -249,6 +271,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  // =============== ORC file stream schema tests ================
+
+  test("FileStreamSource schema: orc, existing files, no schema") {
+    withTempDir { src =>
+      Seq("a", "b", "c").toDS().as("userColumn").toDF().write
+        .mode(org.apache.spark.sql.SaveMode.Overwrite)
+        .orc(src.getCanonicalPath)
+
+      // Without schema inference, should throw error
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
+        intercept[IllegalArgumentException] {
+          createFileStreamSourceAndGetSchema(
+            format = Some("orc"), path = Some(src.getCanonicalPath), schema = 
None)
+        }
+      }
+
+      // With schema inference, should infer correct schema
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+        val schema = createFileStreamSourceAndGetSchema(
+          format = Some("orc"), path = Some(src.getCanonicalPath), schema = 
None)
+        assert(schema === new StructType().add("value", StringType))
+      }
+    }
+  }
+
+  test("FileStreamSource schema: orc, existing files, schema") {
+    withTempPath { src =>
+      Seq("a", "b", "c").toDS().as("oldUserColumn").toDF()
+        .write.orc(new File(src, "1").getCanonicalPath)
+      val userSchema = new StructType().add("userColumn", StringType)
+      val schema = createFileStreamSourceAndGetSchema(
+        format = Some("orc"), path = Some(src.getCanonicalPath), schema = 
Some(userSchema))
+      assert(schema === userSchema)
+    }
+  }
+
   // =============== Parquet file stream schema tests ================
 
   test("FileStreamSource schema: parquet, existing files, no schema") {
@@ -508,6 +566,59 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  // =============== ORC file stream tests ================
+
+  test("read from orc files") {
+    withTempDirs { case (src, tmp) =>
+      val fileStream = createFileStream("orc", src.getCanonicalPath, 
Some(valueSchema))
+      val filtered = fileStream.filter($"value" contains "keep")
+
+      testStream(filtered)(
+        AddOrcFileData(Seq("drop1", "keep2", "keep3"), src, tmp),
+        CheckAnswer("keep2", "keep3"),
+        StopStream,
+        AddOrcFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
+        StartStream(),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+        AddOrcFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+      )
+    }
+  }
+
+  test("read from orc files with changing schema") {
+    withTempDirs { case (src, tmp) =>
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+
+        // Add a file so that we can infer its schema
+        AddOrcFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)
+
+        val fileStream = createFileStream("orc", src.getCanonicalPath)
+
+        // FileStreamSource should infer the column "k"
+        assert(fileStream.schema === StructType(Seq(StructField("k", 
StringType))))
+
+        // After creating DF and before starting stream, add data with 
different schema
+        // Should not affect the inferred schema any more
+        AddOrcFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
+
+        testStream(fileStream)(
+          // Should not pick up column v in the file added before start
+          AddOrcFileData(Seq("value2").toDF("k"), src, tmp),
+          CheckAnswer("value0", "value1", "value2"),
+
+          // Should read data in column k, and ignore v
+          AddOrcFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
+          CheckAnswer("value0", "value1", "value2", "value3"),
+
+          // Should ignore rows that do not have the necessary k column
+          AddOrcFileData(Seq("value5").toDF("v"), src, tmp),
+          CheckAnswer("value0", "value1", "value2", "value3", null)
+        )
+      }
+    }
+  }
+
   // =============== Parquet file stream tests ================
 
   test("read from parquet files") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to