You might want to try the built-in RDD.cartesian() method.
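Something along these lines should work (an untested sketch; it reuses the grouped
resourceScoresRDD and the cosSimilarity helper from your code below):

    val simRDD = resourceScoresRDD.cartesian(resourceScoresRDD)
      .filter { case ((r1, _), (r2, _)) => r1 > r2 }                        // keep each unordered item pair once
      .map { case ((r1, v1), (r2, v2)) => (r1, r2, cosSimilarity(v1, v2)) }
      .filter(_._3 > 0.1)                                                   // same similarity threshold you use

That way nothing has to be collected to the driver and broadcast back out to every executor.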

On Thu, Apr 24, 2014 at 9:05 PM, Qin Wei <wei....@dewmobile.net> wrote:

> Hi All,
>
> I have a problem with an item-based collaborative filtering recommendation
> algorithm in Spark.
> The basic flow is as below:
>        RDD1  ==>              (Item1, (User1, Score1))
>                               (Item2, (User2, Score2))
>                               (Item1, (User2, Score3))
>                               (Item2, (User1, Score4))
>
>        RDD1.groupByKey  ==>  RDD2
>                               (Item1, ((User1, Score1), (User2, Score3)))
>                               (Item2, ((User1, Score4), (User2, Score2)))
>
> The similarity of the vectors ((User1, Score1), (User2, Score3)) and
> ((User1, Score4), (User2, Score2)) is the similarity of Item1 and Item2.
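>
> Roughly, cosSimilarity just computes a cosine over these (user, score) lists,
> treating them as sparse vectors keyed by user id, something like:
>
>     def cosSimilarity(v1: Iterable[(String, Double)], v2: Iterable[(String, Double)]): Double = {
>       val m1 = v1.toMap
>       val m2 = v2.toMap
>       // dot product over the users the two items have in common
>       val dot = m1.keySet.intersect(m2.keySet).toSeq.map(k => m1(k) * m2(k)).sum
>       val norm1 = math.sqrt(m1.values.map(x => x * x).sum)
>       val norm2 = math.sqrt(m2.values.map(x => x * x).sum)
>       if (norm1 == 0.0 || norm2 == 0.0) 0.0 else dot / (norm1 * norm2)
>     }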
>
> In my situation, RDD2 contains 20 million records, and my Spark program is
> extremely slow. The source code is as below:
>     val conf = new SparkConf()
>       .setMaster("spark://211.151.121.184:7077")
>       .setAppName("Score Calcu Total")
>       .set("spark.executor.memory", "20g")
>       .setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
>     val sc = new SparkContext(conf)
>
>     val mongoRDD = sc.textFile(args(0).toString, 400)
>     val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))
>
>     val newRDD = jsonRDD.map(arg => {
>       var score = haha(arg.get("a").asInstanceOf[JSONObject])
>
>       // set score to 0.5 for testing
>       arg.put("score", 0.5)
>       arg
>     })
>
>     val resourceScoresRDD = newRDD.map(arg =>
>       (arg.get("rid").toString.toLong,
>        (arg.get("zid").toString, arg.get("score").asInstanceOf[Number].doubleValue))
>     ).groupByKey().cache()
>     val resourceScores = resourceScoresRDD.collect()
>     val bcResourceScores = sc.broadcast(resourceScores)
>
>     val simRDD = resourceScoresRDD.mapPartitions({ iter =>
>       val m = bcResourceScores.value
>       for { (r1, v1) <- iter
>             (r2, v2) <- m
>             if r1 > r2
>       } yield (r1, r2, cosSimilarity(v1, v2))
>     }, true).filter(arg => arg._3 > 0.1)
>
>     println(simRDD.count)
>
> And I saw this in the Spark Web UI:
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204018.png
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n4807/QQ%E6%88%AA%E5%9B%BE20140424204001.png
>
> My standalone cluster has 3 worker nodes (16 cores and 32 GB RAM each), and the
> load on these machines is heavy while the Spark program is running.
>
> Is there a better way to implement this algorithm?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-do-cartesian-tp4807.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
