I would first check whether  there is any possibility that after doing
groupbykey one of the groups does not fit in one of the executors' memory.

To back up my theory, instead of doing groupbykey + map try reducebykey +
mapvalues.

Let me know if that helped.

Pawel Szulc
http://rabbitonweb.com

sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik <arun.lut...@gmail.com>
napisał:

> So, actually I am removing the persist for now, because there is
> significant filtering that happens after calling textFile()... but I will
> keep that option in mind.
>
> I just tried a few different combinations of number of executors, executor
> memory, and more importantly, number of tasks... *all three times it
> failed when approximately 75.1% of the tasks were completed (no matter how
> many tasks resulted from repartitioning the data in textfile(..., N))*.
> Surely this is a strong clue to something?
>
>
>
> On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hi,
>>
>> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
>> generates many small objects that lead to very long GC time, causing the
>> executor losts, heartbeat not received, and GC overhead limit exceeded
>> messages.
>> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
>> try `OFF_HEAP` (and use Tachyon).
>>
>> Burak
>>
>> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra <arun.lut...@gmail.com>
>> wrote:
>>
>>> My program in pseudocode looks like this:
>>>
>>>     val conf = new SparkConf().setAppName("Test")
>>>       .set("spark.storage.memoryFraction","0.2") // default 0.6
>>>       .set("spark.shuffle.memoryFraction","0.12") // default 0.2
>>>       .set("spark.shuffle.manager","SORT") // preferred setting for
>>> optimized joins
>>>       .set("spark.shuffle.consolidateFiles","true") // helpful for "too
>>> many files open"
>>>       .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker
>>> errors?
>>>       .set("spark.akka.frameSize","500") // helpful when using
>>> consildateFiles=true
>>>       .set("spark.akka.askTimeout", "30")
>>>       .set("spark.shuffle.compress","false") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>       .set("spark.file.transferTo","false") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>       .set("spark.core.connection.ack.wait.timeout","600") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>       .set("spark.speculation","true")
>>>       .set("spark.worker.timeout","600") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>       .set("spark.akka.timeout","300") //
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>       .set("spark.storage.blockManagerSlaveTimeoutMs","120000")
>>>       .set("spark.driver.maxResultSize","2048") // in response to error:
>>> Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than
>>> spark.driver.maxResultSize (1024.0 MB)
>>>       .set("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer")
>>>
>>> .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
>>>       .set("spark.kryo.registrationRequired", "true")
>>>
>>> val rdd1 = 
>>> sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
>>> -1)...filter(...)
>>>
>>> val rdd2 =
>>> sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
>>> -1)...filter(...)
>>>
>>>
>>> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>>
>>>
>>> I run the code with:
>>>   --num-executors 500 \
>>>   --driver-memory 20g \
>>>   --executor-memory 20g \
>>>   --executor-cores 32 \
>>>
>>>
>>> I'm using kryo serialization on everything, including broadcast
>>> variables.
>>>
>>> Spark creates 145k tasks, and the first stage includes everything before
>>> groupByKey(). It fails before getting to groupByKey. I have tried doubling
>>> and tripling the number of partitions when calling textFile, with no
>>> success.
>>>
>>> Very similar code (trivial changes, to accomodate different input)
>>> worked on a smaller input (~8TB)... Not that it was easy to get that
>>> working.
>>>
>>>
>>>
>>> Errors vary, here is what I am getting right now:
>>>
>>> ERROR SendingConnection: Exception while reading SendingConnection
>>> ... java.nio.channels.ClosedChannelException
>>> (^ guessing that is symptom of something else)
>>>
>>> WARN BlockManagerMasterActor: Removing BlockManager
>>> BlockManagerId(...) with no recent heart beats: 120030ms exceeds 120000ms
>>> (^ guessing that is symptom of something else)
>>>
>>> ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
>>> down ActorSystem [sparkDriver]
>>> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>>
>>>
>>>
>>> Other times I will get messages about "executor lost..." about 1 message
>>> per second, after ~~50k tasks complete, until there are almost no executors
>>> left and progress slows to nothing.
>>>
>>> I ran with verbose GC info; I do see failing yarn containers that have
>>> multiple (like 30) "Full GC" messages but I don't know how to interpret if
>>> that is the problem. Typical Full GC time taken seems ok: [Times:
>>> user=23.30 sys=0.06, real=1.94 secs]
>>>
>>>
>>>
>>> Suggestions, please?
>>>
>>> Huge thanks for useful suggestions,
>>> Arun
>>>
>>
>>
>

Reply via email to