Fwd: autoBroadcastJoinThreshold not working as expected
Dear all, I'm on a case that when certain table being exposed to broadcast join, the query will eventually failed with remote block error. Firstly. We set the spark.sql.autoBroadcastJoinThreshold to 10MB, namely 10485760 [image: image.png] Then we proceed to perform query. In the SQL plan, we found that one table that is 25MB in size is broadcast as well. [image: image.png] Also in desc extended the table is 24452111 bytes. It is a Hive table. We always ran into error when this table being broadcast. Below is the sample error Caused by: java.io.IOException: org.apache.spark.SparkException: corrupt remote block broadcast_477_piece0 of broadcast_477: 298778625 != -992055931 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1350) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) Also attached the physical plan if you're interested. One thing to note that, if I turn down autoBroadcastJoinThreshold to 5MB, this query will get successfully executed and default.product NOT broadcasted. However, when I change to another query that querying even less columns than pervious one, even in 5MB this table still get broadcasted and failed with the same error. I even changed to 1MB and still the same. Appreciate if you can share any input. Thank you very much. Best Regards, MIke == Physical Plan == *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, fiscal_week_start_date#220 AS period_start_date#472, fiscal_week_end_date#221 AS period_end_date#473, CASE WHEN isnull(fiscal_week_of_year#222) THEN 0 ELSE 1 END AS valid_date_flag#474, max_fiscal_date#451, vs_max_sales_year#270, vs_max_sales_month#267, vs_max_sales_week#266, bu_id#272 AS bu_code#475, bu_name#273, principle_supplier_code#154 AS supplier_code#476, mother_company_name#150 AS supplier_name#477, brand_type_name#117, brand_name#115, coalesce(h1_l1_hierarchy_code#125, -) AS Category_Code#478, coalesce(substring(h1_l1_hierarchy_code#125, 3, 2), -) AS Cate_No#479, h1_l1_hierarchy_name#126 AS Category_Name#480, coalesce(h1_l2_hierarchy_code#127, -) AS Department_Code#481, coalesce(substring(h1_l2_hierarchy_code#127, 7, 2), -) AS Dept_No#482, ... 48 more fields] +- *(10) BroadcastHashJoin [bu_key#156], [bu_key#271], LeftOuter, BuildRight :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 41 more fields] : +- *(10) BroadcastHashJoin [fiscal_week_of_year#449, fiscal_year#447], [fiscal_week_of_year#222, fiscal_year#234], LeftOuter, BuildRight : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 35 more fields] : : +- *(10) BroadcastHashJoin [store_id#157], [loc_idnt#521], LeftOuter, BuildRight : : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 33 more fields] : : : +- *(10) BroadcastHashJoin [cast(store_key#446 as double)], [cast(store_key#155 as double)], LeftOuter, BuildRight : : : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gros
Fwd: autoBroadcastJoinThreshold not working as expected
Dear all, I'm on a case that when certain table being exposed to broadcast join, the query will eventually failed with remote block error. Firstly. We set the spark.sql.autoBroadcastJoinThreshold to 10MB, namely 10485760 [image: image.png] Then we proceed to perform query. In the SQL plan, we found that one table that is 25MB in size is broadcast as well. [image: image.png] Also in desc extended the table is 24452111 bytes. It is a Hive table. We always ran into error when this table being broadcast. Below is the sample error Caused by: java.io.IOException: org.apache.spark.SparkException: corrupt remote block broadcast_477_piece0 of broadcast_477: 298778625 != -992055931 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1350) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96) Also attached the physical plan if you're interested. One thing to note that, if I turn down autoBroadcastJoinThreshold to 5MB, this query will get successfully executed and default.product NOT broadcasted. However, when I change to another query that querying even less columns than pervious one, even in 5MB this table still get broadcasted and failed with the same error. I even changed to 1MB and still the same. Appreciate if you can share any input. Thank you very much. Best Regards, MIke == Physical Plan == *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, fiscal_week_start_date#220 AS period_start_date#472, fiscal_week_end_date#221 AS period_end_date#473, CASE WHEN isnull(fiscal_week_of_year#222) THEN 0 ELSE 1 END AS valid_date_flag#474, max_fiscal_date#451, vs_max_sales_year#270, vs_max_sales_month#267, vs_max_sales_week#266, bu_id#272 AS bu_code#475, bu_name#273, principle_supplier_code#154 AS supplier_code#476, mother_company_name#150 AS supplier_name#477, brand_type_name#117, brand_name#115, coalesce(h1_l1_hierarchy_code#125, -) AS Category_Code#478, coalesce(substring(h1_l1_hierarchy_code#125, 3, 2), -) AS Cate_No#479, h1_l1_hierarchy_name#126 AS Category_Name#480, coalesce(h1_l2_hierarchy_code#127, -) AS Department_Code#481, coalesce(substring(h1_l2_hierarchy_code#127, 7, 2), -) AS Dept_No#482, ... 48 more fields] +- *(10) BroadcastHashJoin [bu_key#156], [bu_key#271], LeftOuter, BuildRight :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 41 more fields] : +- *(10) BroadcastHashJoin [fiscal_week_of_year#449, fiscal_year#447], [fiscal_week_of_year#222, fiscal_year#234], LeftOuter, BuildRight : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 35 more fields] : : +- *(10) BroadcastHashJoin [store_id#157], [loc_idnt#521], LeftOuter, BuildRight : : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 33 more fields] : : : +- *(10) BroadcastHashJoin [cast(store_key#446 as double)], [cast(store_key#155 as double)], LeftOuter, BuildRight : : : :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gros