[ https://issues.apache.org/jira/browse/SPARK-53759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18071119#comment-18071119 ]
Antonio Blanco commented on SPARK-53759:
----------------------------------------
I was able to reproduce this issue on Windows and investigated further to find
the root cause.
The problem is in the simple-worker codepath, which Windows always uses because
{{os.fork()}} is unavailable. When the worker finishes processing a task, it
writes results to a buffered file object wrapping the socket (a {{BufferedRWPair}}
with a 64 KB buffer, created via {{sock.makefile()}}). The data sits in this
buffer until someone calls {{flush()}}, but the simple-worker path never does. It
relies on Python's garbage collector to flush the buffer during interpreter shutdown.
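The buffering behavior can be seen in isolation with the standard library alone. This is a minimal sketch, not PySpark code: {{socket.socketpair()}} stands in for the JVM-to-worker connection, and {{demo_buffered_write}} is a hypothetical name. A small write into the 64 KB {{makefile()}} buffer does not reach the peer until {{flush()}} is called.

```python
import socket


def demo_buffered_write(payload: bytes) -> tuple[bool, bytes]:
    """Show that bytes written to a makefile() wrapper stay buffered until flush()."""
    # socketpair() stands in for the JVM <-> Python worker connection
    # (an assumption for illustration; PySpark connects to the JVM over TCP).
    jvm_side, worker_side = socket.socketpair()
    # makefile("rwb", 65536) returns an io.BufferedRWPair with a 64 KB buffer.
    outfile = worker_side.makefile("rwb", 65536)
    try:
        outfile.write(payload)  # small payload: it fits in the buffer, nothing is sent

        # Poll the "JVM" side without blocking: no bytes have arrived yet.
        jvm_side.setblocking(False)
        try:
            jvm_side.recv(1024)
            arrived_before_flush = True
        except BlockingIOError:
            arrived_before_flush = False

        outfile.flush()  # the step the simple-worker path was missing
        jvm_side.setblocking(True)
        received = jvm_side.recv(1024)
        return arrived_before_flush, received
    finally:
        outfile.close()
        worker_side.close()
        jvm_side.close()


print(demo_buffered_write(b"task result"))
```

If the socket were closed while the bytes were still in the buffer, the peer would see end-of-stream instead of the payload, which matches the {{EOFException}} the JVM reports.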
This worked on Python 3.11 because the buffer happened to be flushed before the
socket was torn down. On Python 3.12+, CPython [moved GC to the eval
breaker|https://github.com/python/cpython/issues/97922], which changed the
order in which objects are finalized during shutdown. Now the socket closes first, and
when the buffer tries to flush, the data is silently lost. The JVM, still
waiting to read results, sees {{EOFException}}.
The daemon path ({{daemon.py}}) doesn't have this problem because it already
has an explicit {{outfile.flush()}} in a {{finally}} block. The simple-worker
path was just missing the same pattern.
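A minimal sketch of that {{try}}/{{finally}} pattern, under stated assumptions: {{serve_task}} and {{failing_task}} are hypothetical names (not PySpark functions), and {{socket.socketpair()}} again stands in for the JVM connection. Even when the task raises after writing, the explicit flush pushes the buffered bytes out before the file object is closed.

```python
import socket
from typing import BinaryIO, Callable


def serve_task(sock: socket.socket, process: Callable[[BinaryIO], None]) -> None:
    """Hypothetical worker entry point: run one task and always flush its output."""
    outfile = sock.makefile("wb", 65536)  # buffered writer, 64 KB buffer
    try:
        process(outfile)
    finally:
        # Flush explicitly instead of relying on finalizers that may run
        # after the underlying socket has already been torn down.
        outfile.flush()
        outfile.close()


def failing_task(out: BinaryIO) -> None:
    out.write(b"partial result")  # stays in the buffer until flushed
    raise RuntimeError("task failed after writing")


def demo() -> bytes:
    jvm_side, worker_side = socket.socketpair()
    try:
        try:
            serve_task(worker_side, failing_task)
        except RuntimeError:
            pass  # the error still propagates, but the bytes were flushed first
        return jvm_side.recv(1024)
    finally:
        worker_side.close()
        jvm_side.close()


print(demo())  # b'partial result'
```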
This appears to be resolved by [PR
#54458|https://github.com/apache/spark/pull/54458] (SPARK-55665), which unified
worker socket handling and added the missing flush. I tested every available
pre-release on PyPI and confirmed the fix landed in {{pyspark==4.2.0.dev3}}
(dev1 and dev2 are still affected). All current stable releases through 4.1.1
have the bug.
*Workarounds for anyone on a stable release:*
- Install the pre-release: {{pip install --pre pyspark==4.2.0.dev3}}
- Use Python 3.11 (the bug does not manifest on 3.11)
- On Linux/macOS, use the default daemon mode (only the simple-worker path is
affected)
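The conditions above can be condensed into a quick check. This is a hypothetical helper, not part of PySpark: it encodes only what this comment states (Windows always takes the simple-worker path, Linux/macOS take it only if the default daemon mode is not used, and Python 3.11 is unaffected), and it assumes a PySpark release older than {{4.2.0.dev3}} without checking the installed version.

```python
import sys


def likely_affected(platform: str, python_version: tuple[int, int],
                    daemon_enabled: bool = True) -> bool:
    """Hypothetical check: does this environment hit the simple-worker flush bug?

    Assumes a PySpark release older than 4.2.0.dev3; the PySpark version is not checked.
    """
    # Windows has no os.fork(), so PySpark always uses the simple-worker path there.
    simple_worker = platform.startswith("win") or not daemon_enabled
    # The lost-flush behavior was reported on Python 3.12+, not on 3.11.
    return simple_worker and python_version >= (3, 12)


print(likely_affected(sys.platform, sys.version_info[:2]))
```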
I put together a reproducer with a test matrix and full root cause analysis
here: https://github.com/anblanco/spark53759-reproducer
> PySpark crashes with Python 3.12+ on Windows
> --------------------------------------------
>
> Key: SPARK-53759
> URL: https://issues.apache.org/jira/browse/SPARK-53759
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.5.5, 4.0.0, 4.0.1
> Environment: * PySpark 4.0.1, 4.0.0, and 3.5.5
> * Python 3.12 and 3.13 (the last working version is 3.11.13)
> * Java 17
> * Spark Classic (not Spark Connect) in local mode
> * All Windows machines we've tested
> * Tested with just PySpark installed, and Pandas (2.3.2) & PyArrow (21.0.0)
> installed
> * Tested installation via pip and conda
> Reporter: Max Payson
> Priority: Critical
>
> PySpark crashes locally on Windows with Python 3.12+ when using the {{createDataFrame}} API.
> All DataFrame creation methods in the [Quickstart:
> Dataframe|https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html#DataFrame-Creation]
> seem to crash, but other operations work as expected.
> Reproduction:
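> {code:python}
> import os
> import sys
> from pyspark.sql import SparkSession
> # Run the Python workers with the same interpreter as the driver
> os.environ["PYSPARK_PYTHON"] = sys.executable
> spark = SparkSession.builder.getOrCreate()
> # Creating a DataFrame from local rows and showing it triggers the crash
> df = spark.createDataFrame([(1,), (2,)], ["myint"])
> df.show()
> {code}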
>
> Stack trace (with "spark.python.worker.faulthandler.enabled" enabled; the
> stack trace is the same with it disabled):
> {code:java}
> Traceback (most recent call last):
>   File "<your_script>.py", line 10, in <module>
>     df.show()
>   File "<pyspark>/sql/classic/dataframe.py", line 285, in show
>     print(self._show_string(n, truncate, vertical))
>   File "<pyspark>/sql/classic/dataframe.py", line 303, in _show_string
>     return self._jdf.showString(n, 20, vertical)
>   File "<py4j>/java_gateway.py", line 1362, in __call__
>     return_value = get_return_value(
>         answer, self.gateway_client, self.target_id, self.name)
>   File "<pyspark>/errors/exceptions/captured.py", line 282, in deco
>     return f(*a, **kw)
>   File "<py4j>/protocol.py", line 327, in get_return_value
>     raise Py4JJavaError(
>         "An error occurred while calling {0}{1}{2}.\n".
>         format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o48.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
> at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:624)
> at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:599)
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:35)
> at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:945)
> at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:925)
> at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:532)
> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
> at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
> at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
> at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:402)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
> at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
> at org.apache.spark.scheduler.Task.run(Task.scala:147)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
> at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
> at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: java.io.EOFException
> at java.base/java.io.DataInputStream.readInt(DataInputStream.java:386)
> at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:933)
> ... 26 more{code}
> Note: I originally posted in the Python 3.13 issue, but it seemed better to
> create a new issue since that one was closed. Please let us know if our team
> can help debug this further; it seems relatively low level. Thank you!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)