Repository: hive
Updated Branches:
  refs/heads/llap 5cd092b8b -> 1e3b59d37
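Before the diff itself, a minimal usage sketch (not part of the commit) of the call shape this patch converts all call sites to: InStream.create now takes only the file id, stream name, disk ranges, length, codec and buffer size, with no LowLevelCache argument. The class and method names in the sketch are hypothetical; the InStream.create and OrcProto.StripeFooter.parseFrom calls mirror the MetadataReaderImpl hunk below.

```java
package org.apache.hadoop.hive.ql.io.orc;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;

import com.google.common.collect.Lists;

/** Sketch only: call shape of the trimmed-down InStream.create after this patch. */
class InStreamCreateSketch {
  static OrcProto.StripeFooter parseStripeFooter(ByteBuffer tailBuf, int tailLength,
      CompressionCodec codec, int bufferSize) throws IOException {
    // The cache parameter is gone; only fileId, name, ranges, length, codec
    // and compression buffer size are passed.
    InputStream in = InStream.create(
        null,        // fileId; metadata streams pass null
        "footer",    // stream name, used for diagnostics
        Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
        tailLength, codec, bufferSize);
    return OrcProto.StripeFooter.parseFrom(in);
  }
}
```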
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java index d0295d9..1ce7c0a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java @@ -27,55 +27,14 @@ import java.util.ListIterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.DiskRange; -import org.apache.hadoop.hive.common.DiskRangeList; -import org.apache.hadoop.hive.llap.DebugUtils; -import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters; -import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer; -import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; -import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; -import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.CacheChunkFactory; -import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk; -import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk; -import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim; -import org.apache.hive.common.util.FixedSizedObjectPool; -import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper; import com.google.common.annotations.VisibleForTesting; public abstract class InStream extends InputStream { private static final Log LOG = LogFactory.getLog(InStream.class); - private static final FixedSizedObjectPool<TrackedCacheChunk> TCC_POOL = - new FixedSizedObjectPool<>(1024, new PoolObjectHelper<TrackedCacheChunk>() { - @Override - protected TrackedCacheChunk create() { - return new TrackedCacheChunk(); - } - @Override - protected void resetBeforeOffer(TrackedCacheChunk t) { - t.reset(); - } - }); - private static final FixedSizedObjectPool<ProcCacheChunk> PCC_POOL = - new FixedSizedObjectPool<>(1024, new PoolObjectHelper<ProcCacheChunk>() { - @Override - protected ProcCacheChunk create() { - return new ProcCacheChunk(); - } - @Override - protected void resetBeforeOffer(ProcCacheChunk t) { - t.reset(); - } - }); - final static CacheChunkFactory CC_FACTORY = new CacheChunkFactory() { - @Override - public DiskRangeList createCacheChunk(LlapMemoryBuffer buffer, long offset, long end) { - TrackedCacheChunk tcc = TCC_POOL.take(); - tcc.init(buffer, offset, end); - return tcc; - } - }; + protected final Long fileId; protected final String name; protected long length; @@ -220,49 +179,31 @@ public abstract class InStream extends InputStream { private static class CompressedStream extends InStream { private final List<DiskRange> bytes; private final int bufferSize; - private LlapMemoryBuffer cacheBuffer; private ByteBuffer uncompressed; private final CompressionCodec codec; private ByteBuffer compressed; private long currentOffset; private int currentRange; private boolean isUncompressedOriginal; - private final LowLevelCache cache; - private final boolean doManageBuffers = true; public CompressedStream(Long fileId, String name, List<DiskRange> input, long length, - CompressionCodec codec, int bufferSize, LowLevelCache cache) { + CompressionCodec codec, int bufferSize) { super(fileId, name, length); this.bytes = input; this.codec = codec; this.bufferSize = bufferSize; currentOffset 
= 0; currentRange = 0; - this.cache = cache; } - // TODO: This should not be used for main path. - private final LlapMemoryBuffer[] singleAllocDest = new LlapMemoryBuffer[1]; private void allocateForUncompressed(int size, boolean isDirect) { - if (cache == null) { - cacheBuffer = null; - uncompressed = allocateBuffer(size, isDirect); - } else { - singleAllocDest[0] = null; - cache.getAllocator().allocateMultiple(singleAllocDest, size); - cacheBuffer = singleAllocDest[0]; - uncompressed = cacheBuffer.getByteBufferDup(); - } + uncompressed = allocateBuffer(size, isDirect); } private void readHeader() throws IOException { if (compressed == null || compressed.remaining() <= 0) { seek(currentOffset); } - if (cacheBuffer != null) { - assert compressed == null; - return; // Next block is ready from cache. - } if (compressed.remaining() > OutStream.HEADER_SIZE) { int b0 = compressed.get() & 0xff; int b1 = compressed.get() & 0xff; @@ -293,13 +234,7 @@ public abstract class InStream extends InputStream { uncompressed.clear(); } codec.decompress(slice, uncompressed); - if (cache != null) { - // this is the inefficient path - // TODO: this is invalid; base stripe offset should be passed, return value handled. - //cache.putFileData(fileId, new DiskRange[] { new DiskRange(originalOffset, - // chunkLength + OutStream.HEADER_SIZE) }, new LlapMemoryBuffer[] { cacheBuffer }, 0); - } - } + } } else { throw new IllegalStateException("Can't read header at " + this); } @@ -342,19 +277,10 @@ public abstract class InStream extends InputStream { @Override public void close() { - cacheBuffer = null; uncompressed = null; compressed = null; currentRange = bytes.size(); currentOffset = length; - if (doManageBuffers) { - // TODO: this is the inefficient path for now. LLAP will used this differently. 
- for (DiskRange range : bytes) { - if (range instanceof CacheChunk) { - cache.releaseBuffer(((CacheChunk)range).buffer); - } - } - } bytes.clear(); } @@ -439,20 +365,10 @@ public abstract class InStream extends InputStream { for (DiskRange range : bytes) { if (range.getOffset() <= desired && desired < range.getEnd()) { currentRange = i; - if (range instanceof BufferChunk) { - cacheBuffer = null; - compressed = range.getData().duplicate(); - int pos = compressed.position(); - pos += (int)(desired - range.getOffset()); - compressed.position(pos); - } else { - compressed = null; - cacheBuffer = ((CacheChunk)range).buffer; - uncompressed = cacheBuffer.getByteBufferDup(); - if (desired != range.getOffset()) { - throw new IOException("Cannot seek into the middle of uncompressed cached data"); - } - } + compressed = range.getData().duplicate(); + int pos = compressed.position(); + pos += (int)(desired - range.getOffset()); + compressed.position(pos); currentOffset = desired; return; } @@ -463,19 +379,8 @@ public abstract class InStream extends InputStream { if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) { DiskRange range = bytes.get(segments - 1); currentRange = segments - 1; - if (range instanceof BufferChunk) { - cacheBuffer = null; - compressed = range.getData().duplicate(); - compressed.position(compressed.limit()); - } else { - compressed = null; - cacheBuffer = ((CacheChunk)range).buffer; - uncompressed = cacheBuffer.getByteBufferDup(); - uncompressed.position(uncompressed.limit()); - if (desired != range.getOffset()) { - throw new IOException("Cannot seek into the middle of uncompressed cached data"); - } - } + compressed = range.getData().duplicate(); + compressed.position(compressed.limit()); currentOffset = desired; return; } @@ -542,7 +447,7 @@ public abstract class InStream extends InputStream { for (int i = 0; i < buffers.length; ++i) { input.add(new BufferChunk(buffers[i], offsets[i])); } - return create(fileId, streamName, input, length, codec, bufferSize, null); + return create(fileId, streamName, input, length, codec, bufferSize); } /** @@ -561,837 +466,11 @@ public abstract class InStream extends InputStream { List<DiskRange> input, long length, CompressionCodec codec, - int bufferSize, - LowLevelCache cache) throws IOException { + int bufferSize) throws IOException { if (codec == null) { return new UncompressedStream(fileId, name, input, length); } else { - return new CompressedStream(fileId, name, input, length, codec, bufferSize, cache); - } - } - - /** Cache chunk which tracks whether it has been fully read. See - EncodedReaderImpl class comment about refcounts. */ - // TODO: these classes need some cleanup. Find every cast and field access and change to OO; - public static class TrackedCacheChunk extends CacheChunk { - public boolean isReleased = false; - public TrackedCacheChunk() { - super(null, -1, -1); - } - - public void init(LlapMemoryBuffer buffer, long offset, long end) { - this.buffer = buffer; - this.offset = offset; - this.end = end; - } - - public void reset() { - this.buffer = null; - this.next = this.prev = null; - this.isReleased = false; - } - - public void handleCacheCollision(LowLevelCache cache, - LlapMemoryBuffer replacementBuffer, List<LlapMemoryBuffer> cacheBuffers) { - throw new UnsupportedOperationException(); - } - } - - // TODO: should we pool these? only used for uncompressed. 
- private static class UncompressedCacheChunk extends TrackedCacheChunk { - private BufferChunk chunk; - private int count; - - public UncompressedCacheChunk(BufferChunk bc) { - super(); - init(null, bc.getOffset(), bc.getEnd()); - chunk = bc; - count = 1; - } - - public void addChunk(BufferChunk bc) { - assert bc.getOffset() == this.getEnd(); - this.end = bc.getEnd(); - ++count; - } - - public BufferChunk getChunk() { - return chunk; - } - - public int getCount() { - return count; - } - - @Override - public void handleCacheCollision(LowLevelCache cache, - LlapMemoryBuffer replacementBuffer, List<LlapMemoryBuffer> cacheBuffers) { - assert cacheBuffers == null; - // This is done at pre-read stage where there's nothing special w/refcounts. Just release. - if (DebugUtils.isTraceCachingEnabled()) { - LOG.info("Deallocating " + buffer + " due to cache collision during pre-read"); - } - cache.getAllocator().deallocate(buffer); - // Replace the buffer in our big range list, as well as in current results. - this.buffer = replacementBuffer; - } - - public void clear() { - this.chunk = null; - this.count = -1; - } - } - - /** - * CacheChunk that is pre-created for new cache data; initially, it contains an original disk - * buffer and an unallocated LlapMemoryBuffer object. Before we expose it, the LMB is allocated, - * the data is decompressed, and original compressed data is discarded. The chunk lives on in - * the DiskRange list created for the request, and everyone treats it like regular CacheChunk. - */ - private static class ProcCacheChunk extends TrackedCacheChunk { - public void init(long cbStartOffset, long cbEndOffset, boolean isCompressed, - ByteBuffer originalData, LlapMemoryBuffer targetBuffer, int originalCbIndex) { - super.init(targetBuffer, cbStartOffset, cbEndOffset); - this.isCompressed = isCompressed; - this.originalData = originalData; - this.originalCbIndex = originalCbIndex; - } - - @Override - public void reset() { - super.reset(); - this.originalData = null; - } - - @Override - public void handleCacheCollision(LowLevelCache cache, LlapMemoryBuffer replacementBuffer, - List<LlapMemoryBuffer> cacheBuffers) { - assert originalCbIndex >= 0; - // Had the put succeeded for our new buffer, it would have refcount of 2 - 1 from put, - // and 1 from notifyReused call above. "Old" buffer now has the 1 from put; new buffer - // is not in cache. - if (DebugUtils.isTraceCachingEnabled()) { - LOG.info("Deallocating " + buffer + " due to cache collision"); - } - cache.getAllocator().deallocate(buffer); - cache.notifyReused(replacementBuffer); - // Replace the buffer in our big range list, as well as in current results. - this.buffer = replacementBuffer; - cacheBuffers.set(originalCbIndex, replacementBuffer); - originalCbIndex = -1; // This can only happen once at decompress time. - } - - boolean isCompressed; - ByteBuffer originalData = null; - int originalCbIndex; - } - - /** - * Uncompresses part of the stream. RGs can overlap, so we cannot just go and decompress - * and remove what we have returned. We will keep iterator as a "hint" point. - * @param fileName File name for cache keys. - * @param baseOffset Absolute offset of boundaries and ranges relative to file, for cache keys. - * @param start Ordered ranges containing file data. Helpful if they point close to cOffset. - * @param cOffset Start offset to decompress. - * @param endCOffset End offset to decompress; estimate, partial CBs will be ignored. - * @param zcr Zero-copy reader, if any, to release discarded buffers. 
- * @param codec Compression codec. - * @param bufferSize Compressed buffer (CB) size. - * @param cache Low-level cache to cache new data. - * @param streamBuffer Stream buffer, to add the results. - * @param unlockUntilCOffset The offset until which the buffers can be unlocked in cache, as - * they will not be used in future calls (see the class comment in - * EncodedReaderImpl about refcounts). - * @param qfCounters - * @return Last buffer cached during decomrpession. Cache buffers are never removed from - * the master list, so they are safe to keep as iterators for various streams. - */ - // TODO: move to EncodedReaderImpl - // TODO: this method has too many arguments... perhaps we can clean it up - public static DiskRangeList readEncodedStream(long fileId, long baseOffset, DiskRangeList start, - long cOffset, long endCOffset, ZeroCopyReaderShim zcr, CompressionCodec codec, - int bufferSize, LowLevelCache cache, StreamBuffer streamBuffer, long unlockUntilCOffset, - long streamOffset, LowLevelCacheCounters qfCounters) throws IOException { - if (streamBuffer.cacheBuffers == null) { - streamBuffer.cacheBuffers = new ArrayList<LlapMemoryBuffer>(); - } else { - streamBuffer.cacheBuffers.clear(); - } - if (cOffset == endCOffset) return null; - boolean isCompressed = codec != null; - List<ProcCacheChunk> toDecompress = null; - List<ByteBuffer> toRelease = null; - if (isCompressed) { - toRelease = zcr == null ? null : new ArrayList<ByteBuffer>(); - toDecompress = new ArrayList<ProcCacheChunk>(); - } - - // 1. Find our bearings in the stream. Normally, iter will already point either to where we - // want to be, or just before. However, RGs can overlap due to encoding, so we may have - // to return to a previous block. - DiskRangeList current = findExactPosition(start, cOffset); - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Starting read for [" + cOffset + "," + endCOffset + ") at " + current); - } - - TrackedCacheChunk lastUncompressed = null; - - // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below). - lastUncompressed = isCompressed ? - prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset, unlockUntilCOffset, - bufferSize, current, zcr, cache, streamBuffer, toRelease, toDecompress) - : prepareRangesForUncompressedRead( - cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, cache, streamBuffer); - - // 3. Allocate the buffers, prepare cache keys. - // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache - // data and some unallocated membufs for decompression. toDecompress contains all the work we - // need to do, and each item points to one of the membufs in cacheBuffers as target. The iter - // has also been adjusted to point to these buffers instead of compressed data for the ranges. - if (toDecompress == null) return lastUncompressed; // Nothing to decompress. - - LlapMemoryBuffer[] targetBuffers = new LlapMemoryBuffer[toDecompress.size()]; - DiskRange[] cacheKeys = new DiskRange[toDecompress.size()]; - int ix = 0; - for (ProcCacheChunk chunk : toDecompress) { - cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these. - targetBuffers[ix] = chunk.buffer; - ++ix; - } - cache.getAllocator().allocateMultiple(targetBuffers, bufferSize); - - // 4. Now decompress (or copy) the data into cache buffers. 
- for (ProcCacheChunk chunk : toDecompress) { - ByteBuffer dest = chunk.buffer.getByteBufferRaw(); - if (chunk.isCompressed) { - decompressChunk(chunk.originalData, codec, dest); - } else { - copyUncompressedChunk(chunk.originalData, dest); - } - - chunk.originalData = null; - if (DebugUtils.isTraceLockingEnabled()) { - LOG.info("Locking " + chunk.buffer + " due to reuse (after decompression)"); - } - cache.notifyReused(chunk.buffer); - } - - // 5. Release original compressed buffers to zero-copy reader if needed. - if (toRelease != null) { - assert zcr != null; - for (ByteBuffer buf : toRelease) { - zcr.releaseBuffer(buf); - } - } - - // 6. Finally, put uncompressed data to cache. - long[] collisionMask = cache.putFileData( - fileId, cacheKeys, targetBuffers, baseOffset, Priority.NORMAL, qfCounters); - processCacheCollisions( - cache, collisionMask, toDecompress, targetBuffers, streamBuffer.cacheBuffers); - - // 7. It may happen that we only use new compression buffers once. Release initial refcounts. - for (ProcCacheChunk chunk : toDecompress) { - ponderReleaseInitialRefcount(cache, unlockUntilCOffset, streamOffset, chunk); - } - - return lastUncompressed; - } - - private static TrackedCacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset, - long streamOffset, long unlockUntilCOffset, int bufferSize, DiskRangeList current, - ZeroCopyReaderShim zcr, LowLevelCache cache, StreamBuffer streamBuffer, - List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress) throws IOException { - if (cOffset > current.getOffset()) { - // Target compression block is in the middle of the range; slice the range in two. - current = current.split(cOffset).next; - } - long currentOffset = cOffset; - TrackedCacheChunk lastUncompressed = null; - while (true) { - DiskRangeList next = null; - if (current instanceof TrackedCacheChunk) { - // 2a. This is a decoded compression buffer, add as is. - TrackedCacheChunk cc = (TrackedCacheChunk)current; - if (DebugUtils.isTraceLockingEnabled()) { - LOG.info("Locking " + cc.buffer + " due to reuse"); - } - boolean canReuse = cache.notifyReused(cc.buffer); - assert canReuse; - streamBuffer.cacheBuffers.add(cc.buffer); - currentOffset = cc.getEnd(); - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Adding an already-uncompressed buffer " + cc.buffer); - } - ponderReleaseInitialRefcount(cache, unlockUntilCOffset, streamOffset, cc); - lastUncompressed = cc; - next = current.next; - } else if (current instanceof IncompleteCb) { - // 2b. This is a known incomplete CB caused by ORC CB end boundaries being estimates. - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Cannot read " + current); - } - next = null; - currentOffset = -1; - } else { - // 2c. This is a compressed buffer. We need to uncompress it; the buffer can comprise - // several disk ranges, so we might need to combine them. - BufferChunk bc = (BufferChunk)current; - ProcCacheChunk newCached = addOneCompressionBuffer(bc, zcr, bufferSize, - cache, streamBuffer.cacheBuffers, toDecompress, toRelease); - lastUncompressed = (newCached == null) ? lastUncompressed : newCached; - next = (newCached != null) ? newCached.next : null; - currentOffset = (next != null) ? 
next.getOffset() : -1; - } - - if (next == null || (endCOffset >= 0 && currentOffset >= endCOffset)) { - break; - } - current = next; - } - return lastUncompressed; - } - - private static TrackedCacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset, - long streamOffset, long unlockUntilCOffset, DiskRangeList current, LowLevelCache cache, - StreamBuffer streamBuffer) throws IOException { - long currentOffset = cOffset; - TrackedCacheChunk lastUncompressed = null; - boolean isFirst = true; - while (true) { - DiskRangeList next = null; - assert current instanceof TrackedCacheChunk; - lastUncompressed = (TrackedCacheChunk)current; - if (DebugUtils.isTraceLockingEnabled()) { - LOG.info("Locking " + lastUncompressed.buffer + " due to reuse"); - } - boolean canReuse = cache.notifyReused(lastUncompressed.buffer); - assert canReuse; - if (isFirst) { - streamBuffer.indexBaseOffset = (int)(lastUncompressed.getOffset() - streamOffset); - isFirst = false; - } - streamBuffer.cacheBuffers.add(lastUncompressed.buffer); - currentOffset = lastUncompressed.getEnd(); - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Adding an uncompressed buffer " + lastUncompressed.buffer); - } - ponderReleaseInitialRefcount(cache, unlockUntilCOffset, streamOffset, lastUncompressed); - next = current.next; - if (next == null || (endCOffset >= 0 && currentOffset >= endCOffset)) { - break; - } - current = next; - } - return lastUncompressed; - } - - /** - * To achieve some sort of consistent cache boundaries, we will cache streams deterministically; - * in segments starting w/stream start, and going for either stream size or maximum allocation. - * If we are not reading the entire segment's worth of data, then we will not cache the partial - * RGs; the breakage of cache assumptions (no interleaving blocks, etc.) is way too much PITA - * to handle just for this case. - * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our - * allocator. Uncompressed case is not mainline though so let's not complicate it. - * @param qfCounters - */ - public static DiskRangeList preReadUncompressedStream(long fileId, - long baseOffset, DiskRangeList start, long streamOffset, long streamEnd, - ZeroCopyReaderShim zcr, LowLevelCache cache, LowLevelCacheCounters qfCounters) - throws IOException { - if (streamOffset == streamEnd) return null; - List<UncompressedCacheChunk> toCache = null; - List<ByteBuffer> toRelease = null; - - // 1. Find our bearings in the stream. - DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd); - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Starting pre-read for [" + streamOffset + "," + streamEnd + ") at " + current); - } - - if (streamOffset > current.getOffset()) { - // Target compression block is in the middle of the range; slice the range in two. - current = current.split(streamOffset).next; - } - // Account for maximum cache buffer size. - long streamLen = streamEnd - streamOffset; - int partSize = cache.getAllocator().getMaxAllocation(), - partCount = (int)((streamLen / partSize) + (((streamLen % partSize) != 0) ? 
1 : 0)); - long partOffset = streamOffset, partEnd = Math.min(partOffset + partSize, streamEnd); - - TrackedCacheChunk lastUncompressed = null; - LlapMemoryBuffer[] singleAlloc = new LlapMemoryBuffer[1]; - for (int i = 0; i < partCount; ++i) { - long hasEntirePartTo = -1; - if (partOffset == current.getOffset()) { - hasEntirePartTo = partOffset; - // We assume cache chunks would always match the way we read, so check and skip it. - if (current instanceof TrackedCacheChunk) { - lastUncompressed = (TrackedCacheChunk)current; - assert current.getOffset() == partOffset && current.getEnd() == partEnd; - partOffset = partEnd; - partEnd = Math.min(partOffset + partSize, streamEnd); - continue; - } - } - if (current.getOffset() >= partEnd) { - // We have no data at all for this part of the stream (could be unneeded), skip. - partOffset = partEnd; - partEnd = Math.min(partOffset + partSize, streamEnd); - continue; - } - if (toRelease == null && zcr != null) { - toRelease = new ArrayList<ByteBuffer>(); - } - // We have some disk buffers... see if we have entire part, etc. - UncompressedCacheChunk candidateCached = null; - DiskRangeList next = current; - while (true) { - if (next == null || next.getOffset() >= partEnd) { - if (hasEntirePartTo < partEnd && candidateCached != null) { - // We are missing a section at the end of the part... - lastUncompressed = copyAndReplaceCandidateToNonCached( - candidateCached, partOffset, hasEntirePartTo, cache, singleAlloc); - candidateCached = null; - } - break; - } - current = next; - boolean wasSplit = (current.getEnd() > partEnd); - if (wasSplit) { - current = current.split(partEnd); - } - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Processing uncompressed file data at [" - + current.getOffset() + ", " + current.getEnd() + ")"); - } - BufferChunk bc = (BufferChunk)current; - if (!wasSplit && toRelease != null) { - toRelease.add(bc.chunk); // TODO: is it valid to give zcr the modified 2nd part? - } - - // Track if we still have the entire part. - long hadEntirePartTo = hasEntirePartTo; - if (hasEntirePartTo != -1) { - hasEntirePartTo = (hasEntirePartTo == current.getOffset()) ? current.getEnd() : -1; - } - if (candidateCached != null && hasEntirePartTo == -1) { - lastUncompressed = copyAndReplaceCandidateToNonCached( - candidateCached, partOffset, hadEntirePartTo, cache, singleAlloc); - candidateCached = null; - } - - if (hasEntirePartTo != -1) { - // So far we have all the data from the beginning of the part. - if (candidateCached == null) { - candidateCached = new UncompressedCacheChunk(bc); - } else { - candidateCached.addChunk(bc); - } - // We will take care of this at the end of the part, or if we find a gap. - next = current.next; - continue; - } - // We don't have the entire part; just copy to an allocated buffer. We could try to - // optimize a bit if we have contiguous buffers with gaps, but it's probably not needed. - lastUncompressed = copyAndReplaceUncompressedToNonCached(bc, cache, singleAlloc); - next = lastUncompressed.next; - } - if (candidateCached != null) { - if (toCache == null) { - toCache = new ArrayList<>(partCount - i); - } - toCache.add(candidateCached); - } - } - - // 3. Allocate the buffers, prepare cache keys. - if (toCache == null) return lastUncompressed; // Nothing to copy and cache. - - LlapMemoryBuffer[] targetBuffers = - toCache.size() == 1 ? 
singleAlloc : new LlapMemoryBuffer[toCache.size()]; - targetBuffers[0] = null; - DiskRange[] cacheKeys = new DiskRange[toCache.size()]; - int ix = 0; - for (UncompressedCacheChunk chunk : toCache) { - cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these. - ++ix; - } - cache.getAllocator().allocateMultiple( - targetBuffers, (int)(partCount == 1 ? streamLen : partSize)); - - // 4. Now copy the data into cache buffers. - ix = 0; - for (UncompressedCacheChunk candidateCached : toCache) { - candidateCached.buffer = targetBuffers[ix]; - ByteBuffer dest = candidateCached.buffer.getByteBufferRaw(); - copyAndReplaceUncompressedChunks(candidateCached, dest, candidateCached); - candidateCached.clear(); - lastUncompressed = candidateCached; - ++ix; - } - - // 5. Release original compressed buffers to zero-copy reader if needed. - if (toRelease != null) { - assert zcr != null; - for (ByteBuffer buf : toRelease) { - zcr.releaseBuffer(buf); - } - } - - // 6. Finally, put uncompressed data to cache. - long[] collisionMask = cache.putFileData( - fileId, cacheKeys, targetBuffers, baseOffset, Priority.NORMAL, qfCounters); - processCacheCollisions(cache, collisionMask, toCache, targetBuffers, null); - - return lastUncompressed; - } - - private static void copyUncompressedChunk(ByteBuffer src, ByteBuffer dest) { - int startPos = dest.position(), startLim = dest.limit(); - dest.put(src); // Copy uncompressed data to cache. - // Put moves position forward by the size of the data. - int newPos = dest.position(); - if (newPos > startLim) { - throw new AssertionError("After copying, buffer [" + startPos + ", " + startLim - + ") became [" + newPos + ", " + dest.limit() + ")"); - } - dest.position(startPos); - dest.limit(newPos); - } - - - private static TrackedCacheChunk copyAndReplaceCandidateToNonCached( - UncompressedCacheChunk candidateCached, long partOffset, - long candidateEnd, LowLevelCache cache, LlapMemoryBuffer[] singleAlloc) { - // We thought we had the entire part to cache, but we don't; convert start to - // non-cached. Since we are at the first gap, the previous stuff must be contiguous. 
- singleAlloc[0] = null; - cache.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset)); - - LlapMemoryBuffer buffer = singleAlloc[0]; - cache.notifyReused(buffer); - ByteBuffer dest = buffer.getByteBufferRaw(); - TrackedCacheChunk tcc = TCC_POOL.take(); - tcc.init(buffer, partOffset, candidateEnd); - copyAndReplaceUncompressedChunks(candidateCached, dest, tcc); - return tcc; - } - - private static TrackedCacheChunk copyAndReplaceUncompressedToNonCached( - BufferChunk bc, LowLevelCache cache, LlapMemoryBuffer[] singleAlloc) { - singleAlloc[0] = null; - cache.getAllocator().allocateMultiple(singleAlloc, bc.getLength()); - LlapMemoryBuffer buffer = singleAlloc[0]; - cache.notifyReused(buffer); - ByteBuffer dest = buffer.getByteBufferRaw(); - TrackedCacheChunk tcc = TCC_POOL.take(); - tcc.init(buffer, bc.getOffset(), bc.getEnd()); - copyUncompressedChunk(bc.chunk, dest); - bc.replaceSelfWith(tcc); - return tcc; - } - - private static void copyAndReplaceUncompressedChunks( - UncompressedCacheChunk candidateCached, ByteBuffer dest, TrackedCacheChunk tcc) { - int startPos = dest.position(), startLim = dest.limit(); - BufferChunk chunk = candidateCached.getChunk(); - for (int i = 0; i < candidateCached.getCount(); ++i) { - dest.put(chunk.getData()); - BufferChunk next = (BufferChunk)(chunk.next); - if (i == 0) { - chunk.replaceSelfWith(tcc); - } else { - chunk.removeSelf(); - } - chunk = next; - } - int newPos = dest.position(); - if (newPos > startLim) { - throw new AssertionError("After copying, buffer [" + startPos + ", " + startLim - + ") became [" + newPos + ", " + dest.limit() + ")"); - } - dest.position(startPos); - dest.limit(newPos); - } - - private static void decompressChunk( - ByteBuffer src, CompressionCodec codec, ByteBuffer dest) throws IOException { - int startPos = dest.position(), startLim = dest.limit(); - codec.decompress(src, dest); - // Codec resets the position to 0 and limit to correct limit. - dest.position(startPos); - int newLim = dest.limit(); - if (newLim > startLim) { - throw new AssertionError("After codec, buffer [" + startPos + ", " + startLim - + ") became [" + dest.position() + ", " + newLim + ")"); - } - } - - public static void releaseCacheChunksIntoObjectPool(DiskRangeList current) { - while (current != null) { - if (current instanceof ProcCacheChunk) { - PCC_POOL.offer((ProcCacheChunk)current); - } else if (current instanceof TrackedCacheChunk) { - TCC_POOL.offer((TrackedCacheChunk)current); - } - current = current.next; - } - } - - private static void ponderReleaseInitialRefcount(LowLevelCache cache, - long unlockUntilCOffset, long streamStartOffset, TrackedCacheChunk cc) { - if (cc.getEnd() > unlockUntilCOffset) return; - assert !cc.isReleased; - releaseInitialRefcount(cache, cc, false); - // Release all the previous buffers that we may not have been able to release due to reuse. - DiskRangeList prev = cc.prev; - while (true) { - if ((prev == null) || (prev.getEnd() <= streamStartOffset) - || !(prev instanceof TrackedCacheChunk)) break; - TrackedCacheChunk prevCc = (TrackedCacheChunk)prev; - if (prevCc.isReleased) break; - releaseInitialRefcount(cache, prevCc, true); - prev = prev.prev; - } - } - - private static void releaseInitialRefcount( - LowLevelCache cache, TrackedCacheChunk cc, boolean isBacktracking) { - // This is the last RG for which this buffer will be used. Remove the initial refcount - if (DebugUtils.isTraceLockingEnabled()) { - LOG.info("Unlocking " + cc.buffer + " for the fetching thread" - + (isBacktracking ? 
"; backtracking" : "")); - } - cache.releaseBuffer(cc.buffer); - cc.isReleased = true; - } - - private static void processCacheCollisions(LowLevelCache cache, long[] collisionMask, - List<? extends TrackedCacheChunk> toDecompress, LlapMemoryBuffer[] targetBuffers, - List<LlapMemoryBuffer> cacheBuffers) { - if (collisionMask == null) return; - assert collisionMask.length >= (toDecompress.size() >>> 6); - // There are some elements that were cached in parallel, take care of them. - long maskVal = -1; - for (int i = 0; i < toDecompress.size(); ++i) { - if ((i & 63) == 0) { - maskVal = collisionMask[i >>> 6]; - } - if ((maskVal & 1) == 1) { - // Cache has found an old buffer for the key and put it into array instead of our new one. - TrackedCacheChunk replacedChunk = toDecompress.get(i); - LlapMemoryBuffer replacementBuffer = targetBuffers[i]; - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Discarding data due to cache collision: " + replacedChunk.buffer - + " replaced with " + replacementBuffer); - } - assert replacedChunk.buffer != replacementBuffer : i + " was not replaced in the results " - + "even though mask is [" + Long.toBinaryString(maskVal) + "]"; - replacedChunk.handleCacheCollision(cache, replacementBuffer, cacheBuffers); - } - maskVal >>= 1; - } - } - - - /** Finds compressed offset in a stream and makes sure iter points to its position. - This may be necessary for obscure combinations of compression and encoding boundaries. */ - private static DiskRangeList findExactPosition(DiskRangeList ranges, long offset) { - if (offset < 0) return ranges; - return findIntersectingPosition(ranges, offset, offset); - } - - private static DiskRangeList findIntersectingPosition(DiskRangeList ranges, long offset, long end) { - if (offset < 0) return ranges; - // We expect the offset to be valid TODO: rather, validate - while (ranges.getEnd() <= offset) { - ranges = ranges.next; - } - while (ranges.getOffset() > end) { - ranges = ranges.prev; - } - // We are now on some intersecting buffer, find the first intersecting buffer. - while (ranges.prev != null && ranges.prev.getEnd() > offset) { - ranges = ranges.prev; - } - return ranges; - } - - private static class IncompleteCb extends DiskRangeList { - public IncompleteCb(long offset, long end) { - super(offset, end); - } - - @Override - public String toString() { - return "incomplete CB start: " + offset + " end: " + end; - } - } - - /** - * Reads one compression block from the source; handles compression blocks read from - * multiple ranges (usually, that would only happen with zcr). - * Adds stuff to cachedBuffers, toDecompress and toRelease (see below what each does). - * @param current BufferChunk where compression block starts. - * @param ranges Iterator of all chunks, pointing at current. - * @param cacheBuffers The result buffer array to add pre-allocated target cache buffer. - * @param toDecompress The list of work to decompress - pairs of compressed buffers and the - * target buffers (same as the ones added to cacheBuffers). - * @param toRelease The list of buffers to release to zcr because they are no longer in use. - * @return The resulting cache chunk. 
- */ - private static ProcCacheChunk addOneCompressionBuffer(BufferChunk current, ZeroCopyReaderShim zcr, - int bufferSize, LowLevelCache cache, List<LlapMemoryBuffer> cacheBuffers, - List<ProcCacheChunk> toDecompress, List<ByteBuffer> toRelease) throws IOException { - ByteBuffer slice = null; - ByteBuffer compressed = current.chunk; - long cbStartOffset = current.getOffset(); - int b0 = compressed.get() & 0xff; - int b1 = compressed.get() & 0xff; - int b2 = compressed.get() & 0xff; - int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1); - if (chunkLength > bufferSize) { - throw new IllegalArgumentException("Buffer size too small. size = " + - bufferSize + " needed = " + chunkLength); - } - int consumedLength = chunkLength + OutStream.HEADER_SIZE; - long cbEndOffset = cbStartOffset + consumedLength; - boolean isUncompressed = ((b0 & 0x01) == 1); - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Found CB at " + cbStartOffset + ", chunk length " + chunkLength + ", total " - + consumedLength + ", " + (isUncompressed ? "not " : "") + "compressed"); - } - if (compressed.remaining() >= chunkLength) { - // Simple case - CB fits entirely in the disk range. - slice = compressed.slice(); - slice.limit(chunkLength); - ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed, cbStartOffset, - cbEndOffset, chunkLength, current, cache, toDecompress, cacheBuffers); - if (compressed.remaining() <= 0 && zcr != null) { - toRelease.add(compressed); - } - return cc; - } - if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) { - addIncompleteCompressionBuffer(cbStartOffset, current, 0); - return null; // This is impossible to read from this chunk. - } - - // TODO: we could remove extra copy for isUncompressed case by copying directly to cache. - // We need to consolidate 2 or more buffers into one to decompress. - ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect()); - int remaining = chunkLength - compressed.remaining(); - int originalPos = compressed.position(); - copy.put(compressed); - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Removing partial CB " + current + " from ranges after copying its contents"); - } - DiskRangeList next = current.next; - current.removeSelf(); - if (zcr != null) { - if (originalPos == 0) { - zcr.releaseBuffer(compressed); // We copied the entire buffer. - } else { - toRelease.add(compressed); // There might be slices depending on this buffer. - } - } - - int extraChunkCount = 0; - while (true) { - if (!(next instanceof BufferChunk)) { - throw new IOException("Trying to extend compressed block into uncompressed block " + next); - } - compressed = next.getData(); - ++extraChunkCount; - if (compressed.remaining() >= remaining) { - // This is the last range for this compression block. Yay! - slice = compressed.slice(); - slice.limit(remaining); - copy.put(slice); - ProcCacheChunk cc = addOneCompressionBlockByteBuffer( - copy, isUncompressed, cbStartOffset, cbEndOffset, remaining, - (BufferChunk)next, cache, toDecompress, cacheBuffers); - if (compressed.remaining() <= 0 && zcr != null) { - zcr.releaseBuffer(compressed); // We copied the entire buffer. - } - return cc; - } - remaining -= compressed.remaining(); - copy.put(compressed); - if (zcr != null) { - zcr.releaseBuffer(compressed); // We copied the entire buffer. - } - DiskRangeList tmp = next; - next = next.hasContiguousNext() ? 
next.next : null; - if (next != null) { - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Removing partial CB " + tmp + " from ranges after copying its contents"); - } - tmp.removeSelf(); - } else { - addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount); - return null; // This is impossible to read from this chunk. - } - } - } - - private static void addIncompleteCompressionBuffer( - long cbStartOffset, DiskRangeList target, int extraChunkCount) { - IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd()); - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Replacing " + target + " (and " + extraChunkCount + " previous chunks) with " - + icb + " in the buffers"); - } - target.replaceSelfWith(icb); - } - - /** - * Add one buffer with compressed data the results for addOneCompressionBuffer (see javadoc). - * @param fullCompressionBlock (fCB) Entire compression block, sliced or copied from disk data. - * @param isUncompressed Whether the data in the block is uncompressed. - * @param cbStartOffset Compressed start offset of the fCB. - * @param cbEndOffset Compressed end offset of the fCB. - * @param lastRange The buffer from which the last (or all) bytes of fCB come. - * @param lastChunkLength The number of compressed bytes consumed from last *chunk* into fullCompressionBlock. - * @param ranges The iterator of all compressed ranges for the stream, pointing at lastRange. - * @param lastChunk - * @param toDecompress See addOneCompressionBuffer. - * @param cacheBuffers See addOneCompressionBuffer. - * @return New cache buffer. - */ - private static ProcCacheChunk addOneCompressionBlockByteBuffer(ByteBuffer fullCompressionBlock, - boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastChunkLength, - BufferChunk lastChunk, LowLevelCache cache, List<ProcCacheChunk> toDecompress, - List<LlapMemoryBuffer> cacheBuffers) { - // Prepare future cache buffer. - LlapMemoryBuffer futureAlloc = cache.createUnallocated(); - // Add it to result in order we are processing. - cacheBuffers.add(futureAlloc); - // Add it to the list of work to decompress. - ProcCacheChunk cc = PCC_POOL.take(); - cc.init(cbStartOffset, cbEndOffset, !isUncompressed, - fullCompressionBlock, futureAlloc, cacheBuffers.size() - 1); - toDecompress.add(cc); - // Adjust the compression block position. - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Adjusting " + lastChunk + " to consume " + lastChunkLength + " compressed bytes"); - } - lastChunk.chunk.position(lastChunk.chunk.position() + lastChunkLength); - // Finally, put it in the ranges list for future use (if shared between RGs). - // Before anyone else accesses it, it would have been allocated and decompressed locally. 
- if (lastChunk.chunk.remaining() <= 0) { - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Replacing " + lastChunk + " with " + cc + " in the buffers"); - } - lastChunk.replaceSelfWith(cc); - } else { - if (DebugUtils.isTraceOrcEnabled()) { - LOG.info("Adding " + cc + " before " + lastChunk + " in the buffers"); - } - lastChunk.insertPartBefore(cc); + return new CompressedStream(fileId, name, input, length, codec, bufferSize); } - return cc; } } http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java index d08ad1f..8b88e05 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java @@ -83,12 +83,12 @@ public class MetadataReaderImpl implements MetadataReader { file.readFully(buffer); indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create(null, "index", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(), - codec, bufferSize, null)); + codec, bufferSize)); if (readBloomFilter) { bb.position((int) stream.getLength()); bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create( null, "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), - nextStream.getLength(), codec, bufferSize, null)); + nextStream.getLength(), codec, bufferSize)); } } } @@ -109,7 +109,7 @@ public class MetadataReaderImpl implements MetadataReader { file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength); return OrcProto.StripeFooter.parseFrom(InStream.create(null, "footer", Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)), - tailLength, codec, bufferSize, null)); + tailLength, codec, bufferSize)); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java index 1eb0dec..a92b455 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java @@ -22,11 +22,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; -import org.apache.hadoop.hive.llap.Consumer; -import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters; -import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; -import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey; -import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.common.io.storage_api.DataCache; +import org.apache.hadoop.hive.common.io.storage_api.DataReader; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -318,9 +316,8 @@ public interface Reader { MetadataReader metadata() throws IOException; - EncodedReader encodedReader(long fileId, LowLevelCache lowLevelCache, - LowLevelCacheCounters qfCounters, Consumer<OrcEncodedColumnBatch> consumer) - throws IOException; + EncodedReader encodedReader( + long fileId, DataCache dataCache, DataReader dataReader) throws IOException; List<Integer> 
getVersionList(); @@ -331,4 +328,6 @@ public interface Reader { List<StripeStatistics> getStripeStatistics(); List<OrcProto.ColumnStatistics> getOrcProtoFileStatistics(); + + DataReader createDefaultDataReader(boolean useZeroCopy); } http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index 7d14e04..36d8e0e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -34,14 +34,10 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.DiskRange; +import org.apache.hadoop.hive.common.io.storage_api.DataCache; +import org.apache.hadoop.hive.common.io.storage_api.DataReader; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.Consumer; -import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters; -import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; -import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey; -import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch; import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion; -import org.apache.hadoop.hive.ql.io.orc.OrcProto.Footer; import org.apache.hadoop.hive.ql.io.FileFormatException; import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem; @@ -505,13 +501,13 @@ public class ReaderImpl implements Reader { footerBuffer.limit(position + metadataSize); InputStream instream = InStream.create(null, "metadata", Lists.<DiskRange>newArrayList( - new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize, null); + new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize); this.metadata = OrcProto.Metadata.parseFrom(instream); footerBuffer.position(position + metadataSize); footerBuffer.limit(position + metadataSize + footerBufferSize); instream = InStream.create(null, "footer", Lists.<DiskRange>newArrayList( - new BufferChunk(footerBuffer, 0)), footerBufferSize, codec, bufferSize, null); + new BufferChunk(footerBuffer, 0)), footerBufferSize, codec, bufferSize); this.footer = OrcProto.Footer.parseFrom(instream); footerBuffer.position(position); @@ -722,13 +718,10 @@ public class ReaderImpl implements Reader { } @Override - public EncodedReader encodedReader(long fileId, LowLevelCache lowLevelCache, - LowLevelCacheCounters qfCounters, Consumer<OrcEncodedColumnBatch> consumer) - throws IOException { - boolean useZeroCopy = (conf != null) - && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_ZEROCOPY); - return new EncodedReaderImpl(fileSystem, path, fileId, useZeroCopy, types, - codec, bufferSize, rowIndexStride, lowLevelCache, qfCounters, consumer); + public EncodedReader encodedReader( + long fileId, DataCache dataCache, DataReader dataReader) throws IOException { + return new EncodedReaderImpl(fileId, types, + codec, bufferSize, rowIndexStride, dataCache, dataReader); } @Override @@ -740,4 +733,9 @@ public class ReaderImpl implements Reader { public int getMetadataSize() { return metadataSize; } + + @Override + public DataReader createDefaultDataReader(boolean useZeroCopy) { + return RecordReaderUtils.createDefaultDataReader(fileSystem, 
path, useZeroCopy, codec); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java index eeb2f7d..dba9071 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java @@ -18,16 +18,8 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; -import java.util.List; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding; -import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex; -import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; -import org.apache.hadoop.hive.llap.Consumer; -import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; -import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; -import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey; /** * A row-by-row iterator for ORC files. http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 3b98562..5117baf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -34,35 +34,30 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.common.DiskRangeList; -import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper; +import org.apache.hadoop.hive.common.DiskRangeList.CreateHelper; +import org.apache.hadoop.hive.common.io.storage_api.DataReader; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry; -import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim; import org.apache.hadoop.io.Text; public class RecordReaderImpl implements RecordReader { static final Log LOG = LogFactory.getLog(RecordReaderImpl.class); private static final boolean isLogDebugEnabled = LOG.isDebugEnabled(); private final Path path; - private final FileSystem fileSystem; - private 
final FSDataInputStream file; private final long firstRow; private final List<StripeInformation> stripes = new ArrayList<StripeInformation>(); @@ -88,9 +83,7 @@ public class RecordReaderImpl implements RecordReader { private boolean[] includedRowGroups = null; private final Configuration conf; private final MetadataReader metadata; - - private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool(); - private final ZeroCopyReaderShim zcr; + private final DataReader dataReader; public final static class Index { OrcProto.RowIndex[] rowGroupIndex; @@ -163,9 +156,7 @@ public class RecordReaderImpl implements RecordReader { long strideRate, Configuration conf ) throws IOException { - this.fileSystem = fileSystem; this.path = path; - this.file = fileSystem.open(path); this.codec = codec; this.types = types; this.bufferSize = bufferSize; @@ -194,9 +185,10 @@ public class RecordReaderImpl implements RecordReader { } } - final boolean zeroCopy = (conf != null) - && (HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY)); - zcr = zeroCopy ? RecordReaderUtils.createZeroCopyShim(file, codec, pool) : null; + final boolean zeroCopy = (conf != null) && (HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY)); + // TODO: we could change the ctor to pass this externally + this.dataReader = RecordReaderUtils.createDefaultDataReader(fileSystem, path, zeroCopy, codec); + this.dataReader.open(); firstRow = skippedRows; totalRowCount = rows; @@ -766,16 +758,16 @@ public class RecordReaderImpl implements RecordReader { is.close(); } if (bufferChunks != null) { - if (zcr != null) { + if (dataReader.isTrackingDiskRanges()) { for (DiskRangeList range = bufferChunks; range != null; range = range.next) { if (!(range instanceof BufferChunk)) { continue; } - zcr.releaseBuffer(((BufferChunk) range).chunk); + dataReader.releaseBuffer(((BufferChunk) range).chunk); } } - bufferChunks = null; } + bufferChunks = null; streams.clear(); } @@ -835,10 +827,9 @@ public class RecordReaderImpl implements RecordReader { long end = start + stripe.getDataLength(); // explicitly trigger 1 big read DiskRangeList toRead = new DiskRangeList(start, end); - bufferChunks = RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(), toRead, false); + bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false); List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList(); createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams); - // TODO: decompressed data from streams should be put in cache } /** @@ -891,37 +882,6 @@ public class RecordReaderImpl implements RecordReader { } } - public static class CacheChunk extends DiskRangeList { - public LlapMemoryBuffer buffer; - - public CacheChunk(LlapMemoryBuffer buffer, long offset, long end) { - super(offset, end); - this.buffer = buffer; - } - - @Override - public boolean hasData() { - return buffer != null; - } - - @Override - public ByteBuffer getData() { - // Callers duplicate the buffer, they have to for BufferChunk - return buffer.getByteBufferRaw(); - } - - @Override - public String toString() { - return "start: " + offset + " end: " + end + " cache buffer: " + buffer; - } - - @Override - public DiskRange sliceAndShift(long offset, long end, long shiftBy) { - throw new UnsupportedOperationException("Cache chunk cannot be sliced - attempted [" - + this.offset + ", " + this.end + ") to [" + offset + ", " + end + ") "); - } - } - /** * Plan the ranges of the file that we need to read given the list of * columns and row groups. 
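The hunks around this point replace RecordReaderImpl's direct FSDataInputStream/ZeroCopyReaderShim plumbing with the new DataReader abstraction, so a minimal life-cycle sketch may help. Variable names (fs, path, stripe, codec) and the stripe-relative start offset are assumptions; the open/readFileData/isTrackingDiskRanges/releaseBuffer/close calls mirror the diff.

```java
package org.apache.hadoop.hive.ql.io.orc;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.DiskRangeList;
import org.apache.hadoop.hive.common.io.storage_api.DataReader;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;

/** Sketch only: DataReader life cycle as used by RecordReaderImpl after this patch. */
class DataReaderSketch {
  static void readStripeData(FileSystem fs, Path path, CompressionCodec codec,
      StripeInformation stripe) throws IOException {
    // Package-private factory; external callers go through Reader.createDefaultDataReader().
    DataReader dataReader =
        RecordReaderUtils.createDefaultDataReader(fs, path, /* useZeroCopy */ false, codec);
    dataReader.open();
    try {
      // One contiguous, stripe-relative range triggers a single big read;
      // the stripe offset is passed as the base offset.
      long start = stripe.getIndexLength();  // assumed stripe-relative data start
      DiskRangeList toRead = new DiskRangeList(start, start + stripe.getDataLength());
      DiskRangeList buffers = dataReader.readFileData(toRead, stripe.getOffset(), false);

      // ... build InStreams over "buffers" and decode columns ...

      // Zero-copy readers track their buffers; return them when done.
      if (dataReader.isTrackingDiskRanges()) {
        for (DiskRangeList range = buffers; range != null; range = range.next) {
          if (range instanceof BufferChunk) {
            dataReader.releaseBuffer(((BufferChunk) range).chunk);
          }
        }
      }
    } finally {
      dataReader.close();
    }
  }
}
```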
@@ -949,7 +909,7 @@ public class RecordReaderImpl implements RecordReader { long offset = 0; // figure out which columns have a present stream boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types); - DiskRangeListCreateHelper list = new DiskRangeListCreateHelper(); + CreateHelper list = new CreateHelper(); for (OrcProto.Stream stream : streamList) { long length = stream.getLength(); int column = stream.getColumn(); @@ -992,7 +952,7 @@ public class RecordReaderImpl implements RecordReader { ranges, streamOffset, streamDesc.getLength()); StreamName name = new StreamName(column, streamDesc.getKind()); streams.put(name, InStream.create(null, name.toString(), buffers, - streamDesc.getLength(), codec, bufferSize, null)); + streamDesc.getLength(), codec, bufferSize)); streamOffset += streamDesc.getLength(); } } @@ -1005,7 +965,7 @@ public class RecordReaderImpl implements RecordReader { if (LOG.isDebugEnabled()) { LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead)); } - bufferChunks = RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(), toRead, false); + bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false); if (LOG.isDebugEnabled()) { LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks)); } @@ -1162,8 +1122,7 @@ public class RecordReaderImpl implements RecordReader { @Override public void close() throws IOException { clearStreams(); - pool.clear(); - file.close(); + dataReader.close(); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java index 429c293e..2a57916 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java @@ -29,12 +29,11 @@ import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.common.DiskRangeList; -import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper; -import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper; +import org.apache.hadoop.hive.common.DiskRangeList.CreateHelper; +import org.apache.hadoop.hive.common.DiskRangeList.MutateHelper; +import org.apache.hadoop.hive.common.io.storage_api.DataReader; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; @@ -48,6 +47,69 @@ import com.google.common.collect.ComparisonChain; */ public class RecordReaderUtils { private static final HadoopShims SHIMS = ShimLoader.getHadoopShims(); + + private static class DefaultDataReader implements DataReader { + private FSDataInputStream file; + private ByteBufferAllocatorPool pool; + private ZeroCopyReaderShim zcr; + private FileSystem fs; + private Path path; + private boolean useZeroCopy; + private CompressionCodec codec; + + public DefaultDataReader( + FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) { + this.fs = fs; + this.path = 
path; + this.useZeroCopy = useZeroCopy; + this.codec = codec; + } + + @Override + public void open() throws IOException { + this.file = fs.open(path); + if (useZeroCopy) { + pool = new ByteBufferAllocatorPool(); + zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool); + } else { + pool = null; + zcr = null; + } + } + + @Override + public DiskRangeList readFileData( + DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException { + return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, doForceDirect); + } + + @Override + public void close() throws IOException { + if (file != null) { + file.close(); + } + if (pool != null) { + pool.clear(); + } + } + + @Override + public boolean isTrackingDiskRanges() { + return zcr != null; + } + + @Override + public void releaseBuffer(ByteBuffer buffer) { + zcr.releaseBuffer(buffer); + } + + } + + static DataReader createDefaultDataReader( + FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) { + return new DefaultDataReader(fs, path, useZeroCopy, codec); + } + static boolean[] findPresentStreamsByColumn( List<OrcProto.Stream> streamList, List<OrcProto.Type> types) { boolean[] hasNull = new boolean[types.size()]; @@ -75,14 +137,14 @@ public class RecordReaderUtils { } static void addEntireStreamToRanges( - long offset, long length, DiskRangeListCreateHelper list, boolean doMergeBuffers) { + long offset, long length, CreateHelper list, boolean doMergeBuffers) { list.addOrMerge(offset, offset + length, doMergeBuffers, false); } static void addRgFilteredStreamToRanges(OrcProto.Stream stream, boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index, OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull, - long offset, long length, DiskRangeListCreateHelper list, boolean doMergeBuffers) { + long offset, long length, CreateHelper list, boolean doMergeBuffers) { for (int group = 0; group < includedRowGroups.length; ++group) { if (!includedRowGroups[group]) continue; int posn = getIndexPosition( @@ -204,7 +266,7 @@ public class RecordReaderUtils { * @param ranges ranges to stringify * @return the resulting string */ - static String stringifyDiskRanges(DiskRangeList range) { + public static String stringifyDiskRanges(DiskRangeList range) { StringBuilder buffer = new StringBuilder(); buffer.append("["); boolean isFirst = true; @@ -240,7 +302,7 @@ public class RecordReaderUtils { if (range == null) return null; DiskRangeList prev = range.prev; if (prev == null) { - prev = new DiskRangeListMutateHelper(range); + prev = new MutateHelper(range); } while (range != null) { if (range.hasData()) { @@ -434,31 +496,4 @@ public class RecordReaderUtils { } } } - - public static long getFileId(FileSystem fileSystem, Path path) throws IOException { - String pathStr = path.toUri().getPath(); - if (fileSystem instanceof DistributedFileSystem) { - return SHIMS.getFileId(fileSystem, pathStr); - } - // If we are not on DFS, we just hash the file name + size and hope for the best. - // TODO: we assume it only happens in tests. Fix? 
- int nameHash = pathStr.hashCode(); - long fileSize = fileSystem.getFileStatus(path).getLen(); - long id = ((fileSize ^ (fileSize >>> 32)) << 32) | ((long)nameHash & 0xffffffffL); - RecordReaderImpl.LOG.warn("Cannot get unique file ID from " - + fileSystem.getClass().getSimpleName() + "; using " + id + "(" + pathStr - + "," + nameHash + "," + fileSize + ")"); - return id; - } - - // TODO: this relies on HDFS not changing the format; we assume if we could get inode ID, this - // is still going to work. Otherwise, file IDs can be turned off. Later, we should use - // as public utility method in HDFS to obtain the inode-based path. - private static String HDFS_ID_PATH_PREFIX = "/.reserved/.inodes/"; - - public static Path getFileIdPath( - FileSystem fileSystem, Path path, long fileId) { - return (fileSystem instanceof DistributedFileSystem) - ? new Path(HDFS_ID_PATH_PREFIX + fileId) : path; - } } http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamUtils.java index 26f4c01..4c884be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamUtils.java @@ -21,8 +21,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.hive.common.DiskRangeInfo; -import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; -import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; +import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer; +import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData; /** * Stream utility. @@ -39,7 +39,7 @@ public class StreamUtils { * @throws IOException */ public static SettableUncompressedStream createSettableUncompressedStream(String streamName, - Long fileId, EncodedColumnBatch.StreamBuffer streamBuffer) throws IOException { + Long fileId, ColumnStreamData streamBuffer) throws IOException { if (streamBuffer == null) { return null; } @@ -54,11 +54,11 @@ public class StreamUtils { * @param streamBuffer - stream buffer * @return - total length of disk ranges */ - public static DiskRangeInfo createDiskRangeInfo(EncodedColumnBatch.StreamBuffer streamBuffer) { - DiskRangeInfo diskRangeInfo = new DiskRangeInfo(streamBuffer.indexBaseOffset); + public static DiskRangeInfo createDiskRangeInfo(ColumnStreamData streamBuffer) { + DiskRangeInfo diskRangeInfo = new DiskRangeInfo(streamBuffer.getIndexBaseOffset()); long offset = diskRangeInfo.getTotalLength(); // See ctor comment. 
// TODO: we should get rid of this - for (LlapMemoryBuffer memoryBuffer : streamBuffer.cacheBuffers) { + for (MemoryBuffer memoryBuffer : streamBuffer.getCacheBuffers()) { ByteBuffer buffer = memoryBuffer.getByteBufferDup(); diskRangeInfo.addDiskRange(new RecordReaderImpl.BufferChunk(buffer, offset)); offset += buffer.remaining(); http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java index 9ff465d..e08b446 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java @@ -30,8 +30,8 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; +import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData; import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; @@ -65,11 +65,11 @@ public class TreeReaderFactory { protected final int columnId; protected BitFieldReader present = null; protected boolean valuePresent = false; - protected EncodedColumnBatch.StreamBuffer presentStreamBuffer = null; - protected EncodedColumnBatch.StreamBuffer dataStreamBuffer = null; - protected EncodedColumnBatch.StreamBuffer dictionaryStreamBuffer = null; - protected EncodedColumnBatch.StreamBuffer lengthsStreamBuffer = null; - protected EncodedColumnBatch.StreamBuffer secondaryStreamBuffer = null; + protected ColumnStreamData presentStreamBuffer = null; + protected ColumnStreamData dataStreamBuffer = null; + protected ColumnStreamData dictionaryStreamBuffer = null; + protected ColumnStreamData lengthsStreamBuffer = null; + protected ColumnStreamData secondaryStreamBuffer = null; TreeReader(int columnId) throws IOException { this(columnId, null); @@ -137,11 +137,11 @@ public class TreeReaderFactory { } } - public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe) + public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe) throws IOException { // stream buffers are arranged in enum order of stream kind - for (EncodedColumnBatch.StreamBuffer streamBuffer : buffers) { - switch (streamBuffer.streamKind) { + for (ColumnStreamData streamBuffer : buffers) { + switch (streamBuffer.getStreamKind()) { case 0: // PRESENT stream presentStreamBuffer = streamBuffer; @@ -163,7 +163,7 @@ public class TreeReaderFactory { secondaryStreamBuffer = streamBuffer; break; default: - throw new IOException("Unexpected stream kind: " + streamBuffer.streamKind); + throw new IOException("Unexpected stream kind: " + streamBuffer.getStreamKind()); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/Consumer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/Consumer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/Consumer.java new file mode 100644 index 0000000..ce4328f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/Consumer.java @@ -0,0 +1,30 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc.llap; + +/** + * Data consumer; an equivalent of a data queue for an asynchronous data producer. + */ +public interface Consumer<T> { + /** Some data has been produced. */ + public void consumeData(T data); + /** No more data will be produced; done. */ + public void setDone(); + /** No more data will be produced; error during production. */ + public void setError(Throwable t); +} http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcBatchKey.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcBatchKey.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcBatchKey.java new file mode 100644 index 0000000..0249168 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcBatchKey.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc.llap; + +public class OrcBatchKey { + public long file; + public int stripeIx, rgIx; + + public OrcBatchKey(long file, int stripeIx, int rgIx) { + set(file, stripeIx, rgIx); + } + + public void set(long file, int stripeIx, int rgIx) { + this.file = file; + this.stripeIx = stripeIx; + this.rgIx = rgIx; + } + + @Override + public String toString() { + return "[" + file + ", stripe " + stripeIx + ", rgIx " + rgIx + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = prime + (int)(file ^ (file >>> 32)); + return (prime * result + rgIx) * prime + stripeIx; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof OrcBatchKey)) return false; + OrcBatchKey other = (OrcBatchKey)obj; + // All fields are primitives, so a plain field-by-field comparison is sufficient.
+ return stripeIx == other.stripeIx && rgIx == other.rgIx && file == other.file; + } + + @Override + public OrcBatchKey clone() throws CloneNotSupportedException { + return new OrcBatchKey(file, stripeIx, rgIx); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcCacheKey.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcCacheKey.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcCacheKey.java new file mode 100644 index 0000000..2be70e5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcCacheKey.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc.llap; + +public class OrcCacheKey extends OrcBatchKey { + public int colIx; + + public OrcCacheKey(long file, int stripeIx, int rgIx, int colIx) { + super(file, stripeIx, rgIx); + this.colIx = colIx; + } + + public OrcCacheKey(OrcBatchKey batchKey, int colIx) { + super(batchKey.file, batchKey.stripeIx, batchKey.rgIx); + this.colIx = colIx; + } + + public OrcBatchKey copyToPureBatchKey() { + return new OrcBatchKey(file, stripeIx, rgIx); + } + + @Override + public String toString() { + return "[" + file + ", stripe " + stripeIx + ", rgIx " + rgIx + ", colIx " + colIx + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + return super.hashCode() * prime + colIx; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof OrcCacheKey)) return false; + OrcCacheKey other = (OrcCacheKey)obj; + // All fields are primitives, so a plain field-by-field comparison is sufficient. + return stripeIx == other.stripeIx && rgIx == other.rgIx + && file == other.file && other.colIx == colIx; + } +} \ No newline at end of file
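
For context, the DataReader introduced earlier in this patch replaces RecordReaderImpl's direct handling of the FSDataInputStream, ByteBufferAllocatorPool and ZeroCopyReaderShim with a single open/read/close abstraction. The sketch below is illustrative only and is not part of the commit; the names fs, path, useZeroCopy, codec, stripeOffset and length are placeholders assumed to come from the caller's context (e.g. RecordReaderImpl, which sits in the same package as the package-private factory method).

    // Illustrative lifecycle of the new DataReader (not code from this commit).
    DataReader dataReader = RecordReaderUtils.createDefaultDataReader(fs, path, useZeroCopy, codec);
    dataReader.open();
    try {
      // Range offsets are relative to the base offset passed to readFileData
      // (the stripe offset in RecordReaderImpl's case).
      DiskRangeList toRead = new DiskRangeList(0, length);
      DiskRangeList data = dataReader.readFileData(toRead, stripeOffset, false);
      // ... build InStreams over "data" and decode the column streams ...
      // When isTrackingDiskRanges() returns true (zero-copy read), every ByteBuffer
      // handed out by readFileData must be returned via releaseBuffer(...) before close().
    } finally {
      dataReader.close();
    }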
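
The new Consumer interface defines the hand-off contract for an asynchronous producer: batches arrive through consumeData, and the producer finishes by calling setDone on success or setError on failure. A minimal, hypothetical implementation that collects results and blocks until the producer finishes could look like the following (CollectingConsumer and awaitAll are illustrative names, not part of the patch; java.util types are referenced with fully qualified names to keep the sketch self-contained):

    // Hypothetical Consumer implementation used only to illustrate the contract above.
    class CollectingConsumer<T> implements Consumer<T> {
      private final java.util.List<T> results = new java.util.ArrayList<>();
      private boolean done = false;
      private Throwable error = null;

      @Override
      public synchronized void consumeData(T data) {
        results.add(data); // called zero or more times by the producer
      }

      @Override
      public synchronized void setDone() {
        done = true;       // normal end of production
        notifyAll();
      }

      @Override
      public synchronized void setError(Throwable t) {
        error = t;         // production failed; no further data will arrive
        notifyAll();
      }

      // Blocks the consuming thread until the producer signals setDone or setError.
      synchronized java.util.List<T> awaitAll() {
        while (!done && error == null) {
          try {
            wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          }
        }
        if (error != null) {
          throw new RuntimeException("Producer failed", error);
        }
        return results;
      }
    }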