[
https://issues.apache.org/jira/browse/SPARK-54055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Peter Andrew updated SPARK-54055:
---------------------------------
Description:
Each Spark Connect session that uses Python UDFs seems to leak one PySpark
`daemon` process. Over time these can accumulate into the hundreds until the
corresponding node or container goes OOM (although this can take a while, since
each process uses little memory and some of its pages are shared).
{code}
spark      263  0.0  0.0 121424 59504 ?  S  05:00  0:01 \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon
spark     1515  0.0  0.0 121324 60148 ?  S  05:04  0:01 \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon
spark     1525  0.0  0.0 121324 60400 ?  S  05:04  0:01 \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon
spark     1568  0.0  0.0 121324 60280 ?  S  05:04  0:01 \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon{code}
In addition, threads are leaked as well; for example, here is a thread dump
histogram from a sample executor with 200+ leaked daemon processes:
{code}
# These threads seem to be leaking
226 threads Idle Worker Monitor for /opt/spark/.venv/bin/python3
226 threads process reaper
226 threads stderr reader for /opt/spark/.venv/bin/python3
226 threads stdout reader for /opt/spark/.venv/bin/python3
250 threads Worker Monitor for /opt/spark/.venv/bin/python3
# These threads seem fine, Spark is configured with 24 cores/executor
21 threads stdout writer for /opt/spark/.venv/bin/python3
21 threads Writer Monitor for /opt/spark/.venv/bin/python3{code}
With Spark Connect, each session always carries a `SPARK_JOB_ARTIFACT_UUID`,
even if there are no artifacts, so the UDF worker environment built in
`BasePythonRunner.compute` differs for every session, and each session ends up
with its own `PythonWorkerFactory` and hence its own daemon process.
`PythonWorkerFactory` has a `stop` method that shuts the daemon down, but
nothing appears to call `PythonWorkerFactory.stop` except `SparkEnv.stop` at
shutdown, so the factories (and their daemons) are never cleaned up while the
executor is running.
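The snippet below is a simplified, illustrative Python model of that caching
behaviour (the real logic is Scala code in `SparkEnv` and `PythonWorkerFactory`;
the class and method names here are invented for illustration). It only shows
why a per-session environment variable produces one never-stopped factory per
session:
{code:python}
# Illustrative model only: the real logic is Scala code in SparkEnv /
# PythonWorkerFactory; class and method names here are invented.
from dataclasses import dataclass


@dataclass
class DemoWorkerFactory:
    """Stand-in for PythonWorkerFactory: one instance ~ one pyspark.daemon."""
    python_exec: str
    env_vars: dict
    stopped: bool = False

    def stop(self) -> None:
        # In Spark this is where the daemon process would be terminated.
        self.stopped = True


class DemoSparkEnv:
    """Stand-in for SparkEnv's per-(python exec, env vars) factory cache."""

    def __init__(self) -> None:
        self._factories = {}

    def create_python_worker(self, python_exec: str, env_vars: dict) -> DemoWorkerFactory:
        # The cache key includes the worker environment, so a per-session
        # SPARK_JOB_ARTIFACT_UUID makes every session's key unique.
        key = (python_exec, frozenset(env_vars.items()))
        if key not in self._factories:
            self._factories[key] = DemoWorkerFactory(python_exec, env_vars)
        return self._factories[key]

    def stop(self) -> None:
        # The only place the factories are stopped, i.e. executor shutdown.
        for factory in self._factories.values():
            factory.stop()


if __name__ == "__main__":
    env = DemoSparkEnv()
    for session in range(200):
        env.create_python_worker(
            "/opt/spark/.venv/bin/python3",
            {"SPARK_JOB_ARTIFACT_UUID": f"session-{session}"},
        )
    # 200 distinct factories (daemons) now exist and none will be stopped
    # until env.stop() is called.
    print(len(env._factories))
{code}
In the real code the effect is the same: the first Python UDF of every Connect
session creates a new factory and daemon, and only `SparkEnv.stop` ever tears
them down.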
This can be reproduced by running a large number of Spark Connect sessions,
e.g. 200 in parallel:
{code:bash}
parallel -n0 .venv/bin/python3 dummy.py ::: {1..200}{code}
with `dummy.py`, which opens its own Spark Connect session and runs a trivial
pandas UDF:
{code:python}
from collections.abc import Iterable

import pandas as pd
from pyspark.sql import SparkSession


def _udf(iterator: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
    yield from iterator


if __name__ == "__main__":
    spark = SparkSession.builder.remote("...").getOrCreate()
    df = spark.range(128)
    df.mapInPandas(_udf, df.schema).count()
{code}
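After the runs complete, the leaked daemons stay visible on each executor;
counting them with e.g. `pgrep -fc pyspark.daemon` shows the count growing with
the number of sessions rather than returning to a small fixed pool.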
> Spark Connect sessions leak pyspark UDF daemon processes and threads
> --------------------------------------------------------------------
>
> Key: SPARK-54055
> URL: https://issues.apache.org/jira/browse/SPARK-54055
> Project: Spark
> Issue Type: Bug
> Components: Connect, PySpark
> Affects Versions: 3.5.5
> Reporter: Peter Andrew
> Priority: Major
>