Repository: spark
Updated Branches:
  refs/heads/master 172a52f5d -> 78062b852


[SPARK-18845][GRAPHX] PageRank has incorrect initialization value that leads to slow convergence

## What changes were proposed in this pull request?

Change the initial value in all PageRank implementations to be `1.0` instead of 
`resetProb` (default `0.15`) and use `outerJoinVertices` instead of 
`joinVertices` so that source vertices get updated in each iteration.
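As a minimal sketch of why the join matters (using the names `rankGraph` and `rankUpdates` from the patch for the rank graph and the per-iteration message sums): a vertex with no in-edges receives no message and is therefore absent from `rankUpdates`, so `joinVertices` silently keeps its old rank, while `outerJoinVertices` visits every vertex:

```scala
// Sketch only, not the patch itself. Assumes rankGraph: Graph[Double, Double],
// rankUpdates: VertexRDD[Double], and resetProb from the surrounding code.

// joinVertices only invokes the map function for vertices present in
// rankUpdates; a pure source vertex keeps its stale rank forever:
val viaJoin = rankGraph.joinVertices(rankUpdates) {
  (id, oldRank, msgSum) => resetProb + (1.0 - resetProb) * msgSum
}

// outerJoinVertices invokes the map function for every vertex, passing
// None where no message arrived, so sources are re-set each iteration:
val viaOuterJoin = rankGraph.outerJoinVertices(rankUpdates) {
  (id, oldRank, msgSumOpt) => resetProb + (1.0 - resetProb) * msgSumOpt.getOrElse(0.0)
}
```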

This seems to have been introduced a long time ago in 
https://github.com/apache/spark/commit/15a564598fe63003652b1e24527c432080b5976c#diff-b2bf3f97dcd2f19d61c921836159cda9L90

With the exception of graphs with sinks (which currently give incorrect results;
see SPARK-18847), this gives faster convergence because the sum of ranks starts
out already correct (the sum of ranks should equal the number of vertices).
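To see why, here is a quick sanity check (a sketch, not part of the patch): on a sink-free graph, one PageRank iteration maps the total rank `S` to `N * resetProb + (1 - resetProb) * S`, whose fixed point is `S = N`, the number of vertices.

```scala
// Sketch: how the total rank S evolves per iteration on a sink-free graph
// with N = 4 vertices and the default resetProb = 0.15.
val n = 4.0
val resetProb = 0.15
def step(s: Double): Double = n * resetProb + (1.0 - resetProb) * s

// Old initialization (resetProb per vertex): the sum must climb to N
// geometrically, which is exactly the slow convergence being fixed.
Iterator.iterate(n * resetProb)(step).take(4).toList
// List(0.6, 1.11, 1.5435, 1.911975) -> approaches 4.0 only gradually

// New initialization (1.0 per vertex): the sum is N from the very start.
Iterator.iterate(n * 1.0)(step).take(4).toList
// List(4.0, 4.0, 4.0, 4.0)
```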

Convergence comparison benchmark for a small graph: http://imgur.com/a/HkkZf
Code for benchmark: 
https://gist.github.com/aray/a7de1f3801a810f8b1fa00c271a1fefd

## How was this patch tested?

(Corrected) existing unit tests, plus an additional test that verifies the
result against igraph and NetworkX on a loop with a source vertex.
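For reference, the new cross-check can be reproduced standalone roughly as follows (a sketch assuming a running `SparkContext` named `sc`; the reference values are the igraph/NetworkX ranks quoted in the test):

```scala
import org.apache.spark.graphx.Graph

// Loop with a source: 1 -> 2 -> 3 -> 4 -> 2
val edges = sc.parallelize((1L, 2L) :: (2L, 3L) :: (3L, 4L) :: (4L, 2L) :: Nil)
val g = Graph.fromEdgeTuples(edges, defaultValue = 1)

// Dynamic PageRank with the default reset probability.
val ranks = g.pageRank(tol = 0.0001, resetProb = 0.15).vertices.collect().sortBy(_._1)

// igraph/NetworkX normalize ranks to sum to 1, while GraphX ranks sum to
// the number of vertices, hence the factor of 4.
val reference = Seq(0.0375000, 0.3326045, 0.3202138, 0.3096817).map(_ * 4)
```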

Author: Andrew Ray <ray.and...@gmail.com>

Closes #16271 from aray/pagerank-initial-value.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78062b85
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78062b85
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78062b85

Branch: refs/heads/master
Commit: 78062b8521bb02900baeec31992d697fa677f122
Parents: 172a52f
Author: Andrew Ray <ray.and...@gmail.com>
Authored: Thu Dec 15 23:32:10 2016 -0800
Committer: Ankur Dave <ankurd...@gmail.com>
Committed: Thu Dec 15 23:32:10 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/graphx/lib/PageRank.scala  | 24 +++++++-------
 .../apache/spark/graphx/lib/PageRankSuite.scala | 34 +++++++++++++++++---
 2 files changed, 42 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/78062b85/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index feb3f47..37b6e45 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -115,9 +115,9 @@ object PageRank extends Logging {
     val src: VertexId = srcId.getOrElse(-1L)
 
     // Initialize the PageRank graph with each edge attribute having
-    // weight 1/outDegree and each vertex with attribute resetProb.
+    // weight 1/outDegree and each vertex with attribute 1.0.
     // When running personalized pagerank, only the source vertex
-    // has an attribute resetProb. All others are set to 0.
+    // has an attribute 1.0. All others are set to 0.
     var rankGraph: Graph[Double, Double] = graph
       // Associate the degree with each vertex
      .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
@@ -125,7 +125,7 @@ object PageRank extends Logging {
       .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
       // Set the vertex attributes to the initial pagerank values
       .mapVertices { (id, attr) =>
-        if (!(id != src && personalized)) resetProb else 0.0
+        if (!(id != src && personalized)) 1.0 else 0.0
       }
 
     def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
@@ -150,8 +150,8 @@ object PageRank extends Logging {
         (src: VertexId, id: VertexId) => resetProb
       }
 
-      rankGraph = rankGraph.joinVertices(rankUpdates) {
-        (id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum
+      rankGraph = rankGraph.outerJoinVertices(rankUpdates) {
+        (id, oldRank, msgSumOpt) => rPrb(src, id) + (1.0 - resetProb) * msgSumOpt.getOrElse(0.0)
       }.cache()
 
      rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
@@ -196,7 +196,7 @@ object PageRank extends Logging {
     // we won't be able to store its activations in a sparse vector
     val zero = Vectors.sparse(sources.size, List()).asBreeze
     val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
-      val v = Vectors.sparse(sources.size, Array(i), Array(resetProb)).asBreeze
+      val v = Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze
       (vid, v)
     }.toMap
     val sc = graph.vertices.sparkContext
@@ -225,11 +225,11 @@ object PageRank extends Logging {
         ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
         (a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src)
 
-      rankGraph = rankGraph.joinVertices(rankUpdates) {
-        (vid, oldRank, msgSum) =>
-          val popActivations: BV[Double] = msgSum :* (1.0 - resetProb)
+      rankGraph = rankGraph.outerJoinVertices(rankUpdates) {
+        (vid, oldRank, msgSumOpt) =>
+          val popActivations: BV[Double] = msgSumOpt.getOrElse(zero) :* (1.0 - resetProb)
           val resetActivations = if (sourcesInitMapBC.value contains vid) {
-            sourcesInitMapBC.value(vid)
+            sourcesInitMapBC.value(vid) :* resetProb
           } else {
             zero
           }
@@ -307,7 +307,7 @@ object PageRank extends Logging {
       .mapTriplets( e => 1.0 / e.srcAttr )
       // Set the vertex attributes to (initialPR, delta = 0)
       .mapVertices { (id, attr) =>
-        if (id == src) (resetProb, Double.NegativeInfinity) else (0.0, 0.0)
+        if (id == src) (1.0, Double.NegativeInfinity) else (0.0, 0.0)
       }
       .cache()
 
@@ -323,7 +323,7 @@ object PageRank extends Logging {
       msgSum: Double): (Double, Double) = {
       val (oldPR, lastDelta) = attr
       var teleport = oldPR
-      val delta = if (src==id) 1.0 else 0.0
+      val delta = if (src==id) resetProb else 0.0
       teleport = oldPR*delta
 
       val newPR = teleport + (1.0 - resetProb) * msgSum

http://git-wip-us.apache.org/repos/asf/spark/blob/78062b85/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index b6305c8..6afbb5a 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -41,7 +41,7 @@ object GridPageRank {
       }
     }
     // compute the pagerank
-    var pr = Array.fill(nRows * nCols)(resetProb)
+    var pr = Array.fill(nRows * nCols)(1.0)
     for (iter <- 0 until nIter) {
       val oldPr = pr
       pr = new Array[Double](nRows * nCols)
@@ -70,10 +70,10 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
       val resetProb = 0.15
       val errorTol = 1.0e-5
 
-      val staticRanks1 = starGraph.staticPageRank(numIter = 1, resetProb).vertices
-      val staticRanks2 = starGraph.staticPageRank(numIter = 2, resetProb).vertices.cache()
+      val staticRanks1 = starGraph.staticPageRank(numIter = 2, resetProb).vertices
+      val staticRanks2 = starGraph.staticPageRank(numIter = 3, resetProb).vertices.cache()
 
-      // Static PageRank should only take 2 iterations to converge
+      // Static PageRank should only take 3 iterations to converge
      val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
         if (pr1 != pr2) 1 else 0
       }.map { case (vid, test) => test }.sum()
@@ -203,4 +203,30 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
       assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
     }
   }
+
+  test("Loop with source PageRank") {
+    withSpark { sc =>
+      val edges = sc.parallelize((1L, 2L) :: (2L, 3L) :: (3L, 4L) :: (4L, 2L) :: Nil)
+      val g = Graph.fromEdgeTuples(edges, 1)
+      val resetProb = 0.15
+      val tol = 0.0001
+      val numIter = 50
+      val errorTol = 1.0e-5
+
+      val staticRanks = g.staticPageRank(numIter, resetProb).vertices
+      val dynamicRanks = g.pageRank(tol, resetProb).vertices
+      assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+
+      // Computed in igraph 1.0 w/ R bindings:
+      // > page_rank(graph_from_literal( A -+ B -+ C -+ D -+ B))
+      // Alternatively in NetworkX 1.11:
+      // > nx.pagerank(nx.DiGraph([(1,2),(2,3),(3,4),(4,2)]))
+      // We multiply by the number of vertices to account for difference in normalization
+      val igraphPR = Seq(0.0375000, 0.3326045, 0.3202138, 0.3096817).map(_ * 4)
+      val ranks = VertexRDD(sc.parallelize(1L to 4L zip igraphPR))
+      assert(compareRanks(staticRanks, ranks) < errorTol)
+      assert(compareRanks(dynamicRanks, ranks) < errorTol)
+
+    }
+  }
 }

