The only relevant setting I see in Yarn is this:
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>120726</value>
</property>
which is about 120 GB, and we are well below that. I don't see a total limit.
I haven't experimented with spark.memory.fraction, and I'm not sure it would make
a difference. Note that Spark reports no errors related to memory. Yarn kills
the JVM and prints a single line, "Out of memory", to the container's stdout.
Only after that does Spark complain about the ExecutorLostFailure. So the memory
fractions are not a factor here.
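A quick way to see how executors pack onto one node is to divide `yarn.nodemanager.resource.memory-mb` (120726 MB above) by the size of one executor container. The Scala sketch below assumes a container of 48 GB heap + 32 GB overhead, the request mentioned further down this thread, and ignores any rounding YARN applies to requests; the object and method names are illustrative only.

```scala
object NodePacking {
  // Memory YARN may allocate on one NodeManager (yarn.nodemanager.resource.memory-mb)
  val nodeMb: Long = 120726L

  // Assumed executor container: 48 GB heap + 32 GB overhead, expressed in MB
  val containerMb: Long = 48L * 1024 + 32L * 1024

  // Number of whole containers of the given size that fit on one node
  def containersPerNode(node: Long, container: Long): Long = node / container

  // Node memory left unallocated after packing those containers
  def leftoverMb(node: Long, container: Long): Long =
    node - containersPerNode(node, container) * container
}
```

With these numbers one executor fits per node and 38806 MB (about 38 GB) of that node's YARN memory stays unused; the same division shows how many smaller executors would fit instead.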
I just looked at the link you included, thank you. Yes, this is the same problem;
however, it looks like no one has come up with a solution for it yet.
________________________________
From: yohann jardin <[email protected]>
Sent: Friday, July 28, 2017 10:47:40 AM
To: jeff saremi; [email protected]
Subject: Re: How to configure spark on Yarn cluster
I'm not sure we agree on one thing: Yarn limits apply to the sum over all
nodes, while through Spark you only specify the memory for a single node.
By the way, the memory displayed in the UI is only a part of the total memory
allocation:
https://spark.apache.org/docs/latest/configuration.html#memory-management
It corresponds to “spark.memory.fraction”, so it will mainly be filled by the
RDD you’re trying to persist. The memory outside this fraction is used to read
the input file and compute. When a failure comes from there, the Out Of Memory
exception is quite explicit in the driver logs.
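For a rough idea of what the UI's storage figure covers, the sketch below follows the Spark 2.x model described in the linked docs: the unified execution-plus-storage region is (heap - 300 MB reserved) × `spark.memory.fraction` (default 0.6), and `spark.memory.storageFraction` (default 0.5) of that region is protected from eviction for cached blocks. The 48 GB heap is borrowed from elsewhere in this thread as an example, the names are illustrative, and the result is approximate because the JVM usually reports a maximum heap slightly below -Xmx.

```scala
object UnifiedMemory {
  // Spark 2.x sets aside a fixed 300 MB of heap before applying spark.memory.fraction
  val ReservedMb: Double = 300.0

  // Unified (execution + storage) region in MB for a given heap size
  def unifiedMb(heapMb: Double, memoryFraction: Double = 0.6): Double =
    (heapMb - ReservedMb) * memoryFraction

  // Portion of the unified region where cached blocks are protected from eviction
  def storageMb(heapMb: Double,
                memoryFraction: Double = 0.6,
                storageFraction: Double = 0.5): Double =
    unifiedMb(heapMb, memoryFraction) * storageFraction
}
```

For a 48 GB heap (49152 MB) this comes to about 29311 MB, roughly 28.6 GB, of unified memory; the remaining 40% of the usable heap lies outside that region.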
Testing on samples of 1 GB, 1 TB, and 10 TB should help, at least a bit, to
investigate what goes right and what goes wrong.
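The sample sizes suggested above correspond to fractions of the full 100 TB input. The plain-Scala sketch below computes them, assuming decimal units (1 TB = 1000 GB); the object name is illustrative. Such a fraction could be passed to Spark's `RDD.sample(withReplacement, fraction, seed)`, though that still reads the whole input, so pointing `sc.textFile` at a subset of the input files is usually the cheaper way to build a sample.

```scala
object SampleFractions {
  // Full input size from the thread, in GB (decimal units assumed)
  val totalGb: Double = 100.0 * 1000

  // Sample sizes suggested in the thread: 1 GB, 1 TB, 10 TB
  val targetsGb: Seq[Double] = Seq(1.0, 1000.0, 10000.0)

  // Fraction of the full input that a given sample size represents
  def fraction(targetGb: Double, total: Double = totalGb): Double = targetGb / total

  val fractions: Seq[Double] = targetsGb.map(t => fraction(t))
}
```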
Also, did you check for similar issues on Stack Overflow? Like
https://stackoverflow.com/questions/40781354/container-killed-by-yarn-for-exceeding-memory-limits-10-4-gb-of-10-4-gb-physic
Regards,
Yohann Jardin
On 7/28/2017 at 6:05 PM, jeff saremi wrote:
Thanks so much, Yohann.
I checked the Storage Memory column on the Executors status page. It is well
below where I wanted it to be.
I will try the suggestion on smaller data sets.
I am also well within the Yarn limitations (128 GB). In my last try I asked for
48 GB + 32 GB (overhead). So somehow I am exceeding that, or rather Spark is,
since I am trusting it to manage the memory I provided for it.
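On Yarn, Spark requests each executor container as executor memory plus `spark.yarn.executor.memoryOverhead`; when the overhead is not set, Spark 2.x defaults it to max(384 MB, 10% of executor memory). The sketch below reproduces that sum for the 48 + 32 request, assuming both figures are GB; Yarn may still round the request up to its allocation increment, and the names are illustrative.

```scala
object ContainerRequest {
  // Spark 2.x defaults used when spark.yarn.executor.memoryOverhead is unset
  val OverheadFactor: Double = 0.10
  val OverheadMinMb: Int = 384

  // Default overhead in MB: 10% of executor memory, but never below 384 MB
  def defaultOverheadMb(executorMb: Int): Int =
    math.max((OverheadFactor * executorMb).toInt, OverheadMinMb)

  // Total memory requested from Yarn for one executor container, in MB
  def containerMb(executorMb: Int, overheadMb: Option[Int] = None): Int =
    executorMb + overheadMb.getOrElse(defaultOverheadMb(executorMb))
}
```

With the explicit 32 GB overhead the container is 81920 MB; with the default overhead a 48 GB executor would be 54067 MB. Because Yarn enforces the limit on the container as a whole, the overhead is the budget for everything in the process outside the Java heap.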
Is there anything in Shuffle Write Size, Shuffle Spill, or the logs that I
should look for to arrive at a recommended memory size or partition count?
Thanks
________________________________
From: yohann jardin <[email protected]>
Sent: Thursday, July 27, 2017 11:15:39 PM
To: jeff saremi; [email protected]
Subject: Re: How to configure spark on Yarn cluster
Check the Executors page of the Spark UI to see whether your storage level is
the limiting factor.
Also, instead of starting with 100 TB of data, sample it, make it work, and
grow it little by little until you reach 100 TB. This will validate the
workflow and let you see how much data is shuffled, etc.
And just in case, check the limits set on your Yarn queue. If you try to
allocate more memory to your job than the queue allows, it may fail in some
cases.
Though there are some limitations, it’s possible to allocate more RAM to your
job than is available on your Yarn queue.
Yohann Jardin
On 7/28/2017 at 8:03 AM, jeff saremi wrote:
I have the simplest job, which I'm running against 100 TB of data. The job keeps
failing with ExecutorLostFailures on containers killed by Yarn for exceeding
memory limits.
I have varied executor-memory from 32 GB to 96 GB and
spark.yarn.executor.memoryOverhead from 8192 to 36000 (MB), with similar changes
to the number of cores and the driver size. Nothing seems to stop this error
(running out of memory) from happening. Looking at the metrics reported on the
Spark status page, is there anything I can use to configure my job properly?
Would repartitioning into more or fewer partitions help at all? The current
number of partitions is around 40,000.
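To judge whether repartitioning could help, it is useful to know how much input each task reads. The arithmetic below assumes the 100 TB is spread evenly over the ~40,000 partitions (decimal units), which works out to about 2.5 GB of raw text per task, and shows how many partitions a smaller, hypothetical 250 MB target would need. The object and method names are illustrative.

```scala
object PartitionSizing {
  // 100 TB of input in bytes, decimal units assumed
  val totalBytes: Long = 100L * 1000 * 1000 * 1000 * 1000
  val currentPartitions: Long = 40000L

  // Average input bytes per partition, assuming an even split
  def bytesPerPartition(total: Long, partitions: Long): Long = total / partitions

  // Partitions needed so the average partition is at most targetBytes (ceiling division)
  def partitionsFor(total: Long, targetBytes: Long): Long =
    (total + targetBytes - 1) / targetBytes
}
```

In Spark the count could be raised by passing `minPartitions` to `sc.textFile(path, minPartitions)` as a hint, or with `rdd.repartition(n)`, which adds a shuffle; whether smaller partitions relieve this particular memory problem would still need to be tested.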
Here's the gist of the code:
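import org.apache.spark.storage.StorageLevel
val input = sc.textFile(path)
// key each line by its first two tab-separated fields
val t0 = input.map { s => val a = s.split("\t"); ((a(0), a(1)), a) }
t0.persist(StorageLevel.DISK_ONLY)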
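The per-line key extraction can be checked in plain Scala without a cluster. The sketch below (hypothetical names `keyLine` and `keyEachField`) contrasts splitting a line once and keying it by its first two tab-separated fields with mapping over the split fields, as in `s.split("\t").map(a => ((a(0), a(1)), a))`, where each `a` is a single field and `a(0)`, `a(1)` are its first two characters.

```scala
object KeyByFields {
  // Split once and key the whole line by its first two tab-separated fields.
  // Throws ArrayIndexOutOfBoundsException if the line has fewer than two fields.
  def keyLine(line: String): ((String, String), Array[String]) = {
    val fields = line.split("\t")
    ((fields(0), fields(1)), fields)
  }

  // Inner-map form: every field is keyed by its own first two characters
  def keyEachField(line: String): Array[((Char, Char), String)] =
    line.split("\t").map(a => ((a(0), a(1)), a))
}
```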
I have changed the storage level from MEMORY_ONLY to MEMORY_AND_DISK to
DISK_ONLY, to no avail.