Repository: hive
Updated Branches:
  refs/heads/master 4d8b194bd -> 1c84e0c04


HIVE-17261: Hive use deprecated ParquetInputSplit constructor which blocked parquet dictionary filter (Junjie Chen, reviewed by Ferdinand Xu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1c84e0c0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1c84e0c0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1c84e0c0

Branch: refs/heads/master
Commit: 1c84e0c043d68f7ce2e1dd80e1e54ca8d615e7ab
Parents: 4d8b194
Author: Ferdinand Xu <cheng.a...@intel.com>
Authored: Thu Sep 14 21:35:14 2017 +0800
Committer: Ferdinand Xu <cheng.a...@intel.com>
Committed: Thu Sep 14 21:35:14 2017 +0800

----------------------------------------------------------------------
 .../ql/io/parquet/ParquetRecordReaderBase.java  | 125 +++++++------------
 .../read/ParquetRecordReaderWrapper.java        |   5 +
 .../io/parquet/TestParquetRowGroupFilter.java   |  62 +++++++--
 3 files changed, 98 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
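For context, the heart of the patch is which ParquetInputSplit constructor Hive invokes. A minimal sketch of the contrast is below; it is illustrative only, and every identifier in it (SplitSketch, oldStyle, newStyle, path, hosts, and so on) is a placeholder rather than code from this commit. The deprecated nine-argument constructor hands parquet-mr a list of row groups that Hive has already filtered, which causes the library to skip its own row-group filtering, including the dictionary filter. The supported constructor with a null rowGroupOffsets argument lets ParquetRecordReader re-read the footer and apply the FilterPredicate stored in the configuration itself.

import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetInputSplit;
import org.apache.parquet.hadoop.metadata.BlockMetaData;

class SplitSketch {
  // Old style (deprecated): Hive pre-filters the row groups and passes them in,
  // so parquet-mr never runs its own row-group/dictionary filtering.
  @SuppressWarnings("deprecation")
  static ParquetInputSplit oldStyle(Path path, long start, long length, String[] hosts,
      List<BlockMetaData> filteredBlocks, String requestedSchema, String fileSchema,
      Map<String, String> extraMetadata, Map<String, String> readSupportMetadata) {
    return new ParquetInputSplit(path, start, length, hosts, filteredBlocks,
        requestedSchema, fileSchema, extraMetadata, readSupportMetadata);
  }

  // New style used by this patch: rowGroupOffsets is null, so
  // ParquetRecordReader#initializeInternalReader re-reads the footer and applies
  // the configured FilterPredicate, including dictionary-based filtering.
  static ParquetInputSplit newStyle(Path path, long start, long length, String[] hosts) {
    return new ParquetInputSplit(path, start, start + length, length, hosts, null);
  }
}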


http://git-wip-us.apache.org/repos/asf/hive/blob/1c84e0c0/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
index 167f9b6..6b3859a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
@@ -23,9 +23,9 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.parquet.filter2.compat.FilterCompat;
-import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.Preconditions;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.ParquetInputSplit;
@@ -40,7 +40,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 public class ParquetRecordReaderBase {
@@ -53,7 +52,6 @@ public class ParquetRecordReaderBase {
   protected JobConf jobConf;
 
   protected int schemaSize;
-  protected List<BlockMetaData> filtedBlocks;
   protected ParquetFileReader reader;
 
   /**
@@ -64,86 +62,57 @@ public class ParquetRecordReaderBase {
    * @return a ParquetInputSplit corresponding to the oldSplit
   * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
    */
-  @SuppressWarnings("deprecation")
   protected ParquetInputSplit getSplit(
     final org.apache.hadoop.mapred.InputSplit oldSplit,
     final JobConf conf
   ) throws IOException {
-    ParquetInputSplit split;
-    if (oldSplit instanceof FileSplit) {
-      final Path finalPath = ((FileSplit) oldSplit).getPath();
-      jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
-
-      // TODO enable MetadataFilter by using readFooter(Configuration configuration, Path file,
-      // MetadataFilter filter) API
-      final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath);
-      final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
-      final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-
-      final ReadSupport.ReadContext
-        readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
-        null, fileMetaData.getSchema()));
-
-      // Compute stats
-      for (BlockMetaData bmd : blocks) {
-        serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
-        serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
-      }
-
-      schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
-        .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
-      final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
-      final long splitStart = ((FileSplit) oldSplit).getStart();
-      final long splitLength = ((FileSplit) oldSplit).getLength();
-      for (final BlockMetaData block : blocks) {
-        final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
-        if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
-          splitGroup.add(block);
-        }
-      }
-      if (splitGroup.isEmpty()) {
-        LOG.warn("Skipping split, could not find row group in: " + oldSplit);
-        return null;
-      }
-
-      FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
-      if (filter != null) {
-        filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
-        if (filtedBlocks.isEmpty()) {
-          LOG.debug("All row groups are dropped due to filter predicates");
-          return null;
-        }
-
-        long droppedBlocks = splitGroup.size() - filtedBlocks.size();
-        if (droppedBlocks > 0) {
-          LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
-        }
-      } else {
-        filtedBlocks = splitGroup;
-      }
-
-      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
-        skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
-      }
-      split = new ParquetInputSplit(finalPath,
-        splitStart,
-        splitLength,
-        oldSplit.getLocations(),
-        filtedBlocks,
-        readContext.getRequestedSchema().toString(),
-        fileMetaData.getSchema().toString(),
-        fileMetaData.getKeyValueMetaData(),
-        readContext.getReadSupportMetadata());
-      return split;
-    } else {
-      throw new IllegalArgumentException("Unknown split type: " + oldSplit);
+    Preconditions.checkArgument((oldSplit instanceof FileSplit), "Unknown split type:" + oldSplit);
+    final Path finalPath = ((FileSplit) oldSplit).getPath();
+    jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
+
+    // TODO enable MetadataFilter
+    final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf,
+      finalPath, ParquetMetadataConverter.NO_FILTER);
+    final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+    final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+
+    final ReadSupport.ReadContext
+      readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
+      null, fileMetaData.getSchema()));
+
+    // Compute stats
+    for (BlockMetaData bmd : blocks) {
+      serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
+      serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
+    }
+
+    schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
+      .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
+
+    final long splitStart = ((FileSplit) oldSplit).getStart();
+    final long splitLength = ((FileSplit) oldSplit).getLength();
+
+    setFilter(jobConf, fileMetaData.getSchema());
+
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
+      skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
     }
+
+    // rowGroupOffsets need to be set to null otherwise filter will not be applied
+    // in ParquetRecordReader#initializeInternalReader
+    return new ParquetInputSplit(finalPath,
+      splitStart,
+      splitStart + splitLength,
+      splitLength,
+      oldSplit.getLocations(),
+      null);
+
   }
 
-  public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {
+  private void setFilter(final JobConf conf, MessageType schema) {
     SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf);
     if (sarg == null) {
-      return null;
+      return;
     }
 
     // Create the Parquet FilterPredicate without including columns that do not exist
@@ -153,18 +122,12 @@ public class ParquetRecordReaderBase {
       // Filter may have sensitive information. Do not send to debug.
       LOG.debug("PARQUET predicate push down generated.");
       ParquetInputFormat.setFilterPredicate(conf, p);
-      return FilterCompat.get(p);
     } else {
       // Filter may have sensitive information. Do not send to debug.
       LOG.debug("No PARQUET predicate push down is generated.");
-      return null;
     }
   }
 
-  public List<BlockMetaData> getFiltedBlocks() {
-    return filtedBlocks;
-  }
-
   public SerDeStats getStats() {
     return serDeStats;
   }

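Note that setFilter() now only serializes the predicate into the configuration through ParquetInputFormat.setFilterPredicate; it no longer returns a FilterCompat.Filter or trims block metadata itself. The short sketch below shows that round trip under stated assumptions: the column name int_col and the value 50 are made up for illustration, while in Hive the predicate is produced by ParquetFilterPredicateConverter.toFilterPredicate from the SearchArgument.

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetInputFormat;

public class FilterRoundTripSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Hypothetical predicate standing in for the one Hive derives from the
    // filter expression in the job configuration.
    FilterPredicate p = FilterApi.gt(FilterApi.intColumn("int_col"), 50);

    // What the patched setFilter() does: store the predicate in the conf.
    ParquetInputFormat.setFilterPredicate(conf, p);

    // What parquet-mr does on the read side: recover it as a FilterCompat.Filter
    // and apply it during row-group and dictionary filtering.
    FilterCompat.Filter filter = ParquetInputFormat.getFilter(conf);
    System.out.println(filter.getClass().getSimpleName());
  }
}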
http://git-wip-us.apache.org/repos/asf/hive/blob/1c84e0c0/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index ac430a6..4ea8978 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -88,6 +88,11 @@ public class ParquetRecordReaderWrapper extends ParquetRecordReaderBase
         HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION, skipTimestampConversion);
     }
 
+    if (jobConf.get(ParquetInputFormat.FILTER_PREDICATE) != null) {
+      conf.set(ParquetInputFormat.FILTER_PREDICATE,
+        jobConf.get(ParquetInputFormat.FILTER_PREDICATE));
+    }
+
     final TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
     if (split != null) {
       try {

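The wrapper change above exists because the underlying mapreduce ParquetRecordReader reads the predicate from the Configuration wrapped into the TaskAttemptContext, not from the Hive JobConf where setFilter() stored it. A small sketch of that hand-off follows, with hypothetical method and variable names (buildContext, jobConf, conf) that are not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.util.ContextUtil;

class PredicatePropagationSketch {
  // Copy the serialized predicate from the Hive-side JobConf into the
  // Configuration used to build the TaskAttemptContext; without the copy the
  // mapreduce-side reader sees no filter and skips row-group filtering.
  static TaskAttemptContext buildContext(JobConf jobConf, Configuration conf,
      TaskAttemptID taskAttemptID) {
    String serialized = jobConf.get(ParquetInputFormat.FILTER_PREDICATE);
    if (serialized != null) {
      conf.set(ParquetInputFormat.FILTER_PREDICATE, serialized);
    }
    return ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
  }
}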
http://git-wip-us.apache.org/repos/asf/hive/blob/1c84e0c0/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
index bf363f3..c0bfc3f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
@@ -24,8 +24,11 @@ import java.util.List;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -39,8 +42,15 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.util.ContextUtil;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
@@ -93,6 +103,14 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
           }
         });
 
+    FileSplit testSplit = new FileSplit(testPath, 0, fileLength(testPath), (String[]) null);
+    ParquetInputSplit parquetSplit = new ParquetInputSplit(testPath,
+      testSplit.getStart(),
+      testSplit.getStart() + testSplit.getLength(),
+      testSplit.getLength(),
+      testSplit.getLocations(),
+      null);
+
     // > 50
     GenericUDF udf = new GenericUDFOPGreaterThan();
     List<ExprNodeDesc> children = Lists.newArrayList();
@@ -103,12 +121,22 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
     ExprNodeGenericFuncDesc genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
     String searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
-
-    ParquetRecordReaderWrapper recordReader = (ParquetRecordReaderWrapper)
-        new MapredParquetInputFormat().getRecordReader(
-        new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
-
-    Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFiltedBlocks().size());
+    SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema);
+    TaskAttemptID taskAttemptID = new org.apache.hadoop.mapred.TaskAttemptID();
+    TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
+    ParquetInputFormat<ArrayWritable> inputFormat = new ParquetInputFormat<>(DataWritableReadSupport.class);
+    org.apache.hadoop.mapreduce.RecordReader<Void, ArrayWritable> recordReader =
+      inputFormat.createRecordReader(parquetSplit, taskContext);
+    ParquetInputFormat.setFilterPredicate(conf, p);
+
+    try {
+      recordReader.initialize(parquetSplit, taskContext);
+      boolean hasValue = recordReader.nextKeyValue();
+      Assert.assertTrue("Row groups should not be filtered.", hasValue);
+    } finally {
+      recordReader.close();
+    }
 
     // > 100
     constantDesc = new ExprNodeConstantDesc(100);
@@ -116,12 +144,20 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
     genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children);
     searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc);
     conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr);
-
-    recordReader = (ParquetRecordReaderWrapper)
-        new MapredParquetInputFormat().getRecordReader(
-            new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
-
-    Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFiltedBlocks().size());
+    sarg = ConvertAstToSearchArg.createFromConf(conf);
+    p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileSchema);
+    taskAttemptID = new org.apache.hadoop.mapred.TaskAttemptID();
+    taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
+    recordReader = inputFormat.createRecordReader(parquetSplit, taskContext);
+    ParquetInputFormat.setFilterPredicate(conf, p);
+
+    try {
+      recordReader.initialize(parquetSplit, taskContext);
+      boolean hasValue = recordReader.nextKeyValue();
+      Assert.assertFalse("Row groups should be filtered.", hasValue);
+    } finally {
+      recordReader.close();
+    }
   }
 
   private ArrayWritableObjectInspector getObjectInspector(final String columnNames, final String columnTypes) {
