Petar Zecevic created SPARK-24020:
-------------------------------------

             Summary: Sort-merge join inner range optimization
                 Key: SPARK-24020
                 URL: https://issues.apache.org/jira/browse/SPARK-24020
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.3.0
            Reporter: Petar Zecevic


The problem we are solving is the case where two large tables are partitioned 
by column X and also sorted by column Y within each partition, and an expensive 
function must be evaluated on the joined rows. During a sort-merge join (SMJ), 
Spark effectively cross-joins all rows that share the same X value and 
evaluates the function on every resulting pair. If the two tables have a large 
number of rows per X, this can result in a huge number of evaluations.
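
To get a feel for the scale, here is a rough sketch in plain Python (not Spark 
code) that counts how many row pairs a join on X alone has to evaluate. The 
function name and the sample data are made up for illustration only.

```python
from collections import Counter

def pairs_checked_per_key(left_keys, right_keys):
    """Count the (left, right) row pairs an equi-join on X alone must evaluate."""
    left_counts = Counter(left_keys)
    right_counts = Counter(right_keys)
    # Every left row with a given X is paired with every right row with the same X.
    return sum(n * right_counts[x] for x, n in left_counts.items())

# Hypothetical data: 3 distinct X values, 1,000 rows per X in each table.
left_x = [x for x in range(3) for _ in range(1000)]
right_x = [x for x in range(3) for _ in range(1000)]
total_pairs = pairs_checked_per_key(left_x, right_x)
print(total_pairs)  # 3 * 1000 * 1000 = 3000000
```

The count grows with the product of the per-X row counts, which is why a 
further restriction on Y can matter so much.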

We propose an optimization that reduces the number of matching rows per X by 
using a range condition on the Y columns of the two tables, for example:

... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d

The way SMJ is currently implemented, these extra conditions have no influence 
on the number of rows (per X) being checked, because they are evaluated in the 
same block as the function being calculated.

Here we propose a change to the sort-merge join so that, when these extra 
conditions are specified, a queue is used instead of the 
ExternalAppendOnlyUnsafeRowArray class. This queue would then be used as a 
moving window across the values from the right relation as the left row 
changes. You could call this a combination of an equi-join and a theta join 
(we call it "sort-merge inner range join").

Potential use-cases for this are joins based on spatial or temporal distance 
calculations.
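
The moving-window idea described above can be sketched roughly as follows. 
This is plain Python over lists of Y values for a single X, not the proposed 
Spark implementation; the function name is hypothetical, and the sketch assumes 
both sides are sorted ascending by Y and that d is non-negative.

```python
from collections import deque

def range_join_one_key(left_ys, right_ys, d):
    """Yield (left_y, right_y) pairs with left_y - d <= right_y <= left_y + d.

    Both inputs hold the Y values of rows sharing one X, sorted ascending.
    """
    window = deque()            # right rows currently within range
    right_iter = iter(right_ys)
    pending = next(right_iter, None)
    for ly in left_ys:
        # Admit right rows up to the upper bound of the current left row.
        while pending is not None and pending <= ly + d:
            window.append(pending)
            pending = next(right_iter, None)
        # Evict right rows that fell below the lower bound; since the left
        # side is sorted, they cannot match any later left row either.
        while window and window[0] < ly - d:
            window.popleft()
        for ry in window:
            yield (ly, ry)

left = [1, 4, 5, 10]
right = [0, 2, 3, 6, 9, 12]
matches = list(range_join_one_key(left, right, 2))
```

Each right row enters and leaves the queue at most once, so only the pairs 
that satisfy the range condition are handed to the expensive function, rather 
than every pair that shares the same X.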

The optimization should be triggered automatically when an equi-join expression 
is present AND lower and upper range conditions on a secondary column are 
specified. If the tables aren't sorted by both columns, appropriate sorts 
should be added.

To limit the impact of this change, we also propose adding a new parameter 
(tentatively named "spark.sql.join.smj.useInnerRangeOptimization") which could 
be used to switch off the optimization entirely.

 


