This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 18e07956e94 [SPARK-45739][PYTHON] Catch IOException instead of EOFException alone for faulthandler
18e07956e94 is described below

commit 18e07956e9476bf3e9264eb878a25b838feff4a6
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Wed Nov 1 07:33:35 2023 +0900

    [SPARK-45739][PYTHON] Catch IOException instead of EOFException alone for faulthandler
    
    ### What changes were proposed in this pull request?
    
    This PR improves the `spark.python.worker.faulthandler.enabled` feature by catching `IOException` instead of only the narrower `EOFException`.
    
    ### Why are the changes needed?
    
    Exceptions such as `java.net.SocketException: Connection reset` can happen when the worker dies unexpectedly. `SocketException` is an `IOException` but not an `EOFException`, so the previous, narrower match missed it; we should catch all IO exceptions there so the faulthandler output is still attached to the error.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but only in special cases: when the worker dies unexpectedly during its initialization, the error now includes the Python faulthandler output instead of a bare `Connection reset`, as shown below.
    
    ### How was this patch tested?
    
    I tested this with Spark Connect:
    
    ```bash
    $ cat <<EOT >> malformed_daemon.py
    import ctypes
    
    from pyspark import daemon
    from pyspark import TaskContext
    
    def raise_segfault():
        ctypes.string_at(0)
    
    # Trigger a segmentation fault during worker initialization.
    TaskContext._getOrCreate = raise_segfault
    
    if __name__ == '__main__':
        daemon.manager()
    EOT
    ```
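
    (Illustrative aside, not part of this commit.) The heredoc above monkey-patches `TaskContext._getOrCreate` at import time, so the worker dereferences a NULL pointer and dies with SIGSEGV during initialization. Assuming PySpark is installed and `malformed_daemon.py` is on the current path, a quick standalone sanity check of that behaviour looks like this:

    ```python
    # Hypothetical local check (outside Spark): importing malformed_daemon
    # applies the monkey patch, and calling the patched hook crashes the
    # interpreter with a segmentation fault via ctypes.string_at(0).
    from pyspark import TaskContext

    import malformed_daemon  # noqa: F401

    TaskContext._getOrCreate()  # the process dies with SIGSEGV here
    ```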
    
    ```bash
    ./sbin/stop-connect-server.sh
    ./sbin/start-connect-server.sh --conf spark.python.daemon.module=malformed_daemon --conf spark.python.worker.faulthandler.enabled=true --jars `ls connector/connect/server/target/**/spark-connect*SNAPSHOT.jar`
    ```
    ```bash
    ./bin/pyspark --remote "sc://localhost:15002"
    ```
    
    ```python
    from pyspark.sql.functions import udf
    spark.addArtifact("malformed_daemon.py", pyfile=True)
    spark.range(1).select(udf(lambda x: x)("id")).collect()
    ```
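
    For context, here is a minimal standalone sketch (my illustration, not Spark's actual worker code) of the CPython `faulthandler` mechanism that `spark.python.worker.faulthandler.enabled` builds on: the worker registers a log file with `faulthandler`, a fatal signal makes the interpreter dump the Python traceback there, and the JVM side reads that file back into the `SparkException` message (see the diff at the end). The `/tmp/fault_demo.log` path is purely illustrative:

    ```python
    import ctypes
    import faulthandler

    # Register a crash log; Spark derives the real path from the worker's PID.
    log = open("/tmp/fault_demo.log", "w")
    faulthandler.enable(file=log)

    # Dereferencing a NULL pointer kills this process with SIGSEGV, but the log
    # file now contains "Fatal Python error: Segmentation fault" plus the
    # Python stack, as in the "After" output below.
    ctypes.string_at(0)
    ```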
    
    **Before**
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/connect/dataframe.py", line 1710, in 
collect
        table, schema = self._to_table()
        ...
      File "/.../spark/python/pyspark/sql/connect/client/core.py", line 1575, 
in _handle_rpc_error
        raise convert_exception(
    pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 8 in stage 0.0 failed 1 times, most recent failure: Lost task 8.0 in stage 0.0 (TID 8) (192.168.123.102 executor driver): java.net.SocketException: Connection reset
            at ...
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
            at java.base/java.lang.Thread.run(Thread.java:833)
    
    Driver stacktrace:
    
    JVM stacktrace:
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 0.0 failed 1 times, most recent failure: Lost task 8.0 in stage 0.0 (TID 8) (192.168.123.102 executor driver): java.net.SocketException: Connection reset
            at java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:394)
            at ...
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
            at java.lang.Thread.run(Thread.java:833)
    ```
    
    **After**
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/connect/dataframe.py", line 1710, in 
collect
        table, schema = self._to_table()
        ...
    "/.../spark/python/pyspark/sql/connect/client/core.py", line 1575, in 
_handle_rpc_error
        raise convert_exception(
    pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 4 in stage 0.0 failed 1 times, most recent failure: Lost task 4.0 in stage 0.0 (TID 4) (192.168.123.102 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Fatal Python error: Segmentation fault
    
    Current thread 0x00007ff85d338700 (most recent call first):
      File "/.../miniconda3/envs/python3.9/lib/python3.9/ctypes/__init__.py", 
line 525 in string_at
      File "/private/var/folders/0c/q8y15ybd3tn7sr2_jmbmftr80000gp/T/spark-397ac42b-c05b-4f50-a6b8-ede30254edc9/userFiles-fd70c41e-46b9-44ed-b781-f8dea10bcb4a/5ce3da24-912a-4207-af82-5dfc8a845714/malformed_daemon.py", line 8 in raise_segfault
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 1450 in 
main
      ...
    "/.../miniconda3/envs/python3.9/lib/python3.9/runpy.py", line 197 in 
_run_module_as_main
    
            at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:550)
            at ...
            at java.base/java.io.DataInputStream.readInt(DataInputStream.java:393)
            at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:92)
            ... 30 more
    
    Driver stacktrace:
    
    JVM stacktrace:
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 0.0 failed 1 times, most recent failure: Lost task 4.0 in stage 0.0 (TID 4) (192.168.123.102 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Fatal Python error: Segmentation fault
    
    Current thread 0x00007ff85d338700 (most recent call first):
      File "/.../miniconda3/envs/python3.9/lib/python3.9/ctypes/__init__.py", 
line 525 in string_at
    ...
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43600 from HyukjinKwon/more-segfault.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 840352eaf4a..d265bb2fd8b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -542,12 +542,12 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         logError("This may have been caused by a prior exception:", 
writer.exception.get)
         throw writer.exception.get
 
-      case eof: EOFException if faultHandlerEnabled && pid.isDefined &&
+      case e: IOException if faultHandlerEnabled && pid.isDefined &&
           JavaFiles.exists(BasePythonRunner.faultHandlerLogPath(pid.get)) =>
         val path = BasePythonRunner.faultHandlerLogPath(pid.get)
         val error = String.join("\n", JavaFiles.readAllLines(path)) + "\n"
         JavaFiles.deleteIfExists(path)
-        throw new SparkException(s"Python worker exited unexpectedly (crashed): $error", eof)
+        throw new SparkException(s"Python worker exited unexpectedly (crashed): $error", e)
 
       case eof: EOFException =>
         throw new SparkException("Python worker exited unexpectedly 
(crashed)", eof)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
