Re: Re: what is the best way to do cartesian
Thanks a lot for your reply, but I have tried the built-in RDD.cartesian() method before; it didn't make it faster.

qinwei

From: Alex Boisvert
Date: 2014-04-26 00:32
To: user
Subject: Re: what is the best way to do cartesian

You might want to try the built-in RDD.cartesian() method.

On Thu, Apr 24, 2014 at 9:05 PM, Qin Wei wei@dewmobile.net wrote:

Hi All,

I have a problem with an item-based collaborative filtering recommendation algorithm in Spark. The basic flow is as below:

RDD1:
    (Item1, (User1, Score1))
    (Item2, (User2, Score2))
    (Item1, (User2, Score3))
    (Item2, (User1, Score4))

RDD1.groupByKey gives RDD2:
    (Item1, ((User1, Score1), (User2, Score3)))
    (Item2, ((User1, Score4), (User2, Score2)))

The similarity of the vectors ((User1, Score1), (User2, Score3)) and ((User1, Score4), (User2, Score2)) is the similarity of Item1 and Item2.

In my situation, RDD2 contains 20 million records and my Spark program is extremely slow. The source code is as below:

    val conf = new SparkConf()
      .setMaster("spark://211.151.121.184:7077")
      .setAppName("Score Calcu Total")
      .set("spark.executor.memory", "20g")
      .setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
    val sc = new SparkContext(conf)

    val mongoRDD = sc.textFile(args(0).toString, 400)
    val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))
    val newRDD = jsonRDD.map(arg => {
      var score = haha(arg.get("a").asInstanceOf[JSONObject])
      // set score to 0.5 for testing
      arg.put("score", 0.5)
      arg
    })

    val resourceScoresRDD = newRDD.map(arg =>
      (arg.get("rid").toString.toLong,
       (arg.get("zid").toString,
        arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey().cache()
    val resourceScores = resourceScoresRDD.collect()
    val bcResourceScores = sc.broadcast(resourceScores)

    val simRDD = resourceScoresRDD.mapPartitions({ iter =>
      val m = bcResourceScores.value
      for {
        (r1, v1) <- iter
        (r2, v2) <- m
        if r1 > r2
      } yield (r1, r2, cosSimilarity(v1, v2))
    }, true).filter(arg => arg._3 > 0.1)

    println(simRDD.count)

And I saw this in the Spark Web UI:
http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204018.png
http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204001.png

My standalone cluster has 3 worker nodes (16 cores and 32 GB RAM each), and the workload of the machines in my cluster is heavy when the Spark program is running.

Is there any better way to do the algorithm? Thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-do-cartesian-tp4807.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
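The cosSimilarity helper is called in the snippet above but never shown. A minimal pure-Scala sketch of one plausible implementation over the grouped (user, score) pairs — the representation as (id, score) tuples is assumed from the groupByKey output, not confirmed by the original post:

```scala
object CosSim {
  // Cosine similarity between two sparse score vectors, each given as
  // (userId, score) pairs like the values produced by groupByKey above.
  def cosSimilarity(v1: Iterable[(String, Double)],
                    v2: Iterable[(String, Double)]): Double = {
    val m2 = v2.toMap
    // Dot product over the users the two vectors have in common
    val dot = v1.collect { case (u, s) if m2.contains(u) => s * m2(u) }.sum
    val norm1 = math.sqrt(v1.map { case (_, s) => s * s }.sum)
    val norm2 = math.sqrt(v2.map { case (_, s) => s * s }.sum)
    if (norm1 == 0.0 || norm2 == 0.0) 0.0 else dot / (norm1 * norm2)
  }
}
```

With 20 million grouped items, the cost is dominated by the number of pairs this gets called on, not by the function itself, which is why the thread focuses on how the pairs are generated.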
Re: what is the best way to do cartesian
You might want to try the built-in RDD.cartesian() method.
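The suggested cartesian shape, sketched on plain Scala collections standing in for RDDs (on actual RDDs this would be items.cartesian(items)); the sim function and the r1 < r2 dedup filter mirror the original snippet:

```scala
object CartesianSketch {
  // All-pairs similarity via a cartesian product, keeping each unordered
  // pair once. On RDDs: items.cartesian(items).filter(...).map(...)
  def allPairs[A](items: Seq[(Long, A)],
                  sim: (A, A) => Double): Seq[(Long, Long, Double)] =
    for {
      (r1, v1) <- items
      (r2, v2) <- items
      if r1 < r2 // skip self-pairs and mirrored duplicates
    } yield (r1, r2, sim(v1, v2))
}
```

Note this still enumerates O(n^2) pairs; RDD.cartesian changes how the work is distributed, not how much work there is.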
Re: what is the best way to do cartesian
Depending on the size of the RDD, you could also do a collect, broadcast the result, and then compute the product in a map function over the other RDD. If this is the same RDD, you might also want to cache it. This pattern worked quite well for me.

On 25 Apr 2014 18:33, Alex Boisvert alex.boisv...@gmail.com wrote:
You might want to try the built-in RDD.cartesian() method.
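The collect-and-broadcast pattern described here, sketched with an Iterator standing in for one partition of the large RDD and a Seq standing in for the broadcast value (names and the threshold parameter are illustrative, not from the thread):

```scala
object BroadcastSketch {
  // Collect-then-broadcast: the smaller side is materialized once (the
  // broadcast value) and every element of the larger side is compared
  // against it inside mapPartitions, avoiding a shuffle-based cartesian.
  def broadcastJoin[A](large: Iterator[(Long, A)],
                       small: Seq[(Long, A)],
                       sim: (A, A) => Double,
                       threshold: Double): Iterator[(Long, Long, Double)] =
    for {
      (r1, v1) <- large
      (r2, v2) <- small
      s = sim(v1, v2)
      if r1 < r2 && s > threshold // dedup pairs, drop weak similarities
    } yield (r1, r2, s)
}
```

This is essentially what the original code already does via sc.broadcast plus mapPartitions; it helps only when the collected side comfortably fits in each executor's memory.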