Repository: hive Updated Branches: refs/heads/master 97bf32a12 -> 22fa9216d
HIVE-11595 : refactor ORC footer reading to make it usable from outside (Sergey Shelukhin, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/22fa9216 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/22fa9216 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/22fa9216 Branch: refs/heads/master Commit: 22fa9216d4e32d7681d3c1be8cbedc8c7999e56d Parents: 97bf32a Author: Sergey Shelukhin <[email protected]> Authored: Fri Aug 28 18:23:05 2015 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Fri Aug 28 18:23:05 2015 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hive/ql/io/orc/Reader.java | 6 + .../hadoop/hive/ql/io/orc/ReaderImpl.java | 281 +++++++++++++------ 2 files changed, 204 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/22fa9216/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java index 7bddefc..187924d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.Footer; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -358,4 +359,9 @@ public interface Reader { String[] neededColumns) throws IOException; MetadataReader metadata() throws IOException; + + /** Gets serialized file metadata read from disk for the purposes of caching, etc. 
*/ ByteBuffer getSerializedFileFooter(); + + Footer getFooter(); } http://git-wip-us.apache.org/repos/asf/hive/blob/22fa9216/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index c990d85..ab539c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -35,6 +35,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.ql.io.FileFormatException; +import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.Footer; import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk; @@ -74,6 +76,9 @@ public class ReaderImpl implements Reader { // will help avoid cpu cycles spend in deserializing at cost of increased // memory footprint. private final ByteBuffer footerByteBuffer; + // Same for metastore cache - maintains the same backing buffer, but includes postscript. + // This will only be set if the file footer/metadata was read from disk.
+ private final ByteBuffer footerMetaAndPsBuffer; static class StripeInformationImpl implements StripeInformation { @@ -166,11 +171,7 @@ public class ReaderImpl implements Reader { @Override public List<StripeInformation> getStripes() { - List<StripeInformation> result = new ArrayList<StripeInformation>(); - for(OrcProto.StripeInformation info: footer.getStripesList()) { - result.add(new StripeInformationImpl(info)); - } - return result; + return convertProtoStripesToStripes(footer.getStripesList()); } @Override @@ -274,7 +275,7 @@ public class ReaderImpl implements Reader { * Check to see if this ORC file is from a future version and if so, * warn the user that we may not be able to read all of the column encodings. * @param log the logger to write any error message to - * @param path the filename for error messages + * @param path the data source path for error messages * @param version the version of hive that wrote the file. */ static void checkOrcVersion(Log log, Path path, List<Integer> version) { @@ -287,8 +288,7 @@ public class ReaderImpl implements Reader { if (major > OrcFile.Version.CURRENT.getMajor() || (major == OrcFile.Version.CURRENT.getMajor() && minor > OrcFile.Version.CURRENT.getMinor())) { - log.warn("ORC file " + path + - " was written by a future Hive version " + + log.warn(path + " was written by a future Hive version " + versionString(version) + ". 
This file may not be readable by this version of Hive."); } @@ -313,9 +313,11 @@ public class ReaderImpl implements Reader { FileMetaInfo footerMetaData; if (options.getFileMetaInfo() != null) { footerMetaData = options.getFileMetaInfo(); + this.footerMetaAndPsBuffer = null; } else { footerMetaData = extractMetaInfoFromFooter(fs, path, options.getMaxLength()); + this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer; } MetaInfoObjExtractor rInfo = new MetaInfoObjExtractor(footerMetaData.compressionType, @@ -349,6 +351,111 @@ public class ReaderImpl implements Reader { return OrcFile.WriterVersion.ORIGINAL; } + /** Extracts the necessary metadata from an externally stored buffer (fullFooterBuffer). */ + public static FooterInfo extractMetaInfoFromFooter( + ByteBuffer bb, Path srcPath) throws IOException { + // Read the PostScript. Be very careful as some parts of this historically use bb position + // and some use absolute offsets that have to take position into account. + int baseOffset = bb.position(); + int lastByteAbsPos = baseOffset + bb.remaining() - 1; + int psLen = bb.get(lastByteAbsPos) & 0xff; + int psAbsPos = lastByteAbsPos - psLen; + OrcProto.PostScript ps = extractPostScript(bb, srcPath, psLen, psAbsPos); + assert baseOffset == bb.position(); + + // Extract PS information. + int footerSize = (int)ps.getFooterLength(), metadataSize = (int)ps.getMetadataLength(), + footerAbsPos = psAbsPos - footerSize, metadataAbsPos = footerAbsPos - metadataSize; + String compressionType = ps.getCompression().toString(); + CompressionCodec codec = WriterImpl.createCodec(CompressionKind.valueOf(compressionType)); + int bufferSize = (int)ps.getCompressionBlockSize(); + bb.position(metadataAbsPos); + bb.mark(); + + // Extract metadata and footer.
+ Metadata metadata = new Metadata(extractMetadata( + bb, metadataAbsPos, metadataSize, codec, bufferSize)); + OrcProto.Footer footer = extractFooter(bb, footerAbsPos, footerSize, codec, bufferSize); + bb.position(metadataAbsPos); + bb.limit(psAbsPos); + // TODO: do we need footer buffer here? FileInfo/FileMetaInfo is a mess... + FileMetaInfo fmi = new FileMetaInfo( + compressionType, bufferSize, metadataSize, bb, extractWriterVersion(ps)); + return new FooterInfo(metadata, footer, fmi); + } + + private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos, + int footerSize, CompressionCodec codec, int bufferSize) throws IOException { + bb.position(footerAbsPos); + bb.limit(footerAbsPos + footerSize); + InputStream instream = InStream.create("footer", Lists.<DiskRange>newArrayList( + new BufferChunk(bb, 0)), footerSize, codec, bufferSize); + return OrcProto.Footer.parseFrom(instream); + } + + private static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos, + int metadataSize, CompressionCodec codec, int bufferSize) throws IOException { + bb.position(metadataAbsPos); + bb.limit(metadataAbsPos + metadataSize); + InputStream instream = InStream.create("metadata", Lists.<DiskRange>newArrayList( + new BufferChunk(bb, 0)), metadataSize, codec, bufferSize); + CodedInputStream in = CodedInputStream.newInstance(instream); + int msgLimit = DEFAULT_PROTOBUF_MESSAGE_LIMIT; + OrcProto.Metadata meta = null; + do { + try { + in.setSizeLimit(msgLimit); + meta = OrcProto.Metadata.parseFrom(in); + } catch (InvalidProtocolBufferException e) { + if (e.getMessage().contains("Protocol message was too large")) { + LOG.warn("Metadata section is larger than " + msgLimit + " bytes. Increasing the max" + + " size of the coded input stream." 
); + + msgLimit = msgLimit << 1; + if (msgLimit > PROTOBUF_MESSAGE_MAX_LIMIT) { + LOG.error("Metadata section exceeds max protobuf message size of " + + PROTOBUF_MESSAGE_MAX_LIMIT + " bytes."); + throw e; + } + + // we must have failed in the middle of reading instream and instream doesn't support + // resetting the stream + instream = InStream.create("metadata", Lists.<DiskRange>newArrayList( + new BufferChunk(bb, 0)), metadataSize, codec, bufferSize); + in = CodedInputStream.newInstance(instream); + } else { + throw e; + } + } + } while (meta == null); + return meta; + } + + private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path, + int psLen, int psAbsOffset) throws IOException { + // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here. + assert bb.hasArray(); + CodedInputStream in = CodedInputStream.newInstance( + bb.array(), bb.arrayOffset() + psAbsOffset, psLen); + OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in); + checkOrcVersion(LOG, path, ps.getVersionList()); + + // Check compression codec. 
+ switch (ps.getCompression()) { + case NONE: + break; + case ZLIB: + break; + case SNAPPY: + break; + case LZO: + break; + default: + throw new IllegalArgumentException("Unknown compression"); + } + return ps; + } + private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs, Path path, long maxFileLength @@ -367,44 +474,24 @@ public class ReaderImpl implements Reader { int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS); file.seek(size - readSize); ByteBuffer buffer = ByteBuffer.allocate(readSize); - file.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(), - buffer.remaining()); + assert buffer.position() == 0; + file.readFully(buffer.array(), buffer.arrayOffset(), readSize); + buffer.position(0); //read the PostScript //get length of PostScript int psLen = buffer.get(readSize - 1) & 0xff; ensureOrcFooter(file, path, psLen, buffer); int psOffset = readSize - 1 - psLen; - CodedInputStream in = CodedInputStream.newInstance(buffer.array(), - buffer.arrayOffset() + psOffset, psLen); - OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in); - - checkOrcVersion(LOG, path, ps.getVersionList()); + OrcProto.PostScript ps = extractPostScript(buffer, path, psLen, psOffset); int footerSize = (int) ps.getFooterLength(); int metadataSize = (int) ps.getMetadataLength(); - OrcFile.WriterVersion writerVersion; - if (ps.hasWriterVersion()) { - writerVersion = getWriterVersion(ps.getWriterVersion()); - } else { - writerVersion = OrcFile.WriterVersion.ORIGINAL; - } + OrcFile.WriterVersion writerVersion = extractWriterVersion(ps); - //check compression codec - switch (ps.getCompression()) { - case NONE: - break; - case ZLIB: - break; - case SNAPPY: - break; - case LZO: - break; - default: - throw new IllegalArgumentException("Unknown compression"); - } //check if extra bytes need to be read + ByteBuffer fullFooterBuffer = null; int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize); if (extra > 0) { //more bytes need to be read, 
seek back to the right place and read extra bytes @@ -417,10 +504,12 @@ public class ReaderImpl implements Reader { extraBuf.put(buffer); buffer = extraBuf; buffer.position(0); + fullFooterBuffer = buffer.slice(); buffer.limit(footerSize + metadataSize); } else { //footer is already in the bytes in buffer, just adjust position, length buffer.position(psOffset - footerSize - metadataSize); + fullFooterBuffer = buffer.slice(); buffer.limit(psOffset); } @@ -435,11 +524,24 @@ public class ReaderImpl implements Reader { (int) ps.getMetadataLength(), buffer, ps.getVersionList(), - writerVersion + writerVersion, + fullFooterBuffer ); } + private static OrcFile.WriterVersion extractWriterVersion(OrcProto.PostScript ps) { + return (ps.hasWriterVersion() + ? getWriterVersion(ps.getWriterVersion()) : OrcFile.WriterVersion.ORIGINAL); + } + private static List<StripeInformation> convertProtoStripesToStripes( + List<OrcProto.StripeInformation> stripes) { + List<StripeInformation> result = new ArrayList<StripeInformation>(stripes.size()); + for (OrcProto.StripeInformation info : stripes) { + result.add(new StripeInformationImpl(info)); + } + return result; + } /** * MetaInfoObjExtractor - has logic to create the values for the fields in ReaderImpl @@ -467,46 +569,10 @@ public class ReaderImpl implements Reader { int position = footerBuffer.position(); int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize; - footerBuffer.limit(position + metadataSize); - - InputStream instream = InStream.create("metadata", Lists.<DiskRange>newArrayList( - new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize); - CodedInputStream in = CodedInputStream.newInstance(instream); - int msgLimit = DEFAULT_PROTOBUF_MESSAGE_LIMIT; - OrcProto.Metadata meta = null; - do { - try { - in.setSizeLimit(msgLimit); - meta = OrcProto.Metadata.parseFrom(in); - } catch (InvalidProtocolBufferException e) { - if (e.getMessage().contains("Protocol message was too large")) { - 
LOG.warn("Metadata section is larger than " + msgLimit + " bytes. Increasing the max" + - " size of the coded input stream." ); - - msgLimit = msgLimit << 1; - if (msgLimit > PROTOBUF_MESSAGE_MAX_LIMIT) { - LOG.error("Metadata section exceeds max protobuf message size of " + - PROTOBUF_MESSAGE_MAX_LIMIT + " bytes."); - throw e; - } - - // we must have failed in the middle of reading instream and instream doesn't support - // resetting the stream - instream = InStream.create("metadata", Lists.<DiskRange>newArrayList( - new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize); - in = CodedInputStream.newInstance(instream); - } else { - throw e; - } - } - } while (meta == null); - this.metadata = meta; - footerBuffer.position(position + metadataSize); - footerBuffer.limit(position + metadataSize + footerBufferSize); - instream = InStream.create("footer", Lists.<DiskRange>newArrayList( - new BufferChunk(footerBuffer, 0)), footerBufferSize, codec, bufferSize); - this.footer = OrcProto.Footer.parseFrom(instream); + this.metadata = extractMetadata(footerBuffer, position, metadataSize, codec, bufferSize); + this.footer = extractFooter( + footerBuffer, position + metadataSize, footerBufferSize, codec, bufferSize); footerBuffer.position(position); this.inspector = OrcStruct.createObjectInspector(0, footer.getTypesList()); @@ -518,7 +584,8 @@ public class ReaderImpl implements Reader { * that is useful for Reader implementation * */ - static class FileMetaInfo{ + static class FileMetaInfo { + private ByteBuffer footerMetaAndPsBuffer; final String compressionType; final int bufferSize; final int metadataSize; @@ -526,30 +593,68 @@ public class ReaderImpl implements Reader { final List<Integer> versionList; final OrcFile.WriterVersion writerVersion; + /** Ctor used when reading splits - no version list or full footer buffer. 
*/ FileMetaInfo(String compressionType, int bufferSize, int metadataSize, ByteBuffer footerBuffer, OrcFile.WriterVersion writerVersion) { this(compressionType, bufferSize, metadataSize, footerBuffer, null, - writerVersion); + writerVersion, null); } - FileMetaInfo(String compressionType, int bufferSize, int metadataSize, - ByteBuffer footerBuffer, List<Integer> versionList, - OrcFile.WriterVersion writerVersion){ + /** Ctor used when creating file info during init and when getting a new one. */ + public FileMetaInfo(String compressionType, int bufferSize, int metadataSize, + ByteBuffer footerBuffer, List<Integer> versionList, WriterVersion writerVersion, + ByteBuffer fullFooterBuffer) { this.compressionType = compressionType; this.bufferSize = bufferSize; this.metadataSize = metadataSize; this.footerBuffer = footerBuffer; this.versionList = versionList; this.writerVersion = writerVersion; + this.footerMetaAndPsBuffer = fullFooterBuffer; } } - public FileMetaInfo getFileMetaInfo(){ + public FileMetaInfo getFileMetaInfo() { return new FileMetaInfo(compressionKind.toString(), bufferSize, - metadataSize, footerByteBuffer, versionList, writerVersion); + metadataSize, footerByteBuffer, versionList, writerVersion, footerMetaAndPsBuffer); } + /** Same as FileMetaInfo, but with extra fields. FileMetaInfo is serialized for splits + * and so we don't just add fields to it, it's already messy and confusing. 
*/ + public static final class FooterInfo { + private final OrcProto.Footer footer; + private final Metadata metadata; + private final List<StripeInformation> stripes; + private final FileMetaInfo fileMetaInfo; + private FooterInfo(Metadata metadata, OrcProto.Footer footer, FileMetaInfo fileMetaInfo) { + this.metadata = metadata; + this.footer = footer; + this.fileMetaInfo = fileMetaInfo; + this.stripes = convertProtoStripesToStripes(footer.getStripesList()); + } + + public OrcProto.Footer getFooter() { + return footer; + } + + public Metadata getMetadata() { + return metadata; + } + + public FileMetaInfo getFileMetaInfo() { + return fileMetaInfo; + } + + public List<StripeInformation> getStripes() { + return stripes; + } + } + + @Override + public ByteBuffer getSerializedFileFooter() { + return footerMetaAndPsBuffer; + } @Override public RecordReader rows() throws IOException { @@ -609,14 +714,19 @@ public class ReaderImpl implements Reader { @Override public long getRawDataSizeFromColIndices(List<Integer> colIndices) { + return getRawDataSizeFromColIndices(colIndices, footer); + } + + public static long getRawDataSizeFromColIndices( + List<Integer> colIndices, OrcProto.Footer footer) { long result = 0; for (int colIdx : colIndices) { - result += getRawDataSizeOfColumn(colIdx); + result += getRawDataSizeOfColumn(colIdx, footer); } return result; } - private long getRawDataSizeOfColumn(int colIdx) { + private static long getRawDataSizeOfColumn(int colIdx, OrcProto.Footer footer) { OrcProto.ColumnStatistics colStat = footer.getStatistics(colIdx); long numVals = colStat.getNumberOfValues(); Type type = footer.getTypes(colIdx); @@ -738,4 +848,9 @@ public class ReaderImpl implements Reader { public MetadataReader metadata() throws IOException { return new MetadataReader(fileSystem, path, codec, bufferSize, footer.getTypesCount()); } + + @Override + public Footer getFooter() { + return footer; + } }
