This is an automated email from the ASF dual-hosted git repository.

haejoon pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 87a50a503f47 [SPARK-51186][PYTHON] Add `StreamingPythonRunnerInitializationException` to PySpark base exception
87a50a503f47 is described below

commit 87a50a503f47b45abb1df3f317d4400066dd6252
Author: Haejoon Lee <haejoon....@databricks.com>
AuthorDate: Fri Feb 14 15:48:23 2025 +0900

    [SPARK-51186][PYTHON] Add `StreamingPythonRunnerInitializationException` to PySpark base exception
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to add `StreamingPythonRunnerInitializationException` to PySpark base exception
    
    ### Why are the changes needed?
    
    Consistency of error message
    
    ### Does this PR introduce _any_ user-facing change?
    
    No API changes, but the user-facing error message would be improved
    
    ### How was this patch tested?
    
    Updated the existing UT
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #49917 from itholic/remove_grpc_exception.
    
    Authored-by: Haejoon Lee <haejoon....@databricks.com>
    Signed-off-by: Haejoon Lee <haejoon....@databricks.com>
    (cherry picked from commit 7992a2fd519c6e0fe759c138c78478701b5a5674)
    Signed-off-by: Haejoon Lee <haejoon....@databricks.com>
---
 python/docs/source/reference/pyspark.errors.rst               |  1 +
 python/pyspark/errors/__init__.py                             |  2 ++
 python/pyspark/errors/exceptions/base.py                      |  6 ++++++
 python/pyspark/errors/exceptions/connect.py                   | 11 +++++++++++
 .../sql/tests/connect/streaming/test_parity_foreach_batch.py  |  4 ++--
 5 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/python/docs/source/reference/pyspark.errors.rst b/python/docs/source/reference/pyspark.errors.rst
index 2744e515fd53..60655b342248 100644
--- a/python/docs/source/reference/pyspark.errors.rst
+++ b/python/docs/source/reference/pyspark.errors.rst
@@ -56,6 +56,7 @@ Classes
     SparkUpgradeException
     SparkNoSuchElementException
     StreamingQueryException
+    StreamingPythonRunnerInitializationException
     TempTableAlreadyExistsException
     UnknownException
     UnsupportedOperationException
diff --git a/python/pyspark/errors/__init__.py b/python/pyspark/errors/__init__.py
index 85e9bb65f0a6..39bd6698b798 100644
--- a/python/pyspark/errors/__init__.py
+++ b/python/pyspark/errors/__init__.py
@@ -50,6 +50,7 @@ from pyspark.errors.exceptions.base import (  # noqa: F401
     PySparkKeyError,
     QueryContext,
     QueryContextType,
+    StreamingPythonRunnerInitializationException,
 )
 
 
@@ -85,4 +86,5 @@ __all__ = [
     "PySparkKeyError",
     "QueryContext",
     "QueryContextType",
+    "StreamingPythonRunnerInitializationException",
 ]
diff --git a/python/pyspark/errors/exceptions/base.py b/python/pyspark/errors/exceptions/base.py
index 60f6d3d0475b..21d87e8cd028 100644
--- a/python/pyspark/errors/exceptions/base.py
+++ b/python/pyspark/errors/exceptions/base.py
@@ -244,6 +244,12 @@ class StreamingQueryException(PySparkException):
     """
 
 
+class StreamingPythonRunnerInitializationException(PySparkException):
+    """
+    Failed to initialize a streaming Python runner.
+    """
+
+
 class QueryExecutionException(PySparkException):
     """
     Failed to execute a query.
diff --git a/python/pyspark/errors/exceptions/connect.py b/python/pyspark/errors/exceptions/connect.py
index cb67d1f3cf3f..5837d89db94d 100644
--- a/python/pyspark/errors/exceptions/connect.py
+++ b/python/pyspark/errors/exceptions/connect.py
@@ -36,6 +36,7 @@ from pyspark.errors.exceptions.base import (
     SparkUpgradeException as BaseSparkUpgradeException,
     QueryContext as BaseQueryContext,
     QueryContextType,
+    StreamingPythonRunnerInitializationException as BaseStreamingPythonRunnerInitException,
 )
 
 if TYPE_CHECKING:
@@ -307,6 +308,14 @@ class InvalidCommandInput(SparkConnectGrpcException):
     """
 
 
+class StreamingPythonRunnerInitializationException(
+    SparkConnectGrpcException, BaseStreamingPythonRunnerInitException
+):
+    """
+    Failed to initialize a streaming Python runner.
+    """
+
+
 # Update EXCEPTION_CLASS_MAPPING here when adding a new exception
 EXCEPTION_CLASS_MAPPING = {
     "org.apache.spark.sql.catalyst.parser.ParseException": ParseException,
@@ -326,6 +335,8 @@ EXCEPTION_CLASS_MAPPING = {
     "org.apache.spark.SparkException": SparkException,
     "org.apache.spark.sql.connect.common.InvalidPlanInput": InvalidPlanInput,
     "org.apache.spark.sql.connect.common.InvalidCommandInput": InvalidCommandInput,
+    "org.apache.spark.api.python.StreamingPythonRunner"
+    "$StreamingPythonRunnerInitializationException": StreamingPythonRunnerInitializationException,
 }
 
 
diff --git a/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
index 9d28ec0e1970..632fa4628d1b 100644
--- a/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
+++ b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
@@ -22,7 +22,7 @@ from pyspark.testing.connectutils import ReusedConnectTestCase, should_test_conn
 from pyspark.errors import PySparkPicklingError
 
 if should_test_connect:
-    from pyspark.errors.exceptions.connect import SparkConnectGrpcException
+    from pyspark.errors.exceptions.connect import StreamingPythonRunnerInitializationException
 
 
 class StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin, ReusedConnectTestCase):
@@ -95,7 +95,7 @@ class StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin, ReusedCo
             print(obj)
 
         # Assert that an exception occurs during the initialization
-        with self.assertRaises(SparkConnectGrpcException) as error:
+        with self.assertRaises(StreamingPythonRunnerInitializationException) as error:
             df.select("value").writeStream.foreachBatch(fcn).start()
 
         # Assert that the error message contains the expected string


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

Reply via email to