HIVE-10777 : LLAP: add per-fragment and per-table cache details (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2024c962
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2024c962
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2024c962

Branch: refs/heads/llap
Commit: 2024c962d28d9bf777e8e32364d2685f8ff2cba3
Parents: 2611ddb
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Tue May 26 13:27:39 2015 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Tue May 26 13:27:39 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   1 +
 .../llap/counters/LowLevelCacheCounters.java    |  26 ++++
 .../hive/llap/io/api/cache/LowLevelCache.java   |  21 ++-
 .../hive/llap/cache/LowLevelCacheImpl.java      |  65 ++++++++-
 .../llap/counters/QueryFragmentCounters.java    | 108 ++++++++++++---
 .../hive/llap/io/api/impl/LlapInputFormat.java  |  22 ++-
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   2 +
 .../llap/io/encoded/OrcEncodedDataReader.java   | 133 ++++++++++++++++---
 .../org/apache/hadoop/hive/llap/DebugUtils.java |   2 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   3 +-
 .../hive/ql/io/orc/EncodedReaderImpl.java       |  84 +++++++-----
 .../apache/hadoop/hive/ql/io/orc/InStream.java  |  12 +-
 .../apache/hadoop/hive/ql/io/orc/OrcSplit.java  |   2 +-
 .../apache/hadoop/hive/ql/io/orc/Reader.java    |   4 +-
 .../hadoop/hive/ql/io/orc/ReaderImpl.java       |   6 +-
 15 files changed, 396 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f4a70b2..818aee3 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2166,6 +2166,7 @@ public class HiveConf extends Configuration {
     LLAP_ORC_CACHE_ALLOCATE_DIRECT("hive.llap.io.cache.direct", true, ""),
     LLAP_USE_LRFU("hive.llap.io.use.lrfu", false, ""),
     LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.01f, ""),
+    LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true, ""),
     LLAP_AUTO_ALLOW_UBER("hive.llap.auto.allow.uber", true,
        "Whether or not to allow the planner to run vertices in the AM"),
     LLAP_AUTO_ENFORCE_TREE("hive.llap.auto.enforce.tree", true,
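
The new hive.llap.io.orc.time.counters flag gates all of the time-based counters introduced below. A minimal sketch of reading it; the wrapper class is hypothetical, but the getBoolVar lookup is the same one this patch performs in the QueryFragmentCounters constructor:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class TimeCounterFlagSketch { // hypothetical, not part of the patch
  public static boolean timeCountersEnabled(Configuration conf) {
    // When false, the fragment counters skip the System.nanoTime() calls entirely.
    return HiveConf.getBoolVar(conf, ConfVars.LLAP_ORC_ENABLE_TIME_COUNTERS);
  }
}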

http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java b/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java
new file mode 100644
index 0000000..d862a83
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.counters;
+
+public interface LowLevelCacheCounters {
+  void recordCacheHit(long bytesHit);
+  void recordCacheMiss(long bytesMissed);
+  void recordAllocBytes(long bytesWasted, long bytesAllocated);
+  void recordHdfsTime(long timeUs);
+  long startTimeCounter();
+}
\ No newline at end of file
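
The intended call pattern for the timing methods, as used later in this patch, is to grab a timestamp from startTimeCounter() and hand it back to recordHdfsTime() when the read completes; the byte-oriented methods are fed sizes directly. A hypothetical no-op implementation (for example, for tests; not included in this commit) is all it takes to satisfy the interface:

package org.apache.hadoop.hive.llap.counters;

/** Hypothetical no-op counters, e.g. for tests; not part of this commit. */
public class NoopCacheCounters implements LowLevelCacheCounters {
  @Override public void recordCacheHit(long bytesHit) {}
  @Override public void recordCacheMiss(long bytesMissed) {}
  @Override public void recordAllocBytes(long bytesWasted, long bytesAllocated) {}
  @Override public void recordHdfsTime(long timeUs) {}
  @Override public long startTimeCounter() { return 0; }
}

Callers then follow the pattern "long t = counters.startTimeCounter(); ...do the read...; counters.recordHdfsTime(t);", which is exactly how EncodedReaderImpl uses it below.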

http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
index 2c35c50..fcc7eed 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
@@ -22,7 +22,9 @@ import java.util.List;
 
 import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
 import  org.apache.hadoop.hive.llap.cache.Allocator;
+import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
 
 public interface LowLevelCache {
   public enum Priority {
@@ -30,6 +32,15 @@ public interface LowLevelCache {
     HIGH
   }
 
+  public class CacheListHelper extends DiskRangeListMutateHelper {
+    public CacheListHelper(DiskRangeList head) {
+      super(head);
+    }
+
+    /** Workaround for Java's limitations, used to return stuff from getFileData. */
+    public boolean didGetAllData;
+  }
+
   /**
    * Gets file data for particular offsets. The range list is modified in 
place; it is then
    * returned (since the list head could have changed). Ranges are replaced 
with cached ranges.
@@ -49,6 +60,9 @@ public interface LowLevelCache {
    *    Some sort of InvalidCacheChunk could be placed to avoid them. TODO
    * @param base base offset for the ranges (stripe/stream offset in case of 
ORC).
    */
+  DiskRangeList getFileData(long fileId, DiskRangeList range, long baseOffset,
+      CacheChunkFactory factory, LowLevelCacheCounters qfCounters);
+
   DiskRangeList getFileData(
       long fileId, DiskRangeList range, long baseOffset, CacheChunkFactory 
factory);
 
@@ -57,8 +71,11 @@ public interface LowLevelCache {
    * @return null if all data was put; bitmask indicating which chunks were 
not put otherwise;
    *         the replacement chunks from cache are updated directly in the 
array.
    */
-  long[] putFileData(
-      long fileId, DiskRange[] ranges, LlapMemoryBuffer[] chunks, long base, 
Priority priority);
+  long[] putFileData(long fileId, DiskRange[] ranges, LlapMemoryBuffer[] 
chunks,
+      long base, Priority priority, LowLevelCacheCounters qfCounters);
+
+  long[] putFileData(long fileId, DiskRange[] ranges, LlapMemoryBuffer[] 
chunks,
+      long base, Priority priority);
 
   Allocator getAllocator();
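
The counter-aware overloads thread a LowLevelCacheCounters object through the cache, and the CacheListHelper list head records whether every requested range was served from cache, so the caller can skip the disk read entirely. A sketch of the caller side, mirroring how EncodedReaderImpl uses it later in this patch (a fragment only: imports are omitted, and "wanted", "factory" and "counters" are placeholders for the caller's range list, CacheChunkFactory and fragment counters):

// Sketch only, not part of the patch.
CacheListHelper toRead = new CacheListHelper(wanted);
cache.getFileData(fileId, toRead.next, stripeOffset, factory, counters);
if (!toRead.didGetAllData) {
  long start = counters.startTimeCounter();
  // ... read only the ranges the cache could not supply ...
  counters.recordHdfsTime(start);
}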
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 249ed56..8c65844 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.common.DiskRangeList;
 import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
 import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
@@ -75,33 +76,66 @@ public class LowLevelCacheImpl implements LowLevelCache, 
LlapOomDebugDump {
   @Override
   public DiskRangeList getFileData(
       long fileId, DiskRangeList ranges, long baseOffset, CacheChunkFactory 
factory) {
+    return getFileData(fileId, ranges, baseOffset, factory, null);
+  }
+
+  @Override
+  public DiskRangeList getFileData(long fileId, DiskRangeList ranges, long 
baseOffset,
+      CacheChunkFactory factory, LowLevelCacheCounters qfCounters) {
     if (ranges == null) return null;
+    DiskRangeList prev = ranges.prev;
     FileCache subCache = cache.get(fileId);
     if (subCache == null || !subCache.incRef()) {
-      metrics.incrCacheRequestedBytes(ranges.getTotalLength());
+      long totalMissed = ranges.getTotalLength();
+      metrics.incrCacheRequestedBytes(totalMissed);
+      if (qfCounters != null) {
+        qfCounters.recordCacheMiss(totalMissed);
+      }
+      if (prev != null && prev instanceof CacheListHelper) {
+        ((CacheListHelper)prev).didGetAllData = false;
+      }
       return ranges;
     }
+    CacheListHelper resultObj = null;
     try {
-      DiskRangeList prev = ranges.prev;
       if (prev == null) {
         prev = new DiskRangeListMutateHelper(ranges);
+      } else if (prev instanceof CacheListHelper) {
+        resultObj = (CacheListHelper)prev;
+        resultObj.didGetAllData = true;
       }
       DiskRangeList current = ranges;
       while (current != null) {
         metrics.incrCacheRequestedBytes(current.getLength());
         // We assume ranges in "ranges" are non-overlapping; thus, we will 
save next in advance.
         DiskRangeList next = current.next;
-        getOverlappingRanges(baseOffset, current, subCache.cache, factory);
+        getOverlappingRanges(baseOffset, current, subCache.cache, factory, 
resultObj);
         current = next;
       }
-      return prev.next;
     } finally {
       subCache.decRef();
     }
+    if (qfCounters != null) {
+      DiskRangeList current = prev.next;
+      long bytesHit = 0, bytesMissed = 0;
+      while (current != null) {
+        // This assumes no ranges passed to cache to fetch have data 
beforehand.
+        if (current.hasData()) {
+          bytesHit += current.getLength();
+        } else {
+          bytesMissed += current.getLength();
+        }
+        current = current.next;
+      }
+      qfCounters.recordCacheHit(bytesHit);
+      qfCounters.recordCacheMiss(bytesMissed);
+    }
+    return prev.next;
   }
 
   private void getOverlappingRanges(long baseOffset, DiskRangeList 
currentNotCached,
-      ConcurrentSkipListMap<Long, LlapDataBuffer> cache, CacheChunkFactory 
factory) {
+      ConcurrentSkipListMap<Long, LlapDataBuffer> cache, CacheChunkFactory 
factory,
+      CacheListHelper resultObj) {
     long absOffset = currentNotCached.getOffset() + baseOffset;
     if (!doAssumeGranularBlocks) {
       // This currently only happens in tests. See getFileData comment on the 
interface.
@@ -127,6 +161,7 @@ public class LowLevelCacheImpl implements LowLevelCache, 
LlapOomDebugDump {
       if (!lockBuffer(buffer, true)) {
         // If we cannot lock, remove this from cache and continue.
         matches.remove();
+        resultObj.didGetAllData = false;
         continue;
       }
       long cacheOffset = e.getKey();
@@ -137,19 +172,24 @@ public class LowLevelCacheImpl implements LowLevelCache, 
LlapOomDebugDump {
       cacheEnd = cacheOffset + buffer.declaredCachedLength;
       DiskRangeList currentCached = factory.createCacheChunk(buffer,
           cacheOffset - baseOffset, cacheEnd - baseOffset);
-      currentNotCached = addCachedBufferToIter(currentNotCached, 
currentCached);
+      currentNotCached = addCachedBufferToIter(currentNotCached, 
currentCached, resultObj);
       metrics.incrCacheHitBytes(Math.min(requestedLength, 
currentCached.getLength()));
     }
+    if (currentNotCached != null) {
+      assert !currentNotCached.hasData();
+      resultObj.didGetAllData = false;
+    }
   }
 
   /**
    * Adds cached buffer to buffer list.
    * @param currentNotCached Pointer to the list node where we are inserting.
    * @param currentCached The cached buffer found for this node, to insert.
+   * @param resultObj
    * @return The new currentNotCached pointer, following the cached buffer 
insertion.
    */
   private DiskRangeList addCachedBufferToIter(
-      DiskRangeList currentNotCached, DiskRangeList currentCached) {
+      DiskRangeList currentNotCached, DiskRangeList currentCached, 
CacheListHelper resultObj) {
     if (currentNotCached.getOffset() >= currentCached.getOffset()) {
       if (currentNotCached.getEnd() <= currentCached.getEnd()) {  // we assume 
it's always "==" now
         // Replace the entire current DiskRange with new cached range.
@@ -161,6 +201,8 @@ public class LowLevelCacheImpl implements LowLevelCache, 
LlapOomDebugDump {
         return currentNotCached;
       }
     } else {
+      // There's some part of current buffer that is not cached.
+      resultObj.didGetAllData = false;
       assert currentNotCached.getOffset() < currentCached.getOffset()
         || currentNotCached.prev == null
         || currentNotCached.prev.getEnd() <= currentCached.getOffset();
@@ -192,6 +234,12 @@ public class LowLevelCacheImpl implements LowLevelCache, 
LlapOomDebugDump {
   @Override
   public long[] putFileData(long fileId, DiskRange[] ranges,
       LlapMemoryBuffer[] buffers, long baseOffset, Priority priority) {
+    return putFileData(fileId, ranges, buffers, baseOffset, priority, null);
+  }
+
+  @Override
+  public long[] putFileData(long fileId, DiskRange[] ranges, 
LlapMemoryBuffer[] buffers,
+      long baseOffset, Priority priority, LowLevelCacheCounters qfCounters) {
     long[] result = null;
     assert buffers.length == ranges.length;
     FileCache subCache = getOrAddFileSubCache(fileId);
@@ -211,6 +259,9 @@ public class LowLevelCacheImpl implements LowLevelCache, 
LlapOomDebugDump {
           if (oldVal == null) {
             // Cached successfully, add to policy.
             cachePolicy.cache(buffer, priority);
+            if (qfCounters != null) {
+              qfCounters.recordAllocBytes(buffer.byteBuffer.remaining(), 
buffer.allocSize);
+            }
             break;
           }
           if (DebugUtils.isTraceCachingEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
index bc7c2a8..7658b03 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
@@ -17,32 +17,53 @@
  */
 package org.apache.hadoop.hive.llap.counters;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 
 /**
  * Per query counters.
  */
-public class QueryFragmentCounters {
+public class QueryFragmentCounters implements LowLevelCacheCounters {
+  private final boolean doUseTimeCounters;
 
   public static enum Counter {
     NUM_VECTOR_BATCHES,
     NUM_DECODED_BATCHES,
     SELECTED_ROWGROUPS,
     NUM_ERRORS,
-    ROWS_EMITTED
+    ROWS_EMITTED,
+    METADATA_CACHE_HIT,
+    METADATA_CACHE_MISS,
+    CACHE_HIT_BYTES,
+    CACHE_MISS_BYTES,
+    ALLOCATED_BYTES,
+    ALLOCATED_USED_BYTES,
+    TOTAL_IO_TIME_US,
+    DECODE_TIME_US,
+    HDFS_TIME_US,
+    CONSUMER_TIME_US
   }
 
-  private String appId;
-  private Map<String, Long> counterMap;
-
-  public QueryFragmentCounters() {
-    this("Not Specified");
+  public static enum Desc {
+    TABLE
   }
 
-  public QueryFragmentCounters(String applicationId) {
-    this.appId = applicationId;
-    this.counterMap = new ConcurrentHashMap<>();
+  private final AtomicLongArray fixedCounters;
+  private final String[] descs;
+
+  public QueryFragmentCounters(Configuration conf) {
+    fixedCounters = new AtomicLongArray(Counter.values().length);
+    descs = new String[Desc.values().length];
+    doUseTimeCounters = HiveConf.getBoolVar(conf, ConfVars.LLAP_ORC_ENABLE_TIME_COUNTERS);
+    if (!doUseTimeCounters) {
+      setCounter(Counter.TOTAL_IO_TIME_US, -1);
+      setCounter(Counter.DECODE_TIME_US, -1);
+      setCounter(Counter.HDFS_TIME_US, -1);
+      setCounter(Counter.CONSUMER_TIME_US, -1);
+    }
   }
 
   public void incrCounter(Counter counter) {
@@ -50,20 +71,67 @@ public class QueryFragmentCounters {
   }
 
   public void incrCounter(Counter counter, long delta) {
-    if (counterMap.containsKey(counter.name())) {
-      long val = counterMap.get(counter.name());
-      counterMap.put(counter.name(), val + delta);
-    } else {
-      setCounter(counter, delta);
-    }
+    fixedCounters.addAndGet(counter.ordinal(), delta);
+  }
+
+  @Override
+  public final long startTimeCounter() {
+    return (doUseTimeCounters ? System.nanoTime() : 0);
+  }
+
+  public void incrTimeCounter(Counter counter, long startTime) {
+    if (!doUseTimeCounters) return;
+    fixedCounters.addAndGet(counter.ordinal(), System.nanoTime() - startTime);
   }
 
   public void setCounter(Counter counter, long value) {
-    counterMap.put(counter.name(), value);
+    fixedCounters.set(counter.ordinal(), value);
+  }
+
+  public void setDesc(Desc key, String desc) {
+    descs[key.ordinal()] = desc;
+  }
+
+  @Override
+  public void recordCacheHit(long bytesHit) {
+    incrCounter(Counter.CACHE_HIT_BYTES, bytesHit);
+  }
+
+  @Override
+  public void recordCacheMiss(long bytesMissed) {
+    incrCounter(Counter.CACHE_MISS_BYTES, bytesMissed);
+  }
+
+  @Override
+  public void recordAllocBytes(long bytesUsed, long bytesAllocated) {
+    incrCounter(Counter.ALLOCATED_USED_BYTES, bytesUsed);
+    incrCounter(Counter.ALLOCATED_BYTES, bytesAllocated);
+  }
+
+  @Override
+  public void recordHdfsTime(long startTime) {
+    incrTimeCounter(Counter.HDFS_TIME_US, startTime);
   }
 
   @Override
   public String toString() {
-    return "ApplicationId: " + appId + " Counters: " + counterMap;
+    // We rely on NDC information in the logs to map counters to attempt.
+    // If that is not available, appId should either be passed in, or 
extracted from NDC.
+    StringBuilder sb = new StringBuilder("Fragment counters for [");
+    for (int i = 0; i < descs.length; ++i) {
+      if (i != 0) {
+        sb.append(", ");
+      }
+      sb.append(descs[i]);
+    }
+    sb.append("]: [ ");
+    for (int i = 0; i < fixedCounters.length(); ++i) {
+      if (i != 0) {
+        sb.append(", ");
+      }
+      
sb.append(Counter.values()[i].name()).append("=").append(fixedCounters.get(i));
+    }
+    sb.append(" ]");
+    return sb.toString();
   }
 }
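
A short usage sketch of the rewritten counters class; the wrapper method and the table name are made up, but every call shown here is added by this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Desc;

public class FragmentCountersSketch { // hypothetical, not part of the patch
  static String example(Configuration conf) {
    QueryFragmentCounters counters = new QueryFragmentCounters(conf);
    counters.setDesc(Desc.TABLE, "somedb.sometable"); // per-table tag (value made up)
    long start = counters.startTimeCounter();         // returns 0 when time counters are off
    counters.incrCounter(Counter.METADATA_CACHE_HIT);
    counters.incrTimeCounter(Counter.HDFS_TIME_US, start);
    return counters.toString(); // "Fragment counters for [somedb.sometable]: [ ... ]"
  }
}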

http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 1cf5158..a4f69da 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -111,13 +112,14 @@ public class LlapInputFormat
     private boolean isDone = false, isClosed = false;
     private ConsumerFeedback<ColumnVectorBatch> feedback;
     private final QueryFragmentCounters counters;
+    private long firstReturnTime;
 
     public LlapRecordReader(JobConf job, FileSplit split, List<Integer> 
includedCols) {
       this.split = split;
       this.columnIds = includedCols;
       this.sarg = SearchArgumentFactory.createFromConf(job);
       this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
-      this.counters = new QueryFragmentCounters();
+      this.counters = new QueryFragmentCounters(job);
       try {
         rbCtx = new VectorizedRowBatchCtx();
         rbCtx.init(job, split);
@@ -150,7 +152,13 @@ public class LlapInputFormat
         feedback.stop();
         throw new IOException(e);
       }
-      if (cvb == null) return false;
+      if (cvb == null) {
+        if (isFirst) {
+          firstReturnTime = counters.startTimeCounter();
+        }
+        counters.incrTimeCounter(Counter.CONSUMER_TIME_US, firstReturnTime);
+        return false;
+      }
       if (columnIds.size() != cvb.cols.length) {
         throw new RuntimeException("Unexpected number of columns, VRB has " + 
columnIds.size()
             + " included, but the reader returned " + cvb.cols.length);
@@ -162,6 +170,9 @@ public class LlapInputFormat
       }
       value.selectedInUse = false;
       value.size = cvb.size;
+      if (isFirst) {
+        firstReturnTime = counters.startTimeCounter();
+      }
       return true;
     }
 
@@ -191,7 +202,8 @@ public class LlapInputFormat
     }
 
     ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
-      if (lastCvb != null) {
+      boolean isFirst = (lastCvb == null);
+      if (!isFirst) {
         feedback.returnData(lastCvb);
       }
       synchronized (pendingData) {
@@ -244,7 +256,7 @@ public class LlapInputFormat
         LlapIoImpl.LOG.info("close called; closed " + isClosed + ", done " + 
isDone
             + ", err " + pendingError + ", pending " + pendingData.size());
       }
-      LlapIoImpl.LOG.info("QueryFragmentCounters: " + counters);
+      LlapIoImpl.LOG.info(counters); // This is where counters are logged!
       feedback.stop();
       rethrowErrorIfAny();
     }
@@ -263,7 +275,6 @@ public class LlapInputFormat
         LlapIoImpl.LOG.info("setDone called; closed " + isClosed
           + ", done " + isDone + ", err " + pendingError + ", pending " + 
pendingData.size());
       }
-      LlapIoImpl.LOG.info("DONE: QueryFragmentCounters: " + counters);
       synchronized (pendingData) {
         isDone = true;
         pendingData.notifyAll();
@@ -291,7 +302,6 @@ public class LlapInputFormat
       LlapIoImpl.LOG.info("setError called; closed " + isClosed
         + ", done " + isDone + ", err " + pendingError + ", pending " + 
pendingData.size());
       assert t != null;
-      LlapIoImpl.LOG.info("ERROR: QueryFragmentCounters: " + counters);
       synchronized (pendingData) {
         pendingError = t;
         pendingData.notifyAll();
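
With the per-state log lines removed, a fragment now produces exactly one counters line, at close(). Following the toString() format added above, it would look roughly like this (the table name and every value are made up for illustration):

Fragment counters for [somedb.sometable]: [ NUM_VECTOR_BATCHES=24, NUM_DECODED_BATCHES=24, SELECTED_ROWGROUPS=3, NUM_ERRORS=0, ROWS_EMITTED=24576, METADATA_CACHE_HIT=2, METADATA_CACHE_MISS=1, CACHE_HIT_BYTES=8388608, CACHE_MISS_BYTES=2097152, ALLOCATED_BYTES=2359296, ALLOCATED_USED_BYTES=2097152, TOTAL_IO_TIME_US=1500000, DECODE_TIME_US=400000, HDFS_TIME_US=900000, CONSUMER_TIME_US=200000 ]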

http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index c6ff498..c16be38 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -70,6 +70,7 @@ public class OrcEncodedDataConsumer
   @Override
   protected void decodeBatch(OrcEncodedColumnBatch batch,
       Consumer<ColumnVectorBatch> downstreamConsumer) {
+    long startTime = counters.startTimeCounter();
     int currentStripeIndex = batch.batchKey.stripeIx;
 
     boolean sameStripe = currentStripeIndex == previousStripeIndex;
@@ -116,6 +117,7 @@ public class OrcEncodedDataConsumer
         downstreamConsumer.consumeData(cvb);
         counters.incrCounter(QueryFragmentCounters.Counter.ROWS_EMITTED, 
batchSize);
       }
+      counters.incrTimeCounter(QueryFragmentCounters.Counter.DECODE_TIME_US, 
startTime);
       counters.incrCounter(QueryFragmentCounters.Counter.NUM_VECTOR_BATCHES, 
maxBatchesRG);
       counters.incrCounter(QueryFragmentCounters.Counter.NUM_DECODED_BATCHES);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 13292a6..d76316a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -18,6 +18,7 @@ import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
@@ -28,6 +29,8 @@ import 
org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
 import org.apache.hadoop.hive.ql.io.orc.EncodedReader;
 import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl;
@@ -126,13 +129,18 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
 
   @Override
   protected Void callInternal() throws IOException {
+    long startTime = counters.startTimeCounter();
     if (LlapIoImpl.LOGL.isInfoEnabled()) {
-      LlapIoImpl.LOG.info("Processing split for " + split.getPath());
+      LlapIoImpl.LOG.info("Processing data for " + split.getPath());
     }
-    if (processStop()) return null;
+    if (processStop()) {
+      recordReaderTime(startTime);
+      return null;
+    }
+    counters.setDesc(QueryFragmentCounters.Desc.TABLE, 
getDbAndTableName(split.getPath()));
     orcReader = null;
     // 1. Get file metadata from cache, or create the reader and read it.
-    // Disable filesystem caching for now; Tez closes it and FS cache will fix 
all that
+    // Don't cache the filesystem object for now; Tez closes it and FS cache 
will fix all that
     fs = split.getPath().getFileSystem(conf);
     fileId = determineFileId(fs, split);
 
@@ -147,12 +155,14 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
       // 2. Determine which stripes to read based on the split.
       determineStripesToRead();
     } catch (Throwable t) {
+      recordReaderTime(startTime);
       consumer.setError(t);
       return null;
     }
 
     if (readState.length == 0) {
       consumer.setDone();
+      recordReaderTime(startTime);
       return null; // No data to read.
     }
 
@@ -184,10 +194,15 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
     } catch (Throwable t) {
       cleanupReaders();
       consumer.setError(t);
+      recordReaderTime(startTime);
       return null;
     }
 
-    if (processStop()) return null;
+    if (processStop()) {
+      cleanupReaders();
+      recordReaderTime(startTime);
+      return null;
+    }
 
     // 4. Get data from high-level cache.
     //    If some cols are fully in cache, this will also give us the modified 
list of columns to
@@ -201,6 +216,7 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
         // produceDataFromCache handles its own cleanup.
         consumer.setError(t);
         cleanupReaders();
+        recordReaderTime(startTime);
         return null;
       }
     }
@@ -208,13 +224,14 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
     // 5. Create encoded data reader.
     // In case if we have high-level cache, we will intercept the data and add 
it there;
     // otherwise just pass the data directly to the consumer.
-    Consumer<OrcEncodedColumnBatch> dataConsumer =
-        (cache == null) ? this.consumer : this;
+    Consumer<OrcEncodedColumnBatch> dataConsumer = (cache == null) ? 
this.consumer : this;
     try {
       ensureOrcReader();
-      stripeReader = orcReader.encodedReader(fileId, lowLevelCache, 
dataConsumer);
+      // Reader creating updates HDFS counters, don't do it here.
+      stripeReader = orcReader.encodedReader(fileId, lowLevelCache, counters, 
dataConsumer);
     } catch (Throwable t) {
       consumer.setError(t);
+      recordReaderTime(startTime);
       cleanupReaders();
       return null;
     }
@@ -223,7 +240,11 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
     // TODO: I/O threadpool could be here - one thread per stripe; for now, 
linear.
     OrcBatchKey stripeKey = new OrcBatchKey(fileId, -1, 0);
     for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
-      if (processStop()) return null;
+      if (processStop()) {
+        cleanupReaders();
+        recordReaderTime(startTime);
+        return null;
+      }
       int stripeIx = stripeIxFrom + stripeIxMod;
       boolean[][] colRgs = null;
       boolean[] stripeIncludes = null;
@@ -241,7 +262,7 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
         colRgs = readState[stripeIxMod];
 
         // 6.1. Determine the columns to read (usually the same as requested).
-        if (cols == null || cols.size() == colRgs.length) {
+        if (cache == null || cols == null || cols.size() == colRgs.length) {
           cols = columnIds;
           stripeIncludes = globalIncludes;
         } else {
@@ -252,15 +273,21 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
         }
 
         // 6.2. Ensure we have stripe metadata. We might have read it before 
for RG filtering.
+        boolean isFoundInCache = false;
         if (stripeMetadatas != null) {
           stripeMetadata = stripeMetadatas.get(stripeIxMod);
         } else {
           stripeKey.stripeIx = stripeIx;
           stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
-          if (stripeMetadata == null) {
+          isFoundInCache = (stripeMetadata != null);
+          if (!isFoundInCache) {
+            counters.incrCounter(Counter.METADATA_CACHE_MISS);
             ensureMetadataReader();
-            stripeMetadata = metadataCache.putStripeMetadata(new 
OrcStripeMetadata(
-                stripeKey, metadataReader, stripe, stripeIncludes, 
sargColumns));
+            long startTimeHdfs = counters.startTimeCounter();
+            stripeMetadata = new OrcStripeMetadata(
+                stripeKey, metadataReader, stripe, stripeIncludes, 
sargColumns);
+            counters.incrTimeCounter(Counter.HDFS_TIME_US, startTimeHdfs);
+            stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata);
             if (DebugUtils.isTraceOrcEnabled()) {
               LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
                   + " metadata with includes: " + 
DebugUtils.toString(stripeIncludes));
@@ -274,15 +301,24 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
             LlapIoImpl.LOG.info("Updating indexes in stripe " + 
stripeKey.stripeIx
                 + " metadata for includes: " + 
DebugUtils.toString(stripeIncludes));
           }
+          assert isFoundInCache;
+          counters.incrCounter(Counter.METADATA_CACHE_MISS);
           ensureMetadataReader();
           updateLoadedIndexes(stripeMetadata, stripe, stripeIncludes, 
sargColumns);
+        } else if (isFoundInCache) {
+          counters.incrCounter(Counter.METADATA_CACHE_HIT);
         }
       } catch (Throwable t) {
         consumer.setError(t);
         cleanupReaders();
+        recordReaderTime(startTime);
+        return null;
+      }
+      if (processStop()) {
+        cleanupReaders();
+        recordReaderTime(startTime);
         return null;
       }
-      if (processStop()) return null;
 
       // 6.3. Finally, hand off to the stripe reader to produce the data.
       //      This is a sync call that will feed data to the consumer.
@@ -296,11 +332,13 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
       } catch (Throwable t) {
         consumer.setError(t);
         cleanupReaders();
+        recordReaderTime(startTime);
         return null;
       }
     }
 
     // Done with all the things.
+    recordReaderTime(startTime);
     dataConsumer.setDone();
     if (DebugUtils.isTraceMttEnabled()) {
       LlapIoImpl.LOG.info("done processing " + split);
@@ -311,6 +349,53 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
     return null;
   }
 
+  private void recordReaderTime(long startTime) {
+    counters.incrTimeCounter(Counter.TOTAL_IO_TIME_US, startTime);
+  }
+
+  private static String getDbAndTableName(Path path) {
+    // Ideally, we'd get this from split; however, split doesn't contain any 
such thing and it's
+    // actually pretty hard to get cause even split generator only uses paths. 
We only need this
+    // for metrics; therefore, brace for BLACK MAGIC!
+    String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR);
+    int dbIx = -1;
+    // Try to find the default db postfix; don't check two last components - 
at least there
+    // should be a table and file (we could also try to throw away 
partition/bucket/acid stuff).
+    for (int i = 0; i < parts.length - 2; ++i) {
+      if (!parts[i].endsWith(DDLTask.DATABASE_PATH_SUFFIX)) continue;
+      if (dbIx >= 0) {
+        dbIx = -1; // Let's not guess.
+        break;
+      }
+      dbIx = i;
+    }
+    if (dbIx >= 0) {
+      return parts[dbIx].substring(0, parts[dbIx].length() - 3) + "." + 
parts[dbIx + 1];
+    }
+
+    // Just go from the back and throw away everything we think is wrong; skip 
last item, the file.
+    boolean isInPartFields = false;
+    for (int i = parts.length - 2; i >= 0; --i) {
+      String p = parts[i];
+      boolean isPartField = p.contains("=");
+      if ((isInPartFields && !isPartField) || (!isPartField && 
!p.startsWith(AcidUtils.BASE_PREFIX)
+          && !p.startsWith(AcidUtils.DELTA_PREFIX) && 
!p.startsWith(AcidUtils.BUCKET_PREFIX))) {
+        dbIx = i - 1;
+        break;
+      }
+      isInPartFields = isPartField;
+    }
+    // If we found something before we ran out of components, use it.
+    if (dbIx >= 0) {
+      String dbName = parts[dbIx];
+      if (dbName.endsWith(DDLTask.DATABASE_PATH_SUFFIX)) {
+        dbName = dbName.substring(0, dbName.length() - 3);
+      }
+      return dbName + "." + parts[dbIx + 1];
+    }
+    return "unknown";
+  }
+
   private void validateFileMetadata() throws IOException {
     if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return;
     int bufferSize = fileMetadata.getCompressionBufferSize();
@@ -371,7 +456,9 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
     // we pre-allocate the array and never remove entries; so readers should 
be safe.
     synchronized (stripeMetadata) {
       if (stripeMetadata.hasAllIndexes(stripeIncludes)) return;
+      long startTime = counters.startTimeCounter();
       stripeMetadata.loadMissingIndexes(metadataReader, stripe, 
stripeIncludes, sargColumns);
+      counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
     }
   }
 
@@ -404,8 +491,10 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
     if (DebugUtils.isTraceOrcEnabled()) {
       LOG.info("Creating reader for " + path + " (" + split.getPath() + ")");
     }
+    long startTime = counters.startTimeCounter();
     ReaderOptions opts = 
OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata);
     orcReader = OrcFile.createReader(path, opts);
+    counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
   }
 
   /**
@@ -413,8 +502,13 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
    */
   private OrcFileMetadata getOrReadFileMetadata() throws IOException {
     OrcFileMetadata metadata = metadataCache.getFileMetadata(fileId);
-    if (metadata != null) return metadata;
+    if (metadata != null) {
+      counters.incrCounter(Counter.METADATA_CACHE_HIT);
+      return metadata;
+    }
+    counters.incrCounter(Counter.METADATA_CACHE_MISS);
     ensureOrcReader();
+    // We assume this call doesn't touch HDFS because everything is already 
read; don't add time.
     metadata = new OrcFileMetadata(fileId, orcReader);
     return metadataCache.putFileMetadata(metadata);
   }
@@ -430,11 +524,14 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
       stripeKey.stripeIx = stripeIxMod + stripeIxFrom;
       OrcStripeMetadata value = metadataCache.getStripeMetadata(stripeKey);
       if (value == null || !value.hasAllIndexes(globalInc)) {
+        counters.incrCounter(Counter.METADATA_CACHE_MISS);
         ensureMetadataReader();
         StripeInformation si = 
fileMetadata.getStripes().get(stripeKey.stripeIx);
         if (value == null) {
-          value = metadataCache.putStripeMetadata(
-              new OrcStripeMetadata(stripeKey, metadataReader, si, globalInc, 
sargColumns));
+          long startTime = counters.startTimeCounter();
+          value = new OrcStripeMetadata(stripeKey, metadataReader, si, 
globalInc, sargColumns);
+          counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
+          value = metadataCache.putStripeMetadata(value);
           if (DebugUtils.isTraceOrcEnabled()) {
             LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
                 + " metadata with includes: " + 
DebugUtils.toString(globalInc));
@@ -448,6 +545,8 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
           }
           updateLoadedIndexes(value, si, globalInc, sargColumns);
         }
+      } else {
+        counters.incrCounter(Counter.METADATA_CACHE_HIT);
       }
       result.add(value);
       consumer.setStripeMetadata(value);
@@ -458,7 +557,9 @@ public class OrcEncodedDataReader extends 
CallableWithNdc<Void>
   private void ensureMetadataReader() throws IOException {
     ensureOrcReader();
     if (metadataReader != null) return;
+    long startTime = counters.startTimeCounter();
     metadataReader = orcReader.metadata();
+    counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
   }
 
   @Override
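
The per-table description comes from a best-effort parse of the file path, since the split carries no table information. For illustration, a few hypothetical warehouse paths and the name getDbAndTableName() would derive from them (paths invented, not from the patch):

// /user/hive/warehouse/sales.db/orders/000000_0                            -> "sales.orders"
// /user/hive/warehouse/sales.db/orders/ds=2015-05-26/000000_0              -> "sales.orders"
// /user/hive/warehouse/sales.db/orders/delta_0000001_0000001/bucket_00000  -> "sales.orders"
// Paths without a single "<db>.db" component fall back to the right-to-left scan,
// which may only produce an approximate name, or "unknown".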

http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
index fc58cf3..8b61fe4 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
@@ -36,7 +36,7 @@ public class DebugUtils {
   }
 
   public static boolean isTraceRangesEnabled() {
-    return true; // TODO: temporary, should be hardcoded false
+    return false;
   }
 
   public static boolean isTraceLockingEnabled() {

http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 04c89ae..7f92dda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4526,6 +4526,7 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
    * @param database
    *          Database.
    */
+  public static final String DATABASE_PATH_SUFFIX = ".db";
   private void makeLocationQualified(Database database) throws HiveException {
     if (database.isSetLocationUri()) {
       database.setLocationUri(Utilities.getQualifiedPath(conf, new 
Path(database.getLocationUri())));
@@ -4534,7 +4535,7 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
       // Location is not set we utilize METASTOREWAREHOUSE together with 
database name
       database.setLocationUri(
           Utilities.getQualifiedPath(conf, new Path(HiveConf.getVar(conf, 
HiveConf.ConfVars.METASTOREWAREHOUSE),
-              database.getName().toLowerCase() + ".db")));
+              database.getName().toLowerCase() + DATABASE_PATH_SUFFIX)));
     }
   }
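
The extracted constant is what links DDLTask's database directory layout to the table-name guess in OrcEncodedDataReader above: a database's warehouse directory is "<warehouse>/<dbname>.db". A small sketch of that construction (the wrapper class and warehouse root are made up; the suffix concatenation mirrors makeLocationQualified()):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.DDLTask;

public class DbPathSketch { // hypothetical, not part of the patch
  static Path databaseDir(Path warehouseRoot, String dbName) {
    // e.g. warehouseRoot=/user/hive/warehouse, dbName=sales -> /user/hive/warehouse/sales.db
    return new Path(warehouseRoot, dbName.toLowerCase() + DDLTask.DATABASE_PATH_SUFFIX);
  }
}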
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
index e931d09..9c4d29e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
@@ -31,10 +31,12 @@ import 
org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
 import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.CacheListHelper;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.ql.io.orc.InStream.TrackedCacheChunk;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
@@ -62,17 +64,18 @@ import 
org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper;
  * 3) Additionally, we keep an extra +1 refcount "for the fetching thread". 
That way, if we send
  *    the block to processor, and the latter decrefs it, the block won't be 
evicted when we want
  *    to reuse it for some other RG, forcing us to do an extra disk read or 
cache lookup.
- * 4) As we read (we always read RGs in order, and assume they are stored in 
physical order in the
- *    file, plus that RGs are not shared between streams, AND that we read 
each stream from the
+ * 4) As we read (we always read RGs in order, assume they are stored in 
physical order in the
+ *    file, plus that CBs are not shared between streams, AND that we read 
each stream from the
  *    beginning), we note which blocks cannot possibly be reused anymore (next 
RG starts in the
  *    next CB). We decref for the refcount from (3) in such case.
  * 5) Given that RG end boundary in ORC is an estimate, so we can request data 
from cache and then
  *    not use it, at the end we go thru all the blocks, and release those not 
released by (4).
  * For dictionary case:
- * 1) We have a separate refcount on the ColumnBuffer object we send to the 
processor. In the above
- *    case, it's always 1, so when processor is done it goes directly to 
decrefing cache buffers.
+ * 1) We have a separate refcount on the ColumnBuffer object we send to the 
processor (in the
+ *    non-dictionary case, it's always 1, so when processor is done it goes 
directly to decrefing
+ *    cache buffers).
  * 2) In the dictionary case, it's increased per RG, and processors don't 
touch cache buffers if
- *    they do not happen to decref this counter to 0.
+ *    they do not happen to decref this refcount to 0.
  * 3) This is done because dictionary can have many buffers; decrefing all of 
them for all RGs
  *    is more expensive; plus, decrefing in cache may be more expensive due to 
cache policy/etc.
  */
@@ -123,33 +126,37 @@ public class EncodedReaderImpl implements EncodedReader {
         }
       });
   private final long fileId;
-  private final FSDataInputStream file;
+  private FSDataInputStream file;
   private final CompressionCodec codec;
   private final int bufferSize;
   private final List<OrcProto.Type> types;
-  private final ZeroCopyReaderShim zcr;
+  private ZeroCopyReaderShim zcr;
   private final long rowIndexStride;
   private final LowLevelCache cache;
-  private final ByteBufferAllocatorPool pool;
+  private ByteBufferAllocatorPool pool;
   // For now, one consumer for all calls.
   private final Consumer<OrcEncodedColumnBatch> consumer;
-
+  private final LowLevelCacheCounters qfCounters;
+  private final FileSystem fs;
+  private final Path path;
+  private final boolean useZeroCopy;
 
   public EncodedReaderImpl(FileSystem fileSystem, Path path, long fileId, 
boolean useZeroCopy,
       List<OrcProto.Type> types, CompressionCodec codec, int bufferSize, long 
strideRate,
-      LowLevelCache cache, Consumer<OrcEncodedColumnBatch> consumer)
-          throws IOException {
+      LowLevelCache cache, LowLevelCacheCounters qfCounters,
+      Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
     this.fileId = fileId;
-    this.file = fileSystem.open(path);
+    this.fs = fileSystem;
+    this.path = path;
     this.codec = codec;
     this.types = types;
     this.bufferSize = bufferSize;
-    this.pool = useZeroCopy ? new ByteBufferAllocatorPool() : null;
-    this.zcr = useZeroCopy ? RecordReaderUtils.createZeroCopyShim(file, codec, 
pool) : null;
     this.rowIndexStride = strideRate;
     this.cache = cache;
+    this.qfCounters = qfCounters;
     this.consumer = consumer;
-    if (zcr != null && !cache.getAllocator().isDirectAlloc()) {
+    this.useZeroCopy = useZeroCopy;
+    if (useZeroCopy && !cache.getAllocator().isDirectAlloc()) {
       throw new UnsupportedOperationException("Cannot use zero-copy reader 
with non-direct cache "
           + "buffers; either disable zero-copy or enable direct cache 
allocation");
     }
@@ -271,7 +278,6 @@ public class EncodedReaderImpl implements EncodedReader {
     ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
     boolean[] includedRgs = null;
     boolean isCompressed = (codec != null);
-    DiskRangeListMutateHelper toRead = null;
     DiskRangeListCreateHelper listToRead = new DiskRangeListCreateHelper();
     boolean hasIndexOnlyCols = false;
     for (OrcProto.Stream stream : streamList) {
@@ -335,27 +341,35 @@ public class EncodedReaderImpl implements EncodedReader {
     }
 
     // 2. Now, read all of the ranges from cache or disk.
-    toRead = new DiskRangeListMutateHelper(listToRead.get());
+    CacheListHelper toRead = new CacheListHelper(listToRead.get());
     if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
         && LOG.isInfoEnabled()) {
       LOG.info("Resulting disk ranges to read (file " + fileId + "): "
           + RecordReaderUtils.stringifyDiskRanges(toRead.next));
     }
-    cache.getFileData(fileId, toRead.next, stripeOffset, InStream.CC_FACTORY);
+    cache.getFileData(fileId, toRead.next, stripeOffset, InStream.CC_FACTORY, 
qfCounters);
     if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
         && LOG.isInfoEnabled()) {
       LOG.info("Disk ranges after cache (file " + fileId + ", base offset " + 
stripeOffset
           + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
     }
 
-    // Force direct buffers if we will be decompressing to direct cache.
-    RecordReaderUtils.readDiskRanges(
-        file, zcr, stripeOffset, toRead.next, 
cache.getAllocator().isDirectAlloc());
-
-    if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
-        && LOG.isInfoEnabled()) {
-      LOG.info("Disk ranges after disk read (file " + fileId + ", base offset 
" + stripeOffset
-            + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
+    if (!toRead.didGetAllData) {
+      long startTime = qfCounters.startTimeCounter();
+      if (this.file == null) {
+        this.file = fs.open(path);
+        this.pool = useZeroCopy ? new ByteBufferAllocatorPool() : null;
+        this.zcr = useZeroCopy ? RecordReaderUtils.createZeroCopyShim(file, 
codec, pool) : null;
+      }
+      // Force direct buffers if we will be decompressing to direct cache.
+      RecordReaderUtils.readDiskRanges(
+          file, zcr, stripeOffset, toRead.next, 
cache.getAllocator().isDirectAlloc());
+      qfCounters.recordHdfsTime(startTime);
+      if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
+          && LOG.isInfoEnabled()) {
+        LOG.info("Disk ranges after disk read (file " + fileId + ", base 
offset " + stripeOffset
+              + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
+      }
     }
 
     // 3. For uncompressed case, we need some special processing before read.
@@ -366,7 +380,7 @@ public class EncodedReaderImpl implements EncodedReader {
         for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
           StreamContext sctx = ctx.streams[streamIx];
           DiskRangeList newIter = InStream.preReadUncompressedStream(fileId, 
stripeOffset,
-              iter, sctx.offset, sctx.offset + sctx.length, zcr, cache);
+              iter, sctx.offset, sctx.offset + sctx.length, zcr, cache, 
qfCounters);
           if (newIter != null) {
             iter = newIter;
           }
@@ -421,7 +435,7 @@ public class EncodedReaderImpl implements EncodedReader {
               long unlockUntilCOffset = sctx.offset + sctx.length;
               DiskRangeList lastCached = InStream.readEncodedStream(fileId, 
stripeOffset, iter,
                   sctx.offset, sctx.offset + sctx.length, zcr, codec, 
bufferSize, cache,
-                  sctx.stripeLevelStream, unlockUntilCOffset, sctx.offset);
+                  sctx.stripeLevelStream, unlockUntilCOffset, sctx.offset, 
qfCounters);
               if (lastCached != null) {
                 iter = lastCached;
               }
@@ -444,7 +458,7 @@ public class EncodedReaderImpl implements EncodedReader {
             boolean isStartOfStream = sctx.bufferIter == null;
             DiskRangeList lastCached = InStream.readEncodedStream(fileId, 
stripeOffset,
                 (isStartOfStream ? iter : sctx.bufferIter), cOffset, 
endCOffset, zcr, codec,
-                bufferSize, cache, cb, unlockUntilCOffset, sctx.offset);
+                bufferSize, cache, cb, unlockUntilCOffset, sctx.offset, 
qfCounters);
             if (lastCached != null) {
               sctx.bufferIter = iter = lastCached; // Reset iter just to 
ensure it's valid
             }
@@ -519,11 +533,13 @@ public class EncodedReaderImpl implements EncodedReader {
 
   @Override
   public void close() throws IOException {
-    try {
-      file.close();
-    } catch (IOException ex) {
-      // Tez might have closed our filesystem. Log and ignore error.
-      LOG.info("Failed to close file; ignoring: " + ex.getMessage());
+    if (file != null) {
+      try {
+        file.close();
+      } catch (IOException ex) {
+        // Tez might have closed our filesystem. Log and ignore error.
+        LOG.info("Failed to close file; ignoring: " + ex.getMessage());
+      }
     }
     if (pool != null) {
       pool.clear();
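
Two design points fall out of the changes above: the reader no longer opens the file in its constructor, so a stripe served entirely from cache never touches HDFS at all, and any time actually spent in readDiskRanges is attributed to the fragment via recordHdfsTime(). Condensed from the hunk above (not complete code; field names follow the diff):

if (!toRead.didGetAllData) {
  long startTime = qfCounters.startTimeCounter();
  if (this.file == null) { // opened lazily; pure cache hits skip this entirely
    this.file = fs.open(path);
  }
  RecordReaderUtils.readDiskRanges(
      file, zcr, stripeOffset, toRead.next, cache.getAllocator().isDirectAlloc());
  qfCounters.recordHdfsTime(startTime);
}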

http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
index b7633ea..d0295d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.common.DiskRangeList;
 import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
@@ -701,6 +702,7 @@ public abstract class InStream extends InputStream {
    * @param unlockUntilCOffset The offset until which the buffers can be 
unlocked in cache, as
    *                           they will not be used in future calls (see the 
class comment in
    *                           EncodedReaderImpl about refcounts).
+   * @param qfCounters
    * @return Last buffer cached during decompression. Cache buffers are never removed from
    *         the master list, so they are safe to keep as iterators for various streams.
    */
@@ -709,7 +711,7 @@ public abstract class InStream extends InputStream {
   public static DiskRangeList readEncodedStream(long fileId, long baseOffset, 
DiskRangeList start,
       long cOffset, long endCOffset, ZeroCopyReaderShim zcr, CompressionCodec 
codec,
       int bufferSize, LowLevelCache cache, StreamBuffer streamBuffer, long 
unlockUntilCOffset,
-      long streamOffset) throws IOException {
+      long streamOffset, LowLevelCacheCounters qfCounters) throws IOException {
     if (streamBuffer.cacheBuffers == null) {
       streamBuffer.cacheBuffers = new ArrayList<LlapMemoryBuffer>();
     } else {
@@ -784,7 +786,7 @@ public abstract class InStream extends InputStream {
 
     // 6. Finally, put uncompressed data to cache.
     long[] collisionMask = cache.putFileData(
-        fileId, cacheKeys, targetBuffers, baseOffset, Priority.NORMAL);
+        fileId, cacheKeys, targetBuffers, baseOffset, Priority.NORMAL, 
qfCounters);
     processCacheCollisions(
         cache, collisionMask, toDecompress, targetBuffers, 
streamBuffer.cacheBuffers);
 
@@ -892,10 +894,12 @@ public abstract class InStream extends InputStream {
    * to handle just for this case.
    * We could avoid copy in non-zcr case and manage the buffer that was not 
allocated by our
    * allocator. Uncompressed case is not mainline though so let's not 
complicate it.
+   * @param qfCounters
    */
   public static DiskRangeList preReadUncompressedStream(long fileId,
       long baseOffset, DiskRangeList start, long streamOffset, long streamEnd,
-      ZeroCopyReaderShim zcr, LowLevelCache cache) throws IOException {
+      ZeroCopyReaderShim zcr, LowLevelCache cache, LowLevelCacheCounters 
qfCounters)
+          throws IOException {
     if (streamOffset == streamEnd) return null;
     List<UncompressedCacheChunk> toCache = null;
     List<ByteBuffer> toRelease = null;
@@ -1038,7 +1042,7 @@ public abstract class InStream extends InputStream {
 
     // 6. Finally, put uncompressed data to cache.
     long[] collisionMask = cache.putFileData(
-        fileId, cacheKeys, targetBuffers, baseOffset, Priority.NORMAL);
+        fileId, cacheKeys, targetBuffers, baseOffset, Priority.NORMAL, 
qfCounters);
     processCacheCollisions(cache, collisionMask, toCache, targetBuffers, null);
 
     return lastUncompressed;

http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index fa78703..40675c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -48,7 +48,7 @@ public class OrcSplit extends FileSplit implements 
ColumnarSplit {
   private boolean hasBase;
   private final List<Long> deltas = new ArrayList<Long>();
   private OrcFile.WriterVersion writerVersion;
-  private transient Long fileId;
+  private Long fileId;
   private long projColsUncompressedSize;
 
   static final int HAS_FILEID_FLAG = 8;

http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 5390c8c..1eb0dec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import 
org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch;
@@ -318,7 +319,8 @@ public interface Reader {
   MetadataReader metadata() throws IOException;
 
   EncodedReader encodedReader(long fileId, LowLevelCache lowLevelCache,
-      Consumer<OrcEncodedColumnBatch> consumer) throws IOException;
+      LowLevelCacheCounters qfCounters, Consumer<OrcEncodedColumnBatch> 
consumer)
+          throws IOException;
 
   List<Integer> getVersionList();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index eb349da..8041ba8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import 
org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch;
@@ -716,10 +717,11 @@ public class ReaderImpl implements Reader {
 
   @Override
   public EncodedReader encodedReader(long fileId, LowLevelCache lowLevelCache,
-      Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
+      LowLevelCacheCounters qfCounters, Consumer<OrcEncodedColumnBatch> 
consumer)
+          throws IOException {
     boolean useZeroCopy = (conf != null) && (HiveConf.getBoolVar(conf, 
HIVE_ORC_ZEROCOPY));
     return new EncodedReaderImpl(fileSystem, path, fileId, useZeroCopy, types,
-        codec, bufferSize, rowIndexStride, lowLevelCache, consumer);
+        codec, bufferSize, rowIndexStride, lowLevelCache, qfCounters, 
consumer);
   }
 
   @Override
