GitHub user sujith71955 opened a pull request:

    https://github.com/apache/spark/pull/22758

    [SPARK-25332][SQL] Sort merge join is selected instead of broadcast hash join after restarting spark-shell/spark-JDBC for Hive provider

    ## What changes were proposed in this pull request?
    Problem:
    The following steps, run in sequence, reproduce the issue (a spark-shell sketch follows the steps):
    ```
    a. Create a parquet table with a STORED AS clause.
    b. Run an INSERT statement => this does not update the Hive stats.
    c. Run a query that needs to calculate stats (e.g. EXPLAIN COST on a SELECT statement) => the stats are evaluated from HadoopFsRelation.
    d. Since correct stats (sizeInBytes) are available, the plan selects a broadcast node when joining with any table.
    e. Exit => (come out of the shell)

    f. Now run the **step c** (calculate stats) query again. This gives wrong stats in the plan (sizeInBytes falls back to the default value), because they are calculated from the HiveCatalogTable, which has no stats since they were not updated in **step b**.
    g. Since incorrect stats (default sizeInBytes) are available, the plan selects a SortMergeJoin node when joining with any table.
    h. Now run the INSERT statement again => this updates the Hive stats.
    i. Now run the **step c** (calculate stats) query again. This gives correct stats (sizeInBytes) in the plan, because the Hive stats updated in **step h** can be read.
    j. From now on the stats are always available, so the plan always shows correct stats and picks the broadcast join node (based on the threshold size).
    ```
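
    For concreteness, a minimal spark-shell sketch of steps a-e (table and column names are hypothetical, not from the patch):
    ```scala
    // Step a: create Hive-provider parquet tables via STORED AS.
    spark.sql("CREATE TABLE src (key INT, value STRING) STORED AS PARQUET")
    spark.sql("CREATE TABLE dst (key INT, value STRING) STORED AS PARQUET")

    // Step b: insert rows; before this fix, the Hive stats are NOT updated here.
    spark.sql("INSERT INTO src VALUES (1, 'a')")
    spark.sql("INSERT INTO dst VALUES (1, 'a')")

    // Steps c/d: sizeInBytes is evaluated from HadoopFsRelation, so the small
    // side is broadcast and the plan shows BroadcastHashJoin.
    spark.sql("EXPLAIN COST SELECT * FROM src JOIN dst ON src.key = dst.key").show(false)

    // Steps e-g: exit the shell. After relaunch, the same EXPLAIN reads the
    // catalog table (no stats), falls back to the default sizeInBytes, and the
    // plan degrades to SortMergeJoin.
    ```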
    Solution:
    The root cause is that the Hive stats are not recorded after an INSERT command, because of the `if (table.stats.nonEmpty)` condition in `updateTableStats()`, which is executed as part of the `InsertIntoHadoopFsRelationCommand` command.
    So as part of the fix we initialize a default value for the `CatalogTable` stats if there is no cached `LogicalRelation` for the table.
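
    A minimal sketch of the idea, assuming Spark 2.x catalog APIs (the helper name `withDefaultStats` is hypothetical, not the exact patch):
    ```scala
    import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable}
    import org.apache.spark.sql.internal.SQLConf

    // Hypothetical helper: seed the CatalogTable with default stats when it has
    // none (i.e. no cached LogicalRelation carried real stats), so that the
    // `table.stats.nonEmpty` guard in updateTableStats() no longer skips
    // recording stats after an INSERT.
    def withDefaultStats(table: CatalogTable): CatalogTable = {
      if (table.stats.isEmpty) {
        table.copy(stats = Some(CatalogStatistics(sizeInBytes = SQLConf.get.defaultSizeInBytes)))
      } else {
        table
      }
    }
    ```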
    It was also observed that in the test case "test statistics of LogicalRelation converted from Hive serde tables" in `StatisticsSuite`, both orc and parquet are convertible, but the test expected only orc to record Hive stats, not parquet. Since both relations are convertible, they should share the same expectation; this PR corrects the test accordingly.
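
    A hedged sketch of the corrected expectation (simplified, assuming the suite's `withTable`/`sql` test helpers; not the suite's exact code):
    ```scala
    // Both convertible serdes should behave the same: after an INSERT, real
    // stats should be recorded, so sizeInBytes is available in a fresh session.
    Seq("orc", "parquet").foreach { format =>
      withTable("t") {
        sql(s"CREATE TABLE t (c1 INT) STORED AS $format")
        sql("INSERT INTO t VALUES (1)")
        val sizeInBytes = spark.table("t").queryExecution.optimizedPlan.stats.sizeInBytes
        assert(sizeInBytes > 0)
      }
    }
    ```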
    
    ## How was this patch tested?
    Manually tested (snapshots attached); also corrected a UT, as mentioned in the description above, which complements this PR's changes.
    Step 1:
    Launch spark-shell => create two tables => run an INSERT command => EXPLAIN and check the plan => plan contains broadcast join => exit.
    
    
![step-1_spark-25332](https://user-images.githubusercontent.com/12999161/47113009-83db6400-d275-11e8-8439-0b9cba0cb413.PNG)
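
    The same check can be done textually instead of reading the screenshots (table names are hypothetical):
    ```scala
    // Print the physical plan and look for the join node that was chosen.
    spark.sql("EXPLAIN SELECT * FROM src JOIN dst ON src.key = dst.key").show(false)
    // Before the fix: BroadcastHashJoin in the first session, but
    // SortMergeJoin after relaunching the shell.
    ```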
    
    Step 2:
    Relaunch spark-shell => run the EXPLAIN command for the same SELECT statement => verify the plan => plan contains SortMergeJoin - this is an incorrect result.
    
    
![step-2_spark-25332](https://user-images.githubusercontent.com/12999161/47113119-d288fe00-d275-11e8-9c8c-971f02fddda7.PNG)
    
    Step 3:
    Run the INSERT command again => run the EXPLAIN command for the same SELECT statement => verify the plan.
    We can observe the node has changed back to broadcast join - this makes the flow inconsistent.
    
    **After Fix**
    Step 1:
    Launch spark-shell => create two tables => run an INSERT command => EXPLAIN and check the plan => plan contains broadcast join => exit.
    
    
![step-1-afterfix-spark-25332](https://user-images.githubusercontent.com/12999161/47113323-52af6380-d276-11e8-9eb9-71d1076d7e38.PNG)
    
    Step 2:
    Relaunch spark-shell => run the EXPLAIN command for the same SELECT statement => verify the plan => plan still contains broadcast join, since after the fix the Hive stats are available for the table.
    
![step-2-afterfix-spark-25332](https://user-images.githubusercontent.com/12999161/47113407-94400e80-d276-11e8-99c1-66fa0c333beb.PNG)
    
    Step 3:
    Run the INSERT command again => run the EXPLAIN command for the same SELECT statement => verify the plan.
    We can observe the plan still retains broadcast join - from now on the results are always consistent.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sujith71955/spark master_statistics

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22758
    
----
commit 469f3693b641dae161bbb673599f55f20a60767b
Author: s71955 <sujithchacko.2010@...>
Date:   2018-10-17T19:43:39Z

    [SPARK-25332][SQL] Sort merge join is selected instead of broadcast hash join after restarting spark-shell/spark-JDBC for Hive provider
    
    ## What changes were proposed in this pull request?
    Problem:
    The following steps, run in sequence, reproduce the issue.
    a. Create a parquet table with a STORED AS clause.
    b. Run an INSERT statement => this does not update the Hive stats.
    c. Run a query that needs to calculate stats (e.g. EXPLAIN COST on a SELECT statement) => the stats are evaluated from HadoopFsRelation.
    d. Since correct stats (sizeInBytes) are available, the plan selects a broadcast node when joining with any table.
    e. Exit (come out of the shell).

    f. Now run the step c (calculate stats) query again. This gives wrong stats in the plan (sizeInBytes falls back to the default value), because they are calculated from the HiveCatalogTable, which has no stats since they were not updated in step b.
    g. Since incorrect stats (default sizeInBytes) are available, the plan selects a SortMergeJoin node when joining with any table.
    h. Now run the INSERT statement again => this updates the Hive stats.
    i. Now run the step c (calculate stats) query again. This gives correct stats (sizeInBytes) in the plan, because the Hive stats updated in step h can be read.
    j. From now on the stats are always available, so the plan always shows correct stats and picks the broadcast join node (based on the threshold size).
    
    Solution:
    The root cause is that the Hive stats are not recorded after an INSERT command, because of the "if (table.stats.nonEmpty)" condition in updateTableStats(), which is executed as part of the InsertIntoHadoopFsRelationCommand command.
    So as part of the fix we initialize a default value for the CatalogTable stats if there is no cached LogicalRelation for the table.
    
    It was also observed that in the test case "test statistics of LogicalRelation converted from Hive serde tables" in StatisticsSuite, both orc and parquet are convertible, but the test expected only orc to record Hive stats, not parquet. Since both relations are convertible, they should share the same expectation; this is corrected in this PR.
    
    How was this patch tested?
    Manually tested; test snapshots attached and a UT corrected to verify the PR scenario.

----

