[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuming Wang updated SPARK-44662:
--------------------------------
    Target Version/s:   (was: 3.3.3)

> SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
> -----------------------------------------------------------------------------------------------------------
>
> Key: SPARK-44662
> URL: https://issues.apache.org/jira/browse/SPARK-44662
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.3.3
> Reporter: Asif
> Priority: Major
>
> h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.*
>
> Along the lines of DPP, which helps DataSourceV2 relations when the join key is a partition column, the same concept can be extended to the case where the join key is not a partition column.
>
> The keys of a BroadcastHashJoin are already available before the stream-side iterator is actually evaluated. These keys can be pushed down to the DataSource as a SortedSet.
>
> For non-partition columns, DataSources like Iceberg have min/max stats per column at the manifest level, and formats like Parquet have min/max stats at various storage levels. The pushed SortedSet can be used to prune by range both at the driver level (manifest files) and at the executor level (while going through chunks, row groups, etc. in Parquet).
>
> If the data is stored in ColumnarBatch format, it is not possible to filter out individual rows at the DataSource level, even though the keys are available.
>
> But at the scan level (ColumnarToRowExec), individual rows can still be filtered out. When the query involves nested joins, this reduces the number of rows to join at the higher join levels.
>
> Will be adding more details.
>
> h2. *Q2. What problem is this proposal NOT designed to solve?*
>
> This can only help BroadcastHashJoin performance if the join is Inner or Left Semi.
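Below is a minimal Python sketch of the range check described in Q1. It assumes integer join keys and per-chunk min/max statistics. `ColumnStats`, `may_contain_any_key`, and `prune_chunks` are hypothetical names, not Spark, Iceberg, or Parquet APIs. A chunk is skipped only when no broadcast key falls inside its [min, max] range, so no row in it can find a build-side match. Discarding such rows is safe only when the join drops unmatched stream-side rows, as Inner and Left Semi joins do. This is consistent with the restriction stated in Q2.

```python
from bisect import bisect_left
from dataclasses import dataclass
from typing import List, Sequence


@dataclass(frozen=True)
class ColumnStats:
    """Hypothetical min/max stats for one chunk (manifest entry, file, or row group).

    Illustration only; not a Spark, Iceberg, or Parquet API.
    """
    name: str
    min_value: int
    max_value: int


def may_contain_any_key(sorted_keys: Sequence[int], lo: int, hi: int) -> bool:
    """Return True if some key k satisfies lo <= k <= hi.

    sorted_keys must be in ascending order (the SPIP pushes the keys as a SortedSet).
    One binary search finds the first key >= lo; the chunk can only hold a
    matching row if that key exists and is <= hi.
    """
    i = bisect_left(sorted_keys, lo)
    return i < len(sorted_keys) and sorted_keys[i] <= hi


def prune_chunks(sorted_keys: Sequence[int], chunks: List[ColumnStats]) -> List[ColumnStats]:
    """Keep only chunks whose [min, max] range overlaps at least one broadcast key."""
    return [c for c in chunks if may_contain_any_key(sorted_keys, c.min_value, c.max_value)]


if __name__ == "__main__":
    broadcast_keys = sorted({3, 17, 42, 250})
    row_groups = [
        ColumnStats("rg0", 0, 2),      # no key in [0, 2]: skipped
        ColumnStats("rg1", 10, 20),    # 17 is in range: kept
        ColumnStats("rg2", 43, 249),   # falls in the gap between 42 and 250: skipped
        ColumnStats("rg3", 200, 300),  # 250 is in range: kept
    ]
    kept = prune_chunks(broadcast_keys, row_groups)
    print([c.name for c in kept])  # ['rg1', 'rg3']
```

Keeping the keys sorted makes the per-chunk check a single binary search. The first and last keys of the set also give an overall [min, max] range that a source could use as a coarser filter.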
>
> This will also not work if there are nodes like Expand, Generator, or Aggregate (without a GROUP BY, or grouping on keys that are not part of the join columns, etc.) below the targeted BroadcastHashJoin node.
>
> h2. *Q3. How is it done today, and what are the limits of current practice?*
>
> Currently this sort of pruning at the DataSource level is done using DPP (Dynamic Partition Pruning), and only if one of the join key columns is a partitioning column. In that case the cost of the DPP query is justified and far lower than the amount of data it filters out by skipping partitions.
>
> The limitation is that a DPP-type approach is not implemented (intentionally, I believe) when the join column is a non-partition column. The cost of a "DPP-type" query would most likely be much higher than any possible pruning benefit, especially if the column is not stored in sorted order.
>
> h2. *Q4. What is new in your approach and why do you think it will be successful?*
>
> 1) This allows pruning for joins on non-partition columns.
> 2) Because it piggybacks on the broadcast keys, there is no extra cost of a "DPP-type" query.
> 3) The DataSource can use the keys to prune at the driver level (possibly) and also at the executor level (as in the case of Parquet, which has min/max stats at various structure levels).
> 4) The biggest benefit should be seen in multi-level nested join queries. In the current code base, if I am correct, only one join's pruning filter gets pushed to the scan level. Since that filter is on a partition key, that may be sufficient. But in a nested join query, possibly involving a different stream-side column for each join, each such pushed filter could do significant pruning.
> This requires some handling in the case of AQE. Evaluation of the stream-side iterator (and hence of the stage) needs to be delayed until all the available join filters in the nested tree have been pushed to their respective target BatchScanExec nodes.
>
> h4. *Single Row Filtration*
>
> 5) In the case of nested broadcast joins, if the DataSource is column-vector oriented, what Spark gets is a ColumnarBatch. But because the scans have filters from multiple joins, those filters can be retrieved and applied in the code generated at the ColumnarToRowExec level, using a new "containsKey" method on HashedRelation. Thus only rows that satisfy all the BroadcastHashJoins (whose keys have been pushed) are used for join evaluation.
>
> The code is already there; I will be opening a PR. For a non-partitioned-table TPCDS run on a laptop with TPCDS data at scale factor 4, I am seeing a 15% gain.
> For the partitioned-table TPCDS run, 4 - 5 queries improve by 10% to 37%.
>
> h2. *Q5. Who cares? If you are successful, what difference will it make?*
>
> If use cases involve multiple joins, especially when the join columns are non-partition columns, and performance is a criterion, this PR *might* help.
>
> h2. *Q6. What are the risks?*
>
> The changes are extensive, so review will be painful. The code is being tested continuously and more tests are being added, but with a change this big there is some possibility of bugs. As of now, though, I think the code is robust. To get the full perf benefit, the DataSource side also needs to implement use of the pushed filters. I have already done this for {*}Iceberg{*}. But I believe that, at least for nested broadcast hash joins, the [#singleRowFilter] approach would still yield a perf benefit even with a default implementation, which can be provided in *SupportsRuntimeFiltering*.
>
> h2. *Q7. How long will it take?*
>
> The code is already implemented; I will be opening a PR for review. The remaining time is whatever review and discussion need.
>
> h2. *Q8. What are the mid-term and final “exams” to check for success?*
>
> All tests should pass.
> The perf benefit should justify the changes.
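The single-row filtering from Q4 item 5 can be sketched in plain Python. The sketch assumes each pushed broadcast join contributes a set of keys and each stream-side row carries the columns those joins use. `PushedJoinFilter`, `contains_key`, and `filter_rows` are hypothetical stand-ins for the proposed "containsKey" method on HashedRelation and the code generated in ColumnarToRowExec; they are not Spark APIs. A row survives only if its key is present in every pushed filter, so rows that a higher Inner or Left Semi join would drop are discarded early.

```python
from dataclasses import dataclass
from typing import Any, Dict, FrozenSet, Iterable, Iterator, List


@dataclass(frozen=True)
class PushedJoinFilter:
    """Hypothetical stand-in for the build side of one BroadcastHashJoin.

    join_column: the stream-side column this join uses as its key.
    keys: the broadcast join keys (Spark would probe a HashedRelation instead).
    """
    join_column: str
    keys: FrozenSet[Any]

    def contains_key(self, key: Any) -> bool:
        # Mirrors the proposed HashedRelation "containsKey": a membership probe
        # that does not produce any joined output.
        return key in self.keys


def filter_rows(rows: Iterable[Dict[str, Any]],
                filters: List[PushedJoinFilter]) -> Iterator[Dict[str, Any]]:
    """Yield only rows whose key is present in every pushed join filter."""
    for row in rows:
        if all(f.contains_key(row[f.join_column]) for f in filters):
            yield row


if __name__ == "__main__":
    # Rows as they might look after converting a ColumnarBatch to rows.
    stream_rows = [
        {"id": 1, "store_id": 10, "item_id": 100},
        {"id": 2, "store_id": 11, "item_id": 100},  # store 11 not broadcast
        {"id": 3, "store_id": 10, "item_id": 999},  # item 999 not broadcast
        {"id": 4, "store_id": 12, "item_id": 101},
    ]
    # Two nested broadcast joins on different stream-side columns.
    pushed = [
        PushedJoinFilter("store_id", frozenset({10, 12})),
        PushedJoinFilter("item_id", frozenset({100, 101})),
    ]
    survivors = [r["id"] for r in filter_rows(stream_rows, pushed)]
    print(survivors)  # [1, 4]
```

The check is a pure membership probe, so it can run before any join produces output. With no pushed filters, every row passes through unchanged.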
--
This message was sent by Atlassian Jira
(v8.20.10#820010)