[jira] [Updated] (SPARK-24020) Sort-merge join inner range optimization

2018-06-28 Thread Petar Zecevic (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Petar Zecevic updated SPARK-24020:
--
Description: 
The problem we are solving is the case where you have two big tables 
partitioned by column X and also sorted within partitions by column Y, and you 
need to calculate an expensive function on the joined rows, one that reduces 
the number of output rows (e.g. a condition based on a spatial distance 
calculation). In principle, you could reduce the number of joined rows for 
which the calculation is performed by adding a range condition on the Y 
column. Something like this:

{{... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d AND }}

However, during a sort-merge join with this range condition specified, Spark 
first cross-joins all the rows with the same X value and only then applies the 
range condition and any function calculations. This happens because, inside 
the generated sort-merge join (SMJ) code, these extra conditions are put in the 
same block as the function being calculated, and there is no way to evaluate 
them before all the rows to be checked have been read into memory (into an 
{{ExternalAppendOnlyUnsafeRowArray}}). If the two tables have a large number of 
rows per X, this can result in a huge number of calculations and a huge number 
of rows held in executor memory, which can make the join infeasible.
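
To make the cost concrete, here is a minimal Python sketch of the behaviour described above. It is not Spark code: the function name {{naive_join}}, the (x, y) tuple rows and the {{expensive}} callback are all made up for illustration. It buffers every right row per X and only then checks the range condition, counting how many pairs it has to look at.

```python
def naive_join(left, right, d, expensive):
    """Illustrative only: rows are (x, y) tuples, not Spark rows.

    Buffers all right rows per X, pairs every left row with every buffered
    right row, and only then applies the range condition and the function.
    Returns (matches, number of pairs examined).
    """
    buffered = {}
    for row in right:
        buffered.setdefault(row[0], []).append(row)

    matches = []
    pairs_examined = 0
    for l in left:
        for r in buffered.get(l[0], []):
            pairs_examined += 1
            # t1.Y BETWEEN t2.Y - d AND t2.Y + d, then the expensive function
            if r[1] - d <= l[1] <= r[1] + d and expensive(l, r):
                matches.append((l, r))
    return matches, pairs_examined


left = [(1, y) for y in range(100)]
right = [(1, y) for y in range(100)]
matches, examined = naive_join(left, right, 2, lambda l, r: True)
print(examined, len(matches))  # 10000 pairs examined for 494 matches
```

With 100 rows per side under a single X and d = 2, all 10,000 pairs are examined even though only 494 can satisfy the range condition.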
h3. The solution implementation

We therefore propose a change to the sort-merge join so that, when these extra 
conditions are specified, a queue is used instead of the 
ExternalAppendOnlyUnsafeRowArray class. This queue is then used as a moving 
window over the values from the right relation as the left row changes. You 
could call this a combination of an equi-join and a theta join; in the 
literature it is sometimes called an “epsilon join”. We call it a "sort-merge 
inner range join".

This design uses much less memory (not all rows with the same value of X need 
to be loaded into memory at once) and requires far fewer comparisons, although 
the actual saving depends on the data and the conditions used.
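
The moving window can be sketched in a few lines of Python. This is only an illustration of the idea for a single X value, assuming both sides are already sorted ascending by Y; the function name {{range_join_one_key}} and the use of plain numbers instead of rows are assumptions of the sketch, not part of the proposed implementation.

```python
from collections import deque


def range_join_one_key(left_ys, right_ys, d):
    """Illustrative only: join the Y values of one X group.

    Both inputs must be sorted ascending. Emits (ly, ry) pairs with
    ry - d <= ly <= ry + d and reports the peak window size.
    """
    window = deque()               # right values currently within range
    right_iter = iter(right_ys)
    pending = next(right_iter, None)
    peak = 0
    out = []
    for ly in left_ys:
        # Pull in right values up to the upper bound ly + d.
        while pending is not None and pending <= ly + d:
            window.append(pending)
            pending = next(right_iter, None)
        # Drop right values below the lower bound ly - d; because the left
        # side is sorted, they can never match a later left row either.
        while window and window[0] < ly - d:
            window.popleft()
        peak = max(peak, len(window))
        out.extend((ly, ry) for ry in window)
    return out, peak


ys = list(range(100))
pairs, peak = range_join_one_key(ys, ys, 2)
print(len(pairs), peak)  # 494 matching pairs, at most 5 rows in the window
```

For 100 values per side and d = 2 the window never holds more than 5 right values, and only the 494 in-range pairs are ever produced for the expensive function to check.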
h3. The classes that need to be changed 

To implement the described change, we propose changes to these classes:
* _ExtractEquiJoinKeys_ – a pattern that needs to be extended to recognize the 
case where a simple range condition with lower and upper limits is used on a 
secondary column (a column not included in the equi-join condition). The 
pattern also needs to extract the information later required for code 
generation etc.
* _InMemoryUnsafeRowQueue_ – the moving window implementation to be used 
instead of the _ExternalAppendOnlyUnsafeRowArray_ class. Rows need to be added 
to and removed from the structure as the left key (X) or the left secondary 
value (Y) changes, so the structure needs to be a queue. To make the change as 
unintrusive as possible, we propose implementing _InMemoryUnsafeRowQueue_ as a 
subclass of _ExternalAppendOnlyUnsafeRowArray_
* _JoinSelection_ – a strategy that uses _ExtractEquiJoinKeys_ and needs to be 
aware of the extracted range conditions
* _SortMergeJoinExec_ – the main implementation of the optimization. Needs to 
support two code paths: 
** when whole-stage code generation is turned off (method doExecute, which uses 
sortMergeJoinInnerRangeScanner) 
** when whole-stage code generation is turned on (methods doProduce and 
genScanner)
* _SortMergeJoinInnerRangeScanner_ – implements the SMJ with inner-range 
optimization in the case when whole-stage codegen is turned off
* _InnerJoinSuite_ – functional tests
* _JoinBenchmark_ – performance tests

h3. Triggering the optimization

The optimization should be triggered automatically when an equi-join expression 
is present AND lower and upper range conditions on a secondary column are 
specified. If the tables aren't sorted by both columns, appropriate sorts 
should be added.

To limit the impact of this change we also propose adding a new parameter 
(tentatively named "spark.sql.join.smj.useInnerRangeOptimization") which could 
be used to switch off the optimization entirely.
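
The trigger rule can be written down as a small predicate. The Python sketch below is purely illustrative and has nothing to do with how Catalyst represents expressions: the {{Cond}} tuple, the operator strings and the function {{uses_inner_range}} are hypothetical, and the {{enabled}} argument merely stands in for the tentatively named parameter above.

```python
from typing import NamedTuple


class Cond(NamedTuple):
    """One conjunct of the join condition (hypothetical representation)."""
    op: str     # "=", ">=", "<=", or anything else for an opaque predicate
    left: str   # column of the left relation, e.g. "t1.Y"
    right: str  # expression over the right relation, e.g. "t2.Y - d"


def uses_inner_range(conds, enabled=True):
    """True when an equi-join key exists AND some secondary column has both
    a lower and an upper range condition; False if the switch is off."""
    if not enabled:
        return False
    equi_cols = {c.left for c in conds if c.op == "="}
    lower = {c.left for c in conds if c.op == ">=" and c.left not in equi_cols}
    upper = {c.left for c in conds if c.op == "<=" and c.left not in equi_cols}
    return bool(equi_cols) and bool(lower & upper)


# t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d
conds = [
    Cond("=", "t1.X", "t2.X"),
    Cond(">=", "t1.Y", "t2.Y - d"),
    Cond("<=", "t1.Y", "t2.Y + d"),
]
print(uses_inner_range(conds))                 # True
print(uses_inner_range(conds, enabled=False))  # False
```

A condition with only one of the two bounds, or with no equi-join key at all, falls back to the regular sort-merge join in this sketch.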

h3. Applicable use cases

Potential use cases for this are joins based on spatial or temporal distance 
calculations.

 

[jira] [Updated] (SPARK-24020) Sort-merge join inner range optimization

2018-06-28 Thread Petar Zecevic (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Petar Zecevic updated SPARK-24020:
--
Attachment: SMJ-innerRange-PR24020-designDoc.pdf

> Sort-merge join inner range optimization
> 
>
> Key: SPARK-24020
> URL: https://issues.apache.org/jira/browse/SPARK-24020
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Petar Zecevic
> Priority: Major
> Attachments: SMJ-innerRange-PR24020-designDoc.pdf
>
>
> The problem we are solving is the case where you have two big tables 
> partitioned by X column, but also sorted by Y column (within partitions) and 
> you need to calculate an expensive function on the joined rows. During a 
> sort-merge join, Spark will do cross-joins of all rows that have the same X 
> values and calculate the function's value on all of them. If the two tables 
> have a large number of rows per X, this can result in a huge number of 
> calculations.
> We hereby propose an optimization that would allow you to reduce the number 
> of matching rows per X using a range condition on Y columns of the two 
> tables. Something like:
> ... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d
> The way SMJ is currently implemented, these extra conditions have no 
> influence on the number of rows (per X) being checked because these extra 
> conditions are put in the same block with the function being calculated.
> Here we propose a change to the sort-merge join so that, when these extra 
> conditions are specified, a queue is used instead of the 
> ExternalAppendOnlyUnsafeRowArray class. This queue would then be used as a 
> moving window across the values from the right relation as the left row 
> changes. You could call this a combination of an equi-join and a theta join 
> (we call it "sort-merge inner range join").
> Potential use-cases for this are joins based on spatial or temporal distance 
> calculations.
> The optimization should be triggered automatically when an equi-join 
> expression is present AND lower and upper range conditions on a secondary 
> column are specified. If the tables aren't sorted by both columns, 
> appropriate sorts should be added.
> To limit the impact of this change we also propose adding a new parameter 
> (tentatively named "spark.sql.join.smj.useInnerRangeOptimization") which 
> could be used to switch off the optimization entirely.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org