Since you have the same partitioner and the same number of partitions, you can probably use zipPartitions and provide a user-defined function to subtract.
A very primitive example:

  val data1 = Seq(1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4, 5 -> 5, 6 -> 6, 7 -> 7)
  val data2 = Seq(1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4, 5 -> 5, 6 -> 6)
  val rdd1 = sc.parallelize(data1, 2)
  val rdd2 = sc.parallelize(data2, 2)
  val diff = rdd1.zipPartitions(rdd2) { (leftItr, rightItr) =>
    // An Iterator can only be traversed once, so materialize the right
    // side into a Set first; calling contains() repeatedly on the raw
    // iterator would consume it and silently give wrong results.
    val rightSet = rightItr.toSet
    leftItr.filter(p => !rightSet.contains(p))
  }
  diff.foreach(println)

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Mon, May 9, 2016 at 7:35 PM, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:

> We tried that but couldn't figure out a way to filter it efficiently. Let's
> take two RDDs.
>
> rdd1:
>
> (1,2)
> (1,5)
> (2,3)
> (3,20)
> (3,16)
>
> rdd2:
>
> (1,2)
> (3,30)
> (3,16)
> (5,12)
>
> We do rdd1.leftOuterJoin(rdd2), aiming to derive rdd1.subtract(rdd2), and get:
>
> (1,(2,Some(2)))
> (1,(5,Some(2)))
> (2,(3,None))
> (3,(20,Some(30)))
> (3,(20,Some(16)))
> (3,(16,Some(30)))
> (3,(16,Some(16)))
>
> With case (x, (y, z)): apart from keeping z == None and filtering out
> y == z, we also need to filter out (3, (16, Some(30))), since (3, 16) is
> present in rdd2. How can we do that efficiently without resorting to
> broadcasting any elements of rdd2?
>
> Regards,
> Raghava.
>
>
> On Mon, May 9, 2016 at 6:27 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> How about an outer join?
>> On 9 May 2016 13:18, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
>> wrote:
>>
>>> Hello All,
>>>
>>> We have two PairRDDs (rdd1, rdd2) which are hash partitioned on the key
>>> (the number of partitions is the same for both RDDs). We would like to
>>> subtract rdd2 from rdd1.
>>>
>>> The subtract code at
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
>>> seems to map the elements of both RDDs to (x, null), where x is an
>>> element of the RDD, and partition them. Then it makes use of
>>> subtractByKey(). This way, the RDDs have to be repartitioned on x
>>> (which in our case is the key and value combined).
>>> In our case, both RDDs are
>>> already hash partitioned on the key of x. Can we take advantage of this
>>> with a PairRDD/HashPartitioner-aware subtract? Is there a way to use
>>> mapPartitions() for this?
>>>
>>> We tried broadcasting rdd2 and using mapPartitions: computing a local
>>> set difference between each partition of rdd1 and the broadcast rdd2
>>> (inside mapPartitions of rdd1). But this turned out to be
>>> memory-consuming and inefficient. We did call destroy() on the
>>> broadcast value, but it did not help.
>>>
>>> The current subtract method is slow for us: rdd1 and rdd2 are around
>>> 700 MB each, and the subtract takes around 14 seconds.
>>>
>>> Any ideas on this issue are highly appreciated.
>>>
>>> Regards,
>>> Raghava.
>>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
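A possible sketch of the partitioner-aware subtract the thread is asking about, using cogroup rather than zipPartitions. This is an illustrative sketch, not code from the thread: when both pair RDDs already share the same HashPartitioner, cogroup is a narrow dependency, so the per-key set difference runs without shuffling or repartitioning on (key, value). The object name and local[2] master are placeholders for a self-contained run.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PairSubtractSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("pair-subtract-sketch"))
    val part = new HashPartitioner(2)

    // The example data from the thread, hash partitioned on the key.
    val rdd1 = sc.parallelize(Seq(1 -> 2, 1 -> 5, 2 -> 3, 3 -> 20, 3 -> 16))
      .partitionBy(part)
    val rdd2 = sc.parallelize(Seq(1 -> 2, 3 -> 30, 3 -> 16, 5 -> 12))
      .partitionBy(part)

    // cogroup over two RDDs with the same partitioner is a narrow
    // dependency: for each key we see both value lists locally and can
    // drop left values that also appear on the right.
    val diff = rdd1.cogroup(rdd2).flatMap { case (k, (leftVals, rightVals)) =>
      val right = rightVals.toSet
      leftVals.filter(v => !right.contains(v)).map(v => (k, v))
    }

    diff.collect().sorted.foreach(println)  // (1,5), (2,3), (3,20)
    sc.stop()
  }
}
```

This keeps the semantics of rdd1.subtract(rdd2) for pair RDDs, including the (3,16) case that a plain leftOuterJoin filter gets wrong, while avoiding both the (x, null) repartition of the built-in subtract and any broadcast of rdd2.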