I have some PySpark code like below. Basically, I persist a DataFrame (which is time-consuming to compute) to disk, call DataFrame.count() to trigger the persist immediately, and then coalesce the DataFrame to reduce the number of partitions (the original DataFrame has 30,000 partitions) before writing it to HDFS. Based on the execution time of the job stages and the execution plan, it seems to me that the DataFrame is recomputed at df.coalesce(300). Does anyone know why this happens?
df = spark.read.parquet("/input/hdfs/path") \
    .filter(...) \
    .withColumn("new_col", my_pandas_udf("col0", "col1")) \
    .persist(StorageLevel.DISK_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)

BTW, it works well if I manually write the DataFrame to HDFS, read it back, coalesce it, and write it back to HDFS.

Originally posted at https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.

Best,
Ben Du

Personal Blog: http://www.legendu.net/ | GitHub: https://github.com/dclong/ | Bitbucket: https://bitbucket.org/dclong/ | Docker Hub: https://hub.docker.com/r/dclong/