Repository: hive
Updated Branches:
  refs/heads/master b3043a37d -> f69fcce10


HIVE-17696: Vectorized reader does not seem to be pushing down projection columns in certain code paths (Ferdinand Xu, via Vihang Karajgaonkar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f69fcce1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f69fcce1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f69fcce1

Branch: refs/heads/master
Commit: f69fcce1085bce491738b91aae31a7b368a1d444
Parents: b3043a3
Author: Ferdinand Xu <cheng.a...@intel.com>
Authored: Thu Oct 26 15:06:38 2017 +0800
Committer: Ferdinand Xu <cheng.a...@intel.com>
Committed: Thu Oct 26 15:09:39 2017 +0800

----------------------------------------------------------------------
 .../parquet/read/DataWritableReadSupport.java  | 89 ++++++++++++++------
 .../vector/VectorizedParquetRecordReader.java  | 24 +-----
 2 files changed, 67 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f69fcce1/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
index 604cbbc..8645d51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
@@ -16,7 +16,6 @@ package org.apache.hadoop.hive.ql.io.parquet.read;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -354,34 +353,15 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> {
       String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES);
       List<TypeInfo> columnTypesList = getColumnTypes(columnTypes);

-      MessageType tableSchema;
-      if (indexAccess) {
-        List<Integer> indexSequence = new ArrayList<Integer>();
-
-        // Generates a sequence list of indexes
-        for(int i = 0; i < columnNamesList.size(); i++) {
-          indexSequence.add(i);
-        }
-
-        tableSchema = getSchemaByIndex(fileSchema, columnNamesList, indexSequence);
-      } else {
-
-        tableSchema = getSchemaByName(fileSchema, columnNamesList, columnTypesList);
-      }
+      MessageType tableSchema =
+          getRequestedSchemaForIndexAccess(indexAccess, columnNamesList, columnTypesList, fileSchema);

       contextMetadata.put(HIVE_TABLE_AS_PARQUET_SCHEMA, tableSchema.toString());
       contextMetadata.put(PARQUET_COLUMN_INDEX_ACCESS, String.valueOf(indexAccess));
       this.hiveTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);

-      Set<String> groupPaths = ColumnProjectionUtils.getNestedColumnPaths(configuration);
-      List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
-      if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
-        MessageType requestedSchemaByUser = getProjectedSchema(tableSchema, columnNamesList,
-            indexColumnsWanted, groupPaths);
-        return new ReadContext(requestedSchemaByUser, contextMetadata);
-      } else {
-        return new ReadContext(tableSchema, contextMetadata);
-      }
+      return new ReadContext(getRequestedPrunedSchema(columnNamesList, tableSchema, configuration),
+          contextMetadata);
     } else {
       contextMetadata.put(HIVE_TABLE_AS_PARQUET_SCHEMA, fileSchema.toString());
       return new ReadContext(fileSchema, contextMetadata);
@@ -389,6 +369,67 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> {
   }

   /**
+   * It's used for vectorized code path.
+   * @param indexAccess
+   * @param columnNamesList
+   * @param columnTypesList
+   * @param fileSchema
+   * @param configuration
+   * @return
+   */
+  public static MessageType getRequestedSchema(
+      boolean indexAccess,
+      List<String> columnNamesList,
+      List<TypeInfo> columnTypesList,
+      MessageType fileSchema,
+      Configuration configuration) {
+    MessageType tableSchema =
+        getRequestedSchemaForIndexAccess(indexAccess, columnNamesList, columnTypesList, fileSchema);
+
+    List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+    //TODO Duplicated code for init method since vectorization reader path doesn't support Nested
+    // column pruning so far. See HIVE-15156
+    if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
+      return DataWritableReadSupport
+          .getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted);
+    } else {
+      return fileSchema;
+    }
+  }
+
+  private static MessageType getRequestedSchemaForIndexAccess(
+      boolean indexAccess,
+      List<String> columnNamesList,
+      List<TypeInfo> columnTypesList,
+      MessageType fileSchema) {
+    if (indexAccess) {
+      List<Integer> indexSequence = new ArrayList<Integer>();
+
+      // Generates a sequence list of indexes
+      for (int i = 0; i < columnNamesList.size(); i++) {
+        indexSequence.add(i);
+      }
+
+      return getSchemaByIndex(fileSchema, columnNamesList, indexSequence);
+    } else {
+      return getSchemaByName(fileSchema, columnNamesList, columnTypesList);
+    }
+  }
+
+  private static MessageType getRequestedPrunedSchema(
+      List<String> columnNamesList,
+      MessageType fileSchema,
+      Configuration configuration) {
+    Set<String> groupPaths = ColumnProjectionUtils.getNestedColumnPaths(configuration);
+    List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+    if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
+      return getProjectedSchema(fileSchema, columnNamesList, indexColumnsWanted, groupPaths);
+    } else {
+      return fileSchema;
+    }
+  }
+
+  /**
    *
    * It creates the hive read support to interpret data from parquet to hive
    *
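
For illustration only (not part of the patch): the projection push-down that these helpers perform boils down to keeping a subset of the file schema's fields, selected by column index. Below is a minimal standalone sketch against the parquet-mr schema API; pruneByIndex and the example schema are made up for this note and only mirror what getSchemaByIndex does for flat schemas.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Type;

public class SchemaPruneSketch {

  // Hypothetical helper: keeps only the fields at the requested indexes.
  static MessageType pruneByIndex(MessageType fileSchema, List<Integer> wantedIndexes) {
    List<Type> kept = new ArrayList<>();
    for (int idx : wantedIndexes) {
      // Ignore indexes that fall outside the file schema (e.g. partition or virtual columns).
      if (idx >= 0 && idx < fileSchema.getFieldCount()) {
        kept.add(fileSchema.getType(idx));
      }
    }
    return new MessageType(fileSchema.getName(), kept);
  }

  public static void main(String[] args) {
    MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message hive_schema { optional int32 id; optional binary name; optional double price; }");
    // Simulates projecting column IDs 0 and 2 (hive.io.file.readcolumn.ids = "0,2").
    System.out.println(pruneByIndex(fileSchema, Arrays.asList(0, 2)));
  }
}
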
http://git-wip-us.apache.org/repos/asf/hive/blob/f69fcce1/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 0977759..9c75f1d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -254,29 +254,9 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     }
     this.fileSchema = footer.getFileMetaData().getSchema();

-    MessageType tableSchema;
-    if (indexAccess) {
-      List<Integer> indexSequence = new ArrayList<>();
-
-      // Generates a sequence list of indexes
-      for(int i = 0; i < columnNamesList.size(); i++) {
-        indexSequence.add(i);
-      }
-
-      tableSchema = DataWritableReadSupport.getSchemaByIndex(fileSchema, columnNamesList,
-          indexSequence);
-    } else {
-      tableSchema = DataWritableReadSupport.getSchemaByName(fileSchema, columnNamesList,
-          columnTypesList);
-    }
-    indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
-    if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
-      requestedSchema =
-          DataWritableReadSupport.getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted);
-    } else {
-      requestedSchema = fileSchema;
-    }
+    requestedSchema = DataWritableReadSupport
+        .getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration);

     Path path = wrapPathForCache(file, cacheKey, configuration, blocks);
     this.reader = new ParquetFileReader(
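
Also for illustration only: a rough sketch of driving the new public entry point, DataWritableReadSupport.getRequestedSchema, outside the reader. The column names, types, projected IDs, and file schema are invented, and ColumnProjectionUtils.appendReadColumns is assumed to be the usual way the projected column IDs reach the Configuration.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class RequestedSchemaSketch {
  public static void main(String[] args) {
    // Invented table layout: three columns, of which only id and price are projected.
    List<String> columnNames = Arrays.asList("id", "name", "price");
    List<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString("int:string:double");
    MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message hive_schema { optional int32 id; optional binary name; optional double price; }");

    Configuration conf = new Configuration();
    // Register the projected column IDs and make sure the "read all columns" flag is off.
    ColumnProjectionUtils.appendReadColumns(conf, Arrays.asList(0, 2));
    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS_CONF_STR, false);

    // indexAccess=false: columns are resolved by name, as when parquet.column.index.access=false.
    MessageType requestedSchema = DataWritableReadSupport.getRequestedSchema(
        false, columnNames, columnTypes, fileSchema, conf);
    System.out.println(requestedSchema); // should now contain only id and price
  }
}
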