Re: Re: what is the best way to do cartesian

2014-04-27 Thread qinwei






Thanks a lot for your reply, but I have tried the built-in RDD.cartesian()
method before; it didn't make it any faster.


qinwei
From: Alex Boisvert
Date: 2014-04-26 00:32
To: user
Subject: Re: what is the best way to do cartesian

You might want to try the built-in RDD.cartesian() method.


On Thu, Apr 24, 2014 at 9:05 PM, Qin Wei wei@dewmobile.net wrote:

Hi All,



I have a problem with the item-based collaborative filtering recommendation
algorithm in Spark.

The basic flow is as below:

       RDD1            ==>  (Item1, (User1, Score1))
                            (Item2, (User2, Score2))
                            (Item1, (User2, Score3))
                            (Item2, (User1, Score4))

       RDD1.groupByKey ==>  RDD2
                            (Item1, ((User1, Score1), (User2, Score3)))
                            (Item2, ((User1, Score4), (User2, Score2)))



The similarity of the vectors ((User1, Score1), (User2, Score3)) and
((User1, Score4), (User2, Score2)) is the similarity of Item1 and Item2.
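[Editor's note: the cosSimilarity function used in the code further below is not
shown in the post; a minimal sketch of one possible implementation, treating each
item's (user, score) pairs as a sparse vector keyed by user, could look like this.
The signature and body are assumptions, not the poster's actual code.]

    // Hypothetical sketch of cosSimilarity: cosine similarity between two items,
    // where each item is represented by its (user, score) pairs.
    def cosSimilarity(v1: Iterable[(String, Double)],
                      v2: Iterable[(String, Double)]): Double = {
      val m1 = v1.toMap
      val m2 = v2.toMap
      // dot product over the users the two items have in common
      val dot = m1.keySet.intersect(m2.keySet).toSeq.map(u => m1(u) * m2(u)).sum
      val norm1 = math.sqrt(m1.values.map(s => s * s).sum)
      val norm2 = math.sqrt(m2.values.map(s => s * s).sum)
      if (norm1 == 0.0 || norm2 == 0.0) 0.0 else dot / (norm1 * norm2)
    }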



In my situation, RDD2 contains 20 million records, and my Spark program is
extremely slow. The source code is as below:

    val conf = new SparkConf()
      .setMaster("spark://211.151.121.184:7077")
      .setAppName("Score Calcu Total")
      .set("spark.executor.memory", "20g")
      .setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
    val sc = new SparkContext(conf)

    val mongoRDD = sc.textFile(args(0).toString, 400)
    val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))

    val newRDD = jsonRDD.map(arg => {
      var score = haha(arg.get("a").asInstanceOf[JSONObject])

      // set score to 0.5 for testing
      arg.put("score", 0.5)
      arg
    })

    // (item id, (user id, score)) pairs grouped by item
    val resourceScoresRDD = newRDD.map(arg =>
      (arg.get("rid").toString.toLong,
       (arg.get("zid").toString, arg.get("score").asInstanceOf[Number].doubleValue))
    ).groupByKey().cache()
    val resourceScores = resourceScoresRDD.collect()
    val bcResourceScores = sc.broadcast(resourceScores)

    // compare every item in each partition against every item in the broadcast copy
    val simRDD = resourceScoresRDD.mapPartitions({ iter =>
      val m = bcResourceScores.value
      for {
        (r1, v1) <- iter
        (r2, v2) <- m
        if r1 < r2
      } yield (r1, r2, cosSimilarity(v1, v2))
    }, true).filter(arg => arg._3 > 0.1)

    println(simRDD.count)



And I saw this in the Spark Web UI:

http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204018.png


http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204001.png




My standalone cluster has 3 worker nodes (16 cores and 32G RAM each), and the
load on the machines in the cluster is heavy while the Spark program is
running.



Is there a better way to implement this algorithm?



Thanks!













Re: what is the best way to do cartesian

2014-04-25 Thread Alex Boisvert
You might want to try the built-in RDD.cartesian() method.
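[Editor's note: a minimal sketch of that approach, reusing resourceScoresRDD and
cosSimilarity from the original post; the r1 < r2 deduplication filter and the 0.1
threshold mirror the original code, everything else is an assumption.]

    // Sketch: all-pairs item similarity via the built-in cartesian(),
    // keeping each unordered pair of distinct items only once.
    val simRDD = resourceScoresRDD.cartesian(resourceScoresRDD)
      .filter { case ((r1, _), (r2, _)) => r1 < r2 }
      .map { case ((r1, v1), (r2, v2)) => (r1, r2, cosSimilarity(v1, v2)) }
      .filter(_._3 > 0.1)

Note that cartesian() still enumerates all pairs before the filter, so it can be
very expensive when the RDD is large.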





Re: what is the best way to do cartesian

2014-04-25 Thread Eugen Cepoi
Depending on the size of the RDD, you could also do a collect and broadcast, and
then compute the product in a map function over the other RDD. If it is the same
RDD, you might also want to cache it. This pattern has worked quite well for me.
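[Editor's note: a minimal sketch of that pattern, reusing resourceScoresRDD and
cosSimilarity from the original post; this is essentially what the posted
mapPartitions code already does, and the pair filter and threshold are carried
over from there.]

    // Sketch: collect one side, broadcast it, and pair it against the other RDD
    // in a map, so no cross-product shuffle is needed.
    val smallSide = resourceScoresRDD.collect()   // must fit in driver/executor memory
    val bcSmallSide = sc.broadcast(smallSide)

    val simRDD = resourceScoresRDD.flatMap { case (r1, v1) =>
      bcSmallSide.value.collect {
        case (r2, v2) if r1 < r2 => (r1, r2, cosSimilarity(v1, v2))   // each unordered pair once
      }
    }.filter(_._3 > 0.1)

The catch is that the collected side has to be small enough to broadcast to every
executor.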
On 25 Apr 2014, at 18:33, Alex Boisvert alex.boisv...@gmail.com wrote:

 You might want to try the built-in RDD.cartesian() method.

