There are several possibilities here.
1) Keep in mind that 7GB of data will need much more than 7GB of heap, because
the deserialized Java objects take far more space than the raw data itself. A
rough rule of thumb is to multiply by 6 to 8, so 7GB of data needs about 50GB
of heap.
2) Watch the Spark UI to see how many records each task processes, and whether
the failed tasks have more data than the rest. Even though some of your tasks
are failing, others succeed. Compare them: do the failed tasks process far more
records than the succeeded ones? If so, you have a data skew problem.
3) If the failed tasks were assigned about as many records as the succeeded
ones, then just add more partitions so that each task processes less data. You
should always monitor the GC output in these cases.
4) If most of your tasks fail due to memory, then your settings are too small
for your data: add partitions or memory.


Date: Fri, 28 Aug 2015 13:55:52 -0700
Subject: Re: How to avoid shuffle errors for a large join ?

Yeah, I tried with 10k and 30k and these still failed; I'll try with more then.
That is a little disappointing, though: it only writes ~7TB of shuffle data,
which in theory shouldn't require more than 1000 reducers on my 10TB-memory
cluster (~7GB of spill per reducer).
I'm now wondering whether my shuffle partitions are uneven and I should use a
custom partitioner. Is there a way to get stats on the partition sizes from Spark?
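
One way to see why more partitions may not help with uneven data is to simulate hash partitioning. The sketch below is a simplified stand-in, not Spark's exact scheme: it assigns each integer key to hash(key) mod n, in the spirit of the RDD HashPartitioner (Spark SQL's actual hash function may differ by version), and runs on made-up data with one hot key. On a real DataFrame, per-partition record counts can be gathered with something like df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect() in PySpark.

```python
from collections import Counter

def partition_sizes(keys, num_partitions):
    """Records per partition under a simple hash-mod scheme (simplified
    assumption: partition = hash(key) mod num_partitions)."""
    counts = Counter(hash(k) % num_partitions for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]

def skew_ratio(sizes):
    """Largest partition divided by the mean size (1.0 means perfectly even)."""
    mean = sum(sizes) / len(sizes)
    return max(sizes) / mean

# Hypothetical data: 1,000,000 distinct keys plus one "hot" key seen 200,000 times.
keys = list(range(1_000_000)) + [42] * 200_000

for n in (1_000, 10_000, 30_000):
    sizes = partition_sizes(keys, n)
    print(f"{n:>6} partitions: largest={max(sizes):,} skew={skew_ratio(sizes):,.0f}x")
```

The largest partition stays at roughly 200k records whether n is 1k, 10k or 30k, because every copy of the hot key lands in the same partition; only the evenly spread keys get thinner. That is the situation in which a custom partitioner or special handling of the hot keys helps more than raising spark.sql.shuffle.partitions.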
On Fri, Aug 28, 2015 at 12:46 PM, Jason <> wrote:
I had similar problems (reduce-side failures for large joins of 25bn rows with
9bn), and found the answer was to raise spark.sql.shuffle.partitions further
above 1000. In my case 16k partitions worked, but your tables look a little
denser, so you may want to go even higher.
On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak <> wrote:
I'm getting "Removing executor with no recent heartbeats" and "Missing an output
location for shuffle" errors for a large Spark SQL join (1bn rows/2.5TB joined
with 1bn rows/30GB), and I'm not sure how to configure the job to avoid them.

The initial stage completes fine with some 30k tasks on a cluster with 70 
machines/10TB memory, generating about 6.5TB of shuffle writes, but then the 
shuffle stage first waits 30min in the scheduling phase according to the UI, 
and then dies with the mentioned errors.

I can see in the GC logs that the executors reach their memory limits (32g per 
executor, 2 workers per machine) and can't allocate any more stuff in the heap. 
Fwiw, the top 10 in the memory use histogram are:

 num     #instances         #bytes  class name
----------------------------------------------
   1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
   2:     251085327     8034730464  scala.Tuple2
   3:     243694737     5848673688  java.lang.Float
   4:     231198778     5548770672  java.lang.Integer
   5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
   6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
   7:      74114058     1778737392  java.lang.Long
   8:       6059103      779203840  [Ljava.lang.Object;
   9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
  10:         34749       70122104  [B
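
Dividing #bytes by #instances in the histogram gives the average cost of one object, which illustrates the 6-8x expansion rule of thumb from the reply at the top of this thread: each boxed java.lang.Float or java.lang.Integer takes 24 bytes to hold a 4-byte value. A small sketch of that arithmetic, using only the histogram numbers (the array rows are skipped because array sizes vary with their length):

```python
# (class name, #instances, #bytes) for the fixed-size rows of the histogram above.
HISTOGRAM = [
    ("scala.collection.immutable.HashMap$HashMap1", 249_139_595, 11_958_700_560),
    ("scala.Tuple2", 251_085_327, 8_034_730_464),
    ("java.lang.Float", 243_694_737, 5_848_673_688),
    ("java.lang.Integer", 231_198_778, 5_548_770_672),
    ("scala.collection.immutable.HashMap$HashTrieMap", 72_191_582, 2_310_130_624),
    ("java.lang.Long", 74_114_058, 1_778_737_392),
]

# Size in bytes of the primitive value that each wrapper class boxes.
PAYLOAD_BYTES = {"java.lang.Float": 4, "java.lang.Integer": 4, "java.lang.Long": 8}

def bytes_per_instance(instances, total_bytes):
    """Average heap cost of one object of a class."""
    return total_bytes / instances

for name, count, total in HISTOGRAM:
    per_obj = bytes_per_instance(count, total)
    note = ""
    if name in PAYLOAD_BYTES:
        ratio = per_obj / PAYLOAD_BYTES[name]
        note = f"  ({ratio:.0f}x its {PAYLOAD_BYTES[name]}-byte value)"
    print(f"{name:48s} {per_obj:5.1f} bytes/object{note}")
```

Every row divides evenly (48, 32, 24, 24, 32 and 24 bytes), so the boxed Float and Integer values cost 6x their payload and Long costs 3x, before counting the Tuple2 and HashMap objects that hold them.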
Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

spark.core.connection.ack.wait.timeout  600
spark.executor.heartbeatInterval        60s
spark.executor.memory                   32g
spark.mesos.coarse                      600s
spark.shuffle.blockTransferService      netty
spark.shuffle.consolidateFiles          6
spark.shuffle.manager                   sort
The join is currently configured with spark.sql.shuffle.partitions=1000, but
that doesn't seem to help. Would increasing the partitions help? Is there a
formula to determine an approximate number of partitions for a join?
Any help with this job would be appreciated!
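
There is no exact formula, but a rough estimate can be sketched by combining the ~7GB-per-reducer arithmetic with the 6-8x rule of thumb. Everything in the model is an assumption: tasks_per_executor (the thread doesn't say how many tasks each 32g executor runs at once) and usable_fraction are hypothetical knobs, not Spark settings, and the estimate says nothing about skew, which still has to be checked separately.

```python
import math

def suggested_partitions(shuffle_bytes, executor_heap_bytes, tasks_per_executor,
                         expansion_factor=7, usable_fraction=0.5):
    """Rough shuffle-partition count so one partition's deserialized data fits a task.

    Hypothetical model: `tasks_per_executor` concurrent tasks share
    `usable_fraction` of the executor heap, and deserialized data is
    `expansion_factor` times its shuffle size (the 6-8x rule of thumb).
    """
    # Per-partition budget is heap * fraction / tasks / factor; dividing the
    # shuffle size by it is written as one expression to divide only once.
    return math.ceil(shuffle_bytes * expansion_factor * tasks_per_executor
                     / (executor_heap_bytes * usable_fraction))

TIB = 2**40  # binary units, for simplicity
GIB = 2**30

# ~7TB of shuffle data, 32g executors, 8 concurrent tasks each (assumed).
print(suggested_partitions(7 * TIB, 32 * GIB, tasks_per_executor=8))  # 25088
```

Under these assumptions the estimate lands at about 25k partitions, and doubling the assumed tasks per executor doubles it, so the answer is only as good as the concurrency and memory-fraction guesses fed into it.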

