Hi Grega,

Did you ever get this figured out? I'm observing the same issue in Spark 1.0.2.
For me it was after 1.5 hours of a large .distinct call, followed by a .saveAsTextFile():

14/08/26 20:57:43 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 18500
14/08/26 20:57:43 INFO executor.Executor: Running task ID 18500
14/08/26 20:57:43 INFO storage.BlockManager: Found block broadcast_0 locally
14/08/26 20:57:43 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
14/08/26 20:58:13 ERROR executor.Executor: Exception in task ID 18491
org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:108)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:155)
        at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:42)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:65)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:105)
        ... 23 more
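My current understanding (please correct me if I'm wrong) is that the
MapOutputTracker is the driver-side registry of where each map task wrote
its shuffle output; reducers ask it for those locations before fetching,
and that ask is what times out after 30 seconds here. The next thing I plan
to try is raising the Akka timeouts and frame size. A minimal sketch, using
the configuration keys from the Spark 1.0.x docs (the values are untuned
guesses, and the app name is just a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

// Untested sketch: give MapOutputTracker asks more than the default 30 s,
// and raise the Akka frame size in case the serialized map output
// statuses exceed the 10 MB default (many map x reduce partitions).
val conf = new SparkConf()
  .setAppName("distinct-then-save")      // placeholder app name
  .set("spark.akka.askTimeout", "120")   // seconds; default 30
  .set("spark.akka.timeout", "300")      // seconds; default 100
  .set("spark.akka.frameSize", "50")     // MB; default 10
val sc = new SparkContext(conf)

Has anyone confirmed whether the frame size or the timeout itself is the
real bottleneck here?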
On Tue, Mar 11, 2014 at 3:07 PM, Grega Kespret <gr...@celtra.com> wrote:

>> Your input data read as RDD may be causing OOM, so that's where you can
>> use different memory configuration.
>
> We are not getting any OOM exceptions, just Akka future timeouts in the
> MapOutputTracker and unsuccessful gets of shuffle outputs, which then get
> refetched.
>
> What is the industry practice for debugging errors like these?
>
> Questions:
> - Why are the MapOutputTrackers timing out? (And how do we debug this properly?)
> - What is the task/purpose of the MapOutputTracker?
> - How do we check per-task object sizes?
>
> Thanks,
> Grega
>
> On 11 Mar 2014, at 18:43, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>
> Shuffle data is always stored on disk; it is unlikely to cause OOM. Your
> input data read as RDD may be causing OOM, so that's where you can use
> different memory configuration.
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
> On Tue, Mar 11, 2014 at 9:20 AM, sparrow <do...@celtra.com> wrote:
>
>> I don't understand how exactly that will help. There are no persisted
>> RDDs in storage. Our input data is ~100 GB, but the output of the flatMap
>> is ~40 MB. The small RDD is then persisted.
>>
>> Memory configuration should not affect shuffle data, if I understand you
>> correctly?
>>
>> On Tue, Mar 11, 2014 at 4:52 PM, Mayur Rustagi [via Apache Spark User
>> List] <[hidden email]> wrote:
>>
>>> Shuffle data is not kept in memory. Did you try additional memory
>>> configurations? (
>>> https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence
>>> )
>>>
>>> Mayur Rustagi
>>> Ph: +1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>
>>> On Tue, Mar 11, 2014 at 8:35 AM, Domen Grabec <[hidden email]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a Spark cluster with 4 workers, each with 13 GB of RAM. I would
>>>> like to process a large data set (it does not fit in memory) that
>>>> consists of JSON entries. These are the transformations applied:
>>>>
>>>> SparkContext.textFile(s3url)            // read files from S3
>>>>   .keyBy(_.parseJson.id)                // key by the id located in the JSON string
>>>>   .groupByKey(number_of_group_tasks)    // group by id
>>>>   .flatMap { case (key, lines) => ... } // do some stuff
>>>>
>>>> In the web view I can see the keyBy operation doing a shuffle write.
>>>> If I understand correctly, the groupByKey transformation creates a wide
>>>> RDD dependency, thus requiring a shuffle write.
>>>> I have already increased spark.akka.askTimeout to 30 seconds, and the
>>>> job still fails with errors on the workers:
>>>>
>>>> Error communicating with MapOutputTracker
>>>>         at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>>>>         at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>>>>         at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
>>>>         at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>>         at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
>>>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:47)
>>>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>         at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>         at java.lang.Thread.run(Thread.java:724)
>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30000] milliseconds
>>>>         at akka.dispatch.DefaultPromise.ready(Future.scala:870)
>>>>         at akka.dispatch.DefaultPromise.result(Future.scala:874)
>>>>         at akka.dispatch.Await$.result(Future.scala:74)
>>>>         at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>>>>         ... 25 more
>>>>
>>>> Before the error I can see this kind of log output:
>>>>
>>>> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>>>> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>>>> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>>>>
>>>> Can you please help me understand what is going on? Is the whole
>>>> shuffle-write RDD kept in memory, and when the cluster runs out of
>>>> memory does it start garbage collecting and re-fetching from S3?
>>>>
>>>> If this is the case, does Spark require additional configuration for
>>>> effective shuffle writes to disk?
>>>>
>>>> Regards,
>>>> Domen
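P.S. One more thought on the groupByKey pipeline quoted above: if the "do
some stuff" step can be expressed as an associative merge of two values,
reduceByKey should combine values map-side before the shuffle and shrink
the shuffle volume considerably. A rough, untested sketch: merge() is a
hypothetical combine function, and parseJson/id are the helpers from the
quoted code, not something I have run.

// Untested sketch: map-side combine via reduceByKey instead of
// groupByKey, so far less data crosses the shuffle boundary.
val result = sc.textFile(s3url)                       // read files from S3
  .map(line => (line.parseJson.id, line))             // same keying as keyBy(_.parseJson.id)
  .reduceByKey((a: String, b: String) => merge(a, b), // merge: hypothetical associative combine
               number_of_group_tasks)                 // same partition count as the groupByKey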