I have some PySpark code like below. Basically, I persist a DataFrame (which is 
time-consuming to compute) to disk, call the method DataFrame.count to trigger 
the caching/persist immediately, and then I coalesce the DataFrame to reduce 
the number of partitions (the original DataFrame has 30,000 partitions) and 
output it to HDFS. Based on the execution time of job stages and the execution 
plan, it seems to me that the DataFrame is recomputed at df.coalesce(300). Does 
anyone know why this happens?


df = spark.read.parquet("/input/hdfs/path") \
    .filter(...) \
    .withColumn("new_col", my_pandas_udf("col0", "col1")) \
    .persist(StorageLevel.DISK_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


BTW, it works well if I manually write the DataFrame to HDFS, read it back, 
coalesce it and write it back to HDFS.

Originally post at 
https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.<https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice>

Best,

----

Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>

Reply via email to