I have attached the Hive 0.10 and 0.11 query plans for the sample query below, for illustration.
On Fri, Aug 23, 2013 at 5:35 PM, Pala M Muthaia <mchett...@rocketfuelinc.com> wrote:

> Hi,
>
> We are using DISTRIBUTE BY with custom reducer scripts in our query
> workload.
>
> After upgrading to Hive 0.11, queries with GROUP BY/DISTRIBUTE BY/SORT BY
> and custom reducer scripts produced incorrect results. In particular, rows
> with the same value in the DISTRIBUTE BY column end up in multiple reducers
> and thus produce multiple rows in the final result, where we expect only one.
>
> I investigated a little and observed the following behavior in Hive 0.11:
>
> - Hive 0.11 produces a different plan for the queries with incorrect
>   results. The extra stage for the DISTRIBUTE BY + Transform is missing, and
>   the Transform operator for the custom reducer script is pushed into the
>   reduce operator tree containing the GROUP BY itself.
>
> - However, *if the SORT BY in the query has a DESC order in it*, the
>   right plan is produced, and the results look correct too.
>
> Hive 0.10 produces the expected plan, with correct results, in all cases.
>
> To illustrate, here is a simplified repro setup:
>
> Table:
>
> CREATE TABLE test_cluster (grp STRING, val1 STRING, val2 INT, val3 STRING,
> val4 INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED
> BY '\n' STORED AS TEXTFILE;
>
> Query:
>
> ADD FILE reducer.py;
>
> FROM (
>   SELECT grp, val2
>   FROM test_cluster
>   GROUP BY grp, val2
>   DISTRIBUTE BY grp
>   SORT BY grp, val2  -- add DESC here to get correct results
> ) a
> REDUCE a.*
> USING 'reducer.py'
> AS grp, reducedValue
>
> If I understand correctly, this is a bug. Is this a known issue? Any other
> insights? We have reverted to Hive 0.10 to avoid the incorrect results
> while we investigate this.
>
> I have a repro sample, with test data and scripts, if anybody is
> interested.
>
> Thanks,
> pala
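The thread does not include reducer.py itself, so here is a hypothetical minimal sketch of what such a streaming reducer typically looks like (the column names and the sum aggregation are made up for illustration). Hive pipes tab-separated rows to the script on stdin, pre-sorted by the SORT BY columns, and the script emits one output row per grp. Note that its correctness depends entirely on all rows for a given grp reaching the same reducer instance, which is exactly the guarantee the broken 0.11 plan violates:

```python
#!/usr/bin/env python
# Hypothetical sketch of reducer.py: collapse consecutive rows with the
# same grp (input is sorted by grp, val2) into one (grp, reducedValue) row.
import sys

def reduce_stream(lines):
    current_grp, total = None, 0
    for line in lines:
        grp, val2 = line.rstrip("\n").split("\t")
        if grp != current_grp:
            if current_grp is not None:
                yield (current_grp, total)      # group boundary: flush
            current_grp, total = grp, 0
        total += int(val2)
    if current_grp is not None:
        yield (current_grp, total)              # flush the last group

if __name__ == "__main__":
    for grp, reduced in reduce_stream(sys.stdin):
        print("%s\t%s" % (grp, reduced))
```

If rows for one grp are split across reducers, each reducer instance flushes its own partial total, which matches the duplicate-row symptom described above.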
Hive 0.10 plan (correct: a separate Stage-2 for the DISTRIBUTE BY + Transform):

ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME test_cluster))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL grp)) (TOK_SELEXPR (TOK_TABLE_OR_COL val2))) (TOK_GROUPBY (TOK_TABLE_OR_COL grp) (TOK_TABLE_OR_COL val2)) (TOK_DISTRIBUTEBY (TOK_TABLE_OR_COL grp)) (TOK_SORTBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL grp)) (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL val2))))) a)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (TOK_ALLCOLREF (TOK_TABNAME a))) TOK_SERDE TOK_RECORDWRITER 'reducer.py' TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST grp reducedValue))))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        a:test_cluster
          TableScan
            alias: test_cluster
            Select Operator
              expressions:
                    expr: grp
                    type: string
                    expr: val2
                    type: int
              outputColumnNames: grp, val2
              Group By Operator
                bucketGroup: false
                keys:
                      expr: grp
                      type: string
                      expr: val2
                      type: int
                mode: hash
                outputColumnNames: _col0, _col1
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: string
                        expr: _col1
                        type: int
                  sort order: ++
                  Map-reduce partition columns:
                        expr: _col0
                        type: string
                        expr: _col1
                        type: int
                  tag: -1
      Reduce Operator Tree:
        Group By Operator
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
                expr: KEY._col1
                type: int
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: int
            outputColumnNames: _col0, _col1
            File Output Operator
              compressed: true
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs://master-hadoop2.inw-hercules.rfiserve.net:8020/srv/grid-tmp/hive-mchettiar/hive_2013-08-23_20-54-19_918_5275587411980709973/-mr-10002
            Reduce Output Operator
              key expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: int
              sort order: ++
              Map-reduce partition columns:
                    expr: _col0
                    type: string
              tag: -1
              value expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: int
      Reduce Operator Tree:
        Extract
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: int
            outputColumnNames: _col0, _col1
            Transform Operator
              command: reducer.py
              output info:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              File Output Operator
                compressed: true
                GlobalTableId: 0
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1
Hive 0.11 plan (incorrect: the extra DISTRIBUTE BY stage is missing, and the Transform Operator is pushed into the reduce operator tree containing the GROUP BY):

ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME test_cluster))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL grp)) (TOK_SELEXPR (TOK_TABLE_OR_COL val2))) (TOK_GROUPBY (TOK_TABLE_OR_COL grp) (TOK_TABLE_OR_COL val2)) (TOK_DISTRIBUTEBY (TOK_TABLE_OR_COL grp)) (TOK_SORTBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL grp)) (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL val2))))) a)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (TOK_ALLCOLREF (TOK_TABNAME a))) TOK_SERDE TOK_RECORDWRITER 'reducer.py' TOK_SERDE TOK_RECORDREADER (TOK_ALIASLIST grp reducedValue))))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        a:test_cluster
          TableScan
            alias: test_cluster
            Select Operator
              expressions:
                    expr: grp
                    type: string
                    expr: val2
                    type: int
              outputColumnNames: grp, val2
              Group By Operator
                bucketGroup: false
                keys:
                      expr: grp
                      type: string
                      expr: val2
                      type: int
                mode: hash
                outputColumnNames: _col0, _col1
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: string
                        expr: _col1
                        type: int
                  sort order: ++
                  Map-reduce partition columns:
                        expr: _col0
                        type: string
                        expr: _col1
                        type: int
                  tag: -1
      Reduce Operator Tree:
        Group By Operator
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
                expr: KEY._col1
                type: int
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: int
            outputColumnNames: _col0, _col1
            Transform Operator
              command: reducer.py
              output info:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              File Output Operator
                compressed: true
                GlobalTableId: 0
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1
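The operative difference between the two plans is the "Map-reduce partition columns" feeding the Transform Operator: the Hive 0.10 plan's second stage partitions on _col0 (grp) alone, while the 0.11 plan runs the Transform directly off the GROUP BY shuffle, which partitions on (_col0, _col1) = (grp, val2). A toy simulation (plain Python, not Hive code; the row data, reducer count, and simplified hash are all invented for illustration) shows why that splits a group across reducers:

```python
# Simulate hash-partitioning the shuffle keys two ways, as a stand-in for
# Hadoop's HashPartitioner. All values here are made up for illustration.
NUM_REDUCERS = 4
rows = [("a", 1), ("a", 2), ("a", 3), ("b", 7), ("b", 8)]

def partition(row, key_cols):
    # Deterministic toy hash over the chosen key columns.
    key = "\x01".join(str(row[i]) for i in key_cols)
    return sum(key.encode()) % NUM_REDUCERS

groups = {r[0] for r in rows}
# Reducers hit by each grp when partitioning on grp alone (0.10 plan):
by_grp = {g: {partition(r, (0,)) for r in rows if r[0] == g} for g in groups}
# Reducers hit by each grp when partitioning on (grp, val2) (0.11 plan):
by_grp_val2 = {g: {partition(r, (0, 1)) for r in rows if r[0] == g}
               for g in groups}

# grp-only partitioning sends each group to exactly one reducer...
assert all(len(reds) == 1 for reds in by_grp.values())
# ...while (grp, val2) partitioning can scatter one group across reducers,
# so a per-grp reducer script sees only a fragment of each group.
assert any(len(reds) > 1 for reds in by_grp_val2.values())
```

This matches the reported symptom: with the 0.11 plan, each reducer that receives a fragment of a grp emits its own row, producing multiple output rows per group.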