Hi, I am trying to migrate Hive SQL to Spark SQL. When I execute a multi-insert-with-join statement, Spark SQL scans the same table multiple times, while Hive SQL scans it only once. In our production environment this table is quite large, which causes the Spark SQL job to run longer than the Hive SQL one.
Can someone help me optimize the multi-insert-with-join statement so that Spark SQL scans the table only once? The environment I use is Spark 2.4.5. The simple example below demonstrates the different execution plans produced by Spark SQL and Hive SQL.

--- SQL start

create table if not exists join_psn (
  id int,
  name string,
  cid int
);

insert overwrite table join_psn
select 1,'john',2 union all
select 2,'tom',2 union all
select 3,'jackson',1;

create table if not exists join_country_partition (
  id int,
  cname string,
  loc string
) partitioned by (dt string);

insert overwrite table join_country_partition partition (dt='20200801')
select 1,'USA','America' union all
select 2,'UK','European' union all
select 3,'CN','Asia' union all
select 4,'FR','European' union all
select 5,'JP','Asia';

create table if not exists join_result1 (
  id int,
  name string,
  cname string
);

create table if not exists join_result2 (
  id int,
  name string,
  cname string
);

-- On Spark SQL: the different predicates cause multiple scans of the same table.
-- On Hive SQL: multi-table inserts minimize the number of data scans required; Hive can insert
-- data into multiple tables by scanning the input data just once and applying different query
-- operators to it.
from (
  select * from join_country_partition where dt='20200801'
) c
join join_psn p on c.id=p.cid
insert overwrite table join_result1
select c.id,name,cname where c.id < 5
insert overwrite table join_result2
select c.id,name,cname where name != 'FR';

-- Spark SQL Plan
Union
:- Execute InsertIntoHiveTable InsertIntoHiveTable `default`.`join_result1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, true, false, [id, name, cname]
:  +- *(2) Project [id#273, name#278, cname#274]
:     +- *(2) BroadcastHashJoin [id#273], [cid#279], Inner, BuildLeft
:        :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:        :  +- *(1) Filter (isnotnull(id#273) && (id#273 < 5))
:        :     +- Scan hive default.join_country_partition [id#273, cname#274], HiveTableRelation `default`.`join_country_partition`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#273, cname#274, loc#275], [dt#276], [isnotnull(dt#276), (dt#276 = 20200801)]
:        +- *(2) Filter ((cid#279 < 5) && isnotnull(cid#279))
:           +- Scan hive default.join_psn [name#278, cid#279], HiveTableRelation `default`.`join_psn`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#277, name#278, cid#279]
+- Execute InsertIntoHiveTable InsertIntoHiveTable `default`.`join_result2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, true, false, [id, name, cname]
   +- *(4) Project [id#280, name#285, cname#281]
      +- *(4) BroadcastHashJoin [id#280], [cid#286], Inner, BuildLeft
         :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
         :  +- *(3) Filter isnotnull(id#280)
         :     +- Scan hive default.join_country_partition [id#280, cname#281], HiveTableRelation `default`.`join_country_partition`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#280, cname#281, loc#282], [dt#283], [isnotnull(dt#283), (dt#283 = 20200801)]
         +- *(4) Filter ((isnotnull(name#285) && NOT (name#285 = FR)) && isnotnull(cid#286))
            +- Scan hive default.join_psn [name#285, cid#286], HiveTableRelation `default`.`join_psn`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#284, name#285, cid#286]
Time taken: 0.393 seconds, Fetched 1 row(s)

-- Hive SQL Plan
STAGE DEPENDENCIES:
  Stage-7 is a root stage
  Stage-6 depends on stages: Stage-7
  Stage-0 depends on stages: Stage-6
  Stage-3 depends on stages: Stage-0
  Stage-1 depends on stages: Stage-6
  Stage-4 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-7
    Map Reduce Local Work
      Alias -> Map Local Tables:
        p
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        p
          TableScan
            alias: p
            Statistics: Num rows: 0 Data size: 29 Basic stats: PARTIAL Column stats: NONE
            HashTable Sink Operator
              condition expressions:
                0 {_col0} {_col1}
                1 {name}
              keys:
                0 _col0 (type: int)
                1 cid (type: int)

  Stage: Stage-6
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: join_country_partition
            Statistics: Num rows: 0 Data size: 62 Basic stats: PARTIAL Column stats: NONE
            Select Operator
              expressions: id (type: int), cname (type: string)
              outputColumnNames: _col0, _col1
              Statistics: Num rows: 0 Data size: 62 Basic stats: PARTIAL Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                condition expressions:
                  0 {_col0} {_col1}
                  1 {name}
                keys:
                  0 _col0 (type: int)
                  1 cid (type: int)
                outputColumnNames: _col0, _col1, _col5
                Statistics: Num rows: 0 Data size: 68 Basic stats: PARTIAL Column stats: NONE
                Filter Operator
                  predicate: (_col0 < 5) (type: boolean)
                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                  Select Operator
                    expressions: _col0 (type: int), _col5 (type: string), _col1 (type: string)
                    outputColumnNames: _col0, _col1, _col2
                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.TextInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                          name: default.join_result1
                Filter Operator
                  predicate: (_col5 <> 'FR') (type: boolean)
                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                  Select Operator
                    expressions: _col0 (type: int), _col5 (type: string), _col1 (type: string)
                    outputColumnNames: _col0, _col1, _col2
                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.TextInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                          name: default.join_result2
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: default.join_result1

  Stage: Stage-3
    Stats-Aggr Operator

  Stage: Stage-1
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: default.join_result2

  Stage: Stage-4
    Stats-Aggr Operator

--- SQL end

There is an earlier discussion of how Hive SQL executes a multi-insert-with-join statement: Multi insert with join in Hive <https://stackoverflow.com/questions/52409105/multi-insert-with-join-in-hive>

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
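P.S. One workaround I am considering, but have not yet benchmarked at production scale, is to materialize the join result once with CACHE TABLE (supported in Spark 2.x) so that both inserts read the cached data instead of rescanning join_country_partition. This is only a sketch; the name `joined` is my own placeholder, and caching trades the extra scan for executor memory:

```sql
-- Sketch (unverified): materialize the join once, then write both outputs from the cache.
CACHE TABLE joined AS
SELECT c.id, p.name, c.cname
FROM (SELECT * FROM join_country_partition WHERE dt = '20200801') c
JOIN join_psn p ON c.id = p.cid;

INSERT OVERWRITE TABLE join_result1
SELECT id, name, cname FROM joined WHERE id < 5;

INSERT OVERWRITE TABLE join_result2
SELECT id, name, cname FROM joined WHERE name != 'FR';

-- Release the cached data when done.
UNCACHE TABLE joined;
```

If this works as I expect, EXPLAIN on each INSERT should show an InMemoryRelation scan instead of a second HiveTableRelation scan, but I would appreciate confirmation from anyone who has tried this.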