This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 18e07956e94 [SPARK-45739][PYTHON] Catch IOException instead of EOFException alone for faulthandler
18e07956e94 is described below

commit 18e07956e9476bf3e9264eb878a25b838feff4a6
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Wed Nov 1 07:33:35 2023 +0900

[SPARK-45739][PYTHON] Catch IOException instead of EOFException alone for faulthandler

### What changes were proposed in this pull request?

This PR improves the `spark.python.worker.faulthandler.enabled` feature by catching `IOException` instead of the narrower `EOFException` alone.

### Why are the changes needed?

Exceptions such as `java.net.SocketException: Connection reset` can happen when the worker dies unexpectedly. We should catch all IO exceptions there, not only `EOFException`.

### Does this PR introduce _any_ user-facing change?

Yes, but only in special cases: this can happen when the worker dies unexpectedly during its initialization.

### How was this patch tested?

I tested this with Spark Connect:

```bash
$ cat <<EOT >> malformed_daemon.py
import ctypes

from pyspark import daemon
from pyspark import TaskContext


def raise_segfault():
    ctypes.string_at(0)  # Throw a segmentation fault during init.


TaskContext._getOrCreate = raise_segfault

if __name__ == '__main__':
    daemon.manager()
EOT
```

```bash
$ ./sbin/stop-connect-server.sh
$ ./sbin/start-connect-server.sh \
    --conf spark.python.daemon.module=malformed_daemon \
    --conf spark.python.worker.faulthandler.enabled=true \
    --jars `ls connector/connect/server/target/**/spark-connect*SNAPSHOT.jar`
```

```bash
$ ./bin/pyspark --remote "sc://localhost:15002"
```

```python
from pyspark.sql.functions import udf

spark.addArtifact("malformed_daemon.py", pyfile=True)
spark.range(1).select(udf(lambda x: x)("id")).collect()
```

**Before**

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/connect/dataframe.py", line 1710, in collect
    table, schema = self._to_table()
  ...
  File "/.../spark/python/pyspark/sql/connect/client/core.py", line 1575, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 8 in stage 0.0 failed 1 times, most recent failure: Lost task 8.0 in stage 0.0 (TID 8) (192.168.123.102 executor driver): java.net.SocketException: Connection reset
    at ...
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:

JVM stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 0.0 failed 1 times, most recent failure: Lost task 8.0 in stage 0.0 (TID 8) (192.168.123.102 executor driver): java.net.SocketException: Connection reset
    at java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:394)
    at ...
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.lang.Thread.run(Thread.java:833)
```

**After**

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/connect/dataframe.py", line 1710, in collect
    table, schema = self._to_table()
  ...
  File "/.../spark/python/pyspark/sql/connect/client/core.py", line 1575, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 4 in stage 0.0 failed 1 times, most recent failure: Lost task 4.0 in stage 0.0 (TID 4) (192.168.123.102 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Fatal Python error: Segmentation fault

Current thread 0x00007ff85d338700 (most recent call first):
  File "/.../miniconda3/envs/python3.9/lib/python3.9/ctypes/__init__.py", line 525 in string_at
  File "/private/var/folders/0c/q8y15ybd3tn7sr2_jmbmftr80000gp/T/spark-397ac42b-c05b-4f50-a6b8-ede30254edc9/userFiles-fd70c41e-46b9-44ed-b781-f8dea10bcb4a/5ce3da24-912a-4207-af82-5dfc8a845714/malformed_daemon.py", line 8 in raise_segfault
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 1450 in main
  ...
  File "/.../miniconda3/envs/python3.9/lib/python3.9/runpy.py", line 197 in _run_module_as_main

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:550)
    at ...
    java.base/java.io.DataInputStream.readInt(DataInputStream.java:393)
    at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:92)
    ... 30 more

Driver stacktrace:

JVM stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 0.0 failed 1 times, most recent failure: Lost task 4.0 in stage 0.0 (TID 4) (192.168.123.102 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Fatal Python error: Segmentation fault

Current thread 0x00007ff85d338700 (most recent call first):
  File "/.../miniconda3/envs/python3.9/lib/python3.9/ctypes/__init__.py", line 525 in string_at
  ...
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43600 from HyukjinKwon/more-segfault.
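The patch pairs with the worker-side `faulthandler` setup: on a fatal signal the Python worker's `faulthandler` dumps its stack to a per-PID log file, and the JVM reads that file back to build the richer error message shown in the **After** output. A minimal sketch of that mechanism in plain Python (the log-file name and flow here are illustrative, not Spark's actual implementation):

```python
import faulthandler
import os
import tempfile

# Illustrative per-PID log path; Spark uses its own faultHandlerLogPath(pid).
log_path = os.path.join(tempfile.gettempdir(), f"faulthandler_{os.getpid()}.log")

with open(log_path, "w") as f:
    # Once enabled, a fatal signal (e.g. the SIGSEGV from ctypes.string_at(0)
    # in malformed_daemon.py) writes the Python stack to this file.
    faulthandler.enable(file=f)
    # A real crash would kill the process; dump_traceback writes the same
    # style of stack dump so we can inspect the log safely here.
    faulthandler.dump_traceback(file=f)
    faulthandler.disable()

with open(log_path) as f:
    log_contents = f.read()
print(log_contents)
os.remove(log_path)
```

The JVM side only consumes the file's text: it reads all lines, deletes the file, and attaches the contents to the `SparkException` message, which is why the stack dump above appears inline in the driver-side error.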
Authored-by: Hyukjin Kwon <gurwls...@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 840352eaf4a..d265bb2fd8b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -542,12 +542,12 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         logError("This may have been caused by a prior exception:", writer.exception.get)
         throw writer.exception.get
 
-      case eof: EOFException if faultHandlerEnabled && pid.isDefined &&
+      case e: IOException if faultHandlerEnabled && pid.isDefined &&
           JavaFiles.exists(BasePythonRunner.faultHandlerLogPath(pid.get)) =>
         val path = BasePythonRunner.faultHandlerLogPath(pid.get)
         val error = String.join("\n", JavaFiles.readAllLines(path)) + "\n"
         JavaFiles.deleteIfExists(path)
-        throw new SparkException(s"Python worker exited unexpectedly (crashed): $error", eof)
+        throw new SparkException(s"Python worker exited unexpectedly (crashed): $error", e)
 
       case eof: EOFException =>
         throw new SparkException("Python worker exited unexpectedly (crashed)", eof)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org