The Spark UI shows the name and line number of the operation it is
currently performing (repartition in this case). Only if that information is
wrong (just a possibility) could the job have started groupByKey already.

I will try to measure the amount of skew in the data using reduceByKey
(or simply countByKey), which is relatively inexpensive. For the purposes of
this algorithm I can simply log and remove keys with huge counts before
doing groupByKey.
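
A minimal sketch of that skew check, assuming the keyed data is an
RDD of (String, String) pairs. The names SkewCheck and hotKeys, the toy
data, and the threshold value are hypothetical and not taken from the
actual job; the real cutoff would have to come from the observed counts.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark 1.2 and earlier
import org.apache.spark.rdd.RDD

object SkewCheck {
  /** Keys whose record count exceeds `threshold`, computed without grouping values. */
  def hotKeys(pairs: RDD[(String, String)], threshold: Long): Set[String] =
    pairs
      .mapValues(_ => 1L)                       // drop the payload, keep one counter per record
      .reduceByKey(_ + _)                       // combines map-side, so only (key, count) is shuffled
      .filter { case (_, n) => n > threshold }  // keep only the oversized keys
      .keys
      .collect()                                // assumes the set of hot keys is small
      .toSet

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("SkewCheck").setMaster("local[2]"))
    try {
      // Toy data standing in for the keyed RDD that feeds groupByKey.
      val pairs = sc.parallelize(Seq("a" -> "1", "a" -> "2", "a" -> "3", "b" -> "4"))

      // Hypothetical cutoff, chosen only to make the toy example drop one key.
      val hot = hotKeys(pairs, threshold = 2L)
      hot.foreach(k => println(s"dropping skewed key: $k"))

      // Remove the skewed keys before the expensive groupByKey.
      val hotBc = sc.broadcast(hot)
      val grouped = pairs
        .filter { case (k, _) => !hotBc.value.contains(k) }
        .groupByKey()
      grouped.collect().foreach(println)
    } finally sc.stop()
  }
}
```

countByKey would also work, but it returns the full key-to-count map to
the driver, so with a very large number of distinct keys it seems safer to
filter on the cluster first, as in the sketch.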

On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson <ilike...@gmail.com> wrote:

> All stated symptoms are consistent with GC pressure (other nodes time out
> trying to connect because of a long stop-the-world pause), quite possibly due to
> groupByKey. groupByKey is a very expensive operation as it may bring all
> the data for a particular partition into memory (in particular, it cannot
> spill values for a single key, so if you have a single very skewed key you
> can get behavior like this).
>
> On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc <paul.sz...@gmail.com>
> wrote:
>
>> But groupByKey will repartition according to the number of keys, as I
>> understand how it works. How do you know that you haven't reached the
>> groupByKey phase? Are you using a profiler, or do you base that assumption
>> only on logs?
>>
>> On Sat, Feb 28, 2015 at 8:12 PM, Arun Luthra <arun.lut...@gmail.com>
>> wrote:
>>
>>> A correction to my first post:
>>>
>>> There is also a repartition right before groupByKey to help avoid
>>> too-many-open-files error:
>>>
>>>
>>> rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>>
>>> On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra <arun.lut...@gmail.com>
>>> wrote:
>>>
>>>> The job fails before getting to groupByKey.
>>>>
>>>> I see a lot of timeout errors in the yarn logs, like:
>>>>
>>>> 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
>>>> attempts
>>>> akka.pattern.AskTimeoutException: Timed out
>>>>
>>>> and
>>>>
>>>> 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
>>>> attempts
>>>> java.util.concurrent.TimeoutException: Futures timed out after [30
>>>> seconds]
>>>>
>>>> and some of these are followed by:
>>>>
>>>> 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
>>>> Disassociated [akka.tcp://sparkExecutor@...] ->
>>>> [akka.tcp://sparkDriver@...] disassociated! Shutting down.
>>>> 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0
>>>> in stage 1.0 (TID 336601)
>>>> java.io.FileNotFoundException:
>>>> ..../hadoop/yarn/local/......../spark-local-20150228123450-3a71/36/shuffle_0_421027_0
>>>> (No such file or directory)
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc <paul.sz...@gmail.com>
>>>> wrote:
>>>>
>>>>> I would first check whether there is any possibility that, after doing
>>>>> groupByKey, one of the groups does not fit in the memory of a single
>>>>> executor.
>>>>>
>>>>> To test my theory, instead of doing groupByKey + map try
>>>>> reduceByKey + mapValues.
>>>>>
>>>>> Let me know if that helped.
>>>>>
>>>>> Pawel Szulc
>>>>> http://rabbitonweb.com
>>>>>
>>>>> On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra <
>>>>> arun.lut...@gmail.com> wrote:
>>>>>
>>>>>> So, actually I am removing the persist for now, because there is
>>>>>> significant filtering that happens after calling textFile()... but I will
>>>>>> keep that option in mind.
>>>>>>
>>>>>> I just tried a few different combinations of number of executors,
>>>>>> executor memory, and more importantly, number of tasks... *all three
>>>>>> times it failed when approximately 75.1% of the tasks were completed (no
>>>>>> matter how many tasks resulted from repartitioning the data in
>>>>>> textFile(..., N))*. Surely this is a strong clue to something?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz <brk...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
>>>>>>> generates many small objects that lead to very long GC time, causing the
>>>>>>> "executor lost", "heartbeat not received", and "GC overhead limit
>>>>>>> exceeded" messages.
>>>>>>> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
>>>>>>> also try `OFF_HEAP` (and use Tachyon).
>>>>>>>
>>>>>>> Burak
>>>>>>>
>>>>>>> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra
>>>>>>> <arun.lut...@gmail.com> wrote:
>>>>>>>
>>>>>>>> My program in pseudocode looks like this:
>>>>>>>>
>>>>>>>>     val conf = new SparkConf().setAppName("Test")
>>>>>>>>       .set("spark.storage.memoryFraction", "0.2") // default 0.6
>>>>>>>>       .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
>>>>>>>>       // preferred setting for optimized joins
>>>>>>>>       .set("spark.shuffle.manager", "SORT")
>>>>>>>>       // helpful for "too many files open"
>>>>>>>>       .set("spark.shuffle.consolidateFiles", "true")
>>>>>>>>       // helpful for MapOutputTracker errors?
>>>>>>>>       .set("spark.mesos.coarse", "true")
>>>>>>>>       // helpful when using consolidateFiles=true
>>>>>>>>       .set("spark.akka.frameSize", "500")
>>>>>>>>       .set("spark.akka.askTimeout", "30")
>>>>>>>>       // next three settings:
>>>>>>>>       // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>>>>>       .set("spark.shuffle.compress", "false")
>>>>>>>>       .set("spark.file.transferTo", "false")
>>>>>>>>       .set("spark.core.connection.ack.wait.timeout", "600")
>>>>>>>>       .set("spark.speculation", "true")
>>>>>>>>       // next two settings:
>>>>>>>>       // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>>>>>>       .set("spark.worker.timeout", "600")
>>>>>>>>       .set("spark.akka.timeout", "300")
>>>>>>>>       .set("spark.storage.blockManagerSlaveTimeoutMs", "120000")
>>>>>>>>       // in response to error: Total size of serialized results of
>>>>>>>>       // 39901 tasks (1024.0 MB) is bigger than
>>>>>>>>       // spark.driver.maxResultSize (1024.0 MB)
>>>>>>>>       .set("spark.driver.maxResultSize", "2048")
>>>>>>>>       .set("spark.serializer",
>>>>>>>>         "org.apache.spark.serializer.KryoSerializer")
>>>>>>>>       .set("spark.kryo.registrator",
>>>>>>>>         "com.att.bdcoe.cip.ooh.MyRegistrator")
>>>>>>>>       .set("spark.kryo.registrationRequired", "true")
>>>>>>>>
>>>>>>>> val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>>>   .map(_.split("\\|", -1))...filter(...)
>>>>>>>>
>>>>>>>> val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>>>   .map(_.split("\\|", -1))...filter(...)
>>>>>>>>
>>>>>>>>
>>>>>>>> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>>>>>>>
>>>>>>>>
>>>>>>>> I run the code with:
>>>>>>>>   --num-executors 500 \
>>>>>>>>   --driver-memory 20g \
>>>>>>>>   --executor-memory 20g \
>>>>>>>>   --executor-cores 32 \
>>>>>>>>
>>>>>>>>
>>>>>>>> I'm using kryo serialization on everything, including broadcast
>>>>>>>> variables.
>>>>>>>>
>>>>>>>> Spark creates 145k tasks, and the first stage includes everything
>>>>>>>> before groupByKey(). It fails before getting to groupByKey. I have
>>>>>>>> tried doubling and tripling the number of partitions when calling
>>>>>>>> textFile, with no success.
>>>>>>>>
>>>>>>>> Very similar code (trivial changes, to accommodate different input)
>>>>>>>> worked on a smaller input (~8TB)... Not that it was easy to get that
>>>>>>>> working.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Errors vary, here is what I am getting right now:
>>>>>>>>
>>>>>>>> ERROR SendingConnection: Exception while reading SendingConnection
>>>>>>>> ... java.nio.channels.ClosedChannelException
>>>>>>>> (^ guessing that is symptom of something else)
>>>>>>>>
>>>>>>>> WARN BlockManagerMasterActor: Removing BlockManager
>>>>>>>> BlockManagerId(...) with no recent heart beats: 120030ms exceeds 120000ms
>>>>>>>> (^ guessing that is symptom of something else)
>>>>>>>>
>>>>>>>> ERROR ActorSystemImpl: Uncaught fatal error from thread (...)
>>>>>>>> shutting down ActorSystem [sparkDriver]
>>>>>>>> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Other times I will get "executor lost..." messages, about 1 per
>>>>>>>> second, after ~50k tasks complete, until there are almost no
>>>>>>>> executors left and progress slows to nothing.
>>>>>>>>
>>>>>>>> I ran with verbose GC info; I do see failing yarn containers that
>>>>>>>> have multiple (like 30) "Full GC" messages, but I don't know how to
>>>>>>>> tell whether that is the problem. Typical Full GC time taken seems
>>>>>>>> ok: [Times: user=23.30 sys=0.06, real=1.94 secs]
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Suggestions, please?
>>>>>>>>
>>>>>>>> Huge thanks for useful suggestions,
>>>>>>>> Arun
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>
