Hi Grega,

Did you ever get this figured out?  I'm observing the same issue in Spark
1.0.2.

For me it was after 1.5hr of a large .distinct call, followed by a
.saveAsTextFile()

 14/08/26 20:57:43 INFO executor.CoarseGrainedExecutorBackend: Got assigned
task 18500
 14/08/26 20:57:43 INFO executor.Executor: Running task ID 18500
 14/08/26 20:57:43 INFO storage.BlockManager: Found block broadcast_0
locally
 14/08/26 20:57:43 INFO spark.MapOutputTrackerWorker: Don't have map
outputs for shuffle 0, fetching them
 14/08/26 20:58:13 ERROR executor.Executor: Exception in task ID 18491
 org.apache.spark.SparkException: Error communicating with MapOutputTracker
         at
org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:108)
         at
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:155)
         at
org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:42)
         at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:65)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
         at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
         at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
         at org.apache.spark.scheduler.Task.run(Task.scala:51)
         at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
         at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
         at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
         at java.lang.Thread.run(Thread.java:745)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[30 seconds]
         at
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
         at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
         at
scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
         at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
         at scala.concurrent.Await$.result(package.scala:107)
         at
org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:105)
         ... 23 more


On Tue, Mar 11, 2014 at 3:07 PM, Grega Kespret <gr...@celtra.com> wrote:

> > Your input data read as RDD may be causing OOM, so thats where you can
> use different memory configuration.
>
> We are not getting any OOM exceptions, just akka future timeouts in
> mapoutputtracker and unsuccessful get of shuffle outputs, therefore
> refetching them.
>
> What is the industry practice when going about debugging such errors?
>
> Questions:
> - why are mapoutputtrackers timing out? ( and how to debug this properly?)
> - what is the task/purpose of mapoutputtracker?
> - how to check per-task objects size?
>
> Thanks,
> Grega
>
> On 11 Mar 2014, at 18:43, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>
> Shuffle data is always stored on disk, its unlikely to cause OOM. Your
> input data read as RDD may be causing OOM, so thats where you can use
> different memory configuration.
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
>
>
> On Tue, Mar 11, 2014 at 9:20 AM, sparrow <do...@celtra.com> wrote:
>
>> I don't understand how exactly will that help. There are no persisted
>> RDD's in storage. Our input data is ~ 100GB, but output of the flatMap is
>> ~40Mb. The small RDD is then persisted.
>>
>> Memory configuration should not affect shuffle data if I understand you
>> correctly?
>>
>>
>>
>>
>> On Tue, Mar 11, 2014 at 4:52 PM, Mayur Rustagi [via Apache Spark User
>> List] <[hidden email] <http://user/SendEmail.jtp?type=node&node=2537&i=0>
>> > wrote:
>>
>>> Shuffle data is not kept in memory. Did you try additional memory
>>> configurations(
>>> https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence
>>> )
>>>
>>> Mayur Rustagi
>>> Ph: <a href="tel:%2B1%20%28760%29%20203%203257" value="+17602033257"
>>> target="_blank">+1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>
>>>
>>>
>>> On Tue, Mar 11, 2014 at 8:35 AM, Domen Grabec <[hidden email]
>>> <http://user/SendEmail.jtp?type=node&node=2534&i=0>> wrote:
>>>
>>>> Hi
>>>>
>>>> I have a spark cluster with 4 workers each with 13GB ram. I would like
>>>> to process a large data set (does not fit in memory) that consists of JSON
>>>> entries. These are the transformations applied:
>>>>
>>>> SparkContext.textFile(s3url). // read files from s3
>>>> keyBy(_.parseJson.id) // key by id that is located in json string
>>>> groupByKey(number_of_group_tasks) //group by id
>>>> flatMap(case (key,lines) => { //do some stuff })
>>>>
>>>> In the web view I can see a key by operation doing a shuffle write. If
>>>> I understand correctly the groupByKey transformation creates a wide RDD
>>>> dependency thus requiring a shuffle write. I have already increased
>>>> spark.akka.askTimeout to 30 seconds and still job fails with errors on
>>>> workers:
>>>>
>>>> Error communicating with MapOutputTracker
>>>>         at
>>>> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>>>>         at
>>>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>>>>         at
>>>> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
>>>>         at
>>>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>>>>         at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>>         at
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>>>>         at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>>         at
>>>> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
>>>>         at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>         at
>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
>>>>         at
>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>>>         at
>>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
>>>>         at
>>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:47)
>>>>         at
>>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>         at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>         at
>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>         at
>>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>>>>         at
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>         at java.lang.Thread.run(Thread.java:724)
>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out
>>>> after [30000] milliseconds
>>>>         at akka.dispatch.DefaultPromise.ready(Future.scala:870)
>>>>         at akka.dispatch.DefaultPromise.result(Future.scala:874)
>>>>         at akka.dispatch.Await$.result(Future.scala:74)
>>>>         at
>>>> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>>>>         ... 25 more
>>>>
>>>>
>>>> Before the error I can see this kind of logs:
>>>>
>>>> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for
>>>> shuffle 0, fetching them 14/03/11 14:29:40 INFO MapOutputTracker: Don't
>>>> have map outputs for shuffle 0, fetching them 14/03/11 14:29:40 INFO
>>>> MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>>>>
>>>> Can you please help me understand what is going on? Is the whole
>>>> shuffle write RDD kept in memory and when cluster runs out of memory it
>>>> starts garbage collecting and re fetching from s3?
>>>>
>>>> If this is the case does spark require additional configuration for
>>>> effective shuffle write to disk?
>>>>
>>>> Regards, Domen
>>>>
>>>
>>>
>>>
>>> ------------------------------
>>>  If you reply to this email, your message will be added to the
>>> discussion below:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Re-Out-of-memory-on-large-RDDs-tp2533p2534.html
>>>  To start a new topic under Apache Spark User List, email [hidden email]
>>> <http://user/SendEmail.jtp?type=node&node=2537&i=1>
>>> To unsubscribe from Apache Spark User List, click here.
>>> NAML
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>>
>>
>>
>> ------------------------------
>> View this message in context: Re: Out of memory on large RDDs
>> <http://apache-spark-user-list.1001560.n3.nabble.com/Re-Out-of-memory-on-large-RDDs-tp2533p2537.html>
>> Sent from the Apache Spark User List mailing list archive
>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>
>
>

Reply via email to