Repository: spark
Updated Branches:
  refs/heads/branch-1.1 0c8183cb3 -> ffdb2fcf8


[SPARK-2823][GraphX] Fix GraphX EdgeRDD zipPartitions

If the user sets "spark.default.parallelism" and its value differs from the
EdgeRDD partition number, GraphX jobs will throw:
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
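
For illustration, a minimal repro sketch against the 1.1 GraphX API (the
object name and the literal values 3 and 4 are ours and arbitrary; the two
counts only need to disagree), mirroring the regression test added below:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx._

    object ZipRepro {
      def main(args: Array[String]): Unit = {
        // Default parallelism (3) deliberately differs from the
        // edge partition count (4).
        val conf = new SparkConf().set("spark.default.parallelism", "3")
        val sc = new SparkContext("local", "repro", conf)
        val edges = sc.parallelize(
          (1 to 10).map(x => (x: VertexId, 0: VertexId)), numSlices = 4)
        val graph = Graph.fromEdgeTuples(edges, defaultValue = 1)
        // Before this fix, the zipPartitions call inside
        // mapReduceTriplets threw the IllegalArgumentException above.
        graph.mapReduceTriplets[Int](
          et => Iterator((et.dstId, et.srcAttr)), _ + _).collect()
        sc.stop()
      }
    }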

Author: luluorta <luluo...@gmail.com>

Closes #1763 from luluorta/fix-graph-zip and squashes the following commits:

8338961 [luluorta] fix GraphX EdgeRDD zipPartitions

(cherry picked from commit 9b225ac3072de522b40b46aba6df1f1c231f13ef)
Signed-off-by: Ankur Dave <ankurd...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ffdb2fcf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ffdb2fcf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ffdb2fcf

Branch: refs/heads/branch-1.1
Commit: ffdb2fcf8cd5880375bee52ee101e0373bf63e27
Parents: 0c8183c
Author: luluorta <luluo...@gmail.com>
Authored: Tue Sep 2 19:25:52 2014 -0700
Committer: Ankur Dave <ankurd...@gmail.com>
Committed: Tue Sep 2 19:28:57 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/EdgeRDD.scala     |  4 ++--
 .../scala/org/apache/spark/graphx/GraphSuite.scala  | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ffdb2fcf/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 899a3cb..0f1a101 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
+import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -47,7 +47,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
    * partitioner that allows co-partitioning with `partitionsRDD`.
    */
   override val partitioner =
-    partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+    partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitionsRDD.partitions.size)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
     val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
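
Why the old line misbehaved: Partitioner.defaultPartitioner consults
"spark.default.parallelism" when it is set, so the partitioner it returns can
report a partition count different from partitionsRDD.partitions.size, and a
later zipPartitions over the two RDDs sees unequal counts. Sizing a
HashPartitioner from partitionsRDD.partitions.size keeps the counts equal by
construction. A simplified paraphrase of the 1.x defaultPartitioner behavior
(it omits the scan for an existing partitioner among parent RDDs; a sketch,
not the exact source):

    import org.apache.spark.{HashPartitioner, Partitioner}
    import org.apache.spark.rdd.RDD

    object DefaultPartitionerSketch {
      // When spark.default.parallelism is set, the partitioner is sized
      // from the conf, not from the RDD's own partition count.
      def apply(rdd: RDD[_]): Partitioner =
        if (rdd.context.getConf.contains("spark.default.parallelism")) {
          new HashPartitioner(rdd.context.defaultParallelism)
        } else {
          new HashPartitioner(rdd.partitions.size)
        }
    }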

http://git-wip-us.apache.org/repos/asf/spark/blob/ffdb2fcf/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 6506bac..eaaa449 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -350,4 +351,19 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("non-default number of edge partitions") {
+    val n = 10
+    val defaultParallelism = 3
+    val numEdgePartitions = 4
+    assert(defaultParallelism != numEdgePartitions)
+    val conf = new SparkConf()
+      .set("spark.default.parallelism", defaultParallelism.toString)
+    val sc = new SparkContext("local", "test", conf)
+    val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
+      numEdgePartitions)
+    val graph = Graph.fromEdgeTuples(edges, 1)
+    val neighborAttrSums = graph.mapReduceTriplets[Int](
+      et => Iterator((et.dstId, et.srcAttr)), _ + _)
+    assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
+  }
 }
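
Note on the test: defaultParallelism (3) and numEdgePartitions (4) are chosen
to be unequal, which is precisely the condition under which the old,
conf-sized partitioner disagreed with the edge RDD's own partition count and
mapReduceTriplets failed with the IllegalArgumentException above. Each edge
(x, 0) sends its source attribute 1 to vertex 0, so summing over the n = 10
edges yields the single pair (0, n), hence the expected Set((0: VertexId, n)).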

