I don't think coalesce itself (by repartitioning I assume you mean coalesce) or deserialising takes that much time. To add a bit more context, computing the DataFrame is CPU intensive rather than data/IO intensive. I purposely call coalesce after df.count because I want to keep the large number of partitions (30k) while computing the DataFrame, which gives much higher parallelism. After the computation, I reduce the number of partitions to avoid having too many small files on HDFS.

It typically takes about 5 hours to compute the DataFrame (with 30k partitions) and write it to disk without repartitioning or coalescing. If I manually write the computed DataFrame to disk, read it back, coalesce it and then write it back to disk, it also takes about 5 hours. The code that I pasted in this thread takes forever to run because the DataFrame is evidently recomputed at df.coalesce, and with a parallelism of only 300 partitions it is almost impossible to compute the DataFrame in a reasonable amount of time.
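The manual workaround described above (write at full parallelism, read back, coalesce, write again) could look roughly like the sketch below. This is not the repart_hdfs function mentioned in this thread: rewrite_compacted and its parameters are hypothetical names, and the sketch writes to a separate destination path instead of replacing the data in place. It relies only on spark.read.parquet, coalesce/repartition and write.mode("overwrite").parquet.

```python
def rewrite_compacted(spark, src_path, dst_path, num_partitions, coalesce=True):
    """Read Parquet data from src_path and rewrite it to dst_path with fewer files.

    Hypothetical helper for illustration only; `spark` is an existing SparkSession.
    """
    if src_path.rstrip("/") == dst_path.rstrip("/"):
        # Overwriting a path while it is still being read from is not safe.
        raise ValueError("src_path and dst_path must differ")
    df = spark.read.parquet(src_path)
    if coalesce:
        # Narrow transformation: merges existing partitions without a shuffle.
        df = df.coalesce(num_partitions)
    else:
        # Full shuffle: more expensive, but evens out partition sizes.
        df = df.repartition(num_partitions)
    df.write.mode("overwrite").parquet(dst_path)


# Intended use (computed_df, raw_path and final_path are placeholder names):
#   computed_df.write.mode("overwrite").parquet(raw_path)   # 30k-partition job
#   rewrite_compacted(spark, raw_path, final_path, 300)     # cheap compaction job
```

Because the second job only reads Parquet files that already exist, the expensive computation runs once at full parallelism, which is consistent with the roughly equal 5-hour timings reported above.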
I tried various ways, but none of them worked except manually writing the DataFrame to disk, reading it back, repartitioning/coalescing it, and then writing it back to HDFS. (The snippets below assume col is imported from pyspark.sql.functions and StorageLevel from pyspark.)

1. checkpoint by itself computes the DataFrame twice. (This is a known bug in checkpoint.)

    output_mod = f"{output}/job={mod}"
    spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .checkpoint() \
        .coalesce(300) \
        .write.mode("overwrite").parquet(output_mod)

2. persist (to disk) + count computes the DataFrame twice.

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.DISK_ONLY)
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)

3. persist (to memory) + count computes the DataFrame twice.

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.MEMORY_ONLY)
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)

4. persist (to memory) + checkpoint + coalesce computes the DataFrame twice.

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.MEMORY_ONLY) \
        .checkpoint() \
        .coalesce(300).write.mode("overwrite").parquet(output_mod)

5. persist (to memory) + checkpoint without coalesce computes the DataFrame twice.

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.MEMORY_ONLY) \
        .checkpoint() \
        .write.mode("overwrite").parquet(output_mod)

6. cache (equivalent to persist to MEMORY_AND_DISK) + count + coalesce computes the DataFrame twice.

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .cache()
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)

7. Manual output computes the DataFrame only once. The function repart_hdfs below is a function I wrote to write a DataFrame to disk, read it back, repartition/coalesce it, and then write it back to HDFS.

    spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .write.mode("overwrite").parquet(output_mod)
    repart_hdfs(spark, output_mod, 300, coalesce=True)

Best,

----

Ben Du
Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | Bitbucket<https://bitbucket.org/dclong/> | Docker Hub<https://hub.docker.com/r/dclong/>

________________________________
From: Sebastian Piu <sebastian....@gmail.com>
Sent: Sunday, January 30, 2022 12:44 AM
To: Benjamin Du <legendu....@outlook.com>
Cc: u...@spark.incubator.apache.org <u...@spark.incubator.apache.org>
Subject: Re: A Persisted Spark DataFrame is computed twice

It's probably the repartitioning and deserialising the df that you are seeing take time. Try doing this

1. Add another count after your current one and compare times
2. Move coalesce before persist

You should see

On Sun, 30 Jan 2022, 08:37 Benjamin Du, <legendu....@outlook.com<mailto:legendu....@outlook.com>> wrote:

I have some PySpark code like below. Basically, I persist a DataFrame (which is time-consuming to compute) to disk, call the method DataFrame.count to trigger the caching/persist immediately, and then coalesce the DataFrame to reduce the number of partitions (the original DataFrame has 30,000 partitions) and output it to HDFS. Based on the execution time of the job stages and the execution plan, it seems to me that the DataFrame is recomputed at df.coalesce(300). Does anyone know why this happens?

    df = spark.read.parquet("/input/hdfs/path") \
        .filter(...) \
        .withColumn("new_col", my_pandas_udf("col0", "col1")) \
        .persist(StorageLevel.DISK_ONLY)
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)

BTW, it works well if I manually write the DataFrame to HDFS, read it back, coalesce it and write it back to HDFS.

Originally posted at https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.

Best,

----

Ben Du
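
The question above infers the recomputation partly from the execution plan. As a minimal sketch of one way to check this, the helper below captures the text printed by DataFrame.explain() and looks for the InMemoryTableScan / InMemoryRelation operators that Spark shows when a plan reads from a persisted DataFrame. plan_mentions_cache is a hypothetical name, and the sketch assumes explain() prints through Python's standard output, as it does in PySpark. A True result only means the plan refers to the cache; it does not prove that every cached partition was materialized or is still available.

```python
import contextlib
import io


def plan_mentions_cache(df):
    """Return True if the plan printed by df.explain() reads from a cached relation.

    Hypothetical helper for illustration; `df` is a PySpark DataFrame (or any
    object whose explain() method prints a plan to standard output).
    """
    buffer = io.StringIO()
    # explain() prints the plan instead of returning it, so capture stdout.
    with contextlib.redirect_stdout(buffer):
        df.explain()
    plan_text = buffer.getvalue()
    # Operators Spark uses in plans that scan a persisted DataFrame.
    return "InMemoryTableScan" in plan_text or "InMemoryRelation" in plan_text


# Intended use, after df.persist(...) and df.count() as in the question:
#   plan_mentions_cache(df.coalesce(300))
```

If this returns False for df.coalesce(300), the plan itself bypasses the cache; if it returns True, the stage timings in the Spark UI are the better place to look for what is being recomputed.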