This should work....

    # setupRow, sc and ssc come from the poster's setup
    base1 = sc.parallelize(setupRow(10), 1)
    base2 = sc.parallelize(setupRow(10), 1)
    df1 = ssc.createDataFrame(base1)
    df2 = ssc.createDataFrame(base2)
    df1.show()
    df2.show()
    df1.registerTempTable("df1")
    df2.registerTempTable("df2")
    j = ssc.sql("select df1.k1 df1_k1, df1.k2 df1_k2, df1.k3 df1_k3, \
                        df2.k1 df2_k1, df2.k2 df2_k2, df2.k3 df2_k3 \
                   from df1 inner join df2 \
                     on 1 = case when df1.k1 = df2.k1 then 1 \
                                 when df1.k2 = df2.k2 then 1 \
                                 when df1.k3 = df2.k3 then 1 \
                                 else 0 \
                            end")
    for k in j.collect():
        print(k)
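The CASE expression in the ON clause boils down to "join two rows if any of their key components match". A minimal pure-Python sketch of that logic (the `keys_match` helper is hypothetical, not from the thread), assuming keys are plain tuples:

```python
def keys_match(left, right):
    """Return True if any corresponding key component is equal.

    This mirrors the cascading rule from the question: K1 equal -> match,
    otherwise K2 equal -> match, otherwise K3, and so on - which is
    logically the same as "any Ki equal".
    """
    return any(a == b for a, b in zip(left, right))

# K1 differs, K2 matches -> partial equality, the rows should join.
print(keys_match((1, "x", 9), (2, "x", 7)))   # True
# No component matches -> no join.
print(keys_match((1, "x", 9), (2, "y", 7)))   # False
```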

On Sun, May 10, 2015 at 8:50 AM, Stéphane Verlet <kaweahsoluti...@gmail.com>
wrote:

> Create a custom key class, implement the equals method, and make sure
> the hash method is compatible.
> Use that key to map and join your rows.
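A sketch of that suggestion in Python (class and names are illustrative, not from the thread). Note the hash-compatibility caveat: since two keys can be "equal" through any single component, equal keys may share no common value, so only a constant hash satisfies the equals/hash contract, which sends all candidates to one bucket and will not scale:

```python
class CompositeKey(object):
    """Wraps (k1, ..., kn); two keys compare equal if ANY component matches."""

    def __init__(self, *parts):
        self.parts = parts

    def __eq__(self, other):
        return any(a == b for a, b in zip(self.parts, other.parts))

    def __hash__(self):
        # Caveat: with "any component" equality, equal keys need not share
        # any component value, so a constant is the only hash that keeps
        # the equals/hash contract - every row lands in a single bucket.
        return 0

left = CompositeKey(1, "a", "x")
right = CompositeKey(2, "a", "y")   # K1 differs, K2 matches
print(left == right)                # True
```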
>
>
>
> On Sat, May 9, 2015 at 4:02 PM, Mathieu D <matd...@gmail.com> wrote:
>
>> Hi folks,
>>
>> I need to join RDDs having composite keys like this: (K1, K2 ... Kn).
>>
>> The joining rule looks like this:
>> * if left.K1 == right.K1, then we have a "true equality", and all K2 ...
>> Kn are also equal.
>> * if left.K1 != right.K1 but left.K2 == right.K2, I have a partial
>> equality, and I also want the join to occur there.
>> * if K2 doesn't match, then I test K3, and so on.
>>
>> Is there a way to implement a custom join with a given predicate for
>> this? (I would probably also need to provide a partitioner, and
>> some sorting predicate.)
>>
>> The left and right RDDs are 1-10 million lines long.
>> Any ideas?
>>
>> Thanks
>> Mathieu
>>
>
>


-- 
Best Regards,
Ayan Guha
