This should work... (note: setupRow is assumed to be defined earlier in the thread)

base1 = sc.parallelize(setupRow(10), 1)
base2 = sc.parallelize(setupRow(10), 1)
df1 = ssc.createDataFrame(base1)
df2 = ssc.createDataFrame(base2)
df1.show()
df2.show()
df1.registerTempTable("df1")
df2.registerTempTable("df2")
j = ssc.sql("select df1.k1 df1_k1, df1.k2 df1_k2, df1.k3 df1_k3, \
             df2.k1 df2_k1, df2.k2 df2_k2, df2.k3 df2_k3 \
             from df1 inner join df2 \
             on 1 = case when df1.k1 = df2.k1 then 1 \
                         when df1.k2 = df2.k2 then 1 \
                         when df1.k3 = df2.k3 then 1 \
                         else 0 \
                    end")
for k in j.collect():
    print k
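To make the join condition above concrete, here is a minimal, Spark-free sketch of the same predicate in plain Python. The row layout (dicts with keys k1..k3) is an assumption for illustration only; since every CASE branch returns 1, the condition reduces to "any key position matches", which is what this implements (as a quadratic nested-loop join, fine for small data but not a substitute for Spark on millions of rows):

```python
def cascade_match(left, right, keys=("k1", "k2", "k3")):
    # Equivalent to: 1 = case when l.k1=r.k1 then 1
    #                          when l.k2=r.k2 then 1
    #                          when l.k3=r.k3 then 1 else 0 end
    return any(left[k] == right[k] for k in keys)

def cascade_join(lefts, rights):
    # Nested-loop inner join using the cascading predicate; this is what
    # the SQL inner join with the CASE condition computes.
    return [(l, r) for l in lefts for r in rights if cascade_match(l, r)]

lefts = [{"k1": 1, "k2": 10, "k3": 100}]
rights = [{"k1": 2, "k2": 10, "k3": 999},   # matches via k2
          {"k1": 3, "k2": 30, "k3": 300}]   # no key matches
pairs = cascade_join(lefts, rights)
```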
On Sun, May 10, 2015 at 8:50 AM, Stéphane Verlet <kaweahsoluti...@gmail.com> wrote:

> Create a custom key class, implement the equals method, and make sure the
> hash method is compatible.
> Use that key to map and join your rows.
>
>
> On Sat, May 9, 2015 at 4:02 PM, Mathieu D <matd...@gmail.com> wrote:
>
>> Hi folks,
>>
>> I need to join RDDs having composite keys like this: (K1, K2, ... Kn).
>>
>> The joining rule looks like this:
>> * if left.K1 == right.K1, then we have a "true equality", and all K2...
>> Kn are also equal.
>> * if left.K1 != right.K1 but left.K2 == right.K2, I have a partial
>> equality, and I also want the join to occur there.
>> * if K2 doesn't match, then I test K3, and so on.
>>
>> Is there a way to implement a custom join with a given predicate to
>> implement this? (I would probably also need to provide a partitioner and
>> some sorting predicate.)
>>
>> Left and right RDDs are 1-10 million lines long.
>> Any idea?
>>
>> Thanks
>> Mathieu
>>

--
Best Regards,
Ayan Guha