http://git-wip-us.apache.org/repos/asf/hive/blob/3b6b56d7/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 29b51ec..f4cfa53 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
import org.apache.orc.impl.StreamName;
import org.apache.orc.StripeInformation;
import org.apache.orc.impl.BufferChunk;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
import org.apache.orc.OrcProto;
@@ -103,8 +102,7 @@ class EncodedReaderImpl implements EncodedReader {
private final List<OrcProto.Type> types;
private final long rowIndexStride;
private final DataCache cache;
- private ByteBufferAllocatorPool pool;
- private boolean isDebugTracingEnabled;
+ private boolean isTracingEnabled;
public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types,
CompressionCodec codec,
int bufferSize, long strideRate, DataCache cache, DataReader dataReader,
PoolFactory pf)
@@ -209,8 +207,8 @@ class EncodedReaderImpl implements EncodedReader {
long offset = 0; // Stream offset in relation to the stripe.
// 1.1. Figure out which columns have a present stream
boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
- if (isDebugTracingEnabled) {
- LOG.info("The following columns have PRESENT streams: " + arrayToString(hasNull));
+ if (isTracingEnabled) {
+ LOG.trace("The following columns have PRESENT streams: " + arrayToString(hasNull));
}
// We assume stream list is sorted by column and that non-data
@@ -230,8 +228,8 @@ class EncodedReaderImpl implements EncodedReader {
// We have a stream for included column, but in future it might have no data streams.
// It's more like "has at least one column included that has an index stream".
hasIndexOnlyCols = hasIndexOnlyCols | included[colIx];
- if (isDebugTracingEnabled) {
- LOG.info("Skipping stream: " + streamKind + " at " + offset + ", " + length);
+ if (isTracingEnabled) {
+ LOG.trace("Skipping stream: " + streamKind + " at " + offset + ", " + length);
}
offset += length;
continue;
@@ -244,8 +242,8 @@ class EncodedReaderImpl implements EncodedReader {
includedRgs = colRgs[colRgIx];
ctx = colCtxs[colRgIx] = new ColumnReadContext(
colIx, encodings.get(colIx), indexes[colIx]);
- if (isDebugTracingEnabled) {
- LOG.info("Creating context " + colRgIx + " for column " + colIx +
":" + ctx.toString());
+ if (isTracingEnabled) {
+ LOG.trace("Creating context " + colRgIx + " for column " + colIx +
":" + ctx.toString());
}
} else {
ctx = colCtxs[colRgIx];
@@ -254,14 +252,14 @@ class EncodedReaderImpl implements EncodedReader {
int indexIx = RecordReaderUtils.getIndexPosition(ctx.encoding.getKind(),
types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]);
ctx.addStream(offset, stream, indexIx);
- if (isDebugTracingEnabled) {
- LOG.info("Adding stream for column " + colIx + ": " + streamKind + "
at " + offset
+ if (isTracingEnabled) {
+ LOG.trace("Adding stream for column " + colIx + ": " + streamKind + "
at " + offset
+ ", " + length + ", index position " + indexIx);
}
if (includedRgs == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(colIx))) {
RecordReaderUtils.addEntireStreamToRanges(offset, length, listToRead, true);
- if (isDebugTracingEnabled) {
- LOG.info("Will read whole stream " + streamKind + "; added to " +
listToRead.getTail());
+ if (isTracingEnabled) {
+ LOG.trace("Will read whole stream " + streamKind + "; added to " +
listToRead.getTail());
}
} else {
RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRgs,
@@ -287,15 +285,15 @@ class EncodedReaderImpl implements EncodedReader {
// 2. Now, read all of the ranges from cache or disk.
DiskRangeList.MutateHelper toRead = new DiskRangeList.MutateHelper(listToRead.get());
- if (isDebugTracingEnabled && LOG.isInfoEnabled()) {
- LOG.info("Resulting disk ranges to read (file " + fileKey + "): "
+ if (isTracingEnabled && LOG.isInfoEnabled()) {
+ LOG.trace("Resulting disk ranges to read (file " + fileKey + "): "
+ RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
BooleanRef isAllInCache = new BooleanRef();
if (hasFileId) {
cache.getFileData(fileKey, toRead.next, stripeOffset, CC_FACTORY, isAllInCache);
- if (isDebugTracingEnabled && LOG.isInfoEnabled()) {
- LOG.info("Disk ranges after cache (file " + fileKey + ", base offset "
+ stripeOffset
+ if (isTracingEnabled && LOG.isInfoEnabled()) {
+ LOG.trace("Disk ranges after cache (file " + fileKey + ", base offset
" + stripeOffset
+ "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
}
@@ -322,8 +320,8 @@ class EncodedReaderImpl implements EncodedReader {
}
}
}
- if (isDebugTracingEnabled) {
- LOG.info("Disk ranges after pre-read (file " + fileKey + ", base
offset "
+ if (isTracingEnabled) {
+ LOG.trace("Disk ranges after pre-read (file " + fileKey + ", base
offset "
+ stripeOffset + "): " +
RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
iter = toRead.next; // Reset the iter to start.
@@ -354,8 +352,8 @@ class EncodedReaderImpl implements EncodedReader {
ColumnStreamData cb = null;
if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) {
// This stream is for entire stripe and needed for every RG; uncompress once and reuse.
- if (isDebugTracingEnabled) {
- LOG.info("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
+ if (isTracingEnabled) {
+ LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
+ " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
}
if (sctx.stripeLevelStream == null) {
@@ -411,8 +409,8 @@ class EncodedReaderImpl implements EncodedReader {
}
}
- if (isDebugTracingEnabled) {
- LOG.info("Disk ranges after preparing all the data "
+ if (isTracingEnabled) {
+ LOG.trace("Disk ranges after preparing all the data "
+ RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
@@ -437,8 +435,8 @@ class EncodedReaderImpl implements EncodedReader {
int colIx, StreamContext sctx, long cOffset, long endCOffset, boolean isCompressed) {
ColumnStreamData cb = POOLS.csdPool.take();
cb.incRef();
- if (isDebugTracingEnabled) {
- LOG.info("Getting data for column "+ colIx + " " + (isLastRg ? "last " :
"")
+ if (isTracingEnabled) {
+ LOG.trace("Getting data for column "+ colIx + " " + (isLastRg ? "last "
: "")
+ "RG " + rgIx + " stream " + sctx.kind + " at " + sctx.offset + ",
"
+ sctx.length + " index position " + sctx.streamIndexOffset + ": " +
(isCompressed ? "" : "un") + "compressed [" + cOffset + ", " +
endCOffset + ")");
@@ -460,17 +458,14 @@ class EncodedReaderImpl implements EncodedReader {
}
@Override
- public void setDebugTracing(boolean isEnabled) {
- this.isDebugTracingEnabled = isEnabled;
+ public void setTracing(boolean isEnabled) {
+ this.isTracingEnabled = isEnabled;
}
@Override
public void close() throws IOException {
dataReader.close();
- if (pool != null) {
- pool.clear();
- }
}
/**
@@ -608,8 +603,8 @@ class EncodedReaderImpl implements EncodedReader {
// want to be, or just before. However, RGs can overlap due to encoding, so we may have
// to return to a previous block.
DiskRangeList current = findExactPosition(start, cOffset);
- if (isDebugTracingEnabled) {
- LOG.info("Starting read for [" + cOffset + "," + endCOffset + ") at " +
current);
+ if (isTracingEnabled) {
+ LOG.trace("Starting read for [" + cOffset + "," + endCOffset + ") at " +
current);
}
CacheChunk lastUncompressed = null;
@@ -648,8 +643,8 @@ class EncodedReaderImpl implements EncodedReader {
}
chunk.originalData = null;
- if (isDebugTracingEnabled) {
- LOG.info("Locking " + chunk.getBuffer() + " due to reuse (after
decompression)");
+ if (isTracingEnabled) {
+ LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after
decompression)");
}
cache.reuseBuffer(chunk.getBuffer());
}
@@ -691,22 +686,22 @@ class EncodedReaderImpl implements EncodedReader {
if (current instanceof CacheChunk) {
// 2a. This is a decoded compression buffer, add as is.
CacheChunk cc = (CacheChunk)current;
- if (isDebugTracingEnabled) {
- LOG.info("Locking " + cc.getBuffer() + " due to reuse");
+ if (isTracingEnabled) {
+ LOG.trace("Locking " + cc.getBuffer() + " due to reuse");
}
cache.reuseBuffer(cc.getBuffer());
columnStreamData.getCacheBuffers().add(cc.getBuffer());
currentOffset = cc.getEnd();
- if (isDebugTracingEnabled) {
- LOG.info("Adding an already-uncompressed buffer " + cc.getBuffer());
+ if (isTracingEnabled) {
+ LOG.trace("Adding an already-uncompressed buffer " + cc.getBuffer());
}
ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, cc);
lastUncompressed = cc;
next = current.next;
} else if (current instanceof IncompleteCb) {
// 2b. This is a known incomplete CB caused by ORC CB end boundaries being estimates.
- if (isDebugTracingEnabled) {
- LOG.info("Cannot read " + current);
+ if (isTracingEnabled) {
+ LOG.trace("Cannot read " + current);
}
next = null;
currentOffset = -1;
@@ -739,8 +734,8 @@ class EncodedReaderImpl implements EncodedReader {
DiskRangeList next = null;
assert current instanceof CacheChunk;
lastUncompressed = (CacheChunk)current;
- if (isDebugTracingEnabled) {
- LOG.info("Locking " + lastUncompressed.getBuffer() + " due to reuse");
+ if (isTracingEnabled) {
+ LOG.trace("Locking " + lastUncompressed.getBuffer() + " due to reuse");
}
cache.reuseBuffer(lastUncompressed.getBuffer());
if (isFirst) {
@@ -749,8 +744,8 @@ class EncodedReaderImpl implements EncodedReader {
}
columnStreamData.getCacheBuffers().add(lastUncompressed.getBuffer());
currentOffset = lastUncompressed.getEnd();
- if (isDebugTracingEnabled) {
- LOG.info("Adding an uncompressed buffer " +
lastUncompressed.getBuffer());
+ if (isTracingEnabled) {
+ LOG.trace("Adding an uncompressed buffer " +
lastUncompressed.getBuffer());
}
ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, lastUncompressed);
next = current.next;
@@ -770,7 +765,6 @@ class EncodedReaderImpl implements EncodedReader {
* to handle just for this case.
* We could avoid copy in non-zcr case and manage the buffer that was not allocated by our
* allocator. Uncompressed case is not mainline though so let's not complicate it.
- * @param qfCounters
*/
private DiskRangeList preReadUncompressedStream(long baseOffset,
DiskRangeList start, long streamOffset, long streamEnd) throws IOException {
@@ -780,8 +774,8 @@ class EncodedReaderImpl implements EncodedReader {
// 1. Find our bearings in the stream.
DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd);
- if (isDebugTracingEnabled) {
- LOG.info("Starting pre-read for [" + streamOffset + "," + streamEnd + ") at " + current);
+ if (isTracingEnabled) {
+ LOG.trace("Starting pre-read for [" + streamOffset + "," + streamEnd + ") at " + current);
}
if (streamOffset > current.getOffset()) {
@@ -836,8 +830,8 @@ class EncodedReaderImpl implements EncodedReader {
current = current.split(partEnd);
wasSplit = true;
}
- if (isDebugTracingEnabled) {
- LOG.info("Processing uncompressed file data at ["
+ if (isTracingEnabled) {
+ LOG.trace("Processing uncompressed file data at ["
+ current.getOffset() + ", " + current.getEnd() + ")");
}
BufferChunk curBc = (BufferChunk)current;
@@ -1058,8 +1052,8 @@ class EncodedReaderImpl implements EncodedReader {
private void releaseInitialRefcount(CacheChunk cc, boolean isBacktracking) {
// This is the last RG for which this buffer will be used. Remove the initial refcount
- if (isDebugTracingEnabled) {
- LOG.info("Unlocking " + cc.getBuffer() + " for the fetching thread"
+ if (isTracingEnabled) {
+ LOG.trace("Unlocking " + cc.getBuffer() + " for the fetching thread"
+ (isBacktracking ? "; backtracking" : ""));
}
cache.releaseBuffer(cc.getBuffer());
@@ -1081,8 +1075,8 @@ class EncodedReaderImpl implements EncodedReader {
// Cache has found an old buffer for the key and put it into array instead of our new one.
CacheChunk replacedChunk = toDecompress.get(i);
MemoryBuffer replacementBuffer = targetBuffers[i];
- if (isDebugTracingEnabled) {
- LOG.info("Discarding data due to cache collision: " +
replacedChunk.getBuffer()
+ if (isTracingEnabled) {
+ LOG.trace("Discarding data due to cache collision: " +
replacedChunk.getBuffer()
+ " replaced with " + replacementBuffer);
}
assert replacedChunk.getBuffer() != replacementBuffer : i + " was not replaced in the results "
@@ -1133,7 +1127,6 @@ class EncodedReaderImpl implements EncodedReader {
* multiple ranges (usually, that would only happen with zcr).
* Adds stuff to cachedBuffers, toDecompress and toRelease (see below what each does).
* @param current BufferChunk where compression block starts.
- * @param ranges Iterator of all chunks, pointing at current.
* @param cacheBuffers The result buffer array to add pre-allocated target cache buffer.
* @param toDecompress The list of work to decompress - pairs of compressed buffers and the
* target buffers (same as the ones added to cacheBuffers).
@@ -1157,8 +1150,8 @@ class EncodedReaderImpl implements EncodedReader {
int consumedLength = chunkLength + OutStream.HEADER_SIZE;
long cbEndOffset = cbStartOffset + consumedLength;
boolean isUncompressed = ((b0 & 0x01) == 1);
- if (isDebugTracingEnabled) {
- LOG.info("Found CB at " + cbStartOffset + ", chunk length " +
chunkLength + ", total "
+ if (isTracingEnabled) {
+ LOG.trace("Found CB at " + cbStartOffset + ", chunk length " +
chunkLength + ", total "
+ consumedLength + ", " + (isUncompressed ? "not " : "") +
"compressed");
}
if (compressed.remaining() >= chunkLength) {
@@ -1183,8 +1176,8 @@ class EncodedReaderImpl implements EncodedReader {
int remaining = chunkLength - compressed.remaining();
int originalPos = compressed.position();
copy.put(compressed);
- if (isDebugTracingEnabled) {
- LOG.info("Removing partial CB " + current + " from ranges after copying
its contents");
+ if (isTracingEnabled) {
+ LOG.trace("Removing partial CB " + current + " from ranges after copying
its contents");
}
DiskRangeList next = current.next;
current.removeSelf();
@@ -1223,8 +1216,8 @@ class EncodedReaderImpl implements EncodedReader {
DiskRangeList tmp = next;
next = next.hasContiguousNext() ? next.next : null;
if (next != null) {
- if (isDebugTracingEnabled) {
- LOG.info("Removing partial CB " + tmp + " from ranges after copying
its contents");
+ if (isTracingEnabled) {
+ LOG.trace("Removing partial CB " + tmp + " from ranges after copying
its contents");
}
tmp.removeSelf();
} else {
@@ -1237,8 +1230,8 @@ class EncodedReaderImpl implements EncodedReader {
private void addIncompleteCompressionBuffer(
long cbStartOffset, DiskRangeList target, int extraChunkCount) {
IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
- if (isDebugTracingEnabled) {
- LOG.info("Replacing " + target + " (and " + extraChunkCount + " previous
chunks) with "
+ if (isTracingEnabled) {
+ LOG.trace("Replacing " + target + " (and " + extraChunkCount + "
previous chunks) with "
+ icb + " in the buffers");
}
target.replaceSelfWith(icb);
@@ -1250,9 +1243,7 @@ class EncodedReaderImpl implements EncodedReader {
* @param isUncompressed Whether the data in the block is uncompressed.
* @param cbStartOffset Compressed start offset of the fCB.
* @param cbEndOffset Compressed end offset of the fCB.
- * @param lastRange The buffer from which the last (or all) bytes of fCB come.
* @param lastChunkLength The number of compressed bytes consumed from last *chunk* into fullCompressionBlock.
- * @param ranges The iterator of all compressed ranges for the stream, pointing at lastRange.
* @param lastChunk
* @param toDecompress See addOneCompressionBuffer.
* @param cacheBuffers See addOneCompressionBuffer.
@@ -1271,20 +1262,20 @@ class EncodedReaderImpl implements EncodedReader {
fullCompressionBlock, futureAlloc, cacheBuffers.size() - 1);
toDecompress.add(cc);
// Adjust the compression block position.
- if (isDebugTracingEnabled) {
- LOG.info("Adjusting " + lastChunk + " to consume " + lastChunkLength + "
compressed bytes");
+ if (isTracingEnabled) {
+ LOG.trace("Adjusting " + lastChunk + " to consume " + lastChunkLength +
" compressed bytes");
}
lastChunk.getChunk().position(lastChunk.getChunk().position() + lastChunkLength);
// Finally, put it in the ranges list for future use (if shared between RGs).
// Before anyone else accesses it, it would have been allocated and decompressed locally.
if (lastChunk.getChunk().remaining() <= 0) {
- if (isDebugTracingEnabled) {
- LOG.info("Replacing " + lastChunk + " with " + cc + " in the buffers");
+ if (isTracingEnabled) {
+ LOG.trace("Replacing " + lastChunk + " with " + cc + " in the
buffers");
}
lastChunk.replaceSelfWith(cc);
} else {
- if (isDebugTracingEnabled) {
- LOG.info("Adding " + cc + " before " + lastChunk + " in the buffers");
+ if (isTracingEnabled) {
+ LOG.trace("Adding " + cc + " before " + lastChunk + " in the buffers");
}
lastChunk.insertPartBefore(cc);
}