Re: Re: Spark program throws OutOfMemoryError

Hi Andre,

Thanks a lot for your reply, but I still get the same exception. The complete exception message is below:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task 1.0:9 failed 4 times (most recent failure: Exception failure: java.lang.OutOfMemoryError: Java heap space)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Following your hints, I added SPARK_DRIVER_MEMORY to my spark-env.sh:

    export SPARK_MASTER_IP=192.168.2.184
    export SPARK_MASTER_PORT=7077
    export SPARK_LOCAL_IP=192.168.2.183
    export SPARK_DRIVER_MEMORY=10G
    export SPARK_JAVA_OPTS="-Xms4g -Xmx40g -XX:MaxPermSize=10g"

I also modified my code so that it no longer calls collect. Here is the new version:

    def main(args: Array[String]) {
      val sc = new SparkContext("spark://192.168.2.184:7077", "Score Calcu Total",
        "/usr/local/spark-0.9.1-bin-hadoop2", Seq("/home/deployer/myjar.jar"))

      val mongoRDD = sc.textFile("/home/deployer/uris.dat", 200)
      val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))

      val newRDD = jsonRDD.map(arg => {
        var score = 0.5
        arg.put("score", score)
        arg
      })

      val resourceScoresRDD = newRDD.map(arg => (arg.get("rid").toString.toLong,
        (arg.get("zid").toString, arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey()

      // The comparison operator was lost in the archive; "<" is assumed here.
      val simRDD = resourceScoresRDD.cartesian(resourceScoresRDD)
        .filter(arg => arg._1._1 < arg._2._1)
        .map(arg => (arg._1._1, arg._2._1, 0.8))

      simRDD.saveAsTextFile("/home/deployer/sim")
    }

I ran the program with java -jar myjar.jar. It crashed quickly, but it succeeds when the data file is small.

Thanks for your help!

qinwei

From: Andre Bois-Crettez [via Apache Spark User List]
Date: 2014-04-16 17:50
To: Qin Wei
Subject: Re: Spark program throws OutOfMemoryError

> Seem you have not enough memory on the spark driver. Hints below :
> [...]
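A note on the revised program above: it no longer collects to the driver, but the cartesian step still builds every ordered pair of rid groups before the filter discards roughly half of them. The sketch below is only a stand-in that uses plain Scala collections instead of RDDs, so it runs without a cluster. The names PairGrowth, pairCount and orderedPairs are made up for illustration, the sample rids are invented, and the figure of one million distinct rids is hypothetical, since the thread never reports the real count.

```scala
// Stand-in for the cartesian + filter step, using plain Scala collections
// instead of RDDs so that it runs without a cluster.
object PairGrowth {
  // Number of unordered pairs among n distinct keys: n * (n - 1) / 2.
  def pairCount(n: Long): Long = n * (n - 1) / 2

  // Same shape as resourceScoresRDD.cartesian(resourceScoresRDD).filter(...):
  // visit every ordered pair, keep only those where left < right.
  def orderedPairs(keys: Seq[Long]): Seq[(Long, Long)] =
    for (a <- keys; b <- keys if a < b) yield (a, b)

  def main(args: Array[String]): Unit = {
    val keys = Seq(5487628L, 5487629L, 5487630L, 5487631L) // invented rids
    println(orderedPairs(keys).size) // 6
    println(pairCount(keys.size))    // 6
    println(pairCount(1000000L))     // 499999500000 (hypothetical rid count)
  }
}
```

The output grows quadratically with the number of distinct rids, which would be consistent with the job succeeding on a small file and failing on the 2G one. Running count() on the distinct rids, as Andre asked, would show how large n really is.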
Re: Spark program throws OutOfMemoryError

How many tasks are there in your job?

Sent from my iPhone

On 2014-04-17, at 16:24, Qin Wei <wei@dewmobile.net> wrote:

> Hi Andre, thanks a lot for your reply, but I still get the same exception.
> [...]
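A rough way to estimate the task count for the revised program in this thread, offered as a sketch rather than a measured answer: Spark runs one task per partition in each stage, and a cartesian product of two RDDs has one partition for every pair of input partitions. The arithmetic below assumes that groupByKey() keeps the 200 partitions requested in sc.textFile(..., 200), which depends on settings the thread does not show. TaskCount and cartesianPartitions are hypothetical names.

```scala
object TaskCount {
  // A cartesian product has one partition per pair of input partitions.
  def cartesianPartitions(left: Int, right: Int): Int = left * right

  def main(args: Array[String]): Unit = {
    val inputPartitions = 200               // from sc.textFile(path, 200)
    val groupedPartitions = inputPartitions // assumption: groupByKey() keeps 200
    val cartesianTasks = cartesianPartitions(groupedPartitions, groupedPartitions)
    // Tasks that read and shuffle the file, plus tasks over the cartesian product.
    println(inputPartitions + cartesianTasks) // 40200
  }
}
```

The authoritative number is whatever the Spark web UI reports for the job; this only indicates the order of magnitude to expect under the stated assumption.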
Re: Spark program throws OutOfMemoryError

It seems you do not have enough memory on the Spark driver. Hints below:

On 2014-04-15 12:10, Qin Wei wrote:
>     val resourcesRDD = jsonRDD.map(arg => arg.get("rid").toString.toLong).distinct
>
>     // the program crashes at this line of code
>     val bcResources = sc.broadcast(resourcesRDD.collect.toList)

What is returned by resourcesRDD.count()?

> The data file “/home/deployer/uris.dat” is 2G with lines like this:
>
>     { "id" : 1, "a" : { "0" : 1 }, "rid" : 5487628, "zid" : 10550869 }
>
> And here is my spark-env.sh:
>
>     export SCALA_HOME=/usr/local/scala-2.10.3
>     export SPARK_MASTER_IP=192.168.2.184
>     export SPARK_MASTER_PORT=7077
>     export SPARK_LOCAL_IP=192.168.2.182
>     export SPARK_WORKER_MEMORY=20g
>     export SPARK_MEM=10g
>     export SPARK_JAVA_OPTS="-Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit"

Try setting SPARK_DRIVER_MEMORY to a bigger value, as the default of 512m is probably too small for resourcesRDD.collect().

By the way, are you really sure you need to collect all that?

André Bois-Crettez
Software Architect
Big Data Developer
http://www.kelkoo.com/
Spark program throws OutOfMemoryError

Hi all,

My Spark program always fails with java.lang.OutOfMemoryError: Java heap space on my standalone cluster. Here is my code:

    object SimCalcuTotal {
      def main(args: Array[String]) {
        val sc = new SparkContext("spark://192.168.2.184:7077", "Sim Calcu Total",
          "/usr/local/spark-0.9.0-incubating-bin-hadoop2",
          Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
        // val sc = new SparkContext("local", "Score Calcu Total")

        val mongoRDD = sc.textFile("/home/deployer/uris.dat", 200)
        val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))

        val newRDD = jsonRDD.map(arg => {
          // 0.5 for test
          var score = 0.5
          arg.put("score", score)
          arg
        })

        val resourcesRDD = jsonRDD.map(arg => arg.get("rid").toString.toLong).distinct

        // the program crashes at this line of code
        val bcResources = sc.broadcast(resourcesRDD.collect.toList)

        val resourceScoresRDD = newRDD.map(arg => (arg.get("rid").toString.toLong,
          (arg.get("zid").toString, arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey()
        val resouceScores = sc.broadcast(resourceScoresRDD.collect.toMap)

        def calSim(item1: Long, item2: Long) = {
          val iv1 = resouceScores.value(item1)
          val iv2 = resouceScores.value(item2)

          // 0.5 for test
          var distance = 0.5
          // The comparison operator was lost in the archive; ">" is assumed here.
          if (distance > 0.05) {
            var json = new JSONObject()
            json.put("_id", item1.toString + item2.toString)
            json.put("rid1", item1)
            json.put("rid2", item2)
            json.put("sim", distance)
            json
          }
          else null
        }

        //val saveRDD = newRDD.map(arg => arg.toString)
        //newRDD.saveAsTextFile(args(1).toString)

        // The comparison operator was lost in the archive; "<" is assumed here.
        val similarityRDD = resourcesRDD.flatMap(resource => {
          for (other <- bcResources.value if resource < other) yield calSim(resource, other)
        }).filter(arg => arg != null)
        similarityRDD.saveAsTextFile("/home/deployer/sim")
      }
    }

The data file “/home/deployer/uris.dat” is 2G with lines like this:

    { "id" : 1, "a" : { "0" : 1 }, "rid" : 5487628, "zid" : 10550869 }

And here is my spark-env.sh:

    export SCALA_HOME=/usr/local/scala-2.10.3
    export SPARK_MASTER_IP=192.168.2.184
    export SPARK_MASTER_PORT=7077
    export SPARK_LOCAL_IP=192.168.2.182
    export SPARK_WORKER_MEMORY=20g
    export SPARK_MEM=10g
    export SPARK_JAVA_OPTS="-Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit"

There are two processes on my server while the Spark program is running (before it crashes):

    java -cp :/usr/local/spark-0.9.0-incubating-bin-hadoop2/conf:/usr/local/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar -Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit -Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@192.168.2.183:51339/user/CoarseGrainedScheduler 0 192.168.2.182 16 akka.tcp://sparkWorker@192.168.2.182:45588/user/Worker app-20140415172433-0001

    java -cp :/usr/local/spark-0.9.0-incubating-bin-hadoop2/conf:/usr/local/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://192.168.2.184:7077

Can anybody help me? Thanks very much!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-program-thows-OutOfMemoryError-tp4268.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
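One detail in the executor command line of this post may be worth checking. The flags from SPARK_JAVA_OPTS appear twice and are then followed by -Xms512M -Xmx512M. The sketch below assumes the JVM honours the last occurrence of a repeated flag, which is the usual HotSpot behaviour but is an assumption here, and simply extracts that last -Xmx value. EffectiveHeap and lastXmx are hypothetical names, not part of Spark.

```scala
object EffectiveHeap {
  // Returns the value of the last -Xmx flag, e.g. "512M".
  // Assumption: the JVM honours the last occurrence of a repeated flag.
  def lastXmx(flags: Seq[String]): Option[String] =
    flags.filter(_.startsWith("-Xmx")).lastOption.map(_.stripPrefix("-Xmx"))

  def main(args: Array[String]): Unit = {
    // JVM flags copied from the CoarseGrainedExecutorBackend command line above.
    val executorFlags = Seq(
      "-Xms4g", "-Xmx40g", "-XX:MaxPermSize=10g", "-XX:-UseGCOverheadLimit",
      "-Xms4g", "-Xmx40g", "-XX:MaxPermSize=10g", "-XX:-UseGCOverheadLimit",
      "-Xms512M", "-Xmx512M")
    println(lastXmx(executorFlags)) // Some(512M)
  }
}
```

If that reading is right, the executor heap is 512 MB rather than 40 GB, and the spark.executor.memory property (documented with a 512m default in Spark 0.9) may be the setting to look at, in addition to the driver memory.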