Hi,

I am trying to migrate Hive SQL to Spark SQL. When I execute a multi-insert
with join statement, Spark SQL scans the same table multiple times, while
Hive SQL scans it only once. In our production environment this table is
quite large, so the Spark SQL job takes longer to run than the Hive SQL job.

Can someone help me optimize the multi-insert with join statement so that
Spark SQL scans the table only once?

I am using Spark 2.4.5. The simple example below demonstrates the different
execution plans produced by Spark SQL and Hive SQL.


--- SQL start

create table if not exists join_psn
(
    id int,
    name string,
    cid int
);
 insert overwrite table join_psn 
 select 1,'john',2 
 union all 
 select 2,'tom',2 
 union all 
 select 3,'jackson',1 ;


create table if not exists join_country_partition 
(
    id int,
    cname string,
    loc string
) 
partitioned by (dt string);  
 insert overwrite table join_country_partition partition (dt='20200801') 
 select 1,'USA','America'
 union all
 select 2,'UK','European'
 union all
 select 3,'CN','Asia'
 union all
 select 4,'FR','European'
 union all
 select 5,'JP','Asia';

create table if not exists join_result1 
(
    id int,
    name string,
    cname string
);
create table if not exists join_result2 
(
    id int,
    name string,
    cname string
);


-- On Spark SQL: the different predicates on each insert cause multiple scans of the same table.
-- On Hive SQL: multi-table inserts minimize the number of data scans required; Hive inserts into multiple tables by scanning the input data just once and applying different query operators to it.
 from (
    select * from join_country_partition where dt='20200801'
 ) c
 join join_psn p
 on c.id=p.cid 
 insert overwrite table join_result1 
 select c.id,name,cname where c.id < 5
 insert overwrite table join_result2 
 select c.id,name,cname where name != 'FR';

-- Spark SQL Plan
Union
:- Execute InsertIntoHiveTable InsertIntoHiveTable `default`.`join_result1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, true, false, [id, name, cname]
:  +- *(2) Project [id#273, name#278, cname#274]
:     +- *(2) BroadcastHashJoin [id#273], [cid#279], Inner, BuildLeft
:        :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:        :  +- *(1) Filter (isnotnull(id#273) && (id#273 < 5))
:        :     +- Scan hive default.join_country_partition [id#273, cname#274], HiveTableRelation `default`.`join_country_partition`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#273, cname#274, loc#275], [dt#276], [isnotnull(dt#276), (dt#276 = 20200801)]
:        +- *(2) Filter ((cid#279 < 5) && isnotnull(cid#279))
:           +- Scan hive default.join_psn [name#278, cid#279], HiveTableRelation `default`.`join_psn`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#277, name#278, cid#279]
+- Execute InsertIntoHiveTable InsertIntoHiveTable `default`.`join_result2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, true, false, [id, name, cname]
   +- *(4) Project [id#280, name#285, cname#281]
      +- *(4) BroadcastHashJoin [id#280], [cid#286], Inner, BuildLeft
         :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
         :  +- *(3) Filter isnotnull(id#280)
         :     +- Scan hive default.join_country_partition [id#280, cname#281], HiveTableRelation `default`.`join_country_partition`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#280, cname#281, loc#282], [dt#283], [isnotnull(dt#283), (dt#283 = 20200801)]
         +- *(4) Filter ((isnotnull(name#285) && NOT (name#285 = FR)) && isnotnull(cid#286))
            +- Scan hive default.join_psn [name#285, cid#286], HiveTableRelation `default`.`join_psn`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#284, name#285, cid#286]
Time taken: 0.393 seconds, Fetched 1 row(s)

-- Hive SQL Plan
STAGE DEPENDENCIES:
  Stage-7 is a root stage
  Stage-6 depends on stages: Stage-7
  Stage-0 depends on stages: Stage-6
  Stage-3 depends on stages: Stage-0
  Stage-1 depends on stages: Stage-6
  Stage-4 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-7
    Map Reduce Local Work
      Alias -> Map Local Tables:
        p
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        p
          TableScan
            alias: p
            Statistics: Num rows: 0 Data size: 29 Basic stats: PARTIAL Column stats: NONE
            HashTable Sink Operator
              condition expressions:
                0 {_col0} {_col1}
                1 {name}
              keys:
                0 _col0 (type: int)
                1 cid (type: int)

  Stage: Stage-6
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: join_country_partition
            Statistics: Num rows: 0 Data size: 62 Basic stats: PARTIAL Column stats: NONE
            Select Operator
              expressions: id (type: int), cname (type: string)
              outputColumnNames: _col0, _col1
              Statistics: Num rows: 0 Data size: 62 Basic stats: PARTIAL Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                condition expressions:
                  0 {_col0} {_col1}
                  1 {name}
                keys:
                  0 _col0 (type: int)
                  1 cid (type: int)
                outputColumnNames: _col0, _col1, _col5
                Statistics: Num rows: 0 Data size: 68 Basic stats: PARTIAL Column stats: NONE
                Filter Operator
                  predicate: (_col0 < 5) (type: boolean)
                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                  Select Operator
                    expressions: _col0 (type: int), _col5 (type: string), _col1 (type: string)
                    outputColumnNames: _col0, _col1, _col2
                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.TextInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                          name: default.join_result1
                Filter Operator
                  predicate: (_col5 <> 'FR') (type: boolean)
                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                  Select Operator
                    expressions: _col0 (type: int), _col5 (type: string), _col1 (type: string)
                    outputColumnNames: _col0, _col1, _col2
                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.TextInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                          name: default.join_result2
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: default.join_result1

  Stage: Stage-3
    Stats-Aggr Operator

  Stage: Stage-1
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: default.join_result2

  Stage: Stage-4
    Stats-Aggr Operator

--- SQL end

There is a related discussion of Hive's multi-insert with join behavior on
Stack Overflow: Multi insert with join in Hive
<https://stackoverflow.com/questions/52409105/multi-insert-with-join-in-hive>
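
One workaround I am considering (just a sketch; the `joined` view name is my
own invention, and I have not yet verified the performance on the production
table) is to materialize the join once with CACHE TABLE ... AS SELECT, so
that both inserts read the cached result instead of re-scanning the source
tables:

```sql
-- Sketch of a possible workaround: cache the joined rows once, then run
-- both inserts against the cached temporary view. "joined" is a
-- hypothetical name for this example.
cache table joined as
select c.id, p.name, c.cname
from (select * from join_country_partition where dt = '20200801') c
join join_psn p
  on c.id = p.cid;

insert overwrite table join_result1
select id, name, cname from joined where id < 5;

insert overwrite table join_result2
select id, name, cname from joined where name != 'FR';

uncache table joined;
```

This trades the second scan for the memory/disk cost of caching, so whether
it is actually faster would depend on the size of the joined result and the
available executor memory.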
