You might want to try the built-in RDD.cartesian() method.
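Roughly (an untested sketch, reusing the resourceScoresRDD and cosSimilarity from your code below), it would replace the collect/broadcast step with something like:

    val simRDD = resourceScoresRDD
      .cartesian(resourceScoresRDD)                    // all (item, item) pairs
      .filter { case ((r1, _), (r2, _)) => r1 > r2 }   // keep each unordered pair once
      .map { case ((r1, v1), (r2, v2)) => (r1, r2, cosSimilarity(v1, v2)) }
      .filter { case (_, _, sim) => sim > 0.1 }

Note that the cartesian product of a 20-million-record RDD with itself is still a huge number of pairs, so this avoids collecting everything to the driver but not the quadratic amount of similarity work.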
On Thu, Apr 24, 2014 at 9:05 PM, Qin Wei <wei....@dewmobile.net> wrote:
> Hi All,
>
> I have a problem with the item-based collaborative filtering recommendation
> algorithm in Spark. The basic flow is as below:
>
>              (Item1, (User1, Score1))
>   RDD1 ==>   (Item2, (User2, Score2))
>              (Item1, (User2, Score3))
>              (Item2, (User1, Score4))
>
>   RDD1.groupByKey ==> RDD2
>              (Item1, ((User1, Score1), (User2, Score3)))
>              (Item2, ((User1, Score4), (User2, Score2)))
>
> The similarity of the vectors ((User1, Score1), (User2, Score3)) and
> ((User1, Score4), (User2, Score2)) is the similarity of Item1 and Item2.
>
> In my situation, RDD2 contains 20 million records, and my Spark program is
> extremely slow. The source code is as below:
>
>     val conf = new SparkConf()
>       .setMaster("spark://211.151.121.184:7077")
>       .setAppName("Score Calcu Total")
>       .set("spark.executor.memory", "20g")
>       .setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
>     val sc = new SparkContext(conf)
>
>     val mongoRDD = sc.textFile(args(0).toString, 400)
>     val jsonRDD  = mongoRDD.map(arg => new JSONObject(arg))
>
>     val newRDD = jsonRDD.map(arg => {
>       val score = haha(arg.get("a").asInstanceOf[JSONObject])
>       // set score to 0.5 for testing
>       arg.put("score", 0.5)
>       arg
>     })
>
>     val resourceScoresRDD = newRDD.map(arg =>
>       (arg.get("rid").toString.toLong,
>        (arg.get("zid").toString, arg.get("score").asInstanceOf[Number].doubleValue))
>     ).groupByKey().cache()
>     val resourceScores   = resourceScoresRDD.collect()
>     val bcResourceScores = sc.broadcast(resourceScores)
>
>     val simRDD = resourceScoresRDD.mapPartitions({ iter =>
>       val m = bcResourceScores.value
>       for {
>         (r1, v1) <- iter
>         (r2, v2) <- m
>         if r1 > r2
>       } yield (r1, r2, cosSimilarity(v1, v2))
>     }, true).filter(arg => arg._3 > 0.1)
>
>     println(simRDD.count)
>
> And I saw this in the Spark Web UI:
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204018.png
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204001.png
>
> My standalone cluster has 3 worker nodes (16 cores and 32 GB RAM each), and the
> machines in the cluster are under heavy load while the Spark program is running.
>
> Is there a better way to implement this algorithm?
>
> Thanks!
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-do-cartesian-tp4807.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.