This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 82f70f3fb9e7 [SPARK-54344][PYTHON] Kill the worker if flush fails in 
daemon.py
82f70f3fb9e7 is described below

commit 82f70f3fb9e77d0dcca210316e8cb699f5fc4bb3
Author: Takuya Ueshin <[email protected]>
AuthorDate: Tue Nov 18 06:48:49 2025 +0900

    [SPARK-54344][PYTHON] Kill the worker if flush fails in daemon.py
    
    ### What changes were proposed in this pull request?
    
    Kills the worker if flush fails in `daemon.py`.
    
    - Spark conf: `spark.python.daemon.killWorkerOnFlushFailure` (default 
`true`)
    - SQL conf: 
`spark.sql.execution.pyspark.udf.daemonKillWorkerOnFlushFailure` (fallback to 
the above)
    
    Before it just dies, reuse `faulthandler` feature and record the thread 
dump and it will appear in the error message if `faulthandler` is enabled.
    
    ```
    WARN TaskSetManager: Lost task 3.0 in stage 1.0 (TID 8) (127.0.0.1 executor 
1): org.apache.spark.SparkException: Python worker exited unexpectedly 
(crashed): Current thread 0x00000001f0796140 (most recent call first):
      File "/.../python/pyspark/daemon.py", line 95 in worker
      File "/.../python/pyspark/daemon.py", line 228 in manager
      File "/.../python/pyspark/daemon.py", line 253 in <module>
      File "<frozen runpy>", line 88 in _run_code
      File "<frozen runpy>", line 198 in _run_module_as_main
    
            at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:679)
    ...
    ```
    
    Even when `faulthandler` is not eabled, the error will appear in the 
executor's `stderr` file.
    
    ```
    Traceback (most recent call last):
      File "/.../python/pyspark/daemon.py", line 228, in manager
        code = worker(sock, authenticated)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/.../python/pyspark/daemon.py", line 88, in worker
        raise Exception("test")
    Exception: test
    ```
    
    When this is disabled, the behavior is the same as before but with a log.
    
    ### Why are the changes needed?
    
    Currently an exception caused by `outfile.flush()` failure in `daemon.py` 
is ignored, but if the last command in `worker_main` is still not flushed, it 
could cause a UDF stuck in Java waiting for the response from the Python worker.
    
    It should just die and let Spark retry the task.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually.
    
    <details>
    <summary>Test with the patch to emulate the case</summary>
    
    ```patch
    % git diff
    diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
    index 54c9507e625..e107216d769 100644
    --- a/python/pyspark/daemon.py
    +++ b/python/pyspark/daemon.py
     -84,6 +84,8  def worker(sock, authenticated):
             exit_code = compute_real_exit_code(exc.code)
         finally:
             try:
    +            if worker_main.__globals__.get("TEST", False):
    +                raise Exception("test")
                 outfile.flush()
             except Exception:
                 faulthandler_log_path = 
os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
    diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
    index 6e34b041665..ff210f4fd97 100644
    --- a/python/pyspark/worker.py
    +++ b/python/pyspark/worker.py
     -3413,7 +3413,14  def main(infile, outfile):
    
         # check end of stream
         if read_int(infile) == SpecialLengths.END_OF_STREAM:
    -        write_int(SpecialLengths.END_OF_STREAM, outfile)
    +        import random
    +
    +        if random.random() < 0.1:
    +            # emulate the last command is not flushed yet
    +            global TEST
    +            TEST = True
    +        else:
    +            write_int(SpecialLengths.END_OF_STREAM, outfile)
         else:
             # write a different value to tell JVM to not reuse this worker
             write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
     -3423,6 +3430,9  def main(infile, outfile):
         faulthandler.cancel_dump_traceback_later()
    
    +TEST = False
    +
    + if __name__ == "__main__":
         # Read information about how to connect back to the JVM from the 
environment.
         conn_info = os.environ.get(
    ```
    
    </details>
    
    With just `pass` (before this), it gets stuck, and after this it lets Spark 
retry the task.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53055 from ueshin/issues/SPARK-54344/daemon_flush.
    
    Lead-authored-by: Takuya Ueshin <[email protected]>
    Co-authored-by: Takuya UESHIN <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
    (cherry picked from commit ed23cc39c32e0ca953f174f48ddff78ba24da375)
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../scala/org/apache/spark/api/python/PythonRunner.scala  |  8 +++++++-
 .../scala/org/apache/spark/internal/config/Python.scala   | 12 ++++++++++++
 python/pyspark/daemon.py                                  | 15 ++++++++++++++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala     | 11 +++++++++++
 .../spark/sql/execution/python/ArrowPythonRunner.scala    |  2 ++
 .../sql/execution/python/ArrowPythonUDTFRunner.scala      |  2 ++
 .../sql/execution/python/CoGroupedArrowPythonRunner.scala |  2 ++
 .../spark/sql/execution/python/PythonPlannerRunner.scala  |  4 ++++
 .../spark/sql/execution/python/PythonUDFRunner.scala      |  2 ++
 .../streaming/ApplyInPandasWithStatePythonRunner.scala    |  2 ++
 .../execution/python/streaming/PythonForeachWriter.scala  |  2 ++
 11 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 66e204fee44b..7f1dc7fc86fc 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -188,12 +188,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   protected val timelyFlushEnabled: Boolean = false
   protected val timelyFlushTimeoutNanos: Long = 0
   protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+  private val useDaemon = conf.get(PYTHON_USE_DAEMON)
   private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
   protected val faultHandlerEnabled: Boolean = 
conf.get(PYTHON_WORKER_FAULTHANLDER_ENABLED)
   protected val idleTimeoutSeconds: Long = 
conf.get(PYTHON_WORKER_IDLE_TIMEOUT_SECONDS)
   protected val killOnIdleTimeout: Boolean = 
conf.get(PYTHON_WORKER_KILL_ON_IDLE_TIMEOUT)
   protected val tracebackDumpIntervalSeconds: Long =
     conf.get(PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
+  protected val killWorkerOnFlushFailure: Boolean =
+     conf.get(PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE)
   protected val hideTraceback: Boolean = false
   protected val simplifiedTraceback: Boolean = false
 
@@ -294,13 +297,16 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     if (tracebackDumpIntervalSeconds > 0L) {
       envVars.put("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", 
tracebackDumpIntervalSeconds.toString)
     }
+    if (useDaemon && killWorkerOnFlushFailure) {
+      envVars.put("PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE", "1")
+    }
     // allow the user to set the batch size for the BatchedSerializer on UDFs
     envVars.put("PYTHON_UDF_BATCH_SIZE", batchSizeForPythonUDF.toString)
 
     envVars.put("SPARK_JOB_ARTIFACT_UUID", 
jobArtifactUUID.getOrElse("default"))
 
     val (worker: PythonWorker, handle: Option[ProcessHandle]) = 
env.createPythonWorker(
-      pythonExec, workerModule, daemonModule, envVars.asScala.toMap)
+      pythonExec, workerModule, daemonModule, envVars.asScala.toMap, useDaemon)
     // Whether is the worker released into idle pool or closed. When any codes 
try to release or
     // close a worker, they should use `releasedOrClosed.compareAndSet` to 
flip the state to make
     // sure there is only one winner that is going to release or close the 
worker.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Python.scala 
b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
index de95e2fa1f7a..dc16d1ff255d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Python.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
@@ -138,4 +138,16 @@ private[spark] object Python {
       .intConf
       .checkValue(_ > 0, "If set, the idle worker max size must be > 0.")
       .createOptional
+
+  val PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE =
+    ConfigBuilder("spark.python.daemon.killWorkerOnFlushFailure")
+      .doc("When enabled, exceptions raised during output flush operations in 
the Python " +
+        "worker managed under Python daemon are not caught, causing the worker 
to terminate " +
+        "with the exception. This allows Spark to detect the failure and 
launch a new worker " +
+        "and retry the task. " +
+        "When disabled, flush exceptions are caught and logged but the worker 
continues, " +
+        "which could cause the worker to get stuck due to protocol mismatch.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(true)
 }
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index ca33ce2c39ef..e75eca68fd0e 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -24,6 +24,7 @@ import sys
 import traceback
 import time
 import gc
+import faulthandler
 from errno import EINTR, EAGAIN
 from socket import AF_INET, AF_INET6, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
@@ -85,7 +86,19 @@ def worker(sock, authenticated):
         try:
             outfile.flush()
         except Exception:
-            pass
+            if os.environ.get("PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE", 
False):
+                faulthandler_log_path = 
os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+                if faulthandler_log_path:
+                    faulthandler_log_path = 
os.path.join(faulthandler_log_path, str(os.getpid()))
+                    with open(faulthandler_log_path, "w") as 
faulthandler_log_file:
+                        faulthandler.dump_traceback(file=faulthandler_log_file)
+                raise
+            else:
+                print(
+                    "PySpark daemon failed to flush the output to the worker 
process:\n"
+                    + traceback.format_exc(),
+                    file=sys.stderr,
+                )
     return exit_code
 
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index cda75f22323e..f6aae1d2ba05 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3913,6 +3913,14 @@ object SQLConf {
       .version("4.1.0")
       .fallbackConf(Python.PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
 
+  val PYTHON_UDF_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE =
+    buildConf("spark.sql.execution.pyspark.udf.daemonKillWorkerOnFlushFailure")
+      .doc(
+        s"Same as ${Python.PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE.key} " +
+          "for Python execution with DataFrame and SQL. It can change during 
runtime.")
+      .version("4.1.0")
+      .fallbackConf(Python.PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE)
+
   val PYTHON_WORKER_LOGGING_ENABLED =
     buildConf("spark.sql.pyspark.worker.logging.enabled")
       .doc("When set to true, this configuration enables comprehensive logging 
within " +
@@ -7410,6 +7418,9 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
   def pythonUDFWorkerTracebackDumpIntervalSeconds: Long =
     getConf(PYTHON_UDF_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
 
+  def pythonUDFDaemonKillWorkerOnFlushFailure: Boolean =
+    getConf(PYTHON_UDF_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE)
+
   def pythonWorkerLoggingEnabled: Boolean = 
getConf(PYTHON_WORKER_LOGGING_ENABLED)
 
   def pythonUDFArrowConcurrencyLevel: Option[Int] = 
getConf(PYTHON_UDF_ARROW_CONCURRENCY_LEVEL)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index b94e00bc11ef..f5f968ee9522 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -60,6 +60,8 @@ abstract class BaseArrowPythonRunner[IN, OUT <: AnyRef](
   override val killOnIdleTimeout: Boolean = 
SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
 
   override val errorOnDuplicatedFieldNames: Boolean = true
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
index 7b73818bf0ec..1d5df9bad924 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
@@ -83,6 +83,8 @@ class ArrowPythonUDTFRunner(
   override val killOnIdleTimeout: Boolean = 
SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
 
   override val errorOnDuplicatedFieldNames: Boolean = true
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
index 50013e533819..7f6efbae8881 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
@@ -71,6 +71,8 @@ class CoGroupedArrowPythonRunner(
   override val killOnIdleTimeout: Boolean = 
SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
 
   override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
   override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
index 0f4ac4ddad71..92e99cdc11d9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
@@ -58,6 +58,7 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) 
extends Logging {
     val idleTimeoutSeconds: Long = 
SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
     val killOnIdleTimeout: Boolean = 
SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
     val tracebackDumpIntervalSeconds: Long = 
SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+    val killWorkerOnFlushFailure: Boolean = 
SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
     val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
     val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
     val workerMemoryMb = SQLConf.get.pythonPlannerExecMemory
@@ -98,6 +99,9 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) 
extends Logging {
     if (tracebackDumpIntervalSeconds > 0L) {
       envVars.put("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", 
tracebackDumpIntervalSeconds.toString)
     }
+    if (useDaemon && killWorkerOnFlushFailure) {
+      envVars.put("PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE", "1")
+    }
 
     envVars.put("SPARK_JOB_ARTIFACT_UUID", 
jobArtifactUUID.getOrElse("default"))
     sessionUUID.foreach { uuid =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index 61f493deeee4..759aa998832d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -59,6 +59,8 @@ abstract class BasePythonUDFRunner(
   override val killOnIdleTimeout: Boolean = 
SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
 
   override val bufferSize: Int = 
SQLConf.get.getConf(SQLConf.PYTHON_UDF_BUFFER_SIZE)
   override val batchSizeForPythonUDF: Int =
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
index 51d9f6f523a2..14054ba89a94 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
@@ -79,6 +79,8 @@ class ApplyInPandasWithStatePythonRunner(
   override val killOnIdleTimeout: Boolean = 
SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
   override val tracebackDumpIntervalSeconds: Long =
     SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+  override val killWorkerOnFlushFailure: Boolean =
+    SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
 
   private val sqlConf = SQLConf.get
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
index 37716d2d8413..cc7745210a4d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
@@ -106,6 +106,8 @@ class PythonForeachWriter(func: PythonFunction, schema: 
StructType)
       override val killOnIdleTimeout: Boolean = 
SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
       override val tracebackDumpIntervalSeconds: Long =
         SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+      override val killWorkerOnFlushFailure: Boolean =
+        SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
 
       override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
       override val simplifiedTraceback: Boolean = 
SQLConf.get.pysparkSimplifiedTraceback


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to