Repository: hive
Updated Branches:
  refs/heads/master 4d8b194bd -> 1c84e0c04
HIVE-17261: Hive use deprecated ParquetInputSplit constructor which blocked parquet dictionary filter (Junjie Chen, reviewed by Ferdinand Xu)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1c84e0c0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1c84e0c0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1c84e0c0

Branch: refs/heads/master
Commit: 1c84e0c043d68f7ce2e1dd80e1e54ca8d615e7ab
Parents: 4d8b194
Author: Ferdinand Xu <cheng.a...@intel.com>
Authored: Thu Sep 14 21:35:14 2017 +0800
Committer: Ferdinand Xu <cheng.a...@intel.com>
Committed: Thu Sep 14 21:35:14 2017 +0800

----------------------------------------------------------------------
 .../ql/io/parquet/ParquetRecordReaderBase.java  | 125 +++++++------------
 .../read/ParquetRecordReaderWrapper.java        |   5 +
 .../io/parquet/TestParquetRowGroupFilter.java   |  62 +++++++--
 3 files changed, 98 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1c84e0c0/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
index 167f9b6..6b3859a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
@@ -23,9 +23,9 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.parquet.filter2.compat.FilterCompat;
-import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.Preconditions;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.ParquetInputSplit;
@@ -40,7 +40,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 public class ParquetRecordReaderBase {
@@ -53,7 +52,6 @@ public class ParquetRecordReaderBase {
   protected JobConf jobConf;
 
   protected int schemaSize;
-  protected List<BlockMetaData> filtedBlocks;
   protected ParquetFileReader reader;
 
   /**
@@ -64,86 +62,57 @@ public class ParquetRecordReaderBase {
    * @return a ParquetInputSplit corresponding to the oldSplit
    * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
    */
-  @SuppressWarnings("deprecation")
   protected ParquetInputSplit getSplit(
     final org.apache.hadoop.mapred.InputSplit oldSplit,
     final JobConf conf
   ) throws IOException {
-    ParquetInputSplit split;
-    if (oldSplit instanceof FileSplit) {
-      final Path finalPath = ((FileSplit) oldSplit).getPath();
-      jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
-
-      // TODO enable MetadataFilter by using readFooter(Configuration configuration, Path file,
-      //  MetadataFilter filter) API
-      final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath);
-      final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
-      final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-
-      final ReadSupport.ReadContext
-        readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
-          null, fileMetaData.getSchema()));
-
-      // Compute stats
-      for (BlockMetaData bmd : blocks) {
-        serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
-        serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
-      }
-
-      schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
-        .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
-      final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
-      final long splitStart = ((FileSplit) oldSplit).getStart();
-      final long splitLength = ((FileSplit) oldSplit).getLength();
-      for (final BlockMetaData block : blocks) {
-        final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
-        if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
-          splitGroup.add(block);
-        }
-      }
-      if (splitGroup.isEmpty()) {
-        LOG.warn("Skipping split, could not find row group in: " + oldSplit);
-        return null;
-      }
-
-      FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
-      if (filter != null) {
-        filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
-        if (filtedBlocks.isEmpty()) {
-          LOG.debug("All row groups are dropped due to filter predicates");
-          return null;
-        }
-
-        long droppedBlocks = splitGroup.size() - filtedBlocks.size();
-        if (droppedBlocks > 0) {
-          LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
-        }
-      } else {
-        filtedBlocks = splitGroup;
-      }
-
-      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
-        skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
-      }
-      split = new ParquetInputSplit(finalPath,
-        splitStart,
-        splitLength,
-        oldSplit.getLocations(),
-        filtedBlocks,
-        readContext.getRequestedSchema().toString(),
-        fileMetaData.getSchema().toString(),
-        fileMetaData.getKeyValueMetaData(),
-        readContext.getReadSupportMetadata());
-      return split;
-    } else {
-      throw new IllegalArgumentException("Unknown split type: " + oldSplit);
+    Preconditions.checkArgument((oldSplit instanceof FileSplit), "Unknown split type:" + oldSplit);
+    final Path finalPath = ((FileSplit) oldSplit).getPath();
+    jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
+
+    // TODO enable MetadataFilter
+    final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf,
+        finalPath, ParquetMetadataConverter.NO_FILTER);
+    final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+    final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+
+    final ReadSupport.ReadContext
+        readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
+        null, fileMetaData.getSchema()));
+
+    // Compute stats
+    for (BlockMetaData bmd : blocks) {
+      serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
+      serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
+    }
+
+    schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
+        .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
+
+    final long splitStart = ((FileSplit) oldSplit).getStart();
+    final long splitLength = ((FileSplit) oldSplit).getLength();
+
+    setFilter(jobConf, fileMetaData.getSchema());
+
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
+      skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
     }
+
+    // rowGroupOffsets need to be set to null otherwise filter will not be applied
+    // in ParquetRecordReader#initializeInternalReader
+    return new ParquetInputSplit(finalPath,
+        splitStart,
+        splitStart + splitLength,
+        splitLength,
+        oldSplit.getLocations(),
+        null);
   }
 
-  public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {
+  private void setFilter(final JobConf conf, MessageType schema) {
     SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf);
     if (sarg == null) {
-      return null;
+      return;
     }
 
     // Create the Parquet FilterPredicate without including columns that do not exist
@@ -153,18 +122,12 @@ public class ParquetRecordReaderBase {
       // Filter may have sensitive information. Do not send to debug.
       LOG.debug("PARQUET predicate push down generated.");
       ParquetInputFormat.setFilterPredicate(conf, p);
-      return FilterCompat.get(p);
     } else {
       // Filter may have sensitive information. Do not send to debug.
       LOG.debug("No PARQUET predicate push down is generated.");
-      return null;
     }
   }
 
-  public List<BlockMetaData> getFiltedBlocks() {
-    return filtedBlocks;
-  }
-
   public SerDeStats getStats() {
     return serDeStats;
   }
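The crux of this file's change is the split constructor. The deprecated nine-argument ParquetInputSplit constructor embedded Hive's pre-filtered block list in the split; parquet-mr then trusted that list and skipped its own row-group filtering, including the dictionary filter the commit title refers to. A minimal sketch of the replacement pattern, assuming parquet-mr's six-argument constructor (file, start, end, length, hosts, rowGroupOffsets); the class and method names are illustrative, not Hive code:

    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetInputSplit;

    public class SplitSketch {
      // Illustrative helper: build a split that leaves row-group discovery
      // to parquet-mr instead of baking in a pre-filtered block list.
      static ParquetInputSplit makeSplit(Path file, long start, long length, String[] hosts) {
        // Passing null rowGroupOffsets marks the split as unfiltered, so
        // ParquetRecordReader#initializeInternalReader re-reads the footer and
        // applies the FilterPredicate set via ParquetInputFormat.setFilterPredicate,
        // statistics and dictionary filters included.
        return new ParquetInputSplit(file, start, start + length, length, hosts, null);
      }
    }

With the old constructor, row groups were filtered once in Hive against min/max statistics only; with a null offset array the filtering moves inside parquet-mr, where dictionary pages can also be consulted to prove a row group contains no matching values.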
http://git-wip-us.apache.org/repos/asf/hive/blob/1c84e0c0/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index ac430a6..4ea8978 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -88,6 +88,11 @@ public class ParquetRecordReaderWrapper extends ParquetRecordReaderBase
           HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION, skipTimestampConversion);
     }
 
+    if (jobConf.get(ParquetInputFormat.FILTER_PREDICATE) != null) {
+      conf.set(ParquetInputFormat.FILTER_PREDICATE,
+          jobConf.get(ParquetInputFormat.FILTER_PREDICATE));
+    }
+
     final TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
     if (split != null) {
       try {
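This wrapper change closes a gap the new split opens: setFilter now only records the predicate in jobConf (via ParquetInputFormat.setFilterPredicate), while the task attempt context is built from a separate Configuration, so the predicate must be copied across or the inner ParquetRecordReader never sees it. A sketch of the pattern, with an illustrative helper name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.parquet.hadoop.ParquetInputFormat;

    public class PredicatePropagationSketch {
      // Illustrative helper: carry the serialized predicate from the JobConf
      // that setFilter wrote into the Configuration backing the TaskAttemptContext.
      // FILTER_PREDICATE is the key parquet-mr stores the serialized
      // FilterPredicate under.
      static void propagateFilter(JobConf source, Configuration target) {
        String serialized = source.get(ParquetInputFormat.FILTER_PREDICATE);
        if (serialized != null) {
          target.set(ParquetInputFormat.FILTER_PREDICATE, serialized);
        }
      }
    }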
http://git-wip-us.apache.org/repos/asf/hive/blob/1c84e0c0/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
index bf363f3..c0bfc3f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
@@ -24,8 +24,11 @@ import java.util.List;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -39,8 +42,15 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.util.ContextUtil;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
@@ -93,6 +103,14 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
       }
     });
 
+    FileSplit testSplit = new FileSplit(testPath, 0, fileLength(testPath), (String[]) null);
+    ParquetInputSplit parquetSplit = new ParquetInputSplit(testPath,
+        testSplit.getStart(),
+        testSplit.getStart() + testSplit.getLength(),
+        testSplit.getLength(),
+        testSplit.getLocations(),
+        null);
+
     // > 50
     GenericUDF udf = new GenericUDFOPGreaterThan();
     List<ExprNodeDesc> children = Lists.newArrayList();
@@ -103,12 +121,22 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
     ExprNodeGenericFuncDesc genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
     String searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
-
-    ParquetRecordReaderWrapper recordReader = (ParquetRecordReaderWrapper)
-      new MapredParquetInputFormat().getRecordReader(
-        new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
-
-    Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFiltedBlocks().size());
+    SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema);
+    TaskAttemptID taskAttemptID = new org.apache.hadoop.mapred.TaskAttemptID();
+    TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
+    ParquetInputFormat<ArrayWritable> inputFormat = new ParquetInputFormat<>(DataWritableReadSupport.class);
+    org.apache.hadoop.mapreduce.RecordReader<Void, ArrayWritable> recordReader =
+        inputFormat.createRecordReader(parquetSplit, taskContext);
+    ParquetInputFormat.setFilterPredicate(conf, p);
+
+    try {
+      recordReader.initialize(parquetSplit, taskContext);
+      boolean hasValue = recordReader.nextKeyValue();
+      Assert.assertTrue("Row groups should not be filtered.", hasValue);
+    } finally {
+      recordReader.close();
+    }
 
     // > 100
     constantDesc = new ExprNodeConstantDesc(100);
@@ -116,12 +144,20 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
     genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
     searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
-
-    recordReader = (ParquetRecordReaderWrapper)
-      new MapredParquetInputFormat().getRecordReader(
-        new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
-
-    Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFiltedBlocks().size());
+    sarg = ConvertAstToSearchArg.createFromConf(conf);
+    p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema);
+    taskAttemptID = new org.apache.hadoop.mapred.TaskAttemptID();
+    taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
+    recordReader = inputFormat.createRecordReader(parquetSplit, taskContext);
+    ParquetInputFormat.setFilterPredicate(conf, p);
+
+    try {
+      recordReader.initialize(parquetSplit, taskContext);
+      boolean hasValue = recordReader.nextKeyValue();
+      Assert.assertFalse("Row groups should be filtered.", hasValue);
+    } finally {
+      recordReader.close();
+    }
   }
 
   private ArrayWritableObjectInspector getObjectInspector(final String columnNames, final String columnTypes) {
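Since getFiltedBlocks() is gone, the test can no longer count surviving row groups directly; it instead drives parquet-mr's own reader and asserts on whether any record comes back. A compact sketch of the same setup, assuming a predicate built with parquet-mr's FilterApi (the column name is illustrative, not the test's actual schema):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.hadoop.ParquetInputFormat;

    public class FilterSetupSketch {
      public static void main(String[] args) {
        // Roughly the shape of predicate the SARG converter emits for "intCol > 100".
        FilterPredicate gt100 = FilterApi.gt(FilterApi.intColumn("intCol"), 100);
        JobConf conf = new JobConf();
        ParquetInputFormat.setFilterPredicate(conf, gt100);
        // A reader initialized from this conf, over a split with null rowGroupOffsets,
        // drops row groups that provably contain no matching rows, so
        // nextKeyValue() returning false is the observable signal that
        // row-group filtering actually fired.
      }
    }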