Author: sershe
Date: Fri Mar 27 02:06:54 2015
New Revision: 1669493

URL: http://svn.apache.org/r1669493
Log:
HIVE-10110 : LLAP: port updates from HIVE-9555 to llap branch in preparation for trunk merge (Sergey Shelukhin)
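For reviewers skimming the diffs below, the shape of the change: DiskRange's endpoints become protected (read via getOffset()/getEnd()), the old slice()/shiftBy() pair is replaced by a single sliceAndShift(), and overlap-merging moves out of DiskRangeList.CreateHelper.addOrMerge into a new DiskRange.merge(). The following standalone sketch is illustrative only; it uses a simplified, assumed class (not the actual org.apache.hadoop.hive.common types, which also carry list links and data buffers) and mirrors the merge/overlap and sliceAndShift semantics as they appear in the diff:

// Illustrative sketch only (simplified, assumed standalone class). Mirrors the
// merge/overlap and sliceAndShift semantics introduced by this change.
public class DiskRangeSketch {
  protected long offset; // first address, inclusive
  protected long end;    // first address past the range, exclusive

  public DiskRangeSketch(long offset, long end) {
    this.offset = offset;
    this.end = end;
  }

  public long getOffset() { return offset; }
  public long getEnd() { return end; }

  // Extends this range to also cover [otherOffset, otherEnd) when they overlap;
  // this is what CreateHelper.addOrMerge now delegates to instead of mutating
  // tail.offset/tail.end directly.
  protected boolean merge(long otherOffset, long otherEnd) {
    if (!overlap(offset, end, otherOffset, otherEnd)) return false;
    offset = Math.min(offset, otherOffset);
    end = Math.max(end, otherEnd);
    return true;
  }

  private static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
    if (leftA <= leftB) {
      return rightA >= leftB;
    }
    return rightB >= leftA;
  }

  // Replaces the old slice(offset, end) + shiftBy(delta) pair: take a sub-range
  // and rebase it (e.g. by -baseOffset) in a single step.
  public DiskRangeSketch sliceAndShift(long offset, long end, long shiftBy) {
    assert offset <= end && offset >= this.offset && end <= this.end;
    assert offset + shiftBy >= 0;
    return new DiskRangeSketch(offset + shiftBy, end + shiftBy);
  }

  public static void main(String[] args) {
    DiskRangeSketch r = new DiskRangeSketch(100, 200);
    System.out.println(r.merge(150, 300));               // true: r is now [100, 300)
    System.out.println(r.merge(400, 500));               // false: disjoint, r unchanged
    DiskRangeSketch s = r.sliceAndShift(120, 180, -100); // [20, 80) in stream-local terms
    System.out.println(s.getOffset() + ".." + s.getEnd());
  }
}

Folding the shift into the slice is what lets LowLevelCacheImpl below build cache chunks directly in stripe-relative coordinates, instead of inserting them in file coordinates and rebasing afterwards.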
Modified:
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
    hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java

Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java (original)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java Fri Mar 27 02:06:54 2015
@@ -24,9 +24,9 @@ import java.nio.ByteBuffer;
  */
 public class DiskRange {
   /** The first address. */
-  public long offset;
+  protected long offset;
   /** The address afterwards. */
-  public long end;
+  protected long end;
 
   public DiskRange(long offset, long end) {
     this.offset = offset;
@@ -50,6 +50,14 @@ public class DiskRange {
     return "range start: " + offset + " end: " + end;
   }
 
+  public long getOffset() {
+    return offset;
+  }
+
+  public long getEnd() {
+    return end;
+  }
+
   public int getLength() {
     long len = this.end - this.offset;
     assert len <= Integer.MAX_VALUE;
@@ -61,7 +69,7 @@ public class DiskRange {
     return false;
   }
 
-  public DiskRange slice(long offset, long end) {
+  public DiskRange sliceAndShift(long offset, long end, long shiftBy) {
     // Rather, unexpected usage exception.
     throw new UnsupportedOperationException();
   }
@@ -70,8 +78,17 @@ public class DiskRange {
     throw new UnsupportedOperationException();
   }
 
-  public void shiftBy(long offsetDelta) {
-    offset += offsetDelta;
-    end += offsetDelta;
+  protected boolean merge(long otherOffset, long otherEnd) {
+    if (!overlap(offset, end, otherOffset, otherEnd)) return false;
+    offset = Math.min(offset, otherOffset);
+    end = Math.max(end, otherEnd);
+    return true;
+  }
+
+  private static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
+    if (leftA <= leftB) {
+      return rightA >= leftB;
+    }
+    return rightB >= leftA;
   }
 }
\ No newline at end of file

Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java (original)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java Fri Mar 27 02:06:54 2015
@@ -47,8 +47,13 @@ public class DiskRangeList extends DiskR
     return other;
   }
 
-  /** Inserts an element before current in the list; returns the new element. */
-  public DiskRangeList insertBefore(DiskRangeList other) {
+  /**
+   * Inserts an intersecting range before current in the list and adjusts offset accordingly.
+   * @return the new element.
+   */
+  public DiskRangeList insertPartBefore(DiskRangeList other) {
+    assert other.end >= this.offset;
+    this.offset = other.end;
     other.prev = this.prev;
     other.next = this;
     if (this.prev != null) {
@@ -58,7 +63,10 @@ public class DiskRangeList extends DiskR
     return other;
   }
 
-  /** Inserts an element after current in the list; returns the new element. */
+  /**
+   * Inserts an element after current in the list.
+   * @return the new element.
+   */
   public DiskRangeList insertAfter(DiskRangeList other) {
     other.next = this.next;
     other.prev = this;
@@ -69,6 +77,16 @@ public class DiskRangeList extends DiskR
     return other;
   }
 
+  /**
+   * Inserts an intersecting range after current in the list and adjusts offset accordingly.
+   * @return the new element.
+   */
+  public DiskRangeList insertPartAfter(DiskRangeList other) {
+    assert other.offset <= this.end;
+    this.end = other.offset;
+    return insertAfter(other);
+  }
+
   /** Removes an element after current from the list. */
   public void removeAfter() {
     DiskRangeList other = this.next;
@@ -92,8 +110,8 @@ public class DiskRangeList extends DiskR
 
   /** Splits current element in the list, using DiskRange::slice */
   public DiskRangeList split(long cOffset) {
-    insertAfter((DiskRangeList)this.slice(cOffset, end));
-    return replaceSelfWith((DiskRangeList)this.slice(offset, cOffset));
+    insertAfter((DiskRangeList)this.sliceAndShift(cOffset, end, 0));
+    return replaceSelfWith((DiskRangeList)this.sliceAndShift(offset, cOffset, 0));
   }
 
   public boolean hasContiguousNext() {
@@ -149,29 +167,19 @@ public class DiskRangeList extends DiskR
     }
 
     public void addOrMerge(long offset, long end, boolean doMerge, boolean doLogNew) {
-      if (doMerge && tail != null && overlap(tail.offset, tail.end, offset, end)) {
-        tail.offset = Math.min(tail.offset, offset);
-        tail.end = Math.max(tail.end, end);
+      if (doMerge && tail != null && tail.merge(offset, end)) return;
+      if (doLogNew) {
+        LOG.info("Creating new range; last range (which can include some previous adds) was "
+            + tail);
+      }
+      DiskRangeList node = new DiskRangeList(offset, end);
+      if (tail == null) {
+        head = tail = node;
       } else {
-        if (doLogNew) {
-          LOG.info("Creating new range; last range (which can include some previous adds) was "
-              + tail);
-        }
-        DiskRangeList node = new DiskRangeList(offset, end);
-        if (tail == null) {
-          head = tail = node;
-        } else {
-          tail = tail.insertAfter(node);
-        }
+        tail = tail.insertAfter(node);
       }
     }
 
-    private static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
-      if (leftA <= leftB) {
-        return rightA >= leftB;
-      }
-      return rightB >= leftA;
-    }
 
     public DiskRangeList get() {
       return head;

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Fri Mar 27 02:06:54 2015
@@ -107,7 +107,7 @@ public class LowLevelCacheImpl implement
 
   private void getOverlappingRanges(long baseOffset, DiskRangeList currentNotCached,
       ConcurrentSkipListMap<Long, LlapDataBuffer> cache) {
-    long absOffset = currentNotCached.offset + baseOffset;
+    long absOffset = currentNotCached.getOffset() + baseOffset;
     if (!doAssumeGranularBlocks) {
       // This currently only happens in tests. See getFileData comment on the interface.
       Long prevOffset = cache.floorKey(absOffset);
@@ -116,7 +116,7 @@ public class LowLevelCacheImpl implement
       }
     }
     Iterator<Map.Entry<Long, LlapDataBuffer>> matches = cache.subMap(
-        absOffset, currentNotCached.end + baseOffset)
+        absOffset, currentNotCached.getEnd() + baseOffset)
         .entrySet().iterator();
     long cacheEnd = -1;
     while (matches.hasNext()) {
@@ -140,48 +140,42 @@ public class LowLevelCacheImpl implement
             + cacheOffset + ", " + (cacheOffset + buffer.declaredLength) + ")");
       }
       cacheEnd = cacheOffset + buffer.declaredLength;
-      CacheChunk currentCached = new CacheChunk(buffer, cacheOffset, cacheEnd);
-      currentNotCached = addCachedBufferToIter(currentNotCached, currentCached, baseOffset);
-      // Now that we've added it into correct position, we can adjust it by base offset.
-      currentCached.shiftBy(-baseOffset);
+      CacheChunk currentCached = new CacheChunk(buffer,
+          cacheOffset - baseOffset, cacheEnd - baseOffset);
+      currentNotCached = addCachedBufferToIter(currentNotCached, currentCached);
       metrics.incrCacheHitBytes(Math.min(requestedLength, currentCached.getLength()));
     }
   }
 
   /**
   * Adds cached buffer to buffer list.
-   * @param currentNotCached Pointer to the list node where we are inserting. Expressed in stripe/stream offset.
-   * @param currentCached The cached buffer found for this node, to insert. Expressed in file offset.
-   * @param baseOffset
+   * @param currentNotCached Pointer to the list node where we are inserting.
+   * @param currentCached The cached buffer found for this node, to insert.
    * @return The new currentNotCached pointer, following the cached buffer insertion.
   */
   private DiskRangeList addCachedBufferToIter(
-      DiskRangeList currentNotCached, CacheChunk currentCached, long baseOffset) {
-    // Both currentNotCached and currentCached already include baseOffset.
-    long startOffset = baseOffset + currentNotCached.offset,
-        endOffset = baseOffset + currentNotCached.end;
-    if (startOffset >= currentCached.offset) {
-      if (endOffset <= currentCached.end) { // we assume it's always "==" now
+      DiskRangeList currentNotCached, CacheChunk currentCached) {
+    if (currentNotCached.getOffset() >= currentCached.getOffset()) {
+      if (currentNotCached.getEnd() <= currentCached.getEnd()) { // we assume it's always "==" now
         // Replace the entire current DiskRange with new cached range.
         currentNotCached.replaceSelfWith(currentCached);
         return null;
       } else {
         // Insert the new cache range before the disk range.
-        currentNotCached.offset = currentCached.end - baseOffset;
-        currentNotCached.insertBefore(currentCached);
+        currentNotCached.insertPartBefore(currentCached);
         return currentNotCached;
       }
     } else {
-      assert startOffset < currentCached.offset
-          || currentNotCached.prev == null || currentNotCached.prev.end <= currentCached.offset;
-      currentNotCached.end = currentCached.offset - baseOffset;
-      currentNotCached.insertAfter(currentCached);
-      if (endOffset <= currentCached.end) { // we assume it's always "==" now
+      assert currentNotCached.getOffset() < currentCached.getOffset()
+          || currentNotCached.prev == null
+          || currentNotCached.prev.getEnd() <= currentCached.getOffset();
+      long endOffset = currentNotCached.getEnd();
+      currentNotCached.insertPartAfter(currentCached);
+      if (endOffset <= currentCached.getEnd()) { // we assume it's always "==" now
        return null; // No more matches expected...
       } else {
         // Insert the new disk range after the cache range. TODO: not strictly necessary yet?
-        currentNotCached = new DiskRangeList(
-            currentCached.end - baseOffset, endOffset - baseOffset);
+        currentNotCached = new DiskRangeList(currentCached.getEnd(), endOffset);
         currentCached.insertAfter(currentNotCached);
         return currentNotCached;
       }
@@ -214,7 +208,7 @@ public class LowLevelCacheImpl implement
     }
     boolean canLock = lockBuffer(buffer, false);
     assert canLock;
-    long offset = ranges[i].offset + baseOffset;
+    long offset = ranges[i].getOffset() + baseOffset;
     buffer.declaredLength = ranges[i].getLength();
     while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
       LlapDataBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java Fri Mar 27 02:06:54 2015
@@ -32,7 +32,7 @@ public class DebugUtils {
   private final static boolean isTraceOrcEnabled = EncodedReaderImpl.LOG.isDebugEnabled();
 
   public static boolean isTraceOrcEnabled() {
-    return isTraceOrcEnabled; // TODO: temporary, should be hardcoded false
+    return true; // TODO: temporary, should be hardcoded false
   }
 
   public static boolean isTraceLockingEnabled() {

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Fri Mar 27 02:06:54 2015
@@ -137,13 +137,13 @@ public abstract class InStream extends I
         logEmptySeek(name);
         return;
       }
-      if (curRange.offset <= desired &&
-          (desired - curRange.offset) < curRange.getLength()) {
+      if (curRange.getOffset() <= desired &&
+          (desired - curRange.getOffset()) < curRange.getLength()) {
         currentOffset = desired;
         currentRange = i;
         this.range = curRange.getData().duplicate();
         int pos = range.position();
-        pos += (int)(desired - curRange.offset); // this is why we duplicate
+        pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
         this.range.position(pos);
         return;
       }
@@ -151,13 +151,13 @@ public abstract class InStream extends I
     }
     // if they are seeking to the precise end, go ahead and let them go there
     int segments = bytes.size();
-    if (segments != 0 && desired == bytes.get(segments - 1).end) {
+    if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
       currentOffset = desired;
       currentRange = segments - 1;
       DiskRange curRange = bytes.get(currentRange);
       this.range = curRange.getData().duplicate();
       int pos = range.position();
-      pos += (int)(desired - curRange.offset); // this is why we duplicate
+      pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
       this.range.position(pos);
       return;
     }
@@ -228,7 +228,6 @@ public abstract class InStream extends I
       assert compressed == null;
       return; // Next block is ready from cache.
     }
-    long originalOffset = currentOffset;
     if (compressed.remaining() > OutStream.HEADER_SIZE) {
       int b0 = compressed.get() & 0xff;
       int b1 = compressed.get() & 0xff;
@@ -403,19 +402,19 @@ public abstract class InStream extends I
     }
     int i = 0;
     for (DiskRange range : bytes) {
-      if (range.offset <= desired && desired < range.end) {
+      if (range.getOffset() <= desired && desired < range.getEnd()) {
        currentRange = i;
         if (range instanceof BufferChunk) {
           cacheBuffer = null;
           compressed = range.getData().duplicate();
           int pos = compressed.position();
-          pos += (int)(desired - range.offset);
+          pos += (int)(desired - range.getOffset());
           compressed.position(pos);
         } else {
           compressed = null;
           cacheBuffer = ((CacheChunk)range).buffer;
           uncompressed = cacheBuffer.getByteBufferDup();
-          if (desired != range.offset) {
+          if (desired != range.getOffset()) {
             throw new IOException("Cannot seek into the middle of uncompressed cached data");
           }
         }
@@ -426,7 +425,7 @@ public abstract class InStream extends I
     }
     // if they are seeking to the precise end, go ahead and let them go there
     int segments = bytes.size();
-    if (segments != 0 && desired == bytes.get(segments - 1).end) {
+    if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
       DiskRange range = bytes.get(segments - 1);
       currentRange = segments - 1;
       if (range instanceof BufferChunk) {
@@ -438,7 +437,7 @@ public abstract class InStream extends I
         cacheBuffer = ((CacheChunk)range).buffer;
         uncompressed = cacheBuffer.getByteBufferDup();
         uncompressed.position(uncompressed.limit());
-        if (desired != range.offset) {
+        if (desired != range.getOffset()) {
           throw new IOException("Cannot seek into the middle of uncompressed cached data");
         }
         currentOffset = desired;
@@ -455,8 +454,8 @@ public abstract class InStream extends I
       if (i != 0) {
         builder.append("; ");
       }
-      builder.append(" range " + i + " = " + range.offset
-          + " to " + (range.end - range.offset));
+      builder.append(" range " + i + " = " + range.getOffset()
+          + " to " + (range.getEnd() - range.getOffset()));
       ++i;
     }
     return builder.toString();
@@ -590,7 +589,7 @@ public abstract class InStream extends I
     }
 
     // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
-    if (cOffset > current.offset) {
+    if (cOffset > current.getOffset()) {
       // Target compression block is in the middle of the range; slice the range in two.
      current = current.split(cOffset).next;
     }
@@ -606,7 +605,7 @@ public abstract class InStream extends I
         }
         cache.notifyReused(cc.buffer);
         streamBuffer.cacheBuffers.add(cc.buffer);
-        currentCOffset = cc.end;
+        currentCOffset = cc.getEnd();
         if (DebugUtils.isTraceOrcEnabled()) {
           LOG.info("Adding an already-uncompressed buffer " + cc.buffer);
         }
@@ -620,11 +619,11 @@ public abstract class InStream extends I
           toDecompress = new ArrayList<ProcCacheChunk>();
          toRelease = (zcr == null) ? null : new ArrayList<ByteBuffer>();
         }
-        long originalOffset = bc.offset;
+        long originalOffset = bc.getOffset();
         lastCached = addOneCompressionBuffer(bc, zcr, bufferSize, cache,
             streamBuffer.cacheBuffers, toDecompress, toRelease);
         next = (lastCached != null) ? lastCached.next : null;
-        currentCOffset = (next != null) ? next.offset : originalOffset;
+        currentCOffset = (next != null) ? next.getOffset() : originalOffset;
       }
       if ((endCOffset >= 0 && currentCOffset >= endCOffset) || next == null) {
         break;
@@ -740,10 +739,10 @@ public abstract class InStream extends I
       DiskRangeList ranges, long cOffset) {
     if (cOffset < 0) return ranges; // We expect the offset to be valid TODO: rather, validate
-    while (ranges.end <= cOffset) {
+    while (ranges.getEnd() <= cOffset) {
       ranges = ranges.next;
     }
-    while (ranges.offset > cOffset) {
+    while (ranges.getOffset() > cOffset) {
       ranges = ranges.prev;
     }
     return ranges;
@@ -767,7 +766,7 @@ public abstract class InStream extends I
       List<ProcCacheChunk> toDecompress, List<ByteBuffer> toRelease) throws IOException {
     ByteBuffer slice = null;
     ByteBuffer compressed = current.chunk;
-    long cbStartOffset = current.offset;
+    long cbStartOffset = current.getOffset();
     int b0 = compressed.get() & 0xff;
     int b1 = compressed.get() & 0xff;
     int b2 = compressed.get() & 0xff;
@@ -788,13 +787,13 @@ public abstract class InStream extends I
       slice = compressed.slice();
       slice.limit(chunkLength);
       ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed, cbStartOffset,
-          cbEndOffset, chunkLength, consumedLength, current, cache, toDecompress, cacheBuffers);
+          cbEndOffset, chunkLength, current, cache, toDecompress, cacheBuffers);
       if (compressed.remaining() <= 0 && zcr != null) {
         toRelease.add(compressed);
       }
       return cc;
     }
-    if (current.end < cbEndOffset && !current.hasContiguousNext()) {
+    if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
       return null; // This is impossible to read from this chunk.
     }
@@ -828,7 +827,7 @@ public abstract class InStream extends I
     slice.limit(remaining);
     copy.put(slice);
     ProcCacheChunk cc = addOneCompressionBlockByteBuffer(
-        copy, isUncompressed, cbStartOffset, cbEndOffset, remaining, remaining,
+        copy, isUncompressed, cbStartOffset, cbEndOffset, remaining,
         (BufferChunk)next, cache, toDecompress, cacheBuffers);
     if (compressed.remaining() <= 0 && zcr != null) {
       zcr.releaseBuffer(compressed); // We copied the entire buffer.
@@ -857,9 +856,7 @@ public abstract class InStream extends I
    * @param cbStartOffset Compressed start offset of the fCB.
    * @param cbEndOffset Compressed end offset of the fCB.
    * @param lastRange The buffer from which the last (or all) bytes of fCB come.
-   * @param lastPartChunkLength The number of compressed bytes consumed from last *chunk* into fullCompressionBlock.
-   * @param lastPartConsumedLength The number of compressed bytes consumed from last *range* into fullCompressionBlock.
-   *                               Can be different from lastPartChunkLength due to header.
+   * @param lastChunkLength The number of compressed bytes consumed from last *chunk* into fullCompressionBlock.
    * @param ranges The iterator of all compressed ranges for the stream, pointing at lastRange.
    * @param lastChunk
    * @param toDecompress See addOneCompressionBuffer.
    * @param cacheBuffers See addOneCompressionBuffer.
    * @return New cache buffer.
    */
   private static ProcCacheChunk addOneCompressionBlockByteBuffer(ByteBuffer fullCompressionBlock,
-      boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastPartChunkLength,
-      int lastPartConsumedLength, BufferChunk lastChunk, LowLevelCache cache,
-      List<ProcCacheChunk> toDecompress, List<LlapMemoryBuffer> cacheBuffers) {
+      boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastChunkLength,
+      BufferChunk lastChunk, LowLevelCache cache, List<ProcCacheChunk> toDecompress,
+      List<LlapMemoryBuffer> cacheBuffers) {
     // Prepare future cache buffer.
     LlapMemoryBuffer futureAlloc = cache.createUnallocated();
     // Add it to result in order we are processing.
     cacheBuffers.add(futureAlloc);
     toDecompress.add(cc);
     // Adjust the compression block position.
     if (DebugUtils.isTraceOrcEnabled()) {
-      LOG.info("Adjusting " + lastChunk + " to consume " + lastPartChunkLength
-          + " compressed / " + lastPartConsumedLength + " total bytes");
+      LOG.info("Adjusting " + lastChunk + " to consume " + lastChunkLength + " compressed bytes");
     }
-    lastChunk.chunk.position(lastChunk.chunk.position() + lastPartChunkLength);
-    lastChunk.offset += lastPartConsumedLength;
+    lastChunk.chunk.position(lastChunk.chunk.position() + lastChunkLength);
     // Finally, put it in the ranges list for future use (if shared between RGs).
     // Before anyone else accesses it, it would have been allocated and decompressed locally.
     if (lastChunk.chunk.remaining() <= 0) {
       if (DebugUtils.isTraceOrcEnabled()) {
         LOG.info("Replacing " + lastChunk + " with " + cc + " in the buffers");
       }
-      assert lastChunk.offset == lastChunk.end;
       lastChunk.replaceSelfWith(cc);
     } else {
       if (DebugUtils.isTraceOrcEnabled()) {
         LOG.info("Adding " + cc + " before " + lastChunk + " in the buffers");
       }
-      lastChunk.insertBefore(cc);
+      lastChunk.insertPartBefore(cc);
     }
     return cc;
   }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java Fri Mar 27 02:06:54 2015
@@ -145,7 +145,6 @@ public class OrcSplit extends FileSplit
     if (hasFileId) {
       fileId = in.readLong();
     }
-    LOG.error("TODO# Got file ID " + fileId + " for " + getPath());
   }
 
   FileMetaInfo getFileMetaInfo(){

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Fri Mar 27 02:06:54 2015
@@ -2974,12 +2974,9 @@ public class RecordReaderImpl implements
     }
     if (bufferChunks != null) {
       if (zcr != null) {
-        DiskRangeList range = bufferChunks;
-        while (range != null) {
-          if (range instanceof BufferChunk) {
-            zcr.releaseBuffer(((BufferChunk)range).chunk);
-          }
-          range = range.next;
+        for (DiskRangeList range = bufferChunks; range != null; range = range.next) {
+          if (!(range instanceof BufferChunk)) continue;
+          zcr.releaseBuffer(((BufferChunk)range).chunk);
         }
       }
       bufferChunks = null;
@@ -3077,22 +3074,23 @@ public class RecordReaderImpl implements
     }

     @Override
-    public DiskRange slice(long offset, long end) {
+    public DiskRange sliceAndShift(long offset, long end, long shiftBy) {
       assert offset <= end && offset >= this.offset && end <= this.end;
+      assert offset + shiftBy >= 0;
       ByteBuffer sliceBuf = chunk.slice();
       int newPos = (int)(offset - this.offset);
       int newLimit = newPos + (int)(end - offset);
-      // TODO: temporary
       try {
         sliceBuf.position(newPos);
         sliceBuf.limit(newLimit);
       } catch (Throwable t) {
         LOG.error("Failed to slice buffer chunk with range" + " [" + this.offset + ", " + this.end
             + "), position: " + chunk.position() + " limit: " + chunk.limit() + ", "
-            + (chunk.isDirect() ? "direct" : "array") + "; to [" + offset + ", " + end + ") " + t.getClass());
+            + (chunk.isDirect() ? "direct" : "array") + "; to [" + offset + ", " + end + ") "
+            + t.getClass());
         throw new RuntimeException(t);
       }
-      return new BufferChunk(sliceBuf, offset);
+      return new BufferChunk(sliceBuf, offset + shiftBy);
     }
 
     @Override
@@ -3176,24 +3174,6 @@ public class RecordReaderImpl implements
     return list.extract();
   }
 
-  /**
-   * Update the disk ranges to collapse adjacent or overlapping ranges. It
-   * assumes that the ranges are sorted.
-   * @param ranges the list of disk ranges to merge
-   */
-  static void mergeDiskRanges(DiskRangeList range) {
-    while (range != null && range.next != null) {
-      DiskRangeList next = range.next;
-      if (RecordReaderUtils.overlap(range.offset, range.end, next.offset, next.end)) {
-        range.offset = Math.min(range.offset, next.offset);
-        range.end = Math.max(range.end, next.end);
-        range.removeAfter();
-      } else {
-        range = next;
-      }
-    }
-  }
-
   void createStreams(List<OrcProto.Stream> streamDescriptions,
                      DiskRangeList ranges,
                      boolean[] includeColumn,
@@ -3233,7 +3213,6 @@ public class RecordReaderImpl implements
     if (LOG.isDebugEnabled()) {
       LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
     }
-    mergeDiskRanges(toRead);
     if (this.cache != null) {
       toRead = cache.getFileData(fileId, toRead, stripe.getOffset());
     }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java Fri Mar 27 02:06:54 2015
@@ -245,8 +245,8 @@ public class RecordReaderUtils {
         range = range.next;
         continue;
       }
-      int len = (int) (range.end - range.offset);
-      long off = range.offset;
+      int len = (int) (range.getEnd() - range.getOffset());
+      long off = range.getOffset();
       file.seek(base + off);
       if (zcr != null) {
         boolean hasReplaced = false;
@@ -267,11 +267,11 @@ public class RecordReaderUtils {
       } else if (doForceDirect) {
         ByteBuffer directBuf = ByteBuffer.allocateDirect(len);
         readDirect(file, len, directBuf, true);
-        range = range.replaceSelfWith(new BufferChunk(directBuf, range.offset));
+        range = range.replaceSelfWith(new BufferChunk(directBuf, range.getOffset()));
       } else {
         byte[] buffer = new byte[len];
         file.readFully(buffer, 0, buffer.length);
-        range = range.replaceSelfWith(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
+        range = range.replaceSelfWith(new BufferChunk(ByteBuffer.wrap(buffer), range.getOffset()));
       }
       range = range.next;
     }
@@ -320,37 +320,31 @@ public class RecordReaderUtils {
     boolean inRange = false;
     while (range != null) {
       if (!inRange) {
-        if (range.end <= offset) {
+        if (range.getEnd() <= offset) {
           range = range.next;
           continue; // Skip until we are in range.
         }
         inRange = true;
-        if (range.offset < offset) {
+        if (range.getOffset() < offset) {
           // Partial first buffer, add a slice of it.
-          DiskRange partial = range.slice(offset, Math.min(streamEnd, range.end));
-          partial.shiftBy(-offset);
-          buffers.add(partial);
-          if (range.end >= streamEnd) break; // Partial first buffer is also partial last buffer.
+          buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset));
+          if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer.
           range = range.next;
           continue;
         }
-      } else if (range.offset >= streamEnd) {
+      } else if (range.getOffset() >= streamEnd) {
         break;
       }
-      if (range.end > streamEnd) {
+      if (range.getEnd() > streamEnd) {
         // Partial last buffer (may also be the first buffer), add a slice of it.
-        DiskRange partial = range.slice(range.offset, streamEnd);
-        partial.shiftBy(-offset);
-        buffers.add(partial);
+        buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset));
         break;
       }
       // Buffer that belongs entirely to one stream.
       // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
       // because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
-      DiskRange full = range.slice(range.offset, range.end);
-      full.shiftBy(-offset);
-      buffers.add(full);
-      if (range.end == streamEnd) break;
+      buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset));
+      if (range.getEnd() == streamEnd) break;
       range = range.next;
     }
     return buffers;

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java Fri Mar 27 02:06:54 2015
@@ -801,28 +801,6 @@ public class TestRecordReaderImpl {
   }
 
   @Test
-  public void testMergeDiskRanges() throws Exception {
-    DiskRangeList list = diskRanges(100, 200, 300, 400, 500, 600);
-    RecordReaderImpl.mergeDiskRanges(list);
-    assertThat(list, is(diskRanges(100, 200, 300, 400, 500, 600)));
-    list = diskRanges(100, 200, 150, 300, 400, 500);
-    RecordReaderImpl.mergeDiskRanges(list);
-    assertThat(list, is(diskRanges(100, 300, 400, 500)));
-    list = diskRanges(100, 200, 300, 400, 400, 500);
-    RecordReaderImpl.mergeDiskRanges(list);
-    assertThat(list, is(diskRanges(100, 200, 300, 500)));
-    list = diskRanges(100, 200, 0, 300);
-    RecordReaderImpl.mergeDiskRanges(list);
-    assertThat(list, is(diskRanges(0, 300)));
-    list = diskRanges(0, 500, 200, 400);
-    RecordReaderImpl.mergeDiskRanges(list);
-    assertThat(list, is(diskRanges(0, 500)));
-    list = diskRanges(0, 100, 100, 200, 200, 300, 300, 400);
-    RecordReaderImpl.mergeDiskRanges(list);
-    assertThat(list, is(diskRanges(0, 400)));
-  }
-
-  @Test
   public void testGetIndexPosition() throws Exception {
     assertEquals(0, RecordReaderUtils.getIndexPosition
         (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.INT,