I have attached the Hive 0.10 and 0.11 query plans for the sample query
below, for illustration.
On Fri, Aug 23, 2013 at 5:35 PM, Pala M Muthaia <[email protected]> wrote:
> Hi,
>
> We are using DISTRIBUTE BY with custom reducer scripts in our query
> workload.
>
> After upgrading to Hive 0.11, queries with GROUP BY/DISTRIBUTE BY/SORT BY
> and custom reducer scripts produce incorrect results. In particular, rows
> with the same value in the DISTRIBUTE BY column end up in multiple reducers
> and thus produce multiple rows in the final result, when we expect only one.
>
> I investigated a little bit and discovered the following behavior for Hive
> 0.11:
>
> - For the queries with incorrect results, Hive 0.11 produces a different
> plan: the extra stage for the DISTRIBUTE BY + Transform is missing, and
> the Transform operator for the custom reducer script is pushed into the
> reduce operator tree that contains the GROUP BY itself.
>
> - However, *if the SORT BY in the query has a DESC order in it*, the
> right plan is produced, and the results look correct too.
>
> Hive 0.10 produces the expected plan with right results in all cases.
>
>
> To illustrate, here is a simplified repro setup:
>
> Table:
>
> CREATE TABLE test_cluster (grp STRING, val1 STRING, val2 INT, val3
> STRING, val4 INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES
> TERMINATED BY '\n' STORED AS TEXTFILE;
>
> Query:
>
> ADD FILE reducer.py;
>
> FROM (
>   SELECT grp, val2
>   FROM test_cluster
>   GROUP BY grp, val2
>   DISTRIBUTE BY grp
>   SORT BY grp, val2 -- add DESC here to get correct results
> ) a
>
> REDUCE a.*
> USING 'reducer.py'
> AS grp, reducedValue
>
>
> If I understand correctly, this is a bug. Is this a known issue? Any other
> insights? We have reverted to Hive 0.10 to avoid the incorrect results
> while we investigate this.
>
> I have the repro sample, with test data and scripts, if anybody is
> interested.
>
>
>
> Thanks,
> pala
>
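To make the symptom in the quoted message concrete, here is a minimal sketch of the kind of reducer script that relies on DISTRIBUTE BY. The actual reducer.py is not shown in this thread, so the function name `reduce_stream` and the per-group count it emits are hypothetical. The sketch only assumes tab-separated grp/val2 rows on input, sorted by grp within each reducer.

```python
from itertools import groupby


def reduce_stream(lines):
    """Emit one 'grp<TAB>count' row per run of consecutive rows sharing grp.

    Hypothetical stand-in for reducer.py (the real script is not shown).
    Each input line is 'grp<TAB>val2'.
    """
    rows = (line.rstrip("\n").split("\t") for line in lines if line.strip())
    for grp, group_rows in groupby(rows, key=lambda row: row[0]):
        yield f"{grp}\t{sum(1 for _ in group_rows)}"


# All rows for grp "a" reach one reducer: one output row for "a".
one_reducer = ["a\t1\n", "a\t2\n", "b\t5\n"]
single = list(reduce_stream(one_reducer))

# Rows for grp "a" are split across two reducers: each reducer emits
# its own row for "a", so the final result has two rows for that group.
reducer_1 = ["a\t1\n", "b\t5\n"]
reducer_2 = ["a\t2\n"]
split = list(reduce_stream(reducer_1)) + list(reduce_stream(reducer_2))

print(single)  # ['a\t2', 'b\t1']
print(split)   # ['a\t1', 'b\t1', 'a\t1']
```

A script of this shape cannot detect that it saw only part of a group, which is why the duplicate rows show up only in the final result.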
Hive 0.10 plan:
ABSTRACT SYNTAX TREE:
(TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF
(TOK_TABNAME test_cluster))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR
TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL grp)) (TOK_SELEXPR
(TOK_TABLE_OR_COL val2))) (TOK_GROUPBY (TOK_TABLE_OR_COL grp) (TOK_TABLE_OR_COL
val2)) (TOK_DISTRIBUTEBY (TOK_TABLE_OR_COL grp)) (TOK_SORTBY
(TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL grp)) (TOK_TABSORTCOLNAMEASC
(TOK_TABLE_OR_COL val2))))) a)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR
TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST
(TOK_ALLCOLREF (TOK_TABNAME a))) TOK_SERDE TOK_RECORDWRITER 'reducer.py'
TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST grp reducedValue))))))
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
a:test_cluster
TableScan
alias: test_cluster
Select Operator
expressions:
expr: grp
type: string
expr: val2
type: int
outputColumnNames: grp, val2
Group By Operator
bucketGroup: false
keys:
expr: grp
type: string
expr: val2
type: int
mode: hash
outputColumnNames: _col0, _col1
Reduce Output Operator
key expressions:
expr: _col0
type: string
expr: _col1
type: int
sort order: ++
Map-reduce partition columns:
expr: _col0
type: string
expr: _col1
type: int
tag: -1
Reduce Operator Tree:
Group By Operator
bucketGroup: false
keys:
expr: KEY._col0
type: string
expr: KEY._col1
type: int
mode: mergepartial
outputColumnNames: _col0, _col1
Select Operator
expressions:
expr: _col0
type: string
expr: _col1
type: int
outputColumnNames: _col0, _col1
File Output Operator
compressed: true
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
hdfs://master-hadoop2.inw-hercules.rfiserve.net:8020/srv/grid-tmp/hive-mchettiar/hive_2013-08-23_20-54-19_918_5275587411980709973/-mr-10002
Reduce Output Operator
key expressions:
expr: _col0
type: string
expr: _col1
type: int
sort order: ++
Map-reduce partition columns:
expr: _col0
type: string
tag: -1
value expressions:
expr: _col0
type: string
expr: _col1
type: int
Reduce Operator Tree:
Extract
Select Operator
expressions:
expr: _col0
type: string
expr: _col1
type: int
outputColumnNames: _col0, _col1
Transform Operator
command: reducer.py
output info:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
File Output Operator
compressed: true
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Stage: Stage-0
Fetch Operator
limit: -1
Hive 0.11 plan:
ABSTRACT SYNTAX TREE:
(TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF
(TOK_TABNAME test_cluster))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR
TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL grp)) (TOK_SELEXPR
(TOK_TABLE_OR_COL val2))) (TOK_GROUPBY (TOK_TABLE_OR_COL grp) (TOK_TABLE_OR_COL
val2)) (TOK_DISTRIBUTEBY (TOK_TABLE_OR_COL grp)) (TOK_SORTBY
(TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL grp)) (TOK_TABSORTCOLNAMEASC
(TOK_TABLE_OR_COL val2))))) a)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR
TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST
(TOK_ALLCOLREF (TOK_TABNAME a))) TOK_SERDE TOK_RECORDWRITER 'reducer.py'
TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST grp reducedValue))))))
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
a:test_cluster
TableScan
alias: test_cluster
Select Operator
expressions:
expr: grp
type: string
expr: val2
type: int
outputColumnNames: grp, val2
Group By Operator
bucketGroup: false
keys:
expr: grp
type: string
expr: val2
type: int
mode: hash
outputColumnNames: _col0, _col1
Reduce Output Operator
key expressions:
expr: _col0
type: string
expr: _col1
type: int
sort order: ++
Map-reduce partition columns:
expr: _col0
type: string
expr: _col1
type: int
tag: -1
Reduce Operator Tree:
Group By Operator
bucketGroup: false
keys:
expr: KEY._col0
type: string
expr: KEY._col1
type: int
mode: mergepartial
outputColumnNames: _col0, _col1
Select Operator
expressions:
expr: _col0
type: string
expr: _col1
type: int
outputColumnNames: _col0, _col1
Transform Operator
command: reducer.py
output info:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
File Output Operator
compressed: true
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Stage: Stage-0
Fetch Operator
limit: -1
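Comparing the two plans: the plan with Stage-2 repartitions on _col0 (grp) alone before the Transform Operator, while the plan without Stage-2 runs the Transform Operator in the Stage-1 reducer, whose partition columns are _col0 and _col1 (grp and val2). The sketch below illustrates why that difference matters. The `partition` function is a simplified stand-in for hash partitioning, assumed here for illustration and not Hive's actual code; the sample rows and reducer count are made up.

```python
def java_string_hash(s):
    """Java's String.hashCode(), kept to 32 bits."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h


def partition(key_fields, num_reducers):
    """Pick a reducer index from the partition columns.

    Simplified stand-in: combine per-field hashes with a factor of 31,
    then take the non-negative value modulo the reducer count.
    """
    h = 0
    for field in key_fields:
        field_hash = field if isinstance(field, int) else java_string_hash(field)
        h = (31 * h + field_hash) & 0xFFFFFFFF
    return (h & 0x7FFFFFFF) % num_reducers


rows = [("a", 1), ("a", 2), ("b", 1), ("b", 2)]  # (grp, val2), made-up data
NUM_REDUCERS = 2

# Partition columns grp and val2, as in the plan without Stage-2.
by_grp_and_val2 = {row: partition(row, NUM_REDUCERS) for row in rows}
# Partition column grp only, as in Stage-2 of the other plan.
by_grp_only = {row: partition(row[:1], NUM_REDUCERS) for row in rows}

print(by_grp_and_val2)  # {('a', 1): 0, ('a', 2): 1, ('b', 1): 1, ('b', 2): 0}
print(by_grp_only)      # {('a', 1): 1, ('a', 2): 1, ('b', 1): 0, ('b', 2): 0}
```

With both columns in the partition key, rows sharing a grp can land on different reducers, which matches the duplicate output rows described in the original message.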