I did check the execution plan. There were two stages, and both of them showed that 
the pandas UDF (which accounts for almost all of the DataFrame's computation time) 
was executed.

It didn't seem to be an issue with repartition vs. coalesce, as the DataFrame was 
still computed twice after I removed the coalesce.
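
For reference, a quick way to double-check whether the write actually reuses the 
persisted data is to look at the physical plan of the write-side query: if the cache 
is hit, the plan contains an InMemoryTableScan/InMemoryRelation node instead of a 
fresh parquet scan plus the pandas UDF. A minimal sketch, assuming the df from the 
code quoted further below:

df.count()                  # materialise the persisted DataFrame first
df.coalesce(300).explain()  # the plan the write will use; look for InMemoryTableScan
print(df.storageLevel)      # confirm the DataFrame is registered as persisted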




Best,

----

Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>

________________________________
From: Gourav Sengupta <gourav.sengu...@gmail.com>
Sent: Sunday, January 30, 2022 1:08 AM
To: sebastian....@gmail.com <sebastian....@gmail.com>
Cc: Benjamin Du <legendu....@outlook.com>; u...@spark.incubator.apache.org 
<u...@spark.incubator.apache.org>
Subject: Re: A Persisted Spark DataFrame is computed twice

Hi,

Without getting into suppositions, the best option is to look at the SQL section of 
the SPARK UI.

It is the most wonderful tool to explain what is happening, and why. In SPARK 3.x 
they have made the UI even better, with different levels of granularity and detail.

On another note, you might want to read about the difference between repartition and 
coalesce before making any kind of assumptions.
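
A minimal sketch of the difference (the output paths here are hypothetical, not from 
this thread):

# repartition(n) always performs a full shuffle and yields exactly n partitions;
# the upstream stages keep their original parallelism.
df.repartition(300).write.mode("overwrite").parquet("/tmp/out_repartition")

# coalesce(n) avoids the shuffle by merging existing partitions, but it can also
# collapse the parallelism of the upstream computation down to n tasks.
df.coalesce(300).write.mode("overwrite").parquet("/tmp/out_coalesce")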


Regards,
Gourav Sengupta

On Sun, Jan 30, 2022 at 8:52 AM Sebastian Piu 
<sebastian....@gmail.com<mailto:sebastian....@gmail.com>> wrote:
It's probably the repartitioning and the deserialising of the df that you are seeing 
take time. Try the following:

1. Add another count after your current one and compare times
2. Move coalesce before persist



You should see
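
In code, the two suggestions would look roughly like this (a sketch reusing the 
pipeline quoted below, not tested):

# 1. Add a second count and compare times: the first count pays for the pandas
#    UDF plus the persist, the second should only read the cached data.
df.count()
df.count()

# 2. Move the coalesce before the persist, so the cached data already has 300
#    partitions and the write does not change the partitioning at all.
df = spark.read.parquet("/input/hdfs/path") \
    .filter(...) \
    .withColumn("new_col", my_pandas_udf("col0", "col1")) \
    .coalesce(300) \
    .persist(StorageLevel.DISK_ONLY)
df.count()
df.write.mode("overwrite").parquet(output_mod)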

On Sun, 30 Jan 2022, 08:37 Benjamin Du, 
<legendu....@outlook.com<mailto:legendu....@outlook.com>> wrote:
I have some PySpark code like the snippet below. Basically, I persist a DataFrame 
(which is time-consuming to compute) to disk, call DataFrame.count to trigger the 
caching/persist immediately, and then coalesce the DataFrame to reduce the number of 
partitions (the original DataFrame has 30,000 partitions) and write it to HDFS. Based 
on the execution times of the job stages and the execution plan, it seems to me that 
the DataFrame is recomputed at df.coalesce(300). Does anyone know why this happens?


from pyspark import StorageLevel

df = spark.read.parquet("/input/hdfs/path") \
    .filter(...) \
    .withColumn("new_col", my_pandas_udf("col0", "col1")) \
    .persist(StorageLevel.DISK_ONLY)
df.count()  # trigger the persist immediately
df.coalesce(300).write.mode("overwrite").parquet(output_mod)  # this appears to recompute df


BTW, it works well if I manually write the DataFrame to HDFS, read it back, 
coalesce it and write it back to HDFS.

Originally posted at 
https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice

Best,

----

Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>
