Author: sershe
Date: Fri Mar 27 02:06:54 2015
New Revision: 1669493

URL: http://svn.apache.org/r1669493
Log:
HIVE-10110 : LLAP: port updates from HIVE-9555 to llap branch in preparation for trunk merge (Sergey Shelukhin)

Modified:
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
    hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java

Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java (original)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java Fri Mar 27 02:06:54 2015
@@ -24,9 +24,9 @@ import java.nio.ByteBuffer;
  */
 public class DiskRange {
   /** The first address. */
-  public long offset;
+  protected long offset;
   /** The address afterwards. */
-  public long end;
+  protected long end;
 
   public DiskRange(long offset, long end) {
     this.offset = offset;
@@ -50,6 +50,14 @@ public class DiskRange {
     return "range start: " + offset + " end: " + end;
   }
 
+  public long getOffset() {
+    return offset;
+  }
+
+  public long getEnd() {
+    return end;
+  }
+
   public int getLength() {
     long len = this.end - this.offset;
     assert len <= Integer.MAX_VALUE;
@@ -61,7 +69,7 @@ public class DiskRange {
     return false;
   }
 
-  public DiskRange slice(long offset, long end) {
+  public DiskRange sliceAndShift(long offset, long end, long shiftBy) {
     // Rather, unexpected usage exception.
     throw new UnsupportedOperationException();
   }
@@ -70,8 +78,17 @@ public class DiskRange {
     throw new UnsupportedOperationException();
   }
 
-  public void shiftBy(long offsetDelta) {
-    offset += offsetDelta;
-    end += offsetDelta;
+  protected boolean merge(long otherOffset, long otherEnd) {
+    if (!overlap(offset, end, otherOffset, otherEnd)) return false;
+    offset = Math.min(offset, otherOffset);
+    end = Math.max(end, otherEnd);
+    return true;
+  }
+
+  private static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
+    if (leftA <= leftB) {
+      return rightA >= leftB;
+    }
+    return rightB >= leftA;
   }
 }
\ No newline at end of file

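A quick illustration of the merge/overlap pair added above, now that the old shiftBy path is gone: merge extends the receiver to cover any overlapping or touching span and reports whether it did so. This is a hedged sketch with made-up endpoints; merge is protected, so a real caller sits in the same package (e.g. DiskRangeList below) or in a subclass.

    // Assumed to run from org.apache.hadoop.hive.common so the protected merge() is visible.
    DiskRange r = new DiskRange(100, 200);
    r.merge(150, 300);   // overlaps -> r now spans [100, 300)
    r.merge(300, 400);   // touching counts as overlap -> r now spans [100, 400)
    r.merge(500, 600);   // disjoint -> returns false, r stays [100, 400)
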
Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java (original)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java Fri Mar 27 02:06:54 2015
@@ -47,8 +47,13 @@ public class DiskRangeList extends DiskR
     return other;
   }
 
-  /** Inserts an element before current in the list; returns the new element. */
-  public DiskRangeList insertBefore(DiskRangeList other) {
+  /**
+   * Inserts an intersecting range before current in the list and adjusts offset accordingly.
+   * @returns the new element.
+   */
+  public DiskRangeList insertPartBefore(DiskRangeList other) {
+    assert other.end >= this.offset;
+    this.offset = other.end;
     other.prev = this.prev;
     other.next = this;
     if (this.prev != null) {
@@ -58,7 +63,10 @@ public class DiskRangeList extends DiskR
     return other;
   }
 
-  /** Inserts an element after current in the list; returns the new element. */
+  /**
+   * Inserts an element after current in the list.
+   * @returns the new element.
+   * */
   public DiskRangeList insertAfter(DiskRangeList other) {
     other.next = this.next;
     other.prev = this;
@@ -69,6 +77,16 @@ public class DiskRangeList extends DiskR
     return other;
   }
 
+  /**
+   * Inserts an intersecting range after current in the list and adjusts offset accordingly.
+   * @returns the new element.
+   */
+  public DiskRangeList insertPartAfter(DiskRangeList other) {
+    assert other.offset <= this.end;
+    this.end = other.offset;
+    return insertAfter(other);
+  }
+
   /** Removes an element after current from the list. */
   public void removeAfter() {
     DiskRangeList other = this.next;
@@ -92,8 +110,8 @@ public class DiskRangeList extends DiskR
 
   /** Splits current element in the list, using DiskRange::slice */
   public DiskRangeList split(long cOffset) {
-    insertAfter((DiskRangeList)this.slice(cOffset, end));
-    return replaceSelfWith((DiskRangeList)this.slice(offset, cOffset));
+    insertAfter((DiskRangeList)this.sliceAndShift(cOffset, end, 0));
+    return replaceSelfWith((DiskRangeList)this.sliceAndShift(offset, cOffset, 0));
   }
 
   public boolean hasContiguousNext() {
@@ -149,29 +167,19 @@ public class DiskRangeList extends DiskR
     }
 
    public void addOrMerge(long offset, long end, boolean doMerge, boolean doLogNew) {
-      if (doMerge && tail != null && overlap(tail.offset, tail.end, offset, end)) {
-        tail.offset = Math.min(tail.offset, offset);
-        tail.end = Math.max(tail.end, end);
+      if (doMerge && tail != null && tail.merge(offset, end)) return;
+      if (doLogNew) {
+        LOG.info("Creating new range; last range (which can include some 
previous adds) was "
+            + tail);
+      }
+      DiskRangeList node = new DiskRangeList(offset, end);
+      if (tail == null) {
+        head = tail = node;
       } else {
-        if (doLogNew) {
-          LOG.info("Creating new range; last range (which can include some 
previous adds) was "
-              + tail);
-        }
-        DiskRangeList node = new DiskRangeList(offset, end);
-        if (tail == null) {
-          head = tail = node;
-        } else {
-          tail = tail.insertAfter(node);
-        }
+        tail = tail.insertAfter(node);
       }
     }
 
-    private static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
-      if (leftA <= leftB) {
-        return rightA >= leftB;
-      }
-      return rightB >= leftA;
-    }
 
     public DiskRangeList get() {
       return head;

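The new insertPartBefore/insertPartAfter helpers fold the old "insert, then fix up the neighbour's offset" dance into one call: the neighbour is trimmed so the two ranges no longer overlap. A hedged sketch with illustrative endpoints only:

    DiskRangeList current = new DiskRangeList(100, 400);
    current.insertPartBefore(new DiskRangeList(100, 250));
    // list is now [100, 250) -> current, which was trimmed to [250, 400)

    DiskRangeList current2 = new DiskRangeList(100, 400);
    current2.insertPartAfter(new DiskRangeList(250, 400));
    // list is now current2, trimmed to [100, 250) -> [250, 400)
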
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Fri Mar 27 02:06:54 2015
@@ -107,7 +107,7 @@ public class LowLevelCacheImpl implement
 
  private void getOverlappingRanges(long baseOffset, DiskRangeList currentNotCached,
       ConcurrentSkipListMap<Long, LlapDataBuffer> cache) {
-    long absOffset = currentNotCached.offset + baseOffset;
+    long absOffset = currentNotCached.getOffset() + baseOffset;
     if (!doAssumeGranularBlocks) {
      // This currently only happens in tests. See getFileData comment on the interface.
       Long prevOffset = cache.floorKey(absOffset);
@@ -116,7 +116,7 @@ public class LowLevelCacheImpl implement
       }
     }
     Iterator<Map.Entry<Long, LlapDataBuffer>> matches = cache.subMap(
-        absOffset, currentNotCached.end + baseOffset)
+        absOffset, currentNotCached.getEnd() + baseOffset)
         .entrySet().iterator();
     long cacheEnd = -1;
     while (matches.hasNext()) {
@@ -140,48 +140,42 @@ public class LowLevelCacheImpl implement
            + cacheOffset + ", " + (cacheOffset + buffer.declaredLength) + ")");
       }
       cacheEnd = cacheOffset + buffer.declaredLength;
-      CacheChunk currentCached = new CacheChunk(buffer, cacheOffset, cacheEnd);
-      currentNotCached = addCachedBufferToIter(currentNotCached, currentCached, baseOffset);
-      // Now that we've added it into correct position, we can adjust it by base offset.
-      currentCached.shiftBy(-baseOffset);
+      CacheChunk currentCached = new CacheChunk(buffer,
+          cacheOffset - baseOffset, cacheEnd - baseOffset);
+      currentNotCached = addCachedBufferToIter(currentNotCached, currentCached);
      metrics.incrCacheHitBytes(Math.min(requestedLength, currentCached.getLength()));
     }
   }
 
   /**
    * Adds cached buffer to buffer list.
-   * @param currentNotCached Pointer to the list node where we are inserting. Expressed in stripe/stream offset.
-   * @param currentCached The cached buffer found for this node, to insert. Expressed in file offset.
-   * @param baseOffset 
+   * @param currentNotCached Pointer to the list node where we are inserting.
+   * @param currentCached The cached buffer found for this node, to insert.
   * @return The new currentNotCached pointer, following the cached buffer insertion.
    */
   private DiskRangeList addCachedBufferToIter(
-      DiskRangeList currentNotCached, CacheChunk currentCached, long baseOffset) {
-    // Both currentNotCached and currentCached already include baseOffset.
-    long startOffset = baseOffset + currentNotCached.offset,
-        endOffset = baseOffset + currentNotCached.end;
-    if (startOffset >= currentCached.offset) {
-      if (endOffset <= currentCached.end) {  // we assume it's always "==" now
+      DiskRangeList currentNotCached, CacheChunk currentCached) {
+    if (currentNotCached.getOffset() >= currentCached.getOffset()) {
+      if (currentNotCached.getEnd() <= currentCached.getEnd()) {  // we assume it's always "==" now
         // Replace the entire current DiskRange with new cached range.
         currentNotCached.replaceSelfWith(currentCached);
         return null;
       } else {
         // Insert the new cache range before the disk range.
-        currentNotCached.offset = currentCached.end - baseOffset;
-        currentNotCached.insertBefore(currentCached);
+        currentNotCached.insertPartBefore(currentCached);
         return currentNotCached;
       }
     } else {
-      assert startOffset < currentCached.offset
-        || currentNotCached.prev == null || currentNotCached.prev.end <= currentCached.offset;
-      currentNotCached.end = currentCached.offset - baseOffset;
-      currentNotCached.insertAfter(currentCached);
-      if (endOffset <= currentCached.end) { // we assume it's always "==" now
+      assert currentNotCached.getOffset() < currentCached.getOffset()
+        || currentNotCached.prev == null
+        || currentNotCached.prev.getEnd() <= currentCached.getOffset();
+      long endOffset = currentNotCached.getEnd();
+      currentNotCached.insertPartAfter(currentCached);
+      if (endOffset <= currentCached.getEnd()) { // we assume it's always "==" now
         return null;  // No more matches expected...
       } else {
        // Insert the new disk range after the cache range. TODO: not strictly necessary yet?
-        currentNotCached = new DiskRangeList(
-            currentCached.end - baseOffset, endOffset - baseOffset);
+        currentNotCached = new DiskRangeList(currentCached.getEnd(), endOffset);
         currentCached.insertAfter(currentNotCached);
         return currentNotCached;
       }
@@ -214,7 +208,7 @@ public class LowLevelCacheImpl implement
         }
         boolean canLock = lockBuffer(buffer, false);
         assert canLock;
-        long offset = ranges[i].offset + baseOffset;
+        long offset = ranges[i].getOffset() + baseOffset;
         buffer.declaredLength = ranges[i].getLength();
        while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
           LlapDataBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);

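The net effect of this hunk is a coordinate-convention change: the CacheChunk is now built directly in stripe/stream-relative offsets, so addCachedBufferToIter no longer needs baseOffset and the post-insertion shiftBy disappears. A hedged before/after sketch, reusing the diff's own names:

    // Before (old code): chunk built in absolute file offsets, then shifted after insertion:
    //   CacheChunk currentCached = new CacheChunk(buffer, cacheOffset, cacheEnd);
    //   currentNotCached = addCachedBufferToIter(currentNotCached, currentCached, baseOffset);
    //   currentCached.shiftBy(-baseOffset);
    // After: subtract baseOffset up front; everything downstream stays stripe-relative.
    CacheChunk currentCached = new CacheChunk(buffer, cacheOffset - baseOffset, cacheEnd - baseOffset);
    currentNotCached = addCachedBufferToIter(currentNotCached, currentCached);
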
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java Fri Mar 27 02:06:54 2015
@@ -32,7 +32,7 @@ public class DebugUtils {
 
  private final static boolean isTraceOrcEnabled = EncodedReaderImpl.LOG.isDebugEnabled();
   public static boolean isTraceOrcEnabled() {
-    return isTraceOrcEnabled; // TODO: temporary, should be hardcoded false
+    return true; // TODO: temporary, should be hardcoded false
   }
 
   public static boolean isTraceLockingEnabled() {

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Fri Mar 27 02:06:54 2015
@@ -137,13 +137,13 @@ public abstract class InStream extends I
           logEmptySeek(name);
           return;
         }
-        if (curRange.offset <= desired &&
-            (desired - curRange.offset) < curRange.getLength()) {
+        if (curRange.getOffset() <= desired &&
+            (desired - curRange.getOffset()) < curRange.getLength()) {
           currentOffset = desired;
           currentRange = i;
           this.range = curRange.getData().duplicate();
           int pos = range.position();
-          pos += (int)(desired - curRange.offset); // this is why we duplicate
+          pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
           this.range.position(pos);
           return;
         }
@@ -151,13 +151,13 @@ public abstract class InStream extends I
       }
       // if they are seeking to the precise end, go ahead and let them go there
       int segments = bytes.size();
-      if (segments != 0 && desired == bytes.get(segments - 1).end) {
+      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
         currentOffset = desired;
         currentRange = segments - 1;
         DiskRange curRange = bytes.get(currentRange);
         this.range = curRange.getData().duplicate();
         int pos = range.position();
-        pos += (int)(desired - curRange.offset); // this is why we duplicate
+        pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
         this.range.position(pos);
         return;
       }
@@ -228,7 +228,6 @@ public abstract class InStream extends I
         assert compressed == null;
         return; // Next block is ready from cache.
       }
-      long originalOffset = currentOffset;
       if (compressed.remaining() > OutStream.HEADER_SIZE) {
         int b0 = compressed.get() & 0xff;
         int b1 = compressed.get() & 0xff;
@@ -403,19 +402,19 @@ public abstract class InStream extends I
       }
       int i = 0;
       for (DiskRange range : bytes) {
-        if (range.offset <= desired && desired < range.end) {
+        if (range.getOffset() <= desired && desired < range.getEnd()) {
           currentRange = i;
           if (range instanceof BufferChunk) {
             cacheBuffer = null;
             compressed = range.getData().duplicate();
             int pos = compressed.position();
-            pos += (int)(desired - range.offset);
+            pos += (int)(desired - range.getOffset());
             compressed.position(pos);
           } else {
             compressed = null;
             cacheBuffer = ((CacheChunk)range).buffer;
             uncompressed = cacheBuffer.getByteBufferDup();
-            if (desired != range.offset) {
+            if (desired != range.getOffset()) {
               throw new IOException("Cannot seek into the middle of 
uncompressed cached data");
             }
           }
@@ -426,7 +425,7 @@ public abstract class InStream extends I
       }
       // if they are seeking to the precise end, go ahead and let them go there
       int segments = bytes.size();
-      if (segments != 0 && desired == bytes.get(segments - 1).end) {
+      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
         DiskRange range = bytes.get(segments - 1);
         currentRange = segments - 1;
         if (range instanceof BufferChunk) {
@@ -438,7 +437,7 @@ public abstract class InStream extends I
           cacheBuffer = ((CacheChunk)range).buffer;
           uncompressed = cacheBuffer.getByteBufferDup();
           uncompressed.position(uncompressed.limit());
-          if (desired != range.offset) {
+          if (desired != range.getOffset()) {
             throw new IOException("Cannot seek into the middle of uncompressed 
cached data");
           }
           currentOffset = desired;
@@ -455,8 +454,8 @@ public abstract class InStream extends I
         if (i != 0) {
           builder.append("; ");
         }
-        builder.append(" range " + i + " = " + range.offset
-            + " to " + (range.end - range.offset));
+        builder.append(" range " + i + " = " + range.getOffset()
+            + " to " + (range.getEnd() - range.getOffset()));
         ++i;
       }
       return builder.toString();
@@ -590,7 +589,7 @@ public abstract class InStream extends I
     }
 
    // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
-    if (cOffset > current.offset) {
+    if (cOffset > current.getOffset()) {
      // Target compression block is in the middle of the range; slice the range in two.
       current = current.split(cOffset).next;
     }
@@ -606,7 +605,7 @@ public abstract class InStream extends I
         }
         cache.notifyReused(cc.buffer);
         streamBuffer.cacheBuffers.add(cc.buffer);
-        currentCOffset = cc.end;
+        currentCOffset = cc.getEnd();
         if (DebugUtils.isTraceOrcEnabled()) {
           LOG.info("Adding an already-uncompressed buffer " + cc.buffer);
         }
@@ -620,11 +619,11 @@ public abstract class InStream extends I
           toDecompress = new ArrayList<ProcCacheChunk>();
           toRelease = (zcr == null) ? null : new ArrayList<ByteBuffer>();
         }
-        long originalOffset = bc.offset;
+        long originalOffset = bc.getOffset();
         lastCached = addOneCompressionBuffer(bc, zcr, bufferSize,
             cache, streamBuffer.cacheBuffers, toDecompress, toRelease);
         next = (lastCached != null) ? lastCached.next : null;
-        currentCOffset = (next != null) ? next.offset : originalOffset;
+        currentCOffset = (next != null) ? next.getOffset() : originalOffset;
       }
       if ((endCOffset >= 0 && currentCOffset >= endCOffset) || next == null) {
         break;
@@ -740,10 +739,10 @@ public abstract class InStream extends I
       DiskRangeList ranges, long cOffset) {
     if (cOffset < 0) return ranges;
     // We expect the offset to be valid TODO: rather, validate
-    while (ranges.end <= cOffset) {
+    while (ranges.getEnd() <= cOffset) {
       ranges = ranges.next;
     }
-    while (ranges.offset > cOffset) {
+    while (ranges.getOffset() > cOffset) {
       ranges = ranges.prev;
     }
     return ranges;
@@ -767,7 +766,7 @@ public abstract class InStream extends I
      List<ProcCacheChunk> toDecompress, List<ByteBuffer> toRelease) throws IOException {
     ByteBuffer slice = null;
     ByteBuffer compressed = current.chunk;
-    long cbStartOffset = current.offset;
+    long cbStartOffset = current.getOffset();
     int b0 = compressed.get() & 0xff;
     int b1 = compressed.get() & 0xff;
     int b2 = compressed.get() & 0xff;
@@ -788,13 +787,13 @@ public abstract class InStream extends I
       slice = compressed.slice();
       slice.limit(chunkLength);
+      ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed, cbStartOffset,
-          cbEndOffset, chunkLength, consumedLength, current, cache, toDecompress, cacheBuffers);
+          cbEndOffset, chunkLength, current, cache, toDecompress, cacheBuffers);
       if (compressed.remaining() <= 0 && zcr != null) {
         toRelease.add(compressed);
       }
       return cc;
     }
-    if (current.end < cbEndOffset && !current.hasContiguousNext()) {
+    if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
       return null; // This is impossible to read from this chunk.
     }
 
@@ -828,7 +827,7 @@ public abstract class InStream extends I
         slice.limit(remaining);
         copy.put(slice);
         ProcCacheChunk cc = addOneCompressionBlockByteBuffer(
-            copy, isUncompressed, cbStartOffset, cbEndOffset, remaining, remaining,
+            copy, isUncompressed, cbStartOffset, cbEndOffset, remaining,
             (BufferChunk)next, cache, toDecompress, cacheBuffers);
         if (compressed.remaining() <= 0 && zcr != null) {
           zcr.releaseBuffer(compressed); // We copied the entire buffer.
@@ -857,9 +856,7 @@ public abstract class InStream extends I
    * @param cbStartOffset Compressed start offset of the fCB.
    * @param cbEndOffset Compressed end offset of the fCB.
   * @param lastRange The buffer from which the last (or all) bytes of fCB come.
-   * @param lastPartChunkLength The number of compressed bytes consumed from last *chunk* into fullCompressionBlock.
-   * @param lastPartConsumedLength The number of compressed bytes consumed from last *range* into fullCompressionBlock.
-   *                               Can be different from lastPartChunkLength due to header.
+   * @param lastChunkLength The number of compressed bytes consumed from last *chunk* into fullCompressionBlock.
   * @param ranges The iterator of all compressed ranges for the stream, pointing at lastRange.
    * @param lastChunk 
    * @param toDecompress See addOneCompressionBuffer.
@@ -867,9 +864,9 @@ public abstract class InStream extends I
    * @return New cache buffer.
    */
  private static ProcCacheChunk addOneCompressionBlockByteBuffer(ByteBuffer fullCompressionBlock,
-      boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastPartChunkLength,
-      int lastPartConsumedLength, BufferChunk lastChunk, LowLevelCache cache,
-      List<ProcCacheChunk> toDecompress, List<LlapMemoryBuffer> cacheBuffers) {
+      boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastChunkLength,
+      BufferChunk lastChunk, LowLevelCache cache, List<ProcCacheChunk> toDecompress,
+      List<LlapMemoryBuffer> cacheBuffers) {
     // Prepare future cache buffer.
     LlapMemoryBuffer futureAlloc = cache.createUnallocated();
     // Add it to result in order we are processing.
@@ -880,24 +877,21 @@ public abstract class InStream extends I
     toDecompress.add(cc);
     // Adjust the compression block position.
     if (DebugUtils.isTraceOrcEnabled()) {
-      LOG.info("Adjusting " + lastChunk + " to consume " + lastPartChunkLength
-          + " compressed / " + lastPartConsumedLength + " total bytes");
+      LOG.info("Adjusting " + lastChunk + " to consume " + lastChunkLength + " 
compressed bytes");
     }
-    lastChunk.chunk.position(lastChunk.chunk.position() + lastPartChunkLength);
-    lastChunk.offset += lastPartConsumedLength;
+    lastChunk.chunk.position(lastChunk.chunk.position() + lastChunkLength);
    // Finally, put it in the ranges list for future use (if shared between RGs).
    // Before anyone else accesses it, it would have been allocated and decompressed locally.
     if (lastChunk.chunk.remaining() <= 0) {
       if (DebugUtils.isTraceOrcEnabled()) {
         LOG.info("Replacing " + lastChunk + " with " + cc + " in the buffers");
       }
-      assert lastChunk.offset == lastChunk.end;
       lastChunk.replaceSelfWith(cc);
     } else {
       if (DebugUtils.isTraceOrcEnabled()) {
         LOG.info("Adding " + cc + " before " + lastChunk + " in the buffers");
       }
-      lastChunk.insertBefore(cc);
+      lastChunk.insertPartBefore(cc);
     }
     return cc;
   }

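With DiskRange offsets no longer mutated from here, addOneCompressionBlockByteBuffer only has to advance the byte-buffer position by the compressed bytes it consumed; the separate lastPartConsumedLength bookkeeping (and the offset fix-up it fed) is gone. A hedged sketch of the remaining tail handling, using the names from the hunk above:

    // Advance past the bytes that went into the full compression block.
    lastChunk.chunk.position(lastChunk.chunk.position() + lastChunkLength);
    if (lastChunk.chunk.remaining() <= 0) {
      lastChunk.replaceSelfWith(cc);     // chunk fully consumed: swap the node for the cache chunk
    } else {
      lastChunk.insertPartBefore(cc);    // partially consumed: cc takes over the leading part of the range
    }
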
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java Fri Mar 27 02:06:54 2015
@@ -145,7 +145,6 @@ public class OrcSplit extends FileSplit
     if (hasFileId) {
       fileId = in.readLong();
     }
-    LOG.error("TODO# Got file ID " + fileId + " for " + getPath());
   }
 
   FileMetaInfo getFileMetaInfo(){

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Fri Mar 27 02:06:54 2015
@@ -2974,12 +2974,9 @@ public class RecordReaderImpl implements
     }
     if (bufferChunks != null) {
       if (zcr != null) {
-        DiskRangeList range = bufferChunks;
-        while (range != null) {
-          if (range instanceof BufferChunk) {
-            zcr.releaseBuffer(((BufferChunk)range).chunk);
-          }
-          range = range.next;
+        for (DiskRangeList range = bufferChunks; range != null; range = range.next) {
+          if (!(range instanceof BufferChunk)) continue;
+          zcr.releaseBuffer(((BufferChunk)range).chunk);
         }
       }
       bufferChunks = null;
@@ -3077,22 +3074,23 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    public DiskRange slice(long offset, long end) {
+    public DiskRange sliceAndShift(long offset, long end, long shiftBy) {
       assert offset <= end && offset >= this.offset && end <= this.end;
+      assert offset + shiftBy >= 0;
       ByteBuffer sliceBuf = chunk.slice();
       int newPos = (int)(offset - this.offset);
       int newLimit = newPos + (int)(end - offset);
-      // TODO: temporary
       try {
         sliceBuf.position(newPos);
         sliceBuf.limit(newLimit);
       } catch (Throwable t) {
         LOG.error("Failed to slice buffer chunk with range" + " [" + 
this.offset + ", " + this.end
             + "), position: " + chunk.position() + " limit: " + chunk.limit() 
+ ", "
-            + (chunk.isDirect() ? "direct" : "array") + "; to [" + offset + ", " + end + ") " + t.getClass());
+            + (chunk.isDirect() ? "direct" : "array") + "; to [" + offset + ", " + end + ") "
+            + t.getClass());
         throw new RuntimeException(t);
       }
-      return new BufferChunk(sliceBuf, offset);
+      return new BufferChunk(sliceBuf, offset + shiftBy);
     }
 
     @Override
@@ -3176,24 +3174,6 @@ public class RecordReaderImpl implements
     return list.extract();
   }
 
-  /**
-   * Update the disk ranges to collapse adjacent or overlapping ranges. It
-   * assumes that the ranges are sorted.
-   * @param ranges the list of disk ranges to merge
-   */
-  static void mergeDiskRanges(DiskRangeList range) {
-    while (range != null && range.next != null) {
-      DiskRangeList next = range.next;
-      if (RecordReaderUtils.overlap(range.offset, range.end, next.offset, next.end)) {
-        range.offset = Math.min(range.offset, next.offset);
-        range.end = Math.max(range.end, next.end);
-        range.removeAfter();
-      } else {
-        range = next;
-      }
-    }
-  }
-
   void createStreams(List<OrcProto.Stream> streamDescriptions,
                             DiskRangeList ranges,
                             boolean[] includeColumn,
@@ -3233,7 +3213,6 @@ public class RecordReaderImpl implements
     if (LOG.isDebugEnabled()) {
       LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
     }
-    mergeDiskRanges(toRead);
     if (this.cache != null) {
       toRead = cache.getFileData(fileId, toRead, stripe.getOffset());
     }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java Fri Mar 27 02:06:54 2015
@@ -245,8 +245,8 @@ public class RecordReaderUtils {
         range = range.next;
         continue;
       }
-      int len = (int) (range.end - range.offset);
-      long off = range.offset;
+      int len = (int) (range.getEnd() - range.getOffset());
+      long off = range.getOffset();
       file.seek(base + off);
       if (zcr != null) {
         boolean hasReplaced = false;
@@ -267,11 +267,11 @@ public class RecordReaderUtils {
       } else if (doForceDirect) {
         ByteBuffer directBuf = ByteBuffer.allocateDirect(len);
         readDirect(file, len, directBuf, true);
-        range = range.replaceSelfWith(new BufferChunk(directBuf, range.offset));
+        range = range.replaceSelfWith(new BufferChunk(directBuf, range.getOffset()));
       } else {
         byte[] buffer = new byte[len];
         file.readFully(buffer, 0, buffer.length);
-        range = range.replaceSelfWith(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
+        range = range.replaceSelfWith(new BufferChunk(ByteBuffer.wrap(buffer), range.getOffset()));
       }
       range = range.next;
     }
@@ -320,37 +320,31 @@ public class RecordReaderUtils {
     boolean inRange = false;
     while (range != null) {
       if (!inRange) {
-        if (range.end <= offset) {
+        if (range.getEnd() <= offset) {
           range = range.next;
           continue; // Skip until we are in range.
         }
         inRange = true;
-        if (range.offset < offset) {
+        if (range.getOffset() < offset) {
           // Partial first buffer, add a slice of it.
-          DiskRange partial = range.slice(offset, Math.min(streamEnd, range.end));
-          partial.shiftBy(-offset);
-          buffers.add(partial);
-          if (range.end >= streamEnd) break; // Partial first buffer is also partial last buffer.
+          buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset));
+          if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer.
           range = range.next;
           continue;
         }
-      } else if (range.offset >= streamEnd) {
+      } else if (range.getOffset() >= streamEnd) {
         break;
       }
-      if (range.end > streamEnd) {
+      if (range.getEnd() > streamEnd) {
        // Partial last buffer (may also be the first buffer), add a slice of it.
-        DiskRange partial = range.slice(range.offset, streamEnd);
-        partial.shiftBy(-offset);
-        buffers.add(partial);
+        buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset));
         break;
       }
       // Buffer that belongs entirely to one stream.
      // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
      //       because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
-      DiskRange full = range.slice(range.offset, range.end);
-      full.shiftBy(-offset);
-      buffers.add(full);
-      if (range.end == streamEnd) break;
+      buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset));
+      if (range.getEnd() == streamEnd) break;
       range = range.next;
     }
     return buffers;

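The repeated rewrite in this file replaces the slice-then-shiftBy pair with a single sliceAndShift call that hands back the slice already in stream-relative coordinates. A hedged equivalence sketch (the endpoints are whatever the surrounding loop computed):

    // Old pattern (slice and shiftBy are both gone from DiskRange now):
    //   DiskRange partial = range.slice(offset, Math.min(streamEnd, range.end));
    //   partial.shiftBy(-offset);
    //   buffers.add(partial);
    // New pattern, one call:
    buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset));
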
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java?rev=1669493&r1=1669492&r2=1669493&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java Fri Mar 27 02:06:54 2015
@@ -801,28 +801,6 @@ public class TestRecordReaderImpl {
   }
 
   @Test
-  public void testMergeDiskRanges() throws Exception {
-    DiskRangeList list = diskRanges(100, 200, 300, 400, 500, 600);
-    RecordReaderImpl.mergeDiskRanges(list);
-    assertThat(list, is(diskRanges(100, 200, 300, 400, 500, 600)));
-    list = diskRanges(100, 200, 150, 300, 400, 500);
-    RecordReaderImpl.mergeDiskRanges(list);
-    assertThat(list, is(diskRanges(100, 300, 400, 500)));
-    list = diskRanges(100, 200, 300, 400, 400, 500);
-    RecordReaderImpl.mergeDiskRanges(list);
-    assertThat(list, is(diskRanges(100, 200, 300, 500)));
-    list = diskRanges(100, 200, 0, 300);
-    RecordReaderImpl.mergeDiskRanges(list);
-    assertThat(list, is(diskRanges(0, 300)));
-    list = diskRanges(0, 500, 200, 400);
-    RecordReaderImpl.mergeDiskRanges(list);
-    assertThat(list, is(diskRanges(0, 500)));
-    list = diskRanges(0, 100, 100, 200, 200, 300, 300, 400);
-    RecordReaderImpl.mergeDiskRanges(list);
-    assertThat(list, is(diskRanges(0, 400)));
-  }
-
-  @Test
   public void testGetIndexPosition() throws Exception {
     assertEquals(0, RecordReaderUtils.getIndexPosition
         (OrcProto.ColumnEncoding.Kind.DIRECT, OrcProto.Type.Kind.INT,

