[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Asif updated SPARK-44662:
-------------------------
    Affects Version/s: 3.5.0
                           (was: 3.5.1)

> SPIP: Improving performance of BroadcastHashJoin queries with stream side 
> join key on non partition columns
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-44662
>                 URL: https://issues.apache.org/jira/browse/SPARK-44662
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.5.0
>            Reporter: Asif
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: perf results broadcast var pushdown - Partitioned 
> TPCDS.pdf
>
>
> h2. *Q1. What are you trying to do? Articulate your objectives using 
> absolutely no jargon.*
> Along the lines of DPP, which helps DataSourceV2 relations when the joining 
> key is a partition column, the same concept can be extended to the case 
> where the joining key is not a partition column.
> The keys of a BroadcastHashJoin are already available before the stream-side 
> iterator is actually evaluated. These keys can be pushed down to the 
> DataSource as a SortedSet.
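> As a rough, hypothetical sketch in plain Scala (not the PR's actual code): 
> once the build side of a BroadcastHashJoin is materialized, its keys are 
> known, and collecting them into a SortedSet also yields the min/max range 
> useful for the stats-based pruning described below.
> {code:scala}
> import scala.collection.immutable.SortedSet
>
> // Hypothetical sketch: once the build (broadcast) side of a
> // BroadcastHashJoin is materialized, its join keys are known and can be
> // collected into a sorted, de-duplicated set for pushdown to the scan.
> object BroadcastKeyPushdown {
>   def collectKeys(buildSideKeys: Iterator[Long]): SortedSet[Long] =
>     SortedSet.empty[Long] ++ buildSideKeys
>
>   def main(args: Array[String]): Unit = {
>     val keys = collectKeys(Iterator(42L, 7L, 7L, 13L))
>     println(keys)                                  // TreeSet(7, 13, 42)
>     println(s"range = [${keys.min}, ${keys.max}]") // range = [7, 42]
>   }
> }
> {code}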
> For non partition columns, DataSources like Iceberg have min/max column 
> stats available at the manifest level, and formats like Parquet have min/max 
> stats at various storage levels. The passed SortedSet can be used for 
> range-based pruning both at the driver level (manifest files) and at the 
> executor level (while actually going through chunks, row groups etc. at the 
> Parquet level).
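> For illustration, a hedged sketch of how such a SortedSet could drive 
> min/max pruning; ColumnStats and the method names below are made up for the 
> example, not Spark or Iceberg APIs.
> {code:scala}
> import scala.collection.immutable.SortedSet
>
> // Made-up stand-in for the min/max stats a source keeps per storage unit
> // (Iceberg manifest entry, Parquet row group, page, ...).
> final case class ColumnStats(min: Long, max: Long)
>
> object StatsPruning {
>   // The unit may contain a pushed key iff the smallest key >= min exists
>   // and is <= max; otherwise the whole unit can be skipped.
>   def mayContainKeys(stats: ColumnStats, keys: SortedSet[Long]): Boolean =
>     keys.iteratorFrom(stats.min).take(1).exists(_ <= stats.max)
>
>   def main(args: Array[String]): Unit = {
>     val keys = SortedSet(7L, 13L, 42L)
>     println(mayContainKeys(ColumnStats(20L, 60L), keys)) // true: 42 in range
>     println(mayContainKeys(ColumnStats(50L, 60L), keys)) // false: skip unit
>   }
> }
> {code}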
> If the data is stored in a columnar batch format, then it is not possible to 
> filter out individual rows at the DataSource level, even though the keys are 
> available. But at the scan level (ColumnToRowExec) it is still possible to 
> filter out as many rows as possible if the query involves nested joins, thus 
> reducing the number of rows to join at the higher join levels (see the 
> sketch under "Single Row Filtration" below).
> More details will be added.
> h2. *Q2. What problem is this proposal NOT designed to solve?*
> This can only help BroadcastHashJoin performance if the join is Inner or 
> Left Semi.
> It will also not work if there are nodes like Expand, Generate, or Aggregate 
> (when the group-by keys are not the joining columns, etc.) below the 
> targeted BroadcastHashJoin node.
> h2. *Q3. How is it done today, and what are the limits of current practice?*
> Currently this sort of pruning at the DataSource level is done using DPP 
> (Dynamic Partition Pruning), and only if one of the join key columns is a 
> partitioning column (so that the cost of the DPP query is justified and far 
> less than the amount of data it filters out by skipping partitions).
> The limitation is that the DPP-style approach is not implemented 
> (intentionally, I believe) when the join column is a non partition column, 
> because the cost of a "DPP type" query would most likely be high compared to 
> any possible pruning (especially if the column is not stored in sorted 
> order).
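> For context, a small runnable illustration of the status quo (table names, 
> paths, and data are made up): with default settings the plan should contain 
> a dynamic pruning subquery only because the fact table is partitioned by the 
> join key.
> {code:scala}
> import org.apache.spark.sql.SparkSession
>
> // Hedged illustration of the status quo (made-up tables/paths): DPP can
> // prune `sales` only because its partition column `day` is the join key.
> object DppToday {
>   def main(args: Array[String]): Unit = {
>     val spark =
>       SparkSession.builder().master("local[*]").appName("dpp").getOrCreate()
>     import spark.implicits._
>
>     // fact table partitioned by `day` -> eligible for DPP on `day`
>     (0L until 1000L).map(i => (i, i % 10)).toDF("amount", "day")
>       .write.mode("overwrite").partitionBy("day").parquet("/tmp/sales")
>     spark.read.parquet("/tmp/sales").createOrReplaceTempView("sales")
>     Seq((3L, "EU"), (7L, "US")).toDF("day", "region")
>       .createOrReplaceTempView("dim")
>
>     // With defaults, the plan should show a dynamicpruning subquery on
>     // sales.day; a join on a non partition column gets no such filter.
>     spark.sql(
>       """SELECT s.amount FROM sales s JOIN dim d ON s.day = d.day
>         |WHERE d.region = 'EU'""".stripMargin).explain()
>     spark.stop()
>   }
> }
> {code}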
> h2. *Q4. What is new in your approach and why do you think it will be 
> successful?*
> 1) This allows pruning on non partition column based joins.
> 2) Because it piggybacks on the broadcasted keys, there is no extra cost of 
> a "DPP type" query.
> 3) The data can be used by the DataSource to prune at the driver level 
> (possibly) and also at the executor level (as in the case of Parquet, which 
> has min/max stats at various structure levels).
> 4) The big benefit should be seen in multilevel nested join queries. In the 
> current code base, if I am correct, only one join's pruning filter gets 
> pushed to the scan level. Since it is on the partition key, maybe that is 
> sufficient. But if it is a nested join query, possibly involving different 
> stream-side columns for each join, each such filter push could do 
> significant pruning (a minimal query shape is sketched after this list). 
> This requires some handling in the case of AQE, as the stream-side iterator 
> (and hence stage evaluation) needs to be delayed until all the available 
> join filters in the nested tree are pushed to their respective target 
> BatchScanExec nodes.
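> A minimal runnable sketch of that query shape (tables and data are made up): 
> each broadcast join uses a different stream-side column, so under the 
> proposal the keys of both broadcast sides could be pushed to the single scan 
> of the fact table.
> {code:scala}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.broadcast
>
> // Illustrative shape for point 4: two broadcast joins on different non
> // partition columns of the same stream side. Under the proposal, keys of
> // BOTH broadcast sides could be pushed to the one scan of `fact`.
> object NestedJoinShape {
>   def main(args: Array[String]): Unit = {
>     val spark =
>       SparkSession.builder().master("local[*]").appName("shape").getOrCreate()
>     import spark.implicits._
>
>     val fact = (0L until 100L).map(i => (i, i % 7, i % 11)).toDF("id", "c1", "c2")
>     val dim1 = Seq((1L, "a"), (3L, "b")).toDF("k1", "v1")
>     val dim2 = Seq((2L, "x"), (5L, "y")).toDF("k2", "v2")
>
>     fact.join(broadcast(dim1), $"c1" === $"k1")
>         .join(broadcast(dim2), $"c2" === $"k2")
>         .explain()
>     spark.stop()
>   }
> }
> {code}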
> h4. *Single Row Filtration*
> 5) In the case of nested broadcast joins, if the DataSource is column-vector 
> oriented, then what Spark gets is a ColumnarBatch. But because the scan 
> holds filters from multiple joins, they can be retrieved and applied in the 
> code generated at the ColumnToRowExec level, using a new "containsKey" 
> method on HashedRelation. Thus only those rows which satisfy all the 
> BroadcastHashJoins (whose keys have been pushed) will be used for join 
> evaluation.
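> A hedged sketch of that idea in plain Scala; "KeyedRelation" and its 
> "containsKey" are stand-ins for the proposed HashedRelation method, not 
> existing Spark API.
> {code:scala}
> // Stand-in for HashedRelation with the proposed "containsKey" probe.
> trait KeyedRelation {
>   def containsKey(key: Long): Boolean
> }
>
> object SingleRowFiltration {
>   // Keep a row only if every pushed broadcast relation contains the row's
>   // corresponding join key, i.e. the check ColumnToRowExec's generated
>   // code would perform while converting ColumnarBatch rows.
>   def survives(rowKeys: Seq[Long], relations: Seq[KeyedRelation]): Boolean =
>     rowKeys.zip(relations).forall { case (k, rel) => rel.containsKey(k) }
>
>   def main(args: Array[String]): Unit = {
>     val rel1 = new KeyedRelation { def containsKey(k: Long) = Set(1L, 3L)(k) }
>     val rel2 = new KeyedRelation { def containsKey(k: Long) = Set(3L, 9L)(k) }
>     println(survives(Seq(3L, 3L), Seq(rel1, rel2))) // true: passes both probes
>     println(survives(Seq(1L, 5L), Seq(rel1, rel2))) // false: filtered out early
>   }
> }
> {code}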
> The code is already there; the PR on the Spark side is 
> [spark-broadcast-var|https://github.com/apache/spark/pull/43373]. For a non 
> partition table TPCDS run on a laptop with TPCDS data of scale factor 4, I 
> am seeing a 15% gain.
> For partition table TPCDS, there is an improvement in 4 - 5 queries, to the 
> tune of 10% to 37%.
> h2. *Q5. Who cares? If you are successful, what difference will it make?*
> If use cases involve multiple joins, especially when the join columns are 
> non partitioned, and performance is a criterion, this PR *might* help.
> h2. *Q6. What are the risks?*
> The changes are extensive, so review will be painful. Though the code is 
> being tested continuously and more tests are being added, with a change this 
> big there is some possibility of bugs. But as of now, I think the code is 
> robust. To get the perf benefit fully, utilization of the pushed filters 
> needs to be implemented on the DataSource side too. I have already done this 
> for *Iceberg*. But I believe that at least in the case of nested Broadcast 
> Hash Joins, the [#singleRowFilter] approach would still result in a perf 
> benefit, even with a default implementation which can be given in 
> *SupportsRuntimeFiltering*.
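> For reference, a minimal sketch of such a default (no-op) implementation 
> against the existing SupportsRuntimeV2Filtering hooks; the class and its 
> behavior here are illustrative, not the PR's new functions.
> {code:scala}
> import org.apache.spark.sql.connector.expressions.NamedReference
> import org.apache.spark.sql.connector.expressions.filter.Predicate
> import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
> import org.apache.spark.sql.types.StructType
>
> // Illustrative no-op default: a scan that advertises filterable
> // attributes but ignores the pushed broadcast-key predicates. A real
> // source (e.g. Iceberg) would translate them into manifest/row-group
> // pruning; per the note above, nested joins may still gain via the
> // single-row filtration path even with such a default.
> class NoOpFilteringScan(schema: StructType, attrs: Array[NamedReference])
>     extends SupportsRuntimeV2Filtering {
>   override def readSchema(): StructType = schema
>   override def filterAttributes(): Array[NamedReference] = attrs
>   override def filter(predicates: Array[Predicate]): Unit = {
>     // default: do nothing with the pushed predicates
>   }
> }
> {code}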
> h2. *Q7. How long will it take?*
> The code is already implemented and the PR is already opened. Whatever time 
> is needed is for review and discussion.
> h2. *Q8. What are the mid-term and final “exams” to check for success?*
> All tests should pass.
> The perf benefit should justify the changes.
> I am attaching a document with the perf results on TPCDS. However, as of now 
> the test was done locally on a laptop with a scale factor of 4. I will see 
> if I can get a full-fledged TPCDS run.
> The perf results were obtained for both partitioned and non partitioned 
> tables, with Iceberg as the DataSource V2 implementation.
> I will be opening a separate PR on Iceberg based on the new functions added 
> in the SupportsRuntimeV2Filtering interface.


