[ https://issues.apache.org/jira/browse/SPARK-1931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14010340#comment-14010340 ]
Ankur Dave edited comment on SPARK-1931 at 5/27/14 9:47 PM:
------------------------------------------------------------

Since the fix didn't make it into Spark 1.0.0, a workaround is to partition the edges before constructing the graph, as follows:
{code}
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
  val numPartitions = edges.partitions.size
  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
    .partitionBy(new HashPartitioner(numPartitions))
    .mapPartitions(_.map(_._2), preservesPartitioning = true)
}

val g = Graph(
  sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
  sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
assert(g.triplets.collect.map(_.toTuple).toSet ==
  Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))

val gPart = Graph(
  sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
  partitionBy(sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2), PartitionStrategy.EdgePartition2D))
assert(gPart.triplets.collect.map(_.toTuple).toSet ==
  Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
{code}


was (Author: ankurd):
Since the fix didn't make it into Spark 1.0.0, a workaround is to partition the edges before constructing the graph as follows:
{code}
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
  val numPartitions = edges.partitions.size
  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
    .partitionBy(new HashPartitioner(numPartitions))
    .mapPartitions(_.map(_._2), preservesPartitioning = true)
}

val g = Graph(
  sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
  sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
assert(g.triplets.collect.map(_.toTuple).toSet ==
  Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))

val gPart = Graph(
  sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
  partitionBy(sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2), PartitionStrategy.EdgePartition2D))
assert(gPart.triplets.collect.map(_.toTuple).toSet ==
  Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
{code}

> Graph.partitionBy does not reconstruct routing tables
> -----------------------------------------------------
>
>                 Key: SPARK-1931
>                 URL: https://issues.apache.org/jira/browse/SPARK-1931
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 1.0.0
>            Reporter: Ankur Dave
>            Assignee: Ankur Dave
>             Fix For: 1.0.1
>
>
> Commit 905173df57b90f90ebafb22e43f55164445330e6 introduced a bug in
> partitionBy where, after repartitioning the edges, it reuses the VertexRDD
> without updating the routing tables to reflect the new edge layout. This
> causes the following test to fail:
> {code}
> import org.apache.spark.graphx._
> val g = Graph(
>   sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
>   sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
> assert(g.triplets.collect.map(_.toTuple).toSet ==
>   Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
> val gPart = g.partitionBy(PartitionStrategy.EdgePartition2D)
> assert(gPart.triplets.collect.map(_.toTuple).toSet ==
>   Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
> {code}

--
This message was sent by Atlassian JIRA
(v6.2#6252)