Hi All,

I have a problem with an item-based collaborative filtering recommendation
algorithm in Spark.
The basic flow is as below:
       RDD1                 ==>   (Item1, (User1, Score1))
                                  (Item2, (User2, Score2))
                                  (Item1, (User2, Score3))
                                  (Item2, (User1, Score4))

       RDD1.groupByKey      ==>   RDD2
                                  (Item1, ((User1, Score1), (User2, Score3)))
                                  (Item2, ((User1, Score4), (User2, Score2)))

The similarity between the vectors ((User1, Score1), (User2, Score3)) and
((User1, Score4), (User2, Score2)) is then the similarity between Item1 and
Item2.
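
(For concreteness: with cosine similarity, aligning the two lists on user,
that is

    sim(Item1, Item2) = (Score1*Score4 + Score3*Score2)
                        / ( sqrt(Score1^2 + Score3^2) * sqrt(Score4^2 + Score2^2) )

where a user missing from one of the two lists contributes 0 to the dot
product.)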

In my situation, RDD2 contains 20 million records and my Spark program is
extremely slow. The source code is as below:
    val conf = new SparkConf()
      .setMaster("spark://211.151.121.184:7077")
      .setAppName("Score Calcu Total")
      .set("spark.executor.memory", "20g")
      .setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
    val sc = new SparkContext(conf)

    // one JSON document per input line
    val mongoRDD = sc.textFile(args(0).toString, 400)
    val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))

    val newRDD = jsonRDD.map(arg => {
      val score = haha(arg.get("a").asInstanceOf[JSONObject])
      // set score to 0.5 for testing
      arg.put("score", 0.5)
      arg
    })

    // RDD2: (itemId, Iterable[(userId, score)])
    val resourceScoresRDD = newRDD.map(arg =>
      (arg.get("rid").toString.toLong,
       (arg.get("zid").toString,
        arg.get("score").asInstanceOf[Number].doubleValue))
    ).groupByKey().cache()

    // pull the whole grouped RDD back to the driver and broadcast it
    val resourceScores = resourceScoresRDD.collect()
    val bcResourceScores = sc.broadcast(resourceScores)

    // compare every partition against the full broadcast copy,
    // keeping each unordered item pair once (r1 > r2)
    val simRDD = resourceScoresRDD.mapPartitions({ iter =>
      val m = bcResourceScores.value
      for {
        (r1, v1) <- iter
        (r2, v2) <- m
        if r1 > r2
      } yield (r1, r2, cosSimilarity(v1, v2))
    }, true).filter(arg => arg._3 > 0.1)

    println(simRDD.count)
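
(cosSimilarity is omitted above; it is just a plain cosine over the two
(user, score) lists. A minimal sketch of what I mean, matching the
Iterable[(String, Double)] values that groupByKey produces; the body here is
only illustrative, not my exact code:

    import scala.math.sqrt

    // Each argument is one item's (userId, score) list from groupByKey.
    // Align the two lists on userId; users missing from one side
    // contribute 0 to the dot product.
    def cosSimilarity(v1: Iterable[(String, Double)],
                      v2: Iterable[(String, Double)]): Double = {
      val m2 = v2.toMap
      val dot   = v1.iterator.map { case (user, s1) => s1 * m2.getOrElse(user, 0.0) }.sum
      val norm1 = sqrt(v1.iterator.map { case (_, s) => s * s }.sum)
      val norm2 = sqrt(v2.iterator.map { case (_, s) => s * s }.sum)
      if (norm1 == 0.0 || norm2 == 0.0) 0.0 else dot / (norm1 * norm2)
    }
)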

And I saw this in the Spark Web UI:
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n4808/QQ%E6%88%AA%E5%9B%BE20140424204018.png>
 
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n4808/QQ%E6%88%AA%E5%9B%BE20140424204001.png>
 

My standalone cluster has 3 worker nodes (16 cores and 32 GB RAM each), and
the machines are under heavy load while the Spark program is running.

Is there a better way to implement this algorithm?

Thanks!


