HIVE-19479 : encoded stream seek is incorrect for 0-length RGs in LLAP IO (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
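ORC's row index records seek positions for every stream of a column, in stream order, and the encoded tree readers consume them from a single shared PositionProvider per column. Each stream must therefore consume its position entries even when the stream itself is empty (all values null, or a 0-length row group); the old code skipped empty streams without consuming their entries, leaving the provider misaligned so the next stream seeked to the wrong offset. A minimal illustration of the misalignment, using a toy cursor rather than Hive's classes:

// Toy stand-in for org.apache.orc.impl.PositionProvider; illustration only.
final class ToyPositionProvider {
  private final long[] positions;
  private int next = 0;
  ToyPositionProvider(long[] positions) { this.positions = positions; }
  long getNext() { return positions[next++]; }
}

final class SeekMisalignmentDemo {
  public static void main(String[] args) {
    // Assume one position per stream (uncompressed file): DATA at 100, LENGTH at 20.
    ToyPositionProvider index = new ToyPositionProvider(new long[] { 100, 20 });
    boolean dataStreamIsEmpty = true; // e.g. every value in the row group is null

    // Pre-patch pattern: the empty DATA stream is skipped without consuming
    // its entry, so the LENGTH seek below reads DATA's position instead.
    if (!dataStreamIsEmpty) {
      long dataPos = index.getNext(); // never runs here
    }
    long lengthPos = index.getNext();
    System.out.println("LENGTH seeks to " + lengthPos + ", expected 20");
  }
}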
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e941bea8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e941bea8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e941bea8

Branch: refs/heads/branch-3.0.0
Commit: e941bea80393e74efd64b07355b2e0fac384f7cc
Parents: 9ebb2ff
Author: sergey <ser...@apache.org>
Authored: Fri May 11 12:01:10 2018 -0700
Committer: Vineet Garg <vg...@apache.org>
Committed: Sun May 13 21:03:44 2018 -0700

----------------------------------------------------------------------
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   3 +
 .../ql/io/orc/encoded/EncodedReaderImpl.java    |  10 +-
 .../orc/encoded/EncodedTreeReaderFactory.java   | 118 ++++++++++---------
 3 files changed, 70 insertions(+), 61 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/e941bea8/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index fc0c66a..05282db 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -296,6 +296,9 @@ public class OrcEncodedDataConsumer
       ConsumerStripeMetadata stripeMetadata) throws IOException {
     PositionProvider[] pps = createPositionProviders(
         columnReaders, batch.getBatchKey(), stripeMetadata);
+    if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.ORC_LOGGER.trace("Created pps {}", Arrays.toString(pps));
+    }
     if (pps == null) return;
     for (int i = 0; i < columnReaders.length; i++) {
       TreeReader reader = columnReaders[i];

http://git-wip-us.apache.org/repos/asf/hive/blob/e941bea8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 1d7eceb..348f9df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -435,12 +435,12 @@ class EncodedReaderImpl implements EncodedReader {
         try {
           if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding) || index == null) {
             // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
-            if (isTracingEnabled) {
-              LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
-                  + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
-            }
-            trace.logStartStripeStream(sctx.kind);
             if (sctx.stripeLevelStream == null) {
+              if (isTracingEnabled) {
+                LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
+                    + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
+              }
+              trace.logStartStripeStream(sctx.kind);
               sctx.stripeLevelStream = POOLS.csdPool.take();
               // We will be using this for each RG while also sending RGs to processing.
               // To avoid buffers being unlocked, run refcount one ahead; so each RG
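The EncodedReaderImpl hunk above is a logging fix that rides along with the patch: the trace message and logStartStripeStream call now fire only when the stripe-level stream is actually created, not once per row group. The surrounding comment describes the reuse scheme for stripe-level (dictionary) streams: uncompress once, share across all row groups, and run the refcount one ahead so in-flight row groups cannot release the buffers early. A rough sketch of that idea, with hypothetical names (Hive's actual ColumnStreamData pooling and cache locking are more involved):

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "run refcount one ahead": the owner holds one
// reference of its own on top of one per row group the data is handed to,
// so the count never drops to zero between row groups.
final class SharedStripeStream {
  private final AtomicInteger refs = new AtomicInteger(1); // owner's reference
  private final byte[] uncompressed;

  SharedStripeStream(byte[] uncompressed) { this.uncompressed = uncompressed; }

  SharedStripeStream retainForRowGroup() {
    refs.incrementAndGet();
    return this;
  }

  void release() {
    if (refs.decrementAndGet() == 0) {
      // Last reference gone; buffers could be returned to the cache here.
    }
  }

  byte[] data() { return uncompressed; }
}

The owner would drop its own reference only after dispatching the last row group of the stripe.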
http://git-wip-us.apache.org/repos/asf/hive/blob/e941bea8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
index 42532f9..646b214 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.TypeDescription.Category;
+import org.apache.orc.impl.InStream;
 import org.apache.orc.impl.PositionProvider;
 import org.apache.orc.impl.SettableUncompressedStream;
 import org.apache.orc.impl.TreeReaderFactory;
@@ -213,6 +214,11 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
   }
 
+  private static void skipCompressedIndex(boolean isCompressed, PositionProvider index) {
+    if (!isCompressed) return;
+    index.getNext();
+  }
+
   protected static class StringStreamReader extends StringTreeReader
       implements SettableTreeReader {
     private boolean _isFileCompressed;
@@ -260,30 +266,30 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         // data stream could be empty stream or already reached end of stream before present stream.
         // This can happen if all values in stream are nulls or last row group values are all null.
+        skipCompressedIndex(_isFileCompressed, index);
         if (_dataStream != null && _dataStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
           ((StringDictionaryTreeReader) reader).getReader().seek(index);
-        }
+        } // No need to skip seek here, index won't be used anymore.
       } else {
         // DIRECT encoding
         // data stream could be empty stream or already reached end of stream before present stream.
         // This can happen if all values in stream are nulls or last row group values are all null.
+        skipCompressedIndex(_isFileCompressed, index);
+        // TODO: why does the original code not just use _dataStream that it passes in as stream?
+        InStream stream = ((StringDirectTreeReader) reader).getStream();
+        // TODO: not clear why this check and skipSeek are needed.
         if (_dataStream != null && _dataStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
-          ((StringDirectTreeReader) reader).getStream().seek(index);
+          stream.seek(index);
+        } else {
+          assert stream == _dataStream;
+          skipSeek(index);
         }
+        skipCompressedIndex(_isFileCompressed, index);
         if (_lengthStream != null && _lengthStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
           ((StringDirectTreeReader) reader).getLengths().seek(index);
-        }
+        } // No need to skip seek here, index won't be used anymore.
       }
     }
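skipCompressedIndex above, together with skipSeek (added at the bottom of this file), encodes the invariant the rest of the patch applies to every reader: for a compressed file the row index carries an extra value per stream (the compressed-chunk offset), which is now discarded unconditionally because LLAP hands the readers already-decompressed SettableUncompressedStream data; and when a stream turns out to be empty, skipSeek still discards the uncompressed offset so later streams stay aligned. A sketch of the resulting per-stream protocol (illustrative; the two-values-per-stream layout is an assumption for the example):

final class SeekProtocolDemo {
  private final long[] values;
  private int next = 0;
  SeekProtocolDemo(long[] values) { this.values = values; }
  long getNext() { return values[next++]; }

  // Post-patch pattern: every stream consumes its full position entry,
  // whether or not it has any data to seek in.
  static void seekOneStream(SeekProtocolDemo index, boolean isCompressed,
      boolean streamHasData, String name) {
    if (isCompressed) {
      index.getNext(); // skipCompressedIndex: discard the compressed-chunk offset
    }
    long pos = index.getNext(); // uncompressed offset: seek or skipSeek
    System.out.println(name + (streamHasData
        ? ": seek to " + pos : ": skipSeek, discarded " + pos));
  }

  public static void main(String[] args) {
    // Two values per stream for a compressed file: DATA then LENGTH.
    SeekProtocolDemo index = new SeekProtocolDemo(new long[] { 0, 100, 0, 20 });
    seekOneStream(index, true, false, "DATA");  // empty, but stays in step
    seekOneStream(index, true, true, "LENGTH"); // correctly lands on 20
  }
}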
@@ -830,10 +836,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       // data stream could be empty stream or already reached end of stream before present stream.
       // This can happen if all values in stream are nulls or last row group values are all null.
+      skipCompressedIndex(_isFileCompressed, index);
       if (_dataStream.available() > 0) {
-        if (_isFileCompressed) {
-          index.getNext();
-        }
         stream.seek(index);
       }
     }
@@ -945,10 +949,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       // data stream could be empty stream or already reached end of stream before present stream.
       // This can happen if all values in stream are nulls or last row group values are all null.
+      skipCompressedIndex(_isFileCompressed, index);
       if (_dataStream.available() > 0) {
-        if (_isFileCompressed) {
-          index.getNext();
-        }
         stream.seek(index);
       }
     }
@@ -1071,19 +1073,19 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       // data stream could be empty stream or already reached end of stream before present stream.
       // This can happen if all values in stream are nulls or last row group values are all null.
+      skipCompressedIndex(_isFileCompressed, index);
+      // TODO: not clear why this check and skipSeek are needed.
       if (_valueStream.available() > 0) {
-        if (_isFileCompressed) {
-          index.getNext();
-        }
         valueStream.seek(index);
+      } else {
+        assert valueStream == _valueStream;
+        skipSeek(index);
       }
+      skipCompressedIndex(_isFileCompressed, index);
       if (_scaleStream.available() > 0) {
-        if (_isFileCompressed) {
-          index.getNext();
-        }
         scaleReader.seek(index);
-      }
+      } // No need to skip seek here, index won't be used anymore.
     }
 
     @Override
@@ -1375,30 +1377,29 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         // data stream could be empty stream or already reached end of stream before present stream.
         // This can happen if all values in stream are nulls or last row group values are all null.
+        skipCompressedIndex(_isFileCompressed, index);
         if (_dataStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
           ((StringDictionaryTreeReader) reader).getReader().seek(index);
-        }
+        } // No need to skip seek here, index won't be used anymore.
       } else {
         // DIRECT encoding
         // data stream could be empty stream or already reached end of stream before present stream.
        // This can happen if all values in stream are nulls or last row group values are all null.
+        skipCompressedIndex(_isFileCompressed, index);
+        InStream stream = ((StringDirectTreeReader) reader).getStream();
+        // TODO: not clear why this check and skipSeek are needed.
        if (_dataStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
-          ((StringDirectTreeReader) reader).getStream().seek(index);
+          stream.seek(index);
+        } else {
+          assert stream == _dataStream;
+          skipSeek(index);
        }
+        skipCompressedIndex(_isFileCompressed, index);
        if (_lengthStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
           ((StringDirectTreeReader) reader).getLengths().seek(index);
-        }
+        } // No need to skip seek here, index won't be used anymore.
       }
     }
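The hunks above repeat the same discipline across reader types, including the two-stream decimal reader (value plus scale): seek must consume a fixed number of index values per stream no matter which streams happen to be empty. That invariant is easy to check in isolation; a hedged sketch of such a check with a counting cursor (hypothetical, not part of Hive's test suite):

// Counts how many index values a seek sequence consumes.
final class CountingIndex {
  private int consumed = 0;
  long getNext() { consumed++; return 0; }
  int consumed() { return consumed; }
}

final class SeekInvariantCheck {
  // Mirrors the patched pattern for a compressed file: one discarded
  // compressed-chunk offset plus one uncompressed offset per stream.
  static void seekStream(CountingIndex index, boolean streamHasData) {
    index.getNext();            // skipCompressedIndex
    long pos = index.getNext(); // consumed either way
    if (streamHasData) {
      // a real reader would call stream.seek(pos) here
    }
  }

  public static void main(String[] args) {
    CountingIndex withEmptyValueStream = new CountingIndex();
    seekStream(withEmptyValueStream, false); // e.g. empty decimal VALUE stream
    seekStream(withEmptyValueStream, true);  // SCALE stream

    CountingIndex allStreamsPresent = new CountingIndex();
    seekStream(allStreamsPresent, true);
    seekStream(allStreamsPresent, true);

    // The fix makes consumption independent of stream emptiness.
    System.out.println(withEmptyValueStream.consumed()
        + " == " + allStreamsPresent.consumed());
  }
}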
@@ -1574,30 +1575,29 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         // data stream could be empty stream or already reached end of stream before present stream.
         // This can happen if all values in stream are nulls or last row group values are all null.
+        skipCompressedIndex(_isFileCompressed, index);
         if (_dataStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
           ((StringDictionaryTreeReader) reader).getReader().seek(index);
-        }
+        } // No need to skip seek here, index won't be used anymore.
       } else {
         // DIRECT encoding
         // data stream could be empty stream or already reached end of stream before present stream.
         // This can happen if all values in stream are nulls or last row group values are all null.
+        skipCompressedIndex(_isFileCompressed, index);
+        InStream stream = ((StringDirectTreeReader) reader).getStream();
+        // TODO: not clear why this check and skipSeek are needed.
         if (_dataStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
-          ((StringDirectTreeReader) reader).getStream().seek(index);
+          stream.seek(index);
+        } else {
+          assert stream == _dataStream;
+          skipSeek(index);
         }
+        skipCompressedIndex(_isFileCompressed, index);
         if (_lengthStream.available() > 0) {
-          if (_isFileCompressed) {
-            index.getNext();
-          }
           ((StringDirectTreeReader) reader).getLengths().seek(index);
-        }
+        } // No need to skip seek here, index won't be used anymore.
       }
     }
@@ -1885,19 +1885,19 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       // data stream could be empty stream or already reached end of stream before present stream.
       // This can happen if all values in stream are nulls or last row group values are all null.
+      skipCompressedIndex(_isFileCompressed, index);
+      // TODO: not clear why this check and skipSeek are needed.
       if (_dataStream.available() > 0) {
-        if (_isFileCompressed) {
-          index.getNext();
-        }
         stream.seek(index);
+      } else {
+        assert stream == _dataStream;
+        skipSeek(index);
       }
+      skipCompressedIndex(_isFileCompressed, index);
       if (lengths != null && _lengthsStream.available() > 0) {
-        if (_isFileCompressed) {
-          index.getNext();
-        }
         lengths.seek(index);
-      }
+      } // No need to skip seek here, index won't be used anymore.
     }
 
     @Override
@@ -2132,6 +2132,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
   }
 
+  private static void skipSeek(PositionProvider index) {
+    // Must be consistent with uncompressed stream seek in ORC. See call site comments.
+    index.getNext();
+  }
+
+
   private static TreeReader createEncodedTreeReader(TypeDescription schema,
       List<OrcProto.ColumnEncoding> encodings, OrcEncodedColumnBatch batch,
       CompressionCodec codec, TreeReaderFactory.Context context) throws IOException {
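The comment on skipSeek is the crux of the fix: it discards exactly one value because that is what seeking an uncompressed ORC stream consumes (a single byte offset), and in the encoded LLAP path every stream the tree readers see is a SettableUncompressedStream. A simplified model of that correspondence (ORC's real InStream works over ByteBuffer ranges and differs in detail):

// Toy index cursor, as in the earlier sketches.
final class ToyIndexCursor {
  private final long[] values;
  private int next = 0;
  ToyIndexCursor(long[] values) { this.values = values; }
  long getNext() { return values[next++]; }
}

// Simplified model of an uncompressed stream: seek resolves exactly one
// index value, so skipping the seek must discard exactly one value too.
final class ToyUncompressedStream {
  private final byte[] bytes;
  private int offset = 0;

  ToyUncompressedStream(byte[] bytes) { this.bytes = bytes; }

  void seek(ToyIndexCursor index) {
    offset = (int) index.getNext(); // one value: byte offset in the stream
  }

  static void skipSeek(ToyIndexCursor index) {
    index.getNext(); // must stay consistent with seek() above
  }

  int available() { return bytes.length - offset; }
}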