HIVE-16672: Parquet vectorization doesn't work for tables with partition info (Colin Ma, reviewed by Ferdinand Xu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1241baf9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1241baf9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1241baf9

Branch: refs/heads/branch-2.3
Commit: 1241baf9b0ebe8ce2ef484d5eafbf92fbd69ecff
Parents: 1267d30
Author: Ferdinand Xu <cheng.a...@intel.com>
Authored: Thu Jun 1 14:14:50 2017 +0800
Committer: Ferdinand Xu <cheng.a...@intel.com>
Committed: Thu Jun 1 14:14:50 2017 +0800

----------------------------------------------------------------------
 .../vector/VectorizedParquetRecordReader.java   |   32 +-
 .../vector_partitioned_date_time.q              |  113 +
 .../llap/vector_partitioned_date_time.q.out     | 2434 ++++++++++++++++++
 3 files changed, 2573 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
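
In short: the vectorized Parquet reader never populated partition columns, so vectorized scans of partitioned Parquet tables did not return correct partition-column values. The patch resolves the partition values from the reader's FileSplit once at construction time and stamps them into every VectorizedRowBatch (the same approach VectorizedOrcInputFormat takes), and it additionally guards the column-reader loop so a batch that projects no file columns at all (e.g. a query touching only partition columns) skips the Parquet column readers entirely.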


http://git-wip-us.apache.org/repos/asf/hive/blob/1241baf9/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 0ac8356..31b6b74 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -72,6 +73,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private List<TypeInfo> columnTypesList;
   private VectorizedRowBatchCtx rbCtx;
   private List<Integer> indexColumnsWanted;
+  private Object[] partitionValues;
 
   /**
   * For each request column, the reader to read this column. This is NULL if this column
@@ -120,12 +122,23 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       initialize(getSplit(oldInputSplit, conf), conf);
       colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
       rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+      initPartitionValues((FileSplit) oldInputSplit, conf);
     } catch (Throwable e) {
      LOG.error("Failed to create the vectorized reader due to exception " + e);
       throw new RuntimeException(e);
     }
   }
 
+  private void initPartitionValues(FileSplit fileSplit, JobConf conf) throws IOException {
+    int partitionColumnCount = rbCtx.getPartitionColumnCount();
+    if (partitionColumnCount > 0) {
+      partitionValues = new Object[partitionColumnCount];
+      rbCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues);
+    } else {
+      partitionValues = null;
+    }
+   }
+
   public void initialize(
     InputSplit oldSplit,
     JobConf configuration) throws IOException, InterruptedException {
@@ -262,16 +275,23 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     if (rowsReturned >= totalRowCount) {
       return false;
     }
+
+    // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
+    if (partitionValues != null) {
+      rbCtx.addPartitionColsToBatch(columnarBatch, partitionValues);
+    }
     checkEndOfRowGroup();
 
    int num = (int) Math.min(VectorizedRowBatch.DEFAULT_SIZE, totalCountLoadedSoFar - rowsReturned);
-    for (int i = 0; i < columnReaders.length; ++i) {
-      if (columnReaders[i] == null) {
-        continue;
+    if (colsToInclude.size() > 0) {
+      for (int i = 0; i < columnReaders.length; ++i) {
+        if (columnReaders[i] == null) {
+          continue;
+        }
+        columnarBatch.cols[colsToInclude.get(i)].isRepeating = true;
+        columnReaders[i].readBatch(num, columnarBatch.cols[colsToInclude.get(i)],
+            columnTypesList.get(colsToInclude.get(i)));
       }
-      columnarBatch.cols[colsToInclude.get(i)].isRepeating = true;
-      columnReaders[i].readBatch(num, columnarBatch.cols[colsToInclude.get(i)],
-        columnTypesList.get(colsToInclude.get(i)));
     }
     rowsReturned += num;
     columnarBatch.size = num;
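
For readers skimming the hunk above, the partition-value plumbing condenses to the sketch below. This is an illustrative rewrite built only from the VectorizedRowBatchCtx calls visible in the diff (getPartitionColumnCount, getPartitionValues, addPartitionColsToBatch); the class and method names are hypothetical, and it is a sketch of the pattern, not code from the commit.

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical helper; mirrors the pattern VectorizedParquetRecordReader
// adopts from VectorizedOrcInputFormat in the hunk above.
public class PartitionColsSketch {

  // Stamp the split's partition values into a batch before the Parquet
  // column readers fill in the file columns.
  static void addPartitionCols(JobConf conf, FileSplit split,
      VectorizedRowBatch batch) throws IOException {
    VectorizedRowBatchCtx rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
    int partitionColumnCount = rbCtx.getPartitionColumnCount();
    if (partitionColumnCount == 0) {
      return; // unpartitioned table: nothing to add
    }
    // Resolve the partition key values for this split once, up front...
    Object[] partitionValues = new Object[partitionColumnCount];
    rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues);
    // ...then write them into the batch; each partition column is constant
    // within a split, so it is applied once per batch rather than once per row.
    rbCtx.addPartitionColsToBatch(batch, partitionValues);
  }
}

Because the values are constant within a split, this per-batch stamping is what keeps the fix essentially free in the vectorized path.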

http://git-wip-us.apache.org/repos/asf/hive/blob/1241baf9/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q b/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q
index bf4c461..107fe7c 100644
--- a/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q
+++ b/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q
@@ -126,3 +126,116 @@ explain vectorization expression
 select fl_time, count(*) from flights_tiny_orc_partitioned_timestamp group by fl_time;
 
 select fl_time, count(*) from flights_tiny_orc_partitioned_timestamp group by fl_time;
+
+-- test for Parquet file format
+CREATE TABLE flights_tiny_parquet STORED AS PARQUET AS
+SELECT origin_city_name, dest_city_name, fl_date, to_utc_timestamp(fl_date, 'America/Los_Angeles') as fl_time, arr_delay, fl_num
+FROM flights_tiny;
+
+SELECT * FROM flights_tiny_parquet;
+
+SET hive.vectorized.execution.enabled=false;
+
+select * from flights_tiny_parquet sort by fl_num, fl_date limit 25;
+
+select fl_date, count(*) from flights_tiny_parquet group by fl_date;
+
+SET hive.vectorized.execution.enabled=true;
+
+explain vectorization expression
+select * from flights_tiny_parquet sort by fl_num, fl_date limit 25;
+
+select * from flights_tiny_parquet sort by fl_num, fl_date limit 25;
+
+explain vectorization expression
+select fl_date, count(*) from flights_tiny_parquet group by fl_date;
+
+select fl_date, count(*) from flights_tiny_parquet group by fl_date;
+
+
+SET hive.vectorized.execution.enabled=false;
+
+CREATE TABLE flights_tiny_parquet_partitioned_date (
+  origin_city_name STRING,
+  dest_city_name STRING,
+  fl_time TIMESTAMP,
+  arr_delay FLOAT,
+  fl_num INT
+)
+PARTITIONED BY (fl_date DATE)
+STORED AS PARQUET;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+INSERT INTO TABLE flights_tiny_parquet_partitioned_date
+PARTITION (fl_date)
+SELECT  origin_city_name, dest_city_name, fl_time, arr_delay, fl_num, fl_date
+FROM flights_tiny_parquet;
+
+
+select * from flights_tiny_parquet_partitioned_date;
+
+select * from flights_tiny_parquet_partitioned_date sort by fl_num, fl_date limit 25;
+
+select fl_date, count(*) from flights_tiny_parquet_partitioned_date group by fl_date;
+
+SET hive.vectorized.execution.enabled=true;
+
+explain vectorization expression
+select * from flights_tiny_parquet_partitioned_date;
+
+select * from flights_tiny_parquet_partitioned_date;
+
+explain vectorization expression
+select * from flights_tiny_parquet_partitioned_date sort by fl_num, fl_date limit 25;
+
+select * from flights_tiny_parquet_partitioned_date sort by fl_num, fl_date limit 25;
+
+explain vectorization expression
+select fl_date, count(*) from flights_tiny_parquet_partitioned_date group by fl_date;
+
+select fl_date, count(*) from flights_tiny_parquet_partitioned_date group by fl_date;
+
+
+SET hive.vectorized.execution.enabled=false;
+
+CREATE TABLE flights_tiny_parquet_partitioned_timestamp (
+  origin_city_name STRING,
+  dest_city_name STRING,
+  fl_date DATE,
+  arr_delay FLOAT,
+  fl_num INT
+)
+PARTITIONED BY (fl_time TIMESTAMP)
+STORED AS PARQUET;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+INSERT INTO TABLE flights_tiny_parquet_partitioned_timestamp
+PARTITION (fl_time)
+SELECT  origin_city_name, dest_city_name, fl_date, arr_delay, fl_num, fl_time
+FROM flights_tiny_parquet;
+
+
+select * from flights_tiny_parquet_partitioned_timestamp;
+
+select * from flights_tiny_parquet_partitioned_timestamp sort by fl_num, fl_time limit 25;
+
+select fl_time, count(*) from flights_tiny_parquet_partitioned_timestamp group by fl_time;
+
+SET hive.vectorized.execution.enabled=true;
+
+explain vectorization expression
+select * from flights_tiny_parquet_partitioned_timestamp;
+
+select * from flights_tiny_parquet_partitioned_timestamp;
+
+explain vectorization expression
+select * from flights_tiny_parquet_partitioned_timestamp sort by fl_num, fl_time limit 25;
+
+select * from flights_tiny_parquet_partitioned_timestamp sort by fl_num, fl_time limit 25;
+
+explain vectorization expression
+select fl_time, count(*) from flights_tiny_parquet_partitioned_timestamp group by fl_time;
+
+select fl_time, count(*) from flights_tiny_parquet_partitioned_timestamp group by fl_time;
\ No newline at end of file
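
Note on the test layout: the new queries toggle hive.vectorized.execution.enabled off and back on around each block, so the recorded .q.out output captures both the row-mode baseline and the vectorized results for comparison, and DATE and TIMESTAMP partition columns are exercised through separate partitioned tables.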
