Repository: spark
Updated Branches:
  refs/heads/master 6c4b9f4be -> 5d92326be


[SPARK-16478] graphX (added graph caching in strongly connected components)

## What changes were proposed in this pull request?

I added caching in every iteration for the sccGraph that is returned by 
strongly connected components. Without this cache, strongly connected 
components returned a graph that had to be recomputed from scratch once some 
intermediate caches no longer existed.
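
The cache, materialize, then unpersist pattern described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code of this patch: the object `CachePerIteration`, the helper `iterateWithCaching`, and the `step` function are hypothetical names introduced here. Only `cache()`, `vertices.count()`, `edges.count()`, and `unpersist(blocking = false)` come from the GraphX API used in the diff.

```scala
import org.apache.spark.graphx.Graph
import scala.reflect.ClassTag

// Hypothetical helper, for illustration only.
object CachePerIteration {
  /** Applies `step` `numIter` times, keeping only the latest result cached. */
  def iterateWithCaching[VD: ClassTag, ED: ClassTag](
      initial: Graph[VD, ED],
      numIter: Int)(step: Graph[VD, ED] => Graph[VD, ED]): Graph[VD, ED] = {
    var current = initial
    var i = 0
    while (i < numIter) {
      // Cache the new graph so later actions do not replay the whole lineage.
      val next = step(current).cache()
      // Materialize vertices and edges while the previous graph is still cached.
      next.vertices.count()
      next.edges.count()
      // The new graph is materialized, so the previous one can be released.
      // The caller's input graph is left alone; its caching is the caller's choice.
      if (current ne initial) current.unpersist(blocking = false)
      current = next
      i += 1
    }
    current
  }
}
```

The ordering matters: unpersisting the previous graph before the new one is materialized would force the new graph to be computed from an uncached parent, which is the kind of recomputation this change is meant to avoid.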

## How was this patch tested?
I tested it by running code similar to the one [on 
Databricks](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/4889410027417133/3634650767364730/3117184429335832/latest.html).
Basically, I generated a large graph and computed strongly connected 
components with the changed code, then simply ran count on vertices and edges. 
After this update, the count takes a few seconds instead of 20 minutes.
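
A check along these lines could look roughly like the sketch below. It is not the linked notebook: the object `SccCountCheck`, the `timed` helper, the choice of `GraphGenerators.logNormalGraph` as the graph generator, and the parameter values are all assumptions made for illustration. Only the overall procedure (generate a graph, run strongly connected components, count vertices and edges) follows the description above.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.lib.StronglyConnectedComponents
import org.apache.spark.graphx.util.GraphGenerators

// Hypothetical test harness, for illustration only.
object SccCountCheck {
  /** Runs `body`, prints the elapsed wall-clock time, and returns the result. */
  def timed[A](label: String)(body: => A): A = {
    val start = System.nanoTime()
    val result = body
    println(f"$label took ${(System.nanoTime() - start) / 1e9}%.2f s")
    result
  }

  /** Returns (vertex count, edge count) of the SCC result graph. */
  def run(sc: SparkContext, numVertices: Int, numIter: Int): (Long, Long) = {
    // Assumed generator; the original test may have built its graph differently.
    val graph = GraphGenerators.logNormalGraph(sc, numVertices).cache()
    val scc = timed("stronglyConnectedComponents") {
      StronglyConnectedComponents.run(graph, numIter)
    }
    // With the returned graph cached, these counts should not replay every iteration.
    val vertexCount = timed("vertices.count")(scc.vertices.count())
    val edgeCount = timed("edges.count")(scc.edges.count())
    (vertexCount, edgeCount)
  }
}
```

Calling `SccCountCheck.run(sc, numVertices, numIter)` on builds with and without the change, with a sufficiently large `numVertices`, would be one way to compare the time spent in the two count actions.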

# statement
contribution is my original work and I license the work to the project under 
the project's open source license.

Author: Michał Wesołowski <michal.wesolow...@bzwbk.pl>

Closes #14137 from wesolowskim/SPARK-16478.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d92326b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d92326b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d92326b

Branch: refs/heads/master
Commit: 5d92326be76cb15edc6e18e94a373e197f696803
Parents: 6c4b9f4
Author: Michał Wesołowski <michal.wesolow...@bzwbk.pl>
Authored: Tue Jul 19 12:18:42 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jul 19 12:18:42 2016 +0100

----------------------------------------------------------------------
 .../lib/StronglyConnectedComponents.scala       | 86 ++++++++++++--------
 1 file changed, 50 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5d92326b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
old mode 100644
new mode 100755
index 1fa92b0..e4f80ff
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
@@ -44,6 +44,9 @@ object StronglyConnectedComponents {
     // graph we are going to work with in our iterations
     var sccWorkGraph = graph.mapVertices { case (vid, _) => (vid, false) }.cache()
 
+    // helper variables to unpersist cached graphs
+    var prevSccGraph = sccGraph
+
     var numVertices = sccWorkGraph.numVertices
     var iter = 0
     while (sccWorkGraph.numVertices > 0 && iter < numIter) {
@@ -64,48 +67,59 @@ object StronglyConnectedComponents {
         // write values to sccGraph
         sccGraph = sccGraph.outerJoinVertices(finalVertices) {
           (vid, scc, opt) => opt.getOrElse(scc)
-        }
+        }.cache()
+        // materialize vertices and edges
+        sccGraph.vertices.count()
+        sccGraph.edges.count()
+        // sccGraph materialized so, unpersist can be done on previous
+        prevSccGraph.unpersist(blocking = false)
+        prevSccGraph = sccGraph
+
         // only keep vertices that are not final
         sccWorkGraph = sccWorkGraph.subgraph(vpred = (vid, data) => !data._2).cache()
       } while (sccWorkGraph.numVertices < numVertices)
 
-      sccWorkGraph = sccWorkGraph.mapVertices{ case (vid, (color, isFinal)) => (vid, isFinal) }
+      // if iter < numIter at this point sccGraph that is returned
+      // will not be recomputed and pregel executions are pointless
+      if (iter < numIter) {
+        sccWorkGraph = sccWorkGraph.mapVertices { case (vid, (color, isFinal)) => (vid, isFinal) }
 
-      // collect min of all my neighbor's scc values, update if it's smaller than mine
-      // then notify any neighbors with scc values larger than mine
-      sccWorkGraph = Pregel[(VertexId, Boolean), ED, VertexId](
-        sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)(
-        (vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2),
-        e => {
-          if (e.srcAttr._1 < e.dstAttr._1) {
-            Iterator((e.dstId, e.srcAttr._1))
-          } else {
-            Iterator()
-          }
-        },
-        (vid1, vid2) => math.min(vid1, vid2))
+        // collect min of all my neighbor's scc values, update if it's smaller than mine
+        // then notify any neighbors with scc values larger than mine
+        sccWorkGraph = Pregel[(VertexId, Boolean), ED, VertexId](
+          sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)(
+          (vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2),
+          e => {
+            if (e.srcAttr._1 < e.dstAttr._1) {
+              Iterator((e.dstId, e.srcAttr._1))
+            } else {
+              Iterator()
+            }
+          },
+          (vid1, vid2) => math.min(vid1, vid2))
 
-      // start at root of SCCs. Traverse values in reverse, notify all my neighbors
-      // do not propagate if colors do not match!
-      sccWorkGraph = Pregel[(VertexId, Boolean), ED, Boolean](
-        sccWorkGraph, false, activeDirection = EdgeDirection.In)(
-        // vertex is final if it is the root of a color
-        // or it has the same color as a neighbor that is final
-        (vid, myScc, existsSameColorFinalNeighbor) => {
-          val isColorRoot = vid == myScc._1
-          (myScc._1, myScc._2 || isColorRoot || existsSameColorFinalNeighbor)
-        },
-        // activate neighbor if they are not final, you are, and you have the same color
-        e => {
-          val sameColor = e.dstAttr._1 == e.srcAttr._1
-          val onlyDstIsFinal = e.dstAttr._2 && !e.srcAttr._2
-          if (sameColor && onlyDstIsFinal) {
-            Iterator((e.srcId, e.dstAttr._2))
-          } else {
-            Iterator()
-          }
-        },
-        (final1, final2) => final1 || final2)
+        // start at root of SCCs. Traverse values in reverse, notify all my neighbors
+        // do not propagate if colors do not match!
+        sccWorkGraph = Pregel[(VertexId, Boolean), ED, Boolean](
+          sccWorkGraph, false, activeDirection = EdgeDirection.In)(
+          // vertex is final if it is the root of a color
+          // or it has the same color as a neighbor that is final
+          (vid, myScc, existsSameColorFinalNeighbor) => {
+            val isColorRoot = vid == myScc._1
+            (myScc._1, myScc._2 || isColorRoot || existsSameColorFinalNeighbor)
+          },
+          // activate neighbor if they are not final, you are, and you have the same color
+          e => {
+            val sameColor = e.dstAttr._1 == e.srcAttr._1
+            val onlyDstIsFinal = e.dstAttr._2 && !e.srcAttr._2
+            if (sameColor && onlyDstIsFinal) {
+              Iterator((e.srcId, e.dstAttr._2))
+            } else {
+              Iterator()
+            }
+          },
+          (final1, final2) => final1 || final2)
+      }
     }
     sccGraph
   }

