[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14645913#comment-14645913 ]

Alex Slusarenko edited comment on SPARK-1239 at 7/29/15 12:00 PM:
------------------------------------------------------------------

Hi all. We have hit this issue many times, and I've seen about a dozen 
unanswered mailing-list threads where other people ran into the same problem. 
Currently we have 250,000 map tasks and the same number of reduce tasks, 
running on 200 slave nodes; the driver has 80 GB of RAM. First we hit the Akka 
frame size limit exception, and after increasing that limit we get an OOM on 
the driver. Here is the corresponding part of the log (a rough sizing of the 
numbers follows the log):
{noformat}
...
15/07/27 17:22:56 INFO TaskSchedulerImpl: Adding task set 1.0 with 262144 tasks
15/07/27 17:22:57 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 
233766, 10.47.190.240, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 
233767, 10.145.26.133, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 
233768, 10.51.191.206, PROCESS_LOCAL, 1215 bytes)
...
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3197.0 in stage 1.0 (TID 
236963, 10.99.197.178, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3198.0 in stage 1.0 (TID 
236964, 10.65.148.16, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3199.0 in stage 1.0 (TID 
236965, 10.123.204.224, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
10.145.30.250:38441 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
10.140.170.222:35810 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
10.7.205.149:43761 (size: 3.8 KB, free: 4.1 GB)
...
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
10.165.146.7:37388 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
10.153.254.79:49517 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
10.95.198.154:53675 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:24:41 INFO MapOutputTrackerMaster: Size of output statuses for 
shuffle 0 is 166509346 bytes
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output 
locations for shuffle 0 to 10.109.157.235:39740
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output 
locations for shuffle 0 to 10.166.156.78:59382
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output 
locations for shuffle 0 to 10.152.41.131:47968
...
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output 
locations for shuffle 0 to 10.140.253.251:44621
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output 
locations for shuffle 0 to 10.153.254.79:42648
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output 
locations for shuffle 0 to 10.169.230.246:45473
15/07/27 17:25:31 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 
10.146.43.5:49989 in memory (size: 3.2 KB, free: 3.4 GB)
15/07/27 17:27:25 ERROR ActorSystemImpl: Uncaught fatal error from thread 
[sparkDriver-akka.remote.default-remote-dispatcher-47] shutting down 
ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
        at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at 
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at 
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
        at 
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
        at 
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
        at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
        at 
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
        at 
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844)
        at akka.remote.EndpointWriter.writeSend(Endpoint.scala:747)
        at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:722)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
...
{noformat}
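
To make the failure mode concrete, here is a rough back-of-envelope reading of 
those numbers. The per-send copy count and the request concurrency are our 
assumptions, but the burst of "Asked to send map output locations" lines within 
the same second suggests most of the ~200 nodes ask at roughly the same time:
{noformat}
262,144 map statuses for shuffle 0  ->  166,509,346 bytes serialized (~635 bytes/status)
Akka's JavaSerializer writes each reply into a growing ByteArrayOutputStream,
so a single send can transiently hold roughly 2-3x the payload (~350-500 MB)
while the buffer doubles (the Arrays.copyOf frames in the stack trace above).
~200 near-simultaneous requests * a few hundred MB each -> tens of GB of
transient buffers on the driver, enough to exhaust the driver's heap even on
an 80 GB machine once the cached statuses and everything else are added in.
{noformat}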

Do you need any additional info?



> Don't fetch all map output statuses at each reducer during shuffles
> -------------------------------------------------------------------
>
>                 Key: SPARK-1239
>                 URL: https://issues.apache.org/jira/browse/SPARK-1239
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.0.2, 1.1.0
>            Reporter: Patrick Wendell
>
> Instead we should modify the way we fetch map output statuses to take both a 
> mapper and a reducer - or we should just piggyback the statuses on each task. 


