[ https://issues.apache.org/jira/browse/DRILL-2840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Hsuan-Yi Chu updated DRILL-2840:
-------------------------------------
    Fix Version/s:     (was: 1.2.0)
                   Future

> Duplicate HashAgg operator seen in physical plan for aggregate & grouping query
> -------------------------------------------------------------------------------
>
>                 Key: DRILL-2840
>                 URL: https://issues.apache.org/jira/browse/DRILL-2840
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Query Planning & Optimization
>    Affects Versions: 0.9.0
>            Reporter: Khurram Faraaz
>            Assignee: Sean Hsuan-Yi Chu
>            Priority: Minor
>             Fix For: Future
>
> The test was executed on a 4-node cluster on CentOS.
> {code}
> Case 1) We need to know why there is an additional HashAgg(group=[{0, 1}])
> operator right after the Scan in the physical plan.
> 0: jdbc:drill:> select max( distinct key1 ) maximum, count( distinct key1 ) count_key1, sum( distinct key1 ) sum_key1, min( distinct key1 ) minimum, avg( distinct key1 ) average, key2 from `twoKeyJsn.json` group by key2 order by key2;
> +------------------+-------------+------------------------+------------------+----------------------+------+
> | maximum          | count_key1  | sum_key1               | minimum          | average              | key2 |
> +------------------+-------------+------------------------+------------------+----------------------+------+
> | 1.40095133379E9  | 156         | 1.1920469973999657E11  | 2.39793089027E7  | 7.641326906410036E8  | 0    |
> | 1.42435032101E9  | 180         | 1.2884789516328592E11  | 8659240.29442    | 7.158216397960329E8  | 1    |
> | 1.42931626355E9  | 1872281     | 1.3386731804571605E15  | 618.939623926    | 7.149958689198686E8  | a    |
> | 1.42931347924E9  | 1870676     | 1.3371603128280032E15  | 108.851943741    | 7.14800592314224E8   | b    |
> | 1.42931336919E9  | 1871847     | 1.337837189079748E15   | 3018.47312743    | 7.147150323075273E8  | c    |
> | 1.42931380603E9  | 1870697     | 1.3362335178170852E15  | 3890.92180463    | 7.142971404867198E8  | d    |
> | 1.42931281008E9  | 1871507     | 1.3367368067327902E15  | 1165.48741414    | 7.142569099302275E8  | e    |
> | 1.42931480081E9  | 1870450     | 1.3358301916601862E15  | 354.577534881    | 7.14175835579773E8   | f    |
> | 1.42931509068E9  | 1873604     | 1.3389171286500712E15  | 889.584888053    | 7.146211945801094E8  | g    |
> | 1.42931553374E9  | 1872726     | 1.3393592500619982E15  | 2704.34813594    | 7.151923186104097E8  | h    |
> | 1.42931450347E9  | 1872434     | 1.3381712881732795E15  | 122.281412463    | 7.146694025921766E8  | i    |
> | 1.42931539751E9  | 1872250     | 1.3380216282921535E15  | 946.21365677     | 7.146597026530397E8  | j    |
> | 1.42931334853E9  | 1873923     | 1.3390341356271005E15  | 1070.7862089     | 7.145619834043877E8  | k    |
> | 1.42931539809E9  | 1870929     | 1.3371605654647945E15  | 55.1144569856    | 7.147040670516062E8  | l    |
> | 1.42931543226E9  | 1874172     | 1.339322148620916E15   | 858.05505376     | 7.146207224421856E8  | m    |
> | 1.42931595791E9  | 1874462     | 1.3391024723756562E15  | 237.230716926    | 7.143929684227561E8  | n    |
> +------------------+-------------+------------------------+------------------+----------------------+------+
> 16 rows selected (103.566 seconds)
> 0: jdbc:drill:> explain plan for select max( distinct key1 ) maximum, count( distinct key1 ) count_key1, sum( distinct key1 ) sum_key1, min( distinct key1 ) minimum, avg( distinct key1 ) average, key2 from `twoKeyJsn.json` group by key2 order by key2;
> +------------+------------+
> | text       | json       |
> +------------+------------+
> | 00-00  Screen
> 00-01    Project(maximum=[$0], count_key1=[$1], sum_key1=[$2], minimum=[$3], average=[$4], key2=[$5])
> 00-02      SelectionVectorRemover
> 00-03        Sort(sort0=[$5], dir0=[ASC])
> 00-04          Project(maximum=[$1], count_key1=[$2], sum_key1=[CASE(=($2, 0), null, $3)], minimum=[$4], average=[CAST(/(CastHigh(CASE(=($2, 0), null, $3)), $2)):ANY NOT NULL], key2=[$0])
> 00-05            HashAgg(group=[{0}], maximum=[MAX($1)], count_key1=[COUNT($1)], agg#2=[$SUM0($1)], minimum=[MIN($1)])
> 00-06              HashAgg(group=[{0, 1}])
> 00-07                HashAgg(group=[{0, 1}])
> 00-08                  Scan(groupscan=[EasyGroupScan [selectionRoot=/tmp/twoKeyJsn.json, numFiles=1, columns=[`key2`, `key1`], files=[maprfs:/tmp/twoKeyJsn.json]]])
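> -- Note (a diagnostic sketch, not part of the original session): the back-to-back
> -- HashAgg(group=[{0, 1}]) pair looks like the two phases of the multi-phase
> -- aggregation rewrite planned without an exchange between them. One way to test
> -- that hypothesis is to disable the multi-phase option (listed in sys.options
> -- below) and re-run the explain:
> 0: jdbc:drill:> alter session set `planner.enable_multiphase_agg` = false;
> 0: jdbc:drill:> explain plan for select max( distinct key1 ) maximum, count( distinct key1 ) count_key1, sum( distinct key1 ) sum_key1, min( distinct key1 ) minimum, avg( distinct key1 ) average, key2 from `twoKeyJsn.json` group by key2 order by key2;
> -- If the duplicate operator disappears, the issue is in two-phase plan
> -- generation rather than in the distinct-aggregate rewrite itself.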
> Config options related to hashing and aggregation were set to:
> 0: jdbc:drill:> select * from sys.options where name like '%agg%';
> +---------------------------------------+----------+---------+----------+-------------+-----------+------------+
> | name                                  | kind     | type    | num_val  | string_val  | bool_val  | float_val  |
> +---------------------------------------+----------+---------+----------+-------------+-----------+------------+
> | planner.enable_multiphase_agg         | BOOLEAN  | SYSTEM  | null     | null        | true      | null       |
> | planner.enable_streamagg              | BOOLEAN  | SYSTEM  | null     | null        | true      | null       |
> | planner.enable_hashagg                | BOOLEAN  | SYSTEM  | null     | null        | true      | null       |
> | planner.memory.hash_agg_table_factor  | DOUBLE   | SYSTEM  | null     | null        | null      | 1.1        |
> +---------------------------------------+----------+---------+----------+-------------+-----------+------------+
> 4 rows selected (0.203 seconds)
> 0: jdbc:drill:> select * from sys.options where name like '%hash%';
> +--------------------------------------------+----------+---------+-------------+-------------+-----------+------------+
> | name                                       | kind     | type    | num_val     | string_val  | bool_val  | float_val  |
> +--------------------------------------------+----------+---------+-------------+-------------+-----------+------------+
> | planner.enable_hash_single_key             | BOOLEAN  | SYSTEM  | null        | null        | true      | null       |
> | planner.join.hash_join_swap_margin_factor  | DOUBLE   | SYSTEM  | null        | null        | null      | 10.0       |
> | exec.max_hash_table_size                   | LONG     | SYSTEM  | 1073741824  | null        | null      | null       |
> | planner.enable_hashjoin_swap               | BOOLEAN  | SYSTEM  | null        | null        | true      | null       |
> | exec.min_hash_table_size                   | LONG     | SYSTEM  | 65536       | null        | null      | null       |
> | planner.enable_hashagg                     | BOOLEAN  | SYSTEM  | null        | null        | true      | null       |
> | planner.memory.hash_agg_table_factor       | DOUBLE   | SYSTEM  | null        | null        | null      | 1.1        |
> | planner.memory.hash_join_table_factor      | DOUBLE   | SYSTEM  | null        | null        | null      | 1.1        |
> | planner.enable_hashjoin                    | BOOLEAN  | SYSTEM  | null        | null        | true      | null       |
> +--------------------------------------------+----------+---------+-------------+-------------+-----------+------------+
> 9 rows selected (0.144 seconds)
> {code}
> Here is another (similar) aggregate and grouping query that hung forever.
> {code}
> Case 2) This aggregate and grouping query hangs indefinitely. The input is a
> CSV file with two columns of data, running on a 4-node cluster on CentOS.
> The data file has close to 26 million records in it.
> 0: jdbc:drill:> select count(*) from (select max( distinct cast(columns[0] as double) ) maximum, count( distinct cast(columns[0] as double) ) count_key1, sum( distinct cast(columns[0] as double)) sum_key1, min( distinct cast(columns[0] as double)) minimum, avg( distinct cast(columns[0] as double)) average, columns[1] from `tblfrmJsnToCSV/0_0_0.csv` where columns[0] <> 'key1' group by columns[1]);
> The physical plan for the query is:
> 00-00  Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 22.877799999999997, cumulative cost = {2176138.623779999 rows, 7886477.503779999 cpu, 0.0 io, 2.811224064E8 network, 6683978.048 memory}, id = 74019
> 00-01    StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 22.877799999999997, cumulative cost = {2176136.335999999 rows, 7886475.215999999 cpu, 0.0 io, 2.811224064E8 network, 6683978.048 memory}, id = 74018
> 00-02      Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 228.77799999999996, cumulative cost = {2175907.5579999993 rows, 7883729.879999999 cpu, 0.0 io, 2.811224064E8 network, 6683978.048 memory}, id = 74017
> 00-03        HashAgg(group=[{0}], maximum=[MAX($1)], count_key1=[COUNT($1)], agg#2=[$SUM0($1)], minimum=[MIN($1)]) : rowType = RecordType(ANY EXPR$5, DOUBLE maximum, BIGINT count_key1, DOUBLE $f3, DOUBLE minimum): rowcount = 228.77799999999996, cumulative cost = {2175678.7799999993 rows, 7883725.879999999 cpu, 0.0 io, 2.811224064E8 network, 6683978.048 memory}, id = 74016
> 00-04          HashAgg(group=[{0, 1}]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1): rowcount = 2287.7799999999997, cumulative cost = {2173390.9999999995 rows, 7755610.199999999 cpu, 0.0 io, 2.811224064E8 network, 6643713.12 memory}, id = 74015
> 00-05            Project(EXPR$5=[$0], $f1=[$1]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1): rowcount = 22877.8, cumulative cost = {2150513.1999999997 rows, 7389565.399999999 cpu, 0.0 io, 2.811224064E8 network, 6039739.2 memory}, id = 74014
> 00-06              HashToRandomExchange(dist0=[[$0]], dist1=[[$1]]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 22877.8, cumulative cost = {2127635.4 rows, 7389557.399999999 cpu, 0.0 io, 2.811224064E8 network, 6039739.2 memory}, id = 74013
> 01-01                UnorderedMuxExchange : rowType = RecordType(ANY EXPR$5, DOUBLE $f1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 22877.8, cumulative cost = {2104757.6 rows, 7115023.8 cpu, 0.0 io, 0.0 network, 6039739.2 memory}, id = 74012
> 02-01                  Project(EXPR$5=[$0], $f1=[$1], E_X_P_R_H_A_S_H_F_I_E_L_D=[castINT(hash64($1, hash64($0)))]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 22877.8, cumulative cost = {2081879.8 rows, 7092146.0 cpu, 0.0 io, 0.0 network, 6039739.2 memory}, id = 74011
> 02-02                    HashAgg(group=[{0, 1}]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1): rowcount = 22877.8, cumulative cost = {2059002.0 rows, 7092134.0 cpu, 0.0 io, 0.0 network, 6039739.2 memory}, id = 74010
> 02-03                      Project(EXPR$5=[$0], $f1=[CAST($1):DOUBLE]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1): rowcount = 228778.0, cumulative cost = {1830224.0 rows, 3431686.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74009
> 02-04                        SelectionVectorRemover : rowType = RecordType(ANY ITEM, ANY ITEM1): rowcount = 228778.0, cumulative cost = {1601446.0 rows, 3431678.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74008
> 02-05                          Filter(condition=[<>($1, 'key1')]) : rowType = RecordType(ANY ITEM, ANY ITEM1): rowcount = 228778.0, cumulative cost = {1372668.0 rows, 3202900.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74007
> 02-06                            Project(ITEM=[ITEM($0, 1)], ITEM1=[ITEM($0, 0)]) : rowType = RecordType(ANY ITEM, ANY ITEM1): rowcount = 457556.0, cumulative cost = {915112.0 rows, 457564.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74006
> 02-07                              Scan(groupscan=[EasyGroupScan [selectionRoot=/tmp/tblfrmJsnToCSV/0_0_0.csv, numFiles=1, columns=[`columns`[1], `columns`[0]], files=[maprfs:/tmp/tblfrmJsnToCSV/0_0_0.csv]]]) : rowType = RecordType(ANY columns): rowcount = 457556.0, cumulative cost = {457556.0 rows, 457556.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74005
> Sample data from the data file:
> [root@centos-01 logs]# hadoop fs -cat /tmp/tblfrmJsnToCSV/0_0_0.csv | more
> key1,key2
> 1.2968152673E9,d
> 4.67365529012E7,c
> 9.39682065896E7,b
> 1.01580172933E9,d
> 4.98788888641E8,1
> 1.52391833107E8,1
> 7.31290386917E8,a
> 6.92726688161E8,d
> 1.12383522654E9,a
> 1.26807240856E8,1
> 9.54482542201E8,1
> 1.32100398388E9,0
> 1.17405537683E9,a
> 3.49879149097E7,0
> 6.50489380899E7,b
> 1.00841781109E9,a
> 1.19199684011E9,c
> 1.88765338328E8,b
> 8.24243579027E8,a
> 7.03797780195E8,b
> {code}

-- This message was sent by Atlassian JIRA (v6.3.4#6332)