Repository: spark
Updated Branches:
  refs/heads/branch-1.5 03a8a889a -> 416392697


[SPARK-9508] GraphX Pregel docs update with new Pregel code

SPARK-9436 simplifies the Pregel code. graphx-programming-guide needs to be
modified accordingly, since it still lists the old Pregel code.

Author: Alexander Ulanov <na...@yandex.ru>

Closes #7831 from avulanov/SPARK-9508-pregel-doc2.

(cherry picked from commit 1c843e284818004f16c3f1101c33b510f80722e3)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/41639269
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/41639269
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/41639269

Branch: refs/heads/branch-1.5
Commit: 416392697f20de27b37db0cf0bad15a0e5ac9ebe
Parents: 03a8a88
Author: Alexander Ulanov <na...@yandex.ru>
Authored: Tue Aug 18 22:13:52 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Aug 18 22:13:57 2015 -0700

----------------------------------------------------------------------
 docs/graphx-programming-guide.md | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/41639269/docs/graphx-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 99f8c82..c861a763 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -768,16 +768,14 @@ class GraphOps[VD, ED] {
     // Loop until no messages remain or maxIterations is achieved
     var i = 0
     while (activeMessages > 0 && i < maxIterations) {
-      // Receive the messages: -----------------------------------------------------------------------
-      // Run the vertex program on all vertices that receive messages
-      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
-      // Merge the new vertex values back into the graph
-      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
-      // Send Messages: ------------------------------------------------------------------------------
-      // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
-      // get to send messages.  More precisely the map phase of mapReduceTriplets is only invoked
-      // on edges in the activeDir of vertices in newVerts
-      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
+      // Receive the messages and update the vertices.
+      g = g.joinVertices(messages)(vprog).cache()
+      val oldMessages = messages
+      // Send new messages, skipping edges where neither side received a message. We must cache
+      // messages so it can be materialized on the next line, allowing us to uncache the previous
+      // iteration.
+      // iteration.
+      messages = g.mapReduceTriplets(
+        sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
       activeMessages = messages.count()
       i += 1
     }
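
For readers who want to step through the updated loop without a Spark cluster, the following is a minimal plain-Scala sketch of the same control flow, applied to single-source shortest paths. It is not the GraphX implementation: the names `PregelLoopSketch`, `Edge`, `shortestPaths`, and `send` are hypothetical, vertices live in an ordinary `Map`, caching is omitted, and the active direction is assumed to be "either endpoint", matching the comment "skipping edges where neither side received a message".

```scala
// Plain-Scala sketch of the simplified Pregel loop; no Spark required.
// All names below are illustrative and not part of the GraphX API.
object PregelLoopSketch {
  final case class Edge(src: Long, dst: Long, weight: Double)

  def shortestPaths(
      vertexIds: Set[Long],
      edges: Seq[Edge],
      source: Long,
      maxIterations: Int = Int.MaxValue): Map[Long, Double] = {

    // Vertex program: keep the smaller of the current and incoming distance.
    def vprog(dist: Double, msg: Double): Double = math.min(dist, msg)

    // Merge function: combine messages addressed to the same vertex.
    def mergeMsg(a: Double, b: Double): Double = math.min(a, b)

    // Send function: offer the destination a shorter path if one exists.
    def sendMsg(g: Map[Long, Double], e: Edge): Option[(Long, Double)] = {
      val candidate = g(e.src) + e.weight
      if (candidate < g(e.dst)) Some(e.dst -> candidate) else None
    }

    // Stand-in for mapReduceTriplets: when `active` is defined, skip edges
    // where neither endpoint received a message in the previous round.
    def send(g: Map[Long, Double], active: Option[Set[Long]]): Map[Long, Double] =
      edges
        .filter(e => active.forall(a => a(e.src) || a(e.dst)))
        .flatMap(e => sendMsg(g, e))
        .groupBy(_._1)
        .map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }

    // Initial state: distance 0 at the source, infinity everywhere else.
    var g: Map[Long, Double] = vertexIds
      .map(id => id -> (if (id == source) 0.0 else Double.PositiveInfinity))
      .toMap

    // The first round of messages is computed over all edges.
    var messages = send(g, None)
    var i = 0

    // Loop until no messages remain or maxIterations is reached.
    while (messages.nonEmpty && i < maxIterations) {
      // Receive the messages and update the vertices (joinVertices analogue).
      g = g.map { case (id, d) =>
        id -> messages.get(id).map(m => vprog(d, m)).getOrElse(d)
      }
      val oldMessages = messages
      // Send new messages, restricted to edges touching a vertex that
      // received a message in this round.
      messages = send(g, Some(oldMessages.keySet))
      i += 1
    }
    g
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0), Edge(3L, 4L, 1.0))
    println(shortestPaths(Set(1L, 2L, 3L, 4L), edges, source = 1L))
  }
}
```

On the four-vertex example in `main`, the loop runs three rounds and ends with distances 0.0, 1.0, 3.0, and 4.0 for vertices 1 through 4. In the second round the edge from 1 to 2 is skipped entirely, because neither endpoint received a message in the round before.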


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
