HIVE-11928: ORC footer section can also exceed protobuf message limit (Prasanth Jayachandran reviewed by Sergey Shelukhin and Owen O'Malley)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/467a117e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/467a117e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/467a117e Branch: refs/heads/llap Commit: 467a117edeb40074957d222386e1800194322a29 Parents: 947871a Author: Prasanth Jayachandran <j.prasant...@gmail.com> Authored: Thu Oct 1 17:04:00 2015 -0500 Committer: Prasanth Jayachandran <j.prasant...@gmail.com> Committed: Thu Oct 1 17:04:00 2015 -0500 ---------------------------------------------------------------------- .../apache/hadoop/hive/ql/io/orc/InStream.java | 24 +++++++++++ .../hadoop/hive/ql/io/orc/MetadataReader.java | 2 +- .../hadoop/hive/ql/io/orc/ReaderImpl.java | 43 ++------------------ 3 files changed, 29 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/467a117e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java index 381d97d..6fec8b7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java @@ -30,10 +30,12 @@ import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.CodedInputStream; public abstract class InStream extends InputStream { private static final Log LOG = LogFactory.getLog(InStream.class); + private static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB protected final String name; protected final long length; @@ -447,4 +449,26 @@ public abstract class InStream extends InputStream { return new CompressedStream(name, input, length, codec, bufferSize); } } + + /** + * Creates coded input stream (used for protobuf message parsing) with higher message size limit. + * + * @param name the name of the stream + * @param input the list of ranges of bytes for the stream; from disk or cache + * @param length the length in bytes of the stream + * @param codec the compression codec + * @param bufferSize the compression buffer size + * @return coded input stream + * @throws IOException + */ + public static CodedInputStream createCodedInputStream(String name, + List<DiskRange> input, + long length, + CompressionCodec codec, + int bufferSize) throws IOException { + InStream inStream = create(name, input, length, codec, bufferSize); + CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream); + codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT); + return codedInputStream; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/467a117e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java index 43d2933..1910214 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java @@ -108,7 +108,7 @@ public class MetadataReader { // read the footer ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength); - return OrcProto.StripeFooter.parseFrom(InStream.create("footer", + return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer", Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)), tailLength, codec, bufferSize)); } http://git-wip-us.apache.org/repos/asf/hive/blob/467a117e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index 36fb858..3bac48a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; -import java.io.InputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -48,15 +47,12 @@ import org.apache.hadoop.io.Text; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.protobuf.CodedInputStream; -import com.google.protobuf.InvalidProtocolBufferException; public class ReaderImpl implements Reader { private static final Log LOG = LogFactory.getLog(ReaderImpl.class); private static final int DIRECTORY_SIZE_GUESS = 16 * 1024; - private static final int DEFAULT_PROTOBUF_MESSAGE_LIMIT = 64 << 20; // 64MB - private static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB protected final FileSystem fileSystem; protected final Path path; @@ -387,47 +383,16 @@ public class ReaderImpl implements Reader { int footerSize, CompressionCodec codec, int bufferSize) throws IOException { bb.position(footerAbsPos); bb.limit(footerAbsPos + footerSize); - InputStream instream = InStream.create("footer", Lists.<DiskRange>newArrayList( - new BufferChunk(bb, 0)), footerSize, codec, bufferSize); - return OrcProto.Footer.parseFrom(instream); + return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer", + Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), footerSize, codec, bufferSize)); } private static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos, int metadataSize, CompressionCodec codec, int bufferSize) throws IOException { bb.position(metadataAbsPos); bb.limit(metadataAbsPos + metadataSize); - InputStream instream = InStream.create("metadata", Lists.<DiskRange>newArrayList( - new BufferChunk(bb, 0)), metadataSize, codec, bufferSize); - CodedInputStream in = CodedInputStream.newInstance(instream); - int msgLimit = DEFAULT_PROTOBUF_MESSAGE_LIMIT; - OrcProto.Metadata meta = null; - do { - try { - in.setSizeLimit(msgLimit); - meta = OrcProto.Metadata.parseFrom(in); - } catch (InvalidProtocolBufferException e) { - if (e.getMessage().contains("Protocol message was too large")) { - LOG.warn("Metadata section is larger than " + msgLimit + " bytes. Increasing the max" + - " size of the coded input stream." ); - - msgLimit = msgLimit << 1; - if (msgLimit > PROTOBUF_MESSAGE_MAX_LIMIT) { - LOG.error("Metadata section exceeds max protobuf message size of " + - PROTOBUF_MESSAGE_MAX_LIMIT + " bytes."); - throw e; - } - - // we must have failed in the middle of reading instream and instream doesn't support - // resetting the stream - instream = InStream.create("metadata", Lists.<DiskRange>newArrayList( - new BufferChunk(bb, 0)), metadataSize, codec, bufferSize); - in = CodedInputStream.newInstance(instream); - } else { - throw e; - } - } - } while (meta == null); - return meta; + return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata", + Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize)); } private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,