Repository: spark Updated Branches: refs/heads/branch-1.2 a9abcaa2c -> 00112baf9
[SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing Fixes the issue whereby when VertexRDD's are `diff`ed, `innerJoin`ed, or `leftJoin`ed and have different partition sizes they fail under the `zipPartitions` method. This fix tests whether the partitions are equal or not and, if not, will repartition the other to match the partition size of the calling VertexRDD. Author: Brennon York <brennon.y...@capitalone.com> Closes #4705 from brennonyork/SPARK-1955 and squashes the following commits: 0882590 [Brennon York] updated to properly handle differently-partitioned vertexRDDs (cherry picked from commit 9f603fce78fcc997926e9a72dec44d48cbc396fc) Signed-off-by: Ankur Dave <ankurd...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00112baf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00112baf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00112baf Branch: refs/heads/branch-1.2 Commit: 00112baf9e9707e9a773e8076dc4ed2957803bfd Parents: a9abcaa Author: Brennon York <brennon.y...@capitalone.com> Authored: Wed Feb 25 14:11:12 2015 -0800 Committer: Ankur Dave <ankurd...@gmail.com> Committed: Wed Feb 25 14:14:22 2015 -0800 ---------------------------------------------------------------------- .../org/apache/spark/graphx/impl/VertexRDDImpl.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/00112baf/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 9732c5b..d9bf9fe 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -94,8 +94,14 @@ class VertexRDDImpl[VD] private[graphx] ( this.mapVertexPartitions(_.map(f)) override def diff(other: VertexRDD[VD]): VertexRDD[VD] = { + val otherPartition = other match { + case other: VertexRDD[_] if this.partitioner == other.partitioner => + other.partitionsRDD + case _ => + VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD + } val newPartitionsRDD = partitionsRDD.zipPartitions( - other.partitionsRDD, preservesPartitioning = true + otherPartition, preservesPartitioning = true ) { (thisIter, otherIter) => val thisPart = thisIter.next() val otherPart = otherIter.next() @@ -123,7 +129,7 @@ class VertexRDDImpl[VD] private[graphx] ( // Test if the other vertex is a VertexRDD to choose the optimal join strategy. // If the other set is a VertexRDD then we use the much more efficient leftZipJoin other match { - case other: VertexRDD[_] => + case other: VertexRDD[_] if this.partitioner == other.partitioner => leftZipJoin(other)(f) case _ => this.withPartitionsRDD[VD3]( @@ -152,7 +158,7 @@ class VertexRDDImpl[VD] private[graphx] ( // Test if the other vertex is a VertexRDD to choose the optimal join strategy. // If the other set is a VertexRDD then we use the much more efficient innerZipJoin other match { - case other: VertexRDD[_] => + case other: VertexRDD[_] if this.partitioner == other.partitioner => innerZipJoin(other)(f) case _ => this.withPartitionsRDD( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org