Re: Problem getting program to run on 15TB input

2015-06-09 Thread Arun Luthra
Sometimes a large number of partitions leads to memory problems. Something like val rdd1 = sc.textFile(file1).coalesce(500). ... val rdd2 = sc.textFile(file2).coalesce(500). ... may help

RE: Problem getting program to run on 15TB input

2015-06-06 Thread Kapil Malik
Sometimes a large number of partitions leads to memory problems. Something like val rdd1 = sc.textFile(file1).coalesce(500). ... val rdd2

Re: Problem getting program to run on 15TB input

2015-04-13 Thread Daniel Mahler
Sometimes a large number of partitions leads to memory problems. Something like val rdd1 = sc.textFile(file1).coalesce(500). ... val rdd2 = sc.textFile(file2).coalesce(500). ... may help. On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra arun.lut...@gmail.com wrote: Everything works smoothly if I
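Daniel's coalesce suggestion, fleshed out as a minimal sketch. The file paths and the figure of 500 partitions come from his message; everything else (app name, object wrapper) is an illustrative assumption:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CoalesceSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CoalesceSketch"))
    // textFile() creates roughly one partition per HDFS block, which on a
    // 15TB input can mean tens of thousands of partitions. coalesce merges
    // them without a full shuffle, cutting per-task bookkeeping overhead.
    val rdd1 = sc.textFile("file1").coalesce(500)
    val rdd2 = sc.textFile("file2").coalesce(500)
    // ... subsequent transformations on rdd1 and rdd2 ...
    sc.stop()
  }
}
```

Note that `coalesce` without `shuffle = true` only merges existing partitions; it never increases their number.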

Re: Problem getting program to run on 15TB input

2015-03-02 Thread Arun Luthra
Everything works smoothly if I do the 99%-removal filter in Hive first. So, all the baggage from garbage collection was breaking it. Is there a way to filter() out 99% of the data without having to garbage collect 99% of the RDD? On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com
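One way to discard the 99% without ever caching it (a sketch, not from the thread) is to apply the filter before any persist call, so rejected records are dropped as the file streams through and never become long-lived heap objects. Here `keep` is a hypothetical predicate standing in for the 99%-removal rule, and `sc` is an existing SparkContext:

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical predicate standing in for the Hive-side 99%-removal filter.
def keep(line: String): Boolean = line.nonEmpty

val survivors = sc.textFile("input").filter(keep _)
// Only the ~1% that pass the filter are retained; the rest is never stored.
survivors.persist(StorageLevel.MEMORY_AND_DISK)
```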

Re: Problem getting program to run on 15TB input

2015-03-01 Thread Arun Luthra
I tried a shorter, simpler version of the program, with just 1 RDD; essentially it is: sc.textFile(..., N).map().filter().map( blah => (id, 1L)).reduceByKey().saveAsTextFile(...) Here is a typical GC log trace from one of the yarn container logs: 54.040: [GC [PSYoungGen:

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
A correction to my first post: There is also a repartition right before groupByKey to help avoid too-many-open-files error: rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile() On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra
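The corrected pipeline above, laid out one step per line. This is a sketch: `parse`, `isWanted`, `summarize`, and `expand` are hypothetical stand-ins for the elided function bodies; the 15000 partition count is from the post:

```scala
rdd2.union(rdd1)
  .map(parse _)            // hypothetical parsing step
  .filter(isWanted _)      // hypothetical heavy filter
  .repartition(15000)      // more partitions => fewer records per shuffle file,
                           // working around the too-many-open-files error
  .groupByKey()
  .map(summarize _)        // hypothetical per-group computation
  .flatMap(expand _)
  .saveAsTextFile("output")
```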

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Aaron Davidson
All stated symptoms are consistent with GC pressure (other nodes timeout trying to connect because of a long stop-the-world), quite possibly due to groupByKey. groupByKey is a very expensive operation as it may bring all the data for a particular partition into memory (in particular, it cannot

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Paweł Szulc
I would first check whether there is any possibility that, after doing groupByKey, one of the groups does not fit in one of the executors' memory. To back up my theory, instead of doing groupByKey + map, try reduceByKey + mapValues. Let me know if that helped. Pawel Szulc http://rabbitonweb.com
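Paweł's rewrite in sketch form. With `groupByKey` + `map`, every value for a key is collected into one executor's memory before the map runs; with `reduceByKey` + `mapValues`, values are combined incrementally (including map-side), so no single group ever has to fit in memory at once. A per-key average is used here purely as an illustrative aggregation, and `pairs` is an assumed `RDD[(String, Int)]` from earlier stages:

```scala
// Risky on skewed data: materializes all values for a key at once.
val avgByGroup = pairs.groupByKey()
  .map { case (k, vs) => (k, vs.sum.toDouble / vs.size) }

// Safer: only running (sum, count) pairs cross the shuffle.
val avgByReduce = pairs
  .mapValues(v => (v.toLong, 1L))
  .reduceByKey { case ((s1, n1), (s2, n2)) => (s1 + s2, n1 + n2) }
  .mapValues { case (s, n) => s.toDouble / n }
```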

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Paweł Szulc
But groupByKey will repartition according to the number of keys, as I understand how it works. How do you know that you haven't reached the groupByKey phase? Are you using a profiler, or do you base that assumption only on logs? Sat, 28 Feb 2015, 8:12 PM, user Arun Luthra arun.lut...@gmail.com

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
So, actually I am removing the persist for now, because there is significant filtering that happens after calling textFile()... but I will keep that option in mind. I just tried a few different combinations of number of executors, executor memory, and more importantly, number of tasks... *all

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like: 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out and 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
The Spark UI names the line number and name of the operation (repartition in this case) that it is performing. Only if this information is wrong (just a possibility), could it have started groupByKey already. I will try to analyze the amount of skew in the data by using reduceByKey (or simply
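The skew check Arun describes can be sketched with `reduceByKey` alone, without ever building a group in memory. Here `pairs` is an assumed keyed RDD, and the cutoff of 20 keys is arbitrary:

```scala
// Count records per key without materializing any group.
val keyCounts = pairs.mapValues(_ => 1L).reduceByKey(_ + _)

// Pull back only the heaviest keys to the driver to eyeball the skew.
val heaviest = keyCounts.top(20)(Ordering.by((p: (String, Long)) => p._2))
heaviest.foreach { case (k, n) => println(s"$k -> $n records") }
```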

Problem getting program to run on 15TB input

2015-02-27 Thread Arun Luthra
My program in pseudocode looks like this: val conf = new SparkConf().setAppName("Test") .set("spark.storage.memoryFraction", "0.2") // default 0.6 .set("spark.shuffle.memoryFraction", "0.12") // default 0.2 .set("spark.shuffle.manager", "SORT") // preferred setting for optimized joins

Re: Problem getting program to run on 15TB input

2015-02-27 Thread Burak Yavuz
Hi, Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates many small objects that lead to very long GC times, causing the executor lost, heartbeat not received, and GC overhead limit exceeded messages. Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
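Burak's suggestion is a one-line change at the persist call (a sketch; `rdd` stands for whichever RDD is being cached):

```scala
import org.apache.spark.storage.StorageLevel

// Instead of:
//   rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
// try:
rdd.persist(StorageLevel.MEMORY_AND_DISK)
```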