Subject: Re: Problem getting program to run on 15TB input
2015 15:42
To: Arun Luthra
Cc: Aaron Davidson; Paweł Szulc; Burak Yavuz; user@spark.apache.org
Sometimes a large number of partitions leads to memory problems.
Something like
val rdd1 = sc.textFile(file1).coalesce(500). ...
val rdd2 = sc.textFile(file2).coalesce(500). ...
may help.
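For reference, a self-contained sketch of the suggestion above; the object name, input paths, and the figure of 500 partitions are placeholders, not values from the original job:

import org.apache.spark.{SparkConf, SparkContext}

object CoalesceSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CoalesceSketch"))
    // coalesce(500) reduces the partition count without a shuffle,
    // so the stage runs far fewer tasks over the same data.
    val rdd1 = sc.textFile("hdfs:///data/file1").coalesce(500)  // hypothetical path
    val rdd2 = sc.textFile("hdfs:///data/file2").coalesce(500)  // hypothetical path
    println(s"rdd1: ${rdd1.partitions.length} partitions, rdd2: ${rdd2.partitions.length} partitions")
    sc.stop()
  }
}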
On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra arun.lut...@gmail.com wrote:
Everything works smoothly if I do the 99%-removal filter in Hive first. So,
all the baggage from garbage collection was breaking it.
Is there a way to filter() out 99% of the data without having to garbage
collect 99% of the RDD?
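One way to get part of that effect inside Spark (a sketch, not the original program) is to apply the cheap filter immediately after textFile(), before any persist or shuffle, so the discarded records stay short-lived young-generation garbage instead of reaching the cached or shuffled data; the keep() predicate and paths below are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}

object EarlyFilterSketch {
  // Hypothetical predicate standing in for the real 99%-removal condition.
  def keep(line: String): Boolean = line.contains("KEEP")

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("EarlyFilterSketch"))
    // Filter before any persist or wide transformation: dropped lines are
    // discarded as they are read and never become long-lived objects.
    val survivors = sc.textFile("hdfs:///data/input").filter(keep)
    survivors.saveAsTextFile("hdfs:///data/output")
    sc.stop()
  }
}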
On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote:
I tried a shorter, simpler version of the program, with just 1 RDD;
essentially it is:
sc.textFile(..., N).map().filter().map( blah => (id,
1L)).reduceByKey().saveAsTextFile(...)
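A runnable stand-in for that simplified pipeline, assuming a hypothetical tab-delimited input whose first field is the id (the parse and filter steps are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object SimpleCountSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SimpleCountSketch"))
    val N = 5000  // placeholder minimum number of input partitions

    sc.textFile("hdfs:///data/input", N)
      .map(_.split("\t"))              // hypothetical parse step
      .filter(_.nonEmpty)              // hypothetical filter step
      .map(fields => (fields(0), 1L))  // key by id, emit a count of 1
      .reduceByKey(_ + _)              // counts are combined map-side before the shuffle
      .saveAsTextFile("hdfs:///data/counts")

    sc.stop()
  }
}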
Here is a typical GC log trace from one of the yarn container logs:
54.040: [GC [PSYoungGen:
A correction to my first post:
There is also a repartition right before groupByKey, to help avoid a
too-many-open-files error:
rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()
On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra
All stated symptoms are consistent with GC pressure (other nodes timeout
trying to connect because of a long stop-the-world), quite possibly due to
groupByKey. groupByKey is a very expensive operation as it may bring all
the data for a particular partition into memory (in particular, it cannot
spill values to disk).
I would first check whether there is any possibility that, after doing the
groupByKey, one of the groups does not fit in one of the executors' memory.
To test that theory, instead of doing groupByKey + map, try reduceByKey +
mapValues.
Let me know if that helped.
Pawel Szulc
http://rabbitonweb.com
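A minimal sketch of the reduceByKey + mapValues rewrite suggested above, using a per-key average as a stand-in for the real computation (the sample data is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ReduceByKeySketch"))
    // Hypothetical (key, value) pairs standing in for the real keyed data.
    val pairs = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

    // groupByKey + map would pull every value for a key into one in-memory buffer:
    //   pairs.groupByKey().map { case (k, vs) => (k, vs.sum / vs.size) }

    // reduceByKey + mapValues combines values incrementally (and map-side),
    // so no executor ever has to hold an entire group at once.
    val averages = pairs
      .mapValues(v => (v, 1L))
      .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
      .mapValues { case (sum, count) => sum / count }

    averages.collect().foreach(println)
    sc.stop()
  }
}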
But groupByKey will repartition according to the number of keys, as I understand
how it works. How do you know that you haven't reached the groupByKey
phase? Are you using a profiler, or do you base that assumption only on the logs?
On Sat, Feb 28, 2015 at 8:12 PM, Arun Luthra arun.lut...@gmail.com wrote:
So, actually I am removing the persist for now, because there is
significant filtering that happens after calling textFile()... but I will
keep that option in mind.
I just tried a few different combinations of number of executors, executor
memory, and more importantly, number of tasks... *all
The job fails before getting to groupByKey.
I see a lot of timeout errors in the yarn logs, like:
15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
akka.pattern.AskTimeoutException: Timed out
and
15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
The Spark UI names the line number and name of the operation (repartition
in this case) that it is performing. Only if this information is wrong
(just a possibility) could it have started groupByKey already.
I will try to analyze the amount of skew in the data by using reduceByKey
(or simply
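A sketch of that kind of skew check, counting records per key with reduceByKey and printing the heaviest keys; the input path and key-extraction logic are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

object SkewCheckSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SkewCheckSketch"))
    // Hypothetical input and key extraction; replace with the real parse/key logic.
    val keyed = sc.textFile("hdfs:///data/input").map(line => (line.split("\t")(0), 1L))

    // Per-key record counts via reduceByKey: cheap to compute and never
    // materializes a whole group, so it is safe even on badly skewed data.
    val biggestKeys = keyed
      .reduceByKey(_ + _)
      .map { case (key, count) => (count, key) }
      .top(20)  // the 20 largest counts

    biggestKeys.foreach { case (count, key) => println(s"$key -> $count records") }
    sc.stop()
  }
}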
My program in pseudocode looks like this:
val conf = new SparkConf().setAppName("Test")
  .set("spark.storage.memoryFraction", "0.2")   // default 0.6
  .set("spark.shuffle.memoryFraction", "0.12")  // default 0.2
  .set("spark.shuffle.manager", "SORT")         // preferred setting for optimized joins
Hi,
Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates
many small objects that lead to very long GC time, causing the "executor
lost", "heartbeat not received", and "GC overhead limit exceeded" messages.
Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
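A minimal illustration of that suggestion (the input path and the count() action are placeholders):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object PersistSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PersistSketch"))
    val rdd = sc.textFile("hdfs:///data/input")  // hypothetical input

    // Per the suggestion above: MEMORY_AND_DISK keeps deserialized objects in
    // memory and spills partitions to disk when they do not fit, in place of
    // the serialized storage of MEMORY_AND_DISK_SER.
    rdd.persist(StorageLevel.MEMORY_AND_DISK)

    println(rdd.count())
    sc.stop()
  }
}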