Author: namit Date: Mon Jan 14 10:35:22 2013 New Revision: 1432865 URL: http://svn.apache.org/viewvc?rev=1432865&view=rev Log: HIVE-3824: Fix a bug when different serdes are used for different partitions (Namit Jain via Ashutosh and namit)
Added: hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat8.q hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat9.q hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat8.q.out hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat9.q.out Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1432865&r1=1432864&r2=1432865&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Mon Jan 14 10:35:22 2013 @@ -226,22 +226,27 @@ public class CombineHiveInputFormat<K ex private static class CombinePathInputFormat { private final List<Operator<? extends OperatorDesc>> opList; private final String inputFormatClassName; + private final String deserializerClassName; public CombinePathInputFormat(List<Operator<? 
extends OperatorDesc>> opList, - String inputFormatClassName) { + String inputFormatClassName, + String deserializerClassName) { this.opList = opList; this.inputFormatClassName = inputFormatClassName; + this.deserializerClassName = deserializerClassName; } @Override public boolean equals(Object o) { if (o instanceof CombinePathInputFormat) { - CombinePathInputFormat mObj = (CombinePathInputFormat)o; + CombinePathInputFormat mObj = (CombinePathInputFormat) o; if (mObj == null) { return false; } - return opList.equals(mObj.opList) && - inputFormatClassName.equals(mObj.inputFormatClassName); + return (opList.equals(mObj.opList)) && + (inputFormatClassName.equals(mObj.inputFormatClassName)) && + (deserializerClassName == null ? (mObj.deserializerClassName == null) : + deserializerClassName.equals(mObj.deserializerClassName)); } return false; } @@ -296,6 +301,8 @@ public class CombineHiveInputFormat<K ex Class inputFormatClass = part.getInputFileFormatClass(); String inputFormatClassName = inputFormatClass.getName(); InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); + String deserializerClassName = part.getDeserializerClass() == null ? null + : part.getDeserializerClass().getName(); // Since there is no easy way of knowing whether MAPREDUCE-1597 is present in the tree or not, // we use a configuration variable for the same @@ -342,12 +349,24 @@ public class CombineHiveInputFormat<K ex // Does a pool exist for this path already CombineFilter f = null; List<Operator<? 
extends OperatorDesc>> opList = null; - boolean done = false; if (!mrwork.isMapperCannotSpanPartns()) { opList = HiveFileFormatUtils.doGetWorksFromPath( pathToAliases, aliasToWork, filterPath); - f = poolMap.get(new CombinePathInputFormat(opList, inputFormatClassName)); + CombinePathInputFormat combinePathInputFormat = + new CombinePathInputFormat(opList, inputFormatClassName, deserializerClassName); + f = poolMap.get(combinePathInputFormat); + if (f == null) { + f = new CombineFilter(filterPath); + LOG.info("CombineHiveInputSplit creating pool for " + path + + "; using filter path " + filterPath); + combine.createPool(job, f); + poolMap.put(combinePathInputFormat, f); + } else { + LOG.info("CombineHiveInputSplit: pool is already created for " + path + + "; using filter path " + filterPath); + f.addPath(filterPath); + } } else { // In the case of tablesample, the input paths are pointing to files rather than directories. // We need to get the parent directory as the filtering path so that all files in the same @@ -361,23 +380,6 @@ public class CombineHiveInputFormat<K ex } else { inpDirs.add(path); } - done = true; - } - - if (!done) { - if (f == null) { - f = new CombineFilter(filterPath); - LOG.info("CombineHiveInputSplit creating pool for " + path + - "; using filter path " + filterPath); - combine.createPool(job, f); - if (!mrwork.isMapperCannotSpanPartns()) { - poolMap.put(new CombinePathInputFormat(opList, inputFormatClassName), f); - } - } else { - LOG.info("CombineHiveInputSplit: pool is already created for " + path + - "; using filter path " + filterPath); - f.addPath(filterPath); - } } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1432865&r1=1432864&r2=1432865&view=diff ============================================================================== --- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Mon Jan 14 10:35:22 2013 @@ -405,7 +405,7 @@ public final class HiveFileFormatUtils { } /** - * Get the list of operatators from the opeerator tree that are needed for the path + * Get the list of operators from the operator tree that are needed for the path * @param pathToAliases mapping from path to aliases * @param aliasToWork The operator tree to be invoked for a given alias * @param dir The path to look for Added: hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat8.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat8.q?rev=1432865&view=auto ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat8.q (added) +++ hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat8.q Mon Jan 14 10:35:22 2013 @@ -0,0 +1,12 @@ +set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; + +-- This tests that a query can span multiple partitions which can not only have different file formats, but +-- also different serdes +create table partition_test_partitioned(key string, value string) partitioned by (dt string) stored as rcfile; +insert overwrite table partition_test_partitioned partition(dt='1') select * from src; +alter table partition_test_partitioned set fileformat sequencefile; +insert overwrite table partition_test_partitioned partition(dt='2') select * from src; +alter table partition_test_partitioned set serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'; +insert overwrite table partition_test_partitioned partition(dt='3') select * from src; + +select * from partition_test_partitioned where dt is not null order by key, value, dt limit 20; Added: 
hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat9.q URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat9.q?rev=1432865&view=auto ============================================================================== --- hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat9.q (added) +++ hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat9.q Mon Jan 14 10:35:22 2013 @@ -0,0 +1,10 @@ +set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; + +-- This tests that a query can span multiple partitions which can not only have different file formats, but +-- also different serdes +create table partition_test_partitioned(key string, value string) partitioned by (dt string) stored as rcfile; +insert overwrite table partition_test_partitioned partition(dt='1') select * from src; +alter table partition_test_partitioned set serde 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'; +insert overwrite table partition_test_partitioned partition(dt='2') select * from src; + +select * from partition_test_partitioned where dt is not null order by key, value, dt limit 20; Added: hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat8.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat8.q.out?rev=1432865&view=auto ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat8.q.out (added) +++ hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat8.q.out Mon Jan 14 10:35:22 2013 @@ -0,0 +1,107 @@ +PREHOOK: query: -- This tests that a query can span multiple partitions which can not only have different file formats, but +-- also different serdes +create table partition_test_partitioned(key string, value string) partitioned by (dt string) stored as rcfile +PREHOOK: 
type: CREATETABLE +POSTHOOK: query: -- This tests that a query can span multiple partitions which can not only have different file formats, but +-- also different serdes +create table partition_test_partitioned(key string, value string) partitioned by (dt string) stored as rcfile +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@partition_test_partitioned +PREHOOK: query: insert overwrite table partition_test_partitioned partition(dt='1') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@partition_test_partitioned@dt=1 +POSTHOOK: query: insert overwrite table partition_test_partitioned partition(dt='1') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@partition_test_partitioned@dt=1 +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: alter table partition_test_partitioned set fileformat sequencefile +PREHOOK: type: ALTERTABLE_FILEFORMAT +PREHOOK: Input: default@partition_test_partitioned +PREHOOK: Output: default@partition_test_partitioned +POSTHOOK: query: alter table partition_test_partitioned set fileformat sequencefile +POSTHOOK: type: ALTERTABLE_FILEFORMAT +POSTHOOK: Input: default@partition_test_partitioned +POSTHOOK: Output: default@partition_test_partitioned +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table partition_test_partitioned partition(dt='2') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: 
default@partition_test_partitioned@dt=2 +POSTHOOK: query: insert overwrite table partition_test_partitioned partition(dt='2') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@partition_test_partitioned@dt=2 +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: alter table partition_test_partitioned set serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +PREHOOK: type: ALTERTABLE_SERIALIZER +PREHOOK: Input: default@partition_test_partitioned +PREHOOK: Output: default@partition_test_partitioned +POSTHOOK: query: alter table partition_test_partitioned set serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +POSTHOOK: type: ALTERTABLE_SERIALIZER +POSTHOOK: Input: default@partition_test_partitioned +POSTHOOK: Output: default@partition_test_partitioned +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table partition_test_partitioned partition(dt='3') 
select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@partition_test_partitioned@dt=3 +POSTHOOK: query: insert overwrite table partition_test_partitioned partition(dt='3') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@partition_test_partitioned@dt=3 +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=3).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: select * from partition_test_partitioned where dt is not null order by key, value, dt limit 20 +PREHOOK: type: QUERY +PREHOOK: Input: default@partition_test_partitioned +PREHOOK: Input: default@partition_test_partitioned@dt=1 +PREHOOK: Input: default@partition_test_partitioned@dt=2 +PREHOOK: Input: default@partition_test_partitioned@dt=3 +#### A masked pattern was here #### +POSTHOOK: query: select * from partition_test_partitioned where dt is not null order by key, value, dt limit 20 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@partition_test_partitioned +POSTHOOK: Input: default@partition_test_partitioned@dt=1 +POSTHOOK: Input: default@partition_test_partitioned@dt=2 +POSTHOOK: Input: default@partition_test_partitioned@dt=3 +#### A masked pattern was here #### +POSTHOOK: 
Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=3).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +0 val_0 1 +0 val_0 1 +0 val_0 1 +0 val_0 2 +0 val_0 2 +0 val_0 2 +0 val_0 3 +0 val_0 3 +0 val_0 3 +10 val_10 1 +10 val_10 2 +10 val_10 3 +100 val_100 1 +100 val_100 1 +100 val_100 2 +100 val_100 2 +100 val_100 3 +100 val_100 3 +103 val_103 1 +103 val_103 1 Added: hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat9.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat9.q.out?rev=1432865&view=auto ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat9.q.out (added) +++ hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat9.q.out Mon Jan 14 10:35:22 2013 @@ -0,0 +1,77 @@ +PREHOOK: query: -- This tests that a query can span multiple partitions which can not only have different file formats, but +-- also different serdes +create table partition_test_partitioned(key string, value string) partitioned by (dt string) stored as rcfile +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- This tests that a query can span multiple partitions which can not only have 
different file formats, but +-- also different serdes +create table partition_test_partitioned(key string, value string) partitioned by (dt string) stored as rcfile +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@partition_test_partitioned +PREHOOK: query: insert overwrite table partition_test_partitioned partition(dt='1') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@partition_test_partitioned@dt=1 +POSTHOOK: query: insert overwrite table partition_test_partitioned partition(dt='1') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@partition_test_partitioned@dt=1 +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: alter table partition_test_partitioned set serde 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' +PREHOOK: type: ALTERTABLE_SERIALIZER +PREHOOK: Input: default@partition_test_partitioned +PREHOOK: Output: default@partition_test_partitioned +POSTHOOK: query: alter table partition_test_partitioned set serde 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' +POSTHOOK: type: ALTERTABLE_SERIALIZER +POSTHOOK: Input: default@partition_test_partitioned +POSTHOOK: Output: default@partition_test_partitioned +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table partition_test_partitioned partition(dt='2') select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: 
default@partition_test_partitioned@dt=2 +POSTHOOK: query: insert overwrite table partition_test_partitioned partition(dt='2') select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@partition_test_partitioned@dt=2 +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: select * from partition_test_partitioned where dt is not null order by key, value, dt limit 20 +PREHOOK: type: QUERY +PREHOOK: Input: default@partition_test_partitioned +PREHOOK: Input: default@partition_test_partitioned@dt=1 +PREHOOK: Input: default@partition_test_partitioned@dt=2 +#### A masked pattern was here #### +POSTHOOK: query: select * from partition_test_partitioned where dt is not null order by key, value, dt limit 20 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@partition_test_partitioned +POSTHOOK: Input: default@partition_test_partitioned@dt=1 +POSTHOOK: Input: default@partition_test_partitioned@dt=2 +#### A masked pattern was here #### +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).value SIMPLE 
[(src)src.FieldSchema(name:value, type:string, comment:default), ] +0 val_0 1 +0 val_0 1 +0 val_0 1 +0 val_0 2 +0 val_0 2 +0 val_0 2 +10 val_10 1 +10 val_10 2 +100 val_100 1 +100 val_100 1 +100 val_100 2 +100 val_100 2 +103 val_103 1 +103 val_103 1 +103 val_103 2 +103 val_103 2 +104 val_104 1 +104 val_104 1 +104 val_104 2 +104 val_104 2