This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new dc6e13d746a HIVE-25827: Parquet file footer is read multiple times, when multiple splits are created in same file (#3368) (Adam Szita, reviewed by Peter Vary)
dc6e13d746a is described below

commit dc6e13d746a88456254378455f37af576ee4722b
Author: Adam Szita <40628386+sz...@users.noreply.github.com>
AuthorDate: Wed Jun 22 11:07:43 2022 +0200

    HIVE-25827: Parquet file footer is read multiple times, when multiple splits are created in same file (#3368) (Adam Szita, reviewed by Peter Vary)
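
    Context for the change: previously the footer was parsed once while building the
    ParquetInputSplit (in ParquetRecordReaderBase.getSplit) and again in
    VectorizedParquetRecordReader.initialize via readSplitFooter. This patch parses it a
    single time per reader in setupMetadataAndParquetSplit and stores it in the new
    parquetMetadata field, which both getSplit and initialize reuse. Below is a minimal
    sketch of that caching pattern; the class and method names are illustrative only, and
    ParquetFileReader.readFooter is the same (deprecated) Parquet call the diff uses.

        import java.io.IOException;
        import java.util.List;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.mapred.JobConf;
        import org.apache.parquet.hadoop.ParquetFileReader;
        import org.apache.parquet.hadoop.metadata.BlockMetaData;
        import org.apache.parquet.hadoop.metadata.ParquetMetadata;

        // Illustrative sketch, not the Hive classes: parse the footer once per reader
        // and let every later step reuse the cached ParquetMetadata.
        abstract class FooterCachingReader {
          protected ParquetMetadata footer;   // populated exactly once per reader

          @SuppressWarnings("deprecation")
          protected void setupMetadata(Path file, JobConf conf) throws IOException {
            if (footer == null) {
              // single footer read; a subclass could serve this from a cache instead
              footer = ParquetFileReader.readFooter(conf, file);
            }
          }

          protected List<BlockMetaData> rowGroups() {
            // no additional I/O: the footer is already in memory
            return footer.getBlocks();
          }
        }

    In the actual patch this role is played by ParquetRecordReaderBase, with
    ParquetRecordReaderWrapper and VectorizedParquetRecordReader (which overrides
    getParquetMetadata to go through the LLAP metadata cache) calling
    setupMetadataAndParquetSplit from their constructors, as the hunks below show.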
---
 .../ql/io/parquet/ParquetRecordReaderBase.java     | 176 ++++++++++++---------
 .../parquet/read/ParquetRecordReaderWrapper.java   |  23 +--
 .../vector/VectorizedParquetRecordReader.java      | 171 +++++++++-----------
 .../ql/io/parquet/TestParquetRowGroupFilter.java   |   4 +-
 .../ql/io/parquet/TestVectorizedColumnReader.java  |  10 +-
 5 files changed, 187 insertions(+), 197 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
index 4cc32ae4804..a665c2586a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.RowGroupFilter;
@@ -48,115 +49,134 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ParquetRecordReaderBase {
+public abstract class ParquetRecordReaderBase {
   public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderBase.class);
 
-  protected Path file;
+  protected final FileSplit fileSplit;
+  protected Path filePath;
+  protected ParquetInputSplit parquetInputSplit;
+  protected ParquetMetadata parquetMetadata;
   protected ProjectionPusher projectionPusher;
   protected boolean skipTimestampConversion = false;
   protected Boolean skipProlepticConversion;
   protected Boolean legacyConversionEnabled;
   protected SerDeStats serDeStats;
-  protected JobConf jobConf;
+  protected final JobConf jobConf;
 
   protected int schemaSize;
-  protected List<BlockMetaData> filtedBlocks;
+  protected List<BlockMetaData> filteredBlocks;
   protected ParquetFileReader reader;
 
+  protected ParquetRecordReaderBase(JobConf conf, InputSplit oldSplit) throws IOException {
+    serDeStats = new SerDeStats();
+    projectionPusher = new ProjectionPusher();
+
+    if (!(oldSplit instanceof FileSplit)) {
+      throw new IllegalArgumentException("Unknown split type: " + oldSplit);
+    }
+    this.fileSplit = (FileSplit) oldSplit;
+    this.jobConf = projectionPusher.pushProjectionsAndFilters(conf, fileSplit.getPath().getParent());
+    this.filePath = fileSplit.getPath();
+  }
+
+  protected void setupMetadataAndParquetSplit(JobConf conf) throws IOException {
+    // In the case of stat tasks a dummy split is created with -1 length but real path...
+    if (fileSplit.getLength() != 0) {
+      parquetMetadata = getParquetMetadata(filePath, conf);
+      parquetInputSplit = getSplit(conf);
+    }
+    // having null as parquetInputSplit seems to be a valid case based on this file's history
+  }
+
   /**
    * gets a ParquetInputSplit corresponding to a split given by Hive
    *
-   * @param oldSplit The split given by Hive
    * @param conf The JobConf of the Hive job
    * @return a ParquetInputSplit corresponding to the oldSplit
    * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
    */
   @SuppressWarnings("deprecation")
   protected ParquetInputSplit getSplit(
-    final org.apache.hadoop.mapred.InputSplit oldSplit,
     final JobConf conf
   ) throws IOException {
-    if (oldSplit.getLength() == 0) {
-      return null;
-    }
+
     ParquetInputSplit split;
-    if (oldSplit instanceof FileSplit) {
-      final Path finalPath = ((FileSplit) oldSplit).getPath();
-      jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
-
-      // TODO enable MetadataFilter by using readFooter(Configuration configuration, Path file,
-      // MetadataFilter filter) API
-      final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath);
-      final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
-      final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-
-      final ReadSupport.ReadContext
-        readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
-        null, fileMetaData.getSchema()));
-
-      // Compute stats
-      for (BlockMetaData bmd : blocks) {
-        serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
-        serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
-      }
+    final Path finalPath = fileSplit.getPath();
+
+    // TODO enable MetadataFilter by using readFooter(Configuration configuration, Path file,
+    // MetadataFilter filter) API
+    final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+    final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+
+    final ReadSupport.ReadContext
+      readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
+      null, fileMetaData.getSchema()));
+
+    // Compute stats
+    for (BlockMetaData bmd : blocks) {
+      serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
+      serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
+    }
 
-      schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
-        .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
-      final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
-      final long splitStart = ((FileSplit) oldSplit).getStart();
-      final long splitLength = ((FileSplit) oldSplit).getLength();
-      for (final BlockMetaData block : blocks) {
-        final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
-        if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
-          splitGroup.add(block);
-        }
+    schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
+      .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
+    final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
+    final long splitStart = fileSplit.getStart();
+    final long splitLength = fileSplit.getLength();
+    for (final BlockMetaData block : blocks) {
+      final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+      if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
+        splitGroup.add(block);
       }
-      if (splitGroup.isEmpty()) {
-        LOG.warn("Skipping split, could not find row group in: " + oldSplit);
+    }
+    if (splitGroup.isEmpty()) {
+      LOG.warn("Skipping split, could not find row group in: " + fileSplit);
+      return null;
+    }
+
+    FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
+    if (filter != null) {
+      filteredBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
+      if (filteredBlocks.isEmpty()) {
+        LOG.debug("All row groups are dropped due to filter predicates");
         return null;
       }
 
-      FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
-      if (filter != null) {
-        filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
-        if (filtedBlocks.isEmpty()) {
-          LOG.debug("All row groups are dropped due to filter predicates");
-          return null;
-        }
-
-        long droppedBlocks = splitGroup.size() - filtedBlocks.size();
-        if (droppedBlocks > 0) {
-          LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
-        }
-      } else {
-        filtedBlocks = splitGroup;
+      long droppedBlocks = splitGroup.size() - filteredBlocks.size();
+      if (droppedBlocks > 0) {
+        LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
       }
+    } else {
+      filteredBlocks = splitGroup;
+    }
 
-      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
-        skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
-      }
-      skipProlepticConversion = DataWritableReadSupport
-          .getWriterDateProleptic(fileMetaData.getKeyValueMetaData());
-      if (skipProlepticConversion == null) {
-        skipProlepticConversion = HiveConf.getBoolVar(
-            conf, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
-      }
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
+      skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
+    }
+    skipProlepticConversion = DataWritableReadSupport
+        .getWriterDateProleptic(fileMetaData.getKeyValueMetaData());
+    if (skipProlepticConversion == null) {
+      skipProlepticConversion = HiveConf.getBoolVar(
+          conf, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
+    }
       legacyConversionEnabled =
           DataWritableReadSupport.getZoneConversionLegacy(fileMetaData.getKeyValueMetaData(), conf);
 
-      split = new ParquetInputSplit(finalPath,
-        splitStart,
-        splitLength,
-        oldSplit.getLocations(),
-        filtedBlocks,
-        readContext.getRequestedSchema().toString(),
-        fileMetaData.getSchema().toString(),
-        fileMetaData.getKeyValueMetaData(),
-        readContext.getReadSupportMetadata());
-      return split;
-    } else {
-      throw new IllegalArgumentException("Unknown split type: " + oldSplit);
-    }
+    split = new ParquetInputSplit(finalPath,
+      splitStart,
+      splitLength,
+      fileSplit.getLocations(),
+      filteredBlocks,
+      readContext.getRequestedSchema().toString(),
+      fileMetaData.getSchema().toString(),
+      fileMetaData.getKeyValueMetaData(),
+      readContext.getReadSupportMetadata());
+    return split;
+  }
+
+  @SuppressWarnings("deprecation")
+  protected ParquetMetadata getParquetMetadata(Path path, JobConf conf) throws IOException {
+    return ParquetFileReader.readFooter(jobConf, path);
   }
 
   public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {
@@ -189,8 +209,8 @@ public class ParquetRecordReaderBase {
     }
   }
 
-  public List<BlockMetaData> getFiltedBlocks() {
-    return filtedBlocks;
+  public List<BlockMetaData> getFilteredBlocks() {
+    return filteredBlocks;
   }
 
   public SerDeStats getStats() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index 113d61f5f97..aebcd247354 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -57,22 +57,11 @@ public class ParquetRecordReaderWrapper extends ParquetRecordReaderBase
       final JobConf oldJobConf,
       final Reporter reporter)
           throws IOException, InterruptedException {
-    this(newInputFormat, oldSplit, oldJobConf, reporter, new ProjectionPusher());
-  }
+    super(oldJobConf, oldSplit);
 
-  public ParquetRecordReaderWrapper(
-      final ParquetInputFormat<ArrayWritable> newInputFormat,
-      final InputSplit oldSplit,
-      final JobConf oldJobConf,
-      final Reporter reporter,
-      final ProjectionPusher pusher)
-          throws IOException, InterruptedException {
-    this.splitLen = oldSplit.getLength();
-    this.projectionPusher = pusher;
-    this.serDeStats = new SerDeStats();
+    setupMetadataAndParquetSplit(oldJobConf);
 
-    jobConf = oldJobConf;
-    final ParquetInputSplit split = getSplit(oldSplit, jobConf);
+    this.splitLen = oldSplit.getLength();
 
     TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get(IOConstants.MAPRED_TASK_ID));
     if (taskAttemptID == null) {
@@ -89,10 +78,10 @@ public class ParquetRecordReaderWrapper extends ParquetRecordReaderBase
     }
 
     final TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
-    if (split != null) {
+    if (parquetInputSplit != null) {
       try {
-        realReader = newInputFormat.createRecordReader(split, taskContext);
-        realReader.initialize(split, taskContext);
+        realReader = newInputFormat.createRecordReader(parquetInputSplit, taskContext);
+        realReader.initialize(parquetInputSplit, taskContext);
 
         // read once to gain access to key and value objects
         if (realReader.nextKeyValue()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index d17ddd5ab11..e0e14863dfd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -37,27 +37,24 @@ import org.apache.hadoop.hive.ql.io.BucketIdentifier;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
-import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.filter2.compat.FilterCompat;
 import 
org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -83,15 +80,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
 import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
-import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 /**
  * This reader is used to read a batch of record from inputsplit, part of the code is referred
@@ -111,7 +104,6 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private Object[] partitionValues;
   private Path cacheFsPath;
   private static final int MAP_DEFINITION_LEVEL_MAX = 3;
-  private Map<Path, PartitionDesc> parts;
   private final boolean isReadCacheOnly;
 
   /**
@@ -138,32 +130,53 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private ZoneId writerTimezone;
   private final BucketIdentifier bucketIdentifier;
 
-  public VectorizedParquetRecordReader(
-      org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) {
+  // LLAP cache integration
+  // TODO: also support fileKey in splits, like OrcSplit does
+  private Object cacheKey = null;
+  private CacheTag cacheTag = null;
+
+  public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf) throws IOException {
     this(oldInputSplit, conf, null, null, null);
   }
 
   public VectorizedParquetRecordReader(
-      org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf,
-      FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
+      InputSplit oldInputSplit, JobConf conf,
+      FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf)
+      throws IOException {
+    super(conf, oldInputSplit);
     try {
       this.metadataCache = metadataCache;
       this.cache = dataCache;
       this.cacheConf = cacheConf;
-      serDeStats = new SerDeStats();
-      projectionPusher = new ProjectionPusher();
+
+      if (metadataCache != null) {
+        cacheKey = HdfsUtils.getFileId(filePath.getFileSystem(conf), filePath,
+            HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
+            HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
+            !HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH));
+        // HdfsUtils.getFileId might yield to null in certain configurations
+        if (cacheKey != null) {
+          cacheTag = cacheTagOfParquetFile(filePath, cacheConf, conf);
+          // If we are going to use cache, change the path to depend on file ID for extra consistency.
+          if (cacheKey instanceof Long && HiveConf.getBoolVar(
+              cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
+            filePath = HdfsUtils.getFileIdPath(filePath, (long)cacheKey);
+          }
+        }
+      }
+
+      setupMetadataAndParquetSplit(conf);
+
       colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
       //initialize the rowbatchContext
-      jobConf = conf;
       isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
       rbCtx = Utilities.getVectorizedRowBatchCtx(jobConf);
-      ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf);
-      if (inputSplit != null) {
-        initialize(inputSplit, conf);
+
+      if (parquetInputSplit != null) {
+        initialize(parquetInputSplit, conf);
       }
-      FileSplit fileSplit = (FileSplit) oldInputSplit;
       initPartitionValues(fileSplit, conf);
-      bucketIdentifier = BucketIdentifier.from(conf, fileSplit.getPath());
+      bucketIdentifier = BucketIdentifier.from(conf, filePath);
     } catch (Throwable e) {
       LOG.error("Failed to create the vectorized reader due to exception " + e);
       throw new RuntimeException(e);
@@ -180,26 +193,20 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
      }
   }
 
+  @Override
+  protected ParquetMetadata getParquetMetadata(Path path, JobConf conf) throws IOException {
+    return readSplitFooter(conf, filePath, cacheKey, NO_FILTER, cacheTag);
+  }
+
   @SuppressWarnings("deprecation")
   public void initialize(
-    InputSplit oldSplit,
+      ParquetInputSplit split,
     JobConf configuration) throws IOException, InterruptedException, HiveException {
-    // the oldSplit may be null during the split phase
-    if (oldSplit == null) {
-      return;
-    }
-    ParquetMetadata footer;
-    List<BlockMetaData> blocks;
 
-    MapWork mapWork = LlapHiveUtils.findMapWork(jobConf);
-    if (mapWork != null) {
-      parts = mapWork.getPathToPartitionInfo();
-    }
+    List<BlockMetaData> blocks;
 
-    ParquetInputSplit split = (ParquetInputSplit) oldSplit;
     boolean indexAccess =
       configuration.getBoolean(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS, false);
-    this.file = split.getPath();
     long[] rowGroupOffsets = split.getRowGroupOffsets();
 
     String columnNames = configuration.get(IOConstants.COLUMNS);
@@ -207,83 +214,46 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES);
     columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
 
-    // if task.side.metadata is set, rowGroupOffsets is null
-    Object cacheKey = null;
-    CacheTag cacheTag = null;
-    // TODO: also support fileKey in splits, like OrcSplit does
-    if (metadataCache != null) {
-      if (cacheKey == null) {
-        cacheKey = HdfsUtils.getFileId(file.getFileSystem(configuration), file,
-          HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
-          HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
-          !HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH));
-      }
+    Set<Long> offsets = new HashSet<>();
+    for (long offset : rowGroupOffsets) {
+      offsets.add(offset);
     }
-    if (cacheKey != null) {
-      if (HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE)) {
-        PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
-        cacheTag = LlapHiveUtils.getDbAndTableNameForMetrics(file, true, partitionDesc);
-      }
-      // If we are going to use cache, change the path to depend on file ID for extra consistency.
-      FileSystem fs = file.getFileSystem(configuration);
-      if (cacheKey instanceof Long && HiveConf.getBoolVar(
-          cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
-        file = HdfsUtils.getFileIdPath(file, (long)cacheKey);
+    blocks = new ArrayList<>();
+    for (BlockMetaData block : parquetMetadata.getBlocks()) {
+      if (offsets.contains(block.getStartingPos())) {
+        blocks.add(block);
       }
     }
-
-    if (rowGroupOffsets == null) {
-      //TODO check whether rowGroupOffSets can be null
-      // then we need to apply the predicate push down filter
-      footer = readSplitFooter(
-          configuration, file, cacheKey, range(split.getStart(), split.getEnd()), cacheTag);
-      MessageType fileSchema = footer.getFileMetaData().getSchema();
-      FilterCompat.Filter filter = getFilter(configuration);
-      blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
-    } else {
-      // otherwise we find the row groups that were selected on the client
-      footer = readSplitFooter(configuration, file, cacheKey, NO_FILTER, cacheTag);
-      Set<Long> offsets = new HashSet<>();
-      for (long offset : rowGroupOffsets) {
-        offsets.add(offset);
-      }
-      blocks = new ArrayList<>();
-      for (BlockMetaData block : footer.getBlocks()) {
-        if (offsets.contains(block.getStartingPos())) {
-          blocks.add(block);
-        }
-      }
-      // verify we found them all
-      if (blocks.size() != rowGroupOffsets.length) {
-        long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
-        for (int i = 0; i < foundRowGroupOffsets.length; i++) {
-          foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
-        }
-        // this should never happen.
-        // provide a good error message in case there's a bug
-        throw new IllegalStateException(
-          "All the offsets listed in the split should be found in the file."
-            + " expected: " + Arrays.toString(rowGroupOffsets)
-            + " found: " + blocks
-            + " out of: " + Arrays.toString(foundRowGroupOffsets)
-            + " in range " + split.getStart() + ", " + split.getEnd());
+    // verify we found them all
+    if (blocks.size() != rowGroupOffsets.length) {
+      long[] foundRowGroupOffsets = new long[parquetMetadata.getBlocks().size()];
+      for (int i = 0; i < foundRowGroupOffsets.length; i++) {
+        foundRowGroupOffsets[i] = parquetMetadata.getBlocks().get(i).getStartingPos();
       }
+      // this should never happen.
+      // provide a good error message in case there's a bug
+      throw new IllegalStateException(
+        "All the offsets listed in the split should be found in the file."
+          + " expected: " + Arrays.toString(rowGroupOffsets)
+          + " found: " + blocks
+          + " out of: " + Arrays.toString(foundRowGroupOffsets)
+          + " in range " + split.getStart() + ", " + split.getEnd());
     }
 
     for (BlockMetaData block : blocks) {
       this.totalRowCount += block.getRowCount();
     }
-    this.fileSchema = footer.getFileMetaData().getSchema();
+    this.fileSchema = parquetMetadata.getFileMetaData().getSchema();
     this.writerTimezone = DataWritableReadSupport
-        .getWriterTimeZoneId(footer.getFileMetaData().getKeyValueMetaData());
+        .getWriterTimeZoneId(parquetMetadata.getFileMetaData().getKeyValueMetaData());
 
     colsToInclude = ColumnProjectionUtils.getReadColumnIDs(configuration);
     requestedSchema = DataWritableReadSupport
       .getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration);
 
-    Path path = wrapPathForCache(file, cacheKey, configuration, blocks, cacheTag);
+    Path path = wrapPathForCache(filePath, cacheKey, configuration, blocks, cacheTag);
     this.reader = new ParquetFileReader(
-      configuration, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
+      configuration, parquetMetadata.getFileMetaData(), path, blocks, requestedSchema.getColumns());
   }
 
   private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration,
@@ -360,13 +330,22 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
         return HadoopStreams.wrap(fs.open(file));
       }
       @Override
-      public long getLength() throws IOException {
+      public long getLength() {
         return stat.getLen();
       }
     };
     return ParquetFileReader.readFooter(inputFile, filter);
   }
 
+  private static CacheTag cacheTagOfParquetFile(Path path, Configuration cacheConf, JobConf jobConf) {
+    MapWork mapWork = LlapHiveUtils.findMapWork(jobConf);
+    if (!HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE) || mapWork == null) {
+      return null;
+    }
+    PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(path, mapWork.getPathToPartitionInfo());
+    return LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc);
+  }
+
 
   private FileMetadataCache metadataCache;
   private DataCache cache;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
index c33b701d94c..24697691bc9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
@@ -108,7 +108,7 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
         new MapredParquetInputFormat().getRecordReader(
         new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
 
-    Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFiltedBlocks().size());
+    Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFilteredBlocks().size());
 
     // > 100
     constantDesc = new ExprNodeConstantDesc(100);
@@ -121,7 +121,7 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
         new MapredParquetInputFormat().getRecordReader(
             new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
 
-    Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFiltedBlocks().size());
+    Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFilteredBlocks().size());
   }
 
   private ArrayWritableObjectInspector getObjectInspector(final String columnNames, final String columnTypes) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
index 52e6045b631..e290e332e7f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -119,12 +121,12 @@ public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
 
   private class TestVectorizedParquetRecordReader extends VectorizedParquetRecordReader {
     public TestVectorizedParquetRecordReader(
-        org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) {
+        org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) throws IOException {
       super(oldInputSplit, conf);
     }
+
     @Override
-    protected ParquetInputSplit getSplit(
-        org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) {
+    protected ParquetInputSplit getSplit(JobConf conf) throws IOException {
       return null;
     }
   }
@@ -145,6 +147,6 @@ public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
     FileSplit fsplit = getFileSplit(vectorJob);
     JobConf jobConf = new JobConf(conf);
     TestVectorizedParquetRecordReader testReader = new TestVectorizedParquetRecordReader(fsplit, jobConf);
-    Assert.assertNull("Test should return null split from getSplit() method", testReader.getSplit(fsplit, jobConf));
+    Assert.assertNull("Test should return null split from getSplit() method", testReader.getSplit(null));
   }
 }
