[ https://issues.apache.org/jira/browse/HIVE-22666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018627#comment-17018627 ]
Jesus Camacho Rodriguez commented on HIVE-22666:
------------------------------------------------

[~kkasa], can you rebase on top of current master? Patch does not apply cleanly anymore.

+1 (pending tests)

Please, create a follow-up to enable the optimization for those tests where it was disabling vectorization.

> Introduce TopNKey operator for PTF Reduce Sink
> ----------------------------------------------
>
>                 Key: HIVE-22666
>                 URL: https://issues.apache.org/jira/browse/HIVE-22666
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Krisztian Kasa
>            Assignee: Krisztian Kasa
>            Priority: Major
>         Attachments: HIVE-22666.1.patch, HIVE-22666.2.patch, HIVE-22666.3.patch, HIVE-22666.3.patch
>
>
> {code}
> EXPLAIN EXTENDED
> SELECT s_state, ranking
> FROM (
>   SELECT s_state AS s_state,
>     rank() OVER (PARTITION BY s_state ORDER BY ss_net_profit) AS ranking
>   FROM testtable_n1000) tmp1
> WHERE ranking <= 3;
> {code}
> {code}
> STAGE DEPENDENCIES:
>   Stage-1 is a root stage
>   Stage-0 depends on stages: Stage-1
> STAGE PLANS:
>   Stage: Stage-1
>     Tez
> #### A masked pattern was here ####
>       Edges:
>         Reducer 2 <- Map 1 (SIMPLE_EDGE)
> #### A masked pattern was here ####
>       Vertices:
>         Map 1
>             Map Operator Tree:
>                 TableScan
>                   alias: testtable_n1000
>                   Statistics: Num rows: 10 Data size: 940 Basic stats: COMPLETE Column stats: COMPLETE
>                   GatherStats: false
>                   Reduce Output Operator
>                     key expressions: s_state (type: string), ss_net_profit (type: double)
>                     null sort order: az
>                     sort order: ++
>                     Map-reduce partition columns: s_state (type: string)
>                     Statistics: Num rows: 10 Data size: 940 Basic stats: COMPLETE Column stats: COMPLETE
>                     tag: -1
>                     TopN: 4
>                     TopN Hash Memory Usage: 0.1
>                     auto parallelism: true
>             Execution mode: vectorized, llap
>             LLAP IO: no inputs
>             Path -> Alias:
> #### A masked pattern was here ####
>             Path -> Partition:
> #### A masked pattern was here ####
>                 Partition
>                   base file name: testtable_n1000
>                   input format: org.apache.hadoop.mapred.TextInputFormat
>                   output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>                   properties:
>                     COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"s_state":"true","ss_net_profit":"true"}}
>                     bucket_count -1
>                     bucketing_version 2
>                     column.name.delimiter ,
>                     columns s_state,ss_net_profit
>                     columns.comments
>                     columns.types string:double
> #### A masked pattern was here ####
>                     name default.testtable_n1000
>                     numFiles 1
>                     numRows 10
>                     rawDataSize 80
>                     serialization.ddl struct testtable_n1000 { string s_state, double ss_net_profit}
>                     serialization.format 1
>                     serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                     totalSize 90
> #### A masked pattern was here ####
>                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>
>                     input format: org.apache.hadoop.mapred.TextInputFormat
>                     output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>                     properties:
>                       COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"s_state":"true","ss_net_profit":"true"}}
>                       bucket_count -1
>                       bucketing_version 2
>                       column.name.delimiter ,
>                       columns s_state,ss_net_profit
>                       columns.comments
>                       columns.types string:double
> #### A masked pattern was here ####
>                       name default.testtable_n1000
>                       numFiles 1
>                       numRows 10
>                       rawDataSize 80
>                       serialization.ddl struct testtable_n1000 { string s_state, double ss_net_profit}
>                       serialization.format 1
>                       serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                       totalSize 90
> #### A masked pattern was here ####
>                     serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                     name: default.testtable_n1000
>                   name: default.testtable_n1000
>             Truncated Path -> Alias:
>               /testtable_n1000 [testtable_n1000]
>         Reducer 2
>             Execution mode: vectorized, llap
>             Needs Tagging: false
>             Reduce Operator Tree:
>               Select Operator
>                 expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: double)
>                 outputColumnNames: _col0, _col1
>                 Statistics: Num rows: 10 Data size: 3620 Basic stats: COMPLETE Column
stats: COMPLETE
>                 PTF Operator
>                   Function definitions:
>                       Input definition
>                         input alias: ptf_0
>                         output shape: _col0: string, _col1: double
>                         type: WINDOWING
>                       Windowing table definition
>                         input alias: ptf_1
>                         name: windowingtablefunction
>                         order by: _col1 ASC NULLS LAST
>                         partition by: _col0
>                         raw input shape:
>                         window functions:
>                             window function definition
>                               alias: rank_window_0
>                               arguments: _col1
>                               name: rank
>                               window function: GenericUDAFRankEvaluator
>                               window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX)
>                               isPivotResult: true
>                   Statistics: Num rows: 10 Data size: 3620 Basic stats: COMPLETE Column stats: COMPLETE
>                   Filter Operator
>                     isSamplingPred: false
>                     predicate: (rank_window_0 <= 3) (type: boolean)
>                     Statistics: Num rows: 3 Data size: 1086 Basic stats: COMPLETE Column stats: COMPLETE
>                     Select Operator
>                       expressions: _col0 (type: string), rank_window_0 (type: int)
>                       outputColumnNames: _col0, _col1
>                       Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE
>                       File Output Operator
>                         compressed: false
>                         GlobalTableId: 0
> #### A masked pattern was here ####
>                         NumFilesPerFileSink: 1
>                         Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE
> #### A masked pattern was here ####
>                         table:
>                             input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>                             output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>                             properties:
>                               columns _col0,_col1
>                               columns.types string:int
>                               escape.delim \
>                               hive.serialization.extend.additional.nesting.levels true
>                               serialization.escape.crlf true
>                               serialization.format 1
>                               serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                             serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                         TotalFiles: 1
>                         GatherStats: false
>                         MultiFileSpray: false
>   Stage: Stage-0
>     Fetch Operator
>       limit: -1
>       Processor Tree:
>         ListSink
> {code}
> In this case the topN value (3+1) will be pushed to the ReduceSink (Reduce Output Operator) operator
in Map 1
> https://github.com/apache/hive/blob/520aa19b20381bfd2ed25c835443c013f6e6ebb9/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java#L257
> The ReduceSink operator uses PTFTopNHash to get the topN rows for each partition key (s_state) value.
> The goals of this jira are:
> - implement support for partitioning in TopNKeyOperator
> - enable push down of the partitioned TopNKeyOperator



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
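
For readers unfamiliar with the idea of a per-partition top-N key filter, the behaviour described in the issue (keep only the topN sort keys for each partition key value) can be sketched roughly as below. This is only an illustration under stated assumptions: the class PartitionedTopNKeyFilter and its method canForward are hypothetical names, not Hive's TopNKeyOperator or PTFTopNHash, and the sketch assumes a single string partition key, a single ascending double sort key, and no NULL handling.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/**
 * Hypothetical sketch of a partitioned top-N key filter.
 * Not Hive code: names and structure are assumptions for illustration only.
 */
final class PartitionedTopNKeyFilter {
    private final int topN;

    // One bounded, sorted set of distinct sort keys per partition key value.
    private final Map<String, TreeSet<Double>> topKeysByPartition = new HashMap<>();

    PartitionedTopNKeyFilter(int topN) {
        if (topN <= 0) {
            throw new IllegalArgumentException("topN must be positive");
        }
        this.topN = topN;
    }

    /**
     * Returns true if a row with this sort key may still be among the
     * topN smallest keys of its partition and should be forwarded.
     */
    boolean canForward(String partitionKey, double sortKey) {
        TreeSet<Double> topKeys =
            topKeysByPartition.computeIfAbsent(partitionKey, k -> new TreeSet<>());

        // Room left, or a tie with an already retained key: forward the row.
        if (topKeys.size() < topN || topKeys.contains(sortKey)) {
            topKeys.add(sortKey);
            return true;
        }

        // Smaller than the largest retained key: replace that key and forward.
        if (sortKey < topKeys.last()) {
            topKeys.add(sortKey);
            topKeys.pollLast();
            return true;
        }

        // Larger than every retained key: the row cannot be in the top N.
        return false;
    }
}
```

Note that a filter of this kind only drops rows that can no longer qualify. Rows forwarded early may later be pushed out of the top N, so the downstream rank() evaluation and the Filter Operator in the plan above are still needed to produce the exact result.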