[ 
https://issues.apache.org/jira/browse/SPARK-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14319661#comment-14319661
 ] 

Brennon York commented on SPARK-1955:
-------------------------------------

[~ankurdave] if you haven't started on this, I can take it, since it relates 
closely to [SPARK-4600|https://issues.apache.org/jira/browse/SPARK-4600] and 
[SPARK-5790|https://issues.apache.org/jira/browse/SPARK-5790]. If you've 
already started work, though, let me know :)

> VertexRDD can incorrectly assume index sharing
> ----------------------------------------------
>
>                 Key: SPARK-1955
>                 URL: https://issues.apache.org/jira/browse/SPARK-1955
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 0.9.0, 0.9.1, 1.0.0
>            Reporter: Ankur Dave
>            Assignee: Ankur Dave
>            Priority: Minor
>
> Many VertexRDD operations (diff, leftJoin, innerJoin) can use a fast zip join 
> if both operands are VertexRDDs sharing the same index (i.e., one operand is 
> derived from the other). This check is implemented by matching on the operand 
> type and using the fast join strategy if both are VertexRDDs.
> This is clearly fine when both do in fact share the same index. It is also 
> fine when the two VertexRDDs have the same partitioner but different indexes, 
> because each VertexPartition will detect the index mismatch and fall back to 
> the slow but correct local join strategy.
> However, when they have different numbers of partitions or different 
> partition functions, an exception or even silently incorrect results can 
> occur.
> For example:
> {code}
> import org.apache.spark._
> import org.apache.spark.graphx._
> // Construct VertexRDDs with different numbers of partitions
> val a = VertexRDD(sc.parallelize(List((0L, 1), (1L, 2)), 1))
> val b = VertexRDD(sc.parallelize(List((0L, 5)), 8))
> // Try to join them. Appears to work...
> val c = a.innerJoin(b) { (vid, x, y) => x + y }
> // ... but then fails with java.lang.IllegalArgumentException:
> // Can't zip RDDs with unequal numbers of partitions
> c.collect
> // Construct VertexRDDs with different partition functions
> val a = VertexRDD(sc.parallelize(List((0L, 1), (1L, 2)))
>   .partitionBy(new HashPartitioner(2)))
> val bVerts = sc.parallelize(List((1L, 5)))
> val b = VertexRDD(bVerts.partitionBy(new RangePartitioner(2, bVerts)))
> // Try to join them. We expect (1L, 7).
> val c = a.innerJoin(b) { (vid, x, y) => x + y }
> // Silent failure: we get an empty set!
> c.collect
> {code}
> VertexRDD should check equality of partitioners before using the fast zip 
> join. If the partitioners are different, the two datasets should be 
> automatically co-partitioned.
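
Until such a check exists inside VertexRDD, a caller could apply the same idea from the outside. The following is only a rough sketch of that workaround, not the proposed fix itself: `copartitionedInnerJoin` is a hypothetical helper name, and it assumes the left operand has a partitioner defined (which should hold for anything built with `VertexRDD(...)`). It compares the two partitioners and, if they differ, shuffles the right operand onto the left operand's partitioner before joining. The re-partitioned operand then has the same partitioner but a different index, which is the case the description says falls back to the slow but correct local join.

```scala
import scala.reflect.ClassTag
import org.apache.spark.SparkContext._  // pair-RDD implicits on older Spark versions
import org.apache.spark.graphx._

// Hypothetical helper, not part of GraphX: make sure both operands use the
// same partitioner before calling innerJoin.
def copartitionedInnerJoin[A: ClassTag, B: ClassTag, C: ClassTag](
    a: VertexRDD[A], b: VertexRDD[B])(f: (VertexId, A, B) => C): VertexRDD[C] = {
  val aligned: VertexRDD[B] =
    if (a.partitioner == b.partitioner) {
      b  // already co-partitioned, nothing to do
    } else {
      // Assumption: a.partitioner is defined. Shuffle b onto a's partitioner
      // and rebuild a VertexRDD (with its own index) from the result.
      VertexRDD(b.partitionBy(a.partitioner.get))
    }
  a.innerJoin(aligned)(f)
}
```

With the second pair of `a` and `b` from the example above, `copartitionedInnerJoin(a, b) { (vid, x, y) => x + y }` would be the call to try in place of `a.innerJoin(b)`. The extra shuffle is the cost of correctness here, and it is skipped when the partitioners already match.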


