[jira] [Commented] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784282#comment-17784282 ]

Asif commented on SPARK-44662:
--

The Iceberg changes that support broadcast-var pushdown are in the git repo [iceberg-repo|https://github.com/ahshahid/iceberg.git], branch broadcastvar-push. The changes in this branch are compatible with the latest apache/spark master (which Iceberg identifies as 3.5) and have been compiled and tested with Scala 2.13.

To get the iceberg-spark-runtime jar:
First, locally install the Spark jars built from the Spark PR mentioned below (./build/mvn clean install -Phive -Phive-thriftserver -DskipTests).
Then build the iceberg-spark-runtime jar from the Iceberg branch broadcastvar-push so that it uses the locally installed Spark as a dependency.

If you are interested in evaluating performance, please let me know.

> SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
> ---
>
> Key: SPARK-44662
> URL: https://issues.apache.org/jira/browse/SPARK-44662
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.5.1
> Reporter: Asif
> Priority: Major
> Labels: pull-request-available
> Attachments: perf results broadcast var pushdown - Partitioned TPCDS.pdf
>
>
> h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.*
> Along the lines of DPP, which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended to the case where the joining key is not a partition column.
> The keys of a BroadcastHashJoin are already available before the stream iterator is actually evaluated. These keys can be pushed down to the DataSource as a SortedSet.
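The two pruning levels described in Q1 can be illustrated with a small sketch. This is a hypothetical Python model, not Spark or Iceberg code: RangeStats, may_contain_key, prune_units and filter_rows are illustrative names, a sorted list searched with bisect stands in for the pushed SortedSet, and a plain set lookup stands in for the proposed containsKey on HashedRelation. A file or row group is skipped when no broadcast key falls inside its column's [min, max] range; rows that survive are then kept only if their key is present for every pushed join.

```python
from bisect import bisect_left
from dataclasses import dataclass


# Hypothetical stand-in for per-column min/max stats a source keeps,
# e.g. per Iceberg manifest entry or per Parquet row group.
@dataclass(frozen=True)
class RangeStats:
    unit: str  # identifier of the file or row group
    lo: int    # column minimum
    hi: int    # column maximum


def may_contain_key(sorted_keys: list[int], stats: RangeStats) -> bool:
    """True if at least one broadcast key lies in [stats.lo, stats.hi].

    sorted_keys plays the role of the pushed SortedSet: one binary search
    finds the smallest key >= stats.lo, so each check is O(log n).
    """
    i = bisect_left(sorted_keys, stats.lo)
    return i < len(sorted_keys) and sorted_keys[i] <= stats.hi


def prune_units(sorted_keys: list[int], units: list[RangeStats]) -> list[RangeStats]:
    """Driver- or executor-side pruning: keep only units that may hold a join key."""
    return [u for u in units if may_contain_key(sorted_keys, u)]


def filter_rows(rows: list[tuple], joins: list[tuple[int, set]]) -> list[tuple]:
    """Row-level filter for nested joins.

    Each (column_index, keys) pair models the pushed key set of one
    BroadcastHashJoin; a row survives only if every one of its join keys is
    present, mimicking a per-join containsKey lookup.
    """
    return [row for row in rows if all(row[col] in keys for col, keys in joins)]


broadcast_keys = sorted({17, 42, 99, 250})
row_groups = [
    RangeStats("rg0", 0, 10),     # no key in [0, 10]: skipped
    RangeStats("rg1", 11, 50),    # 17 and 42 in range: kept
    RangeStats("rg2", 100, 200),  # falls in the gap between 99 and 250: skipped
    RangeStats("rg3", 240, 300),  # 250 in range: kept
]
print([u.unit for u in prune_units(broadcast_keys, row_groups)])  # ['rg1', 'rg3']

# Two joins: column 0 against broadcast_keys, column 1 against a second key set.
rows = [(17, 1), (42, 5), (7, 1), (99, 9)]
joins = [(0, set(broadcast_keys)), (1, {1, 5})]
print(filter_rows(rows, joins))  # [(17, 1), (42, 5)]
```

Range pruning is conservative: a kept unit may still contain no matching rows, but a skipped one cannot contain any. Searching the sorted keys, rather than comparing only against the overall min and max of the key set, is what lets rg2 be skipped: [100, 200] overlaps the keys' overall range [17, 250] but lies entirely in the gap between 99 and 250.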
> For non-partition columns, DataSources like Iceberg have min/max stats per column at the manifest level, and formats like Parquet have min/max stats at various storage levels. The pushed SortedSet can be used to prune by ranges both at the driver level (manifest files) and at the executor level (while going through chunks, row groups, etc. in Parquet).
> If the data is stored in ColumnarBatch format, it is not possible to filter out individual rows at the DataSource level, even though the keys are available.
> But at the scan level (ColumnarToRowExec), it is still possible to filter out as many rows as possible if the query involves nested joins, thus reducing the number of rows to join at the higher join levels.
> Will be adding more details.
>
> h2. *Q2. What problem is this proposal NOT designed to solve?*
> This can only help BroadcastHashJoin performance if the join is Inner or Left Semi.
> It will also not work if there are nodes like Expand, Generator, or Aggregate (without group by on keys not part of the joining column, etc.) below the targeted BroadcastHashJoin node.
>
> h2. *Q3. How is it done today, and what are the limits of current practice?*
> Currently this sort of pruning at the DataSource level is done using DPP (Dynamic Partition Pruning), and only if one of the join key columns is a partitioning column (so that the cost of the DPP query is justified and far less than the amount of data it filters out by skipping partitions).
> The limitation is that a DPP-type approach is not implemented (intentionally, I believe) if the join column is a non-partition column, because the cost of a "DPP-type" query would most likely be far higher than any possible pruning (especially if the column is not stored in sorted order).
>
> h2. *Q4. What is new in your approach and why do you think it will be successful?*
> 1) This allows pruning on joins over non-partition columns.
> 2) Because it piggybacks on the broadcast keys, there is no extra cost of a "DPP-type" query.
> 3) The data can be used by the DataSource to prune at the driver (possibly) and also at the executor level (as in the case of Parquet, which has min/max stats at various structure levels).
> 4) The big benefit should be seen in multi-level nested join queries. In the current code base, if I am correct, only one join's pruning filter gets pushed to the scan level. Since it is on a partition key, that may be sufficient. But in a nested join query, possibly involving different stream-side columns for each join, each such pushed filter could do significant pruning.
> This requires some handling in the case of AQE, as the stream-side iterator (and hence stage evaluation) needs to be delayed until all the available join filters in the nested tree are pushed to their respective targe
[jira] [Commented] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782927#comment-17782927 ]

Asif commented on SPARK-44662:
--

The majority of the file changes are due to additional TPC-DS tests for Iceberg. These will not be included as such in the final PR.
> This requires some handling in the case of AQE, as the stream-side iterator (and hence stage evaluation) needs to be delayed until all the available join filters in the nested tree are pushed to their respective target BatchScanExec.
>
> h4. *Single Row Filtration*
> 5) In the case of nested broadcast joins, if the DataSource is column-vector oriented, Spark gets a ColumnarBatch. But because the scans carry filters from multiple joins, those filters can be retrieved and applied in the code generated at the ColumnarToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows that satisfy all the BroadcastHashJoins (whose keys have been pushed) will be used for join evaluation.
> The code is already there; the PR on the Spark side is [spark-broadcast-var|https://github.com/a