Repository: spark
Updated Branches:
  refs/heads/master 4134653e5 -> d32e22778


[MINOR][SQL] Standardize 'continuous queries' to 'streaming Datasets/DataFrames'

## What changes were proposed in this pull request?

This patch standardizes the terminology, since `streaming Datasets/DataFrames` is the
term chosen in
[SPARK-15593](https://github.com/apache/spark/commit/00c310133df4f3893dd90d801168c2ab9841b102):
 - `continuous queries` -> `streaming Datasets/DataFrames`
 - `non-continuous queries` -> `non-streaming Datasets/DataFrames`

It also adds `test("check foreach() can only be called on streaming
Datasets/DataFrames")`.

## How was this patch tested?

Updated and newly added tests in `DataFrameReaderWriterSuite` (including the `foreach()` check above).

Author: Liwei Lin <lwl...@gmail.com>

Closes #13595 from lw-lin/continuous-queries-to-streaming-dss-dfs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d32e2277
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d32e2277
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d32e2277

Branch: refs/heads/master
Commit: d32e227787338a08741d8064f5dd2db1d60ddc63
Parents: 4134653
Author: Liwei Lin <lwl...@gmail.com>
Authored: Mon Jun 13 11:49:15 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Jun 13 11:49:15 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 47 +++++-----
 .../apache/spark/sql/streaming/StreamTest.scala |  4 +-
 .../test/DataFrameReaderWriterSuite.scala       | 90 +++++++++++---------
 3 files changed, 77 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d32e2277/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index afae078..392e3c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -56,7 +56,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   def mode(saveMode: SaveMode): DataFrameWriter[T] = {
     // mode() is used for non-continuous queries
     // outputMode() is used for continuous queries
-    assertNotStreaming("mode() can only be called on non-continuous queries")
+    assertNotStreaming("mode() can only be called on non-streaming 
Datasets/DataFrames")
     this.mode = saveMode
     this
   }
@@ -73,7 +73,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   def mode(saveMode: String): DataFrameWriter[T] = {
     // mode() is used for non-continuous queries
     // outputMode() is used for continuous queries
-    assertNotStreaming("mode() can only be called on non-continuous queries")
+    assertNotStreaming("mode() can only be called on non-streaming 
Datasets/DataFrames")
     this.mode = saveMode.toLowerCase match {
       case "overwrite" => SaveMode.Overwrite
       case "append" => SaveMode.Append
@@ -86,33 +86,33 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   /**
-   * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
-   *   - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be
+   * Specifies how data of a streaming Dataset/DataFrame is written to a streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming Dataset/DataFrame will be
    *                            written to the sink
-   *   - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written
+   *   - `OutputMode.Complete()`: all the rows in the streaming Dataset/DataFrame will be written
    *                              to the sink every time these is some updates
    *
    * @since 2.0.0
    */
   @Experimental
   def outputMode(outputMode: OutputMode): DataFrameWriter[T] = {
-    assertStreaming("outputMode() can only be called on continuous queries")
+    assertStreaming("outputMode() can only be called on streaming 
Datasets/DataFrames")
     this.outputMode = outputMode
     this
   }
 
   /**
-   * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
-   *   - `append`:   only the new rows in the streaming DataFrame/Dataset will be written to
+   * Specifies how data of a streaming Dataset/DataFrame is written to a streaming sink.
+   *   - `append`:   only the new rows in the streaming Dataset/DataFrame will be written to
    *                 the sink
-   *   - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink
+   *   - `complete`: all the rows in the streaming Dataset/DataFrame will be written to the sink
    *                 every time these is some updates
    *
    * @since 2.0.0
    */
   @Experimental
   def outputMode(outputMode: String): DataFrameWriter[T] = {
-    assertStreaming("outputMode() can only be called on continuous queries")
+    assertStreaming("outputMode() can only be called on streaming 
Datasets/DataFrames")
     this.outputMode = outputMode.toLowerCase match {
       case "append" =>
         OutputMode.Append
@@ -150,7 +150,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    */
   @Experimental
   def trigger(trigger: Trigger): DataFrameWriter[T] = {
-    assertStreaming("trigger() can only be called on continuous queries")
+    assertStreaming("trigger() can only be called on streaming 
Datasets/DataFrames")
     this.trigger = trigger
     this
   }
@@ -284,7 +284,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    */
   def save(): Unit = {
     assertNotBucketed("save")
-    assertNotStreaming("save() can only be called on non-continuous queries")
+    assertNotStreaming("save() can only be called on non-streaming 
Datasets/DataFrames")
     val dataSource = DataSource(
       df.sparkSession,
       className = source,
@@ -304,7 +304,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    */
   @Experimental
   def queryName(queryName: String): DataFrameWriter[T] = {
-    assertStreaming("queryName() can only be called on continuous queries")
+    assertStreaming("queryName() can only be called on streaming 
Datasets/DataFrames")
     this.extraOptions += ("queryName" -> queryName)
     this
   }
@@ -333,7 +333,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   @Experimental
   def startStream(): ContinuousQuery = {
     assertNotBucketed("startStream")
-    assertStreaming("startStream() can only be called on continuous queries")
+    assertStreaming("startStream() can only be called on streaming 
Datasets/DataFrames")
 
     if (source == "memory") {
       val queryName =
@@ -434,8 +434,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
     assertNotPartitioned("foreach")
     assertNotBucketed("foreach")
-    assertStreaming(
-      "foreach() can only be called on streaming Datasets/DataFrames.")
+    assertStreaming("foreach() can only be called on streaming 
Datasets/DataFrames.")
 
     val queryName = extraOptions.getOrElse("queryName", 
StreamExecution.nextName)
     val sink = new 
ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc)
@@ -502,7 +501,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private def insertInto(tableIdent: TableIdentifier): Unit = {
     assertNotBucketed("insertInto")
-    assertNotStreaming("insertInto() can only be called on non-continuous 
queries")
+    assertNotStreaming("insertInto() can only be called on non-streaming 
Datasets/DataFrames")
     val partitions = normalizedParCols.map(_.map(col => col -> 
(Option.empty[String])).toMap)
     val overwrite = mode == SaveMode.Overwrite
 
@@ -621,7 +620,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    assertNotStreaming("saveAsTable() can only be called on non-continuous 
queries")
+    assertNotStreaming("saveAsTable() can only be called on non-streaming 
Datasets/DataFrames")
 
     val tableExists = 
df.sparkSession.sessionState.catalog.tableExists(tableIdent)
 
@@ -664,7 +663,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
  def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
     assertNotPartitioned("jdbc")
     assertNotBucketed("jdbc")
-    assertNotStreaming("jdbc() can only be called on non-continuous queries")
+    assertNotStreaming("jdbc() can only be called on non-streaming 
Datasets/DataFrames")
 
     val props = new Properties()
     extraOptions.foreach { case (key, value) =>
@@ -723,7 +722,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def json(path: String): Unit = {
-    assertNotStreaming("json() can only be called on non-continuous queries")
+    assertNotStreaming("json() can only be called on non-streaming 
Datasets/DataFrames")
     format("json").save(path)
   }
 
@@ -743,7 +742,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def parquet(path: String): Unit = {
-    assertNotStreaming("parquet() can only be called on non-continuous 
queries")
+    assertNotStreaming("parquet() can only be called on non-streaming 
Datasets/DataFrames")
     format("parquet").save(path)
   }
 
@@ -763,7 +762,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @note Currently, this method can only be used after enabling Hive support
    */
   def orc(path: String): Unit = {
-    assertNotStreaming("orc() can only be called on non-continuous queries")
+    assertNotStreaming("orc() can only be called on non-streaming 
Datasets/DataFrames")
     format("orc").save(path)
   }
 
@@ -787,7 +786,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.6.0
    */
   def text(path: String): Unit = {
-    assertNotStreaming("text() can only be called on non-continuous queries")
+    assertNotStreaming("text() can only be called on non-streaming 
Datasets/DataFrames")
     format("text").save(path)
   }
 
@@ -817,7 +816,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 2.0.0
    */
   def csv(path: String): Unit = {
-    assertNotStreaming("csv() can only be called on non-continuous queries")
+    assertNotStreaming("csv() can only be called on non-streaming 
Datasets/DataFrames")
     format("csv").save(path)
   }
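Aside (not from this commit): a minimal sketch of the split the assertions above enforce, where `mode()`/`save()`-style methods are batch-only and `outputMode()`/`trigger()`/`queryName()`/`startStream()`/`foreach()` are streaming-only. Paths are placeholders, and the streaming source name reuses the fake provider from the test suite further below, which is only visible inside that test module; a real application would substitute an actual streaming source.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object WriterMethodSplitSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

    // Batch (non-streaming) Dataset/DataFrame: mode()/parquet() etc. are legal ...
    val batch = spark.read.text("/tmp/input")   // placeholder: an existing text input
    batch.write.mode("overwrite").parquet("/tmp/batch-out")

    // ... while streaming-only methods fail with the reworded message.
    try {
      batch.write.startStream("/tmp/stream-out")
    } catch {
      case e: AnalysisException =>
        assert(e.getMessage.contains("streaming Datasets/DataFrames"))
    }

    // Streaming Dataset/DataFrame (fake test-only provider, shown purely for illustration):
    // outputMode()/startStream() are legal here, while mode()/save() are rejected.
    val streaming = spark.read
      .format("org.apache.spark.sql.streaming.test")
      .stream()
    try {
      streaming.write.mode("append")
    } catch {
      case e: AnalysisException =>
        assert(e.getMessage.contains("non-streaming Datasets/DataFrames"))
    }

    spark.stop()
  }
}
```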
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d32e2277/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 7f1e5fe..fabb8ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -70,7 +70,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
   /** How long to wait for an active stream to catch up when checking a result. */
   val streamingTimeout = 10.seconds
 
-  /** A trait for actions that can be performed while testing a streaming DataFrame. */
+  /** A trait for actions that can be performed while testing a streaming DataSet/DataFrame. */
   trait StreamAction
 
   /** A trait to mark actions that require the stream to be actively running. */
@@ -194,7 +194,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
   }
 
   /**
-   * Executes the specified actions on the given streaming DataFrame and provides helpful
+   * Executes the specified actions on the given streaming DataSet/DataFrame and provides helpful
    * error messages in the case of failures or incorrect answers.
    *
   * Note that if the stream is not explicitly started before an action that requires it to be

http://git-wip-us.apache.org/repos/asf/spark/blob/d32e2277/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index 6e0d66a..51aa532 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -371,66 +371,80 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
  private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath
 
-  test("check trigger() can only be called on continuous queries") {
+  test("check trigger() can only be called on streaming Datasets/DataFrames") {
     val df = spark.read.text(newTextInput)
     val w = df.write.option("checkpointLocation", newMetadataDir)
    val e = intercept[AnalysisException](w.trigger(ProcessingTime("10 seconds")))
-    assert(e.getMessage == "trigger() can only be called on continuous queries;")
+    assert(e.getMessage == "trigger() can only be called on streaming Datasets/DataFrames;")
   }
 
-  test("check queryName() can only be called on continuous queries") {
+  test("check queryName() can only be called on streaming 
Datasets/DataFrames") {
     val df = spark.read.text(newTextInput)
     val w = df.write.option("checkpointLocation", newMetadataDir)
     val e = intercept[AnalysisException](w.queryName("queryName"))
-    assert(e.getMessage == "queryName() can only be called on continuous 
queries;")
+    assert(e.getMessage == "queryName() can only be called on streaming 
Datasets/DataFrames;")
   }
 
-  test("check startStream() can only be called on continuous queries") {
+  test("check startStream() can only be called on streaming 
Datasets/DataFrames") {
     val df = spark.read.text(newTextInput)
     val w = df.write.option("checkpointLocation", newMetadataDir)
     val e = intercept[AnalysisException](w.startStream())
-    assert(e.getMessage == "startStream() can only be called on continuous 
queries;")
+    assert(e.getMessage == "startStream() can only be called on streaming 
Datasets/DataFrames;")
   }
 
-  test("check startStream(path) can only be called on continuous queries") {
+  test("check startStream(path) can only be called on streaming 
Datasets/DataFrames") {
     val df = spark.read.text(newTextInput)
     val w = df.write.option("checkpointLocation", newMetadataDir)
     val e = intercept[AnalysisException](w.startStream("non_exist_path"))
-    assert(e.getMessage == "startStream() can only be called on continuous 
queries;")
+    assert(e.getMessage == "startStream() can only be called on streaming 
Datasets/DataFrames;")
   }
 
-  test("check mode(SaveMode) can only be called on non-continuous queries") {
+  test("check foreach() can only be called on streaming Datasets/DataFrames") {
+    val df = spark.read.text(newTextInput)
+    val w = df.write.option("checkpointLocation", newMetadataDir)
+    val foreachWriter = new ForeachWriter[String] {
+      override def open(partitionId: Long, version: Long): Boolean = false
+      override def process(value: String): Unit = {}
+      override def close(errorOrNull: Throwable): Unit = {}
+    }
+    val e = intercept[AnalysisException](w.foreach(foreachWriter))
+    Seq("foreach()", "streaming Datasets/DataFrames").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+  }
+
+  test("check mode(SaveMode) can only be called on non-streaming 
Datasets/DataFrames") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.mode(SaveMode.Append))
-    assert(e.getMessage == "mode() can only be called on non-continuous 
queries;")
+    assert(e.getMessage == "mode() can only be called on non-streaming 
Datasets/DataFrames;")
   }
 
-  test("check mode(string) can only be called on non-continuous queries") {
+  test("check mode(string) can only be called on non-streaming 
Datasets/DataFrames") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.mode("append"))
-    assert(e.getMessage == "mode() can only be called on non-continuous 
queries;")
+    assert(e.getMessage == "mode() can only be called on non-streaming 
Datasets/DataFrames;")
   }
 
-  test("check outputMode(OutputMode) can only be called on continuous 
queries") {
+  test("check outputMode(OutputMode) can only be called on streaming 
Datasets/DataFrames") {
     val df = spark.read.text(newTextInput)
     val w = df.write.option("checkpointLocation", newMetadataDir)
     val e = intercept[AnalysisException](w.outputMode(OutputMode.Append))
-    Seq("outputmode", "continuous queries").foreach { s =>
+    Seq("outputmode", "streaming Datasets/DataFrames").foreach { s =>
       assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
     }
   }
 
-  test("check outputMode(string) can only be called on continuous queries") {
+  test("check outputMode(string) can only be called on streaming 
Datasets/DataFrames") {
     val df = spark.read.text(newTextInput)
     val w = df.write.option("checkpointLocation", newMetadataDir)
     val e = intercept[AnalysisException](w.outputMode("append"))
-    Seq("outputmode", "continuous queries").foreach { s =>
+    Seq("outputmode", "streaming Datasets/DataFrames").foreach { s =>
       assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
     }
   }
@@ -450,7 +464,7 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
     testError("Xyz")
   }
 
-  test("check bucketBy() can only be called on non-continuous queries") {
+  test("check bucketBy() can only be called on non-streaming 
Datasets/DataFrames") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
@@ -459,7 +473,7 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
    assert(e.getMessage == "'startStream' does not support bucketing right now;")
   }
 
-  test("check sortBy() can only be called on non-continuous queries;") {
+  test("check sortBy() can only be called on non-streaming 
Datasets/DataFrames;") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
@@ -468,94 +482,94 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
    assert(e.getMessage == "'startStream' does not support bucketing right now;")
   }
 
-  test("check save(path) can only be called on non-continuous queries") {
+  test("check save(path) can only be called on non-streaming 
Datasets/DataFrames") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.save("non_exist_path"))
-    assert(e.getMessage == "save() can only be called on non-continuous 
queries;")
+    assert(e.getMessage == "save() can only be called on non-streaming 
Datasets/DataFrames;")
   }
 
-  test("check save() can only be called on non-continuous queries") {
+  test("check save() can only be called on non-streaming Datasets/DataFrames") 
{
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.save())
-    assert(e.getMessage == "save() can only be called on non-continuous 
queries;")
+    assert(e.getMessage == "save() can only be called on non-streaming 
Datasets/DataFrames;")
   }
 
-  test("check insertInto() can only be called on non-continuous queries") {
+  test("check insertInto() can only be called on non-streaming 
Datasets/DataFrames") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.insertInto("non_exsit_table"))
-    assert(e.getMessage == "insertInto() can only be called on non-continuous 
queries;")
+    assert(e.getMessage == "insertInto() can only be called on non-streaming 
Datasets/DataFrames;")
   }
 
-  test("check saveAsTable() can only be called on non-continuous queries") {
+  test("check saveAsTable() can only be called on non-streaming 
Datasets/DataFrames") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.saveAsTable("non_exsit_table"))
-    assert(e.getMessage == "saveAsTable() can only be called on non-continuous 
queries;")
+    assert(e.getMessage == "saveAsTable() can only be called on non-streaming 
Datasets/DataFrames;")
   }
 
-  test("check jdbc() can only be called on non-continuous queries") {
+  test("check jdbc() can only be called on non-streaming Datasets/DataFrames") 
{
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.jdbc(null, null, null))
-    assert(e.getMessage == "jdbc() can only be called on non-continuous 
queries;")
+    assert(e.getMessage == "jdbc() can only be called on non-streaming 
Datasets/DataFrames;")
   }
 
-  test("check json() can only be called on non-continuous queries") {
+  test("check json() can only be called on non-streaming Datasets/DataFrames") 
{
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.json("non_exist_path"))
-    assert(e.getMessage == "json() can only be called on non-continuous 
queries;")
+    assert(e.getMessage == "json() can only be called on non-streaming 
Datasets/DataFrames;")
   }
 
-  test("check parquet() can only be called on non-continuous queries") {
+  test("check parquet() can only be called on non-streaming 
Datasets/DataFrames") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.parquet("non_exist_path"))
-    assert(e.getMessage == "parquet() can only be called on non-continuous 
queries;")
+    assert(e.getMessage == "parquet() can only be called on non-streaming 
Datasets/DataFrames;")
   }
 
-  test("check orc() can only be called on non-continuous queries") {
+  test("check orc() can only be called on non-streaming Datasets/DataFrames") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.orc("non_exist_path"))
-    assert(e.getMessage == "orc() can only be called on non-continuous 
queries;")
+    assert(e.getMessage == "orc() can only be called on non-streaming 
Datasets/DataFrames;")
   }
 
-  test("check text() can only be called on non-continuous queries") {
+  test("check text() can only be called on non-streaming Datasets/DataFrames") 
{
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.text("non_exist_path"))
-    assert(e.getMessage == "text() can only be called on non-continuous 
queries;")
+    assert(e.getMessage == "text() can only be called on non-streaming 
Datasets/DataFrames;")
   }
 
-  test("check csv() can only be called on non-continuous queries") {
+  test("check csv() can only be called on non-streaming Datasets/DataFrames") {
     val df = spark.read
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
     val e = intercept[AnalysisException](w.csv("non_exist_path"))
-    assert(e.getMessage == "csv() can only be called on non-continuous 
queries;")
+    assert(e.getMessage == "csv() can only be called on non-streaming 
Datasets/DataFrames;")
   }
 
   test("check foreach() does not support partitioning or bucketing") {

