This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new eee179135ed2 [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking error message in UDF
eee179135ed2 is described below

commit eee179135ed21dbdd8b342d053c9eda849e2de77
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Tue May 7 15:59:20 2024 +0900

    [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking error message in UDF
    
    ### What changes were proposed in this pull request?
    
    This PR shortens the traceback so that the actual error, `ZeroDivisionError`, can be tested in `pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception`.
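
    As a minimal illustration (assuming an active `SparkSession` named `spark`; this is not the test code itself), the conf the test now enables makes PySpark hide its internal frames in UDF tracebacks:

    ```python
    from pyspark.sql.functions import col, udf

    # Shorten UDF tracebacks by hiding PySpark-internal frames.
    spark.conf.set("spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled", "true")

    bad_udf = udf(lambda x: 1 / 0)  # raises ZeroDivisionError for every row
    df = spark.range(1).select(bad_udf(col("id")))
    df.collect()  # fails; "ZeroDivisionError" stays near the top of the message
    ```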
    
    ### Why are the changes needed?
    
    So that a long traceback doesn't affect the test case. Without this change, the test can fail as below:
    
    ```
    ======================================================================
    FAIL [1.883s]: test_stream_exception (pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception)
    ----------------------------------------------------------------------
    Traceback (most recent call last):
      File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 287, in test_stream_exception
        sq.processAllAvailable()
      File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 129, in processAllAvailable
        self._execute_streaming_query_cmd(cmd)
      File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 177, in _execute_streaming_query_cmd
        (_, properties) = self._session.client.execute_command(exec_cmd)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 982, in execute_command
        data, _, _, _, properties = self._execute_and_fetch(req)
                                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch
        for response in self._execute_and_fetch_as_iterator(req):
      File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1503, in _handle_error
        self._handle_rpc_error(error)
      File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error
        raise convert_exception(info, status.message) from None
    pyspark.errors.exceptions.connect.StreamingQueryException: [STREAM_FAILED] Query [id = 1c0c440d-0b48-41b1-9a03-071e7e13de82, runId = 692ec338-963a-43b1-89cb-2a8b7cb1e21a] terminated with exception: Job aborted due to stage failure: Task 0 in stage 39.0 failed 1 times, most recent failure: Lost task 0.0 in stage 39.0 (TID 58) (fv-az714-234.22nzjvkrszmuhkvqy55p1tioig.phxx.internal.cloudapp.net executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main
        process()
      File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process
        serializer.dump_stream(out_iter, outfile)
      File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 224, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 145, in dump_stream
        for obj in iterator:
      File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 213, in _batched
        for item in iterator:
      File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in mapper
        result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in <genexpr>
        result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 112, in <lambda>
        return args_kwargs_offsets, lambda *a: func(*a)
                                               ^^^^^^^^
      File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/util.py", line 134, in wrapper
        return f(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^
      File "/home/runner/work/spark/spark-3....

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 291, in test_stream_exception
        self._assert_exception_tree_contains_msg(e, "ZeroDivisionError")
      File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 300, in _assert_exception_tree_contains_msg
        self._assert_exception_tree_contains_msg_connect(exception, msg)
      File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 305, in _assert_exception_tree_contains_msg_connect
        self.assertTrue(
    AssertionError: False is not true : Exception tree doesn't contain the expected message: ZeroDivisionError
    
    ----------------------------------------------------------------------
    ```
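
    For context, the helper that fails above effectively performs a substring search for the expected message over the exception and its causes. A minimal sketch of that kind of check (an illustrative assumption, not the actual `_assert_exception_tree_contains_msg_connect` body):

    ```python
    # Hypothetical sketch: walk an exception's cause chain looking for a
    # marker string. In Spark Connect the worker traceback is flattened
    # into the StreamingQueryException message, so a long run of internal
    # frames can crowd out the "ZeroDivisionError" the test looks for.
    def exception_tree_contains(exc, msg):
        while exc is not None:
            if msg in str(exc):
                return True
            exc = exc.__cause__  # follow the cause chain, if any
        return False
    ```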
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, test-only.
    
    ### How was this patch tested?
    
    Tested in my own fork: https://github.com/HyukjinKwon/spark/actions/runs/8978991632
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #46426 from HyukjinKwon/SPARK-48090.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../pyspark/sql/tests/streaming/test_streaming.py  | 61 +++++++++++-----------
 1 file changed, 31 insertions(+), 30 deletions(-)

diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py b/python/pyspark/sql/tests/streaming/test_streaming.py
index abfacdbbf059..1799f0d1336e 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming.py
@@ -263,36 +263,37 @@ class StreamingTestsMixin:
             shutil.rmtree(tmpPath)
 
     def test_stream_exception(self):
-        sdf = self.spark.readStream.format("text").load("python/test_support/sql/streaming")
-        sq = sdf.writeStream.format("memory").queryName("query_explain").start()
-        try:
-            sq.processAllAvailable()
-            self.assertEqual(sq.exception(), None)
-        finally:
-            sq.stop()
-
-        from pyspark.sql.functions import col, udf
-        from pyspark.errors import StreamingQueryException
-
-        bad_udf = udf(lambda x: 1 / 0)
-        sq = (
-            sdf.select(bad_udf(col("value")))
-            .writeStream.format("memory")
-            .queryName("this_query")
-            .start()
-        )
-        try:
-            # Process some data to fail the query
-            sq.processAllAvailable()
-            self.fail("bad udf should fail the query")
-        except StreamingQueryException as e:
-            # This is expected
-            self._assert_exception_tree_contains_msg(e, "ZeroDivisionError")
-        finally:
-            exception = sq.exception()
-            sq.stop()
-        self.assertIsInstance(exception, StreamingQueryException)
-        self._assert_exception_tree_contains_msg(exception, "ZeroDivisionError")
+        with self.sql_conf({"spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled": True}):
+            sdf = self.spark.readStream.format("text").load("python/test_support/sql/streaming")
+            sq = sdf.writeStream.format("memory").queryName("query_explain").start()
+            try:
+                sq.processAllAvailable()
+                self.assertEqual(sq.exception(), None)
+            finally:
+                sq.stop()
+
+            from pyspark.sql.functions import col, udf
+            from pyspark.errors import StreamingQueryException
+
+            bad_udf = udf(lambda x: 1 / 0)
+            sq = (
+                sdf.select(bad_udf(col("value")))
+                .writeStream.format("memory")
+                .queryName("this_query")
+                .start()
+            )
+            try:
+                # Process some data to fail the query
+                sq.processAllAvailable()
+                self.fail("bad udf should fail the query")
+            except StreamingQueryException as e:
+                # This is expected
+                self._assert_exception_tree_contains_msg(e, "ZeroDivisionError")
+            finally:
+                exception = sq.exception()
+                sq.stop()
+            self.assertIsInstance(exception, StreamingQueryException)
+            self._assert_exception_tree_contains_msg(exception, "ZeroDivisionError")
 
     def test_query_manager_no_recreation(self):
         # SPARK-46873: There should not be a new StreamingQueryManager created every time

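A note on the helper used in the new test body: `self.sql_conf(...)` is a test-utility context manager that sets the given SQL confs for the duration of the block and restores the previous values on exit. A rough sketch of that pattern (an assumption for illustration, not PySpark's implementation):

```python
from contextlib import contextmanager

@contextmanager
def sql_conf(spark, pairs):
    """Temporarily set SQL confs; restore the old values on exit."""
    keys = list(pairs.keys())
    old_values = [spark.conf.get(k, None) for k in keys]
    try:
        for k in keys:
            spark.conf.set(k, str(pairs[k]))
        yield
    finally:
        for k, old in zip(keys, old_values):
            if old is None:
                spark.conf.unset(k)
            else:
                spark.conf.set(k, old)
```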

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
