This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 82f70f3fb9e7 [SPARK-54344][PYTHON] Kill the worker if flush fails in daemon.py
82f70f3fb9e7 is described below
commit 82f70f3fb9e77d0dcca210316e8cb699f5fc4bb3
Author: Takuya Ueshin <[email protected]>
AuthorDate: Tue Nov 18 06:48:49 2025 +0900
[SPARK-54344][PYTHON] Kill the worker if flush fails in daemon.py
### What changes were proposed in this pull request?
Kills the worker if flush fails in `daemon.py`.
- Spark conf: `spark.python.daemon.killWorkerOnFlushFailure` (default `true`)
- SQL conf: `spark.sql.execution.pyspark.udf.daemonKillWorkerOnFlushFailure` (falls back to the above)
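The two settings interact in a simple way, sketched below as a small Python model. This is an illustration under stated assumptions, not Spark code:

- The helpers `effective_flag` and `worker_env` are hypothetical.
- The dicts stand in for explicitly set conf entries.
- The fallback rule (an unset SQL conf inherits the Spark conf) is modeled on the `fallbackConf(...)` declaration in `SQLConf.scala`.

The environment variable name and the `useDaemon && killWorkerOnFlushFailure` condition come from the `PythonRunner.scala` change.

```python
# Hypothetical model of the configuration flow; these helpers are not part of Spark.
SPARK_CONF = "spark.python.daemon.killWorkerOnFlushFailure"
SQL_CONF = "spark.sql.execution.pyspark.udf.daemonKillWorkerOnFlushFailure"
ENV_VAR = "PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE"


def _as_bool(value):
    """Interpret a conf string the way boolean confs are usually written."""
    return str(value).strip().lower() == "true"


def effective_flag(spark_conf, sql_conf=None):
    """Resolve the kill-on-flush-failure flag.

    spark_conf / sql_conf are dicts of explicitly set keys. Pass sql_conf=None
    for non-SQL (RDD) execution, which only reads the Spark conf.
    """
    spark_value = _as_bool(spark_conf.get(SPARK_CONF, "true"))  # default is true
    if sql_conf is None or SQL_CONF not in sql_conf:
        return spark_value  # SQL conf unset: fall back to the Spark conf
    return _as_bool(sql_conf[SQL_CONF])


def worker_env(use_daemon, kill_on_flush_failure):
    """Extra environment for the Python worker, mirroring the PythonRunner.scala check."""
    env = {}
    if use_daemon and kill_on_flush_failure:
        env[ENV_VAR] = "1"
    return env


# Disable globally, but re-enable it for SQL execution at runtime.
spark_conf = {SPARK_CONF: "false"}
print(effective_flag(spark_conf))                      # RDD path
print(effective_flag(spark_conf, {SQL_CONF: "true"}))  # SQL path
print(worker_env(use_daemon=True, kill_on_flush_failure=True))
```

The variable is only exported when the daemon is in use. Presumably this is because the guarded `flush()` lives in `daemon.py`, so nothing else reads it.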
Before the worker dies, it reuses the `faulthandler` feature to record a thread dump, which then appears in the error message if `faulthandler` is enabled.
```
WARN TaskSetManager: Lost task 3.0 in stage 1.0 (TID 8) (127.0.0.1 executor 1): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Current thread 0x00000001f0796140 (most recent call first):
  File "/.../python/pyspark/daemon.py", line 95 in worker
  File "/.../python/pyspark/daemon.py", line 228 in manager
  File "/.../python/pyspark/daemon.py", line 253 in <module>
  File "<frozen runpy>", line 88 in _run_code
  File "<frozen runpy>", line 198 in _run_module_as_main
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:679)
...
```
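The `Current thread ... (most recent call first)` block above is the format produced by Python's standard `faulthandler` module. Below is a minimal sketch of the dump step from the `daemon.py` change.

- The function name `dump_thread_traceback` is hypothetical.
- The log directory is passed in as an argument so the function can be exercised on its own. The real code reads it from `PYTHON_FAULTHANDLER_DIR`.

```python
import faulthandler
import os
import tempfile


def dump_thread_traceback(log_dir):
    """Write the Python tracebacks of all threads to <log_dir>/<pid>.

    Returns the path written, or None when no directory is configured
    (i.e. faulthandler is not enabled for the worker).
    """
    if not log_dir:
        return None
    path = os.path.join(log_dir, str(os.getpid()))
    with open(path, "w") as log_file:
        # faulthandler writes straight to the file descriptor, bypassing Python buffers.
        faulthandler.dump_traceback(file=log_file)
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as log_dir:
        with open(dump_thread_traceback(log_dir)) as f:
            print(f.read())
```

Naming the file after the PID ties the dump to one specific worker. Presumably the JVM side relies on the same convention when it attaches the dump to the task's error message.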
Even when `faulthandler` is not enabled, the error appears in the executor's `stderr` file.
```
Traceback (most recent call last):
  File "/.../python/pyspark/daemon.py", line 228, in manager
    code = worker(sock, authenticated)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.../python/pyspark/daemon.py", line 88, in worker
    raise Exception("test")
Exception: test
```
When the configuration is disabled, the behavior is the same as before, except that the flush failure is now logged to `stderr` instead of being silently ignored.
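Both behaviors can be condensed into a short, testable sketch of the `except` branch added to `daemon.py`.

- The name `flush_or_die`, the injectable `env`/`err` parameters, and the `BrokenOutfile` test double are hypothetical.
- The environment variable name and the log message come from the diff.
- The faulthandler dump step is left out for brevity.

```python
import io
import os
import sys
import traceback

KILL_ENV = "PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE"


def flush_or_die(outfile, env=None, err=None):
    """Flush the worker's output; on failure either re-raise or log to stderr."""
    env = os.environ if env is None else env
    err = sys.stderr if err is None else err
    try:
        outfile.flush()
    except Exception:
        if env.get(KILL_ENV):
            # Enabled: let the exception escape so the forked worker exits and
            # the JVM sees a crashed worker instead of waiting for a reply.
            raise
        # Disabled: keep the old behavior (carry on), but no longer silently.
        print(
            "PySpark daemon failed to flush the output to the worker process:\n"
            + traceback.format_exc(),
            file=err,
        )


class BrokenOutfile:
    """Test double whose flush always fails, like a socket whose peer went away."""

    def flush(self):
        raise BrokenPipeError("peer closed the connection")


log = io.StringIO()
flush_or_die(BrokenOutfile(), env={}, err=log)  # logged, not raised
print(log.getvalue().splitlines()[0])
```

As in the diff, any non-empty value of the variable counts as enabled, because `os.environ.get(...)` returns a string. The JVM side only ever sets it to `"1"` or leaves it unset.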
### Why are the changes needed?
Currently, an exception raised by `outfile.flush()` in `daemon.py` is silently ignored. However, if the last command written by `worker_main` has not been flushed yet, the UDF can get stuck on the Java side, waiting for a response from the Python worker that never arrives. Instead, the worker should just die and let Spark retry the task.
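The hang is a protocol-level problem: the JVM blocks reading an int that the worker never sends. The sketch below reproduces the three possible outcomes, using a local `socket.socketpair()` as a stand-in for the JVM/worker connection. It is a simplified model, not Spark's code:

- The `END_OF_STREAM` value of `-4` is assumed to match `pyspark.serializers.SpecialLengths`.
- The 4-byte big-endian framing is assumed to match PySpark's `write_int`.
- The timeout exists only so the demo terminates. A blocking reader without one would wait forever in the `stuck` case.

```python
import socket
import struct

END_OF_STREAM = -4  # assumed value of pyspark.serializers.SpecialLengths.END_OF_STREAM


def read_int(sock, timeout):
    """Read one 4-byte big-endian int, roughly as the JVM side waits for a reply."""
    sock.settimeout(timeout)
    data = b""
    try:
        while len(data) < 4:
            chunk = sock.recv(4 - len(data))
            if not chunk:
                return "eof"  # peer closed: the failure is visible and can be retried
            data += chunk
    except socket.timeout:
        return "timeout"  # peer alive but silent: without a timeout this blocks forever
    return struct.unpack("!i", data)[0]


def simulate(worker_state):
    jvm_end, worker_end = socket.socketpair()
    with jvm_end, worker_end:
        if worker_state == "flushed":
            worker_end.sendall(struct.pack("!i", END_OF_STREAM))
        elif worker_state == "died":
            worker_end.shutdown(socket.SHUT_WR)  # like the worker process exiting
        # "stuck": the worker is alive but its final int sits in an unflushed buffer
        return read_int(jvm_end, timeout=0.5)


for state in ("flushed", "stuck", "died"):
    print(state, "->", simulate(state))
```

Killing the worker turns the `stuck` case into the `died` case, which the reader can detect.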
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually.
<details>
<summary>Test with the patch to emulate the case</summary>
```patch
% git diff
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 54c9507e625..e107216d769 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -84,6 +84,8 @@ def worker(sock, authenticated):
         exit_code = compute_real_exit_code(exc.code)
     finally:
         try:
+            if worker_main.__globals__.get("TEST", False):
+                raise Exception("test")
             outfile.flush()
         except Exception:
             faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 6e34b041665..ff210f4fd97 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -3413,7 +3413,14 @@ def main(infile, outfile):
     # check end of stream
     if read_int(infile) == SpecialLengths.END_OF_STREAM:
-        write_int(SpecialLengths.END_OF_STREAM, outfile)
+        import random
+
+        if random.random() < 0.1:
+            # emulate the last command is not flushed yet
+            global TEST
+            TEST = True
+        else:
+            write_int(SpecialLengths.END_OF_STREAM, outfile)
     else:
         # write a different value to tell JVM to not reuse this worker
         write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
@@ -3423,6 +3430,9 @@ def main(infile, outfile):
     faulthandler.cancel_dump_traceback_later()
 
 
+TEST = False
+
+
 if __name__ == "__main__":
     # Read information about how to connect back to the JVM from the environment.
     conn_info = os.environ.get(
```
</details>
With the original `pass` (before this change), the task gets stuck; with this change, the worker dies and Spark retries the task.
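The test patch passes state from `worker.py` to `daemon.py` through `worker_main.__globals__`. That attribute is the global namespace of the module where `worker_main` was defined, so the `global TEST` assignment inside `main` is visible to the daemon.

The sketch below reproduces that mechanism with an in-memory module. The names `fake_worker`, its `worker_main`, and the `failure_rate` parameter are hypothetical stand-ins for PySpark's worker module. The failure rate is a parameter, rather than the patch's fixed `0.1`, so the outcome is deterministic.

```python
import random
import types

# Hypothetical stand-in for pyspark/worker.py with the test patch applied.
FAKE_WORKER_SOURCE = '''
TEST = False

def worker_main(rng, failure_rate):
    global TEST
    if rng.random() < failure_rate:
        TEST = True  # emulate "the last command is not flushed yet"
        return None
    return "END_OF_STREAM"
'''


def load_fake_worker():
    """Build a module object whose functions use its own __dict__ as globals."""
    module = types.ModuleType("fake_worker")
    exec(FAKE_WORKER_SOURCE, module.__dict__)
    return module


def daemon_flush(worker_main):
    # Same check as the test patch: read the flag from the worker module's globals.
    if worker_main.__globals__.get("TEST", False):
        raise Exception("test")


worker = load_fake_worker()
worker.worker_main(random.Random(0), failure_rate=1.0)  # always injects the failure
try:
    daemon_flush(worker.worker_main)
except Exception as exc:
    print("flush failed:", exc)
```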
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53055 from ueshin/issues/SPARK-54344/daemon_flush.
Lead-authored-by: Takuya Ueshin <[email protected]>
Co-authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit ed23cc39c32e0ca953f174f48ddff78ba24da375)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../scala/org/apache/spark/api/python/PythonRunner.scala | 8 +++++++-
.../scala/org/apache/spark/internal/config/Python.scala | 12 ++++++++++++
python/pyspark/daemon.py | 15 ++++++++++++++-
.../scala/org/apache/spark/sql/internal/SQLConf.scala | 11 +++++++++++
.../spark/sql/execution/python/ArrowPythonRunner.scala | 2 ++
.../sql/execution/python/ArrowPythonUDTFRunner.scala | 2 ++
.../sql/execution/python/CoGroupedArrowPythonRunner.scala | 2 ++
.../spark/sql/execution/python/PythonPlannerRunner.scala | 4 ++++
.../spark/sql/execution/python/PythonUDFRunner.scala | 2 ++
.../streaming/ApplyInPandasWithStatePythonRunner.scala | 2 ++
.../execution/python/streaming/PythonForeachWriter.scala | 2 ++
11 files changed, 60 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 66e204fee44b..7f1dc7fc86fc 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -188,12 +188,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
protected val timelyFlushEnabled: Boolean = false
protected val timelyFlushTimeoutNanos: Long = 0
protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+ private val useDaemon = conf.get(PYTHON_USE_DAEMON)
private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
protected val faultHandlerEnabled: Boolean = conf.get(PYTHON_WORKER_FAULTHANLDER_ENABLED)
protected val idleTimeoutSeconds: Long = conf.get(PYTHON_WORKER_IDLE_TIMEOUT_SECONDS)
protected val killOnIdleTimeout: Boolean = conf.get(PYTHON_WORKER_KILL_ON_IDLE_TIMEOUT)
protected val tracebackDumpIntervalSeconds: Long =
conf.get(PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
+ protected val killWorkerOnFlushFailure: Boolean =
+ conf.get(PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE)
protected val hideTraceback: Boolean = false
protected val simplifiedTraceback: Boolean = false
@@ -294,13 +297,16 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
if (tracebackDumpIntervalSeconds > 0L) {
envVars.put("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", tracebackDumpIntervalSeconds.toString)
}
+ if (useDaemon && killWorkerOnFlushFailure) {
+ envVars.put("PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE", "1")
+ }
// allow the user to set the batch size for the BatchedSerializer on UDFs
envVars.put("PYTHON_UDF_BATCH_SIZE", batchSizeForPythonUDF.toString)
envVars.put("SPARK_JOB_ARTIFACT_UUID",
jobArtifactUUID.getOrElse("default"))
val (worker: PythonWorker, handle: Option[ProcessHandle]) =
env.createPythonWorker(
- pythonExec, workerModule, daemonModule, envVars.asScala.toMap)
+ pythonExec, workerModule, daemonModule, envVars.asScala.toMap, useDaemon)
// Whether is the worker released into idle pool or closed. When any codes try to release or
// close a worker, they should use `releasedOrClosed.compareAndSet` to flip the state to make
// sure there is only one winner that is going to release or close the worker.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Python.scala b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
index de95e2fa1f7a..dc16d1ff255d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Python.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
@@ -138,4 +138,16 @@ private[spark] object Python {
.intConf
.checkValue(_ > 0, "If set, the idle worker max size must be > 0.")
.createOptional
+
+ val PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE =
+ ConfigBuilder("spark.python.daemon.killWorkerOnFlushFailure")
+ .doc("When enabled, exceptions raised during output flush operations in the Python " +
+ "worker managed under Python daemon are not caught, causing the worker to terminate " +
+ "with the exception. This allows Spark to detect the failure and launch a new worker " +
+ "and retry the task. " +
+ "When disabled, flush exceptions are caught and logged but the worker continues, " +
+ "which could cause the worker to get stuck due to protocol mismatch.")
+ .version("4.1.0")
+ .booleanConf
+ .createWithDefault(true)
}
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index ca33ce2c39ef..e75eca68fd0e 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -24,6 +24,7 @@ import sys
import traceback
import time
import gc
+import faulthandler
from errno import EINTR, EAGAIN
from socket import AF_INET, AF_INET6, SOCK_STREAM, SOMAXCONN
from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
@@ -85,7 +86,19 @@ def worker(sock, authenticated):
         try:
             outfile.flush()
         except Exception:
-            pass
+            if os.environ.get("PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE", False):
+                faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+                if faulthandler_log_path:
+                    faulthandler_log_path = os.path.join(faulthandler_log_path, str(os.getpid()))
+                    with open(faulthandler_log_path, "w") as faulthandler_log_file:
+                        faulthandler.dump_traceback(file=faulthandler_log_file)
+                raise
+            else:
+                print(
+                    "PySpark daemon failed to flush the output to the worker process:\n"
+                    + traceback.format_exc(),
+                    file=sys.stderr,
+                )
     return exit_code
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index cda75f22323e..f6aae1d2ba05 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3913,6 +3913,14 @@ object SQLConf {
.version("4.1.0")
.fallbackConf(Python.PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
+ val PYTHON_UDF_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE =
+ buildConf("spark.sql.execution.pyspark.udf.daemonKillWorkerOnFlushFailure")
+ .doc(
+ s"Same as ${Python.PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE.key} " +
+ "for Python execution with DataFrame and SQL. It can change during runtime.")
+ .version("4.1.0")
+ .fallbackConf(Python.PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE)
+
val PYTHON_WORKER_LOGGING_ENABLED =
buildConf("spark.sql.pyspark.worker.logging.enabled")
.doc("When set to true, this configuration enables comprehensive logging within " +
@@ -7410,6 +7418,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def pythonUDFWorkerTracebackDumpIntervalSeconds: Long =
getConf(PYTHON_UDF_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS)
+ def pythonUDFDaemonKillWorkerOnFlushFailure: Boolean =
+ getConf(PYTHON_UDF_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE)
+
def pythonWorkerLoggingEnabled: Boolean =
getConf(PYTHON_WORKER_LOGGING_ENABLED)
def pythonUDFArrowConcurrencyLevel: Option[Int] =
getConf(PYTHON_UDF_ARROW_CONCURRENCY_LEVEL)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index b94e00bc11ef..f5f968ee9522 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -60,6 +60,8 @@ abstract class BaseArrowPythonRunner[IN, OUT <: AnyRef](
override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
override val tracebackDumpIntervalSeconds: Long =
SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+ override val killWorkerOnFlushFailure: Boolean =
+ SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
override val errorOnDuplicatedFieldNames: Boolean = true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
index 7b73818bf0ec..1d5df9bad924 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonUDTFRunner.scala
@@ -83,6 +83,8 @@ class ArrowPythonUDTFRunner(
override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
override val tracebackDumpIntervalSeconds: Long =
SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+ override val killWorkerOnFlushFailure: Boolean =
+ SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
override val errorOnDuplicatedFieldNames: Boolean = true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
index 50013e533819..7f6efbae8881 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
@@ -71,6 +71,8 @@ class CoGroupedArrowPythonRunner(
override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
override val tracebackDumpIntervalSeconds: Long =
SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+ override val killWorkerOnFlushFailure: Boolean =
+ SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
index 0f4ac4ddad71..92e99cdc11d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
@@ -58,6 +58,7 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) extends Logging {
val idleTimeoutSeconds: Long = SQLConf.get.pythonUDFWorkerIdleTimeoutSeconds
val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
val tracebackDumpIntervalSeconds: Long = SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+ val killWorkerOnFlushFailure: Boolean = SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
val workerMemoryMb = SQLConf.get.pythonPlannerExecMemory
@@ -98,6 +99,9 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) extends Logging {
if (tracebackDumpIntervalSeconds > 0L) {
envVars.put("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", tracebackDumpIntervalSeconds.toString)
}
+ if (useDaemon && killWorkerOnFlushFailure) {
+ envVars.put("PYTHON_DAEMON_KILL_WORKER_ON_FLUSH_FAILURE", "1")
+ }
envVars.put("SPARK_JOB_ARTIFACT_UUID",
jobArtifactUUID.getOrElse("default"))
sessionUUID.foreach { uuid =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index 61f493deeee4..759aa998832d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -59,6 +59,8 @@ abstract class BasePythonUDFRunner(
override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
override val tracebackDumpIntervalSeconds: Long =
SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+ override val killWorkerOnFlushFailure: Boolean =
+ SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
override val bufferSize: Int =
SQLConf.get.getConf(SQLConf.PYTHON_UDF_BUFFER_SIZE)
override val batchSizeForPythonUDF: Int =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
index 51d9f6f523a2..14054ba89a94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/ApplyInPandasWithStatePythonRunner.scala
@@ -79,6 +79,8 @@ class ApplyInPandasWithStatePythonRunner(
override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
override val tracebackDumpIntervalSeconds: Long =
SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+ override val killWorkerOnFlushFailure: Boolean =
+ SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
private val sqlConf = SQLConf.get
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
index 37716d2d8413..cc7745210a4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonForeachWriter.scala
@@ -106,6 +106,8 @@ class PythonForeachWriter(func: PythonFunction, schema: StructType)
override val killOnIdleTimeout: Boolean = SQLConf.get.pythonUDFWorkerKillOnIdleTimeout
override val tracebackDumpIntervalSeconds: Long =
SQLConf.get.pythonUDFWorkerTracebackDumpIntervalSeconds
+ override val killWorkerOnFlushFailure: Boolean =
+ SQLConf.get.pythonUDFDaemonKillWorkerOnFlushFailure
override val hideTraceback: Boolean = SQLConf.get.pysparkHideTraceback
override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback