Since the two RDDs have the same partitioner and the same number of partitions, you can probably use
zipPartitions and provide a user-defined function to subtract.

A very primitive example:

val data1 = Seq(1->1, 2->2, 3->3, 4->4, 5->5, 6->6, 7->7)
val data2 = Seq(1->1, 2->2, 3->3, 4->4, 5->5, 6->6)
val rdd1 = sc.parallelize(data1, 2)
val rdd2 = sc.parallelize(data2, 2)
val diff = rdd1.zipPartitions(rdd2) { (leftItr, rightItr) =>
  // Materialize the right iterator into a Set first: a Scala Iterator can
  // only be traversed once, so calling rightItr.contains inside the filter
  // would exhaust it after the first left element.
  val rightSet = rightItr.toSet
  leftItr.filter(p => !rightSet.contains(p))
}
diff.foreach(println)
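The key detail is that the right-hand partition must be materialized before filtering, because a Scala Iterator is single-pass. The per-partition logic can be sketched without a cluster using plain collections (subtractPartition here is a hypothetical helper mirroring the body of the zipPartitions function, not a Spark API):

```scala
// For one pair of co-located partitions, keep only the left elements that
// do not appear in the right partition.
def subtractPartition[A](left: Iterator[A], right: Iterator[A]): Iterator[A] = {
  val rightSet = right.toSet  // materialize once; iterators are single-pass
  left.filter(p => !rightSet.contains(p))
}

val leftPart  = Iterator(1 -> 2, 1 -> 5, 2 -> 3, 3 -> 16)
val rightPart = Iterator(1 -> 2, 3 -> 16, 5 -> 12)
val result = subtractPartition(leftPart, rightPart).toList
println(result)  // List((1,5), (2,3))
```

Because both RDDs use the same hash partitioner, a given key can only appear in the matching partition on each side, so this per-partition set difference is equivalent to a full subtract and avoids any shuffle.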



Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Mon, May 9, 2016 at 7:35 PM, Raghava Mutharaju <m.vijayaragh...@gmail.com
> wrote:

> We tried that but couldn't figure out a way to efficiently filter it. Lets
> take two RDDs.
>
> rdd1:
>
> (1,2)
> (1,5)
> (2,3)
> (3,20)
> (3,16)
>
> rdd2:
>
> (1,2)
> (3,30)
> (3,16)
> (5,12)
>
> rdd1.leftOuterJoin(rdd2), from which we want to obtain the effect of
> rdd1.subtract(rdd2), gives:
>
> (1,(2,Some(2)))
> (1,(5,Some(2)))
> (2,(3,None))
> (3,(20,Some(30)))
> (3,(20,Some(16)))
> (3,(16,Some(30)))
> (3,(16,Some(16)))
>
> case (x, (y, z)) => Apart from keeping pairs where z == None and filtering
> out those where Some(y) == z, we also need to filter out (3, (16, Some(30))).
> How can we do that efficiently without resorting to broadcasting any
> elements of rdd2?
>
> Regards,
> Raghava.
>
>
> On Mon, May 9, 2016 at 6:27 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> How about outer join?
>> On 9 May 2016 13:18, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
>> wrote:
>>
>>> Hello All,
>>>
>>> We have two PairRDDs (rdd1, rdd2) that are hash partitioned on key (the
>>> number of partitions is the same for both RDDs). We would like to
>>> subtract rdd2 from rdd1.
>>>
>>> The subtract code at
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
>>> seems to map the elements of both RDDs to (x, null), where x is the
>>> element of the RDD, and partition them. Then it makes use of
>>> subtractByKey(). This way, the RDDs have to be repartitioned on x (which
>>> in our case is the key and value combined). In our case, both RDDs are
>>> already hash partitioned on the key of x. Can we take advantage of this by
>>> having a PairRDD/HashPartitioner-aware subtract? Is there a way to use
>>> mapPartitions() for this?
>>>
>>> We tried to broadcast rdd2 and use mapPartitions. But this turns out to
>>> be memory consuming and inefficient. We tried to do a local set difference
>>> between rdd1 and the broadcasted rdd2 (in mapPartitions of rdd1). We did
>>> use destroy() on the broadcasted value, but it does not help.
>>>
>>> The current subtract method is slow for us. rdd1 and rdd2 are around
>>> 700MB each and the subtract takes around 14 seconds.
>>>
>>> Any ideas on this issue are highly appreciated.
>>>
>>> Regards,
>>> Raghava.
>>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>