Repository: spark Updated Branches: refs/heads/branch-2.2 765fd92e7 -> 090b987e6
[SPARK-22094][SS] processAllAvailable should check the query state `processAllAvailable` should also check the query state and if the query is stopped, it should return. The new unit test. Author: Shixiong Zhu <zsxw...@gmail.com> Closes #19314 from zsxwing/SPARK-22094. (cherry picked from commit fedf6961be4e99139eb7ab08d5e6e29187ea5ccf) Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/090b987e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/090b987e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/090b987e Branch: refs/heads/branch-2.2 Commit: 090b987e665a47f08e2dc9fc5f22c427bc260fbc Parents: 765fd92 Author: Shixiong Zhu <zsxw...@gmail.com> Authored: Thu Sep 21 21:55:07 2017 -0700 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Thu Sep 21 22:08:45 2017 -0700 ---------------------------------------------------------------------- .../spark/sql/execution/streaming/StreamExecution.scala | 2 +- .../spark/sql/streaming/StreamingQuerySuite.scala | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/090b987e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 16db353..33f81d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -777,7 +777,7 @@ class StreamExecution( if (streamDeathCause != null) { throw streamDeathCause } - if (noNewData) { + if (noNewData || !isActive) { return } } http://git-wip-us.apache.org/repos/asf/spark/blob/090b987e/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index b69536e..ee5af65 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -613,6 +613,18 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi } } + test("processAllAvailable should not block forever when a query is stopped") { + val input = MemoryStream[Int] + input.addData(1) + val query = input.toDF().writeStream + .trigger(Trigger.Once()) + .format("console") + .start() + failAfter(streamingTimeout) { + query.processAllAvailable() + } + } + /** Create a streaming DF that only execute one batch in which it returns the given static DF */ private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = { require(!triggerDF.isStreaming) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org