This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 1a3dce36e2a [SPARK-43032][FOLLOWUP][SS][CONNECT] StreamingQueryManager 
bug fix
1a3dce36e2a is described below

commit 1a3dce36e2ad8d0e0bb2e1123864764077320466
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Fri Aug 11 10:23:56 2023 +0900

    [SPARK-43032][FOLLOWUP][SS][CONNECT] StreamingQueryManager bug fix
    
    When `spark.streams.get(q.id)` is called on a stopped query `q`, it should 
return None in the Python client and null in the Scala client. Right now it 
throws a null pointer exception instead. This PR fixes this issue.
    
    Bug fix
    
    No
    
    Added unit tests
    
    Closes #42437 from WweiL/streaming-query-manager-get-bug-fix.
    
    Authored-by: Wei Liu <wei....@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 42eb4223628653db71950f161a745432d1b45502)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/streaming/ClientStreamingQuerySuite.scala     |  2 ++
 .../spark/sql/connect/planner/SparkConnectPlanner.scala     |  5 +++--
 python/pyspark/sql/tests/streaming/test_streaming.py        | 13 +++++++++++++
 3 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
index f9e6e686495..ab92431bc11 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
@@ -268,6 +268,8 @@ class ClientStreamingQuerySuite extends QueryTest with 
SQLHelper with Logging {
 
     q.stop()
     assert(!q1.isActive)
+
+    assert(spark.streams.get(q.id) == null)
   }
 
   test("streaming query listener") {
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index d59d01b4ce3..49bac17a4f4 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -3059,8 +3059,9 @@ class SparkConnectPlanner(val sessionHolder: 
SessionHolder) extends Logging {
             .asJava)
 
       case StreamingQueryManagerCommand.CommandCase.GET_QUERY =>
-        val query = session.streams.get(command.getGetQuery)
-        respBuilder.setQuery(buildStreamingQueryInstance(query))
+        Option(session.streams.get(command.getGetQuery)).foreach { q =>
+          respBuilder.setQuery(buildStreamingQueryInstance(q))
+        }
 
       case StreamingQueryManagerCommand.CommandCase.AWAIT_ANY_TERMINATION =>
         if (command.getAwaitAnyTermination.hasTimeoutMs) {
diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py 
b/python/pyspark/sql/tests/streaming/test_streaming.py
index 52fa19a8642..0eea86dc737 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming.py
@@ -315,6 +315,19 @@ class StreamingTestsMixin:
             contains = msg in e.desc
         self.assertTrue(contains, "Exception tree doesn't contain the expected 
message: %s" % msg)
 
+    def test_query_manager_get(self):
+        df = self.spark.readStream.format("rate").load()
+        for q in self.spark.streams.active:
+            q.stop()
+        q = df.writeStream.format("noop").start()
+
+        self.assertTrue(q.isActive)
+        self.assertTrue(q.id == self.spark.streams.get(q.id).id)
+
+        q.stop()
+
+        self.assertIsNone(self.spark.streams.get(q.id))
+
     def test_query_manager_await_termination(self):
         df = 
self.spark.readStream.format("text").load("python/test_support/sql/streaming")
         for q in self.spark.streams.active:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to