This is an automated email from the ASF dual-hosted git repository. haejoon pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new 87a50a503f47 [SPARK-51186][PYTHON] Add `StreamingPythonRunnerInitializationException` to PySpark base exception 87a50a503f47 is described below commit 87a50a503f47b45abb1df3f317d4400066dd6252 Author: Haejoon Lee <haejoon....@databricks.com> AuthorDate: Fri Feb 14 15:48:23 2025 +0900 [SPARK-51186][PYTHON] Add `StreamingPythonRunnerInitializationException` to PySpark base exception ### What changes were proposed in this pull request? This PR proposes to add `StreamingPythonRunnerInitializationException` to the PySpark base exceptions. ### Why are the changes needed? For consistent error messages: the server-side `StreamingPythonRunnerInitializationException` is now mapped to a dedicated PySpark exception instead of surfacing as a generic `SparkConnectGrpcException`. ### Does this PR introduce _any_ user-facing change? No API changes, but the user-facing error message is improved. ### How was this patch tested? Updated the existing unit test. ### Was this patch authored or co-authored using generative AI tooling? No Closes #49917 from itholic/remove_grpc_exception. Authored-by: Haejoon Lee <haejoon....@databricks.com> Signed-off-by: Haejoon Lee <haejoon....@databricks.com> (cherry picked from commit 7992a2fd519c6e0fe759c138c78478701b5a5674) Signed-off-by: Haejoon Lee <haejoon....@databricks.com> --- python/docs/source/reference/pyspark.errors.rst | 1 + python/pyspark/errors/__init__.py | 2 ++ python/pyspark/errors/exceptions/base.py | 6 ++++++ python/pyspark/errors/exceptions/connect.py | 11 +++++++++++ .../sql/tests/connect/streaming/test_parity_foreach_batch.py | 4 ++-- 5 files changed, 22 insertions(+), 2 deletions(-) diff --git a/python/docs/source/reference/pyspark.errors.rst b/python/docs/source/reference/pyspark.errors.rst index 2744e515fd53..60655b342248 100644 --- a/python/docs/source/reference/pyspark.errors.rst +++ b/python/docs/source/reference/pyspark.errors.rst @@ -56,6 +56,7 @@ Classes SparkUpgradeException SparkNoSuchElementException StreamingQueryException + StreamingPythonRunnerInitializationException TempTableAlreadyExistsException UnknownException 
UnsupportedOperationException diff --git a/python/pyspark/errors/__init__.py b/python/pyspark/errors/__init__.py index 85e9bb65f0a6..39bd6698b798 100644 --- a/python/pyspark/errors/__init__.py +++ b/python/pyspark/errors/__init__.py @@ -50,6 +50,7 @@ from pyspark.errors.exceptions.base import ( # noqa: F401 PySparkKeyError, QueryContext, QueryContextType, + StreamingPythonRunnerInitializationException, ) @@ -85,4 +86,5 @@ __all__ = [ "PySparkKeyError", "QueryContext", "QueryContextType", + "StreamingPythonRunnerInitializationException", ] diff --git a/python/pyspark/errors/exceptions/base.py b/python/pyspark/errors/exceptions/base.py index 60f6d3d0475b..21d87e8cd028 100644 --- a/python/pyspark/errors/exceptions/base.py +++ b/python/pyspark/errors/exceptions/base.py @@ -244,6 +244,12 @@ class StreamingQueryException(PySparkException): """ +class StreamingPythonRunnerInitializationException(PySparkException): + """ + Failed to initialize a streaming Python runner. + """ + + class QueryExecutionException(PySparkException): """ Failed to execute a query. diff --git a/python/pyspark/errors/exceptions/connect.py b/python/pyspark/errors/exceptions/connect.py index cb67d1f3cf3f..5837d89db94d 100644 --- a/python/pyspark/errors/exceptions/connect.py +++ b/python/pyspark/errors/exceptions/connect.py @@ -36,6 +36,7 @@ from pyspark.errors.exceptions.base import ( SparkUpgradeException as BaseSparkUpgradeException, QueryContext as BaseQueryContext, QueryContextType, + StreamingPythonRunnerInitializationException as BaseStreamingPythonRunnerInitException, ) if TYPE_CHECKING: @@ -307,6 +308,14 @@ class InvalidCommandInput(SparkConnectGrpcException): """ +class StreamingPythonRunnerInitializationException( + SparkConnectGrpcException, BaseStreamingPythonRunnerInitException +): + """ + Failed to initialize a streaming Python runner. 
+ """ + + # Update EXCEPTION_CLASS_MAPPING here when adding a new exception EXCEPTION_CLASS_MAPPING = { "org.apache.spark.sql.catalyst.parser.ParseException": ParseException, @@ -326,6 +335,8 @@ EXCEPTION_CLASS_MAPPING = { "org.apache.spark.SparkException": SparkException, "org.apache.spark.sql.connect.common.InvalidPlanInput": InvalidPlanInput, "org.apache.spark.sql.connect.common.InvalidCommandInput": InvalidCommandInput, + "org.apache.spark.api.python.StreamingPythonRunner" + "$StreamingPythonRunnerInitializationException": StreamingPythonRunnerInitializationException, } diff --git a/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py index 9d28ec0e1970..632fa4628d1b 100644 --- a/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py +++ b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py @@ -22,7 +22,7 @@ from pyspark.testing.connectutils import ReusedConnectTestCase, should_test_conn from pyspark.errors import PySparkPicklingError if should_test_connect: - from pyspark.errors.exceptions.connect import SparkConnectGrpcException + from pyspark.errors.exceptions.connect import StreamingPythonRunnerInitializationException class StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin, ReusedConnectTestCase): @@ -95,7 +95,7 @@ class StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin, ReusedCo print(obj) # Assert that an exception occurs during the initialization - with self.assertRaises(SparkConnectGrpcException) as error: + with self.assertRaises(StreamingPythonRunnerInitializationException) as error: df.select("value").writeStream.foreachBatch(fcn).start() # Assert that the error message contains the expected string --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org