Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/14998#discussion_r78090050 --- Diff: graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala --- @@ -163,6 +166,85 @@ object PageRank extends Logging { } /** + * Run Personalized PageRank for a fixed number of iterations, for a + * set of starting nodes in parallel. Returns a graph with vertex attributes + * containing the pagerank relative to all starting nodes (as a sparse vector) and + * edge attributes the normalized edge weight + * + * @tparam VD The original vertex attribute (not used) + * @tparam ED The original edge attribute (not used) + * + * @param graph The graph on which to compute personalized pagerank + * @param numIter The number of iterations to run + * @param resetProb The random reset probability + * @param sources The list of sources to compute personalized pagerank from + * @return the graph with vertex attributes + * containing the pagerank relative to all starting nodes (as a sparse vector) and + * edge attributes the normalized edge weight + */ + def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], + numIter: Int, resetProb: Double = 0.15, + sources: Array[VertexId]): Graph[Vector, Double] = { + // TODO if one sources vertex id is outside of the int range + // we won't be able to store its activations in a sparse vector + val zero = Vectors.sparse(sources.size, List()).asBreeze + val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) => + val v = Vectors.sparse(sources.size, Array(i), Array(resetProb)).asBreeze + (vid, v) + }.toMap + val sc = graph.vertices.sparkContext + val sourcesInitMapBC = sc.broadcast(sourcesInitMap) + // Initialize the PageRank graph with each edge attribute having + // weight 1/outDegree and each source vertex with attribute 1.0. + var rankGraph = graph + // Associate the degree with each vertex + .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) } + // Set the weight on the edges based on the degree + .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src) + .mapVertices { (vid, attr) => + if (sourcesInitMapBC.value contains vid) { + sourcesInitMapBC.value(vid) + } else { + zero + } + } + + var i = 0 + while (i < numIter) { + val prevRankGraph = rankGraph + // Propagates the message along outbound edges + // and adding start nodes back in with activation resetProb + val rankUpdates = rankGraph.aggregateMessages[BV[Double]]( + ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr), + (a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src) + + rankGraph = rankGraph.joinVertices(rankUpdates) { + (vid, oldRank, msgSum) => + val popActivations: BV[Double] = msgSum :* (1.0 - resetProb) --- End diff -- We found that `breeze` is fairly slow by doing this operation. Is it possible to use native spark vector, and use our linear algebra package?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org