I found that the problem was due to garbage collection in filter(). Using
Hive to do the filter solved the problem.

A lot of other problems went away when I upgraded to Spark 1.2.0, which
compresses various task overhead data (HighlyCompressedMapStatus etc.).

It has been running very very smoothly with these two changes.

I'm fairly sure that I tried coalesce() and that it resulted in tasks that
were too big, but the code has evolved too much to easily double-check that now.

On Sat, Jun 6, 2015 at 12:50 AM, Kapil Malik <kma...@adobe.com> wrote:

> Very interesting and relevant thread for production-level usage of Spark.
>
>
>
> @Arun, can you kindly confirm if Daniel’s suggestion helped your use case?
>
>
>
> Thanks,
>
>
>
> Kapil Malik | kma...@adobe.com | 33430 / 8800836581
>
>
>
> *From:* Daniel Mahler [mailto:dmah...@gmail.com]
> *Sent:* 13 April 2015 15:42
> *To:* Arun Luthra
> *Cc:* Aaron Davidson; Paweł Szulc; Burak Yavuz; user@spark.apache.org
> *Subject:* Re: Problem getting program to run on 15TB input
>
>
>
> Sometimes a large number of partitions leads to memory problems.
>
> Something like
>
>
>
> val rdd1 = sc.textFile(file1).coalesce(500). ...
>
> val rdd2 = sc.textFile(file2).coalesce(500). ...
>
>
>
> may help.
>
>
>
>
>
> On Mon, Mar 2, 2015 at 6:26 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
>
> Everything works smoothly if I do the 99%-removal filter in Hive first.
> So, all the baggage from garbage collection was breaking it.
>
>
>
> Is there a way to filter() out 99% of the data without having to garbage
> collect 99% of the RDD?
>
>
>
> On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra <arun.lut...@gmail.com> wrote:
>
> I tried a shorter, simpler version of the program, with just 1 RDD.
> Essentially it is:
>
>
>
> sc.textFile(..., N).map().filter().map( blah => (id,
> 1L)).reduceByKey().saveAsTextFile(...)
>
>
>
> Here is a typical GC log trace from one of the yarn container logs:
>
>
>
> 54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)]
> 9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01,
> real=0.02 secs]
>
> 77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)]
> 9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26,
> real=0.04 secs]
>
> 79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)]
> 9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28,
> real=0.08 secs]
>
> 92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)]
> 9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11,
> real=0.02 secs]
>
> 114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)]
> 9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00,
> real=0.02 secs]
>
> 117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)]
> 9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25,
> real=0.02 secs]
>
>
>
> So ~9GB is getting GC'ed every few seconds, which seems like a lot.
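
The ~9GB figure can be checked directly against the PSYoungGen numbers in the GC log above (before->after, in KiB). Here is a minimal sketch in plain Scala, no Spark needed; the object and method names are only illustrative, and the sample lines are copied from the log:

```scala
// Sketch: how much young-generation memory did each minor GC reclaim?
// GcReclaimed and reclaimed() are illustrative names, not part of Spark or the JVM.
object GcReclaimed {
  // Matches e.g. "54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)]"
  private val YoungGen =
    """([\d.]+): \[GC \[PSYoungGen: (\d+)K->(\d+)K\(\d+K\)\]""".r.unanchored

  /** Returns (timestampSeconds, reclaimedKiB) if the line is a minor-GC entry. */
  def reclaimed(line: String): Option[(Double, Long)] = line match {
    case YoungGen(ts, before, after) =>
      Some((ts.toDouble, before.toLong - after.toLong))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)]",
      "77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)]",
      "79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)]"
    )
    // Print the reclaimed amount per collection, converted from KiB to GiB.
    lines.flatMap(reclaimed).foreach { case (ts, kib) =>
      println(f"t=$ts%.3fs reclaimed ${kib / 1024.0 / 1024.0}%.2f GiB")
    }
  }
}
```

The first line works out to 9176064K - 28206K = 9147858K reclaimed, i.e. nearly the whole young generation is garbage at each collection.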
>
>
>
> Question: The filter() is removing 99% of the data. Does this 99% of the
> data get GC'ed?
>
>
>
> Now, I was able to finally get to reduceByKey() by reducing the number of
> executor-cores (to 2), based on suggestions at
> http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html
> . This makes everything before reduceByKey() run pretty smoothly.
>
>
>
> I ran this with more executor-memory and fewer executors (the most important
> thing was fewer executor-cores):
>
>
>
> --num-executors 150 \
>
> --driver-memory 15g \
>
> --executor-memory 110g \
>
> --executor-cores 32 \
>
>
>
> But then, reduceByKey() fails with:
>
> java.lang.OutOfMemoryError: Java heap space
>
>
>
>
>
>
>
>
>
> On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra <arun.lut...@gmail.com>
> wrote:
>
> The Spark UI names the line number and name of the operation (repartition
> in this case) that it is performing. Only if this information is wrong
> (just a possibility), could it have started groupByKey already.
>
>
>
> I will try to analyze the amount of skew in the data by using reduceByKey
> (or simply countByKey) which is relatively inexpensive. For the purposes of
> this algorithm I can simply log and remove keys with huge counts, before
> doing groupByKey.
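
A rough sketch of that skew check, assuming String keys, Int values, a local master, and a made-up threshold of 100 records per key (none of these come from the real job; SkewCheck, heavyKeys and dropKeys are hypothetical names):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits; needed on Spark 1.2 and earlier
import org.apache.spark.rdd.RDD

object SkewCheck {
  /** Keys whose record count exceeds maxPerKey, together with their counts. */
  def heavyKeys(pairs: RDD[(String, Int)], maxPerKey: Long): Map[String, Long] =
    pairs
      .mapValues(_ => 1L)
      .reduceByKey(_ + _) // combines map-side; never buffers all values of one key
      .filter { case (_, n) => n > maxPerKey }
      .collect() // assumes only a handful of keys are this skewed
      .toMap

  /** Drops every record whose key is in `heavy`. */
  def dropKeys(pairs: RDD[(String, Int)], heavy: Set[String]): RDD[(String, Int)] = {
    val bc = pairs.sparkContext.broadcast(heavy)
    pairs.filter { case (k, _) => !bc.value.contains(k) }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("SkewCheck").setMaster("local[2]"))
    try {
      // Toy data: one very hot key plus a few ordinary ones.
      val pairs = sc.parallelize(
        Seq.fill(1000)(("hot", 1)) ++ Seq(("a", 1), ("b", 1), ("b", 1)))
      val heavy = heavyKeys(pairs, maxPerKey = 100L) // hypothetical threshold
      heavy.foreach { case (k, n) => println(s"removing skewed key $k ($n records)") }
      // Only the remaining, non-skewed keys reach groupByKey.
      val grouped = dropKeys(pairs, heavy.keys.toSet).groupByKey()
      grouped.collect().foreach { case (k, vs) => println(s"$k -> ${vs.size} values") }
    } finally sc.stop()
  }
}
```

Collecting the heavy keys to the driver is only reasonable if there are few of them; with many skewed keys a join against the counts RDD would be the safer route.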
>
>
>
> On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson <ilike...@gmail.com>
> wrote:
>
> All stated symptoms are consistent with GC pressure (other nodes time out
> trying to connect because of a long stop-the-world pause), quite possibly due
> to groupByKey. groupByKey is a very expensive operation as it may bring all
> the data for a particular partition into memory (in particular, it cannot
> spill values for a single key, so if you have a single very skewed key you
> can get behavior like this).
>
>
>
> On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc <paul.sz...@gmail.com>
> wrote:
>
> But groupByKey will repartition according to the number of keys, as I
> understand how it works. How do you know that you haven't reached the
> groupByKey phase? Are you using a profiler, or do you base that assumption
> only on logs?
>
>
>
> On Sat, Feb 28, 2015 at 8:12 PM, Arun Luthra <arun.lut...@gmail.com>
> wrote:
>
>
>
> A correction to my first post:
>
>
>
> There is also a repartition right before groupByKey to help avoid a
> too-many-open-files error:
>
>
>
>
> rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()
>
>
>
> On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra <arun.lut...@gmail.com>
> wrote:
>
> The job fails before getting to groupByKey.
>
>
>
> I see a lot of timeout errors in the yarn logs, like:
>
>
>
> 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
>
> akka.pattern.AskTimeoutException: Timed out
>
>
>
> and
>
>
>
> 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
>
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>
>
>
> and some of these are followed by:
>
>
>
> 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
> Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...]
> disassociated! Shutting down.
>
> 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
> stage 1.0 (TID 336601)
>
> java.io.FileNotFoundException:
> ..../hadoop/yarn/local/......../spark-local-20150228123450-3a71/36/shuffle_0_421027_0
> (No such file or directory)
>
>
>
>
>
>
>
>
>
> On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc <paul.sz...@gmail.com> wrote:
>
> I would first check whether there is any possibility that after doing
> groupByKey one of the groups does not fit in one of the executors' memory.
>
> To back up my theory, instead of doing groupByKey + map try reduceByKey +
> mapValues.
>
> Let me know if that helped.
>
> Pawel Szulc
> http://rabbitonweb.com
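
To make the groupByKey + map versus reduceByKey + mapValues suggestion concrete, here is a small sketch using a per-key mean as a stand-in aggregation. The real job's per-group logic is not shown in this thread, so this only applies if that logic can be written as an associative merge; ReduceInsteadOfGroup and both method names are hypothetical:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits; needed on Spark 1.2 and earlier
import org.apache.spark.rdd.RDD

object ReduceInsteadOfGroup {
  /** groupByKey + map: all values of a key are materialized together in memory. */
  def meanViaGroup(pairs: RDD[(String, Double)]): RDD[(String, Double)] =
    pairs.groupByKey().map { case (k, vs) => (k, vs.sum / vs.size) }

  /** reduceByKey + mapValues: only a running (sum, count) pair is kept per key. */
  def meanViaReduce(pairs: RDD[(String, Double)]): RDD[(String, Double)] =
    pairs
      .mapValues(v => (v, 1L))
      .reduceByKey { case ((s1, n1), (s2, n2)) => (s1 + s2, n1 + n2) }
      .mapValues { case (sum, n) => sum / n }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("ReduceInsteadOfGroup").setMaster("local[2]"))
    try {
      val pairs = sc.parallelize(Seq(("a", 2.0), ("a", 4.0), ("b", 10.0)))
      // Both variants give the same per-key means; only memory behavior differs.
      println(meanViaGroup(pairs).collect().toMap)
      println(meanViaReduce(pairs).collect().toMap)
    } finally sc.stop()
  }
}
```

If the reduceByKey variant runs where the groupByKey one dies, that would support the theory that a single oversized group is the culprit.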
>
>
>
> On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra <arun.lut...@gmail.com>
> wrote:
>
>
>
> So, actually I am removing the persist for now, because there is
> significant filtering that happens after calling textFile()... but I will
> keep that option in mind.
>
>
>
> I just tried a few different combinations of number of executors, executor
> memory, and more importantly, number of tasks... *all three times it
> failed when approximately 75.1% of the tasks were completed (no matter how
> many tasks resulted from repartitioning the data in textFile(..., N))*.
> Surely this is a strong clue to something?
>
>
>
>
>
>
>
> On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
> Hi,
>
>
>
> Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates
> many small objects that lead to very long GC times, causing the "executor
> lost", "heartbeat not received", and "GC overhead limit exceeded" messages.
>
> Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
> try `OFF_HEAP` (and use Tachyon).
>
>
>
> Burak
>
>
>
> On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra <arun.lut...@gmail.com>
> wrote:
>
>  My program in pseudocode looks like this:
>
>     val conf = new SparkConf().setAppName("Test")
>       .set("spark.storage.memoryFraction","0.2") // default 0.6
>       .set("spark.shuffle.memoryFraction","0.12") // default 0.2
>       .set("spark.shuffle.manager","SORT") // preferred setting for optimized joins
>       .set("spark.shuffle.consolidateFiles","true") // helpful for "too many files open"
>       .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
>       .set("spark.akka.frameSize","500") // helpful when using consolidateFiles=true
>       .set("spark.akka.askTimeout", "30")
>       .set("spark.shuffle.compress","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>       .set("spark.file.transferTo","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>       .set("spark.core.connection.ack.wait.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>       .set("spark.speculation","true")
>       .set("spark.worker.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>       .set("spark.akka.timeout","300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>       .set("spark.storage.blockManagerSlaveTimeoutMs","120000")
>       // in response to error: Total size of serialized results of 39901 tasks
>       // (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>       .set("spark.driver.maxResultSize","2048")
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       .set("spark.kryo.registrator","com.att.bdcoe.cip.ooh.MyRegistrator")
>       .set("spark.kryo.registrationRequired", "true")
>
>
>
> val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1))...filter(...)
>
> val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split("\\|", -1))...filter(...)
>
>
>
>
> rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()
>
>
>
>
>
> I run the code with:
>
>   --num-executors 500 \
>
>   --driver-memory 20g \
>
>   --executor-memory 20g \
>
>   --executor-cores 32 \
>
>
>
>
>
> I'm using kryo serialization on everything, including broadcast variables.
>
>
>
> Spark creates 145k tasks, and the first stage includes everything before
> groupByKey(). It fails before getting to groupByKey. I have tried doubling
> and tripling the number of partitions when calling textFile, with no
> success.
>
>
>
> Very similar code (trivial changes, to accommodate different input) worked
> on a smaller input (~8TB)... Not that it was easy to get that working.
>
>
>
>
>
>
>
> Errors vary, here is what I am getting right now:
>
>
>
> ERROR SendingConnection: Exception while reading SendingConnection
> ... java.nio.channels.ClosedChannelException
>
> (^ guessing that is a symptom of something else)
>
>
>
> WARN BlockManagerMasterActor: Removing BlockManager
> BlockManagerId(...) with no recent heart beats: 120030ms exceeds 120000ms
>
> (^ guessing that is a symptom of something else)
>
>
>
> ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
> down ActorSystem [sparkDriver]
>
> *java.lang.OutOfMemoryError: GC overhead limit exceeded*
>
>
>
>
>
>
>
> Other times I will get messages about "executor lost...", about 1 message
> per second, after ~50k tasks complete, until there are almost no executors
> left and progress slows to nothing.
>
>
>
> I ran with verbose GC info; I do see failing yarn containers that have
> multiple (like 30) "Full GC" messages, but I don't know how to tell whether
> that is the problem. Typical Full GC time taken seems ok: [Times:
> user=23.30 sys=0.06, real=1.94 secs]
>
>
>
>
>
>
>
> Suggestions, please?
>
>
>
> Huge thanks for useful suggestions,
>
> Arun
