This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new eee179135ed2 [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking error message in UDF eee179135ed2 is described below commit eee179135ed21dbdd8b342d053c9eda849e2de77 Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Tue May 7 15:59:20 2024 +0900 [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking error message in UDF ### What changes were proposed in this pull request? This PR shortens the traceback so that the actual error `ZeroDivisionError` can be tested in `pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception` ### Why are the changes needed? So that a long traceback doesn't affect the test case. Otherwise, the test can fail as below: ``` ====================================================================== FAIL [1.883s]: test_stream_exception (pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception) ---------------------------------------------------------------------- Traceback (most recent call last): File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 287, in test_stream_exception sq.processAllAvailable() File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 129, in processAllAvailable self._execute_streaming_query_cmd(cmd) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 177, in _execute_streaming_query_cmd (_, properties) = self._session.client.execute_command(exec_cmd) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 982, in execute_command data, _, _, _, properties = self._execute_and_fetch(req) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch for response in 
self._execute_and_fetch_as_iterator(req): File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1264, in _execute_and_fetch_as_iterator self._handle_error(error) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1503, in _handle_error self._handle_rpc_error(error) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error raise convert_exception(info, status.message) from None pyspark.errors.exceptions.connect.StreamingQueryException: [STREAM_FAILED] Query [id = 1c0c440d-0b48-41b1-9a03-071e7e13de82, runId = 692ec338-963a-43b1-89cb-2a8b7cb1e21a] terminated with exception: Job aborted due to stage failure: Task 0 in stage 39.0 failed 1 times, most recent failure: Lost task 0.0 in stage 39.0 (TID 58) (fv-az714-234.22nzjvkrszmuhkvqy55p1tioig.phxx.internal.cloudapp.net executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main process() File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process serializer.dump_stream(out_iter, outfile) File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 224, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 145, in dump_stream for obj in iterator: File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 213, in _batched for item in iterator: File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs) 
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in <genexpr> result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 112, in <lambda> return args_kwargs_offsets, lambda *a: func(*a) ^^^^^^^^ File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/util.py", line 134, in wrapper return f(*args, **kwargs) ^^^^^^^^^^^^^^^^^^ File "/home/runner/work/spark/spark-3.... During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 291, in test_stream_exception self._assert_exception_tree_contains_msg(e, "ZeroDivisionError") File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 300, in _assert_exception_tree_contains_msg self._assert_exception_tree_contains_msg_connect(exception, msg) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 305, in _assert_exception_tree_contains_msg_connect self.assertTrue( AssertionError: False is not true : Exception tree doesn't contain the expected message: ZeroDivisionError ---------------------------------------------------------------------- ``` ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Tested in my own fork: https://github.com/HyukjinKwon/spark/actions/runs/8978991632 ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46426 from HyukjinKwon/SPARK-48090. 
Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../pyspark/sql/tests/streaming/test_streaming.py | 61 +++++++++++----------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py b/python/pyspark/sql/tests/streaming/test_streaming.py index abfacdbbf059..1799f0d1336e 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming.py +++ b/python/pyspark/sql/tests/streaming/test_streaming.py @@ -263,36 +263,37 @@ class StreamingTestsMixin: shutil.rmtree(tmpPath) def test_stream_exception(self): - sdf = self.spark.readStream.format("text").load("python/test_support/sql/streaming") - sq = sdf.writeStream.format("memory").queryName("query_explain").start() - try: - sq.processAllAvailable() - self.assertEqual(sq.exception(), None) - finally: - sq.stop() - - from pyspark.sql.functions import col, udf - from pyspark.errors import StreamingQueryException - - bad_udf = udf(lambda x: 1 / 0) - sq = ( - sdf.select(bad_udf(col("value"))) - .writeStream.format("memory") - .queryName("this_query") - .start() - ) - try: - # Process some data to fail the query - sq.processAllAvailable() - self.fail("bad udf should fail the query") - except StreamingQueryException as e: - # This is expected - self._assert_exception_tree_contains_msg(e, "ZeroDivisionError") - finally: - exception = sq.exception() - sq.stop() - self.assertIsInstance(exception, StreamingQueryException) - self._assert_exception_tree_contains_msg(exception, "ZeroDivisionError") + with self.sql_conf({"spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled": True}): + sdf = self.spark.readStream.format("text").load("python/test_support/sql/streaming") + sq = sdf.writeStream.format("memory").queryName("query_explain").start() + try: + sq.processAllAvailable() + self.assertEqual(sq.exception(), None) + finally: + sq.stop() + + from pyspark.sql.functions import col, udf + from pyspark.errors import 
StreamingQueryException + + bad_udf = udf(lambda x: 1 / 0) + sq = ( + sdf.select(bad_udf(col("value"))) + .writeStream.format("memory") + .queryName("this_query") + .start() + ) + try: + # Process some data to fail the query + sq.processAllAvailable() + self.fail("bad udf should fail the query") + except StreamingQueryException as e: + # This is expected + self._assert_exception_tree_contains_msg(e, "ZeroDivisionError") + finally: + exception = sq.exception() + sq.stop() + self.assertIsInstance(exception, StreamingQueryException) + self._assert_exception_tree_contains_msg(exception, "ZeroDivisionError") def test_query_manager_no_recreation(self): # SPARK-46873: There should not be a new StreamingQueryManager created every time --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org