[ https://issues.apache.org/jira/browse/SPARK-46125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17790751#comment-17790751 ]
Josh Rosen commented on SPARK-46125:
------------------------------------

I think this issue relates specifically to `createDataFrame` and other mechanisms for creating Datasets or RDDs from driver-side data. I was able to reproduce the memory effects that you reported using a synthetic dataset:

{code:python}
import numpy as np
import pandas as pd

n_rows = 1000000
n_cols = 100  # any reasonably large column count reproduces the effect
data = np.random.randn(n_rows, n_cols)
pdf = pd.DataFrame(data, columns=[f'Column_{i}' for i in range(n_cols)])
{code}

I took heap dumps in the "with unpersist" and "without unpersist" cases and saw that most of the difference was due to `byte[]` arrays. That, in turn, is because `ParallelCollectionPartition`s are kept alive in a `ParallelCollectionRDD` that is retained by the CacheManager.

When you cache a query, Spark keeps the physical query plan alive so that it can recompute the cached data if it is lost (e.g. due to a node failure). For Datasets or RDDs that are created from data on the driver, that driver-side data is therefore kept alive as well. It is this CacheManager reference to the physical plan that keeps the source RDD from being cleaned up: this is why `del df` followed by a GC does not release the RDD's memory.

---

If you use `localCheckpoint`, Spark will persist the data to disk and truncate the RDD lineage, thereby avoiding driver-side memory consumption from the parallel-collection RDD. This has the side effect of removing fault tolerance, however: if any node is lost, the data will be lost, and any attempt to access it will result in query failures.

!image-2023-11-28-12-55-58-461.png!
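The retention mechanism described above can be illustrated without Spark at all: a cache that holds a strong reference to a plan keeps it (and any driver-side data it references) alive across garbage collections, no matter how many local names you delete. Below is a minimal pure-Python analogy — the `cache` dict stands in for Spark's CacheManager, and none of these names come from Spark's API:

```python
import gc
import weakref

class PhysicalPlan:
    """Stand-in for a cached query's physical plan holding driver-side data."""
    def __init__(self):
        self.driver_side_data = bytearray(1024)  # the retained byte[] analogue

cache = {}  # stands in for Spark's CacheManager

plan = PhysicalPlan()
probe = weakref.ref(plan)   # lets us observe whether the plan is still alive
cache["query"] = plan       # caching retains the plan for recomputation

del plan                    # analogous to `del df` on the driver
gc.collect()
print(probe() is not None)  # True: the cache still pins the plan

del cache["query"]          # analogous to df.unpersist()
gc.collect()
print(probe() is None)      # True: nothing references the plan any more
```

This mirrors why `unpersist()` (which removes the CacheManager entry) frees the memory while `del df` alone does not.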
> Memory leak when using createDataFrame with persist
> ---------------------------------------------------
>
>                 Key: SPARK-46125
>                 URL: https://issues.apache.org/jira/browse/SPARK-46125
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, PySpark
>    Affects Versions: 3.5.0
>            Reporter: Arman Yazdani
>            Priority: Major
>              Labels: PySpark, memory-leak, persist
>         Attachments: CreateDataFrameWithUnpersist.png, CreateDataFrameWithoutUnpersist.png, ReadParquetWithoutUnpersist.png, image-2023-11-28-12-55-58-461.png
>
> When I create a Dataset from a pandas DataFrame and persist it (DISK_ONLY), some `byte[]` objects (totalling the size of the imported DataFrame) still remain in the driver's heap memory.
>
> This is sample code for reproducing it:
>
> {code:python}
> import pandas as pd
> import gc
> from pyspark.sql import SparkSession
> from pyspark.storagelevel import StorageLevel
>
> spark = SparkSession.builder \
>     .config("spark.driver.memory", "4g") \
>     .config("spark.executor.memory", "4g") \
>     .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
>     .getOrCreate()
>
> pdf = pd.read_pickle('tmp/input.pickle')
> df = spark.createDataFrame(pdf)
> df = df.persist(storageLevel=StorageLevel.DISK_ONLY)
> df.count()
>
> del pdf
> del df
> gc.collect()
> spark.sparkContext._jvm.System.gc()
> {code}
>
> After running this code, I perform a manual GC in VisualVM, but driver memory usage remains at about 550 MB (at start it was about 50 MB).
>
> !CreateDataFrameWithoutUnpersist.png|width=467,height=349!
>
> I then tested with {{df = df.unpersist()}} added after the {{df.count()}} line, and everything was OK (memory usage after a manual GC was about 50 MB).
>
> !CreateDataFrameWithUnpersist.png|width=468,height=300!
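The roughly 500 MB of retained `byte[]` data reported above is consistent with the raw size of a float64 DataFrame of this scale. As a back-of-envelope check (the column count here is an assumption for illustration; the report does not state the input DataFrame's shape):

```python
import struct

# Assumed shape for illustration only; the original report does not give it.
n_rows, n_cols = 1_000_000, 64
bytes_per_value = struct.calcsize("d")  # 8 bytes per IEEE-754 double (float64)

estimated_bytes = n_rows * n_cols * bytes_per_value
print(f"~{estimated_bytes / 1024 ** 2:.0f} MiB of raw data")  # ~488 MiB
```

So a DataFrame of around 64 float64 columns at a million rows would account for the observed jump from ~50 MB to ~550 MB of driver heap on its own.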
> I also tried reading from a parquet file (without adding the unpersist line) with this code:
>
> {code:python}
> import gc
> from pyspark.sql import SparkSession
> from pyspark.storagelevel import StorageLevel
>
> spark = SparkSession.builder \
>     .config("spark.driver.memory", "4g") \
>     .config("spark.executor.memory", "4g") \
>     .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
>     .getOrCreate()
>
> df = spark.read.parquet('tmp/input.parquet')
> df = df.persist(storageLevel=StorageLevel.DISK_ONLY)
> df.count()
>
> del df
> gc.collect()
> spark.sparkContext._jvm.System.gc()
> {code}
>
> Again everything was fine and memory usage was about 50 MB after performing a manual GC.
>
> !ReadParquetWithoutUnpersist.png|width=473,height=302!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org