Re: custom joins on dataframe

2017-07-24 Thread Jörn Franke
It might be faster to add a column with the hash result to each dataframe before the join,
and then simply do a normal equi-join on that column.
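To make the suggestion concrete, here is a minimal Python sketch (no Spark required) of the idea. `fuzzy_hash` is a hypothetical blocking function, not part of any Spark API; the only requirement is that values you want to match must hash to the same key:

```python
# Sketch of the "add a hash column first" idea, simulated without Spark:
# derive a hash key on each side, then the join itself is a plain equi-join.
# fuzzy_hash is a hypothetical blocking function: similar values must map
# to the same key (here: the lowercased first three characters).

def fuzzy_hash(s):
    return s.lower()[:3]

left = [(1, "Sparks"), (2, "Hadoop")]
right = [(10, "spark"), (11, "Flink")]

# Add the hash key to every row (the "extra column").
left_keyed = [(fuzzy_hash(col), row_id) for row_id, col in left]
right_keyed = [(fuzzy_hash(col), row_id) for row_id, col in right]

# A normal equi-join on the precomputed key.
joined = [
    (l_id, r_id)
    for lk, l_id in left_keyed
    for rk, r_id in right_keyed
    if lk == rk
]
# Only the pair whose hash keys match ("Sparks"/"spark") survives.
```

In Spark terms, this corresponds to a `withColumn` on each side followed by an ordinary equi-join on the new column, which the optimizer can execute as a hash or sort-merge join instead of a nested loop.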

> On 22. Jul 2017, at 17:39, Stephen Fletcher  
> wrote:
> 
> Normally a family of joins (left, right outer, inner) is performed on two 
> dataframes using columns for the comparison, i.e. left("acol") === right("acol"). 
> The comparison operator of the "left" dataframe does something internally 
> and produces a column that I assume is used by the join.
> 
> What I want is to create my own comparison operation (I have a case where I 
> want to use some fuzzy matching between rows, and if they fall within some 
> threshold we allow the join to happen),
> 
> so it would look something like
> 
> left.join(right, my_fuzzy_udf (left("cola"),right("cola")))
> 
> where my_fuzzy_udf is my user-defined function. My main concern is the column 
> that would have to be output: what would its value be, i.e. what would the 
> function need to return that the UDF subsystem would then turn into a column 
> to be evaluated by the join?
> 
> 
> Thanks in advance for any advice

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: custom joins on dataframe

2017-07-23 Thread Michael Armbrust
>
> left.join(right, my_fuzzy_udf (left("cola"),right("cola")))
>

While this could work, the problem will be that we'll have to check every
possible combination of tuples from left and right using your UDF.  It
would be best if you could somehow partition the problem so that we could
reduce the number of comparisons.  For example, if you had a fuzzy hash
that you could do an equality check on in addition to the UDF, that would
greatly speed up the computation.
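The pruning effect Michael describes can be sketched in a few lines of Python (Spark-free, names illustrative): the cheap hash equality acts as a pre-filter, so the expensive fuzzy "UDF" only runs on pairs that share a hash:

```python
# Sketch of the point above: an equality check on a fuzzy hash prunes the
# pairs the expensive UDF must examine. fuzzy_hash and fuzzy_match are
# illustrative stand-ins, not real Spark functions.

def fuzzy_hash(s):
    return s.lower()[:1]  # crude blocking key

def fuzzy_match(a, b):    # stand-in for the expensive UDF
    return sum(x == y for x, y in zip(a.lower(), b.lower())) >= 4

left = ["spark", "snake", "hive", "hbase"]
right = ["Spark", "Hive", "Sqoop"]

udf_calls = 0
joined = []
for l in left:
    for r in right:
        if fuzzy_hash(l) != fuzzy_hash(r):  # cheap equality pre-filter
            continue
        udf_calls += 1                      # expensive check runs far less often
        if fuzzy_match(l, r):
            joined.append((l, r))

# Without the hash pre-filter the UDF would run len(left) * len(right) = 12 times.
```

Here the fuzzy match is evaluated only 6 times instead of 12; with a well-chosen hash on realistic data the reduction is far larger, because Spark can route the equality part through a hash join.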


Re: custom joins on dataframe

2017-07-22 Thread Sumedh Wale
The Dataset.join(right: Dataset[_], joinExprs: Column) API can use any 
arbitrary expression, so you can use a UDF for the join.


The problem with all non-equality joins is that they use 
BroadcastNestedLoopJoin or equivalent, i.e. an (M x N) nested loop, 
which is unusable for medium/large tables. At least one of the 
tables should be small for this to work with acceptable performance. 
For example, if one table has 100M rows after filtering and the other 1M 
rows, then the NLJ will scan 100 trillion row pairs, which will take 
very long under normal circumstances; but if one of the sides is much 
smaller after filtering, say a few thousand rows, it can be fine.
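The nested-loop cost above is worth writing out, since the numbers are easy to misread:

```python
# The (M x N) nested-loop cost described above, as straight arithmetic.
left_rows = 100_000_000                  # 100M rows after filtering
right_rows = 1_000_000                   # 1M rows on the other side
pairs_scanned = left_rows * right_rows   # every (left, right) combination

# 100M x 1M = 10^14 = 100 trillion row pairs.
small_side = 5_000                       # "a few thousand rows" instead
pairs_small = left_rows * small_side     # 5 * 10^11 -- still big, but feasible
```

This is why shrinking one side (or adding an equality component the planner can hash on) matters so much more than micro-optimizing the comparison function itself.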


What you probably need for large tables is to implement your own optimized 
join operator and use some join structure that can do the join 
efficiently without having to do nested loops (i.e. some fancy structure 
for efficient fuzzy joins). It's possible to do that using internal Spark 
APIs, but it's not easy, and you have to implement an efficient join 
structure first. Or perhaps some existing libraries out there could work 
for you (like https://github.com/soundcloud/cosine-lsh-join-spark?).
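To show what such a "fancy structure" looks like in miniature, here is a toy locality-sensitive-hashing join in plain Python. The hyperplanes are fixed by hand so the example is deterministic; a real LSH scheme (such as the cosine-lsh-join-spark library linked above) draws many random planes, uses banding, and runs distributed. This is purely a sketch of the bucketing idea:

```python
# Toy LSH join: bucket vectors by hyperplane-sign signatures, so only
# candidates sharing a bucket are ever compared. Planes are fixed here for
# determinism; real LSH draws them at random and uses several bands.
from collections import defaultdict

planes = [(1.0, 0.0, 0.0, 0.0), (0.0, 1.0, 0.0, 0.0)]

def signature(v):
    bits = 0
    for i, p in enumerate(planes):
        if sum(a * b for a, b in zip(p, v)) > 0:
            bits |= 1 << i
    return bits

left = {"a": (1.0, 0.9, 0.0, 0.1), "b": (0.0, 0.1, 1.0, 0.9)}
right = {"x": (0.9, 1.0, 0.1, 0.0), "y": (-1.0, 0.0, 0.2, -0.8)}

right_buckets = defaultdict(list)
for name, v in right.items():
    right_buckets[signature(v)].append(name)

# Only pairs landing in the same bucket become join candidates;
# a full fuzzy check would then run on these few pairs only.
candidates = [
    (lname, rname)
    for lname, v in left.items()
    for rname in right_buckets[signature(v)]
]
```

Vectors "a" and "x" point the same way and land in the same bucket; "b" and "y" never meet, so the quadratic blow-up of the nested loop is avoided.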


--
Sumedh Wale
SnappyData (http://www.snappydata.io)

On Saturday 22 July 2017 09:09 PM, Stephen Fletcher wrote:
Normally a family of joins (left, right outer, inner) is performed 
on two dataframes using columns for the comparison, i.e. left("acol") === 
right("acol"). The comparison operator of the "left" dataframe does 
something internally and produces a column that I assume is used by 
the join.


What I want is to create my own comparison operation (I have a case 
where I want to use some fuzzy matching between rows, and if they fall 
within some threshold we allow the join to happen),


so it would look something like

left.join(right, my_fuzzy_udf (left("cola"),right("cola")))

where my_fuzzy_udf is my user-defined function. My main concern is the column 
that would have to be output: what would its value be, i.e. what would the 
function need to return that the UDF subsystem would then turn into a 
column to be evaluated by the join?



Thanks in advance for any advice






custom joins on dataframe

2017-07-22 Thread Stephen Fletcher
Normally a family of joins (left, right outer, inner) is performed on two
dataframes using columns for the comparison, i.e. left("acol") ===
right("acol"). The comparison operator of the "left" dataframe does
something internally and produces a column that I assume is used by the
join.

What I want is to create my own comparison operation (I have a case where I
want to use some fuzzy matching between rows, and if they fall within some
threshold we allow the join to happen),

so it would look something like

left.join(right, my_fuzzy_udf (left("cola"),right("cola")))

where my_fuzzy_udf is my user-defined function. My main concern is the column that
would have to be output: what would its value be, i.e. what would the function
need to return that the UDF subsystem would then turn into a column to be
evaluated by the join?


Thanks in advance for any advice
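For the record, the short answer to "what would the function need to return" is: a boolean. Spark's udf() wraps a Boolean-returning function into a BooleanType Column, and the join keeps the row pairs where it evaluates to true. The following Spark-free Python sketch simulates that evaluation; my_fuzzy_match is a hypothetical predicate (common-prefix length), not a recommended similarity measure:

```python
# What a boolean join predicate amounts to, simulated without Spark.
# my_fuzzy_match is an illustrative stand-in for the question's my_fuzzy_udf:
# it returns True/False per row pair, and the join keeps the True pairs.

def my_fuzzy_match(a, b):
    a, b = a.lower(), b.lower()
    prefix = 0
    for x, y in zip(a, b):
        if x != y:
            break
        prefix += 1
    return prefix >= 3  # "within some threshold"

left = ["Jonathan", "Mary"]
right = ["jonathon", "Marc"]

# Conceptually what left.join(right, my_fuzzy_udf(left("cola"), right("cola")))
# evaluates: the boolean for every candidate row pair, keeping the true ones.
joined = [(l, r) for l in left for r in right if my_fuzzy_match(l, r)]
```

As the replies above note, expressing the condition this way is easy; the hard part is that without an equality component Spark must evaluate the predicate for every row pair.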