[ 
https://issues.apache.org/jira/browse/SPARK-46125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arman Yazdani updated SPARK-46125:
----------------------------------
    Description: 
When I create a DataFrame from a pandas DataFrame and persist it (DISK_ONLY), 
some "byte[]" objects (totaling the size of the imported DataFrame) remain in 
the driver's heap memory.

Here is sample code to reproduce it:
{code:python}
import pandas as pd
import gc

from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel

spark = SparkSession.builder \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

pdf = pd.read_pickle('tmp/input.pickle')
df = spark.createDataFrame(pdf)
df = df.persist(storageLevel=StorageLevel.DISK_ONLY)
df.count()

del pdf
del df
gc.collect()
spark.sparkContext._jvm.System.gc(){code}
After running this code, I perform a manual GC in VisualVM, but driver memory 
usage stays at about 550 MB (at start it was about 50 MB).

!CreateDataFrameWithoutUnpersist.png|width=467,height=349!
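
As a rough cross-check without VisualVM, driver heap usage can also be read from 
the same Python session via py4j (just a sketch reusing the {{spark}} session from 
the snippet above; the numbers are approximate and will not match a profiler exactly):
{code:python}
# Rough driver heap check via py4j, reusing the existing `spark` session.
jvm = spark.sparkContext._jvm
jvm.System.gc()  # same manual GC as above
rt = jvm.java.lang.Runtime.getRuntime()
used_mb = (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024)
print(f"approx. driver heap in use: {used_mb:.0f} MB")
{code}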

Then I tested adding {{df = df.unpersist()}} after the {{df.count()}} line and 
everything was OK (memory usage after a manual GC was about 50 MB).

!CreateDataFrameWithUnpersist.png|width=468,height=300!
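
For reference, the working variant differs only by the one unpersist call (same 
script as above, shown from the load step onward):
{code:python}
pdf = pd.read_pickle('tmp/input.pickle')
df = spark.createDataFrame(pdf)
df = df.persist(storageLevel=StorageLevel.DISK_ONLY)
df.count()
df = df.unpersist()  # added line; after this, a manual GC brings the driver back to ~50 MB

del pdf
del df
gc.collect()
spark.sparkContext._jvm.System.gc()
{code}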

I also tried reading from a Parquet file (without the unpersist line), using 
this code:
{code:python}
import gc

from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel

spark = SparkSession.builder \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

df = spark.read.parquet('tmp/input.parquet')
df = df.persist(storageLevel=StorageLevel.DISK_ONLY)
df.count()

del df
gc.collect()
spark.sparkContext._jvm.System.gc(){code}
Again, everything was fine and memory usage was about 50 MB after a manual GC.

!ReadParquetWithoutUnpersist.png|width=473,height=302!



> Memory leak when using createDataFrame with persist
> ---------------------------------------------------
>
>                 Key: SPARK-46125
>                 URL: https://issues.apache.org/jira/browse/SPARK-46125
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, PySpark
>    Affects Versions: 3.5.0
>            Reporter: Arman Yazdani
>            Priority: Major
>              Labels: PySpark, memory-leak, persist
>         Attachments: CreateDataFrameWithUnpersist.png, 
> CreateDataFrameWithoutUnpersist.png, ReadParquetWithoutUnpersist.png
>


