[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Asif updated SPARK-44662: ------------------------- Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). {anchor:singleRowFilter}5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. Q6. What are the risks?* Well the changes are extensive. review will be painful ( if it happens). Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for *iceberg*. But I believe atleast in case of Nested Broadcast Hash Joins, [#singleRowFilter] approach would still result in perf benefit. Q7. How long will it take? The code is already implemented. will be opening a PR for review. whatever time needed is for review and discussion. Q8. What are the mid-term and final “exams” to check for success? All tests should pass. The perf benefit should justify the changes. was: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). {anchor:singleRowFilter}5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. Q6. What are the risks?* Well the changes are extensive. review will be painful ( if it happens). Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for *iceberg*. But I believe atleast in case of Nested Broadcast Hash Joins, approach would still result in perf benefit. Q7. How long will it take? The code is already implemented. will be opening a PR for review. whatever time needed is for review and discussion. Q8. What are the mid-term and final “exams” to check for success? All tests should pass. The perf benefit should justify the changes. > SPIP: Improving performance of BroadcastHashJoin queries with stream side > join key on non partition columns > ----------------------------------------------------------------------------------------------------------- > > Key: SPARK-44662 > URL: https://issues.apache.org/jira/browse/SPARK-44662 > Project: Spark > Issue Type: Improvement > Components: SQL > Affects Versions: 3.3.3 > Reporter: Asif > Priority: Major > Fix For: 3.3.3 > > > h2. *Q1. What are you trying to do? Articulate your objectives using > absolutely no jargon.* > On the lines of DPP which helps DataSourceV2 relations when the joining key > is a partition column, the same concept can be extended over to the case > where joining key is not a partition column. > The Keys of BroadcastHashJoin are already available before actual evaluation > of the stream iterator. These keys can be pushed down to the DataSource as a > SortedSet. > For non partition columns, the DataSources like iceberg have max/min stats on > column available at manifest level, and for formats like parquet , they have > max/min stats at various storage level. The passed SortedSet can be used to > prune using ranges at both driver level ( manifests files) as well as > executor level ( while actually going through chunks , row groups etc at > parquet level) > If the data is stored as Columnar Batch format , then it would not be > possible to filter out individual row at DataSource level, even though we > have keys. > But at the scan level, ( ColumnToRowExec) it is still possible to filter out > as many rows as possible , if the query involves nested joins. Thus reducing > the number of rows to join at the higher join levels. > Will be adding more details.. > h2. *Q2. What problem is this proposal NOT designed to solve?* > This can only help in BroadcastHashJoin's performance if the join is Inner or > Left Semi. > This will also not work if there are nodes like Expand, Generator , Aggregate > (without group by on keys not part of joining column etc) below the > BroadcastHashJoin node being targeted. > h2. *Q3. How is it done today, and what are the limits of current practice?* > Currently this sort of pruning at DataSource level is being done using DPP > (Dynamic Partition Pruning ) and IFF one of the join key column is a > Partitioning column ( so that cost of DPP query is justified and way less > than amount of data it will be filtering by skipping partitions). > The limitation is that DPP type approach is not implemented ( intentionally I > believe), if the join column is a non partition column ( because of cost of > "DPP type" query would most likely be way high as compared to any possible > pruning ( especially if the column is not stored in a sorted manner). > h2. *Q4. What is new in your approach and why do you think it will be > successful?* > 1) This allows pruning on non partition column based joins. > 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP > type" query. > 3) The Data can be used by DataSource to prune at driver (possibly) and also > at executor level ( as in case of parquet which has max/min at various > structure levels) > 4) The big benefit should be seen in multilevel nested join queries. In the > current code base, if I am correct, only one join's pruning filter would get > pushed at scan level. Since it is on partition key may be that is sufficient. > But if it is a nested Join query , and may be involving different columns on > streaming side for join, each such filter push could do significant pruning. > This requires some handling in case of AQE, as the stream side iterator ( & > hence stage evaluation needs to be delayed, till all the available join > filters in the nested tree are pushed at their respective target > BatchScanExec). > {anchor:singleRowFilter}5) In case of nested broadcasted joins, if the > datasource is column vector oriented , then what spark would get is a > ColumnarBatch. But because scans have Filters from multiple joins, they can > be retrieved and can be applied in code generated at ColumnToRowExec level, > using a new "containsKey" method on HashedRelation. Thus only those rows > which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , > will be used for join evaluation. > The code is already there , will be opening a PR. For non partition table > TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing > 15% gain. > For partition table TPCDS, there is improvement in 4 - 5 queries to the tune > of 10% to 37%. > h2. *Q5. Who cares? If you are successful, what difference will it make?* > If use cases involve multiple joins especially when the join columns are non > partitioned, and performance is a criteria, this PR *might* help. > h2. Q6. What are the risks?* > Well the changes are extensive. review will be painful ( if it happens). > Though code is being tested continuously and adding more tests , with big > change, some possibility of bugs is there. But as of now, I think the code is > robust. To get the Perf benefit fully, the pushed filters utilization needs > to be implemented on the DataSource side too. Have already done it for > *iceberg*. But I believe atleast in case of Nested Broadcast Hash Joins, > [#singleRowFilter] approach would still result in perf benefit. > Q7. How long will it take? > The code is already implemented. will be opening a PR for review. whatever > time needed is for review and discussion. > Q8. What are the mid-term and final “exams” to check for success? > All tests should pass. > The perf benefit should justify the changes. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org