So, actually I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind.
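(For reference, roughly the variant I'm keeping in mind -- a sketch only, not the actual code: the file path, the delimiter split, and the filter predicate below are placeholders, and it uses the MEMORY_AND_DISK level Burak suggested rather than MEMORY_AND_DISK_SER:)

    import org.apache.spark.storage.StorageLevel

    // Sketch: persist only after the heavy filtering, so the cache holds the
    // (much smaller) filtered records rather than the raw text lines.
    val rdd1 = sc.textFile(file1)
      .map(_.split("\\|", -1))
      .filter(fields => keepRecord(fields))   // keepRecord is a placeholder predicate
      .persist(StorageLevel.MEMORY_AND_DISK)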
I just tried a few different combinations of number of executors, executor memory, and, more importantly, number of tasks... *all three times it failed when approximately 75.1% of the tasks were completed (no matter how many tasks resulted from repartitioning the data in textFile(..., N))*. Surely this is a strong clue to something?

On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz <brk...@gmail.com> wrote:

> Hi,
>
> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates
> many small objects that lead to very long GC time, causing the "executor
> lost", "heartbeat not received", and "GC overhead limit exceeded" messages.
> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
> try `OFF_HEAP` (and use Tachyon).
>
> Burak
>
> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra <arun.lut...@gmail.com> wrote:
>
>> My program in pseudocode looks like this:
>>
>>   val conf = new SparkConf().setAppName("Test")
>>     .set("spark.storage.memoryFraction", "0.2")   // default 0.6
>>     .set("spark.shuffle.memoryFraction", "0.12")  // default 0.2
>>     .set("spark.shuffle.manager", "SORT")         // preferred setting for optimized joins
>>     .set("spark.shuffle.consolidateFiles", "true")  // helpful for "too many files open"
>>     .set("spark.mesos.coarse", "true")            // helpful for MapOutputTracker errors?
>>     .set("spark.akka.frameSize", "500")           // helpful when using consolidateFiles=true
>>     .set("spark.akka.askTimeout", "30")
>>     .set("spark.shuffle.compress", "false")       // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>     .set("spark.file.transferTo", "false")        // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>     .set("spark.core.connection.ack.wait.timeout", "600")  // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>     .set("spark.speculation", "true")
>>     .set("spark.worker.timeout", "600")           // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>     .set("spark.akka.timeout", "300")             // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>     .set("spark.storage.blockManagerSlaveTimeoutMs", "120000")
>>     .set("spark.driver.maxResultSize", "2048")    // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>>     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>     .set("spark.kryo.registrator", "com.att.bdcoe.cip.ooh.MyRegistrator")
>>     .set("spark.kryo.registrationRequired", "true")
>>
>>   val rdd1 = sc.textFile(file1)
>>     .persist(StorageLevel.MEMORY_AND_DISK_SER)
>>     .map(_.split("\\|", -1))...filter(...)
>>
>>   val rdd2 = sc.textFile(file2)
>>     .persist(StorageLevel.MEMORY_AND_DISK_SER)
>>     .map(_.split("\\|", -1))...filter(...)
>>
>>   rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>
>> I run the code with:
>>
>>   --num-executors 500 \
>>   --driver-memory 20g \
>>   --executor-memory 20g \
>>   --executor-cores 32 \
>>
>> I'm using Kryo serialization on everything, including broadcast variables.
>>
>> Spark creates 145k tasks, and the first stage includes everything before
>> groupByKey(). It fails before getting to groupByKey. I have tried doubling
>> and tripling the number of partitions when calling textFile, with no
>> success.
>>
>> Very similar code (trivial changes, to accommodate different input) worked
>> on a smaller input (~8TB)... not that it was easy to get that working.
>>
>> Errors vary; here is what I am getting right now:
>>
>>   ERROR SendingConnection: Exception while reading SendingConnection
>>   ... java.nio.channels.ClosedChannelException
>>   (^ guessing that is a symptom of something else)
>>
>>   WARN BlockManagerMasterActor: Removing BlockManager
>>   BlockManagerId(...) with no recent heart beats: 120030ms exceeds 120000ms
>>   (^ guessing that is a symptom of something else)
>>
>>   ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
>>   down ActorSystem [sparkDriver]
>>   *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>
>> Other times I will get "executor lost..." messages, about one per second,
>> after roughly 50k tasks complete, until there are almost no executors left
>> and progress slows to nothing.
>>
>> I ran with verbose GC info; I do see failing YARN containers that have
>> multiple (around 30) "Full GC" messages, but I don't know how to tell
>> whether that is the problem. Typical Full GC time taken seems OK:
>> [Times: user=23.30 sys=0.06, real=1.94 secs]
>>
>> Suggestions, please?
>>
>> Huge thanks for useful suggestions,
>> Arun
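(For context on the "verbose GC info" mentioned above: the thread does not say how the GC logging was enabled, but one common way -- shown here only as a sketch, not necessarily what was done in this job -- is to pass standard HotSpot flags through spark.executor.extraJavaOptions so each YARN container writes GC details to its stdout/stderr:)

    import org.apache.spark.SparkConf

    // Sketch only: enable detailed, timestamped GC logging on every executor.
    // These are standard HotSpot flags; the resulting "Full GC" lines show up
    // in each container's logs.
    val conf = new SparkConf().setAppName("Test")
      .set("spark.executor.extraJavaOptions",
           "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")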