There are several possibilities here.

1) Keep in mind that 7GB of data will need far more than a 7GB heap, because deserialized Java objects take much more space than the data itself. A rule of thumb is to multiply by 6 to 8, so 7GB of data can need around 50GB of heap space.
2) Monitor the Spark UI to check how many records each task processes, and whether the failed tasks handle more data than the rest. Even though some tasks fail, others will have succeeded; compare them. Do the failed tasks process far more records than the succeeded ones? If so, you have a data skew problem.
3) If the failed tasks process roughly the same number of records as the succeeded ones, just add more partitions so that each task processes less data, and keep monitoring the GC output in these cases.
4) If most of your tasks fail due to memory, your settings are too small for your data: add partitions or memory.
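For example, a quick way to check for skew is to count records per partition. This is only a rough sketch (the DataFrame name df is a placeholder for whichever input you want to inspect):

    // Count records per partition to spot skew; df is a placeholder name.
    val partitionCounts = df.rdd
      .mapPartitionsWithIndex((idx, iter) => Iterator((idx, iter.size)))
      .collect()

    // Print the largest partitions; a few far above the rest suggests skewed keys.
    partitionCounts.sortBy(-_._2).take(20).foreach { case (idx, n) =>
      println(s"partition $idx: $n records")
    }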
Yong

From: tom...@gmail.com
Date: Fri, 28 Aug 2015 13:55:52 -0700
Subject: Re: How to avoid shuffle errors for a large join ?
To: ja...@jasonknight.us
CC: user@spark.apache.org

Yeah, I tried with 10k and 30k and these still failed, will try with more then. Though that is a little disappointing, since it only writes ~7TB of shuffle data, which shouldn't in theory require more than 1000 reducers on my 10TB memory cluster (~7GB of spill per reducer). I'm now wondering if my shuffle partitions are uneven and I should use a custom partitioner. Is there a way to get stats on the partition sizes from Spark?

On Fri, Aug 28, 2015 at 12:46 PM, Jason <ja...@jasonknight.us> wrote:

I had similar problems to this (reduce-side failures for large joins, 25bn rows with 9bn), and found the answer was to raise spark.sql.shuffle.partitions above 1000. In my case, 16k partitions worked for me, but your tables look a little denser, so you may want to go even higher.

On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak <tom...@gmail.com> wrote:

I'm getting errors like "Removing executor with no recent heartbeats" & "Missing an output location for shuffle" for a large Spark SQL join (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to configure the job to avoid them. The initial stage completes fine with some 30k tasks on a cluster with 70 machines/10TB memory, generating about 6.5TB of shuffle writes, but then the shuffle stage first waits 30min in the scheduling phase according to the UI, and then dies with the mentioned errors. I can see in the GC logs that the executors reach their memory limits (32g per executor, 2 workers per machine) and can't allocate any more stuff in the heap. Fwiw, the top 10 in the memory use histogram are:

 num     #instances         #bytes  class name
----------------------------------------------
   1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
   2:     251085327     8034730464  scala.Tuple2
   3:     243694737     5848673688  java.lang.Float
   4:     231198778     5548770672  java.lang.Integer
   5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
   6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
   7:      74114058     1778737392  java.lang.Long
   8:       6059103      779203840  [Ljava.lang.Object;
   9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
  10:         34749       70122104  [B

Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

spark.core.connection.ack.wait.timeout 600
spark.executor.heartbeatInterval 60s
spark.executor.memory 32g
spark.mesos.coarse false
spark.network.timeout 600s
spark.shuffle.blockTransferService netty
spark.shuffle.consolidateFiles true
spark.shuffle.file.buffer 1m
spark.shuffle.io.maxRetries 6
spark.shuffle.manager sort

The join is currently configured with spark.sql.shuffle.partitions=1000 but that doesn't seem to help. Would increasing the partitions help? Is there a formula to determine an approximate number of partitions for a join? Any help with this job would be appreciated!

cheers,
Tom
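As an illustration of Jason's suggestion, raising the shuffle partition count before running the join would look roughly like the sketch below (sqlContext, largeDf, and smallDf are placeholder names, and 16000 is only an example value; a common rule of thumb is total shuffle size divided by a per-task target of a few hundred MB, so ~7TB / 256MB works out to roughly 28000 partitions):

    // Raise the number of shuffle partitions so each reducer handles a
    // smaller slice of the shuffle data (16000 is only an example value).
    sqlContext.setConf("spark.sql.shuffle.partitions", "16000")

    // Placeholder DataFrames standing in for the two tables being joined.
    val joined = largeDf.join(smallDf, largeDf("key") === smallDf("key"))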