Thanks for the suggestions, Mich, Jörn, and Adam.
The rationale for a long-lived app with a loop, versus submitting multiple YARN
applications, is mainly simplicity. I plan to run the app on a multi-tenant EMR
cluster alongside other YARN apps. Implementing the loop outside the Spark app
would work, but it introduces more complexity compared to a single long-lived
Spark app with dynamic allocation + min executors. Specifically, I would need to:
* Introduce a component that submits an EMR step to run `spark-submit`
* Define a YARN queue for my app such that resources are reserved and other
tenants cannot prevent my app from entering the RUNNING state
* Determine whether the previous YARN app is FINISHED (or just submit a
bunch of apps up front and rely on the YARN SUBMITTED/ACCEPTED states) – see
the sketch after this list
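For that last bullet, a rough sketch of the state check (untested and purely
illustrative – it uses the Hadoop YarnClient API and assumes the submitting
component can reach the YARN ResourceManager):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
    import org.apache.hadoop.yarn.client.api.YarnClient

    // Poll the ResourceManager until a previously submitted app reaches a
    // terminal state. The polling interval is arbitrary.
    def waitUntilTerminal(appId: ApplicationId): Unit = {
      val yarn = YarnClient.createYarnClient()
      yarn.init(new Configuration())
      yarn.start()
      try {
        def state = yarn.getApplicationReport(appId).getYarnApplicationState
        while (state != YarnApplicationState.FINISHED &&
               state != YarnApplicationState.FAILED &&
               state != YarnApplicationState.KILLED) {
          Thread.sleep(30000)
        }
      } finally yarn.stop()
    }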
So I was really hoping to be able to recreate the SparkContext, or at least
find some way to trigger a cleanup of the DiskBlockManager between loop
iterations. If there is no way to do this, I will test the performance of
cloud-based shuffle. This might be better for cost savings too (S3 vs. EBS) and
would allow me to use smaller instances (I was using beefy instances and beefy
executors to improve shuffle locality).
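For concreteness, the shape of the loop I had in mind – a minimal sketch,
assuming a fresh session can actually be created after stop() inside the same
YARN app (which is exactly the open question); runJob and numJobs are
placeholders for the real per-iteration work:

    import org.apache.spark.sql.SparkSession

    for (job <- 1 to numJobs) { // numJobs: placeholder for the real work queue
      val spark = SparkSession.builder()
        .appName(s"loop-iteration-$job")
        .getOrCreate()
      try {
        runJob(spark, job) // hypothetical per-iteration work
      } finally {
        // stop() stops the SparkEnv, which is the only path that makes the
        // DiskBlockManager clear spark.local.dir [1][2]
        spark.stop()
      }
    }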
To the other points:
1. Dynamic allocation is enabled, but I suspect it is not the issue here.
Enabling `spark.shuffle.service.removeShuffle` didn’t seem to help much –
likely because executors are not being decommissioned often, due to the nature
of the tight loop and the fact that the executor idle timeout was already
raised from the 60s default to 300s. (See the conf sketch after this list.)
2. Cloud shuffle + an S3 lifecycle policy, or brute force/cron removing files,
will certainly work, but I am looking for something more “elegant”.
3. Shuffle data should be deleted after it’s no longer needed – from my
understanding of the Spark codebase, the only time the DiskBlockManager cleans
the `spark.local.dir` directory [1] is when stop() is called, which only
happens when the SparkEnv is stopped [2].
4. I suspect spilled data is not what’s filling up the disk, since the app
barely spills to disk [3]. Also supporting this hypothesis: raising
`spark.shuffle.sort.bypassMergeThreshold` above the max number of reducer
partitions significantly slowed the rate of disk usage growth.
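For reference on points 1 and 4, the relevant settings as I understand them –
a sketch only, with illustrative values rather than my exact conf:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.dynamicAllocation.minExecutors", "8")           // illustrative value
      .config("spark.dynamicAllocation.executorIdleTimeout", "300s") // raised from the 60s default
      .config("spark.shuffle.service.enabled", "true")               // external shuffle service on YARN
      .config("spark.shuffle.service.removeShuffle", "true")         // didn't help much here
      .config("spark.shuffle.sort.bypassMergeThreshold", "10000")    // above my max reducer partitions
      .getOrCreate()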
Daniel
[1]
https://github.com/apache/spark/blob/8f5a647b0bbb6e83ee484091d3422b24baea5a80/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala#L369
[2]
https://github.com/apache/spark/blob/c4e4497ff7e747eb71d087cdfb1b51673c53b83b/core/src/main/scala/org/apache/spark/SparkEnv.scala#L112
[3] I was able to eliminate most of the skew during repartitionByRange by
dynamically salting keys using the results of df.stat.countMinSketch.
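Roughly, the salting looked like the following – a simplified sketch assuming
string keys and a distinct key set small enough to collect to the driver;
thresholds, column names, and the salt range are illustrative:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._

    def saltHotKeys(df: DataFrame, keyCol: String): DataFrame = {
      // Estimate per-key counts without a full groupBy.
      val sketch = df.stat.countMinSketch(keyCol, 0.001, 0.99, 42)
      // Keep the heavy hitters (threshold illustrative).
      val hotKeys: Set[String] = df.select(keyCol).distinct()
        .collect().map(_.getString(0))
        .filter(k => sketch.estimateCount(k) > 1000000L)
        .toSet
      val hotKeysB = df.sparkSession.sparkContext.broadcast(hotKeys)
      val isHot = udf((k: String) => hotKeysB.value.contains(k))
      // Append a random salt only to hot keys so repartitionByRange
      // spreads them over multiple partitions.
      df.withColumn("saltedKey",
        when(isHot(col(keyCol)),
             concat(col(keyCol), lit("_"), (rand() * 16).cast("int")))
          .otherwise(col(keyCol)))
    }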
From: Mich Talebzadeh
Date: Sunday, February 18, 2024 at 1:38 AM
Cc: "user@spark.apache.org"
Subject: RE: [EXTERNAL] Re-create SparkContext of SparkSession inside
long-lived Spark app
Hi,
What do you propose, or what do you think will help, given that these Spark
jobs are independent of each other --> so once a job/iteration is complete,
there is no need to retain the shuffle files? You have a number of options to
consider, starting with the Spark configuration parameters for shuffle behavior:
https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior
Aside: have you turned on dynamic resource allocation and the relevant
parameters? Can you up executor memory -> spark.storage.memoryFraction and
spark.shuffle.spillThreshold as well? You can of course use brute force with
shutil.rmtree(path) to remove these files.
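(For a Scala rather than PySpark job, a rough equivalent of the shutil.rmtree
suggestion might look like the sketch below. Note it only cleans the node it
runs on – on YARN the shuffle files live under each executor's
spark.local.dir, so it would need to run on every node, e.g. via cron.)

    import java.nio.file.{Files, Path, Paths}
    import java.util.Comparator

    // Recursively delete a local directory, children before parents.
    def rmTree(dir: String): Unit = {
      val root: Path = Paths.get(dir)
      if (Files.exists(root)) {
        Files.walk(root)
          .sorted(Comparator.reverseOrder[Path]())
          .forEach(p => Files.delete(p))
      }
    }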
HTH
Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom
View my LinkedIn profile: https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/
https://en.everybodywiki.com/Mich_Talebzadeh
Disclaimer: The information provided is correct to the best of my knowledge but
of course cannot be guaranteed. It is essential to note that, as with any
advice, one verified and tested result holds more weight than a thousand expert
opinions.
On Sat, 17 Feb 2024 at 23:40, Saha, Daniel wrote:
Hi,
Background: I am running into executor disk space issues when running a
long-lived Spark 3.3 app with YARN on AWS EMR. The app performs back-to-back
Spark jobs in a sequential loop, with each iteration performing 100 GB+
shuffles. The files taking up the space are related to shuffle blocks [1].
Disk is only cleared when restarting the YARN app. For all intents and
purposes, each job is independent, so once a job/iteration is complete, there
is no need to retain these shuffle files. I want to try stopping and recreating
the Spark context between loop iterations/jobs to indicate to the Spark
DiskBlockManager that these intermediate results are no longer needed.