I think the problem is that you have a single object that is larger than
2GB and so fails to serialize to a byte array. I think it is best not to
design it this way as you can't parallelize combining maps. You could go
all the way to emit key value pairs and reduceByKey. There are solutions
between these two as well.
On Jan 18, 2015 1:47 PM, "octavian.ganea" <octavian.ga...@inf.ethz.ch>
wrote:

> Hi,
>
> Please help me with this problem. I would really appreciate your help !
>
> I am using spark 1.2.0. I have a map-reduce job written in spark in the
> following way:
>
> val sumW = splittedTrainingDataRDD.map(localTrainingData => LocalSGD(w,
> localTrainingData, numeratorCtEta, numitorCtEta, regularizer,
> 0.2).reduce((w1,w2) => {w1.add(w2); w2.clear; w1})
>
> Here, w is trove TLongDoubleHashMap containing no more than 50 million
> elements (in RAM this is ~ 15 GB). w1.add(w2) does addition of the values
> of
> the same key, for each key of both maps.
>
> My initial configuration is:
>     conf.set("spark.cores.max", "16")
>     conf.set("spark.akka.frameSize", "100000")
>     conf.set("spark.executor.memory", "120g")
>     conf.set("spark.reducer.maxMbInFlight", "100000")
>     conf.set("spark.storage.memoryFraction", "0.9")
>     conf.set("spark.shuffle.file.buffer.kb", "1000")
>     conf.set("spark.broadcast.factory",
> "org.apache.spark.broadcast.HttpBroadcastFactory")
>     conf.set("spark.driver.maxResultSize", "120g")
>     val sc = new SparkContext(conf)
>
> I am running this on a cluster with 8 machines, each machine has 16 cores
> and 130 GB RAM.
>
> My spark-env.sh contains:
>  ulimit -n 200000
>  SPARK_JAVA_OPTS="-Xms120G -Xmx120G -XX:-UseGCOverheadLimit
> -XX:-UseCompressedOops"
>  SPARK_DRIVER_MEMORY=120G
>
>
> The error I get is at the reducer above (the reducer above is in file
> called
> Learning.scala, line 313):
>
> 15/01/18 14:35:52 ERROR ActorSystemImpl: Uncaught fatal error from thread
> [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem
> [sparkDriver]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>         at java.util.Arrays.copyOf(Arrays.java:2271)
>         at
> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>         at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>         at
> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>         at
>
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>         at
>
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>         at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at
>
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>         at
>
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>         at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:836)
>         at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
>         at
>
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
>         at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1389)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>         at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 15/01/18 14:35:52 INFO DAGScheduler: Job 2 failed: reduce at
> Learning.scala:313, took 54.657239 s
> Exception in thread "main" org.apache.spark.SparkException: Job cancelled
> because SparkContext was shut down
>         at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
>         at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>         at
>
> org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701)
>         at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428)
>         at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>         at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375)
>         at
>
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>         at
> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>         at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>         at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>         at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>         at
> akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 15/01/18 14:35:52 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
> down remote daemon.
> 15/01/18 14:35:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote
> daemon shut down; proceeding with flushing remote transports.
>
>
> Thank you a lot for your suggestions!!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Reducer-memory-exceeded-tp21221.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

Reply via email to