The job fails before getting to groupByKey. I see a lot of timeout errors in
the yarn logs, like:

15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
akka.pattern.AskTimeoutException: Timed out

and

15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

and some of these are followed by:

15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
disassociated! Shutting down.
15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
stage 1.0 (TID 336601)
java.io.FileNotFoundException:
..../hadoop/yarn/local/......../spark-local-20150228123450-3a71/36/shuffle_0_421027_0
(No such file or directory)

On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc <paul.sz...@gmail.com> wrote:

> I would first check whether there is any possibility that, after doing
> groupbykey, one of the groups does not fit in one of the executors' memory.
>
> To back up my theory, instead of doing groupbykey + map, try reducebykey +
> mapvalues [a sketch of this rewrite appears after the quoted thread below].
>
> Let me know if that helped.
>
> Pawel Szulc
> http://rabbitonweb.com
>
> On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
>
>> So, actually I am removing the persist for now, because there is
>> significant filtering that happens after calling textFile()... but I will
>> keep that option in mind.
>>
>> I just tried a few different combinations of number of executors,
>> executor memory, and, more importantly, number of tasks... *all three
>> times it failed when approximately 75.1% of the tasks were completed (no
>> matter how many tasks resulted from repartitioning the data in
>> textFile(..., N))*. Surely this is a strong clue to something?
>>
>> On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
>>> generates many small objects that lead to very long GC time, causing the
>>> "executor lost", "heartbeat not received", and "GC overhead limit
>>> exceeded" messages.
>>> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
>>> also try `OFF_HEAP` (and use Tachyon) [see the second sketch after the
>>> quoted thread below].
>>>
>>> Burak
>>>
>>> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra <arun.lut...@gmail.com>
>>> wrote:
>>>
>>>> My program in pseudocode looks like this:
>>>>
>>>> val conf = new SparkConf().setAppName("Test")
>>>>   .set("spark.storage.memoryFraction", "0.2") // default 0.6
>>>>   .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
>>>>   .set("spark.shuffle.manager", "SORT") // preferred setting for optimized joins
>>>>   .set("spark.shuffle.consolidateFiles", "true") // helpful for "too many files open"
>>>>   .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
>>>> .set("spark.akka.frameSize","500") // helpful when using >>>> consildateFiles=true >>>> .set("spark.akka.askTimeout", "30") >>>> .set("spark.shuffle.compress","false") // >>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html >>>> .set("spark.file.transferTo","false") // >>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html >>>> .set("spark.core.connection.ack.wait.timeout","600") // >>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html >>>> .set("spark.speculation","true") >>>> .set("spark.worker.timeout","600") // >>>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html >>>> .set("spark.akka.timeout","300") // >>>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html >>>> .set("spark.storage.blockManagerSlaveTimeoutMs","120000") >>>> .set("spark.driver.maxResultSize","2048") // in response to >>>> error: Total size of serialized results of 39901 tasks (1024.0 MB) is >>>> bigger than spark.driver.maxResultSize (1024.0 MB) >>>> .set("spark.serializer", >>>> "org.apache.spark.serializer.KryoSerializer") >>>> >>>> .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator") >>>> .set("spark.kryo.registrationRequired", "true") >>>> >>>> val rdd1 = >>>> sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", >>>> -1)...filter(...) >>>> >>>> val rdd2 = >>>> sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", >>>> -1)...filter(...) >>>> >>>> >>>> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile() >>>> >>>> >>>> I run the code with: >>>> --num-executors 500 \ >>>> --driver-memory 20g \ >>>> --executor-memory 20g \ >>>> --executor-cores 32 \ >>>> >>>> >>>> I'm using kryo serialization on everything, including broadcast >>>> variables. >>>> >>>> Spark creates 145k tasks, and the first stage includes everything >>>> before groupByKey(). It fails before getting to groupByKey. I have tried >>>> doubling and tripling the number of partitions when calling textFile, with >>>> no success. >>>> >>>> Very similar code (trivial changes, to accomodate different input) >>>> worked on a smaller input (~8TB)... Not that it was easy to get that >>>> working. >>>> >>>> >>>> >>>> Errors vary, here is what I am getting right now: >>>> >>>> ERROR SendingConnection: Exception while reading SendingConnection >>>> ... java.nio.channels.ClosedChannelException >>>> (^ guessing that is symptom of something else) >>>> >>>> WARN BlockManagerMasterActor: Removing BlockManager >>>> BlockManagerId(...) with no recent heart beats: 120030ms exceeds 120000ms >>>> (^ guessing that is symptom of something else) >>>> >>>> ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting >>>> down ActorSystem [sparkDriver] >>>> *java.lang.OutOfMemoryError: GC overhead limit exceeded* >>>> >>>> >>>> >>>> Other times I will get messages about "executor lost..." about 1 >>>> message per second, after ~~50k tasks complete, until there are almost no >>>> executors left and progress slows to nothing. >>>> >>>> I ran with verbose GC info; I do see failing yarn containers that have >>>> multiple (like 30) "Full GC" messages but I don't know how to interpret if >>>> that is the problem. Typical Full GC time taken seems ok: [Times: >>>> user=23.30 sys=0.06, real=1.94 secs] >>>> >>>> >>>> >>>> Suggestions, please? 
>>>>
>>>> Huge thanks for useful suggestions,
>>>> Arun
>>>
>>>
>>
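
A minimal sketch of the reducebykey + mapvalues rewrite Paweł suggests, assuming
the per-key work after groupByKey can be expressed as an associative combine (a
sum here); the (String, Long) pairs, the "total=..." formatting, and the compare
helper are placeholders, not code from this thread:

import org.apache.spark.SparkContext

// Hypothetical (String, Long) pairs stand in for the real keyed records.
def compare(sc: SparkContext): Unit = {
  val pairs = sc.parallelize(Seq(("a", 1L), ("b", 2L), ("a", 3L)))

  // groupByKey + map: every value for a key is materialized on one executor
  // before the map runs, so a single oversized group can exhaust that executor.
  val viaGroup = pairs.groupByKey().map { case (k, vs) => (k, s"total=${vs.sum}") }

  // reduceByKey + mapValues: values are combined map-side first, so each
  // partition ships at most one partial sum per key through the shuffle.
  val viaReduce = pairs.reduceByKey(_ + _).mapValues(total => s"total=$total")

  println(viaGroup.collect().toSeq)  // e.g. List((a,total=4), (b,total=2))
  println(viaReduce.collect().toSeq) // same result, with far less memory pressure at scale
}

If the downstream logic needs more than a plain sum (say, several aggregates per
key), aggregateByKey with a small accumulator object usually still avoids pulling
whole groups into memory the way groupByKey does.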
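
And a minimal sketch of the persist change Burak suggests, in case the persist
calls come back; the path and the cachedInput helper are placeholders:

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// "sc" is an existing SparkContext; the input path is a placeholder.
def cachedInput(sc: SparkContext) =
  sc.textFile("hdfs:///path/to/file1")      // placeholder path
    .persist(StorageLevel.MEMORY_AND_DISK)  // instead of MEMORY_AND_DISK_SER, per the suggestion above;
                                            // OFF_HEAP (backed by Tachyon on Spark 1.x) is the other option mentioned
    .map(_.split("\\|", -1))                // same pipe-splitting as in the pseudocode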