This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new dc6e13d746a  HIVE-25827: Parquet file footer is read multiple times, when multiple splits are created in same file (#3368) (Adam Szita, reviewed by Peter Vary)
dc6e13d746a is described below

commit dc6e13d746a88456254378455f37af576ee4722b
Author: Adam Szita <40628386+sz...@users.noreply.github.com>
AuthorDate: Wed Jun 22 11:07:43 2022 +0200

    HIVE-25827: Parquet file footer is read multiple times, when multiple splits are created in same file (#3368) (Adam Szita, reviewed by Peter Vary)
---
 .../ql/io/parquet/ParquetRecordReaderBase.java     | 176 ++++++++++++---------
 .../parquet/read/ParquetRecordReaderWrapper.java   |  23 +--
 .../vector/VectorizedParquetRecordReader.java      | 171 +++++++++-----------
 .../ql/io/parquet/TestParquetRowGroupFilter.java   |   4 +-
 .../ql/io/parquet/TestVectorizedColumnReader.java  |  10 +-
 5 files changed, 187 insertions(+), 197 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
index 4cc32ae4804..a665c2586a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.RowGroupFilter;
@@ -48,115 +49,134 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ParquetRecordReaderBase {
+public abstract class ParquetRecordReaderBase {
   public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderBase.class);
 
-  protected Path file;
+  protected final FileSplit fileSplit;
+  protected Path filePath;
+  protected ParquetInputSplit parquetInputSplit;
+  protected ParquetMetadata parquetMetadata;
   protected ProjectionPusher projectionPusher;
   protected boolean skipTimestampConversion = false;
   protected Boolean skipProlepticConversion;
   protected Boolean legacyConversionEnabled;
   protected SerDeStats serDeStats;
-  protected JobConf jobConf;
+  protected final JobConf jobConf;
 
   protected int schemaSize;
 
-  protected List<BlockMetaData> filtedBlocks;
+  protected List<BlockMetaData> filteredBlocks;
   protected ParquetFileReader reader;
 
+  protected ParquetRecordReaderBase(JobConf conf, InputSplit oldSplit) throws IOException {
+    serDeStats = new SerDeStats();
+    projectionPusher = new ProjectionPusher();
+
+    if (!(oldSplit instanceof FileSplit)) {
+      throw new IllegalArgumentException("Unknown split type: " + oldSplit);
+    }
+    this.fileSplit = (FileSplit) oldSplit;
+    this.jobConf = projectionPusher.pushProjectionsAndFilters(conf, fileSplit.getPath().getParent());
+    this.filePath = fileSplit.getPath();
+  }
+
+  protected void setupMetadataAndParquetSplit(JobConf conf) throws IOException {
+    // In the case of stat tasks a dummy split is created with -1 length but real path...
+    if (fileSplit.getLength() != 0) {
+      parquetMetadata = getParquetMetadata(filePath, conf);
+      parquetInputSplit = getSplit(conf);
+    }
+    // having null as parquetInputSplit seems to be a valid case based on this file's history
+  }
+
   /**
    * gets a ParquetInputSplit corresponding to a split given by Hive
    *
-   * @param oldSplit The split given by Hive
    * @param conf The JobConf of the Hive job
    * @return a ParquetInputSplit corresponding to the oldSplit
    * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
    */
   @SuppressWarnings("deprecation")
   protected ParquetInputSplit getSplit(
-      final org.apache.hadoop.mapred.InputSplit oldSplit,
       final JobConf conf
       ) throws IOException {
-    if (oldSplit.getLength() == 0) {
-      return null;
-    }
+
     ParquetInputSplit split;
-    if (oldSplit instanceof FileSplit) {
-      final Path finalPath = ((FileSplit) oldSplit).getPath();
-      jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
-
-      // TODO enable MetadataFilter by using readFooter(Configuration configuration, Path file,
-      //  MetadataFilter filter) API
-      final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath);
-      final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
-      final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-
-      final ReadSupport.ReadContext
-          readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
-          null, fileMetaData.getSchema()));
-
-      // Compute stats
-      for (BlockMetaData bmd : blocks) {
-        serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
-        serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
-      }
+    final Path finalPath = fileSplit.getPath();
+
+    // TODO enable MetadataFilter by using readFooter(Configuration configuration, Path file,
+    //  MetadataFilter filter) API
+    final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+    final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+
+    final ReadSupport.ReadContext
+        readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
+        null, fileMetaData.getSchema()));
+
+    // Compute stats
+    for (BlockMetaData bmd : blocks) {
+      serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
+      serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
+    }
 
-      schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
-          .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
-      final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
-      final long splitStart = ((FileSplit) oldSplit).getStart();
-      final long splitLength = ((FileSplit) oldSplit).getLength();
-      for (final BlockMetaData block : blocks) {
-        final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
-        if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
-          splitGroup.add(block);
-        }
+    schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
+        .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
+    final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
+    final long splitStart = fileSplit.getStart();
+    final long splitLength = fileSplit.getLength();
+    for (final BlockMetaData block : blocks) {
+      final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+      if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
+        splitGroup.add(block);
      }
-      if (splitGroup.isEmpty()) {
-        LOG.warn("Skipping split, could not find row group in: " + oldSplit);
+    }
+    if (splitGroup.isEmpty()) {
+      LOG.warn("Skipping split, could not find row group in: " + fileSplit);
+      return null;
+    }
+
+    FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
+    if (filter != null) {
+      filteredBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
+      if (filteredBlocks.isEmpty()) {
+        LOG.debug("All row groups are dropped due to filter predicates");
         return null;
       }
-      FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
-      if (filter != null) {
-        filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
-        if (filtedBlocks.isEmpty()) {
-          LOG.debug("All row groups are dropped due to filter predicates");
-          return null;
-        }
-
-        long droppedBlocks = splitGroup.size() - filtedBlocks.size();
-        if (droppedBlocks > 0) {
-          LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
-        }
-      } else {
-        filtedBlocks = splitGroup;
+      long droppedBlocks = splitGroup.size() - filteredBlocks.size();
+      if (droppedBlocks > 0) {
+        LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
      }
+    } else {
+      filteredBlocks = splitGroup;
+    }
 
-      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
-        skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
-      }
-      skipProlepticConversion = DataWritableReadSupport
-          .getWriterDateProleptic(fileMetaData.getKeyValueMetaData());
-      if (skipProlepticConversion == null) {
-        skipProlepticConversion = HiveConf.getBoolVar(
-            conf, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
-      }
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
+      skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
+    }
+    skipProlepticConversion = DataWritableReadSupport
+        .getWriterDateProleptic(fileMetaData.getKeyValueMetaData());
+    if (skipProlepticConversion == null) {
+      skipProlepticConversion = HiveConf.getBoolVar(
+          conf, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
+    }
     legacyConversionEnabled = DataWritableReadSupport.getZoneConversionLegacy(fileMetaData.getKeyValueMetaData(), conf);
 
-      split = new ParquetInputSplit(finalPath,
-          splitStart,
-          splitLength,
-          oldSplit.getLocations(),
-          filtedBlocks,
-          readContext.getRequestedSchema().toString(),
-          fileMetaData.getSchema().toString(),
-          fileMetaData.getKeyValueMetaData(),
-          readContext.getReadSupportMetadata());
-      return split;
-    } else {
-      throw new IllegalArgumentException("Unknown split type: " + oldSplit);
-    }
+    split = new ParquetInputSplit(finalPath,
+        splitStart,
+        splitLength,
+        fileSplit.getLocations(),
+        filteredBlocks,
+        readContext.getRequestedSchema().toString(),
+        fileMetaData.getSchema().toString(),
+        fileMetaData.getKeyValueMetaData(),
+        readContext.getReadSupportMetadata());
+    return split;
+  }
+
+  @SuppressWarnings("deprecation")
+  protected ParquetMetadata getParquetMetadata(Path path, JobConf conf) throws IOException {
+    return ParquetFileReader.readFooter(jobConf, path);
   }
 
   public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {
@@ -189,8 +209,8 @@ public class ParquetRecordReaderBase {
     }
   }
 
-  public List<BlockMetaData> getFiltedBlocks() {
-    return filtedBlocks;
+  public List<BlockMetaData> getFilteredBlocks() {
+    return filteredBlocks;
   }
 
   public SerDeStats getStats() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index 113d61f5f97..aebcd247354 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -57,22 +57,11 @@ public class ParquetRecordReaderWrapper extends ParquetRecordReaderBase
       final JobConf oldJobConf,
       final Reporter reporter)
         throws IOException, InterruptedException {
-    this(newInputFormat, oldSplit, oldJobConf, reporter, new ProjectionPusher());
-  }
+    super(oldJobConf, oldSplit);
 
-  public ParquetRecordReaderWrapper(
-      final ParquetInputFormat<ArrayWritable> newInputFormat,
-      final InputSplit oldSplit,
-      final JobConf oldJobConf,
-      final Reporter reporter,
-      final ProjectionPusher pusher)
-        throws IOException, InterruptedException {
-    this.splitLen = oldSplit.getLength();
-    this.projectionPusher = pusher;
-    this.serDeStats = new SerDeStats();
+    setupMetadataAndParquetSplit(oldJobConf);
 
-    jobConf = oldJobConf;
-    final ParquetInputSplit split = getSplit(oldSplit, jobConf);
+    this.splitLen = oldSplit.getLength();
 
     TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get(IOConstants.MAPRED_TASK_ID));
     if (taskAttemptID == null) {
@@ -89,10 +78,10 @@ public class ParquetRecordReaderWrapper extends ParquetRecordReaderBase
     }
 
     final TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
-    if (split != null) {
+    if (parquetInputSplit != null) {
       try {
-        realReader = newInputFormat.createRecordReader(split, taskContext);
-        realReader.initialize(split, taskContext);
+        realReader = newInputFormat.createRecordReader(parquetInputSplit, taskContext);
+        realReader.initialize(parquetInputSplit, taskContext);
 
         // read once to gain access to key and value objects
         if (realReader.nextKeyValue()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index d17ddd5ab11..e0e14863dfd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -37,27 +37,24 @@ import org.apache.hadoop.hive.ql.io.BucketIdentifier;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
-import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -83,15 +80,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
 import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
-import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 /**
  * This reader is used to read a batch of record from inputsplit, part of the code is referred
@@ -111,7 +104,6 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private Object[] partitionValues;
   private Path cacheFsPath;
   private static final int MAP_DEFINITION_LEVEL_MAX = 3;
-  private Map<Path, PartitionDesc> parts;
   private final boolean isReadCacheOnly;
 
   /**
@@ -138,32 +130,53 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private ZoneId writerTimezone;
   private final BucketIdentifier bucketIdentifier;
 
-  public VectorizedParquetRecordReader(
-      org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) {
+  // LLAP cache integration
+  // TODO: also support fileKey in splits, like OrcSplit does
+  private Object cacheKey = null;
+  private CacheTag cacheTag = null;
+
+  public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf) throws IOException {
     this(oldInputSplit, conf, null, null, null);
   }
 
   public VectorizedParquetRecordReader(
-      org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf,
-      FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
+      InputSplit oldInputSplit, JobConf conf,
+      FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf)
+      throws IOException {
+    super(conf, oldInputSplit);
     try {
       this.metadataCache = metadataCache;
       this.cache = dataCache;
       this.cacheConf = cacheConf;
-      serDeStats = new SerDeStats();
-      projectionPusher = new ProjectionPusher();
+
+      if (metadataCache != null) {
+        cacheKey = HdfsUtils.getFileId(filePath.getFileSystem(conf), filePath,
+            HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
+            HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
+            !HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH));
+        // HdfsUtils.getFileId might yield to null in certain configurations
+        if (cacheKey != null) {
+          cacheTag = cacheTagOfParquetFile(filePath, cacheConf, conf);
+          // If we are going to use cache, change the path to depend on file ID for extra consistency.
+          if (cacheKey instanceof Long && HiveConf.getBoolVar(
+              cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
+            filePath = HdfsUtils.getFileIdPath(filePath, (long)cacheKey);
+          }
+        }
+      }
+
+      setupMetadataAndParquetSplit(conf);
+
       colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
       //initialize the rowbatchContext
-      jobConf = conf;
       isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
       rbCtx = Utilities.getVectorizedRowBatchCtx(jobConf);
-      ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf);
-      if (inputSplit != null) {
-        initialize(inputSplit, conf);
+
+      if (parquetInputSplit != null) {
+        initialize(parquetInputSplit, conf);
       }
-      FileSplit fileSplit = (FileSplit) oldInputSplit;
       initPartitionValues(fileSplit, conf);
-      bucketIdentifier = BucketIdentifier.from(conf, fileSplit.getPath());
+      bucketIdentifier = BucketIdentifier.from(conf, filePath);
     } catch (Throwable e) {
       LOG.error("Failed to create the vectorized reader due to exception " + e);
       throw new RuntimeException(e);
@@ -180,26 +193,20 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     }
   }
 
+  @Override
+  protected ParquetMetadata getParquetMetadata(Path path, JobConf conf) throws IOException {
+    return readSplitFooter(conf, filePath, cacheKey, NO_FILTER, cacheTag);
+  }
+
   @SuppressWarnings("deprecation")
   public void initialize(
-      InputSplit oldSplit,
+      ParquetInputSplit split,
       JobConf configuration) throws IOException, InterruptedException, HiveException {
-    // the oldSplit may be null during the split phase
-    if (oldSplit == null) {
-      return;
-    }
-    ParquetMetadata footer;
-    List<BlockMetaData> blocks;
-    MapWork mapWork = LlapHiveUtils.findMapWork(jobConf);
-    if (mapWork != null) {
-      parts = mapWork.getPathToPartitionInfo();
-    }
+    List<BlockMetaData> blocks;
 
-    ParquetInputSplit split = (ParquetInputSplit) oldSplit;
     boolean indexAccess = configuration.getBoolean(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS, false);
-    this.file = split.getPath();
     long[] rowGroupOffsets = split.getRowGroupOffsets();
 
     String columnNames = configuration.get(IOConstants.COLUMNS);
@@ -207,83 +214,46 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES);
     columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
 
-    // if task.side.metadata is set, rowGroupOffsets is null
-    Object cacheKey = null;
-    CacheTag cacheTag = null;
-    // TODO: also support fileKey in splits, like OrcSplit does
-    if (metadataCache != null) {
-      if (cacheKey == null) {
-        cacheKey = HdfsUtils.getFileId(file.getFileSystem(configuration), file,
-            HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
-            HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
-            !HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH));
-      }
+    Set<Long> offsets = new HashSet<>();
+    for (long offset : rowGroupOffsets) {
+      offsets.add(offset);
     }
-    if (cacheKey != null) {
-      if (HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE)) {
-        PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
-        cacheTag = LlapHiveUtils.getDbAndTableNameForMetrics(file, true, partitionDesc);
-      }
-      // If we are going to use cache, change the path to depend on file ID for extra consistency.
-      FileSystem fs = file.getFileSystem(configuration);
-      if (cacheKey instanceof Long && HiveConf.getBoolVar(
-          cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
-        file = HdfsUtils.getFileIdPath(file, (long)cacheKey);
+    blocks = new ArrayList<>();
+    for (BlockMetaData block : parquetMetadata.getBlocks()) {
+      if (offsets.contains(block.getStartingPos())) {
+        blocks.add(block);
       }
     }
-
-    if (rowGroupOffsets == null) {
-      //TODO check whether rowGroupOffSets can be null
-      // then we need to apply the predicate push down filter
-      footer = readSplitFooter(
-          configuration, file, cacheKey, range(split.getStart(), split.getEnd()), cacheTag);
-      MessageType fileSchema = footer.getFileMetaData().getSchema();
-      FilterCompat.Filter filter = getFilter(configuration);
-      blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
-    } else {
-      // otherwise we find the row groups that were selected on the client
-      footer = readSplitFooter(configuration, file, cacheKey, NO_FILTER, cacheTag);
-      Set<Long> offsets = new HashSet<>();
-      for (long offset : rowGroupOffsets) {
-        offsets.add(offset);
-      }
-      blocks = new ArrayList<>();
-      for (BlockMetaData block : footer.getBlocks()) {
-        if (offsets.contains(block.getStartingPos())) {
-          blocks.add(block);
-        }
-      }
-      // verify we found them all
-      if (blocks.size() != rowGroupOffsets.length) {
-        long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
-        for (int i = 0; i < foundRowGroupOffsets.length; i++) {
-          foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
-        }
-        // this should never happen.
-        // provide a good error message in case there's a bug
-        throw new IllegalStateException(
-            "All the offsets listed in the split should be found in the file."
-            + " expected: " + Arrays.toString(rowGroupOffsets)
-            + " found: " + blocks
-            + " out of: " + Arrays.toString(foundRowGroupOffsets)
-            + " in range " + split.getStart() + ", " + split.getEnd());
+    // verify we found them all
+    if (blocks.size() != rowGroupOffsets.length) {
+      long[] foundRowGroupOffsets = new long[parquetMetadata.getBlocks().size()];
+      for (int i = 0; i < foundRowGroupOffsets.length; i++) {
+        foundRowGroupOffsets[i] = parquetMetadata.getBlocks().get(i).getStartingPos();
      }
+      // this should never happen.
+      // provide a good error message in case there's a bug
+      throw new IllegalStateException(
+          "All the offsets listed in the split should be found in the file."
+          + " expected: " + Arrays.toString(rowGroupOffsets)
+          + " found: " + blocks
+          + " out of: " + Arrays.toString(foundRowGroupOffsets)
+          + " in range " + split.getStart() + ", " + split.getEnd());
     }
 
     for (BlockMetaData block : blocks) {
       this.totalRowCount += block.getRowCount();
     }
-    this.fileSchema = footer.getFileMetaData().getSchema();
+    this.fileSchema = parquetMetadata.getFileMetaData().getSchema();
     this.writerTimezone = DataWritableReadSupport
-        .getWriterTimeZoneId(footer.getFileMetaData().getKeyValueMetaData());
+        .getWriterTimeZoneId(parquetMetadata.getFileMetaData().getKeyValueMetaData());
 
     colsToInclude = ColumnProjectionUtils.getReadColumnIDs(configuration);
     requestedSchema = DataWritableReadSupport
       .getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration);
 
-    Path path = wrapPathForCache(file, cacheKey, configuration, blocks, cacheTag);
+    Path path = wrapPathForCache(filePath, cacheKey, configuration, blocks, cacheTag);
     this.reader = new ParquetFileReader(
-      configuration, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
+      configuration, parquetMetadata.getFileMetaData(), path, blocks, requestedSchema.getColumns());
   }
 
   private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration,
@@ -360,13 +330,22 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
         return HadoopStreams.wrap(fs.open(file));
       }
       @Override
-      public long getLength() throws IOException {
+      public long getLength() {
        return stat.getLen();
      }
    };
    return ParquetFileReader.readFooter(inputFile, filter);
  }
 
+  private static CacheTag cacheTagOfParquetFile(Path path, Configuration cacheConf, JobConf jobConf) {
+    MapWork mapWork = LlapHiveUtils.findMapWork(jobConf);
+    if (!HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE) || mapWork == null) {
+      return null;
+    }
+    PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(path, mapWork.getPathToPartitionInfo());
+    return LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc);
+  }
+
   private FileMetadataCache metadataCache;
   private DataCache cache;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
index c33b701d94c..24697691bc9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
@@ -108,7 +108,7 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
         new MapredParquetInputFormat().getRecordReader(
             new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
 
-    Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFiltedBlocks().size());
+    Assert.assertEquals("row group is not filtered correctly", 1, recordReader.getFilteredBlocks().size());
 
     // > 100
     constantDesc = new ExprNodeConstantDesc(100);
@@ -121,7 +121,7 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
         new MapredParquetInputFormat().getRecordReader(
             new FileSplit(testPath, 0, fileLength(testPath), (String[]) null), conf, null);
 
-    Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFiltedBlocks().size());
+    Assert.assertEquals("row group is not filtered correctly", 0, recordReader.getFilteredBlocks().size());
   }
 
   private ArrayWritableObjectInspector getObjectInspector(final String columnNames, final String columnTypes) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
index 52e6045b631..e290e332e7f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -119,12 +121,12 @@ public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
   private class TestVectorizedParquetRecordReader extends VectorizedParquetRecordReader {
     public TestVectorizedParquetRecordReader(
-        org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) {
+        org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) throws IOException {
       super(oldInputSplit, conf);
     }
 
+    @Override
-    protected ParquetInputSplit getSplit(
-        org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) {
+    protected ParquetInputSplit getSplit(JobConf conf) throws IOException {
       return null;
     }
   }
@@ -145,6 +147,6 @@ public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
     FileSplit fsplit = getFileSplit(vectorJob);
     JobConf jobConf = new JobConf(conf);
     TestVectorizedParquetRecordReader testReader = new TestVectorizedParquetRecordReader(fsplit, jobConf);
-    Assert.assertNull("Test should return null split from getSplit() method", testReader.getSplit(fsplit, jobConf));
+    Assert.assertNull("Test should return null split from getSplit() method", testReader.getSplit(null));
   }
 }
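
[Illustration only; not part of the patch above.] A minimal sketch of the flow the refactor establishes: the ParquetRecordReaderBase constructor captures the FileSplit and JobConf, and setupMetadataAndParquetSplit() reads the Parquet footer exactly once, so getSplit() and the concrete readers reuse the cached parquetMetadata instead of re-reading the footer for every split of the same file. The subclass name FooterReadOnceReader is hypothetical; the Hive/Hadoop types are the ones touched by this commit.

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical subclass, shown only to illustrate the single-footer-read sequence.
    public class FooterReadOnceReader extends ParquetRecordReaderBase {
      public FooterReadOnceReader(JobConf conf, InputSplit oldSplit) throws IOException {
        super(conf, oldSplit);               // validates the FileSplit, pushes projections/filters
        setupMetadataAndParquetSplit(conf);  // reads the footer once, builds parquetInputSplit
      }
    }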