Repository: spark
Updated Branches:
  refs/heads/branch-1.2 a9abcaa2c -> 00112baf9


[SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing

Fixes the issue whereby `diff`, `innerJoin`, or `leftJoin` on VertexRDDs that 
are partitioned differently fails under the `zipPartitions` method. This fix 
tests whether the two partitioners are equal and, if they are not, 
repartitions the other VertexRDD to match the partitioning of the calling 
VertexRDD.

Author: Brennon York <brennon.y...@capitalone.com>

Closes #4705 from brennonyork/SPARK-1955 and squashes the following commits:

0882590 [Brennon York] updated to properly handle differently-partitioned 
vertexRDDs

(cherry picked from commit 9f603fce78fcc997926e9a72dec44d48cbc396fc)
Signed-off-by: Ankur Dave <ankurd...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00112baf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00112baf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00112baf

Branch: refs/heads/branch-1.2
Commit: 00112baf9e9707e9a773e8076dc4ed2957803bfd
Parents: a9abcaa
Author: Brennon York <brennon.y...@capitalone.com>
Authored: Wed Feb 25 14:11:12 2015 -0800
Committer: Ankur Dave <ankurd...@gmail.com>
Committed: Wed Feb 25 14:14:22 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/graphx/impl/VertexRDDImpl.scala    | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/00112baf/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 9732c5b..d9bf9fe 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -94,8 +94,14 @@ class VertexRDDImpl[VD] private[graphx] (
     this.mapVertexPartitions(_.map(f))
 
   override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
+    val otherPartition = other match {
+      case other: VertexRDD[_] if this.partitioner == other.partitioner =>
+        other.partitionsRDD
+      case _ =>
+        VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
+    }
     val newPartitionsRDD = partitionsRDD.zipPartitions(
-      other.partitionsRDD, preservesPartitioning = true
+      otherPartition, preservesPartitioning = true
     ) { (thisIter, otherIter) =>
       val thisPart = thisIter.next()
       val otherPart = otherIter.next()
@@ -123,7 +129,7 @@ class VertexRDDImpl[VD] private[graphx] (
     // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
     // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
     other match {
-      case other: VertexRDD[_] =>
+      case other: VertexRDD[_] if this.partitioner == other.partitioner =>
         leftZipJoin(other)(f)
       case _ =>
         this.withPartitionsRDD[VD3](
@@ -152,7 +158,7 @@ class VertexRDDImpl[VD] private[graphx] (
     // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
     // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
     other match {
-      case other: VertexRDD[_] =>
+      case other: VertexRDD[_] if this.partitioner == other.partitioner =>
         innerZipJoin(other)(f)
       case _ =>
         this.withPartitionsRDD(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to