Re: Re: Spark program throws OutOfMemoryError

2014-04-17 Thread Qin Wei






Hi, Andre, thanks a lot for your reply, but I still get the same exception. The
complete exception message is below:
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task
1.0:9 failed 4 times (most recent failure: Exception failure:
java.lang.OutOfMemoryError: Java heap space)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
According to your hints, I added SPARK_DRIVER_MEMORY to my spark-env.sh:

    export SPARK_MASTER_IP=192.168.2.184
    export SPARK_MASTER_PORT=7077
    export SPARK_LOCAL_IP=192.168.2.183
    export SPARK_DRIVER_MEMORY=10G
    export SPARK_JAVA_OPTS="-Xms4g -Xmx40g -XX:MaxPermSize=10g"

And I modified my code so that it no longer calls collect; here is my code:

    def main(args: Array[String]) {
      val sc = new SparkContext("spark://192.168.2.184:7077", "Score Calcu Total",
        "/usr/local/spark-0.9.1-bin-hadoop2", Seq("/home/deployer/myjar.jar"))

      val mongoRDD = sc.textFile("/home/deployer/uris.dat", 200)
      val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))

      val newRDD = jsonRDD.map(arg => {
        var score = 0.5
        arg.put("score", score)
        arg
      })

      val resourceScoresRDD = newRDD.map(arg => (arg.get("rid").toString.toLong,
        (arg.get("zid").toString, arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey()

      val simRDD = resourceScoresRDD.cartesian(resourceScoresRDD).filter(arg =>
        arg._1._1 < arg._2._1).map(arg => (arg._1._1, arg._2._1, 0.8))

      simRDD.saveAsTextFile("/home/deployer/sim")
    }

I ran the program with java -jar myjar.jar; it crashed quickly, but it succeeds when the data file is small.
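
One note on the setup above: when the driver is launched with a plain "java -jar", the settings in spark-env.sh are not automatically applied to that JVM, so the driver heap is whatever -Xmx that java command was given (the JVM default if none). Below is a minimal sketch, assuming the Spark 0.9.x SparkConf API, of making the memory request explicit in the application itself rather than relying on the environment scripts; the 10g value is purely an example, not a recommendation:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: configure the context programmatically so the executor
    // memory request does not depend on spark-env.sh being sourced.
    val conf = new SparkConf()
      .setMaster("spark://192.168.2.184:7077")
      .setAppName("Score Calcu Total")
      .setSparkHome("/usr/local/spark-0.9.1-bin-hadoop2")
      .setJars(Seq("/home/deployer/myjar.jar"))
      .set("spark.executor.memory", "10g")   // example value; adjust to the workers

    val sc = new SparkContext(conf)

The driver heap itself would still be set on the launching command, for example by passing a larger -Xmx to the java command that runs the jar.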
Thanks for your help!

qinwei
From: Andre Bois-Crettez [via Apache Spark User List]
Date: 2014-04-16 17:50
To: Qin Wei
Subject: Re: Spark program throws OutOfMemoryError

It seems you do not have enough memory on the Spark driver. Hints below:


On 2014-04-15 12:10, Qin Wei wrote:

      val resourcesRDD = jsonRDD.map(arg => arg.get("rid").toString.toLong).distinct

      // the program crashes at this line of code
      val bcResources = sc.broadcast(resourcesRDD.collect.toList)

What is returned by resourcesRDD.count()?


 The data file “/home/deployer/uris.dat” is 2G, with lines like this:

      { "id" : 1, "a" : { "0" : 1 }, "rid" : 5487628, "zid" : 10550869 }



 And here is my spark-env.sh:

      export SCALA_HOME=/usr/local/scala-2.10.3
      export SPARK_MASTER_IP=192.168.2.184
      export SPARK_MASTER_PORT=7077
      export SPARK_LOCAL_IP=192.168.2.182
      export SPARK_WORKER_MEMORY=20g
      export SPARK_MEM=10g
      export SPARK_JAVA_OPTS="-Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit"
Try setting SPARK_DRIVER_MEMORY to a bigger value, as the default 512m is
probably too small for the resourcesRDD.collect().

By the way, are you really sure you need to collect all that?


André Bois-Crettez
Software Architect
Big Data Developer
http://www.kelkoo.com/



Re: Spark program throws OutOfMemoryError

2014-04-17 Thread yypvsxf19870706
How many tasks are there in your job?

Sent from my iPhone

On 2014-04-17, at 16:24, Qin Wei wei@dewmobile.net wrote:

 [quoted text trimmed; identical to Qin Wei's reply of 2014-04-17 above]

Re: Spark program throws OutOfMemoryError

2014-04-16 Thread Andre Bois-Crettez

It seems you do not have enough memory on the Spark driver. Hints below:

On 2014-04-15 12:10, Qin Wei wrote:

 val resourcesRDD = jsonRDD.map(arg => arg.get("rid").toString.toLong).distinct

 // the program crashes at this line of code
 val bcResources = sc.broadcast(resourcesRDD.collect.toList)

What is returned by resourcesRDD.count()?


The data file “/home/deployer/uris.dat” is 2G, with lines like this:
{ "id" : 1, "a" : { "0" : 1 }, "rid" : 5487628, "zid" : 10550869 }

And here is my spark-env.sh:
 export SCALA_HOME=/usr/local/scala-2.10.3
 export SPARK_MASTER_IP=192.168.2.184
 export SPARK_MASTER_PORT=7077
 export SPARK_LOCAL_IP=192.168.2.182
 export SPARK_WORKER_MEMORY=20g
 export SPARK_MEM=10g
 export SPARK_JAVA_OPTS="-Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit"

Try setting SPARK_DRIVER_MEMORY to a bigger value, as the default 512m is
probably too small for the resourcesRDD.collect().
By the way, are you really sure you need to collect all that?
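
As a small illustration of the two hints above, one could check the count before deciding whether collecting to the driver is reasonable at all. This is only a sketch reusing the variable names from the quoted code; the size threshold is an arbitrary placeholder, not a rule:

    // Sketch only: look at the number of distinct rids before pulling them
    // to the driver.
    val nResources = resourcesRDD.count()
    println(s"distinct rids: $nResources")

    // Collect and broadcast only if the id list is small enough to fit
    // comfortably in the driver heap; 1000000 here is just a placeholder.
    if (nResources < 1000000L) {
      val bcResources = sc.broadcast(resourcesRDD.collect.toList)
      // ... proceed as in the original code ...
    }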

André Bois-Crettez
Software Architect
Big Data Developer
http://www.kelkoo.com/



Spark program throws OutOfMemoryError

2014-04-15 Thread Qin Wei
Hi, all

My Spark program always gives me the error java.lang.OutOfMemoryError: Java
heap space in my standalone cluster; here is my code:

object SimCalcuTotal {
  def main(args: Array[String]) {
    val sc = new SparkContext("spark://192.168.2.184:7077", "Sim Calcu Total",
      "/usr/local/spark-0.9.0-incubating-bin-hadoop2",
      Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
    // val sc = new SparkContext("local", "Score Calcu Total")

    val mongoRDD = sc.textFile("/home/deployer/uris.dat", 200)
    val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))

    val newRDD = jsonRDD.map(arg => {
      // 0.5 for test
      var score = 0.5
      arg.put("score", score)
      arg
    })

    val resourcesRDD = jsonRDD.map(arg => arg.get("rid").toString.toLong).distinct

    // the program crashes at this line of code
    val bcResources = sc.broadcast(resourcesRDD.collect.toList)

    val resourceScoresRDD = newRDD.map(arg => (arg.get("rid").toString.toLong,
      (arg.get("zid").toString,
       arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey()
    val resouceScores = sc.broadcast(resourceScoresRDD.collect.toMap)

    def calSim(item1: Long, item2: Long) = {
      val iv1 = resouceScores.value(item1)
      val iv2 = resouceScores.value(item2)

      // 0.5 for test
      var distance = 0.5
      if (distance > 0.05) {
        var json = new JSONObject()
        json.put("_id", item1.toString + item2.toString)
        json.put("rid1", item1)
        json.put("rid2", item2)
        json.put("sim", distance)
        json
      }
      else null
    }

    // val saveRDD = newRDD.map(arg => arg.toString)
    // newRDD.saveAsTextFile(args(1).toString)
    val similarityRDD = resourcesRDD.flatMap(resource => {
      for (other <- bcResources.value if resource < other)
        yield calSim(resource, other)
    }).filter(arg => arg != null)
    similarityRDD.saveAsTextFile("/home/deployer/sim")
  }
}
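
For reference, the pair generation above can also stay distributed instead of collecting and broadcasting the whole id list and score map to the driver, which is the direction the follow-up message in this thread takes. The following is only a sketch under that assumption, reusing resourceScoresRDD from the code above and keeping the placeholder 0.5 "distance" from calSim:

    // Sketch only: build candidate pairs with a distributed cartesian product
    // instead of broadcasting collected data structures.
    val pairsRDD = resourceScoresRDD.cartesian(resourceScoresRDD)
      .filter { case ((rid1, _), (rid2, _)) => rid1 < rid2 }   // keep each pair once

    val simPairsRDD = pairsRDD.map { case ((rid1, scores1), (rid2, scores2)) =>
      // 0.5 for test, as in the original calSim
      val distance = 0.5
      (rid1, rid2, distance)
    }.filter(_._3 > 0.05)

    simPairsRDD.saveAsTextFile("/home/deployer/sim")

This trades driver memory for cluster work: the cartesian product is still quadratic in the number of ids, so the pressure moves to the executors and the shuffle rather than disappearing.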

The data file “/home/deployer/uris.dat” is 2G, with lines like this:
{ "id" : 1, "a" : { "0" : 1 }, "rid" : 5487628, "zid" : 10550869 }

And here is my spark-env.sh:
export SCALA_HOME=/usr/local/scala-2.10.3
export SPARK_MASTER_IP=192.168.2.184
export SPARK_MASTER_PORT=7077
export SPARK_LOCAL_IP=192.168.2.182
export SPARK_WORKER_MEMORY=20g
export SPARK_MEM=10g
export SPARK_JAVA_OPTS="-Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit"

There are two processes on my server when the Spark program is
running (before it crashes):
java -cp
:/usr/local/spark-0.9.0-incubating-bin-hadoop2/conf:/usr/local/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar
-Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit -Xms4g -Xmx40g
-XX:MaxPermSize=10g -XX:-UseGCOverheadLimit -Xms512M -Xmx512M
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://spark@192.168.2.183:51339/user/CoarseGrainedScheduler 0
192.168.2.182 16 akka.tcp://sparkWorker@192.168.2.182:45588/user/Worker
app-20140415172433-0001 

java -cp
:/usr/local/spark-0.9.0-incubating-bin-hadoop2/conf:/usr/local/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar
-Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m
org.apache.spark.deploy.worker.Worker spark://192.168.2.184:7077

Is there anybody who can help me? Thanks very much!!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-program-thows-OutOfMemoryError-tp4268.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.