[ https://issues.apache.org/jira/browse/SPARK-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14319661#comment-14319661 ]
Brennon York commented on SPARK-1955:
-------------------------------------

[~ankurdave] if you haven't started on this, I can take it, since it relates heavily to [SPARK-4600|https://issues.apache.org/jira/browse/SPARK-4600] and [SPARK-5790|https://issues.apache.org/jira/browse/SPARK-5790]. If you've already started work, though, let me know :)

> VertexRDD can incorrectly assume index sharing
> ----------------------------------------------
>
>                 Key: SPARK-1955
>                 URL: https://issues.apache.org/jira/browse/SPARK-1955
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 0.9.0, 0.9.1, 1.0.0
>            Reporter: Ankur Dave
>            Assignee: Ankur Dave
>            Priority: Minor
>
> Many VertexRDD operations (diff, leftJoin, innerJoin) can use a fast zip join
> if both operands are VertexRDDs sharing the same index (i.e., one operand is
> derived from the other). This check is implemented by matching on the operand
> type and using the fast join strategy if both are VertexRDDs.
> This is clearly fine when both do in fact share the same index. It is also
> fine when the two VertexRDDs have the same partitioner but different indexes,
> because each VertexPartition will detect the index mismatch and fall back to
> the slow but correct local join strategy.
> However, when they have different numbers of partitions or different
> partition functions, an exception or even silently incorrect results can
> occur.
> For example:
> {code}
> import org.apache.spark._
> import org.apache.spark.graphx._
> // Construct VertexRDDs with different numbers of partitions
> val a = VertexRDD(sc.parallelize(List((0L, 1), (1L, 2)), 1))
> val b = VertexRDD(sc.parallelize(List((0L, 5)), 8))
> // Try to join them. Appears to work...
> val c = a.innerJoin(b) { (vid, x, y) => x + y }
> // ... but then fails with java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
> c.collect
> // Construct VertexRDDs with different partition functions
> val a = VertexRDD(sc.parallelize(List((0L, 1), (1L, 2))).partitionBy(new HashPartitioner(2)))
> val bVerts = sc.parallelize(List((1L, 5)))
> val b = VertexRDD(bVerts.partitionBy(new RangePartitioner(2, bVerts)))
> // Try to join them. We expect (1L, 7).
> val c = a.innerJoin(b) { (vid, x, y) => x + y }
> // Silent failure: we get an empty set!
> c.collect
> {code}
> VertexRDD should check equality of partitioners before using the fast zip
> join. If the partitioners are different, the two datasets should be
> automatically co-partitioned.
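The check the issue proposes (compare partitioners, and co-partition before zipping when they differ) can be illustrated without Spark. This is a hypothetical, simplified model in plain Scala, assuming that a vector of per-partition maps stands in for a partitioned VertexRDD; Part, ModPart, RangePart, Verts, zipInnerJoin and safeInnerJoin are made-up names, not Spark's VertexRDD API or its actual fix.

```scala
// Hypothetical, Spark-free model of the partitioner check proposed in SPARK-1955.
// None of these names are Spark APIs; plain collections stand in for partitions.
object ZipJoinSketch {

  // Decides which partition a vertex id lands in. Case classes give structural
  // equality, so "same partitioner" is simply ==.
  sealed trait Part {
    def numPartitions: Int
    def getPartition(id: Long): Int
  }

  // Non-negative modulo: a simplified stand-in for hash partitioning.
  final case class ModPart(numPartitions: Int) extends Part {
    def getPartition(id: Long): Int = Math.floorMod(id, numPartitions.toLong).toInt
  }

  // Range partitioning with fixed, sorted upper bounds (one per partition except the last).
  final case class RangePart(upperBounds: Vector[Long]) extends Part {
    def numPartitions: Int = upperBounds.length + 1
    def getPartition(id: Long): Int = {
      val i = upperBounds.indexWhere(id <= _)
      if (i < 0) upperBounds.length else i
    }
  }

  // A vertex set split into per-partition maps according to `part`.
  final case class Verts[A](part: Part, parts: Vector[Map[Long, A]]) {
    // Re-distribute every vertex using a different partitioner.
    def partitionBy(p: Part): Verts[A] = Verts.from(p, parts.flatten)
  }

  object Verts {
    def from[A](p: Part, verts: Seq[(Long, A)]): Verts[A] = {
      val byPartition = verts.groupBy { case (id, _) => p.getPartition(id) }
      Verts(p, Vector.tabulate(p.numPartitions)(i => byPartition.getOrElse(i, Seq.empty).toMap))
    }
  }

  // Fast path: pair partition i of `a` with partition i of `b` and join locally.
  // Only correct when both sides were partitioned the same way.
  def zipInnerJoin[A, B, C](a: Verts[A], b: Verts[B])(f: (Long, A, B) => C): Map[Long, C] = {
    require(a.parts.length == b.parts.length, "Can't zip with unequal numbers of partitions")
    a.parts.zip(b.parts).flatMap { case (pa, pb) =>
      for ((id, x) <- pa.toSeq; y <- pb.get(id)) yield id -> f(id, x, y)
    }.toMap
  }

  // Guarded join: zip only when partitioners are equal; otherwise
  // co-partition `b` with `a`'s partitioner first.
  def safeInnerJoin[A, B, C](a: Verts[A], b: Verts[B])(f: (Long, A, B) => C): Map[Long, C] = {
    val bAligned = if (a.part == b.part) b else b.partitionBy(a.part)
    zipInnerJoin(a, bAligned)(f)
  }

  def main(args: Array[String]): Unit = {
    // Miniature version of the issue's second repro: hash-like vs. range partitioning.
    val a = Verts.from(ModPart(2), Seq(0L -> 1, 1L -> 2))
    val b = Verts.from(RangePart(Vector(5L)), Seq(1L -> 5))
    println(zipInnerJoin(a, b)((_, x, y) => x + y))  // Map()  (silently wrong)
    println(safeInnerJoin(a, b)((_, x, y) => x + y)) // Map(1 -> 7)
  }
}
```

In this model the unguarded zip pairs vertex 1 of `a` (partition 1) with an empty partition of `b`, reproducing the empty result from the issue, while the guarded join re-partitions `b` and returns the expected (1, 7). With unequal partition counts, the `require` plays the role of Spark's "Can't zip RDDs" exception. Re-partitioning `b` onto `a`'s partitioner is just one possible co-partitioning choice.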