[ 
https://issues.apache.org/jira/browse/HIVE-22666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krisztian Kasa updated HIVE-22666:
----------------------------------
    Status: Open  (was: Patch Available)

> Introduce TopNKey operator for PTF Reduce Sink
> ----------------------------------------------
>
>                 Key: HIVE-22666
>                 URL: https://issues.apache.org/jira/browse/HIVE-22666
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Krisztian Kasa
>            Assignee: Krisztian Kasa
>            Priority: Major
>         Attachments: HIVE-22666.1.patch, HIVE-22666.2.patch, 
> HIVE-22666.3.patch, HIVE-22666.3.patch, HIVE-22666.4.patch, 
> HIVE-22666.4.patch, HIVE-22666.4.patch
>
>
> {code}
> EXPLAIN EXTENDED
> SELECT s_state, ranking
> FROM (
>  SELECT s_state AS s_state,
>  rank() OVER (PARTITION BY s_state ORDER BY ss_net_profit) AS ranking
>  FROM testtable_n1000) tmp1
>  WHERE ranking <= 3;
> {code}
> {code}
> STAGE DEPENDENCIES:
>   Stage-1 is a root stage
>   Stage-0 depends on stages: Stage-1
> STAGE PLANS:
>   Stage: Stage-1
>     Tez
> #### A masked pattern was here ####
>       Edges:
>         Reducer 2 <- Map 1 (SIMPLE_EDGE)
> #### A masked pattern was here ####
>       Vertices:
>         Map 1 
>             Map Operator Tree:
>                 TableScan
>                   alias: testtable_n1000
>                   Statistics: Num rows: 10 Data size: 940 Basic stats: 
> COMPLETE Column stats: COMPLETE
>                   GatherStats: false
>                   Reduce Output Operator
>                     key expressions: s_state (type: string), ss_net_profit 
> (type: double)
>                     null sort order: az
>                     sort order: ++
>                     Map-reduce partition columns: s_state (type: string)
>                     Statistics: Num rows: 10 Data size: 940 Basic stats: 
> COMPLETE Column stats: COMPLETE
>                     tag: -1
>                     TopN: 4
>                     TopN Hash Memory Usage: 0.1
>                     auto parallelism: true
>             Execution mode: vectorized, llap
>             LLAP IO: no inputs
>             Path -> Alias:
> #### A masked pattern was here ####
>             Path -> Partition:
> #### A masked pattern was here ####
>                 Partition
>                   base file name: testtable_n1000
>                   input format: org.apache.hadoop.mapred.TextInputFormat
>                   output format: 
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>                   properties:
>                     COLUMN_STATS_ACCURATE 
> {"BASIC_STATS":"true","COLUMN_STATS":{"s_state":"true","ss_net_profit":"true"}}
>                     bucket_count -1
>                     bucketing_version 2
>                     column.name.delimiter ,
>                     columns s_state,ss_net_profit
>                     columns.comments 
>                     columns.types string:double
> #### A masked pattern was here ####
>                     name default.testtable_n1000
>                     numFiles 1
>                     numRows 10
>                     rawDataSize 80
>                     serialization.ddl struct testtable_n1000 { string 
> s_state, double ss_net_profit}
>                     serialization.format 1
>                     serialization.lib 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                     totalSize 90
> #### A masked pattern was here ####
>                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                 
>                     input format: org.apache.hadoop.mapred.TextInputFormat
>                     output format: 
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>                     properties:
>                       COLUMN_STATS_ACCURATE 
> {"BASIC_STATS":"true","COLUMN_STATS":{"s_state":"true","ss_net_profit":"true"}}
>                       bucket_count -1
>                       bucketing_version 2
>                       column.name.delimiter ,
>                       columns s_state,ss_net_profit
>                       columns.comments 
>                       columns.types string:double
> #### A masked pattern was here ####
>                       name default.testtable_n1000
>                       numFiles 1
>                       numRows 10
>                       rawDataSize 80
>                       serialization.ddl struct testtable_n1000 { string 
> s_state, double ss_net_profit}
>                       serialization.format 1
>                       serialization.lib 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                       totalSize 90
> #### A masked pattern was here ####
>                     serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                     name: default.testtable_n1000
>                   name: default.testtable_n1000
>             Truncated Path -> Alias:
>               /testtable_n1000 [testtable_n1000]
>         Reducer 2 
>             Execution mode: vectorized, llap
>             Needs Tagging: false
>             Reduce Operator Tree:
>               Select Operator
>                 expressions: KEY.reducesinkkey0 (type: string), 
> KEY.reducesinkkey1 (type: double)
>                 outputColumnNames: _col0, _col1
>                 Statistics: Num rows: 10 Data size: 3620 Basic stats: 
> COMPLETE Column stats: COMPLETE
>                 PTF Operator
>                   Function definitions:
>                       Input definition
>                         input alias: ptf_0
>                         output shape: _col0: string, _col1: double
>                         type: WINDOWING
>                       Windowing table definition
>                         input alias: ptf_1
>                         name: windowingtablefunction
>                         order by: _col1 ASC NULLS LAST
>                         partition by: _col0
>                         raw input shape:
>                         window functions:
>                             window function definition
>                               alias: rank_window_0
>                               arguments: _col1
>                               name: rank
>                               window function: GenericUDAFRankEvaluator
>                               window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX)
>                               isPivotResult: true
>                   Statistics: Num rows: 10 Data size: 3620 Basic stats: 
> COMPLETE Column stats: COMPLETE
>                   Filter Operator
>                     isSamplingPred: false
>                     predicate: (rank_window_0 <= 3) (type: boolean)
>                     Statistics: Num rows: 3 Data size: 1086 Basic stats: 
> COMPLETE Column stats: COMPLETE
>                     Select Operator
>                       expressions: _col0 (type: string), rank_window_0 (type: 
> int)
>                       outputColumnNames: _col0, _col1
>                       Statistics: Num rows: 3 Data size: 270 Basic stats: 
> COMPLETE Column stats: COMPLETE
>                       File Output Operator
>                         compressed: false
>                         GlobalTableId: 0
> #### A masked pattern was here ####
>                         NumFilesPerFileSink: 1
>                         Statistics: Num rows: 3 Data size: 270 Basic stats: 
> COMPLETE Column stats: COMPLETE
> #### A masked pattern was here ####
>                         table:
>                             input format: 
> org.apache.hadoop.mapred.SequenceFileInputFormat
>                             output format: 
> org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>                             properties:
>                               columns _col0,_col1
>                               columns.types string:int
>                               escape.delim \
>                               
> hive.serialization.extend.additional.nesting.levels true
>                               serialization.escape.crlf true
>                               serialization.format 1
>                               serialization.lib 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                             serde: 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                         TotalFiles: 1
>                         GatherStats: false
>                         MultiFileSpray: false
>   Stage: Stage-0
>     Fetch Operator
>       limit: -1
>       Processor Tree:
>         ListSink
> {code}
> In this case the topN value (3+1) will be pushed to the ReduceSink (Reduce 
> Output Operator) operator in Map 1
> https://github.com/apache/hive/blob/520aa19b20381bfd2ed25c835443c013f6e6ebb9/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java#L257
> ReduceSink operator uses PTFTopNHash to get the topN rows for each partition 
> key (s_state) value.
> The goals of this jira are:
> - implement supporting partitioning in TopNKeyOperator
> - enable push down of partitioned TopNKeyOperator



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to