Re: Re: Problem with the Item-Based Collaborative Filtering Recommendation Algorithms in spark

2014-04-27 Thread Qin Wei






Thanks a lot for your reply; it gave me a lot of inspiration.


qinwei
From: Sean Owen-2 [via Apache Spark User List]
Date: 2014-04-25 14:11
To: Qin Wei
Subject: Re: Problem with the Item-Based Collaborative Filtering Recommendation Algorithms in spark

So you are computing all-pairs similarity over 20M items?

That is going to take about 200 trillion similarity computations (roughly
20M x 20M / 2 pairs), no? I don't think there's any way to make that
fundamentally fast.

I see you're copying the data set to all workers, which helps make it
faster at the expense of memory consumption.

If you really want to do this and can tolerate some approximation, I
think you want to do some kind of locality-sensitive hashing to bucket
the vectors and then evaluate similarity only against the other items in
the bucket.
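
For concreteness, here is a minimal, hypothetical sketch of that bucketing idea
using random-hyperplane (SimHash) signatures over the (user, score) vectors.
The object name, the number of planes, and the sample data are illustrative and
not taken from the original program:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import scala.util.Random

    // Illustrative sketch only, not the poster's code.
    object LshBucketingSketch {
      // One signature bit per random hyperplane: the sign of the dot product
      // with a Gaussian component derived deterministically from (plane, userId),
      // so every item hashes its sparse vector against the same hyperplanes.
      def signature(vec: Seq[(String, Double)], numPlanes: Int): String =
        (0 until numPlanes).map { p =>
          val dot = vec.map { case (user, score) =>
            score * new Random((p.toLong << 32) ^ user.hashCode).nextGaussian()
          }.sum
          if (dot >= 0) '1' else '0'
        }.mkString

      def main(args: Array[String]) {
        val sc = new SparkContext("local", "LSH Bucketing Sketch")
        // Same shape as RDD2 in the message below: (itemId, Seq((userId, score))).
        val itemVectors = sc.parallelize(Seq(
          (1L, Seq(("user1", 1.0), ("user2", 3.0))),
          (2L, Seq(("user1", 4.0), ("user2", 2.0)))))

        // Bucket items by signature; similar vectors tend to share a signature.
        val buckets = itemVectors
          .map { case (item, vec) => (signature(vec, 16), (item, vec)) }
          .groupByKey()

        // Only pairs that share a bucket become candidates for cosSimilarity,
        // instead of all ~2 x 10^14 item pairs.
        val candidatePairs = buckets.flatMap { case (_, items) =>
          for ((i1, v1) <- items; (i2, v2) <- items if i1 < i2)
            yield ((i1, v1), (i2, v2))
        }
        println(candidatePairs.count())
      }
    }

With 16 planes there are 65536 buckets, so each item is compared against only
the items it collides with rather than all 20M; the trade-off is that genuinely
similar pairs can occasionally land in different buckets.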



On Fri, Apr 25, 2014 at 5:55 AM, Qin Wei [hidden email] wrote:

 Hi All,

 I have a problem with the Item-Based Collaborative Filtering Recommendation
 Algorithms in Spark. The basic flow is as below:

     RDD1  ==>  (Item1, (User1, Score1))
                (Item2, (User2, Score2))
                (Item1, (User2, Score3))
                (Item2, (User1, Score4))

     RDD1.groupByKey  ==>  RDD2
                (Item1, ((User1, Score1), (User2, Score3)))
                (Item2, ((User1, Score4), (User2, Score2)))

 The similarity of the vectors ((User1, Score1), (User2, Score3)) and
 ((User1, Score4), (User2, Score2)) is the similarity of Item1 and Item2.

 In my situation, RDD2 contains 20 million records and my Spark program is
 extremely slow. The source code is as below:

     val conf = new SparkConf()
       .setMaster("spark://211.151.121.184:7077")
       .setAppName("Score Calcu Total")
       .set("spark.executor.memory", "20g")
       .setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
     val sc = new SparkContext(conf)

     val mongoRDD = sc.textFile(args(0).toString, 400)
     val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))

     val newRDD = jsonRDD.map(arg => {
       var score = haha(arg.get("a").asInstanceOf[JSONObject])

       // set score to 0.5 for testing
       arg.put("score", 0.5)
       arg
     })

     val resourceScoresRDD = newRDD.map(arg =>
       (arg.get("rid").toString.toLong,
        (arg.get("zid").toString,
         arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey().cache()
     val resourceScores = resourceScoresRDD.collect()
     val bcResourceScores = sc.broadcast(resourceScores)

     val simRDD = resourceScoresRDD.mapPartitions({ iter =>
       val m = bcResourceScores.value
       for { (r1, v1) <- iter
             (r2, v2) <- m
             if r1 > r2
       } yield (r1, r2, cosSimilarity(v1, v2))
     }, true).filter(arg => arg._3 > 0.1)

     println(simRDD.count)



 And I saw this in Spark Web UI:

 http://apache-spark-user-list.1001560.n3.nabble.com/file/n4808/QQ%E6%88%AA%E5%9B%BE20140424204018.png

 http://apache-spark-user-list.1001560.n3.nabble.com/file/n4808/QQ%E6%88%AA%E5%9B%BE20140424204001.png



 My standalone cluster has 3 worker nodes (16 cores and 32G RAM each), and the
 machines in the cluster are heavily loaded while the Spark program is running.

 Is there a better way to implement this algorithm?

 Thanks!








Problem with the Item-Based Collaborative Filtering Recommendation Algorithms in spark

2014-04-24 Thread Qin Wei
Hi All,

I have a problem with the Item-Based Collaborative Filtering Recommendation
Algorithms in Spark. The basic flow is as below:

    RDD1  ==>  (Item1, (User1, Score1))
               (Item2, (User2, Score2))
               (Item1, (User2, Score3))
               (Item2, (User1, Score4))

    RDD1.groupByKey  ==>  RDD2
               (Item1, ((User1, Score1), (User2, Score3)))
               (Item2, ((User1, Score4), (User2, Score2)))

The similarity of the vectors ((User1, Score1), (User2, Score3)) and
((User1, Score4), (User2, Score2)) is the similarity of Item1 and Item2.

In my situation, RDD2 contains 20 million records and my Spark program is
extremely slow. The source code is as below:
val conf = new SparkConf()
  .setMaster("spark://211.151.121.184:7077")
  .setAppName("Score Calcu Total")
  .set("spark.executor.memory", "20g")
  .setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
val sc = new SparkContext(conf)

val mongoRDD = sc.textFile(args(0).toString, 400)
val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))

val newRDD = jsonRDD.map(arg => {
  var score = haha(arg.get("a").asInstanceOf[JSONObject])

  // set score to 0.5 for testing
  arg.put("score", 0.5)
  arg
})

val resourceScoresRDD = newRDD.map(arg =>
  (arg.get("rid").toString.toLong,
   (arg.get("zid").toString,
    arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey().cache()
val resourceScores = resourceScoresRDD.collect()
val bcResourceScores = sc.broadcast(resourceScores)

val simRDD = resourceScoresRDD.mapPartitions({ iter =>
  val m = bcResourceScores.value
  for { (r1, v1) <- iter
        (r2, v2) <- m
        if r1 > r2
  } yield (r1, r2, cosSimilarity(v1, v2))
}, true).filter(arg => arg._3 > 0.1)

println(simRDD.count)
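
cosSimilarity is called above but its definition is not shown. Here is a
minimal sketch of one plausible implementation, a cosine similarity over two
(user, score) vectors in the Seq[(String, Double)] shape that groupByKey
produces; only the function name comes from the code above, the body is an
assumption:

    // Cosine similarity of two sparse item vectors keyed by user id.
    // Sketch only: the real cosSimilarity in the program may differ.
    def cosSimilarity(v1: Seq[(String, Double)], v2: Seq[(String, Double)]): Double = {
      val m1 = v1.toMap
      val m2 = v2.toMap
      // Dot product over the users the two items have in common.
      val dot = m1.keySet.intersect(m2.keySet).toSeq.map(u => m1(u) * m2(u)).sum
      val norm1 = math.sqrt(m1.values.map(s => s * s).sum)
      val norm2 = math.sqrt(m2.values.map(s => s * s).sum)
      if (norm1 == 0.0 || norm2 == 0.0) 0.0 else dot / (norm1 * norm2)
    }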

And I saw this in Spark Web UI:
http://apache-spark-user-list.1001560.n3.nabble.com/file/n4808/QQ%E6%88%AA%E5%9B%BE20140424204018.png
 
http://apache-spark-user-list.1001560.n3.nabble.com/file/n4808/QQ%E6%88%AA%E5%9B%BE20140424204001.png
 

My standalone cluster has 3 worker nodes (16 cores and 32G RAM each), and the
machines in the cluster are heavily loaded while the Spark program is running.

Is there a better way to implement this algorithm?

Thanks!





Re: Re: Spark program throws OutOfMemoryError

2014-04-17 Thread Qin Wei






Hi, André, thanks a lot for your reply, but I still get the same exception. The
complete exception message is as below:
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task 1.0:9 failed 4 times (most recent failure: Exception failure: java.lang.OutOfMemoryError: Java heap space)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
According to your hints, I added SPARK_DRIVER_MEMORY to my spark-env.sh:

    export SPARK_MASTER_IP=192.168.2.184
    export SPARK_MASTER_PORT=7077
    export SPARK_LOCAL_IP=192.168.2.183
    export SPARK_DRIVER_MEMORY=10G
    export SPARK_JAVA_OPTS="-Xms4g -Xmx40g -XX:MaxPermSize=10g"

I also modified my code; I do not call the collect method any more. Here is my
code:

    def main(args: Array[String]) {
      val sc = new SparkContext("spark://192.168.2.184:7077", "Score Calcu Total",
        "/usr/local/spark-0.9.1-bin-hadoop2", Seq("/home/deployer/myjar.jar"))

      val mongoRDD = sc.textFile("/home/deployer/uris.dat", 200)
      val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))

      val newRDD = jsonRDD.map(arg => {
        var score = 0.5
        arg.put("score", score)
        arg
      })

      val resourceScoresRDD = newRDD.map(arg =>
        (arg.get("rid").toString.toLong,
         (arg.get("zid").toString,
          arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey()

      val simRDD = resourceScoresRDD.cartesian(resourceScoresRDD)
        .filter(arg => arg._1._1 > arg._2._1)
        .map(arg => (arg._1._1, arg._2._1, 0.8))

      simRDD.saveAsTextFile("/home/deployer/sim")
    }

I ran the program through "java -jar myjar.jar". It crashed quickly, but it
succeeded when the data file was small.
Thanks for your help!

qinwei
From: André Bois-Crettez [via Apache Spark User List]
Date: 2014-04-16 17:50
To: Qin Wei
Subject: Re: Spark program throws OutOfMemoryError

It seems you do not have enough memory on the Spark driver. Hints below:

On 2014-04-15 12:10, Qin Wei wrote:

      val resourcesRDD = jsonRDD.map(arg =>
        arg.get("rid").toString.toLong).distinct

      // the program crashes at this line of code
      val bcResources = sc.broadcast(resourcesRDD.collect.toList)

What is returned by resourcesRDD.count()?

 The data file “/home/deployer/uris.dat” is 2G, with lines like this:
     { "id" : 1, "a" : { "0" : 1 }, "rid" : 5487628, "zid" : 10550869 }

 And here is my spark-env.sh:
      export SCALA_HOME=/usr/local/scala-2.10.3
      export SPARK_MASTER_IP=192.168.2.184
      export SPARK_MASTER_PORT=7077
      export SPARK_LOCAL_IP=192.168.2.182
      export SPARK_WORKER_MEMORY=20g
      export SPARK_MEM=10g
      export SPARK_JAVA_OPTS="-Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit"

Try setting SPARK_DRIVER_MEMORY to a bigger value, as the default 512m is
probably too small for the resourcesRDD.collect().

By the way, are you really sure you need to collect all that?


André Bois-Crettez
Software Architect
Big Data Developer
http://www.kelkoo.com/



Spark program throws OutOfMemoryError

2014-04-15 Thread Qin Wei
Hi, all

My Spark program always gives me the error "java.lang.OutOfMemoryError: Java
heap space" in my standalone cluster. Here is my code:

object SimCalcuTotal {
  def main(args: Array[String]) {
    val sc = new SparkContext("spark://192.168.2.184:7077", "Sim Calcu Total",
      "/usr/local/spark-0.9.0-incubating-bin-hadoop2",
      Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
    // val sc = new SparkContext("local", "Score Calcu Total")

    val mongoRDD = sc.textFile("/home/deployer/uris.dat", 200)
    val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))

    val newRDD = jsonRDD.map(arg => {
      // 0.5 for test
      var score = 0.5
      arg.put("score", score)
      arg
    })

    val resourcesRDD = jsonRDD.map(arg =>
      arg.get("rid").toString.toLong).distinct

    // the program crashes at this line of code
    val bcResources = sc.broadcast(resourcesRDD.collect.toList)

    val resourceScoresRDD = newRDD.map(arg =>
      (arg.get("rid").toString.toLong,
       (arg.get("zid").toString,
        arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey()
    val resouceScores = sc.broadcast(resourceScoresRDD.collect.toMap)

    def calSim(item1: Long, item2: Long) = {
      val iv1 = resouceScores.value(item1)
      val iv2 = resouceScores.value(item2)

      // 0.5 for test
      var distance = 0.5
      if (distance > 0.05) {
        var json = new JSONObject()
        json.put("_id", item1.toString + item2.toString)
        json.put("rid1", item1)
        json.put("rid2", item2)
        json.put("sim", distance)
        json
      }
      else null
    }

    // val saveRDD = newRDD.map(arg => arg.toString)
    // newRDD.saveAsTextFile(args(1).toString)
    val similarityRDD = resourcesRDD.flatMap(resource => {
      for (other <- bcResources.value if resource > other)
        yield calSim(resource, other)
    }).filter(arg => arg != null)
    similarityRDD.saveAsTextFile("/home/deployer/sim")
  }
}
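
One way to avoid the driver-side collect and broadcast that trigger the
OutOfMemoryError is to pair the items directly on the cluster with cartesian,
which is essentially the rework shown in the 2014-04-17 follow-up above. A
minimal sketch, assuming the resourceScoresRDD defined in the code above and
keeping a placeholder similarity value in place of calSim:

    // Sketch: pair items on the cluster instead of collecting/broadcasting them.
    // Each unordered item pair is produced exactly once; 0.5 stands in for a
    // real similarity computation such as calSim or a cosine similarity.
    val pairSims = resourceScoresRDD
      .cartesian(resourceScoresRDD)
      .filter { case ((r1, _), (r2, _)) => r1 > r2 }
      .map { case ((r1, _), (r2, _)) => (r1, r2, 0.5) }
    pairSims.saveAsTextFile("/home/deployer/sim")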

The data file “/home/deployer/uris.dat” is 2G, with lines like this:
    { "id" : 1, "a" : { "0" : 1 }, "rid" : 5487628, "zid" : 10550869 }

And here is my spark-env.sh
export SCALA_HOME=/usr/local/scala-2.10.3
export SPARK_MASTER_IP=192.168.2.184
export SPARK_MASTER_PORT=7077
export SPARK_LOCAL_IP=192.168.2.182
export SPARK_WORKER_MEMORY=20g
export SPARK_MEM=10g
export SPARK_JAVA_OPTS="-Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit"

There are two processes on my server when the Spark program is
running (before it crashes):
java -cp
:/usr/local/spark-0.9.0-incubating-bin-hadoop2/conf:/usr/local/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar
-Xms4g -Xmx40g -XX:MaxPermSize=10g -XX:-UseGCOverheadLimit -Xms4g -Xmx40g
-XX:MaxPermSize=10g -XX:-UseGCOverheadLimit -Xms512M -Xmx512M
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://spark@192.168.2.183:51339/user/CoarseGrainedScheduler 0
192.168.2.182 16 akka.tcp://sparkWorker@192.168.2.182:45588/user/Worker
app-20140415172433-0001 

java -cp
:/usr/local/spark-0.9.0-incubating-bin-hadoop2/conf:/usr/local/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar
-Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m
org.apache.spark.deploy.worker.Worker spark://192.168.2.184:7077

Is there anybody who can help me? Thanks very much!!


