http://git-wip-us.apache.org/repos/asf/hive/blob/b18db4f4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java index 8295929..328e4e2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; -import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; @@ -64,17 +63,12 @@ public class TreeReaderFactory { protected final int columnId; protected BitFieldReader present = null; protected boolean valuePresent = false; - protected ColumnStreamData presentStreamBuffer = null; - protected ColumnStreamData dataStreamBuffer = null; - protected ColumnStreamData dictionaryStreamBuffer = null; - protected ColumnStreamData lengthsStreamBuffer = null; - protected ColumnStreamData secondaryStreamBuffer = null; TreeReader(int columnId) throws IOException { this(columnId, null); } - TreeReader(int columnId, InStream in) throws IOException { + protected TreeReader(int columnId, InStream in) throws IOException { this.columnId = columnId; if (in == null) { present = null; @@ -91,7 +85,7 @@ public class TreeReaderFactory { } } - IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind, + static IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind, InStream in, boolean signed, boolean skipCorrupt) throws IOException { switch (kind) { @@ -136,37 +130,6 @@ public class TreeReaderFactory { } } - public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe) - throws IOException { - // stream buffers are arranged in enum order of stream kind - for (ColumnStreamData streamBuffer : buffers) { - switch (streamBuffer.getStreamKind()) { - case 0: - // PRESENT stream - presentStreamBuffer = streamBuffer; - break; - case 1: - // DATA stream - dataStreamBuffer = streamBuffer; - break; - case 2: - // LENGTH stream - lengthsStreamBuffer = streamBuffer; - break; - case 3: - // DICTIONARY_DATA stream - dictionaryStreamBuffer = streamBuffer; - break; - case 5: - // SECONDARY stream - secondaryStreamBuffer = streamBuffer; - break; - default: - throw new IOException("Unexpected stream kind: " + streamBuffer.getStreamKind()); - } - } - } - protected long countNonNulls(long rows) throws IOException { if (present != null) { long result = 0; @@ -222,16 +185,20 @@ public class TreeReaderFactory { } return previousVector; } + + public BitFieldReader getPresent() { + return present; + } } - protected static class BooleanTreeReader extends TreeReader { + public static class BooleanTreeReader extends TreeReader { protected BitFieldReader reader = null; BooleanTreeReader(int columnId) throws IOException { this(columnId, null, null); } - BooleanTreeReader(int columnId, InStream present, InStream data) throws IOException { + protected BooleanTreeReader(int columnId, InStream present, InStream data) throws IOException { super(columnId, present); if (data != null) { reader = new BitFieldReader(data, 1); @@ -296,14 +263,14 @@ public class TreeReaderFactory { } } - protected static class ByteTreeReader extends TreeReader { + public static class ByteTreeReader extends TreeReader { protected RunLengthByteReader reader = null; ByteTreeReader(int columnId) throws IOException { this(columnId, null, null); } - ByteTreeReader(int columnId, InStream present, InStream data) throws IOException { + protected ByteTreeReader(int columnId, InStream present, InStream data) throws IOException { super(columnId, present); this.reader = new RunLengthByteReader(data); } @@ -366,14 +333,14 @@ public class TreeReaderFactory { } } - protected static class ShortTreeReader extends TreeReader { + public static class ShortTreeReader extends TreeReader { protected IntegerReader reader = null; ShortTreeReader(int columnId) throws IOException { this(columnId, null, null, null); } - ShortTreeReader(int columnId, InStream present, InStream data, + protected ShortTreeReader(int columnId, InStream present, InStream data, OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, present); @@ -452,14 +419,14 @@ public class TreeReaderFactory { } } - protected static class IntTreeReader extends TreeReader { + public static class IntTreeReader extends TreeReader { protected IntegerReader reader = null; IntTreeReader(int columnId) throws IOException { this(columnId, null, null, null); } - IntTreeReader(int columnId, InStream present, InStream data, + protected IntTreeReader(int columnId, InStream present, InStream data, OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, present); @@ -538,14 +505,14 @@ public class TreeReaderFactory { } } - protected static class LongTreeReader extends TreeReader { + public static class LongTreeReader extends TreeReader { protected IntegerReader reader = null; LongTreeReader(int columnId, boolean skipCorrupt) throws IOException { this(columnId, null, null, null, skipCorrupt); } - LongTreeReader(int columnId, InStream present, InStream data, + protected LongTreeReader(int columnId, InStream present, InStream data, OrcProto.ColumnEncoding encoding, boolean skipCorrupt) throws IOException { @@ -625,7 +592,7 @@ public class TreeReaderFactory { } } - protected static class FloatTreeReader extends TreeReader { + public static class FloatTreeReader extends TreeReader { protected InStream stream; private final SerializationUtils utils; @@ -633,7 +600,7 @@ public class TreeReaderFactory { this(columnId, null, null); } - FloatTreeReader(int columnId, InStream present, InStream data) throws IOException { + protected FloatTreeReader(int columnId, InStream present, InStream data) throws IOException { super(columnId, present); this.utils = new SerializationUtils(); this.stream = data; @@ -737,7 +704,7 @@ public class TreeReaderFactory { } } - protected static class DoubleTreeReader extends TreeReader { + public static class DoubleTreeReader extends TreeReader { protected InStream stream; private final SerializationUtils utils; @@ -745,7 +712,7 @@ public class TreeReaderFactory { this(columnId, null, null); } - DoubleTreeReader(int columnId, InStream present, InStream data) throws IOException { + protected DoubleTreeReader(int columnId, InStream present, InStream data) throws IOException { super(columnId, present); this.utils = new SerializationUtils(); this.stream = data; @@ -852,7 +819,7 @@ public class TreeReaderFactory { } } - protected static class BinaryTreeReader extends TreeReader { + public static class BinaryTreeReader extends TreeReader { protected InStream stream; protected IntegerReader lengths = null; protected final LongColumnVector scratchlcv; @@ -861,7 +828,7 @@ public class TreeReaderFactory { this(columnId, null, null, null, null); } - BinaryTreeReader(int columnId, InStream present, InStream data, InStream length, + protected BinaryTreeReader(int columnId, InStream present, InStream data, InStream length, OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, present); scratchlcv = new LongColumnVector(); @@ -959,7 +926,7 @@ public class TreeReaderFactory { } } - protected static class TimestampTreeReader extends TreeReader { + public static class TimestampTreeReader extends TreeReader { protected IntegerReader data = null; protected IntegerReader nanos = null; private final boolean skipCorrupt; @@ -973,7 +940,7 @@ public class TreeReaderFactory { this(columnId, null, null, null, null, skipCorrupt); } - TimestampTreeReader(int columnId, InStream presentStream, InStream dataStream, + protected TimestampTreeReader(int columnId, InStream presentStream, InStream dataStream, InStream nanosStream, OrcProto.ColumnEncoding encoding, boolean skipCorrupt) throws IOException { super(columnId, presentStream); @@ -1144,14 +1111,14 @@ public class TreeReaderFactory { } } - protected static class DateTreeReader extends TreeReader { + public static class DateTreeReader extends TreeReader { protected IntegerReader reader = null; DateTreeReader(int columnId) throws IOException { this(columnId, null, null, null); } - DateTreeReader(int columnId, InStream present, InStream data, + protected DateTreeReader(int columnId, InStream present, InStream data, OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, present); if (data != null && encoding != null) { @@ -1229,7 +1196,7 @@ public class TreeReaderFactory { } } - protected static class DecimalTreeReader extends TreeReader { + public static class DecimalTreeReader extends TreeReader { protected InStream valueStream; protected IntegerReader scaleReader = null; private LongColumnVector scratchScaleVector; @@ -1241,7 +1208,7 @@ public class TreeReaderFactory { this(columnId, precision, scale, null, null, null, null); } - DecimalTreeReader(int columnId, int precision, int scale, InStream present, + protected DecimalTreeReader(int columnId, int precision, int scale, InStream present, InStream valueStream, InStream scaleStream, OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, present); @@ -1363,14 +1330,14 @@ public class TreeReaderFactory { * stripe, it creates an internal reader based on whether a direct or * dictionary encoding was used. */ - protected static class StringTreeReader extends TreeReader { + public static class StringTreeReader extends TreeReader { protected TreeReader reader; StringTreeReader(int columnId) throws IOException { super(columnId); } - StringTreeReader(int columnId, InStream present, InStream data, InStream length, + protected StringTreeReader(int columnId, InStream present, InStream data, InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, present); if (encoding != null) { @@ -1521,7 +1488,7 @@ public class TreeReaderFactory { * A reader for string columns that are direct encoded in the current * stripe. */ - protected static class StringDirectTreeReader extends TreeReader { + public static class StringDirectTreeReader extends TreeReader { protected InStream stream; protected IntegerReader lengths; private final LongColumnVector scratchlcv; @@ -1530,8 +1497,8 @@ public class TreeReaderFactory { this(columnId, null, null, null, null); } - StringDirectTreeReader(int columnId, InStream present, InStream data, InStream length, - OrcProto.ColumnEncoding.Kind encoding) throws IOException { + protected StringDirectTreeReader(int columnId, InStream present, InStream data, + InStream length, OrcProto.ColumnEncoding.Kind encoding) throws IOException { super(columnId, present); this.scratchlcv = new LongColumnVector(); this.stream = data; @@ -1628,13 +1595,21 @@ public class TreeReaderFactory { lengthToSkip -= stream.skip(lengthToSkip); } } + + public IntegerReader getLengths() { + return lengths; + } + + public InStream getStream() { + return stream; + } } /** * A reader for string columns that are dictionary encoded in the current * stripe. */ - protected static class StringDictionaryTreeReader extends TreeReader { + public static class StringDictionaryTreeReader extends TreeReader { private DynamicByteArray dictionaryBuffer; private int[] dictionaryOffsets; protected IntegerReader reader; @@ -1646,7 +1621,7 @@ public class TreeReaderFactory { this(columnId, null, null, null, null, null); } - StringDictionaryTreeReader(int columnId, InStream present, InStream data, + protected StringDictionaryTreeReader(int columnId, InStream present, InStream data, InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, present); @@ -1839,16 +1814,20 @@ public class TreeReaderFactory { void skipRows(long items) throws IOException { reader.skip(countNonNulls(items)); } + + public IntegerReader getReader() { + return reader; + } } - protected static class CharTreeReader extends StringTreeReader { + public static class CharTreeReader extends StringTreeReader { int maxLength; CharTreeReader(int columnId, int maxLength) throws IOException { this(columnId, maxLength, null, null, null, null, null); } - CharTreeReader(int columnId, int maxLength, InStream present, InStream data, + protected CharTreeReader(int columnId, int maxLength, InStream present, InStream data, InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, present, data, length, dictionary, encoding); this.maxLength = maxLength; @@ -1915,14 +1894,14 @@ public class TreeReaderFactory { } } - protected static class VarcharTreeReader extends StringTreeReader { + public static class VarcharTreeReader extends StringTreeReader { int maxLength; VarcharTreeReader(int columnId, int maxLength) throws IOException { this(columnId, maxLength, null, null, null, null, null); } - VarcharTreeReader(int columnId, int maxLength, InStream present, InStream data, + protected VarcharTreeReader(int columnId, int maxLength, InStream present, InStream data, InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, present, data, length, dictionary, encoding); this.maxLength = maxLength; @@ -1987,11 +1966,11 @@ public class TreeReaderFactory { } } - protected static class StructTreeReader extends TreeReader { + public static class StructTreeReader extends TreeReader { protected final TreeReader[] fields; private final String[] fieldNames; - StructTreeReader(int columnId, + protected StructTreeReader(int columnId, List<OrcProto.Type> types, boolean[] included, boolean skipCorrupt) throws IOException { @@ -2090,11 +2069,11 @@ public class TreeReaderFactory { } } - protected static class UnionTreeReader extends TreeReader { + public static class UnionTreeReader extends TreeReader { protected final TreeReader[] fields; protected RunLengthByteReader tags; - UnionTreeReader(int columnId, + protected UnionTreeReader(int columnId, List<OrcProto.Type> types, boolean[] included, boolean skipCorrupt) throws IOException { @@ -2170,11 +2149,11 @@ public class TreeReaderFactory { } } - protected static class ListTreeReader extends TreeReader { + public static class ListTreeReader extends TreeReader { protected final TreeReader elementReader; protected IntegerReader lengths = null; - ListTreeReader(int columnId, + protected ListTreeReader(int columnId, List<OrcProto.Type> types, boolean[] included, boolean skipCorrupt) throws IOException { @@ -2259,12 +2238,12 @@ public class TreeReaderFactory { } } - protected static class MapTreeReader extends TreeReader { + public static class MapTreeReader extends TreeReader { protected final TreeReader keyReader; protected final TreeReader valueReader; protected IntegerReader lengths = null; - MapTreeReader(int columnId, + protected MapTreeReader(int columnId, List<OrcProto.Type> types, boolean[] included, boolean skipCorrupt) throws IOException {
http://git-wip-us.apache.org/repos/asf/hive/blob/b18db4f4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java index b3c9169..00dcc15 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java @@ -35,7 +35,7 @@ public class CacheChunk extends DiskRangeList { @Override public ByteBuffer getData() { - // Callers duplicate the buffer, they have to for BufferChunk + // Callers duplicate the buffer, they have to for BufferChunk; so we don't have to. return buffer.getByteBufferRaw(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b18db4f4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index ce503d9..b1477e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -348,7 +348,7 @@ class EncodedReaderImpl implements EncodedReader { ColumnReadContext ctx = colCtxs[colIxMod]; RowIndexEntry index = ctx.rowIndex.getEntry(rgIx), nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1); - ecb.initColumn(colIxMod, ctx.colIx, ctx.streamCount); + ecb.initColumn(colIxMod, ctx.colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS); for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) { StreamContext sctx = ctx.streams[streamIx]; ColumnStreamData cb = null; @@ -360,7 +360,6 @@ class EncodedReaderImpl implements EncodedReader { } if (sctx.stripeLevelStream == null) { sctx.stripeLevelStream = POOLS.csdPool.take(); - sctx.stripeLevelStream.init(sctx.kind.getNumber()); // We will be using this for each RG while also sending RGs to processing. // To avoid buffers being unlocked, run refcount one ahead; we will not increase // it when building the last RG, so each RG processing will decref once, and the @@ -399,7 +398,7 @@ class EncodedReaderImpl implements EncodedReader { sctx.bufferIter = iter = lastCached; } } - ecb.setStreamData(colIxMod, streamIx, cb); + ecb.setStreamData(colIxMod, sctx.kind.getNumber(), cb); } } if (isRGSelected) { @@ -431,9 +430,7 @@ class EncodedReaderImpl implements EncodedReader { private ColumnStreamData createRgColumnStreamData(int rgIx, boolean isLastRg, int colIx, StreamContext sctx, long cOffset, long endCOffset, boolean isCompressed) { - ColumnStreamData cb; - cb = POOLS.csdPool.take(); - cb.init(sctx.kind.getNumber()); + ColumnStreamData cb = POOLS.csdPool.take(); cb.incRef(); if (isDebugTracingEnabled) { LOG.info("Getting data for column "+ colIx + " " + (isLastRg ? "last " : "")