Migration from Spark 2.4.0 to Spark 3.1.1 caused SortMergeJoin to change to BroadcastHashJoin
Hi all, I hope everyone is doing well.

I'm currently working on a Spark migration project that aims to migrate all of our Spark SQL pipelines to Spark 3.x and take advantage of its performance improvements. My company is using Spark 2.4.0, but we are targeting 3.1.1 as the official version for all Spark SQL data pipelines, *without AQE enabled yet*. The primary goal is to keep everything the same while using the newer version; later on, we can easily enable AQE for all data pipelines.

After migrating some pipelines, we discovered a query plan change introduced by the version upgrade: instead of a SortMergeJoin, Spark 3 is using a BroadcastHashJoin for the join between the tables in my query. Not only that, but the BroadcastExchange is happening on the big-table side, which seems strange from my perspective.

You can see some snapshots and a fuller explanation of the problem here:
https://stackoverflow.com/questions/72793116/migration-from-spark-2-4-0-to-spark-3-1-1-caused-sortmergejoin-to-change-to-broa

I'm setting `spark.sql.adaptive.enabled` to false, `spark.sql.autoBroadcastJoinThreshold` to 10 MB, and `spark.sql.shuffle.partitions` to 200, yet merely moving this query from Spark 2 to Spark 3 changed the query plan and degraded performance. In this specific scenario, we are hitting a "Could not execute broadcast in 300 secs" error.

Do you have any clue why this is happening? My questions are:

- Why has Spark 3 changed the join strategy in this situation, given that AQE is disabled and spark.sql.autoBroadcastJoinThreshold is much smaller than the dataset size?
- Is this the expected behavior, or could it represent a bug in Spark 3.x?

Please let me know your thoughts. I appreciate all the help in advance.
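[Editor's note: for reference, a minimal sketch of the session configuration described above and of how the chosen join strategy can be inspected. The table and column names are placeholders, not the actual pipeline inputs.]

```scala
import org.apache.spark.sql.SparkSession

// Session configured as described above: AQE off, 10 MB broadcast threshold.
val spark = SparkSession.builder()
  .appName("join-plan-check")
  .config("spark.sql.adaptive.enabled", "false")
  .config("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)
  .config("spark.sql.shuffle.partitions", "200")
  .getOrCreate()

// Hypothetical tables standing in for the real pipeline inputs.
val joined = spark.table("big_table")
  .join(spark.table("small_table"), Seq("id"))

// The physical plan shows whether SortMergeJoin or BroadcastHashJoin was chosen.
joined.explain()
```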
Re: Migration from Spark 2.4.0 to Spark 3.1.1 caused SortMergeJoin to change to BroadcastHashJoin
There are a few possible solutions:

1. Make sure your driver has enough memory to broadcast the smaller dataframe.
2. Raise the broadcast threshold, e.g. "spark.sql.autoBroadcastJoinThreshold": "2g" (this is an example).
3. Use a join hint to force the strategy you want; see the hints section (you need to scroll down a bit) at https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-hints.html. A sketch follows below.

I hope this helps.

Best
Tufan
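[Editor's note: for option 3, a minimal sketch of forcing a shuffle sort merge join via the MERGE hint, in both SQL and DataFrame form. It assumes the `spark` session and placeholder tables from the earlier sketch.]

```scala
// SQL form: the MERGE hint asks the planner for a shuffle sort merge join.
val viaSql = spark.sql(
  """SELECT /*+ MERGE(small_table) */ *
    |FROM big_table b
    |JOIN small_table s ON b.id = s.id""".stripMargin)

// DataFrame form: the equivalent hint through the DataFrame API.
val viaApi = spark.table("big_table")
  .join(spark.table("small_table").hint("merge"), Seq("id"))

viaSql.explain()  // should now show SortMergeJoin instead of BroadcastHashJoin
```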
Re: Migration from Spark 2.4.0 to Spark 3.1.1 caused SortMergeJoin to change to BroadcastHashJoin
Thanks for the suggestions! My concern is why moving to Spark 3 triggered the broadcast join in the first place. "spark.sql.autoBroadcastJoinThreshold" is 10 MB on both engines, yet the same code uses SortMergeJoin on Spark 2 and BroadcastHashJoin on Spark 3. Before changing the configs to accommodate the BHJ, I would like to understand whether this is new behavior in Spark 3 or a bug. I couldn't find anything useful on the internet about it.

Best regards,
Igor Uchôa
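[Editor's note: a hedged way to investigate, not a confirmed diagnosis. The planner picks BroadcastHashJoin when the optimizer's size estimate for one side falls below the threshold, so comparing that estimate on both Spark versions can show whether 3.1.1 is simply estimating the table sizes differently. `queryExecution.optimizedPlan.stats.sizeInBytes` is the public Catalyst statistic; the table names remain placeholders.]

```scala
// Compare the optimizer's size estimate for each join side against the
// broadcast threshold; a side estimated below the threshold gets broadcast.
val big   = spark.table("big_table")
val small = spark.table("small_table")

println(s"big side estimate:   ${big.queryExecution.optimizedPlan.stats.sizeInBytes}")
println(s"small side estimate: ${small.queryExecution.optimizedPlan.stats.sizeInBytes}")
println(s"threshold: ${spark.conf.get("spark.sql.autoBroadcastJoinThreshold")}")

// To rule out broadcast entirely while investigating, auto-broadcast can be
// disabled; "-1" turns it off in both Spark 2 and Spark 3.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```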