So, actually I am removing the persist for now, because there is
significant filtering that happens after calling textFile()... but I will
keep that option in mind.
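
If I do bring persist back, the idea would be to cache only after the
filtering, roughly like this (a minimal sketch; the filter predicate is just a
placeholder for the real one):

    import org.apache.spark.storage.StorageLevel

    val rdd1 = sc.textFile(file1)
      .map(_.split("\\|", -1))
      .filter(fields => fields.length > 1)    // placeholder predicate; the real filter is elided
      .persist(StorageLevel.MEMORY_AND_DISK)  // cache only the already-filtered records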

I just tried a few different combinations of number of executors, executor
memory, and, more importantly, number of tasks... *all three times it failed
when approximately 75.1% of the tasks were completed (no matter how many
tasks resulted from repartitioning the data in textFile(..., N))*. Surely
this is a strong clue to something?
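
To be clear, by "number of tasks" I mean the partition hint passed to
textFile; roughly (with N standing in for the values I tried):

    // minPartitions is only a lower bound: Spark may still create more splits
    val rdd = sc.textFile(file1, N)
    // an exact count could also be forced afterwards, at the cost of a shuffle
    val exact = rdd.repartition(N)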



On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz <brk...@gmail.com> wrote:

> Hi,
>
> Not sure if it will help, but `StorageLevel.MEMORY_AND_DISK_SER` generates
> many small objects, which leads to very long GC times and causes the
> "executor lost", "heartbeat not received", and "GC overhead limit exceeded" messages.
> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
> try `OFF_HEAP` (and use Tachyon).
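>
> A minimal sketch of that change, keeping the persist call from the code
> below and only swapping the storage level:
>
>     import org.apache.spark.storage.StorageLevel
>
>     val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK)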
>
> Burak
>
> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra <arun.lut...@gmail.com>
> wrote:
>
>> My program in pseudocode looks like this:
>>
>>     val conf = new SparkConf().setAppName("Test")
>>       .set("spark.storage.memoryFraction","0.2") // default 0.6
>>       .set("spark.shuffle.memoryFraction","0.12") // default 0.2
>>       .set("spark.shuffle.manager","SORT") // preferred setting for
>> optimized joins
>>       .set("spark.shuffle.consolidateFiles","true") // helpful for "too
>> many files open"
>>       .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker
>> errors?
>>       .set("spark.akka.frameSize","500") // helpful when using
>> consildateFiles=true
>>       .set("spark.akka.askTimeout", "30")
>>       .set("spark.shuffle.compress","false") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>       .set("spark.file.transferTo","false") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>       .set("spark.core.connection.ack.wait.timeout","600") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>       .set("spark.speculation","true")
>>       .set("spark.worker.timeout","600") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>       .set("spark.akka.timeout","300") //
>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>       .set("spark.storage.blockManagerSlaveTimeoutMs","120000")
>>       .set("spark.driver.maxResultSize","2048") // in response to error:
>> Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than
>> spark.driver.maxResultSize (1024.0 MB)
>>       .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>>       .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
>>       .set("spark.kryo.registrationRequired", "true")
>>
>> val rdd1 = 
>> sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
>> -1))...filter(...)
>>
>> val rdd2 =
>> sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
>> -1))...filter(...)
>>
>>
>> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>
>>
>> I run the code with:
>>   --num-executors 500 \
>>   --driver-memory 20g \
>>   --executor-memory 20g \
>>   --executor-cores 32 \
>>
>>
>> I'm using Kryo serialization on everything, including broadcast
>> variables.
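>>
>> Since spark.kryo.registrationRequired is set, the registrator is along these
>> lines (a minimal sketch; the registered classes shown are illustrative, not
>> my real ones):
>>
>>     import com.esotericsoftware.kryo.Kryo
>>     import org.apache.spark.serializer.KryoRegistrator
>>
>>     class MyRegistrator extends KryoRegistrator {
>>       // with registrationRequired=true, every class that gets serialized
>>       // must be registered here, or Kryo fails at runtime
>>       override def registerClasses(kryo: Kryo): Unit = {
>>         kryo.register(classOf[Array[String]]) // e.g. the output of split("\\|", -1)
>>       }
>>     }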
>>
>> Spark creates 145k tasks, and the first stage includes everything before
>> groupByKey(). It fails before getting to groupByKey. I have tried doubling
>> and tripling the number of partitions when calling textFile, with no
>> success.
>>
>> Very similar code (with trivial changes to accommodate different input) worked
>> on a smaller input (~8TB)... not that it was easy to get that working.
>>
>>
>>
>> Errors vary, here is what I am getting right now:
>>
>> ERROR SendingConnection: Exception while reading SendingConnection
>> ... java.nio.channels.ClosedChannelException
>> (^ guessing that is a symptom of something else)
>>
>> WARN BlockManagerMasterActor: Removing BlockManager
>> BlockManagerId(...) with no recent heart beats: 120030ms exceeds 120000ms
>> (^ guessing that is a symptom of something else)
>>
>> ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
>> down ActorSystem [sparkDriver]
>> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>
>>
>>
>> Other times I get "executor lost..." messages, roughly one per second, after
>> ~50k tasks complete, until there are almost no executors
>> left and progress slows to nothing.
>>
>> I ran with verbose GC info; I do see failing YARN containers with many
>> (around 30) "Full GC" messages, but I don't know how to tell whether that is
>> the problem. Typical Full GC times look OK: [Times:
>> user=23.30 sys=0.06, real=1.94 secs]
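>>
>> (For reference, the verbose GC output comes from executor JVM options along
>> these lines; treat the exact flags as a sketch rather than exactly what I
>> used:)
>>
>>       .set("spark.executor.extraJavaOptions",
>>         "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")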
>>
>>
>>
>> Suggestions, please?
>>
>> Huge thanks for useful suggestions,
>> Arun
>>
>
>
