What operation are you performing before the saveAsTextFile? If you are
doing a groupBy/sortBy/mapPartitions/reduceByKey operation, you can
specify the number of partitions explicitly. We were facing this kind of
problem, and specifying the correct number of partitions solved the issue.
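
For example, a minimal sketch (here "pairs" stands in for your pair RDD,
and the count of 2000 is only an illustration to tune for your data and
cluster):

    // Pass an explicit partition count to the shuffle operation that
    // runs just before saveAsTextFile, instead of relying on the default.
    val numPartitions = 2000 // placeholder; tune to input size and cluster
    val reduced = pairs.reduceByKey(_ + _, numPartitions)
    reduced.saveAsTextFile("hdfs:///path/to/output")

groupByKey and sortByKey accept a numPartitions argument in the same way,
so the shuffle output gets spread over more, smaller partitions.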

Thanks
Best Regards


On Fri, Aug 22, 2014 at 2:06 AM, Daniil Osipov <daniil.osi...@shazam.com>
wrote:

> Hello,
>
> My job keeps failing at the saveAsTextFile stage (frustrating after a 3-hour
> run) with an OOM exception. The log is below. I'm running the job on an
> input of ~8 TB of gzipped JSON files, executing on 15 m3.xlarge instances.
> Each executor is given 13 GB of memory, and I'm setting two custom
> configuration properties in the job: spark.akka.frameSize: 50 (otherwise it
> fails due to exceeding the limit of 10 MB) and spark.storage.memoryFraction: 0.2.
>
> Any suggestions?
>
> 14/08/21 19:29:26 INFO spark.MapOutputTrackerMasterActor: Asked to send
> map output locations for shuffle 1 to spark@ip-10-99-160-181.ec2.internal
> :36962
> 14/08/21 19:29:31 INFO spark.MapOutputTrackerMaster: Size of output
> statuses for shuffle 1 is 17541459 bytes
> 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send
> map output locations for shuffle 1 to spark@ip-10-144-221-26.ec2.internal
> :49973
> 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send
> map output locations for shuffle 1 to spark@ip-10-69-31-121.ec2.internal
> :34569
> 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send
> map output locations for shuffle 1 to spark@ip-10-165-70-221.ec2.internal
> :49193
> 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send
> map output locations for shuffle 1 to spark@ip-10-218-181-93.ec2.internal
> :57648
> 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send
> map output locations for shuffle 1 to spark@ip-10-142-187-230.ec2.internal
> :48115
> 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send
> map output locations for shuffle 1 to spark@ip-10-101-178-68.ec2.internal
> :51931
> 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send
> map output locations for shuffle 1 to spark@ip-10-99-165-121.ec2.internal
> :38153
> 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send
> map output locations for shuffle 1 to spark@ip-10-179-187-182.ec2.internal
> :55645
> 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send
> map output locations for shuffle 1 to spark@ip-10-182-231-107.ec2.internal
> :54088
> 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send
> map output locations for shuffle 1 to spark@ip-10-165-79-9.ec2.internal
> :40112
> 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send
> map output locations for shuffle 1 to spark@ip-10-111-169-138.ec2.internal
> :40394
> 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send
> map output locations for shuffle 1 to spark@ip-10-203-161-222.ec2.internal
> :47447
> 14/08/21 19:29:31 INFO spark.MapOutputTrackerMasterActor: Asked to send
> map output locations for shuffle 1 to spark@ip-10-153-141-230.ec2.internal
> :53906
> 14/08/21 19:29:32 ERROR actor.ActorSystemImpl: Uncaught fatal error from
> thread [spark-akka.actor.default-dispatcher-20] shutting down ActorSystem
> [spark]
> java.lang.OutOfMemoryError: Java heap space
>         at
> com.google.protobuf_spark.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62)
>         at
> akka.remote.transport.AkkaPduProtobufCodec$.constructPayload(AkkaPduCodec.scala:145)
>         at
> akka.remote.transport.AkkaProtocolHandle.write(AkkaProtocolTransport.scala:156)
>         at
> akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:569)
>         at
> akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:544)
>         at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
>         at akka.actor.FSM$class.processEvent(FSM.scala:595)
>         at akka.remote.EndpointWriter.processEvent(Endpoint.scala:443)
>         at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:589)
>         at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:583)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 14/08/21 19:29:32 INFO scheduler.DAGScheduler: Failed to run
> saveAsTextFile at RecRateApp.scala:88
> Exception in thread "main" org.apache.spark.SparkException: Job cancelled
> because SparkContext was shut down
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638)
>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>         at
> org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:638)
>         at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1215)
>         at
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
>         at
> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
>         at akka.actor.ActorCell.terminate(ActorCell.scala:338)
>         at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
>         at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>         at
> akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>         at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 14/08/21 19:29:33 INFO remote.RemoteActorRefProvider$RemotingTerminator:
> Shutting down remote daemon.
> 14/08/21 19:29:33 INFO remote.RemoteActorRefProvider$RemotingTerminator:
> Remote daemon shut down; proceeding with flushing remote transports.
>
