Hi,

Please help me with this problem; I would really appreciate it!

I am using Spark 1.2.0. I have a map-reduce job written in Spark in the
following way:

val sumW = splittedTrainingDataRDD
  .map(localTrainingData =>
    LocalSGD(w, localTrainingData, numeratorCtEta, numitorCtEta, regularizer, 0.2))
  .reduce((w1, w2) => { w1.add(w2); w2.clear(); w1 })

Here, w is a Trove TLongDoubleHashMap containing no more than 50 million
entries (roughly 15 GB in RAM). w1.add(w2) sums the values of matching
keys, for every key present in either map.
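
For context, here is a minimal sketch of what w1.add(w2) does (simplified
for this post; assume Trove 3's API, where adjustOrPutValue is the
insert-or-accumulate primitive — my actual method is equivalent in effect):

import gnu.trove.iterator.TLongDoubleIterator
import gnu.trove.map.hash.TLongDoubleHashMap

// Sketch only: merge w2 into w1 by summing values per key.
def add(w1: TLongDoubleHashMap, w2: TLongDoubleHashMap): TLongDoubleHashMap = {
  val it: TLongDoubleIterator = w2.iterator()
  while (it.hasNext) {
    it.advance()
    // adjustOrPutValue adds it.value() when the key already exists,
    // otherwise inserts it.value() as a new entry
    w1.adjustOrPutValue(it.key(), it.value(), it.value())
  }
  w1
}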

My initial configuration is:
    conf.set("spark.cores.max", "16")
    conf.set("spark.akka.frameSize", "100000")
    conf.set("spark.executor.memory", "120g")
    conf.set("spark.reducer.maxMbInFlight", "100000")
    conf.set("spark.storage.memoryFraction", "0.9")
    conf.set("spark.shuffle.file.buffer.kb", "1000")
    conf.set("spark.broadcast.factory",
"org.apache.spark.broadcast.HttpBroadcastFactory")  
    conf.set("spark.driver.maxResultSize", "120g")
    val sc = new SparkContext(conf)

I am running this on a cluster of 8 machines, each with 16 cores and
130 GB of RAM.

My spark-env.sh contains:
 ulimit -n 200000
 SPARK_JAVA_OPTS="-Xms120G -Xmx120G -XX:-UseGCOverheadLimit
-XX:-UseCompressedOops"
 SPARK_DRIVER_MEMORY=120G


The error occurs at the reduce above (Learning.scala, line 313):

15/01/18 14:35:52 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:836)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1389)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/01/18 14:35:52 INFO DAGScheduler: Job 2 failed: reduce at Learning.scala:313, took 54.657239 s
Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428)
        at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375)
        at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
        at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
        at akka.actor.ActorCell.terminate(ActorCell.scala:369)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/01/18 14:35:52 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/01/18 14:35:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
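
From the trace, the OOM is raised in DAGScheduler.submitMissingTasks while
Java-serializing the task, so my guess is that w (~15 GB) is being captured
in the task closure and serialized on the driver. Would broadcasting it
instead avoid this? A hypothetical, untested sketch of what I have in mind:

// Untested sketch: ship w to each executor once as a broadcast variable
// instead of capturing it in every task closure.
val wBroadcast = sc.broadcast(w)

val sumW = splittedTrainingDataRDD
  .map(localTrainingData =>
    LocalSGD(wBroadcast.value, localTrainingData,
             numeratorCtEta, numitorCtEta, regularizer, 0.2))
  .reduce((w1, w2) => { w1.add(w2); w2.clear(); w1 })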


Thank you very much for your suggestions!


