Hi, Please help me with this problem. I would really appreciate your help !
I am using spark 1.2.0. I have a map-reduce job written in spark in the following way: val sumW = splittedTrainingDataRDD.map(localTrainingData => LocalSGD(w, localTrainingData, numeratorCtEta, numitorCtEta, regularizer, 0.2).reduce((w1,w2) => {w1.add(w2); w2.clear; w1}) Here, w is trove TLongDoubleHashMap containing no more than 50 million elements (in RAM this is ~ 15 GB). w1.add(w2) does addition of the values of the same key, for each key of both maps. My initial configuration is: conf.set("spark.cores.max", "16") conf.set("spark.akka.frameSize", "100000") conf.set("spark.executor.memory", "120g") conf.set("spark.reducer.maxMbInFlight", "100000") conf.set("spark.storage.memoryFraction", "0.9") conf.set("spark.shuffle.file.buffer.kb", "1000") conf.set("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory") conf.set("spark.driver.maxResultSize", "120g") val sc = new SparkContext(conf) I am running this on a cluster with 8 machines, each machine has 16 cores and 130 GB RAM. My spark-env.sh contains: ulimit -n 200000 SPARK_JAVA_OPTS="-Xms120G -Xmx120G -XX:-UseGCOverheadLimit -XX:-UseCompressedOops" SPARK_DRIVER_MEMORY=120G The error I get is at the reducer above (the reducer above is in file called Learning.scala, line 313): 15/01/18 14:35:52 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: Requested array size exceeds VM limit at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:836) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1389) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 15/01/18 14:35:52 INFO DAGScheduler: Job 2 failed: reduce at Learning.scala:313, took 54.657239 s Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428) at akka.actor.Actor$class.aroundPostStop(Actor.scala:475) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375) at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) at akka.actor.ActorCell.terminate(ActorCell.scala:369) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 15/01/18 14:35:52 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 15/01/18 14:35:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. Thank you a lot for your suggestions!! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Reducer-memory-exceeded-tp21221.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org