Repository: spark
Updated Branches:
  refs/heads/branch-2.2 765fd92e7 -> 090b987e6


[SPARK-22094][SS] processAllAvailable should check the query state

`processAllAvailable` should also check the query state and if the query is 
stopped, it should return.

The new unit test.

Author: Shixiong Zhu <zsxw...@gmail.com>

Closes #19314 from zsxwing/SPARK-22094.

(cherry picked from commit fedf6961be4e99139eb7ab08d5e6e29187ea5ccf)
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/090b987e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/090b987e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/090b987e

Branch: refs/heads/branch-2.2
Commit: 090b987e665a47f08e2dc9fc5f22c427bc260fbc
Parents: 765fd92
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Thu Sep 21 21:55:07 2017 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Thu Sep 21 22:08:45 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/StreamExecution.scala |  2 +-
 .../spark/sql/streaming/StreamingQuerySuite.scala       | 12 ++++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/090b987e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 16db353..33f81d9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -777,7 +777,7 @@ class StreamExecution(
         if (streamDeathCause != null) {
           throw streamDeathCause
         }
-        if (noNewData) {
+        if (noNewData || !isActive) {
           return
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/090b987e/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index b69536e..ee5af65 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -613,6 +613,18 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
     }
   }
 
+  test("processAllAvailable should not block forever when a query is stopped") 
{
+    val input = MemoryStream[Int]
+    input.addData(1)
+    val query = input.toDF().writeStream
+      .trigger(Trigger.Once())
+      .format("console")
+      .start()
+    failAfter(streamingTimeout) {
+      query.processAllAvailable()
+    }
+  }
+
   /** Create a streaming DF that only execute one batch in which it returns 
the given static DF */
   private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame 
= {
     require(!triggerDF.isStreaming)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to