Peter Andrew created SPARK-54055:
------------------------------------

             Summary: Spark Connect sessions leak pyspark UDF daemon processes and threads
                 Key: SPARK-54055
                 URL: https://issues.apache.org/jira/browse/SPARK-54055
             Project: Spark
          Issue Type: Bug
          Components: Connect, PySpark
    Affects Versions: 3.5.5
            Reporter: Peter Andrew


Each Spark Connect session that uses Python UDFs seems to leak one `pyspark.daemon` process. Over time these accumulate into the hundreds, until the corresponding node or container goes OOM.

 
{code:none}
spark        263  0.0  0.0 121424 59504 ?        S    05:00   0:01  \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon
spark       1515  0.0  0.0 121324 60148 ?        S    05:04   0:01  \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon
spark       1525  0.0  0.0 121324 60400 ?        S    05:04   0:01  \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon
spark       1568  0.0  0.0 121324 60280 ?        S    05:04   0:01  \_ /opt/spark/.venv/bin/python3 -m pyspark.daemon{code}
 

Threads are leaking as well. For example, here is a thread-dump histogram from a sample executor with 200+ leaked daemon processes:

 
{code:none}
# These threads seem to be leaking
226 threads   Idle Worker Monitor for /opt/spark/.venv/bin/python3
226 threads   process reaper
226 threads   stderr reader for /opt/spark/.venv/bin/python3
226 threads   stdout reader for /opt/spark/.venv/bin/python3
250 threads   Worker Monitor for /opt/spark/.venv/bin/python3

# These threads seem fine, Spark is configured with 24 cores/executor
21 threads    stdout writer for /opt/spark/.venv/bin/python3
21 threads    Writer Monitor for /opt/spark/.venv/bin/python3{code}

This can be reproduced by running a large number of Spark Connect sessions in parallel:
{code:bash}
parallel -n0 .venv/bin/python3 dummy.py ::: {1..200} {code}
 

with `dummy.py`:

 
{code:python}
from collections.abc import Iterable

import pandas as pd
from pyspark.sql import SparkSession


def _udf(iterator: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
    # Identity pandas UDF: pass every batch through unchanged.
    yield from iterator


if __name__ == "__main__":
    # Each run opens its own Spark Connect session (server URL elided).
    spark = SparkSession.builder.remote("...").getOrCreate()

    df = spark.range(128)
    df.mapInPandas(_udf, df.schema).count()
{code}
 

With Spark Connect, each session always has a `SPARK_JOB_ARTIFACT_UUID`, even if there are no artifacts. The worker environment that `BasePythonRunner.compute` builds therefore differs for every session, so each session ends up with its own `PythonWorkerFactory` and hence its own daemon process.
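
Roughly, the relevant JVM-side logic looks like the following simplified sketch (not the actual Spark source; `FactorySketch` and `WorkerCacheSketch` are placeholder names, and the real `SparkEnv`/`PythonWorkerFactory` code is only paraphrased). It shows why a per-session env var defeats the factory cache:
{code:scala}
import scala.collection.mutable

// Stand-in for org.apache.spark.api.python.PythonWorkerFactory: one instance
// owns one pyspark.daemon process for a given (python executable, env vars) pair.
class FactorySketch(pythonExec: String, envVars: Map[String, String]) {
  def createWorker(): Unit = ()  // would fork a worker from the daemon
  def stop(): Unit = ()          // would terminate the daemon
}

object WorkerCacheSketch {
  // Factories cached per (python executable, full env-var map) -- roughly how
  // the executor's SparkEnv keeps its PythonWorkerFactory instances around.
  private val factories =
    mutable.HashMap[(String, Map[String, String]), FactorySketch]()

  def createPythonWorker(pythonExec: String, envVars: Map[String, String]): Unit =
    synchronized {
      // Each Spark Connect session puts its own SPARK_JOB_ARTIFACT_UUID into
      // envVars, so every session produces a distinct key, a fresh factory,
      // and a fresh pyspark.daemon.
      val factory = factories.getOrElseUpdate(
        (pythonExec, envVars), new FactorySketch(pythonExec, envVars))
      factory.createWorker()
    }
}
{code}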

`PythonWorkerFactory` has a `stop` method that stops the daemon, but nothing appears to call `PythonWorkerFactory.stop` except `SparkEnv.stop` at shutdown, so the per-session daemons live until the executor itself stops.
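
For illustration only, here is a hypothetical sketch of the kind of per-session cleanup that seems to be missing. Nothing like `onSessionClosed` or `FactoryStub` exists in Spark today; this is just what hooking session removal up to `PythonWorkerFactory.stop` could look like:
{code:scala}
import scala.collection.mutable

// Stand-in for PythonWorkerFactory, which already exposes stop().
class FactoryStub {
  def stop(): Unit = ()
}

object SessionCleanupSketch {
  // Same (pythonExec, envVars)-keyed cache as in the previous sketch.
  val factories = mutable.HashMap[(String, Map[String, String]), FactoryStub]()

  // Hypothetical hook: when a Connect session is removed, stop and drop every
  // factory whose env vars carry that session's SPARK_JOB_ARTIFACT_UUID, so the
  // corresponding pyspark.daemon and its monitor/reader threads can go away.
  def onSessionClosed(artifactUuid: String): Unit = synchronized {
    val stale = factories.filter { case ((_, envVars), _) =>
      envVars.get("SPARK_JOB_ARTIFACT_UUID").contains(artifactUuid)
    }
    stale.foreach { case (key, factory) =>
      factory.stop()
      factories.remove(key)
    }
  }
}
{code}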

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
