Repository: spark
Updated Branches:
  refs/heads/master 0f90f4e6a -> 3d79f1065


[SPARK-3650][GRAPHX] Triangle Count handles reverse edges incorrectly

jegonzal ankurdave please could you review

## What changes were proposed in this pull request?

Reworking of jegonzal PR #2495 to address the issue identified in SPARK-3650. 
Code amended to use the convertToCanonicalEdges method.

## How was this patch tested?

Patch was tested using the unit tests created in PR #2495

Author: Robin East <robin.e...@xense.co.uk>
Author: Joseph E. Gonzalez <joseph.e.gonza...@gmail.com>

Closes #11290 from insidedctm/spark-3650.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d79f106
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d79f106
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d79f106

Branch: refs/heads/master
Commit: 3d79f1065cd02133ad9dd4423c09b8c8b52b38e2
Parents: 0f90f4e
Author: Robin East <robin.e...@xense.co.uk>
Authored: Sun Feb 21 17:07:17 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Sun Feb 21 17:07:17 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/graphx/GraphOps.scala      | 14 +++++
 .../apache/spark/graphx/lib/TriangleCount.scala | 63 +++++++++++++-------
 .../org/apache/spark/graphx/GraphOpsSuite.scala | 15 +++++
 .../spark/graphx/lib/TriangleCountSuite.scala   |  7 ++-
 4 files changed, 76 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3d79f106/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 97a8223..3e8c385 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.graphx
 
+import scala.reflect.{classTag, ClassTag}
 import scala.reflect.ClassTag
 import scala.util.Random
 
+import org.apache.spark.HashPartitioner
+import org.apache.spark.SparkContext._
 import org.apache.spark.SparkException
+import org.apache.spark.graphx.impl.EdgePartitionBuilder
+import org.apache.spark.graphx.impl.GraphImpl
 import org.apache.spark.graphx.lib._
 import org.apache.spark.rdd.RDD
 
@@ -184,6 +189,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: 
Graph[VD, ED]) extends Seriali
   }
 
   /**
+   * Remove self edges.
+   *
+   * @return a graph with all self edges removed
+   */
+  def removeSelfEdges(): Graph[VD, ED] = {
+    graph.subgraph(epred = e => e.srcId != e.dstId)
+  }
+
+  /**
    * Join the vertices with an RDD and then apply a function from the
    * vertex and RDD entry to a new vertex value.  The input table
    * should contain at most one entry for each vertex.  If no entry is

http://git-wip-us.apache.org/repos/asf/spark/blob/3d79f106/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
----------------------------------------------------------------------
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index a5d5980..51bcdf2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx.lib
 import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
+import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D
 
 /**
  * Compute the number of triangles passing through each vertex.
@@ -27,25 +28,47 @@ import org.apache.spark.graphx._
  * The algorithm is relatively straightforward and can be computed in three 
steps:
  *
  * <ul>
- * <li>Compute the set of neighbors for each vertex
- * <li>For each edge compute the intersection of the sets and send the count 
to both vertices.
- * <li> Compute the sum at each vertex and divide by two since each triangle 
is counted twice.
+ * <li> Compute the set of neighbors for each vertex</li>
+ * <li> For each edge compute the intersection of the sets and send the count 
to both vertices.</li>
+ * <li> Compute the sum at each vertex and divide by two since each triangle 
is counted twice.</li>
  * </ul>
  *
- * Note that the input graph should have its edges in canonical direction
- * (i.e. the `sourceId` less than `destId`). Also the graph must have been 
partitioned
- * using [[org.apache.spark.graphx.Graph#partitionBy]].
+ * There are two implementations.  The default `TriangleCount.run` 
implementation first removes
+ * self cycles and canonicalizes the graph to ensure that the following 
conditions hold:
+ * <ul>
+ * <li> There are no self edges</li>
+ * <li> All edges are oriented so that src is less than dst</li>
+ * <li> There are no duplicate edges</li>
+ * </ul>
+ * However, the canonicalization procedure is costly as it requires 
repartitioning the graph.
+ * If the input data is already in "canonical form" with self cycles removed 
then
+ * `TriangleCount.runPreCanonicalized` should be used instead.
+ *
+ * {{{
+ * val canonicalGraph = graph.mapEdges(e => 
1).removeSelfEdges().convertToCanonicalEdges()
+ * val counts = TriangleCount.runPreCanonicalized(canonicalGraph).vertices
+ * }}}
+ *
  */
 object TriangleCount {
 
   def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
-    // Remove redundant edges
-    val g = graph.groupEdges((a, b) => a).cache()
+    // Transform the edge data into something cheap to shuffle and then canonicalize
+    val canonicalGraph = graph.mapEdges(e => 
true).removeSelfEdges().convertToCanonicalEdges()
+    // Get the triangle counts
+    val counters = runPreCanonicalized(canonicalGraph).vertices
+    // Join them back with the original graph
+    graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
+      optCounter.getOrElse(0)
+    }
+  }
 
+
+  def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): 
Graph[Int, ED] = {
     // Construct set representations of the neighborhoods
     val nbrSets: VertexRDD[VertexSet] =
-      g.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
-        val set = new VertexSet(4)
+      graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
+        val set = new VertexSet(nbrs.length)
         var i = 0
         while (i < nbrs.size) {
           // prevent self cycle
@@ -56,14 +79,14 @@ object TriangleCount {
         }
         set
       }
+
     // join the sets with the graph
-    val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
+    val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
       (vid, _, optSet) => optSet.getOrElse(null)
     }
+
     // Edge function computes intersection of smaller vertex with larger vertex
     def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
-      assert(ctx.srcAttr != null)
-      assert(ctx.dstAttr != null)
       val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
         (ctx.srcAttr, ctx.dstAttr)
       } else {
@@ -80,15 +103,15 @@ object TriangleCount {
       ctx.sendToSrc(counter)
       ctx.sendToDst(counter)
     }
+
     // compute the intersection along edges
     val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
     // Merge counters with the graph and divide by two since each triangle is 
counted twice
-    g.outerJoinVertices(counters) {
-      (vid, _, optCounter: Option[Int]) =>
-        val dblCount = optCounter.getOrElse(0)
-        // double count should be even (divisible by two)
-        assert((dblCount & 1) == 0)
-        dblCount / 2
+    graph.outerJoinVertices(counters) { (_, _, optCounter: Option[Int]) =>
+      val dblCount = optCounter.getOrElse(0)
+      // This algorithm double counts each triangle so the final count should 
be even
+      require(dblCount % 2 == 0, "Triangle count resulted in an invalid number 
of triangles.")
+      dblCount / 2
     }
-  } // end of TriangleCount
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3d79f106/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index 57a8b95..3967f66 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -55,6 +55,21 @@ class GraphOpsSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
+  test("removeSelfEdges") {
+    withSpark { sc =>
+      val edgeArray = Array((1 -> 2), (2 -> 3), (3 -> 3), (4 -> 3), (1 -> 1))
+        .map {
+          case (a, b) => (a.toLong, b.toLong)
+        }
+      val correctEdges = edgeArray.filter { case (a, b) => a != b }.toSet
+      val graph = Graph.fromEdgeTuples(sc.parallelize(edgeArray), 1)
+      val canonicalizedEdges = graph.removeSelfEdges().edges.map(e => 
(e.srcId, e.dstId))
+        .collect
+      assert(canonicalizedEdges.toSet.size === canonicalizedEdges.size)
+      assert(canonicalizedEdges.toSet === correctEdges)
+    }
+  }
+
   test ("filter") {
     withSpark { sc =>
       val n = 5

http://git-wip-us.apache.org/repos/asf/spark/blob/3d79f106/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
----------------------------------------------------------------------
diff --git 
a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
index 608e43c..f19c3ac 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
@@ -64,9 +64,9 @@ class TriangleCountSuite extends SparkFunSuite with 
LocalSparkContext {
       val verts = triangleCount.vertices
       verts.collect().foreach { case (vid, count) =>
         if (vid == 0) {
-          assert(count === 4)
-        } else {
           assert(count === 2)
+        } else {
+          assert(count === 1)
         }
       }
     }
@@ -75,7 +75,8 @@ class TriangleCountSuite extends SparkFunSuite with 
LocalSparkContext {
   test("Count a single triangle with duplicate edges") {
     withSpark { sc =>
       val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-        Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
+        Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Array(1L -> 0L, 1L -> 1L), 2)
       val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = 
Some(RandomVertexCut)).cache()
       val triangleCount = graph.triangleCount()
       val verts = triangleCount.vertices


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to