Repository: spark
Updated Branches:
  refs/heads/branch-2.1 e6509c245 -> 85dd07374


[SPARK-18192] Support all file formats in structured streaming

## What changes were proposed in this pull request?
This patch adds support for all file formats in structured streaming sinks. 
This is actually a very small change thanks to all the previous refactoring 
done using the new internal commit protocol API.
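
A minimal sketch of the user-facing behaviour this enables (illustrative only, not
part of the patch; the paths are placeholders and MemoryStream is just a convenient
in-memory test source):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.streaming.MemoryStream

    val spark = SparkSession.builder().master("local[2]").appName("file-sink-sketch").getOrCreate()
    import spark.implicits._
    implicit val sqlContext = spark.sqlContext

    // Before this change the file sink only accepted parquet; json and text
    // would fail with UnsupportedOperationException.
    val inputData = MemoryStream[Int]
    inputData.addData(1, 2, 3)

    val query = inputData.toDS()
      .map(i => (i, i * 1000))
      .toDF("id", "value")
      .writeStream
      .format("json")  // "parquet", and "text" (for a single string column), work the same way
      .option("checkpointLocation", "/tmp/stream.checkpoint")
      .start("/tmp/stream.output")

    query.processAllAvailable()
    query.stop()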

## How was this patch tested?
Updated FileStreamSinkSuite to add test cases for json, text, and parquet.

Author: Reynold Xin <r...@databricks.com>

Closes #15711 from rxin/SPARK-18192.

(cherry picked from commit a36653c5b7b2719f8bfddf4ddfc6e1b828ac9af1)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85dd0737
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85dd0737
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85dd0737

Branch: refs/heads/branch-2.1
Commit: 85dd073743946383438aabb9f1281e6075f25cc5
Parents: e6509c2
Author: Reynold Xin <r...@databricks.com>
Authored: Tue Nov 1 23:37:03 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Nov 1 23:37:11 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  |  8 +--
 .../sql/streaming/FileStreamSinkSuite.scala     | 62 +++++++++-----------
 2 files changed, 32 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/85dd0737/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d980e6a..3f956c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
@@ -37,7 +36,6 @@ import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
@@ -292,7 +290,7 @@ case class DataSource(
       case s: StreamSinkProvider =>
         s.createSink(sparkSession.sqlContext, options, partitionColumns, outputMode)
 
-      case parquet: parquet.ParquetFileFormat =>
+      case fileFormat: FileFormat =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
@@ -301,7 +299,7 @@ case class DataSource(
           throw new IllegalArgumentException(
             s"Data source $className does not support $outputMode output mode")
         }
-        new FileStreamSink(sparkSession, path, parquet, partitionColumns, options)
+        new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, options)
 
       case _ =>
         throw new UnsupportedOperationException(
@@ -516,7 +514,7 @@ case class DataSource(
           val plan = data.logicalPlan
           plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
             throw new AnalysisException(
-              s"Unable to resolve ${name} given [${plan.output.map(_.name).mkString(", ")}]")
+              s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
           }.asInstanceOf[Attribute]
         }
         // For partitioned relation r, r.schema's column ordering can be different from the column

http://git-wip-us.apache.org/repos/asf/spark/blob/85dd0737/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 902cf05..0f140f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import org.apache.spark.sql._
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
@@ -142,42 +142,38 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
-  test("FileStreamSink - supported formats") {
-    def testFormat(format: Option[String]): Unit = {
-      val inputData = MemoryStream[Int]
-      val ds = inputData.toDS()
+  test("FileStreamSink - parquet") {
+    testFormat(None) // should not throw an error; parquet is the default format when none is specified
+    testFormat(Some("parquet"))
+  }
 
-      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
-      val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+  test("FileStreamSink - text") {
+    testFormat(Some("text"))
+  }
 
-      var query: StreamingQuery = null
+  test("FileStreamSink - json") {
+    testFormat(Some("json"))
+  }
 
-      try {
-        val writer =
-          ds.map(i => (i, i * 1000))
-            .toDF("id", "value")
-            .writeStream
-        if (format.nonEmpty) {
-          writer.format(format.get)
-        }
-        query = writer
-            .option("checkpointLocation", checkpointDir)
-            .start(outputDir)
-      } finally {
-        if (query != null) {
-          query.stop()
-        }
-      }
-    }
+  def testFormat(format: Option[String]): Unit = {
+    val inputData = MemoryStream[Int]
+    val ds = inputData.toDS()
 
-    testFormat(None) // should not throw error as default format parquet when not specified
-    testFormat(Some("parquet"))
-    val e = intercept[UnsupportedOperationException] {
-      testFormat(Some("text"))
-    }
-    Seq("text", "not support", "stream").foreach { s =>
-      assert(e.getMessage.contains(s))
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+    var query: StreamingQuery = null
+
+    try {
+      val writer = ds.map(i => (i, i * 1000)).toDF("id", "value").writeStream
+      if (format.nonEmpty) {
+        writer.format(format.get)
+      }
+      query = writer.option("checkpointLocation", checkpointDir).start(outputDir)
+    } finally {
+      if (query != null) {
+        query.stop()
+      }
     }
   }
-
 }

