A correction to my first post: there is also a repartition right before the groupByKey, added to help avoid the too-many-open-files error:
rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra <arun.lut...@gmail.com> wrote:

> The job fails before getting to groupByKey.
>
> I see a lot of timeout errors in the YARN logs, like:
>
>   15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
>   akka.pattern.AskTimeoutException: Timed out
>
> and
>
>   15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
>   java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>
> and some of these are followed by:
>
>   15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
>   Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
>   disassociated! Shutting down.
>   15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
>   stage 1.0 (TID 336601)
>   java.io.FileNotFoundException:
>   ..../hadoop/yarn/local/......../spark-local-20150228123450-3a71/36/shuffle_0_421027_0
>   (No such file or directory)
>
> On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc <paul.sz...@gmail.com> wrote:
>
>> I would first check whether there is any possibility that, after doing
>> groupByKey, one of the groups does not fit in one of the executors' memory.
>>
>> To test that theory, instead of groupByKey + map, try reduceByKey +
>> mapValues.
>>
>> Let me know if that helped.
>>
>> Pawel Szulc
>> http://rabbitonweb.com
>>
>> On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
>>
>>> So, actually, I am removing the persist for now, because there is
>>> significant filtering that happens after calling textFile()... but I
>>> will keep that option in mind.
>>>
>>> I just tried a few different combinations of number of executors,
>>> executor memory, and, more importantly, number of tasks... *all three
>>> times it failed when approximately 75.1% of the tasks were completed
>>> (no matter how many tasks resulted from repartitioning the data in
>>> textFile(..., N))*. Surely this is a strong clue to something?
>>>
>>> On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
>>>> generates many small objects that lead to very long GC times, causing
>>>> the executor-lost, heartbeat-not-received, and GC-overhead-limit-exceeded
>>>> messages.
>>>> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
>>>> also try `OFF_HEAP` (and use Tachyon).
>>>>
>>>> Burak
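For concreteness, here is a minimal, self-contained sketch that combines the
two suggestions above (reduceByKey + mapValues in place of groupByKey + map,
and a non-serialized MEMORY_AND_DISK persist). It assumes the per-key work can
be expressed as an associative merge, illustrated here as a simple count; the
input path, key field, and output format are placeholders, not the actual job:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.SparkContext._   // pair-RDD implicits (needed on Spark < 1.3)
  import org.apache.spark.storage.StorageLevel

  object ReduceByKeySketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("ReduceByKeySketch"))

      // Hypothetical input; the non-serialized persist level follows Burak's suggestion.
      val records = sc.textFile("hdfs:///tmp/example_input")
        .persist(StorageLevel.MEMORY_AND_DISK)
        .map(_.split("\\|", -1))
        .filter(_.length > 1)

      // reduceByKey merges values map-side before the shuffle, so no single
      // key's complete group has to fit in one executor's memory, unlike
      // groupByKey. 15000 matches the repartition count used above.
      records
        .map(fields => (fields(0), 1L))
        .reduceByKey(_ + _, 15000)
        .mapValues(count => s"count=$count")
        .map { case (key, value) => s"$key|$value" }
        .saveAsTextFile("hdfs:///tmp/example_output")

      sc.stop()
    }
  }

If the per-key logic genuinely needs all values at once and cannot be written
as a simple merge, aggregateByKey or combineByKey may still allow combining
values incrementally instead of materializing whole groups.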
>>>>> .set("spark.akka.frameSize","500") // helpful when using >>>>> consildateFiles=true >>>>> .set("spark.akka.askTimeout", "30") >>>>> .set("spark.shuffle.compress","false") // >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html >>>>> .set("spark.file.transferTo","false") // >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html >>>>> .set("spark.core.connection.ack.wait.timeout","600") // >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html >>>>> .set("spark.speculation","true") >>>>> .set("spark.worker.timeout","600") // >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html >>>>> .set("spark.akka.timeout","300") // >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html >>>>> .set("spark.storage.blockManagerSlaveTimeoutMs","120000") >>>>> .set("spark.driver.maxResultSize","2048") // in response to >>>>> error: Total size of serialized results of 39901 tasks (1024.0 MB) is >>>>> bigger than spark.driver.maxResultSize (1024.0 MB) >>>>> .set("spark.serializer", >>>>> "org.apache.spark.serializer.KryoSerializer") >>>>> >>>>> .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator") >>>>> .set("spark.kryo.registrationRequired", "true") >>>>> >>>>> val rdd1 = sc.textFile(file1).persist(StorageLevel >>>>> .MEMORY_AND_DISK_SER).map(_.split("\\|", -1)...filter(...) >>>>> >>>>> val rdd2 = >>>>> sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", >>>>> -1)...filter(...) >>>>> >>>>> >>>>> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile() >>>>> >>>>> >>>>> I run the code with: >>>>> --num-executors 500 \ >>>>> --driver-memory 20g \ >>>>> --executor-memory 20g \ >>>>> --executor-cores 32 \ >>>>> >>>>> >>>>> I'm using kryo serialization on everything, including broadcast >>>>> variables. >>>>> >>>>> Spark creates 145k tasks, and the first stage includes everything >>>>> before groupByKey(). It fails before getting to groupByKey. I have tried >>>>> doubling and tripling the number of partitions when calling textFile, with >>>>> no success. >>>>> >>>>> Very similar code (trivial changes, to accomodate different input) >>>>> worked on a smaller input (~8TB)... Not that it was easy to get that >>>>> working. >>>>> >>>>> >>>>> >>>>> Errors vary, here is what I am getting right now: >>>>> >>>>> ERROR SendingConnection: Exception while reading SendingConnection >>>>> ... java.nio.channels.ClosedChannelException >>>>> (^ guessing that is symptom of something else) >>>>> >>>>> WARN BlockManagerMasterActor: Removing BlockManager >>>>> BlockManagerId(...) with no recent heart beats: 120030ms exceeds 120000ms >>>>> (^ guessing that is symptom of something else) >>>>> >>>>> ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting >>>>> down ActorSystem [sparkDriver] >>>>> *java.lang.OutOfMemoryError: GC overhead limit exceeded* >>>>> >>>>> >>>>> >>>>> Other times I will get messages about "executor lost..." about 1 >>>>> message per second, after ~~50k tasks complete, until there are almost no >>>>> executors left and progress slows to nothing. >>>>> >>>>> I ran with verbose GC info; I do see failing yarn containers that have >>>>> multiple (like 30) "Full GC" messages but I don't know how to interpret if >>>>> that is the problem. 
>>>>> Typical Full GC time taken seems OK: [Times: user=23.30 sys=0.06, real=1.94 secs]
>>>>>
>>>>> Suggestions, please?
>>>>>
>>>>> Huge thanks for useful suggestions,
>>>>> Arun
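Since the conf above sets spark.kryo.registrationRequired to true, every class
that Spark serializes has to be registered by the class named in
spark.kryo.registrator. Below is a minimal sketch of what such a registrator
can look like; the record type and the registrations are illustrative
placeholders, not the contents of the actual com.att.bdcoe.cip.ooh.MyRegistrator:

  import com.esotericsoftware.kryo.Kryo
  import org.apache.spark.serializer.KryoRegistrator

  // Hypothetical record type standing in for whatever the real job shuffles.
  case class ExampleRecord(key: String, fields: Array[String])

  // With registrationRequired=true, any unregistered class that reaches Kryo
  // makes tasks fail with "Class is not registered" errors, so everything that
  // is shuffled, cached in serialized form, or broadcast needs an entry here.
  class ExampleRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo): Unit = {
      kryo.register(classOf[ExampleRecord])
      kryo.register(classOf[Array[String]])
      kryo.register(classOf[Array[ExampleRecord]])
    }
  }

The fully qualified name of the registrator class is what goes into the
spark.kryo.registrator setting.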