ok thanks. i guess i simply misremembered seeing the shuffle files
getting re-used across jobs (actions). it was probably across stages
within the same job.

in structured streaming this is a pretty big deal. if you join a streaming
dataframe with a large static dataframe, each microbatch becomes its own job
(action), so the large static dataframe gets reshuffled for every
microbatch. observing this performance issue is actually why i ran the
little experiment in this post.
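
one thing that might be worth trying, as a rough sketch and not something i have verified on a streaming job: repartition the static dataframe by the join key once and persist it, so that later jobs read the already-partitioned cached data instead of scanning and shuffling the source again. the tiny in-memory dataframes, the local master and the names data0ByKey and joined below are made up for illustration, and the sketch assumes the join key is called "key" as in the original test.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// local session for the sketch; inside spark-shell getOrCreate() returns the existing session
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("shuffle-once-sketch")
  .config("spark.sql.autoBroadcastJoinThreshold", "-1") // disable broadcast joins, as in the test
  .getOrCreate()
import spark.implicits._

// stand-in for the large static dataframe (hypothetical data)
val data0 = Seq(("a", 1), ("b", 2)).toDF("key", "v0")

// shuffle by the join key once, then keep the partitioned result around
val data0ByKey = data0.repartition(col("key")).persist()
data0ByKey.count() // an action, so the cache is actually materialized

// stand-in for the other side of the join (hypothetical data)
val data1 = Seq(("a", 10), ("b", 20), ("c", 30)).toDF("key", "v1")

val joined = data1.join(data0ByKey, "key")
joined.explain() // check whether the cached side still has an Exchange above it
```

the thing to look for in the explain output (or in the sql tab of the ui) is whether the static side shows up as an in-memory scan without its own shuffle. whether this carries over to every microbatch of a stream-static join is an assumption that would need to be checked, and the cached data of course has to fit in memory/disk on the executors.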


On Sat, Jul 16, 2022 at 12:33 PM Shay Elbaz <shay.el...@gm.com> wrote:

> Spark can reuse shuffle stages *in the same job* (action), not across jobs.
> ------------------------------
> *From:* Koert Kuipers <ko...@tresata.com>
> *Sent:* Saturday, July 16, 2022 6:43 PM
> *To:* user <user@spark.apache.org>
> *Subject:* [EXTERNAL] spark re-use shuffle files not happening
>
> i have seen many jobs where spark re-uses shuffle files (and skips a stage
> of a job), which is an awesome feature given how expensive shuffles are,
> and i generally now assume this will happen.
>
> however i feel like i am going a little crazy today. i did the simplest
> test in spark 3.3.0: i run 2 jobs within the same spark shell, so
> using the same spark session, with broadcast join disabled so we get
> shuffles:
> 1) job1 joins dataframe1 with dataframe0 and writes results out.
> 2) job2 joins dataframe2 with dataframe0 and writes results out.
>
> i would expect job2 to skip the stage where dataframe0 is getting
> shuffled, but it's not skipping it! what am i missing?
> is shuffle re-use only enabled within the same job/action? that goes against
> what i remember...
>
> code:
> $ spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1
> scala> val data0 = spark.read.format("csv").option("header", true).load("data0.csv")
> scala> val data1 = spark.read.format("csv").option("header", true).load("data1.csv")
> scala> val data2 = spark.read.format("csv").option("header", true).load("data2.csv")
> scala> data1.join(data0, "key").write.format("parquet").save("out1")
> scala> data2.join(data0, "key").write.format("parquet").save("out2")
> // should skip stage that scans csv for data0 and writes shuffle files... but it doesn't
>
>
>
> CONFIDENTIALITY NOTICE: This electronic communication and any files
> transmitted with it are confidential, privileged and intended solely for
> the use of the individual or entity to whom they are addressed. If you are
> not the intended recipient, you are hereby notified that any disclosure,
> copying, distribution (electronic or otherwise) or forwarding of, or the
> taking of any action in reliance on the contents of this transmission is
> strictly prohibited. Please notify the sender immediately by e-mail if you
> have received this email by mistake and delete this email from your system.
>
> Is it necessary to print this email? If you care about the environment
> like we do, please refrain from printing emails. It helps to keep the
> environment forested and litter-free.
>

