This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new f92580aa111f [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking error message in UDF f92580aa111f is described below commit f92580aa111f8531cdb229a2d1bb0234764d7262 Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Tue May 7 15:59:20 2024 +0900 [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking error message in UDF This PR reduces traceback so the actual error `ZeroDivisionError` can be tested in `pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception` So long traceback doesn't affect the test case. It can fail as below: ``` ====================================================================== FAIL [1.883s]: test_stream_exception (pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception) ---------------------------------------------------------------------- Traceback (most recent call last): File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 287, in test_stream_exception sq.processAllAvailable() File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 129, in processAllAvailable self._execute_streaming_query_cmd(cmd) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 177, in _execute_streaming_query_cmd (_, properties) = self._session.client.execute_command(exec_cmd) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 982, in execute_command data, _, _, _, properties = self._execute_and_fetch(req) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch for response in self._execute_and_fetch_as_iterator(req): File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1264, in _execute_and_fetch_as_iterator self._handle_error(error) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1503, in _handle_error self._handle_rpc_error(error) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error raise convert_exception(info, status.message) from None pyspark.errors.exceptions.connect.StreamingQueryException: [STREAM_FAILED] Query [id = 1c0c440d-0b48-41b1-9a03-071e7e13de82, runId = 692ec338-963a-43b1-89cb-2a8b7cb1e21a] terminated with exception: Job aborted due to stage failure: Task 0 in stage 39.0 failed 1 times, most recent failure: Lost task 0.0 in stage 39.0 (TID 58) (fv-az714-234.22nzjvkrszmuhkvqy55p1tioig.phxx.internal.cloudapp.net executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main process() File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process serializer.dump_stream(out_iter, outfile) File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 224, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 145, in dump_stream for obj in iterator: File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 213, in _batched for item in iterator: File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in <genexpr> result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 112, in <lambda> return args_kwargs_offsets, lambda *a: func(*a) ^^^^^^^^ File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/util.py", line 134, in wrapper return f(*args, **kwargs) ^^^^^^^^^^^^^^^^^^ File "/home/runner/work/spark/spark-3.... During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 291, in test_stream_exception self._assert_exception_tree_contains_msg(e, "ZeroDivisionError") File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 300, in _assert_exception_tree_contains_msg self._assert_exception_tree_contains_msg_connect(exception, msg) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 305, in _assert_exception_tree_contains_msg_connect self.assertTrue( AssertionError: False is not true : Exception tree doesn't contain the expected message: ZeroDivisionError ---------------------------------------------------------------------- ``` No, test-only. Tested in my own fork: https://github.com/HyukjinKwon/spark/actions/runs/8978991632 No. Closes #46426 from HyukjinKwon/SPARK-48090. 
Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit eee179135ed21dbdd8b342d053c9eda849e2de77) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../connect/streaming/test_parity_streaming.py | 10 +--- .../pyspark/sql/tests/streaming/test_streaming.py | 61 +++++++++++----------- 2 files changed, 32 insertions(+), 39 deletions(-) diff --git a/python/pyspark/sql/tests/connect/streaming/test_parity_streaming.py b/python/pyspark/sql/tests/connect/streaming/test_parity_streaming.py index e7c1958064bb..6b23c15775fe 100644 --- a/python/pyspark/sql/tests/connect/streaming/test_parity_streaming.py +++ b/python/pyspark/sql/tests/connect/streaming/test_parity_streaming.py @@ -14,20 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import os -import unittest - from pyspark.sql.tests.streaming.test_streaming import StreamingTestsMixin from pyspark.testing.connectutils import ReusedConnectTestCase class StreamingParityTests(StreamingTestsMixin, ReusedConnectTestCase): - # TODO(SPARK-48090): Reenable this test case - @unittest.skipIf( - "SPARK_SKIP_CONNECT_COMPAT_TESTS" in os.environ, "Failed with different Client <> Server" - ) - def test_stream_exception(self): - super(StreamingParityTests, self).test_stream_exception() + pass if __name__ == "__main__": diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py b/python/pyspark/sql/tests/streaming/test_streaming.py index 0eea86dc7375..69a5a2b90986 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming.py +++ b/python/pyspark/sql/tests/streaming/test_streaming.py @@ -264,36 +264,37 @@ class StreamingTestsMixin: shutil.rmtree(tmpPath) def test_stream_exception(self): - sdf = self.spark.readStream.format("text").load("python/test_support/sql/streaming") - sq = sdf.writeStream.format("memory").queryName("query_explain").start() - try: - sq.processAllAvailable() - 
self.assertEqual(sq.exception(), None) - finally: - sq.stop() - - from pyspark.sql.functions import col, udf - from pyspark.errors import StreamingQueryException - - bad_udf = udf(lambda x: 1 / 0) - sq = ( - sdf.select(bad_udf(col("value"))) - .writeStream.format("memory") - .queryName("this_query") - .start() - ) - try: - # Process some data to fail the query - sq.processAllAvailable() - self.fail("bad udf should fail the query") - except StreamingQueryException as e: - # This is expected - self._assert_exception_tree_contains_msg(e, "ZeroDivisionError") - finally: - exception = sq.exception() - sq.stop() - self.assertIsInstance(exception, StreamingQueryException) - self._assert_exception_tree_contains_msg(exception, "ZeroDivisionError") + with self.sql_conf({"spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled": True}): + sdf = self.spark.readStream.format("text").load("python/test_support/sql/streaming") + sq = sdf.writeStream.format("memory").queryName("query_explain").start() + try: + sq.processAllAvailable() + self.assertEqual(sq.exception(), None) + finally: + sq.stop() + + from pyspark.sql.functions import col, udf + from pyspark.errors import StreamingQueryException + + bad_udf = udf(lambda x: 1 / 0) + sq = ( + sdf.select(bad_udf(col("value"))) + .writeStream.format("memory") + .queryName("this_query") + .start() + ) + try: + # Process some data to fail the query + sq.processAllAvailable() + self.fail("bad udf should fail the query") + except StreamingQueryException as e: + # This is expected + self._assert_exception_tree_contains_msg(e, "ZeroDivisionError") + finally: + exception = sq.exception() + sq.stop() + self.assertIsInstance(exception, StreamingQueryException) + self._assert_exception_tree_contains_msg(exception, "ZeroDivisionError") def _assert_exception_tree_contains_msg(self, exception, msg): if isinstance(exception, SparkConnectException): --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org