Fwd: autoBroadcastJoinThreshold not working as expected

2019-04-24 Thread Mike Chan
Dear all,

I'm working on a case where, whenever a certain table is used in a broadcast
join, the query eventually fails with a corrupt remote block error.

First, we set spark.sql.autoBroadcastJoinThreshold to 10 MB, i.e.
10485760 bytes:
[image: image.png]

Then we ran the query. In the SQL plan, we found that a table of about
25 MB is broadcast as well, even though it is larger than the threshold.

[image: image.png]

desc extended also reports the table as 24452111 bytes. It is a Hive table.
We always run into an error when this table is broadcast. Below is a sample
error:

Caused by: java.io.IOException: org.apache.spark.SparkException: corrupt remote block broadcast_477_piece0 of broadcast_477: 298778625 != -992055931
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1350)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
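
A note on the two numbers in the "corrupt remote block" message: they look like an expected and an actual per-piece checksum that did not match. As far as I know, TorrentBroadcast computes an Adler-32 checksum for each piece and stores it as a signed 32-bit Int, which would explain why one value is negative; please treat that as an assumption, not something confirmed in this thread. The Python sketch below only illustrates the signed/unsigned reinterpretation and that any single-byte change alters the checksum. The function names are made up for illustration and are not Spark APIs.

```python
import zlib


def adler32_as_signed_int(data: bytes) -> int:
    """Adler-32 of data, reinterpreted as a signed 32-bit integer
    (assumed to mirror how a JVM Int would hold the checksum)."""
    unsigned = zlib.adler32(data) & 0xFFFFFFFF
    return unsigned - (1 << 32) if unsigned >= (1 << 31) else unsigned


def to_unsigned(signed: int) -> int:
    """Map a signed 32-bit value back to its unsigned 32-bit form."""
    return signed & 0xFFFFFFFF


# The negative value from the error message, viewed as an unsigned checksum.
print(to_unsigned(-992055931))  # 3302911365

# Changing a single byte changes the checksum, so a mismatch means the
# bytes read by the executor differ from the bytes that were written.
print(adler32_as_signed_int(b"block") != adler32_as_signed_int(b"blocl"))  # True
```

So a mismatch of this kind points at the block contents differing between writer and reader, not at the threshold setting itself.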


I have also attached the physical plan in case you're interested. One
thing to note: if I lower autoBroadcastJoinThreshold to 5 MB, this query
executes successfully and default.product is NOT broadcast.


However, when I switch to another query that selects even fewer
columns than the previous one, this table is still broadcast even at
5 MB, and the query fails with the same error. I even lowered the
threshold to 1 MB, with the same result.
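
For reference, the byte values behind the thresholds mentioned in this message can be checked with a few lines of Python. This is only a sketch: it compares the raw table size from desc extended (24452111 bytes) against each threshold, whereas Spark's planner compares its own size estimate for the join side, which can differ from the on-disk size. The helper names are hypothetical.

```python
MIB = 1024 * 1024
TABLE_SIZE_BYTES = 24452111  # size reported by desc extended in this message


def threshold_bytes(mib: int) -> int:
    """Convert a threshold given in MiB to the byte value used in the config."""
    return mib * MIB


def fits_under_threshold(size_bytes: int, mib: int) -> bool:
    """True if a join side of this size is at or below the threshold."""
    return size_bytes <= threshold_bytes(mib)


# Thresholds tried in this thread: 10 MB, 5 MB and 1 MB.
for mib in (10, 5, 1):
    print(mib, threshold_bytes(mib), fits_under_threshold(TABLE_SIZE_BYTES, mib))
# 10 10485760 False
# 5 5242880 False
# 1 1048576 False
```

By raw size alone the table is over every threshold tried, so whatever makes it eligible for broadcast must come from the size the planner estimates for that side of the join, not from the figure in desc extended.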


I would appreciate any input you can share. Thank you very much.


Best Regards,

Mike
== Physical Plan ==
*(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, fiscal_week_start_date#220 AS period_start_date#472, fiscal_week_end_date#221 AS period_end_date#473, CASE WHEN isnull(fiscal_week_of_year#222) THEN 0 ELSE 1 END AS valid_date_flag#474, max_fiscal_date#451, vs_max_sales_year#270, vs_max_sales_month#267, vs_max_sales_week#266, bu_id#272 AS bu_code#475, bu_name#273, principle_supplier_code#154 AS supplier_code#476, mother_company_name#150 AS supplier_name#477, brand_type_name#117, brand_name#115, coalesce(h1_l1_hierarchy_code#125, -) AS Category_Code#478, coalesce(substring(h1_l1_hierarchy_code#125, 3, 2), -) AS Cate_No#479, h1_l1_hierarchy_name#126 AS Category_Name#480, coalesce(h1_l2_hierarchy_code#127, -) AS Department_Code#481, coalesce(substring(h1_l2_hierarchy_code#127, 7, 2), -) AS Dept_No#482, ... 48 more fields]
+- *(10) BroadcastHashJoin [bu_key#156], [bu_key#271], LeftOuter, BuildRight
   :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 41 more fields]
   :  +- *(10) BroadcastHashJoin [fiscal_week_of_year#449, fiscal_year#447], [fiscal_week_of_year#222, fiscal_year#234], LeftOuter, BuildRight
   :     :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 35 more fields]
   :     :  +- *(10) BroadcastHashJoin [store_id#157], [loc_idnt#521], LeftOuter, BuildRight
   :     :     :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 33 more fields]
   :     :     :  +- *(10) BroadcastHashJoin [cast(store_key#446 as double)], [cast(store_key#155 as double)], LeftOuter, BuildRight
   :     :     :     :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gros
