[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14645913#comment-14645913 ]
Alex Slusarenko edited comment on SPARK-1239 at 7/29/15 12:00 PM:
------------------------------------------------------------------

Hi, all. We have faced this issue many times, and I've seen about a dozen unanswered mailing-list threads where others hit the same problem. Currently we have 250 000 map tasks and the same number of reduce tasks, running on 200 slave nodes; the driver has 80 GB of RAM. First we observed the Akka frame-size limit exception, and after increasing the limit we now see an OOM. Here is the corresponding part of the log:

{noformat}
...
15/07/27 17:22:56 INFO TaskSchedulerImpl: Adding task set 1.0 with 262144 tasks
15/07/27 17:22:57 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 233766, 10.47.190.240, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 233767, 10.145.26.133, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 233768, 10.51.191.206, PROCESS_LOCAL, 1215 bytes)
...
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3197.0 in stage 1.0 (TID 236963, 10.99.197.178, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3198.0 in stage 1.0 (TID 236964, 10.65.148.16, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO TaskSetManager: Starting task 3199.0 in stage 1.0 (TID 236965, 10.123.204.224, PROCESS_LOCAL, 1215 bytes)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.145.30.250:38441 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.140.170.222:35810 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.7.205.149:43761 (size: 3.8 KB, free: 4.1 GB)
...
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.165.146.7:37388 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.153.254.79:49517 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:22:57 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.95.198.154:53675 (size: 3.8 KB, free: 4.1 GB)
15/07/27 17:24:41 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 166509346 bytes
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.109.157.235:39740
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.166.156.78:59382
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.152.41.131:47968
...
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.140.253.251:44621
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.153.254.79:42648
15/07/27 17:24:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.169.230.246:45473
15/07/27 17:25:31 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.146.43.5:49989 in memory (size: 3.2 KB, free: 3.4 GB)
15/07/27 17:27:25 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-47] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
        at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844)
        at akka.remote.EndpointWriter.writeSend(Endpoint.scala:747)
        at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:722)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
...
{noformat}

Do you need any additional info?
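For scale, here is a quick back-of-envelope sketch (my own arithmetic from the numbers in the log above, not Spark's actual accounting; the per-status cost is inferred from the logged total):

{code:scala}
// Rough estimate of shuffle-status traffic for the scenario in the log.
// Assumption: the 166509346-byte figure logged by MapOutputTrackerMaster is
// the serialized size of ALL map output statuses for shuffle 0.
object StatusSizeEstimate {
  def main(args: Array[String]): Unit = {
    val mapTasks      = 262144L     // "Adding task set 1.0 with 262144 tasks"
    val statusesBytes = 166509346L  // logged by MapOutputTrackerMaster

    // Average serialized cost per map status: ~635 bytes.
    println(f"per-map status: ${statusesBytes.toDouble / mapTasks}%.0f bytes")

    // Every reducer asks the driver for the FULL status set, so each reply
    // is ~167 MB. With one in-flight request per slave node (an assumed
    // concurrency level, for illustration), the driver buffers tens of GB:
    val concurrentRequests = 200
    println(f"in-flight replies: ${statusesBytes * concurrentRequests / 1e9}%.1f GB")
  }
}
{code}

So raising spark.akka.frameSize lets each ~167 MB reply through the frame-size check, but the driver must still serialize that payload once per requesting executor; that is exactly where the OutOfMemoryError above occurs (Akka's JavaSerializer growing a ByteArrayOutputStream).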
> Don't fetch all map output statuses at each reducer during shuffles
> -------------------------------------------------------------------
>
>                 Key: SPARK-1239
>                 URL: https://issues.apache.org/jira/browse/SPARK-1239
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.0.2, 1.1.0
>            Reporter: Patrick Wendell
>
> Instead we should modify the way we fetch map output statuses to take both a
> mapper and a reducer - or we should just piggyback the statuses on each task.
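A hedged sketch of the first option the issue proposes (all names below are hypothetical, not Spark's real API; it only illustrates the shape of the change): scope the tracker request to a single reduce partition, so each reply carries O(mapTasks) entries instead of the full O(mapTasks * reduceTasks) status matrix.

{code:scala}
// Hypothetical interface sketch; these names do not exist in Spark.
case class BlockManagerId(host: String, port: Int)

// What one reducer actually needs: where each map output lives and how big
// its slice for THIS reducer is.
case class MapOutputSlice(location: BlockManagerId, mapId: Int, sizeBytes: Long)

trait MapOutputTrackerLike {
  // Current behaviour (Spark 1.x): one reply carries sizes for ALL reducers,
  // so its serialized form grows as O(mapTasks * reduceTasks).
  def getAllStatuses(shuffleId: Int): Array[Array[Long]]

  // Proposed: take the reducer too, so a reply is only O(mapTasks) entries.
  def getSlicesForReduce(shuffleId: Int, reduceId: Int): Seq[MapOutputSlice]
}
{code}

The second option the issue mentions (piggybacking the statuses on each task) would avoid the tracker round-trip entirely, at the cost of making each serialized task larger.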