I don't think coalesce (by repartitioning I assume you mean coalesce) and 
deserialising the DataFrame take that much time by themselves. To add a bit 
more context, the computation of the DataFrame is CPU intensive rather than 
data/IO intensive. I purposely keep coalesce after df.count because I want to 
keep the large number of partitions (30k) while computing the DataFrame, so 
that I get much higher parallelism. Only after the computation do I reduce the 
number of partitions (to avoid having too many small files on HDFS). It 
typically takes about 5 hours to compute the DataFrame (when 30k partitions 
are used) and write it to disk (without any repartitioning or coalescing). If 
I manually write the computed DataFrame to disk, read it back, coalesce it, 
and then write it back to disk, it also takes about 5 hours. The code that I 
pasted in this thread takes forever to run, because the DataFrame is obviously 
recomputed at df.coalesce, and with a parallelism of only 300 partitions it is 
almost impossible to compute the DataFrame in a reasonable amount of time.
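
As a toy illustration of the parallelism point (made-up sizes and partition 
counts, not my real job): coalesce is a narrow transformation, so when it sits 
directly on top of the expensive DataFrame, the upstream computation itself is 
folded into a 300-task stage.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Cheap stand-in for the expensive DataFrame (toy partition count).
    df = spark.range(0, 1_000_000, numPartitions=3000)

    print(df.rdd.getNumPartitions())                # 3000 tasks in the compute stage
    print(df.coalesce(300).rdd.getNumPartitions())  # 300; coalesce does not shuffle,
                                                    # so the parent computation is
                                                    # folded into this 300-task stage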

I tried various ways, but none of them worked except manually writing the 
DataFrame to disk, reading it back, repartitioning/coalescing it, and then 
writing it back to HDFS.

  1.  checkpoint by itself computes the DataFrame twice (this is a known bug 
of checkpoint).

    output_mod = f"{output}/job={mod}"
    spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .checkpoint() \
        .coalesce(300) \
        .write.mode("overwrite").parquet(output_mod)


  2.  persist (to disk) + count computes the DataFrame twice.

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.DISK_ONLY)
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)


  3.  persist (to memory) + count computes the DataFrame twice.

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.MEMORY_ONLY)
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)


  4.  persist (to memory) + checkpoint + coalesce computes the DataFrame twice.

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.MEMORY_ONLY) \
        .checkpoint() \
        .coalesce(300).write.mode("overwrite").parquet(output_mod)


  5.  persist (to memory) + checkpoint, without coalesce, computes the 
DataFrame twice.

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .persist(StorageLevel.MEMORY_ONLY) \
        .checkpoint() \
        .write.mode("overwrite").parquet(output_mod)


  6.  cache (equivalent to persist to MEMORY_AND_DISK) + count + coalesce 
computes it twice (a quick plan check is sketched after this list).

    output_mod = f"{output}/job={mod}"
    df = spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .cache()
    df.count()
    df.coalesce(300).write.mode("overwrite").parquet(output_mod)
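
For what it's worth, one way to sanity-check whether a write is at least 
planned against the persisted copy (df here is the persisted DataFrame from 
any of the variants above; this only inspects the plan, it is not a fix):

    df.coalesce(300).explain()
    # If the physical plan contains InMemoryRelation / InMemoryTableScan, Spark
    # plans to read the persisted data rather than recompute the full lineage;
    # otherwise the filters and the UDF will run again during the write.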



Manually writing the output computes it only once. The function repart_hdfs 
below is a function I wrote to write a DataFrame to disk, read it back, 
repartition/coalesce it, and then write it back to HDFS (a rough sketch of it 
is included after the snippet).

    output_mod = f"{output}/job={mod}"
    spark.read.parquet("/input/hdfs/path") \
        .filter(col("n0") == n0) \
        .filter(col("n1") == n1) \
        .filter(col("h1") == h1) \
        .filter(col("j1").isin(j1)) \
        .filter(col("j0") == j0) \
        .filter(col("h0").isin(h0)) \
        .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
        .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
        .write.mode("overwrite").parquet(output_mod)
    repart_hdfs(spark, output_mod, 300, coalesce=True)
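
For context, this is roughly what repart_hdfs does. The sketch below is 
simplified (the temp-path naming and the FileSystem juggling are illustrative; 
the real function has more error handling):

    def repart_hdfs(spark, path, num_partitions, coalesce=True):
        """Read a parquet dataset from path, reduce its number of partitions,
        and write it back to the same path via a temporary directory."""
        tmp = path.rstrip("/") + "__repart_tmp"
        df = spark.read.parquet(path)
        df = df.coalesce(num_partitions) if coalesce else df.repartition(num_partitions)
        df.write.mode("overwrite").parquet(tmp)
        # Swap the temporary output into place via the Hadoop FileSystem API.
        jvm = spark.sparkContext._jvm
        fs = jvm.org.apache.hadoop.fs.FileSystem.get(
            spark.sparkContext._jsc.hadoopConfiguration()
        )
        fs.delete(jvm.org.apache.hadoop.fs.Path(path), True)
        fs.rename(jvm.org.apache.hadoop.fs.Path(tmp), jvm.org.apache.hadoop.fs.Path(path))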

Best,

----

Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>

________________________________
From: Sebastian Piu <sebastian....@gmail.com>
Sent: Sunday, January 30, 2022 12:44 AM
To: Benjamin Du <legendu....@outlook.com>
Cc: u...@spark.incubator.apache.org <u...@spark.incubator.apache.org>
Subject: Re: A Persisted Spark DataFrame is computed twice

It's probably the repartitioning and deserialising the df that you are seeing 
take time. Try doing this

1. Add another count after your current one and compare times
2. Move coalesce before persist



You should see

On Sun, 30 Jan 2022, 08:37 Benjamin Du, 
<legendu....@outlook.com<mailto:legendu....@outlook.com>> wrote:
I have some PySpark code like below. Basically, I persist a DataFrame (which is 
time-consuming to compute) to disk, call the method DataFrame.count to trigger 
the caching/persist immediately, and then I coalesce the DataFrame to reduce 
the number of partitions (the original DataFrame has 30,000 partitions) and 
output it to HDFS. Based on the execution time of job stages and the execution 
plan, it seems to me that the DataFrame is recomputed at df.coalesce(300). Does 
anyone know why this happens?


df = spark.read.parquet("/input/hdfs/path") \
    .filter(...) \
    .withColumn("new_col", my_pandas_udf("col0", "col1")) \
    .persist(StorageLevel.DISK_ONLY)
df.count()
df.coalesce(300).write.mode("overwrite").parquet(output_mod)


BTW, it works well if I manually write the DataFrame to HDFS, read it back, 
coalesce it and write it back to HDFS.

Originally post at 
https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice.<https://stackoverflow.com/questions/70781494/a-persisted-spark-dataframe-is-computed-twice>

Best,

----

Ben Du

Personal Blog<http://www.legendu.net/> | GitHub<https://github.com/dclong/> | 
Bitbucket<https://bitbucket.org/dclong/> | Docker 
Hub<https://hub.docker.com/r/dclong/>
