But groupByKey will repartition according to the number of keys, as I understand
how it works. How do you know that you haven't reached the groupByKey
phase? Are you using a profiler, or do you base that assumption only on the logs?
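
For what it's worth, here is a minimal toy sketch of the partitioning knob I am
thinking about: groupByKey can also be given an explicit partition count. Nothing
below is your pipeline; the data and the number 200 are made up.

    // toy pair RDD, purely illustrative; sc is an existing SparkContext
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    // the argument fixes the number of post-shuffle partitions (200 is arbitrary)
    val grouped = pairs.groupByKey(200)
    grouped.mapValues(_.sum).collect()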

Sat, 28 Feb 2015, 8:12 PM Arun Luthra <arun.lut...@gmail.com>
wrote:

> A correction to my first post:
>
> There is also a repartition right before groupByKey to help avoid
> too-many-open-files error:
>
>
> rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()
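>
> (A side note, sketched on toy data rather than your pipeline: since groupByKey
> can take a partition count directly, something like the lines below might give
> the same 15000 post-shuffle partitions without the extra shuffle that
> repartition adds. The keys, values, and output path here are made up.)
>
> val toy = sc.parallelize(Seq(("k1", 1), ("k2", 2)))
> val grouped = toy.groupByKey(15000) // 15000 taken from the repartition call above
> grouped.map { case (k, vs) => s"$k\t${vs.size}" }.saveAsTextFile("/tmp/toy-output") // hypothetical path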
>
> On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra <arun.lut...@gmail.com>
> wrote:
>
>> The job fails before getting to groupByKey.
>>
>> I see a lot of timeout errors in the YARN logs, like:
>>
>> 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
>> akka.pattern.AskTimeoutException: Timed out
>>
>> and
>>
>> 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
>> java.util.concurrent.TimeoutException: Futures timed out after [30
>> seconds]
>>
>> and some of these are followed by:
>>
>> 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
>> Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
>> disassociated! Shutting down.
>> 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
>> stage 1.0 (TID 336601)
>> java.io.FileNotFoundException:
>> ..../hadoop/yarn/local/......../spark-local-20150228123450-3a71/36/shuffle_0_421027_0
>> (No such file or directory)
>>
>>
>>
>>
>> On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc <paul.sz...@gmail.com>
>> wrote:
>>
>>> I would first check whether there is any possibility that, after doing
>>> groupByKey, one of the groups does not fit in a single executor's memory.
>>>
>>> To back up my theory, instead of doing groupByKey + map, try reduceByKey
>>> + mapValues.
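>>>
>>> A rough sketch of the difference, on toy data (the sum is just a stand-in
>>> for whatever your per-key map actually computes):
>>>
>>> val pairs = sc.parallelize(Seq(("a", 1L), ("b", 2L), ("a", 3L)))
>>>
>>> // groupByKey materializes every value of a key on one executor:
>>> val viaGroup = pairs.groupByKey().map { case (k, vs) => (k, vs.sum.toString) }
>>>
>>> // reduceByKey combines map-side, so no single group has to fit in memory,
>>> // and mapValues handles the final per-key transformation:
>>> val viaReduce = pairs.reduceByKey(_ + _).mapValues(_.toString)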
>>>
>>> Let me know if that helped.
>>>
>>> Pawel Szulc
>>> http://rabbitonweb.com
>>>
>>> Sat, 28 Feb 2015, 6:22 PM Arun Luthra <arun.lut...@gmail.com>
>>> wrote:
>>>
>>>> So, actually I am removing the persist for now, because there is
>>>> significant filtering that happens after calling textFile()... but I will
>>>> keep that option in mind.
>>>>
>>>> I just tried a few different combinations of number of executors,
>>>> executor memory, and more importantly, number of tasks... *all three
>>>> times it failed when approximately 75.1% of the tasks were completed (no
>>>> matter how many tasks resulted from repartitioning the data in
>>>> textFile(..., N))*. Surely this is a strong clue to something?
>>>>
>>>>
>>>>
>>>> On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
>>>>> generates many small objects that lead to very long GC times, causing the
>>>>> "executor lost", "heartbeat not received", and "GC overhead limit exceeded"
>>>>> messages.
>>>>> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
>>>>> also try `OFF_HEAP` (and use Tachyon).
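>>>>>
>>>>> A quick sketch of the change I mean (rddX is just a placeholder for your
>>>>> rdd1/rdd2, and the OFF_HEAP line assumes Tachyon is actually configured):
>>>>>
>>>>> import org.apache.spark.storage.StorageLevel
>>>>>
>>>>> // deserialized objects in memory, spilling to disk when needed
>>>>> val cached = rddX.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>
>>>>> // or, if Tachyon is available (assumption), the off-heap variant:
>>>>> // val cachedOffHeap = rddX.persist(StorageLevel.OFF_HEAP)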
>>>>>
>>>>> Burak
>>>>>
>>>>> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra <arun.lut...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> My program in pseudocode looks like this:
>>>>>>
>>>>>>     val conf = new SparkConf().setAppName("Test")
>>>>>>       .set("spark.storage.memoryFraction","0.2") // default 0.6
>>>>>>       .set("spark.shuffle.memoryFraction","0.12") // default 0.2
>>>>>>       .set("spark.shuffle.manager","SORT") // preferred setting for optimized joins
>>>>>>       .set("spark.shuffle.consolidateFiles","true") // helpful for "too many files open"
>>>>>>       .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
>>>>>>       .set("spark.akka.frameSize","500") // helpful when using consolidateFiles=true
>>>>>>       .set("spark.akka.askTimeout", "30")
>>>>>>       .set("spark.shuffle.compress","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>>>       .set("spark.file.transferTo","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>>>       .set("spark.core.connection.ack.wait.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>>>       .set("spark.speculation","true")
>>>>>>       .set("spark.worker.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>>>>       .set("spark.akka.timeout","300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>>>>       .set("spark.storage.blockManagerSlaveTimeoutMs","120000")
>>>>>>       .set("spark.driver.maxResultSize","2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>>>>>>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>>>       .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
>>>>>>       .set("spark.kryo.registrationRequired", "true")
>>>>>>
>>>>>> val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>   .map(_.split("\\|", -1))...filter(...)
>>>>>>
>>>>>> val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>   .map(_.split("\\|", -1))...filter(...)
>>>>>>
>>>>>>
>>>>>> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>>>>>
>>>>>>
>>>>>> I run the code with:
>>>>>>   --num-executors 500 \
>>>>>>   --driver-memory 20g \
>>>>>>   --executor-memory 20g \
>>>>>>   --executor-cores 32 \
>>>>>>
>>>>>>
>>>>>> I'm using kryo serialization on everything, including broadcast
>>>>>> variables.
>>>>>>
>>>>>> Spark creates 145k tasks, and the first stage includes everything
>>>>>> before groupByKey(). It fails before getting to groupByKey. I have tried
>>>>>> doubling and tripling the number of partitions when calling textFile, 
>>>>>> with
>>>>>> no success.
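>>>>>>
>>>>>> (For reference, a sketch of how I'm varying that partition count; the
>>>>>> name and the 4000 below are placeholders, not my real values:)
>>>>>>
>>>>>> // second argument to textFile is minPartitions
>>>>>> val rdd1MoreParts = sc.textFile(file1, 4000)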
>>>>>>
>>>>>> Very similar code (trivial changes, to accommodate different input)
>>>>>> worked on a smaller input (~8TB)... Not that it was easy to get that
>>>>>> working.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Errors vary, here is what I am getting right now:
>>>>>>
>>>>>> ERROR SendingConnection: Exception while reading SendingConnection
>>>>>> ... java.nio.channels.ClosedChannelException
>>>>>> (^ guessing that is a symptom of something else)
>>>>>>
>>>>>> WARN BlockManagerMasterActor: Removing BlockManager
>>>>>> BlockManagerId(...) with no recent heart beats: 120030ms exceeds 120000ms
>>>>>> (^ guessing that is a symptom of something else)
>>>>>>
>>>>>> ERROR ActorSystemImpl: Uncaught fatal error from thread (...)
>>>>>> shutting down ActorSystem [sparkDriver]
>>>>>> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>>>>>
>>>>>>
>>>>>>
>>>>>> Other times I get "executor lost..." messages at roughly one per second,
>>>>>> after ~50k tasks complete, until there are almost no executors left and
>>>>>> progress slows to nothing.
>>>>>>
>>>>>> I ran with verbose GC info; I do see failing YARN containers that
>>>>>> have multiple (around 30) "Full GC" messages, but I don't know whether
>>>>>> that indicates the problem. Typical Full GC times look
>>>>>> ok: [Times: user=23.30 sys=0.06, real=1.94 secs]
>>>>>>
>>>>>>
>>>>>>
>>>>>> Suggestions, please?
>>>>>>
>>>>>> Huge thanks in advance for any useful suggestions,
>>>>>> Arun
>>>>>>
>>>>>
>>>>>
>>>>
>>
>
