Repository: spark
Updated Branches:
  refs/heads/branch-1.1 0c8183cb3 -> ffdb2fcf8
[SPARK-2823][GraphX] Fix GraphX EdgeRDD zipPartitions

If the user sets "spark.default.parallelism" and its value differs from the EdgeRDD's partition count, GraphX jobs will throw:

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions

Author: luluorta <luluo...@gmail.com>

Closes #1763 from luluorta/fix-graph-zip and squashes the following commits:

8338961 [luluorta] fix GraphX EdgeRDD zipPartitions

(cherry picked from commit 9b225ac3072de522b40b46aba6df1f1c231f13ef)
Signed-off-by: Ankur Dave <ankurd...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ffdb2fcf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ffdb2fcf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ffdb2fcf

Branch: refs/heads/branch-1.1
Commit: ffdb2fcf8cd5880375bee52ee101e0373bf63e27
Parents: 0c8183c
Author: luluorta <luluo...@gmail.com>
Authored: Tue Sep 2 19:25:52 2014 -0700
Committer: Ankur Dave <ankurd...@gmail.com>
Committed: Tue Sep 2 19:28:57 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/EdgeRDD.scala    |  4 ++--
 .../scala/org/apache/spark/graphx/GraphSuite.scala | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ffdb2fcf/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 899a3cb..0f1a101 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
+import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -47,7 +47,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
    * partitioner that allows co-partitioning with `partitionsRDD`.
   */
  override val partitioner =
-    partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+    partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitionsRDD.partitions.size)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
     val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)


http://git-wip-us.apache.org/repos/asf/spark/blob/ffdb2fcf/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 6506bac..eaaa449 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.Graph._
 import org.apache.spark.graphx.PartitionStrategy._
@@ -350,4 +351,19 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("non-default number of edge partitions") {
+    val n = 10
+    val defaultParallelism = 3
+    val numEdgePartitions = 4
+    assert(defaultParallelism != numEdgePartitions)
+    val conf = new SparkConf()
+      .set("spark.default.parallelism", defaultParallelism.toString)
+    val sc = new SparkContext("local", "test", conf)
+    val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
+      numEdgePartitions)
+    val graph = Graph.fromEdgeTuples(edges, 1)
+    val neighborAttrSums = graph.mapReduceTriplets[Int](
+      et => Iterator((et.dstId, et.srcAttr)), _ + _)
+    assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
+  }
 }
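----------------------------------------------------------------------

For reference, below is a minimal standalone sketch of the failure mode this patch fixes, adapted from the regression test above. It is not part of the commit; the object name Spark2823Repro, the "local" master, and the println are illustrative assumptions. In Spark 1.x, Partitioner.defaultPartitioner returns a HashPartitioner sized by spark.default.parallelism whenever that property is set, so EdgeRDD's old fallback partitioner could disagree with partitionsRDD's own partition count; falling back to new HashPartitioner(partitionsRDD.partitions.size) keeps the two aligned.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Hypothetical driver app reproducing SPARK-2823 (names are illustrative).
object Spark2823Repro {
  def main(args: Array[String]): Unit = {
    // Force a default parallelism (3) that differs from the number of
    // edge partitions used below (4).
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("spark-2823-repro")
      .set("spark.default.parallelism", "3")
    val sc = new SparkContext(conf)

    // Ten edges, all pointing at vertex 0, spread over 4 partitions.
    val edges = sc.parallelize((1 to 10).map(x => (x: VertexId, 0: VertexId)), 4)
    val graph = Graph.fromEdgeTuples(edges, 1)

    // Before this patch, EdgeRDD's fallback partitioner had 3 partitions
    // while the edge RDD had 4, so this call failed with:
    //   java.lang.IllegalArgumentException: Can't zip RDDs with unequal
    //   numbers of partitions
    val neighborAttrSums = graph.mapReduceTriplets[Int](
      et => Iterator((et.dstId, et.srcAttr)), _ + _)

    // With the fix the job runs and every edge contributes 1 to vertex 0.
    println(neighborAttrSums.collect.toSet) // Set((0,10))

    sc.stop()
  }
}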