[ 
https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Asif updated SPARK-44662:
-------------------------
    Description: 
h2. *Q1. What are you trying to do? Articulate your objectives using absolutely 
no jargon.*

On the lines of DPP which helps DataSourceV2 relations when the joining key is 
a partition column, the same concept can be extended over to the case where 
joining key is not a partition column.
The Keys of BroadcastHashJoin are already available before actual evaluation of 
the stream iterator. These keys can be pushed down to the DataSource as a 
SortedSet.
For non partition columns, the DataSources like iceberg have max/min stats on 
column available at manifest level, and for formats like parquet , they have 
max/min stats at various storage level. The passed SortedSet can be used to 
prune using ranges at both driver level ( manifests files) as well as executor 
level ( while actually going through chunks , row groups etc at parquet level)

If the data is stored as Columnar Batch format , then it would not be possible 
to filter out individual row at DataSource level, even though we have keys.
But at the scan level, ( ColumnToRowExec) it is still possible to filter out as 
many rows as possible , if the query involves nested joins. Thus reducing the 
number of rows to join at the higher join levels.

Will be adding more details..
h2. *Q2. What problem is this proposal NOT designed to solve?*

This can only help in BroadcastHashJoin's performance if the join is Inner or 
Left Semi.
This will also not work if there are nodes like Expand, Generator , Aggregate 
(without group by on keys not part of joining column etc) below the 
BroadcastHashJoin node being targeted.
h2. *Q3. How is it done today, and what are the limits of current practice?*

Currently this sort of pruning at DataSource level is being done using DPP 
(Dynamic Partition Pruning ) and IFF one of the join key column is a 
Partitioning column ( so that cost of DPP query is justified and way less than 
amount of data it will be filtering by skipping partitions).
The limitation is that DPP type approach is not implemented ( intentionally I 
believe), if the join column is a non partition column ( because of cost of 
"DPP type" query would most likely be way high as compared to any possible 
pruning ( especially if the column is not stored in a sorted manner).
h2. *Q4. What is new in your approach and why do you think it will be 
successful?*

1) This allows pruning on non partition column based joins. 
2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP 
type" query. 
3) The Data can be used by DataSource to prune at driver (possibly) and also at 
executor level ( as in case of parquet which has max/min at various structure 
levels)

4) The big benefit should be seen in multilevel nested join queries. In the 
current code base, if I am correct, only one join's pruning filter would get 
pushed at scan level. Since it is on partition key may be that is sufficient. 
But if it is a nested Join query , and may be involving different columns on 
streaming side for join, each such filter push could do significant pruning. 
This requires some handling in case of AQE, as the stream side iterator ( & 
hence stage evaluation needs to be delayed, till all the available join filters 
in the nested tree are pushed at their respective target BatchScanExec).

{:singleRowFilter}5) In case of nested broadcasted joins, if the datasource is 
column vector oriented , then what spark would get is a ColumnarBatch. But 
because scans have Filters from multiple joins, they can be retrieved and can 
be applied in code generated at ColumnToRowExec level, using a new 
"containsKey" method on HashedRelation. Thus only those rows which satisfy all 
the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join 
evaluation.

The code is already there , will be opening a PR. For non partition table TPCDS 
run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain.

For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 
10% to 37%.

h2. *Q5. Who cares? If you are successful, what difference will it make?*

If use cases involve multiple joins especially when the join columns are non 
partitioned, and performance is a criteria, this PR *might* help.

h2. *Q6. What are the risks?*

Well the changes are extensive. review will be painful ( if it happens). Though 
code is being tested continuously and adding more tests , with big change, some 
possibility of bugs is there. But as of now, I think the code is robust. To get 
the Perf benefit fully, the pushed filters utilization needs to be implemented 
on the DataSource side too. Have already done it for *iceberg*. But I believe 
atleast in case of Nested Broadcast Hash Joins, [#singleRowFilter] approach 
would still result in perf benefit.

h2. *Q7. How long will it take?*
The code is already implemented. will be opening a PR for review. whatever time 
needed is for review and discussion.


h2. *Q8. What are the mid-term and final “exams” to check for success?*
All tests should pass.
The perf benefit should justify the changes.

  was:
h2. *Q1. What are you trying to do? Articulate your objectives using absolutely 
no jargon.*

On the lines of DPP which helps DataSourceV2 relations when the joining key is 
a partition column, the same concept can be extended over to the case where 
joining key is not a partition column.
The Keys of BroadcastHashJoin are already available before actual evaluation of 
the stream iterator. These keys can be pushed down to the DataSource as a 
SortedSet.
For non partition columns, the DataSources like iceberg have max/min stats on 
column available at manifest level, and for formats like parquet , they have 
max/min stats at various storage level. The passed SortedSet can be used to 
prune using ranges at both driver level ( manifests files) as well as executor 
level ( while actually going through chunks , row groups etc at parquet level)

If the data is stored as Columnar Batch format , then it would not be possible 
to filter out individual row at DataSource level, even though we have keys.
But at the scan level, ( ColumnToRowExec) it is still possible to filter out as 
many rows as possible , if the query involves nested joins. Thus reducing the 
number of rows to join at the higher join levels.

Will be adding more details..
h2. *Q2. What problem is this proposal NOT designed to solve?*

This can only help in BroadcastHashJoin's performance if the join is Inner or 
Left Semi.
This will also not work if there are nodes like Expand, Generator , Aggregate 
(without group by on keys not part of joining column etc) below the 
BroadcastHashJoin node being targeted.
h2. *Q3. How is it done today, and what are the limits of current practice?*

Currently this sort of pruning at DataSource level is being done using DPP 
(Dynamic Partition Pruning ) and IFF one of the join key column is a 
Partitioning column ( so that cost of DPP query is justified and way less than 
amount of data it will be filtering by skipping partitions).
The limitation is that DPP type approach is not implemented ( intentionally I 
believe), if the join column is a non partition column ( because of cost of 
"DPP type" query would most likely be way high as compared to any possible 
pruning ( especially if the column is not stored in a sorted manner).
h2. *Q4. What is new in your approach and why do you think it will be 
successful?*

1) This allows pruning on non partition column based joins. 
2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP 
type" query. 
3) The Data can be used by DataSource to prune at driver (possibly) and also at 
executor level ( as in case of parquet which has max/min at various structure 
levels)

4) The big benefit should be seen in multilevel nested join queries. In the 
current code base, if I am correct, only one join's pruning filter would get 
pushed at scan level. Since it is on partition key may be that is sufficient. 
But if it is a nested Join query , and may be involving different columns on 
streaming side for join, each such filter push could do significant pruning. 
This requires some handling in case of AQE, as the stream side iterator ( & 
hence stage evaluation needs to be delayed, till all the available join filters 
in the nested tree are pushed at their respective target BatchScanExec).

{anchor:singleRowFilter}5) In case of nested broadcasted joins, if the 
datasource is column vector oriented , then what spark would get is a 
ColumnarBatch. But because scans have Filters from multiple joins, they can be 
retrieved and can be applied in code generated at ColumnToRowExec level, using 
a new "containsKey" method on HashedRelation. Thus only those rows which 
satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be 
used for join evaluation.

The code is already there , will be opening a PR. For non partition table TPCDS 
run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain.

For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 
10% to 37%.
h2. *Q5. Who cares? If you are successful, what difference will it make?*

If use cases involve multiple joins especially when the join columns are non 
partitioned, and performance is a criteria, this PR *might* help.
h2. Q6. What are the risks?*

Well the changes are extensive. review will be painful ( if it happens). Though 
code is being tested continuously and adding more tests , with big change, some 
possibility of bugs is there. But as of now, I think the code is robust. To get 
the Perf benefit fully, the pushed filters utilization needs to be implemented 
on the DataSource side too. Have already done it for *iceberg*. But I believe 
atleast in case of Nested Broadcast Hash Joins, [#singleRowFilter] approach 
would still result in perf benefit.

Q7. How long will it take?
The code is already implemented. will be opening a PR for review. whatever time 
needed is for review and discussion.


Q8. What are the mid-term and final “exams” to check for success?
All tests should pass.
The perf benefit should justify the changes.


> SPIP: Improving performance of BroadcastHashJoin queries with stream side 
> join key on non partition columns
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-44662
>                 URL: https://issues.apache.org/jira/browse/SPARK-44662
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.3.3
>            Reporter: Asif
>            Priority: Major
>             Fix For: 3.3.3
>
>
> h2. *Q1. What are you trying to do? Articulate your objectives using 
> absolutely no jargon.*
> On the lines of DPP which helps DataSourceV2 relations when the joining key 
> is a partition column, the same concept can be extended over to the case 
> where joining key is not a partition column.
> The Keys of BroadcastHashJoin are already available before actual evaluation 
> of the stream iterator. These keys can be pushed down to the DataSource as a 
> SortedSet.
> For non partition columns, the DataSources like iceberg have max/min stats on 
> column available at manifest level, and for formats like parquet , they have 
> max/min stats at various storage level. The passed SortedSet can be used to 
> prune using ranges at both driver level ( manifests files) as well as 
> executor level ( while actually going through chunks , row groups etc at 
> parquet level)
> If the data is stored as Columnar Batch format , then it would not be 
> possible to filter out individual row at DataSource level, even though we 
> have keys.
> But at the scan level, ( ColumnToRowExec) it is still possible to filter out 
> as many rows as possible , if the query involves nested joins. Thus reducing 
> the number of rows to join at the higher join levels.
> Will be adding more details..
> h2. *Q2. What problem is this proposal NOT designed to solve?*
> This can only help in BroadcastHashJoin's performance if the join is Inner or 
> Left Semi.
> This will also not work if there are nodes like Expand, Generator , Aggregate 
> (without group by on keys not part of joining column etc) below the 
> BroadcastHashJoin node being targeted.
> h2. *Q3. How is it done today, and what are the limits of current practice?*
> Currently this sort of pruning at DataSource level is being done using DPP 
> (Dynamic Partition Pruning ) and IFF one of the join key column is a 
> Partitioning column ( so that cost of DPP query is justified and way less 
> than amount of data it will be filtering by skipping partitions).
> The limitation is that DPP type approach is not implemented ( intentionally I 
> believe), if the join column is a non partition column ( because of cost of 
> "DPP type" query would most likely be way high as compared to any possible 
> pruning ( especially if the column is not stored in a sorted manner).
> h2. *Q4. What is new in your approach and why do you think it will be 
> successful?*
> 1) This allows pruning on non partition column based joins. 
> 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP 
> type" query. 
> 3) The Data can be used by DataSource to prune at driver (possibly) and also 
> at executor level ( as in case of parquet which has max/min at various 
> structure levels)
> 4) The big benefit should be seen in multilevel nested join queries. In the 
> current code base, if I am correct, only one join's pruning filter would get 
> pushed at scan level. Since it is on partition key may be that is sufficient. 
> But if it is a nested Join query , and may be involving different columns on 
> streaming side for join, each such filter push could do significant pruning. 
> This requires some handling in case of AQE, as the stream side iterator ( & 
> hence stage evaluation needs to be delayed, till all the available join 
> filters in the nested tree are pushed at their respective target 
> BatchScanExec).
> {:singleRowFilter}5) In case of nested broadcasted joins, if the datasource 
> is column vector oriented , then what spark would get is a ColumnarBatch. But 
> because scans have Filters from multiple joins, they can be retrieved and can 
> be applied in code generated at ColumnToRowExec level, using a new 
> "containsKey" method on HashedRelation. Thus only those rows which satisfy 
> all the BroadcastedHashJoins ( whose keys have been pushed) , will be used 
> for join evaluation.
> The code is already there , will be opening a PR. For non partition table 
> TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 
> 15% gain.
> For partition table TPCDS, there is improvement in 4 - 5 queries to the tune 
> of 10% to 37%.
> h2. *Q5. Who cares? If you are successful, what difference will it make?*
> If use cases involve multiple joins especially when the join columns are non 
> partitioned, and performance is a criteria, this PR *might* help.
> h2. *Q6. What are the risks?*
> Well the changes are extensive. review will be painful ( if it happens). 
> Though code is being tested continuously and adding more tests , with big 
> change, some possibility of bugs is there. But as of now, I think the code is 
> robust. To get the Perf benefit fully, the pushed filters utilization needs 
> to be implemented on the DataSource side too. Have already done it for 
> *iceberg*. But I believe atleast in case of Nested Broadcast Hash Joins, 
> [#singleRowFilter] approach would still result in perf benefit.
> h2. *Q7. How long will it take?*
> The code is already implemented. will be opening a PR for review. whatever 
> time needed is for review and discussion.
> h2. *Q8. What are the mid-term and final “exams” to check for success?*
> All tests should pass.
> The perf benefit should justify the changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to