Mostafa Mokhtar created HIVE-9647:
-------------------------------------

             Summary: Discrepancy in CBO between partitioned and un-partitioned tables
                 Key: HIVE-9647
                 URL: https://issues.apache.org/jira/browse/HIVE-9647
             Project: Hive
          Issue Type: Bug
          Components: CBO
    Affects Versions: 0.14.0
            Reporter: Mostafa Mokhtar
            Assignee: Gunther Hagleitner
             Fix For: 1.2.0


High-level summary
HiveRelMdSelectivity.computeInnerJoinSelectivity relies on the per-column number of distinct values (NDV) to estimate join selectivity.
The way statistics are aggregated for partitioned tables results in a discrepancy in the number of distinct values, which in turn results in different plans between partitioned and un-partitioned schemas.

The table below summarizes the NDVs used in computeInnerJoinSelectivity to estimate join selectivity.
{code}
Column            Partitioned count distincts   Un-partitioned count distincts
sr_customer_sk                         71,245                        1,415,625
sr_item_sk                             38,846                           62,562
sr_ticket_number                       71,245                       34,931,085
ss_customer_sk                         88,476                        1,415,625
ss_item_sk                             38,846                           62,562
ss_ticket_number                      100,756                       56,256,175
{code}
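To see how these NDVs drive the divergence, here is a minimal sketch using the textbook inner-join selectivity formula (the product of 1/max(left NDV, right NDV) over the equi-join keys). This is an illustration only, not Hive's exact computeInnerJoinSelectivity logic:

```python
# Illustrative sketch: NDV-based inner-join selectivity as the product of
# 1/max(left NDV, right NDV) per join key. Textbook formula, not Hive's
# exact computeInnerJoinSelectivity implementation.

def join_selectivity(ndv_pairs):
    """ndv_pairs: list of (left_ndv, right_ndv), one pair per equi-join key."""
    sel = 1.0
    for left, right in ndv_pairs:
        sel *= 1.0 / max(left, right)
    return sel

# NDV pairs (ss_*, sr_*) for customer_sk, item_sk, ticket_number,
# taken from the table above.
partitioned = [(88_476, 71_245), (38_846, 38_846), (100_756, 71_245)]
unpartitioned = [(1_415_625, 1_415_625), (62_562, 62_562), (56_256_175, 34_931_085)]

sel_part = join_selectivity(partitioned)
sel_unpart = join_selectivity(unpartitioned)

# The un-partitioned NDVs yield a far smaller selectivity, hence a much
# lower estimated join cardinality -- and a different plan.
print(sel_part, sel_unpart)
```

With the smaller partitioned NDVs the estimated selectivity is orders of magnitude larger, which is consistent with the 100,840 vs. 28,880 join row counts in the plans below.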
        
The discrepancy arises because the NDV calculation for a partitioned table assumes that the NDV range is contained within each partition, and is calculated as "select max(NUM_DISTINCTS) from PART_COL_STATS".
This is problematic for columns like ticket number, which increase naturally along with the date partition column ss_sold_date_sk.
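A small synthetic example (not Hive code) shows why max-over-partitions breaks down for such columns: when a monotonically increasing column is split into disjoint ranges by the date partition, every partition's NDV equals its row count, so the max over partitions underestimates the global NDV by a factor of the number of partitions:

```python
# Synthetic illustration: a monotonically increasing column (e.g. a ticket
# number) partitioned by date. Each partition holds a disjoint range, so the
# per-partition NDV equals rows_per_partition, while the true global NDV is
# num_partitions * rows_per_partition.

num_partitions = 100
rows_per_partition = 1000

partitions = [
    set(range(p * rows_per_partition, (p + 1) * rows_per_partition))
    for p in range(num_partitions)
]

per_partition_ndv = [len(p) for p in partitions]
global_ndv = len(set().union(*partitions))

# Aggregating with max(NUM_DISTINCTS), as the metastore query does, yields
# 1,000 instead of the true 100,000 -- a 100x underestimate.
print(max(per_partition_ndv), global_ndv)
```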
Suggestions
* Use HyperLogLog as suggested by Gopal; there is an HLL implementation for HBase coprocessors which we can use as a reference here.
* Using the global stats from TAB_COL_STATS and the per-partition stats from PART_COL_STATS, extrapolate the NDV for the qualified partitions as:
{code}
max( (NUM_DISTINCTS from TAB_COL_STATS) x (Number of qualified partitions) / (Number of partitions),
     max(NUM_DISTINCTS) from PART_COL_STATS )
{code}
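The extrapolation suggestion can be sketched as follows. This is illustrative Python only; the function and parameter names are hypothetical, not actual Hive identifiers, and the partition counts in the example are made up for demonstration:

```python
# Hypothetical sketch of the proposed NDV extrapolation for qualified
# partitions. tab_ndv comes from TAB_COL_STATS, part_ndvs from PART_COL_STATS.
# All names here are illustrative, not Hive identifiers.

def extrapolated_ndv(tab_ndv, part_ndvs, num_qualified, num_partitions):
    # Scale the global NDV by the fraction of partitions that qualify,
    # and never go below the best per-partition estimate we already have.
    scaled = tab_ndv * num_qualified / num_partitions
    return max(scaled, max(part_ndvs))

# Example with the ss_ticket_number stats from this issue (global NDV
# 56,256,175; per-partition max ~100,756), assuming a hypothetical table of
# 2,000 partitions of which 1,000 qualify after partition pruning.
est = extrapolated_ndv(56_256_175, [100_756], 1_000, 2_000)
print(est)
```

The key property is that scanning more partitions monotonically raises the NDV estimate toward the global value, instead of staying pinned at the per-partition max.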
More details
While doing TPC-DS partitioned vs. un-partitioned runs I noticed that many of the plans were different. I then dumped the CBO logical plans and found that the join estimates are drastically different.

Unpartitioned schema :
{code}
2015-02-10 11:33:27,624 DEBUG [main]: parse.SemanticAnalyzer 
(SemanticAnalyzer.java:apply(12624)) - Plan After Join Reordering:
HiveProjectRel(store_sales_quantitycount=[$0], store_sales_quantityave=[$1], 
store_sales_quantitystdev=[$2], store_sales_quantitycov=[/($2, $1)], 
as_store_returns_quantitycount=[$3], as_store_returns_quantityave=[$4], 
as_store_returns_quantitystdev=[$5], store_returns_quantitycov=[/($5, $4)]): 
rowcount = 1.0, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, 
id = 2956
  HiveAggregateRel(group=[{}], agg#0=[count($0)], agg#1=[avg($0)], 
agg#2=[stddev_samp($0)], agg#3=[count($1)], agg#4=[avg($1)], 
agg#5=[stddev_samp($1)]): rowcount = 1.0, cumulative cost = 
{6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2954
    HiveProjectRel($f0=[$4], $f1=[$8]): rowcount = 40.05611776795562, 
cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2952
      HiveProjectRel(ss_sold_date_sk=[$0], ss_item_sk=[$1], 
ss_customer_sk=[$2], ss_ticket_number=[$3], ss_quantity=[$4], sr_item_sk=[$5], 
sr_customer_sk=[$6], sr_ticket_number=[$7], sr_return_quantity=[$8], 
d_date_sk=[$9], d_quarter_name=[$10]): rowcount = 40.05611776795562, cumulative 
cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2982
        HiveJoinRel(condition=[=($9, $0)], joinType=[inner]): rowcount = 
40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 
io}, id = 2980
          HiveJoinRel(condition=[AND(AND(=($2, $6), =($1, $5)), =($3, $7))], 
joinType=[inner]): rowcount = 28880.460910696, cumulative cost = {6.05654559E8 
rows, 0.0 cpu, 0.0 io}, id = 2964
            HiveProjectRel(ss_sold_date_sk=[$0], ss_item_sk=[$2], 
ss_customer_sk=[$3], ss_ticket_number=[$9], ss_quantity=[$10]): rowcount = 
5.50076554E8, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2920
              HiveTableScanRel(table=[[tpcds_bin_orc_200.store_sales]]): 
rowcount = 5.50076554E8, cumulative cost = {0}, id = 2822
            HiveProjectRel(sr_item_sk=[$2], sr_customer_sk=[$3], 
sr_ticket_number=[$9], sr_return_quantity=[$10]): rowcount = 5.5578005E7, 
cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2923
              HiveTableScanRel(table=[[tpcds_bin_orc_200.store_returns]]): 
rowcount = 5.5578005E7, cumulative cost = {0}, id = 2823
          HiveProjectRel(d_date_sk=[$0], d_quarter_name=[$15]): rowcount = 
101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2948
            HiveFilterRel(condition=[=($15, '2000Q1')]): rowcount = 
101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2946
              HiveTableScanRel(table=[[tpcds_bin_orc_200.date_dim]]): rowcount 
= 73049.0, cumulative cost = {0}, id = 2821
{code}

Partitioned schema :
{code}
2015-02-10 11:32:16,880 DEBUG [main]: parse.SemanticAnalyzer 
(SemanticAnalyzer.java:apply(12624)) - Plan After Join Reordering:
HiveProjectRel(store_sales_quantitycount=[$0], store_sales_quantityave=[$1], 
store_sales_quantitystdev=[$2], store_sales_quantitycov=[/($2, $1)], 
as_store_returns_quantitycount=[$3], as_store_returns_quantityave=[$4], 
as_store_returns_quantitystdev=[$5], store_returns_quantitycov=[/($5, $4)]): 
rowcount = 1.0, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, 
id = 2791
  HiveAggregateRel(group=[{}], agg#0=[count($0)], agg#1=[avg($0)], 
agg#2=[stddev_samp($0)], agg#3=[count($1)], agg#4=[avg($1)], 
agg#5=[stddev_samp($1)]): rowcount = 1.0, cumulative cost = 
{6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2789
    HiveProjectRel($f0=[$3], $f1=[$8]): rowcount = 100840.08570910375, 
cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2787
      HiveProjectRel(ss_item_sk=[$4], ss_customer_sk=[$5], 
ss_ticket_number=[$6], ss_quantity=[$7], ss_sold_date_sk=[$8], sr_item_sk=[$0], 
sr_customer_sk=[$1], sr_ticket_number=[$2], sr_return_quantity=[$3], 
d_date_sk=[$9], d_quarter_name=[$10]): rowcount = 100840.08570910375, 
cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2817
        HiveJoinRel(condition=[AND(AND(=($5, $1), =($4, $0)), =($6, $2))], 
joinType=[inner]): rowcount = 100840.08570910375, cumulative cost = 
{6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2815
          HiveProjectRel(sr_item_sk=[$1], sr_customer_sk=[$2], 
sr_ticket_number=[$8], sr_return_quantity=[$9]): rowcount = 5.5578005E7, 
cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2758
            
HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.store_returns]]): 
rowcount = 5.5578005E7, cumulative cost = {0}, id = 2658
          HiveJoinRel(condition=[=($5, $4)], joinType=[inner]): rowcount = 
762935.5811373093, cumulative cost = {5.500766553162274E8 rows, 0.0 cpu, 0.0 
io}, id = 2801
            HiveProjectRel(ss_item_sk=[$1], ss_customer_sk=[$2], 
ss_ticket_number=[$8], ss_quantity=[$9], ss_sold_date_sk=[$22]): rowcount = 
5.50076554E8, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2755
              
HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.store_sales]]): 
rowcount = 5.50076554E8, cumulative cost = {0}, id = 2657
            HiveProjectRel(d_date_sk=[$0], d_quarter_name=[$15]): rowcount = 
101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2783
              HiveFilterRel(condition=[=($15, '2000Q1')]): rowcount = 
101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2781
                
HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.date_dim]]): 
rowcount = 73049.0, cumulative cost = {0}, id = 2656
{code}

This was puzzling given that the stats for both tables are "identical" in
TAB_COL_STATS.

Column statistics from TAB_COL_STATS, notice how the column statistics are 
identical in both cases.
{code}
DB_NAME                        COLUMN_NAME       COLUMN_TYPE  NUM_NULLS   LONG_HIGH_VALUE  LONG_LOW_VALUE  MAX_COL_LEN  NUM_DISTINCTS
tpcds_bin_orc_200              d_date_sk         int          0           2,488,070        2,415,022       NULL         65,332
tpcds_bin_partitioned_orc_200  d_date_sk         int          0           2,488,070        2,415,022       NULL         65,332
tpcds_bin_orc_200              d_quarter_name    string       0           NULL             NULL            6            721
tpcds_bin_partitioned_orc_200  d_quarter_name    string       0           NULL             NULL            6            721
tpcds_bin_orc_200              sr_customer_sk    int          1,009,571   1,600,000        1               NULL         1,415,625
tpcds_bin_partitioned_orc_200  sr_customer_sk    int          1,009,571   1,600,000        1               NULL         1,415,625
tpcds_bin_orc_200              sr_item_sk        int          0           48,000           1               NULL         62,562
tpcds_bin_partitioned_orc_200  sr_item_sk        int          0           48,000           1               NULL         62,562
tpcds_bin_orc_200              sr_ticket_number  int          0           48,000,000       1               NULL         34,931,085
tpcds_bin_partitioned_orc_200  sr_ticket_number  int          0           48,000,000       1               NULL         34,931,085
tpcds_bin_orc_200              ss_customer_sk    int          12,960,424  1,600,000        1               NULL         1,415,625
tpcds_bin_partitioned_orc_200  ss_customer_sk    int          12,960,424  1,600,000        1               NULL         1,415,625
tpcds_bin_orc_200              ss_item_sk        int          0           48,000           1               NULL         62,562
tpcds_bin_partitioned_orc_200  ss_item_sk        int          0           48,000           1               NULL         62,562
tpcds_bin_orc_200              ss_sold_date_sk   int          0           2,452,642        2,450,816       NULL         2,226
tpcds_bin_partitioned_orc_200  ss_sold_date_sk   int          0           2,452,642        2,450,816       NULL         2,226
tpcds_bin_orc_200              ss_ticket_number  int          0           48,000,000       1               NULL         56,256,175
tpcds_bin_partitioned_orc_200  ss_ticket_number  int          0           48,000,000       1               NULL         56,256,175
{code}

For partitioned tables we get the statistics using get_aggr_stats_for, which
eventually issues the query below:

{code}
select 
    COLUMN_NAME,
    COLUMN_TYPE,
    …
    max(NUM_DISTINCTS),
    …
from
    PART_COL_STATS
where
    DB_NAME = 
        and TABLE_NAME = 
        and COLUMN_NAME in 
        and PARTITION_NAME in (1 … N)
group by COLUMN_NAME , COLUMN_TYPE;
{code}
 …



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
