Author: namit
Date: Mon Jan 14 10:35:22 2013
New Revision: 1432865

URL: http://svn.apache.org/viewvc?rev=1432865&view=rev
Log:
HIVE-3824 Fix bug when different serdes are used for different partitions
(Namit Jain via Ashutosh and namit)
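
Background: CombineHiveInputFormat groups a query's input paths into combine
pools, keyed by the private CombinePathInputFormat class. Before this fix the
key compared only the operator list and the input-format class name, so two
partitions that shared a file format but used different serdes could land in
the same pool and be deserialized incorrectly. The following is a minimal,
hypothetical sketch of the keying idea (PoolKey and its plain-string fields
are illustrative stand-ins, not the actual Hive classes); it mirrors the
null-safe deserializer comparison the patch adds:

    import java.util.Arrays;
    import java.util.List;

    // Simplified analogue of the pool key: paths may share a combine pool
    // only if operator list, input format AND deserializer all match.
    final class PoolKey {
        private final List<String> opList;          // stands in for the operator tree
        private final String inputFormatClassName;
        private final String deserializerClassName; // may be null

        PoolKey(List<String> opList, String inputFormatClassName,
                String deserializerClassName) {
            this.opList = opList;
            this.inputFormatClassName = inputFormatClassName;
            this.deserializerClassName = deserializerClassName;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof PoolKey)) {
                return false;
            }
            PoolKey that = (PoolKey) o;
            return opList.equals(that.opList)
                && inputFormatClassName.equals(that.inputFormatClassName)
                // null-safe comparison, mirroring the patch below
                && (deserializerClassName == null
                        ? that.deserializerClassName == null
                        : deserializerClassName.equals(that.deserializerClassName));
        }

        @Override
        public int hashCode() {                     // must stay consistent with equals
            int h = opList.hashCode() * 31 + inputFormatClassName.hashCode();
            return h * 31 + (deserializerClassName == null
                ? 0 : deserializerClassName.hashCode());
        }
    }

    class PoolKeyDemo {
        public static void main(String[] args) {
            List<String> ops = Arrays.asList("TS", "SEL");
            PoolKey a = new PoolKey(ops, "RCFileInputFormat", "ColumnarSerDe");
            PoolKey b = new PoolKey(ops, "RCFileInputFormat", "LazyBinaryColumnarSerDe");
            // Before the fix these would compare equal (serde ignored) and
            // share a pool; with the richer key they do not.
            System.out.println(a.equals(b));        // prints: false
        }
    }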


Added:
    hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat8.q
    hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat9.q
    hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat8.q.out
    hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat9.q.out
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1432865&r1=1432864&r2=1432865&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Mon Jan 14 10:35:22 2013
@@ -226,22 +226,27 @@ public class CombineHiveInputFormat<K ex
   private static class CombinePathInputFormat {
     private final List<Operator<? extends OperatorDesc>> opList;
     private final String inputFormatClassName;
+    private final String deserializerClassName;
 
     public CombinePathInputFormat(List<Operator<? extends OperatorDesc>> opList,
-                                  String inputFormatClassName) {
+                                  String inputFormatClassName,
+                                  String deserializerClassName) {
       this.opList = opList;
       this.inputFormatClassName = inputFormatClassName;
+      this.deserializerClassName = deserializerClassName;
     }
 
     @Override
     public boolean equals(Object o) {
       if (o instanceof CombinePathInputFormat) {
-        CombinePathInputFormat mObj = (CombinePathInputFormat)o;
+        CombinePathInputFormat mObj = (CombinePathInputFormat) o;
         if (mObj == null) {
           return false;
         }
-        return opList.equals(mObj.opList) &&
-          inputFormatClassName.equals(mObj.inputFormatClassName);
+        return (opList.equals(mObj.opList)) &&
+            (inputFormatClassName.equals(mObj.inputFormatClassName)) &&
+            (deserializerClassName == null ? (mObj.deserializerClassName == null) :
+                deserializerClassName.equals(mObj.deserializerClassName));
       }
       return false;
     }
@@ -296,6 +301,8 @@ public class CombineHiveInputFormat<K ex
       Class inputFormatClass = part.getInputFileFormatClass();
       String inputFormatClassName = inputFormatClass.getName();
       InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+      String deserializerClassName = part.getDeserializerClass() == null ? null
+          : part.getDeserializerClass().getName();
 
       // Since there is no easy way of knowing whether MAPREDUCE-1597 is present in the tree or not,
       // we use a configuration variable for the same
@@ -342,12 +349,24 @@ public class CombineHiveInputFormat<K ex
       // Does a pool exist for this path already
       CombineFilter f = null;
       List<Operator<? extends OperatorDesc>> opList = null;
-      boolean done = false;
 
       if (!mrwork.isMapperCannotSpanPartns()) {
         opList = HiveFileFormatUtils.doGetWorksFromPath(
                    pathToAliases, aliasToWork, filterPath);
-      f = poolMap.get(new CombinePathInputFormat(opList, inputFormatClassName));
+        CombinePathInputFormat combinePathInputFormat =
+            new CombinePathInputFormat(opList, inputFormatClassName, deserializerClassName);
+        f = poolMap.get(combinePathInputFormat);
+        if (f == null) {
+          f = new CombineFilter(filterPath);
+          LOG.info("CombineHiveInputSplit creating pool for " + path +
+                   "; using filter path " + filterPath);
+          combine.createPool(job, f);
+          poolMap.put(combinePathInputFormat, f);
+        } else {
+          LOG.info("CombineHiveInputSplit: pool is already created for " + path +
+                   "; using filter path " + filterPath);
+          f.addPath(filterPath);
+        }
       } else {
         // In the case of tablesample, the input paths are pointing to files rather than directories.
         // We need to get the parent directory as the filtering path so that all files in the same
@@ -361,23 +380,6 @@ public class CombineHiveInputFormat<K ex
         } else {
           inpDirs.add(path);
         }
-        done = true;
-      }
-
-      if (!done) {
-        if (f == null) {
-          f = new CombineFilter(filterPath);
-          LOG.info("CombineHiveInputSplit creating pool for " + path +
-                   "; using filter path " + filterPath);
-          combine.createPool(job, f);
-          if (!mrwork.isMapperCannotSpanPartns()) {
-            poolMap.put(new CombinePathInputFormat(opList, inputFormatClassName), f);
-          }
-        } else {
-          LOG.info("CombineHiveInputSplit: pool is already created for " + path +
-                   "; using filter path " + filterPath);
-          f.addPath(filterPath);
-        }
       }
     }
 

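Note on the restructured control flow above: the old two-step "done" flag is
gone; when the mapper may span partitions, the pool is now looked up by the
richer (operators, input format, deserializer) key and created on a miss,
otherwise the existing filter simply gains the new path. A self-contained
sketch of that reuse pattern (the string key, poolMap, and the per-partition
serde values are illustrative assumptions, not the real Hive/Hadoop API):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PoolReuseSketch {
        public static void main(String[] args) {
            // (path, inputFormat, serde) per partition; the serde values are
            // assumptions chosen to echo the new tests, not actual defaults.
            String[][] parts = {
                {"/t/dt=1", "RCFileInputFormat", "ColumnarSerDe"},
                {"/t/dt=2", "RCFileInputFormat", "LazyBinaryColumnarSerDe"},
                {"/t/dt=3", "RCFileInputFormat", "LazyBinaryColumnarSerDe"},
            };
            Map<String, List<String>> poolMap = new HashMap<String, List<String>>();
            for (String[] p : parts) {
                String key = p[1] + "|" + p[2];   // format AND serde form the key
                List<String> pool = poolMap.get(key);
                if (pool == null) {               // no pool for this combination yet
                    pool = new ArrayList<String>();
                    poolMap.put(key, pool);
                }
                pool.add(p[0]);                   // existing pool: just add the path
            }
            // dt=2 and dt=3 share a pool; dt=1 gets its own despite the
            // identical input format.
            System.out.println(poolMap);
        }
    }
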
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1432865&r1=1432864&r2=1432865&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Mon Jan 14 10:35:22 2013
@@ -405,7 +405,7 @@ public final class HiveFileFormatUtils {
   }
 
   /**
-   * Get the list of operatators from the opeerator tree that are needed for the path
+   * Get the list of operators from the operator tree that are needed for the path
    * @param pathToAliases  mapping from path to aliases
    * @param aliasToWork    The operator tree to be invoked for a given alias
    * @param dir            The path to look for

Added: hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat8.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat8.q?rev=1432865&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat8.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat8.q Mon Jan 14 10:35:22 2013
@@ -0,0 +1,12 @@
+set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+
+-- This tests that a query can span multiple partitions which can not only have different file formats, but
+-- also different serdes
+create table partition_test_partitioned(key string, value string) partitioned by (dt string) stored as rcfile;
+insert overwrite table partition_test_partitioned partition(dt='1') select * from src;
+alter table partition_test_partitioned set fileformat sequencefile;
+insert overwrite table partition_test_partitioned partition(dt='2') select * from src;
+alter table partition_test_partitioned set serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe';
+insert overwrite table partition_test_partitioned partition(dt='3') select * from src;
+
+select * from partition_test_partitioned where dt is not null order by key, value, dt limit 20;

Added: hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat9.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat9.q?rev=1432865&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat9.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/partition_wise_fileformat9.q Mon Jan 14 10:35:22 2013
@@ -0,0 +1,10 @@
+set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+
+-- This tests that a query can span multiple partitions which can not only have different file formats, but
+-- also different serdes
+create table partition_test_partitioned(key string, value string) partitioned by (dt string) stored as rcfile;
+insert overwrite table partition_test_partitioned partition(dt='1') select * from src;
+alter table partition_test_partitioned set serde 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe';
+insert overwrite table partition_test_partitioned partition(dt='2') select * from src;
+
+select * from partition_test_partitioned where dt is not null order by key, value, dt limit 20;

Added: hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat8.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat8.q.out?rev=1432865&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat8.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat8.q.out Mon Jan 14 10:35:22 2013
@@ -0,0 +1,107 @@
+PREHOOK: query: -- This tests that a query can span multiple partitions which can not only have different file formats, but
+-- also different serdes
+create table partition_test_partitioned(key string, value string) partitioned by (dt string) stored as rcfile
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- This tests that a query can span multiple partitions which can not only have different file formats, but
+-- also different serdes
+create table partition_test_partitioned(key string, value string) partitioned by (dt string) stored as rcfile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@partition_test_partitioned
+PREHOOK: query: insert overwrite table partition_test_partitioned partition(dt='1') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@partition_test_partitioned@dt=1
+POSTHOOK: query: insert overwrite table partition_test_partitioned partition(dt='1') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@partition_test_partitioned@dt=1
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: alter table partition_test_partitioned set fileformat sequencefile
+PREHOOK: type: ALTERTABLE_FILEFORMAT
+PREHOOK: Input: default@partition_test_partitioned
+PREHOOK: Output: default@partition_test_partitioned
+POSTHOOK: query: alter table partition_test_partitioned set fileformat sequencefile
+POSTHOOK: type: ALTERTABLE_FILEFORMAT
+POSTHOOK: Input: default@partition_test_partitioned
+POSTHOOK: Output: default@partition_test_partitioned
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert overwrite table partition_test_partitioned partition(dt='2') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@partition_test_partitioned@dt=2
+POSTHOOK: query: insert overwrite table partition_test_partitioned partition(dt='2') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@partition_test_partitioned@dt=2
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: alter table partition_test_partitioned set serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+PREHOOK: type: ALTERTABLE_SERIALIZER
+PREHOOK: Input: default@partition_test_partitioned
+PREHOOK: Output: default@partition_test_partitioned
+POSTHOOK: query: alter table partition_test_partitioned set serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+POSTHOOK: type: ALTERTABLE_SERIALIZER
+POSTHOOK: Input: default@partition_test_partitioned
+POSTHOOK: Output: default@partition_test_partitioned
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert overwrite table partition_test_partitioned partition(dt='3') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@partition_test_partitioned@dt=3
+POSTHOOK: query: insert overwrite table partition_test_partitioned partition(dt='3') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@partition_test_partitioned@dt=3
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=3).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select * from partition_test_partitioned where dt is not null order by key, value, dt limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@partition_test_partitioned
+PREHOOK: Input: default@partition_test_partitioned@dt=1
+PREHOOK: Input: default@partition_test_partitioned@dt=2
+PREHOOK: Input: default@partition_test_partitioned@dt=3
+#### A masked pattern was here ####
+POSTHOOK: query: select * from partition_test_partitioned where dt is not null order by key, value, dt limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@partition_test_partitioned
+POSTHOOK: Input: default@partition_test_partitioned@dt=1
+POSTHOOK: Input: default@partition_test_partitioned@dt=2
+POSTHOOK: Input: default@partition_test_partitioned@dt=3
+#### A masked pattern was here ####
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=3).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+0      val_0   1
+0      val_0   1
+0      val_0   1
+0      val_0   2
+0      val_0   2
+0      val_0   2
+0      val_0   3
+0      val_0   3
+0      val_0   3
+10     val_10  1
+10     val_10  2
+10     val_10  3
+100    val_100 1
+100    val_100 1
+100    val_100 2
+100    val_100 2
+100    val_100 3
+100    val_100 3
+103    val_103 1
+103    val_103 1

Added: hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat9.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat9.q.out?rev=1432865&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat9.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/partition_wise_fileformat9.q.out Mon Jan 14 10:35:22 2013
@@ -0,0 +1,77 @@
+PREHOOK: query: -- This tests that a query can span multiple partitions which can not only have different file formats, but
+-- also different serdes
+create table partition_test_partitioned(key string, value string) partitioned by (dt string) stored as rcfile
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- This tests that a query can span multiple partitions which can not only have different file formats, but
+-- also different serdes
+create table partition_test_partitioned(key string, value string) partitioned by (dt string) stored as rcfile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@partition_test_partitioned
+PREHOOK: query: insert overwrite table partition_test_partitioned partition(dt='1') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@partition_test_partitioned@dt=1
+POSTHOOK: query: insert overwrite table partition_test_partitioned partition(dt='1') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@partition_test_partitioned@dt=1
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: alter table partition_test_partitioned set serde 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
+PREHOOK: type: ALTERTABLE_SERIALIZER
+PREHOOK: Input: default@partition_test_partitioned
+PREHOOK: Output: default@partition_test_partitioned
+POSTHOOK: query: alter table partition_test_partitioned set serde 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
+POSTHOOK: type: ALTERTABLE_SERIALIZER
+POSTHOOK: Input: default@partition_test_partitioned
+POSTHOOK: Output: default@partition_test_partitioned
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert overwrite table partition_test_partitioned partition(dt='2') select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@partition_test_partitioned@dt=2
+POSTHOOK: query: insert overwrite table partition_test_partitioned partition(dt='2') select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@partition_test_partitioned@dt=2
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select * from partition_test_partitioned where dt is not null order by key, value, dt limit 20
+PREHOOK: type: QUERY
+PREHOOK: Input: default@partition_test_partitioned
+PREHOOK: Input: default@partition_test_partitioned@dt=1
+PREHOOK: Input: default@partition_test_partitioned@dt=2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from partition_test_partitioned where dt is not null order by key, value, dt limit 20
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@partition_test_partitioned
+POSTHOOK: Input: default@partition_test_partitioned@dt=1
+POSTHOOK: Input: default@partition_test_partitioned@dt=2
+#### A masked pattern was here ####
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: partition_test_partitioned PARTITION(dt=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+0      val_0   1
+0      val_0   1
+0      val_0   1
+0      val_0   2
+0      val_0   2
+0      val_0   2
+10     val_10  1
+10     val_10  2
+100    val_100 1
+100    val_100 1
+100    val_100 2
+100    val_100 2
+103    val_103 1
+103    val_103 1
+103    val_103 2
+103    val_103 2
+104    val_104 1
+104    val_104 1
+104    val_104 2
+104    val_104 2

