[ https://issues.apache.org/jira/browse/SPARK-46125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17790751#comment-17790751 ]

Josh Rosen commented on SPARK-46125:
------------------------------------

I think that this issue relates specifically to `createDataFrame` and other 
mechanisms for creating Datasets or RDDs from driver-side data.

I was able to reproduce the memory effects that you reported using a synthetic 
dataset:
{code:python}
import numpy as np
import pandas as pd

n_rows = 1_000_000
n_cols = 10  # arbitrary column count; not specified in the original snippet
data = np.random.randn(n_rows, n_cols)
pdf = pd.DataFrame(data, columns=[f'Column_{i}' for i in range(n_cols)])
{code}
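Completing that repro on top of the synthetic frame, a minimal sketch (the 4g 
driver memory, Arrow setting, and DISK_ONLY level are carried over from the 
reporter's script below):
{code:python}
import gc

from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel

spark = SparkSession.builder \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

# 'pdf' is the synthetic pandas DataFrame built above.
df = spark.createDataFrame(pdf)
df = df.persist(storageLevel=StorageLevel.DISK_ONLY)
df.count()

# Dropping the Python references and forcing GC on both sides is not enough:
# the CacheManager still holds the physical plan, which pins the
# ParallelCollectionRDD and its byte[] partitions in the driver heap.
del pdf, df
gc.collect()
spark.sparkContext._jvm.System.gc()
{code}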
I took heap dumps in the "with unpersist" and "without unpersist" cases and saw 
that most of the difference was due to `byte[]` arrays. Those arrays, in turn, 
are kept alive by ParallelCollectionPartitions in a ParallelCollectionRDD that 
is retained by the CacheManager.

When you cache a query, Spark keeps the physical query plan alive so that it 
can recompute cached data if it is lost (e.g. due to a node failure). For 
Datasets or RDDs that are created from data on the driver, that driver-side 
data is kept alive.

It's this CacheManager reference to the physical plan that keeps the source 
RDD from being cleaned up: this is why `del df` followed by a GC does not 
free the RDD's memory.
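By contrast, explicitly unpersisting before dropping the references removes the 
cache entry and lets the GC reclaim that memory. A sketch, assuming the `spark` 
session and synthetic `pdf` from the snippet above are rebuilt:
{code:python}
# Same setup as the sketch above, but with an explicit unpersist(): removing
# the cache entry drops the CacheManager's reference to the physical plan,
# so the ParallelCollectionRDD's byte[] partitions become collectable.
df = spark.createDataFrame(pdf)
df = df.persist(storageLevel=StorageLevel.DISK_ONLY)
df.count()
df = df.unpersist()

del pdf, df
gc.collect()
spark.sparkContext._jvm.System.gc()
{code}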

---

If you use `localCheckpoint` then Spark will persist the data on the executors' 
local storage and truncate the RDD lineage, thereby avoiding driver-side memory 
consumption from the parallel collection RDD. This has the side effect of 
removing fault tolerance: if any node is lost then the data is lost, and any 
attempt to access it will result in query failures.
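A sketch of that alternative, again assuming the `spark` session and synthetic 
`pdf` from above:
{code:python}
# Sketch: truncate the lineage with localCheckpoint() instead of relying on the
# cached plan to recompute the data.
df = spark.createDataFrame(pdf)
df = df.localCheckpoint(eager=True)  # materializes on the executors, cuts lineage
df.count()

# With the lineage truncated, the driver-side parallel collection should no
# longer be pinned once the Python references are dropped; note, however, that
# the checkpointed data is not fault-tolerant: losing an executor loses it.
del pdf, df
gc.collect()
spark.sparkContext._jvm.System.gc()
{code}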

!image-2023-11-28-12-55-58-461.png!

> Memory leak when using createDataFrame with persist
> ---------------------------------------------------
>
>                 Key: SPARK-46125
>                 URL: https://issues.apache.org/jira/browse/SPARK-46125
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, PySpark
>    Affects Versions: 3.5.0
>            Reporter: Arman Yazdani
>            Priority: Major
>              Labels: PySpark, memory-leak, persist
>         Attachments: CreateDataFrameWithUnpersist.png, 
> CreateDataFrameWithoutUnpersist.png, ReadParquetWithoutUnpersist.png, 
> image-2023-11-28-12-55-58-461.png
>
>
> When I create a Dataset from a pandas DataFrame and persist it (DISK_ONLY), 
> some "byte[]" objects (with a total size equal to the imported DataFrame) 
> remain in the driver's heap memory.
> This is the sample code for reproducing it:
> {code:python}
> import pandas as pd
> import gc
> from pyspark.sql import SparkSession
> from pyspark.storagelevel import StorageLevel
> spark = SparkSession.builder \
>     .config("spark.driver.memory", "4g") \
>     .config("spark.executor.memory", "4g") \
>     .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
>     .getOrCreate()
> pdf = pd.read_pickle('tmp/input.pickle')
> df = spark.createDataFrame(pdf)
> df = df.persist(storageLevel=StorageLevel.DISK_ONLY)
> df.count()
> del pdf
> del df
> gc.collect()
> spark.sparkContext._jvm.System.gc(){code}
> After running this code, I perform a manual GC in VisualVM, but the driver 
> memory usage remains at about 550 MB (at the start it was about 50 MB).
> !CreateDataFrameWithoutUnpersist.png|width=467,height=349!
> Then I tested again with {{"df = df.unpersist()"}} added after the 
> {{"df.count()"}} line and everything was OK (memory usage after performing a 
> manual GC was about 50 MB).
> !CreateDataFrameWithUnpersist.png|width=468,height=300!
> Also, I tried reading from a Parquet file (without the unpersist line) with 
> this code:
> {code:python}
> import gc
> from pyspark.sql import SparkSession
> from pyspark.storagelevel import StorageLevel
> spark = SparkSession.builder \
>     .config("spark.driver.memory", "4g") \
>     .config("spark.executor.memory", "4g") \
>     .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
>     .getOrCreate()
> df = spark.read.parquet('tmp/input.parquet')
> df = df.persist(storageLevel=StorageLevel.DISK_ONLY)
> df.count()
> del df
> gc.collect()
> spark.sparkContext._jvm.System.gc(){code}
> Again, everything was fine and memory usage was about 50 MB after performing 
> a manual GC.
> !ReadParquetWithoutUnpersist.png|width=473,height=302!


