You might want to try the built-in RDD.cartesian() method.
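
A rough sketch of what I mean, in case it helps. It assumes Spark 1.3 or later (where groupByKey returns Iterable values and the pair-RDD implicits need no extra import), and the cosSimilarity below is only my stand-in for yours, treating each item as a sparse user -> score vector. The object name, the local[2] master, and the sample data are made up for illustration. Note that cartesian still compares every pair of items, so it does not reduce the amount of work; it just lets Spark distribute the pairing instead of collecting and broadcasting the whole RDD.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ItemSimilaritySketch {
  // Stand-in for the poster's cosSimilarity (an assumption, not the original):
  // cosine similarity of two sparse (user -> score) vectors.
  def cosSimilarity(a: Iterable[(String, Double)],
                    b: Iterable[(String, Double)]): Double = {
    val am = a.toMap
    val bm = b.toMap
    // Dot product over the users that rated both items.
    val dot = am.iterator.collect { case (u, s) if bm.contains(u) => s * bm(u) }.sum
    val na = math.sqrt(am.values.map(x => x * x).sum)
    val nb = math.sqrt(bm.values.map(x => x * x).sum)
    if (na == 0.0 || nb == 0.0) 0.0 else dot / (na * nb)
  }

  // Pair every item with every other item via cartesian(), keep each
  // unordered pair once (r1 > r2), and drop low similarities.
  def pairwise(items: RDD[(Long, Iterable[(String, Double)])]): RDD[(Long, Long, Double)] =
    items.cartesian(items)
      .filter { case ((r1, _), (r2, _)) => r1 > r2 }
      .map { case ((r1, v1), (r2, v2)) => (r1, r2, cosSimilarity(v1, v2)) }
      .filter(_._3 > 0.1)

  def main(args: Array[String]): Unit = {
    // local[2] is only for trying the sketch out on one machine.
    val conf = new SparkConf().setMaster("local[2]").setAppName("cartesian-sketch")
    val sc = new SparkContext(conf)
    // Made-up (item, (user, score)) records in the shape of RDD1.
    val scores = sc.parallelize(Seq(
      (1L, ("u1", 1.0)), (1L, ("u2", 0.5)),
      (2L, ("u1", 0.5)), (2L, ("u2", 1.0))))
    val grouped = scores.groupByKey().cache()
    pairwise(grouped).collect().foreach(println)
    sc.stop()
  }
}
```

With 20 million items the number of pairs is still enormous, so this is probably only practical if you can cut down the candidate pairs first.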

On Thu, Apr 24, 2014 at 9:05 PM, Qin Wei <> wrote:

> Hi All,
> I have a problem with an item-based collaborative filtering recommendation
> algorithm in Spark.
> The basic flow is as follows:
> RDD1 ==>
>     (Item1, (User1, Score1))
>     (Item2, (User2, Score2))
>     (Item1, (User2, Score3))
>     (Item2, (User1, Score4))
> RDD1.groupByKey ==> RDD2
>     (Item1, ((User1, Score1), (User2, Score3)))
>     (Item2, ((User1, Score4), (User2, Score2)))
> The similarity of the vectors ((User1, Score1), (User2, Score3)) and
> ((User1, Score4), (User2, Score2)) is the similarity of Item1 and Item2.
> In my situation, RDD2 contains 20 million records and my Spark program is
> extremely slow. The source code is below:
>     val conf = new SparkConf()
>       .setMaster("spark://")
>       .setAppName("Score Calcu Total")
>       .set("spark.executor.memory", "20g")
>       .setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
>     val sc = new SparkContext(conf)
>     val mongoRDD = sc.textFile(args(0).toString, 400)
>     val jsonRDD = => new JSONObject(arg))
>     val newRDD = => {
>       var score = haha(arg.get("a").asInstanceOf[JSONObject])
>       // set score to 0.5 for testing
>       arg.put("score", 0.5)
>       arg
>     })
>     val resourceScoresRDD = => (arg.get("rid").toString.toLong,
>       (arg.get("zid").toString, arg.get("score").asInstanceOf[Number].doubleValue)))
>       .groupByKey().cache()
>     val resourceScores = resourceScoresRDD.collect()
>     val bcResourceScores = sc.broadcast(resourceScores)
>     val simRDD = resourceScoresRDD.mapPartitions({ iter =>
>       val m = bcResourceScores.value
>       for { (r1, v1) <- iter
>             (r2, v2) <- m
>             if r1 > r2
>           } yield (r1, r2, cosSimilarity(v1, v2)) }, true)
>       .filter(arg => arg._3 > 0.1)
>     println(simRDD.count)
> And I saw this in the Spark Web UI (two screenshots, not included here).
> My standalone cluster has 3 worker nodes (16 cores and 32 GB RAM), and the
> machines in the cluster are under heavy load while the Spark program is
> running.
> Is there a better way to implement this algorithm?
> Thanks!
> --
> Sent from the Apache Spark User List mailing list archive at