I have attached the Hive 0.10 and 0.11 query plans for the sample query
below, for illustration.


On Fri, Aug 23, 2013 at 5:35 PM, Pala M Muthaia <mchett...@rocketfuelinc.com
> wrote:

> Hi,
>
> We are using DISTRIBUTE BY with custom reducer scripts in our query
> workload.
>
> After upgrading to Hive 0.11, queries combining GROUP BY/DISTRIBUTE BY/SORT
> BY with custom reducer scripts produced incorrect results. In particular,
> rows with the same value in the DISTRIBUTE BY column end up in multiple
> reducers and thus produce multiple rows in the final result, where we
> expect only one.
>
> I investigated a bit and observed the following behavior in Hive
> 0.11:
>
> - Hive 0.11 produces a different plan for these queries, yielding the
> incorrect results. The extra stage for the DISTRIBUTE BY + Transform is
> missing, and the Transform operator for the custom reducer script is pushed
> into the reduce operator tree that contains the GROUP BY itself.
>
> - However, *if the SORT BY in the query has a DESC order in it*, the
> right plan is produced, and the results look correct too.
>
> Hive 0.10 produces the expected plan with right results in all cases.
>
>
> To illustrate, here is a simplified repro setup:
>
> Table:
>
> CREATE TABLE test_cluster (grp STRING, val1 STRING, val2 INT, val3
> STRING, val4 INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES
> TERMINATED BY '\n' STORED AS TEXTFILE;
>
> Query:
>
> ADD FILE reducer.py;
>
> FROM (
>   SELECT grp, val2
>   FROM test_cluster
>   GROUP BY grp, val2
>   DISTRIBUTE BY grp
>   SORT BY grp, val2  -- add DESC here to get correct results
> ) a
>
> REDUCE a.*
> USING 'reducer.py'
> AS grp, reducedValue
>
>
> If I understand correctly, this is a bug. Is this a known issue? Any other
> insights? We have reverted to Hive 0.10 to avoid the incorrect results
> while we investigate.
>
> I have the repro sample, with test data and scripts, if anybody is
> interested.
>
>
>
> Thanks,
> pala
>
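
For reference, a minimal reducer.py consistent with the query above might
look like the sketch below. This is an assumption; the thread does not show
the actual script, and the aggregation (summing val2 per grp) is purely
illustrative. Hive streams the subquery's tab-separated rows to the
script's stdin, distributed and sorted so that all rows for one grp arrive
contiguously at a single reducer, and the script emits one (grp,
reducedValue) line per group:

```python
#!/usr/bin/env python
# Hypothetical reducer.py sketch. Hive feeds tab-separated rows (grp, val2)
# on stdin; rows with the same grp are expected to arrive contiguously.
import sys

def reduce_rows(lines):
    """Yield (grp, sum of val2) for each contiguous run of equal grp keys."""
    current_grp, total = None, 0
    for line in lines:
        grp, val2 = line.rstrip("\n").split("\t")[:2]
        if grp != current_grp:
            if current_grp is not None:
                yield current_grp, total
            current_grp, total = grp, 0
        total += int(val2)
    if current_grp is not None:
        yield current_grp, total

if __name__ == "__main__":
    for grp, total in reduce_rows(sys.stdin):
        sys.stdout.write("%s\t%d\n" % (grp, total))
```

Note that a script written this way silently produces duplicate output
lines per grp when the distribution is broken, which is exactly the
symptom described above.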

Hive 0.10 plan (expected: DISTRIBUTE BY + Transform in a separate stage):

ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF 
(TOK_TABNAME test_cluster))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR 
TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL grp)) (TOK_SELEXPR 
(TOK_TABLE_OR_COL val2))) (TOK_GROUPBY (TOK_TABLE_OR_COL grp) (TOK_TABLE_OR_COL 
val2)) (TOK_DISTRIBUTEBY (TOK_TABLE_OR_COL grp)) (TOK_SORTBY 
(TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL grp)) (TOK_TABSORTCOLNAMEASC 
(TOK_TABLE_OR_COL val2))))) a)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR 
TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST 
(TOK_ALLCOLREF (TOK_TABNAME a))) TOK_SERDE TOK_RECORDWRITER 'reducer.py' 
TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST grp reducedValue))))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        a:test_cluster
          TableScan
            alias: test_cluster
            Select Operator
              expressions:
                    expr: grp
                    type: string
                    expr: val2
                    type: int
              outputColumnNames: grp, val2
              Group By Operator
                bucketGroup: false
                keys:
                      expr: grp
                      type: string
                      expr: val2
                      type: int
                mode: hash
                outputColumnNames: _col0, _col1
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: string
                        expr: _col1
                        type: int
                  sort order: ++
                  Map-reduce partition columns:
                        expr: _col0
                        type: string
                        expr: _col1
                        type: int
                  tag: -1
      Reduce Operator Tree:
        Group By Operator
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
                expr: KEY._col1
                type: int
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: int
            outputColumnNames: _col0, _col1
            File Output Operator
              compressed: true
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        
hdfs://master-hadoop2.inw-hercules.rfiserve.net:8020/srv/grid-tmp/hive-mchettiar/hive_2013-08-23_20-54-19_918_5275587411980709973/-mr-10002
            Reduce Output Operator
              key expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: int
              sort order: ++
              Map-reduce partition columns:
                    expr: _col0
                    type: string
              tag: -1
              value expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: int
      Reduce Operator Tree:
        Extract
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: int
            outputColumnNames: _col0, _col1
            Transform Operator
              command: reducer.py
              output info:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              File Output Operator
                compressed: true
                GlobalTableId: 0
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1

Hive 0.11 plan (incorrect: Transform pushed into the GROUP BY reduce stage, no separate DISTRIBUTE BY stage):

ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF 
(TOK_TABNAME test_cluster))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR 
TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL grp)) (TOK_SELEXPR 
(TOK_TABLE_OR_COL val2))) (TOK_GROUPBY (TOK_TABLE_OR_COL grp) (TOK_TABLE_OR_COL 
val2)) (TOK_DISTRIBUTEBY (TOK_TABLE_OR_COL grp)) (TOK_SORTBY 
(TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL grp)) (TOK_TABSORTCOLNAMEASC 
(TOK_TABLE_OR_COL val2))))) a)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR 
TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST 
(TOK_ALLCOLREF (TOK_TABNAME a))) TOK_SERDE TOK_RECORDWRITER 'reducer.py' 
TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST grp reducedValue))))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        a:test_cluster
          TableScan
            alias: test_cluster
            Select Operator
              expressions:
                    expr: grp
                    type: string
                    expr: val2
                    type: int
              outputColumnNames: grp, val2
              Group By Operator
                bucketGroup: false
                keys:
                      expr: grp
                      type: string
                      expr: val2
                      type: int
                mode: hash
                outputColumnNames: _col0, _col1
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: string
                        expr: _col1
                        type: int
                  sort order: ++
                  Map-reduce partition columns:
                        expr: _col0
                        type: string
                        expr: _col1
                        type: int
                  tag: -1
      Reduce Operator Tree:
        Group By Operator
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
                expr: KEY._col1
                type: int
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: int
            outputColumnNames: _col0, _col1
            Transform Operator
              command: reducer.py
              output info:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              File Output Operator
                compressed: true
                GlobalTableId: 0
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1
