Sometimes a large number of partitions leads to memory problems.
Something like

val rdd1 = sc.textFile(file1).coalesce(500). ...
val rdd2 = sc.textFile(file2).coalesce(500). ...

may help.


On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

> Everything works smoothly if I do the 99%-removal filter in Hive first.
> So, all the baggage from garbage collection was breaking it.
>
> Is there a way to filter() out 99% of the data without having to garbage
> collect 99% of the RDD?
>
> On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra <arun.lut...@gmail.com> wrote:
>
>> I tried a shorter simper version of the program, with just 1 RDD,
>>  essentially it is:
>>
>> sc.textFile(..., N).map().filter().map( blah => (id,
>> 1L)).reduceByKey().saveAsTextFile(...)
>>
>> Here is a typical GC log trace from one of the yarn container logs:
>>
>> 54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)]
>> 9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
>> real=0.02 secs]
>> 77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)]
>> 9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
>> real=0.04 secs]
>> 79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)]
>> 9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
>> real=0.08 secs]
>> 92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)]
>> 9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
>> real=0.02 secs]
>> 114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)]
>> 9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
>> real=0.02 secs]
>> 117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)]
>> 9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
>> real=0.02 secs]
>>
>> So ~9GB is getting GC'ed every few seconds. Which seems like a lot.
>>
>> Question: The filter() is removing 99% of the data. Does this 99% of the
>> data get GC'ed?
>>
>> Now, I was able to finally get to reduceByKey() by reducing the number of
>> executor-cores (to 2), based on suggestions at
>> http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
>> . This makes everything before reduceByKey() run pretty smoothly.
>>
>> I ran this with more executor-memory and less executors (most important
>> thing was fewer executor-cores):
>>
>> --num-executors 150 \
>> --driver-memory 15g \
>> --executor-memory 110g \
>> --executor-cores 32 \
>>
>> But then, reduceByKey() fails with:
>>
>> java.lang.OutOfMemoryError: Java heap space
>>
>>
>>
>>
>> On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra <arun.lut...@gmail.com>
>> wrote:
>>
>>> The Spark UI names the line number and name of the operation
>>> (repartition in this case) that it is performing. Only if this information
>>> is wrong (just a possibility), could it have started groupByKey already.
>>>
>>> I will try to analyze the amount of skew in the data by using
>>> reduceByKey (or simply countByKey) which is relatively inexpensive. For the
>>> purposes of this algorithm I can simply log and remove keys with huge
>>> counts, before doing groupByKey.
>>>
>>> On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson <ilike...@gmail.com>
>>> wrote:
>>>
>>>> All stated symptoms are consistent with GC pressure (other nodes
>>>> timeout trying to connect because of a long stop-the-world), quite possibly
>>>> due to groupByKey. groupByKey is a very expensive operation as it may bring
>>>> all the data for a particular partition into memory (in particular, it
>>>> cannot spill values for a single key, so if you have a single very skewed
>>>> key you can get behavior like this).
>>>>
>>>> On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc <paul.sz...@gmail.com>
>>>> wrote:
>>>>
>>>>> But groupbykey will repartition according to numer of keys as I
>>>>> understand how it works. How do you know that you haven't reached the
>>>>> groupbykey phase? Are you using a profiler or do yoi base that assumption
>>>>> only on logs?
>>>>>
>>>>> sob., 28 lut 2015, 8:12 PM Arun Luthra użytkownik <
>>>>> arun.lut...@gmail.com> napisał:
>>>>>
>>>>> A correction to my first post:
>>>>>>
>>>>>> There is also a repartition right before groupByKey to help avoid
>>>>>> too-many-open-files error:
>>>>>>
>>>>>>
>>>>>> rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>>>>>
>>>>>> On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra <arun.lut...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> The job fails before getting to groupByKey.
>>>>>>>
>>>>>>> I see a lot of timeout errors in the yarn logs, like:
>>>>>>>
>>>>>>> 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
>>>>>>> attempts
>>>>>>> akka.pattern.AskTimeoutException: Timed out
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>> 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
>>>>>>> attempts
>>>>>>> java.util.concurrent.TimeoutException: Futures timed out after [30
>>>>>>> seconds]
>>>>>>>
>>>>>>> and some of these are followed by:
>>>>>>>
>>>>>>> 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend:
>>>>>>> Driver Disassociated [akka.tcp://sparkExecutor@...] ->
>>>>>>> [akka.tcp://sparkDriver@...] disassociated! Shutting down.
>>>>>>> 15/02/28 12:48:02 ERROR executor.Executor: Exception in task
>>>>>>> 421027.0 in stage 1.0 (TID 336601)
>>>>>>> java.io.FileNotFoundException:
>>>>>>> ..../hadoop/yarn/local/......../spark-local-20150228123450-3a71/36/shuffle_0_421027_0
>>>>>>> (No such file or directory)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc <paul.sz...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I would first check whether  there is any possibility that after
>>>>>>>> doing groupbykey one of the groups does not fit in one of the 
>>>>>>>> executors'
>>>>>>>> memory.
>>>>>>>>
>>>>>>>> To back up my theory, instead of doing groupbykey + map try
>>>>>>>> reducebykey + mapvalues.
>>>>>>>>
>>>>>>>> Let me know if that helped.
>>>>>>>>
>>>>>>>> Pawel Szulc
>>>>>>>> http://rabbitonweb.com
>>>>>>>>
>>>>>>>> sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik <
>>>>>>>> arun.lut...@gmail.com> napisał:
>>>>>>>>
>>>>>>>> So, actually I am removing the persist for now, because there is
>>>>>>>>> significant filtering that happens after calling textFile()... but I 
>>>>>>>>> will
>>>>>>>>> keep that option in mind.
>>>>>>>>>
>>>>>>>>> I just tried a few different combinations of number of executors,
>>>>>>>>> executor memory, and more importantly, number of tasks... *all
>>>>>>>>> three times it failed when approximately 75.1% of the tasks were 
>>>>>>>>> completed
>>>>>>>>> (no matter how many tasks resulted from repartitioning the data in
>>>>>>>>> textfile(..., N))*. Surely this is a strong clue to something?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz <brk...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
>>>>>>>>>> generates many small objects that lead to very long GC time, causing 
>>>>>>>>>> the
>>>>>>>>>> executor losts, heartbeat not received, and GC overhead limit 
>>>>>>>>>> exceeded
>>>>>>>>>> messages.
>>>>>>>>>> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You
>>>>>>>>>> can also try `OFF_HEAP` (and use Tachyon).
>>>>>>>>>>
>>>>>>>>>> Burak
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra <
>>>>>>>>>> arun.lut...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> My program in pseudocode looks like this:
>>>>>>>>>>>
>>>>>>>>>>>     val conf = new SparkConf().setAppName("Test")
>>>>>>>>>>>       .set("spark.storage.memoryFraction","0.2") // default 0.6
>>>>>>>>>>>       .set("spark.shuffle.memoryFraction","0.12") // default 0.2
>>>>>>>>>>>       .set("spark.shuffle.manager","SORT") // preferred setting
>>>>>>>>>>> for optimized joins
>>>>>>>>>>>       .set("spark.shuffle.consolidateFiles","true") // helpful
>>>>>>>>>>> for "too many files open"
>>>>>>>>>>>       .set("spark.mesos.coarse", "true") // helpful for
>>>>>>>>>>> MapOutputTracker errors?
>>>>>>>>>>>       .set("spark.akka.frameSize","500") // helpful when using
>>>>>>>>>>> consildateFiles=true
>>>>>>>>>>>       .set("spark.akka.askTimeout", "30")
>>>>>>>>>>>       .set("spark.shuffle.compress","false") //
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>>>>>>>>       .set("spark.file.transferTo","false") //
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>>>>>>>>       .set("spark.core.connection.ack.wait.timeout","600") //
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>>>>>>>>>       .set("spark.speculation","true")
>>>>>>>>>>>       .set("spark.worker.timeout","600") //
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>>>>>>>>>       .set("spark.akka.timeout","300") //
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>>>>>>>>>       .set("spark.storage.blockManagerSlaveTimeoutMs","120000")
>>>>>>>>>>>       .set("spark.driver.maxResultSize","2048") // in response
>>>>>>>>>>> to error: Total size of serialized results of 39901 tasks (1024.0 
>>>>>>>>>>> MB) is
>>>>>>>>>>> bigger than spark.driver.maxResultSize (1024.0 MB)
>>>>>>>>>>>       .set("spark.serializer",
>>>>>>>>>>> "org.apache.spark.serializer.KryoSerializer")
>>>>>>>>>>>
>>>>>>>>>>> .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
>>>>>>>>>>>       .set("spark.kryo.registrationRequired", "true")
>>>>>>>>>>>
>>>>>>>>>>> val rdd1 = sc.textFile(file1).persist(StorageLevel
>>>>>>>>>>> .MEMORY_AND_DISK_SER).map(_.split("\\|", -1)...filter(...)
>>>>>>>>>>>
>>>>>>>>>>> val rdd2 =
>>>>>>>>>>> sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|",
>>>>>>>>>>> -1)...filter(...)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I run the code with:
>>>>>>>>>>>   --num-executors 500 \
>>>>>>>>>>>   --driver-memory 20g \
>>>>>>>>>>>   --executor-memory 20g \
>>>>>>>>>>>   --executor-cores 32 \
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I'm using kryo serialization on everything, including broadcast
>>>>>>>>>>> variables.
>>>>>>>>>>>
>>>>>>>>>>> Spark creates 145k tasks, and the first stage includes
>>>>>>>>>>> everything before groupByKey(). It fails before getting to 
>>>>>>>>>>> groupByKey. I
>>>>>>>>>>> have tried doubling and tripling the number of partitions when 
>>>>>>>>>>> calling
>>>>>>>>>>> textFile, with no success.
>>>>>>>>>>>
>>>>>>>>>>> Very similar code (trivial changes, to accomodate different
>>>>>>>>>>> input) worked on a smaller input (~8TB)... Not that it was easy to 
>>>>>>>>>>> get that
>>>>>>>>>>> working.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Errors vary, here is what I am getting right now:
>>>>>>>>>>>
>>>>>>>>>>> ERROR SendingConnection: Exception while reading
>>>>>>>>>>> SendingConnection ... java.nio.channels.ClosedChannelException
>>>>>>>>>>> (^ guessing that is symptom of something else)
>>>>>>>>>>>
>>>>>>>>>>> WARN BlockManagerMasterActor: Removing BlockManager
>>>>>>>>>>> BlockManagerId(...) with no recent heart beats: 120030ms exceeds 
>>>>>>>>>>> 120000ms
>>>>>>>>>>> (^ guessing that is symptom of something else)
>>>>>>>>>>>
>>>>>>>>>>> ERROR ActorSystemImpl: Uncaught fatal error from thread (...)
>>>>>>>>>>> shutting down ActorSystem [sparkDriver]
>>>>>>>>>>> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Other times I will get messages about "executor lost..." about 1
>>>>>>>>>>> message per second, after ~~50k tasks complete, until there are 
>>>>>>>>>>> almost no
>>>>>>>>>>> executors left and progress slows to nothing.
>>>>>>>>>>>
>>>>>>>>>>> I ran with verbose GC info; I do see failing yarn containers
>>>>>>>>>>> that have multiple (like 30) "Full GC" messages but I don't know 
>>>>>>>>>>> how to
>>>>>>>>>>> interpret if that is the problem. Typical Full GC time taken seems
>>>>>>>>>>> ok: [Times: user=23.30 sys=0.06, real=1.94 secs]
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Suggestions, please?
>>>>>>>>>>>
>>>>>>>>>>> Huge thanks for useful suggestions,
>>>>>>>>>>> Arun
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>>
>

Reply via email to