This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 1a3dce36e2a [SPARK-43032][FOLLOWUP][SS][CONNECT] StreamingQueryManager bug fix 1a3dce36e2a is described below commit 1a3dce36e2ad8d0e0bb2e1123864764077320466 Author: Wei Liu <wei....@databricks.com> AuthorDate: Fri Aug 11 10:23:56 2023 +0900 [SPARK-43032][FOLLOWUP][SS][CONNECT] StreamingQueryManager bug fix When `spark.streams.get(q.id)` is called on a stopped query q, it should return None in the Python client and null in the Scala client, but right now it throws a null pointer exception. This PR fixes this issue. Why are the changes needed? Bug fix. Does this PR introduce any user-facing change? No. How was this patch tested? Added unit tests. Closes #42437 from WweiL/streaming-query-manager-get-bug-fix. Authored-by: Wei Liu <wei....@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 42eb4223628653db71950f161a745432d1b45502) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../spark/sql/streaming/ClientStreamingQuerySuite.scala | 2 ++ .../spark/sql/connect/planner/SparkConnectPlanner.scala | 5 +++-- python/pyspark/sql/tests/streaming/test_streaming.py | 13 +++++++++++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala index f9e6e686495..ab92431bc11 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala @@ -268,6 +268,8 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging { q.stop() assert(!q1.isActive) + + assert(spark.streams.get(q.id) == null) } test("streaming query listener") { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index d59d01b4ce3..49bac17a4f4 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -3059,8 +3059,9 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { .asJava) case StreamingQueryManagerCommand.CommandCase.GET_QUERY => - val query = session.streams.get(command.getGetQuery) - respBuilder.setQuery(buildStreamingQueryInstance(query)) + Option(session.streams.get(command.getGetQuery)).foreach { q => + respBuilder.setQuery(buildStreamingQueryInstance(q)) + } case StreamingQueryManagerCommand.CommandCase.AWAIT_ANY_TERMINATION => if (command.getAwaitAnyTermination.hasTimeoutMs) { diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py b/python/pyspark/sql/tests/streaming/test_streaming.py index 52fa19a8642..0eea86dc737 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming.py +++ b/python/pyspark/sql/tests/streaming/test_streaming.py @@ -315,6 +315,19 @@ class StreamingTestsMixin: contains = msg in e.desc self.assertTrue(contains, "Exception tree doesn't contain the expected message: %s" % msg) + def test_query_manager_get(self): + df = self.spark.readStream.format("rate").load() + for q in self.spark.streams.active: + q.stop() + q = df.writeStream.format("noop").start() + + self.assertTrue(q.isActive) + self.assertTrue(q.id == self.spark.streams.get(q.id).id) + + q.stop() + + self.assertIsNone(self.spark.streams.get(q.id)) + def test_query_manager_await_termination(self): df = self.spark.readStream.format("text").load("python/test_support/sql/streaming") for q in self.spark.streams.active: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, 
e-mail: commits-h...@spark.apache.org