Hello,

My job keeps failing at the saveAsTextFile stage (frustrating after a 3-hour run) with an OutOfMemoryError; the log is below. The input is ~8 TB of gzipped JSON files, and the job runs on 15 m3.xlarge instances. Each executor is given 13 GB of memory, and I set two custom properties on the job:

  spark.akka.frameSize: 50 (otherwise it fails for exceeding the 10 MB limit)
  spark.storage.memoryFraction: 0.2
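For reference, the setup looks roughly like this (a simplified sketch, not the actual RecRateApp.scala; the S3 paths and the identity map are placeholders standing in for the real transformation):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch of the job setup (simplified; paths and transformation are placeholders).
object RecRateApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("RecRateApp")
      // Raise Akka's frame size (in MB) so the large map-output status
      // messages fit; with the default, the 10 MB limit was exceeded.
      .set("spark.akka.frameSize", "50")
      // Shrink the RDD cache fraction to leave more heap for shuffle data.
      .set("spark.storage.memoryFraction", "0.2")

    val sc = new SparkContext(conf)
    sc.textFile("s3n://bucket/input/*.json.gz") // placeholder input path
      .map(identity)                            // actual transformation omitted
      .saveAsTextFile("s3n://bucket/output")    // placeholder output path; this is the failing stage
    sc.stop()
  }
}
```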
Any suggestions?

14/08/21 19:29:26 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-99-160-181.ec2.internal:36962
14/08/21 19:29:31 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 17541459 bytes
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-144-221-26.ec2.internal:49973
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-69-31-121.ec2.internal:34569
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-165-70-221.ec2.internal:49193
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-218-181-93.ec2.internal:57648
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-142-187-230.ec2.internal:48115
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-101-178-68.ec2.internal:51931
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-99-165-121.ec2.internal:38153
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-179-187-182.ec2.internal:55645
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-182-231-107.ec2.internal:54088
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-165-79-9.ec2.internal:40112
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-111-169-138.ec2.internal:40394
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-203-161-222.ec2.internal:47447
14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to spark@ip-10-153-141-230.ec2.internal:53906
14/08/21 19:29:32 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-20] shutting down ActorSystem [spark]
java.lang.OutOfMemoryError: Java heap space
        at com.google.protobuf_spark.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62)
        at akka.remote.transport.AkkaPduProtobufCodec$.constructPayload(AkkaPduCodec.scala:145)
        at akka.remote.transport.AkkaProtocolHandle.write(AkkaProtocolTransport.scala:156)
        at akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:569)
        at akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:544)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at akka.actor.FSM$class.processEvent(FSM.scala:595)
        at akka.remote.EndpointWriter.processEvent(Endpoint.scala:443)
        at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:589)
        at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:583)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/08/21 19:29:32 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at RecRateApp.scala:88
Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:638)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1215)
        at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
        at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
        at akka.actor.ActorCell.terminate(ActorCell.scala:338)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
        at akka.dispatch.Mailbox.run(Mailbox.scala:218)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/08/21 19:29:33 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/08/21 19:29:33 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.