HIVE-15665 : LLAP: OrcFileMetadata objects in cache can impact heap usage 
(Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/50fb6f3c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/50fb6f3c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/50fb6f3c

Branch: refs/heads/hive-14535
Commit: 50fb6f3cb4651be5a1e0c0bae1f59f193f2c7e09
Parents: 4a4ae12
Author: sergey <ser...@apache.org>
Authored: Fri Sep 15 12:09:11 2017 -0700
Committer: sergey <ser...@apache.org>
Committed: Fri Sep 15 12:09:11 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   6 -
 .../hive/llap/cache/EvictionDispatcher.java     |  27 +-
 .../hive/llap/io/api/impl/LlapIoImpl.java       |  42 +-
 .../llap/io/decode/OrcColumnVectorProducer.java |   6 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   | 345 +++++++-----
 .../hive/llap/io/metadata/MetadataCache.java    | 537 +++++++++++++++++++
 .../llap/io/metadata/OrcFileEstimateErrors.java |   5 +-
 .../hive/llap/io/metadata/OrcFileMetadata.java  | 114 +---
 .../hive/llap/io/metadata/OrcMetadataCache.java | 163 ------
 .../llap/io/metadata/OrcStripeMetadata.java     | 123 +----
 .../io/metadata/ParquetMetadataCacheImpl.java   | 353 ------------
 .../TestIncrementalObjectSizeEstimator.java     |   4 +-
 .../hive/llap/cache/TestOrcMetadataCache.java   | 159 ++++--
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  11 +
 .../hive/ql/io/orc/encoded/EncodedReader.java   |  14 +
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 534 ++++++++++++++----
 .../hadoop/hive/ql/io/orc/encoded/Reader.java   |   4 +
 .../hive/ql/io/orc/encoded/ReaderImpl.java      |  11 +-
 .../clientpositive/llap/orc_llap_counters.q.out |  98 ++--
 .../llap/orc_llap_counters1.q.out               |  11 +-
 .../clientpositive/llap/orc_ppd_basic.q.out     | 147 ++---
 .../llap/orc_ppd_schema_evol_3a.q.out           | 156 +++---
 .../hive/common/io/FileMetadataCache.java       |   8 +-
 23 files changed, 1616 insertions(+), 1262 deletions(-)
----------------------------------------------------------------------
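
For readers skimming the diff below: the patch removes the on-heap OrcMetadataCache, which kept deserialized OrcFileMetadata/OrcStripeMetadata Java objects, and replaces it with a single MetadataCache that stores the serialized ORC file tail and stripe footers in allocator-managed cache buffers, re-parsing them on use. Callers follow an incRef/decRef protocol: buffers returned by the cache are pinned and must be released with decRefBuffer(). A minimal sketch of that protocol, using only signatures that appear in this diff (getFileMetadata, getSingleBuffer/getMultipleBuffers, getByteBufferDup, decRefBuffer); the copyCachedTail helper name is illustrative, not part of the patch:

  // Sketch only: copy the cached, serialized file tail to a heap buffer before unpinning it.
  static ByteBuffer copyCachedTail(MetadataCache cache, Object fileKey) {
    MetadataCache.LlapBufferOrBuffers buffers = cache.getFileMetadata(fileKey);
    if (buffers == null) return null;       // Miss: read the tail from disk, then putFileMetadata().
    try {
      MemoryBuffer single = buffers.getSingleBuffer();
      MemoryBuffer[] parts = (single != null)
          ? new MemoryBuffer[] { single } : buffers.getMultipleBuffers();
      int total = 0;
      for (MemoryBuffer part : parts) total += part.getByteBufferDup().remaining();
      ByteBuffer copy = ByteBuffer.allocate(total);
      for (MemoryBuffer part : parts) copy.put(part.getByteBufferDup());
      copy.flip();
      return copy;                           // Safe to use after decRef, because it is a copy.
    } finally {
      cache.decRefBuffer(buffers);           // Unpin; the cache policy may evict the buffers now.
    }
  }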


http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b22a834..8a906ce 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3010,12 +3010,6 @@ public class HiveConf extends Configuration {
     LLAP_ALLOCATOR_MAX_ALLOC("hive.llap.io.allocator.alloc.max", "16Mb", new SizeValidator(),
         "Maximum allocation possible from LLAP buddy allocator. For ORC, should be as large as\n" +
         "the largest expected ORC compression buffer size. Must be a power of 2."),
-    @Deprecated
-    LLAP_IO_METADATA_FRACTION("hive.llap.io.metadata.fraction", 0.1f,
-        "Temporary setting for on-heap metadata cache fraction of xmx, set to avoid potential\n" +
-        "heap problems on very large datasets when on-heap metadata cache takes over\n" +
-        "everything. -1 managed metadata and data together (which is more flexible). This\n" +
-        "setting will be removed (in effect become -1) once ORC metadata cache is moved off-heap."),
     LLAP_ALLOCATOR_ARENA_COUNT("hive.llap.io.allocator.arena.count", 8,
         "Arena count for LLAP low-level cache; cache will be allocated in the steps of\n" +
         "(size/arena_count) bytes. This size must be <= 1Gb and >= max allocation; if it is\n" +

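With the metadata now kept in the same buddy-allocator-backed cache as data, the temporary on-heap split controlled by hive.llap.io.metadata.fraction is gone; everything is budgeted from the single LLAP_IO_MEMORY_MAX_SIZE pool. A back-of-the-envelope check of the remaining allocator settings mentioned in the hunk above (16Mb and 8 are the documented defaults shown there; the 4 GB cache size is an example, not a default):

  long totalMemorySize = 4L << 30;               // LLAP_IO_MEMORY_MAX_SIZE (example value)
  int arenaCount = 8;                            // hive.llap.io.allocator.arena.count
  long maxAlloc = 16L << 20;                     // hive.llap.io.allocator.alloc.max
  long arenaSize = totalMemorySize / arenaCount; // cache is allocated in steps of this size
  assert arenaSize <= (1L << 30) && arenaSize >= maxAlloc; // "<= 1Gb and >= max allocation"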
http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
index c5248ce..b226906 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
@@ -19,11 +19,8 @@ package org.apache.hadoop.hive.llap.cache;
 
 import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.LlapSerDeDataBuffer;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileEstimateErrors;
-import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
-import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
-import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
-import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl;
-import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl.LlapFileMetadataBuffer;
+import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
+import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapMetadataBuffer;
 
 /**
  * Eviction dispatcher - uses double dispatch to route eviction notifications to correct caches.
@@ -31,17 +28,13 @@ import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl.LlapFile
 public final class EvictionDispatcher implements EvictionListener, LlapOomDebugDump {
   private final LowLevelCache dataCache;
   private final SerDeLowLevelCacheImpl serdeCache;
-  private final OrcMetadataCache metadataCache;
+  private final MetadataCache metadataCache;
   private final EvictionAwareAllocator allocator;
-  // TODO# temporary, will be merged with OrcMetadataCache after HIVE-15665.
-  private final ParquetMetadataCacheImpl parquetMetadataCache;
 
   public EvictionDispatcher(LowLevelCache dataCache, SerDeLowLevelCacheImpl serdeCache,
-      OrcMetadataCache metadataCache, EvictionAwareAllocator allocator,
-      ParquetMetadataCacheImpl parquetMetadataCache) {
+      MetadataCache metadataCache, EvictionAwareAllocator allocator) {
     this.dataCache = dataCache;
     this.metadataCache = metadataCache;
-    this.parquetMetadataCache = parquetMetadataCache;
     this.serdeCache = serdeCache;
     this.allocator = allocator;
   }
@@ -51,10 +44,6 @@ public final class EvictionDispatcher implements EvictionListener, LlapOomDebugD
     buffer.notifyEvicted(this); // This will call one of the specific notifyEvicted overloads.
   }
 
-  public void notifyEvicted(LlapFileMetadataBuffer buffer) {
-    this.parquetMetadataCache.notifyEvicted(buffer);
-  }
-
   public void notifyEvicted(LlapSerDeDataBuffer buffer) {
     serdeCache.notifyEvicted(buffer);
     allocator.deallocateEvicted(buffer);
@@ -65,12 +54,10 @@ public final class EvictionDispatcher implements EvictionListener, LlapOomDebugD
     allocator.deallocateEvicted(buffer);
   }
 
-  public void notifyEvicted(OrcFileMetadata buffer) {
-    metadataCache.notifyEvicted(buffer);
-  }
-
-  public void notifyEvicted(OrcStripeMetadata buffer) {
+  public void notifyEvicted(LlapMetadataBuffer<?> buffer) {
     metadataCache.notifyEvicted(buffer);
+    // Note: the metadata cache may deallocate additional buffers, but not this one.
+    allocator.deallocateEvicted(buffer);
   }
 
   public void notifyEvicted(OrcFileEstimateErrors buffer) {

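The dispatcher keeps working the same way after this change: the cache policy calls the generic notifyEvicted(LlapCacheableBuffer) entry point, that call does buffer.notifyEvicted(this), and each buffer class calls back into the overload matching its own type (now LlapMetadataBuffer instead of OrcFileMetadata/OrcStripeMetadata). A self-contained sketch of the double-dispatch pattern with stand-in types (illustrative only, not the LLAP classes):

  interface EvictionListener {
    void notifyEvicted(CacheableBuffer buffer);          // called by the cache policy
  }

  abstract class CacheableBuffer {
    abstract void notifyEvicted(Dispatcher dispatcher);  // each subclass picks its overload
  }

  class MetadataBuffer extends CacheableBuffer {
    @Override void notifyEvicted(Dispatcher d) { d.notifyEvicted(this); }
  }

  class DataBuffer extends CacheableBuffer {
    @Override void notifyEvicted(Dispatcher d) { d.notifyEvicted(this); }
  }

  class Dispatcher implements EvictionListener {
    @Override public void notifyEvicted(CacheableBuffer buffer) {
      buffer.notifyEvicted(this);                        // double dispatch resolves the real type
    }
    void notifyEvicted(MetadataBuffer buffer) { /* drop from metadata cache, deallocate */ }
    void notifyEvicted(DataBuffer buffer)     { /* drop from data cache, deallocate */ }
  }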
http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index f42622b..77c8ade 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.management.ObjectName;
 
-import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
 import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,8 +60,7 @@ import org.apache.hadoop.hive.llap.io.api.LlapIo;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
-import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
-import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl;
+import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
@@ -72,8 +70,6 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hive.common.util.FixedSizedObjectPool;
@@ -132,7 +128,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
     LOG.info("Started llap daemon metrics with displayName: {} sessionId: {}", displayName,
         sessionId);
 
-    OrcMetadataCache metadataCache = null;
+    MetadataCache metadataCache = null;
     SerDeLowLevelCacheImpl serdeCache = null; // TODO: extract interface when needed
     BufferUsageManager bufferManagerOrc = null, bufferManagerGeneric = null;
     boolean isEncodeEnabled = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_ENABLED);
@@ -141,30 +137,12 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
       boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
       long totalMemorySize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
       int minAllocSize = (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
-      long metaMem = 0;
-      // TODO: this split a workaround until HIVE-15665.
-      //       Technically we don't have to do it for on-heap data cache but we'd do for testing.
-      boolean isSplitCache = metadataFraction > 0f;
-      if (isSplitCache) {
-        metaMem = (long)(LlapDaemon.getTotalHeapSize() * metadataFraction);
-      }
       LowLevelCachePolicy cachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
           minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy();
       // Allocator uses memory manager to request memory, so create the manager next.
       LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
           totalMemorySize, cachePolicy, cacheMetrics);
-      LowLevelCachePolicy metaCachePolicy = null;
-      LowLevelCacheMemoryManager metaMemManager = null;
-      if (isSplitCache) {
-        metaCachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
-            minAllocSize, metaMem, conf) : new LowLevelFifoCachePolicy();
-        metaMemManager = new LowLevelCacheMemoryManager(metaMem, metaCachePolicy, cacheMetrics);
-      } else {
-        metaCachePolicy = cachePolicy;
-        metaMemManager = memManager;
-      }
-      cacheMetrics.setCacheCapacityTotal(totalMemorySize + metaMem);
+      cacheMetrics.setCacheCapacityTotal(totalMemorySize);
       // Cache uses allocator to allocate and deallocate, create allocator and then caches.
       BuddyAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
       this.allocator = allocator;
@@ -179,18 +157,12 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
       }
 
       boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
-      metadataCache = new OrcMetadataCache(metaMemManager, metaCachePolicy, useGapCache);
-      // TODO# temporary, see comments there
-      ParquetMetadataCacheImpl parquetMc = new ParquetMetadataCacheImpl(
-          allocator, memManager, cachePolicy, cacheMetrics);
-      fileMetadataCache = parquetMc;
+      metadataCache = new MetadataCache(
+          allocator, memManager, cachePolicy, useGapCache, cacheMetrics);
+      fileMetadataCache = metadataCache;
       // And finally cache policy uses cache to notify it of eviction. The cycle is complete!
       EvictionDispatcher e = new EvictionDispatcher(
-          dataCache, serdeCache, metadataCache, allocator, parquetMc);
-      if (isSplitCache) {
-        metaCachePolicy.setEvictionListener(e);
-        metaCachePolicy.setParentDebugDumper(e);
-      }
+          dataCache, serdeCache, metadataCache, allocator);
       cachePolicy.setEvictionListener(e);
       cachePolicy.setParentDebugDumper(e);
 

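The wiring in LlapIoImpl is now one policy, one memory manager and one allocator shared by the data and metadata caches, instead of the optional split-heap pair. Condensed from the hunks above (dataCache/serdeCache construction and config reads elided); the order matters because each component takes the previous one as a constructor argument:

  LowLevelCachePolicy cachePolicy = useLrfu
      ? new LowLevelLrfuCachePolicy(minAllocSize, totalMemorySize, conf)
      : new LowLevelFifoCachePolicy();
  LowLevelCacheMemoryManager memManager =
      new LowLevelCacheMemoryManager(totalMemorySize, cachePolicy, cacheMetrics);
  BuddyAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
  MetadataCache metadataCache =
      new MetadataCache(allocator, memManager, cachePolicy, useGapCache, cacheMetrics);
  EvictionDispatcher e = new EvictionDispatcher(dataCache, serdeCache, metadataCache, allocator);
  cachePolicy.setEvictionListener(e);    // closes the cycle: policy -> dispatcher -> caches/allocator
  cachePolicy.setParentDebugDumper(e);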
http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 6edd84b..373af76 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
-import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
+import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
@@ -52,7 +52,7 @@ import org.apache.orc.OrcConf;
 
 public class OrcColumnVectorProducer implements ColumnVectorProducer {
 
-  private final OrcMetadataCache metadataCache;
+  private final MetadataCache metadataCache;
   private final LowLevelCache lowLevelCache;
   private final BufferUsageManager bufferManager;
   private final Configuration conf;
@@ -63,7 +63,7 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
   // TODO: should this rather use a threadlocal for NUMA affinity?
   private final FixedSizedObjectPool<IoTrace> tracePool;
 
-  public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
+  public OrcColumnVectorProducer(MetadataCache metadataCache,
       LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
       Configuration conf, LlapDaemonCacheMetrics cacheMetrics, LlapDaemonIOMetrics ioMetrics,
       FixedSizedObjectPool<IoTrace> tracePool) {

http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index b5db302..2e47a56 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -26,11 +26,21 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.OrcProto.BloomFilterIndex;
+import org.apache.orc.OrcProto.FileTail;
+import org.apache.orc.OrcProto.RowIndex;
+import org.apache.orc.OrcProto.Stream;
+import org.apache.orc.OrcProto.StripeStatistics;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.BufferChunk;
 import org.apache.orc.impl.DataReaderProperties;
+import org.apache.orc.impl.InStream;
 import org.apache.orc.impl.OrcIndex;
 import org.apache.orc.impl.OrcTail;
+import org.apache.orc.impl.ReaderImpl;
 import org.apache.orc.impl.SchemaEvolution;
+import org.apache.orc.impl.WriterImpl;
 import org.apache.tez.common.counters.TezCounters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +68,8 @@ import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
-import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
+import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
+import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -87,6 +98,8 @@ import org.apache.hive.common.util.FixedSizedObjectPool;
 import org.apache.orc.OrcProto;
 import org.apache.tez.common.CallableWithNdc;
 
+import com.google.common.collect.Lists;
+
 /**
  * This produces EncodedColumnBatch via ORC EncodedDataImpl.
  * It serves as Consumer for EncodedColumnBatch too, for the high-level cache scenario where
@@ -135,7 +148,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     }
   };
 
-  private final OrcMetadataCache metadataCache;
+  private final MetadataCache metadataCache;
   private final LowLevelCache lowLevelCache;
   private final BufferUsageManager bufferManager;
   private final Configuration daemonConf, jobConf;
@@ -153,8 +166,10 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private OrcFileMetadata fileMetadata;
   private Path path;
   private Reader orcReader;
-  private DataReader metadataReader;
+  private DataReader rawDataReader;
+  private boolean isRawDataReaderOpen = false;
   private EncodedReader stripeReader;
+  private CompressionCodec codec;
   private Object fileKey;
   private FileSystem fs;
   /**
@@ -166,12 +181,12 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   @SuppressWarnings("unused")
   private volatile boolean isPaused = false;
 
-  boolean[] globalIncludes = null;
+  boolean[] globalIncludes = null, sargColumns = null;
   private final IoTrace trace;
   private Pool<IoTrace> tracePool;
 
   public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
-      OrcMetadataCache metadataCache, Configuration daemonConf, Configuration jobConf,
+      MetadataCache metadataCache, Configuration daemonConf, Configuration jobConf,
       FileSplit split, List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
       OrcEncodedDataConsumer consumer, QueryFragmentCounters counters,
       TypeDescription readerSchema, Pool<IoTrace> tracePool)
@@ -206,7 +221,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     fileKey = determineFileId(fs, split,
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID));
-    fileMetadata = getOrReadFileMetadata();
+    fileMetadata = getFileFooterFromCacheOrDisk();
     if (readerSchema == null) {
       readerSchema = fileMetadata.getSchema();
     }
@@ -288,7 +303,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     // 3. Apply SARG if needed, and otherwise determine what RGs to read.
     int stride = fileMetadata.getRowIndexStride();
     ArrayList<OrcStripeMetadata> stripeMetadatas = null;
-    boolean[] sargColumns = null;
     try {
       if (sarg != null && stride != 0) {
         // TODO: move this to a common method
@@ -307,7 +321,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         stripeMetadatas = readStripesMetadata(globalIncludes, sargColumns);
       }
 
-      // Now, apply SARG if any; w/o sarg, this will just initialize readState.
+      // Now, apply SARG if any; w/o sarg, this will just initialize stripeRgs.
       boolean hasData = determineRgsToRead(globalIncludes, stride, stripeMetadatas);
       if (!hasData) {
         consumer.setDone();
@@ -327,11 +341,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
 
     // 4. Create encoded data reader.
     try {
-      ensureOrcReader();
-      // Reader creating updates HDFS counters, don't do it here.
-      DataWrapperForOrc dw = new DataWrapperForOrc();
-      stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace);
-      stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
+      ensureDataReader();
     } catch (Throwable t) {
       handleReaderError(startTime, t);
       return null;
@@ -349,16 +359,15 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       int stripeIx = stripeIxFrom + stripeIxMod;
       boolean[] rgs = null;
       OrcStripeMetadata stripeMetadata = null;
-      StripeInformation stripe;
+      StripeInformation si;
       try {
-        stripe = fileMetadata.getStripes().get(stripeIx);
-
-        LlapIoImpl.ORC_LOGGER.trace("Reading stripe {}: {}, {}", stripeIx, 
stripe.getOffset(),
-            stripe.getLength());
-        trace.logReadingStripe(stripeIx, stripe.getOffset(), 
stripe.getLength());
+        si = fileMetadata.getStripes().get(stripeIx);
+        LlapIoImpl.ORC_LOGGER.trace("Reading stripe {}: {}, {}", stripeIx, 
si.getOffset(),
+            si.getLength());
+        trace.logReadingStripe(stripeIx, si.getOffset(), si.getLength());
         rgs = stripeRgs[stripeIxMod];
         if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
-          LlapIoImpl.ORC_LOGGER.trace("readState[{}]: {}", stripeIxMod, 
Arrays.toString(rgs));
+          LlapIoImpl.ORC_LOGGER.trace("stripeRgs[{}]: {}", stripeIxMod, 
Arrays.toString(rgs));
         }
         // We assume that NO_RGS value is only set from SARG filter and for 
all columns;
         // intermediate changes for individual columns will unset values in 
the array.
@@ -367,47 +376,18 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         if (rgs == RecordReaderImpl.SargApplier.READ_NO_RGS) continue;
 
         // 6.2. Ensure we have stripe metadata. We might have read it before for RG filtering.
-        boolean isFoundInCache = false;
         if (stripeMetadatas != null) {
           stripeMetadata = stripeMetadatas.get(stripeIxMod);
         } else {
-          if (hasFileId && metadataCache != null) {
-            stripeKey.stripeIx = stripeIx;
-            stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
-          }
-          isFoundInCache = (stripeMetadata != null);
-          if (!isFoundInCache) {
-            counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
-            ensureMetadataReader();
-            long startTimeHdfs = counters.startTimeCounter();
-            stripeMetadata = new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0),
-                metadataReader, stripe, globalIncludes, sargColumns,
-                orcReader.getSchema(), orcReader.getWriterVersion());
-            counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTimeHdfs);
-            if (hasFileId && metadataCache != null) {
-              OrcStripeMetadata newMetadata = metadataCache.putStripeMetadata(stripeMetadata);
-              isFoundInCache = newMetadata != stripeMetadata; // May be cached concurrently.
-              stripeMetadata = newMetadata;
-              if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
-                LlapIoImpl.ORC_LOGGER.trace("Caching stripe {} metadata with includes: {}",
-                    stripeKey.stripeIx, DebugUtils.toString(globalIncludes));
-              }
-            }
-          }
+          stripeKey.stripeIx = stripeIx;
+          OrcProto.StripeFooter footer = getStripeFooterFromCacheOrDisk(si, stripeKey);
+          stripeMetadata = createOrcStripeMetadataObject(
+              stripeIx, si, footer, globalIncludes, sargColumns);
+          ensureDataReader();
+          stripeReader.readIndexStreams(stripeMetadata.getIndex(),
+              si, footer.getStreamsList(), globalIncludes, sargColumns);
           consumer.setStripeMetadata(stripeMetadata);
         }
-        if (!stripeMetadata.hasAllIndexes(globalIncludes)) {
-          if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
-            LlapIoImpl.ORC_LOGGER.trace("Updating indexes in stripe {} 
metadata for includes: {}",
-                stripeKey.stripeIx, DebugUtils.toString(globalIncludes));
-          }
-          assert isFoundInCache; // If it's not fresh from the cache, indexes 
should be present.
-          counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
-          ensureMetadataReader();
-          updateLoadedIndexes(stripeMetadata, stripe, globalIncludes, 
sargColumns);
-        } else if (isFoundInCache) {
-          counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
-        }
       } catch (Throwable t) {
         handleReaderError(startTime, t);
         return null;
@@ -424,7 +404,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         // consumer. It is potentially holding locked buffers, and must perform its own cleanup.
         // Also, currently readEncodedColumns is not stoppable. The consumer will discard the
         // data it receives for one stripe. We could probably interrupt it, if it checked that.
-        stripeReader.readEncodedColumns(stripeIx, stripe, stripeMetadata.getRowIndexes(),
+        stripeReader.readEncodedColumns(stripeIx, si, stripeMetadata.getRowIndexes(),
             stripeMetadata.getEncodings(), stripeMetadata.getStreams(), globalIncludes,
             rgs, consumer);
       } catch (Throwable t) {
@@ -452,6 +432,14 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     tracePool.offer(trace);
   }
 
+  private void ensureDataReader() throws IOException {
+    ensureOrcReader();
+    // Reader creation updates HDFS counters, don't do it here.
+    DataWrapperForOrc dw = new DataWrapperForOrc();
+    stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace);
+    stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
+  }
+
   private void recordReaderTime(long startTime) {
     counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
   }
@@ -544,22 +532,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   }
 
   /**
-   * In case if stripe metadata in cache does not have all indexes for current query, load
-   * the missing one. This is a temporary cludge until real metadata cache becomes available.
-   */
-  private void updateLoadedIndexes(OrcStripeMetadata stripeMetadata,
-      StripeInformation stripe, boolean[] stripeIncludes, boolean[] sargColumns) throws IOException {
-    // We only synchronize on write for now - design of metadata cache is very temporary;
-    // we pre-allocate the array and never remove entries; so readers should be safe.
-    synchronized (stripeMetadata) {
-      if (stripeMetadata.hasAllIndexes(stripeIncludes)) return;
-      long startTime = counters.startTimeCounter();
-      stripeMetadata.loadMissingIndexes(metadataReader, stripe, stripeIncludes, sargColumns);
-      counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
-    }
-  }
-
-  /**
    * Closes the stripe readers (on error).
    */
   private void cleanupReaders() {
@@ -570,9 +542,9 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         // Ignore.
       }
     }
-    if (metadataReader != null) {
+    if (rawDataReader != null && isRawDataReaderOpen) {
       try {
-        metadataReader.close();
+        rawDataReader.close();
       } catch (IOException ex) {
         // Ignore.
       }
@@ -604,90 +576,194 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   }
 
   /**
+   * Ensure codec is created for the split, to decode values from cache. Can only be called
+   * after initializing fileMetadata.
+   */
+  private void ensureCodecFromFileMetadata() {
+    if (codec != null) return;
+    codec = WriterImpl.createCodec(fileMetadata.getCompressionKind());
+  }
+
+  /**
    *  Gets file metadata for the split from cache, or reads it from the file.
    */
-  private OrcFileMetadata getOrReadFileMetadata() throws IOException {
-    OrcFileMetadata metadata = null;
-    if (fileKey != null && metadataCache != null) {
-      metadata = metadataCache.getFileMetadata(fileKey);
-      if (metadata != null) {
-        counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
-        return metadata;
-      } else {
-        counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
+  private OrcFileMetadata getFileFooterFromCacheOrDisk() throws IOException {
+    LlapBufferOrBuffers tailBuffers = null;
+    List<StripeStatistics> stats = null;
+    List<StripeInformation> stripes = null;
+    boolean hasCache = fileKey != null && metadataCache != null;
+    if (hasCache) {
+      tailBuffers = metadataCache.getFileMetadata(fileKey);
+      if (tailBuffers != null) {
+        try {
+          MemoryBuffer tailBuffer = tailBuffers.getSingleBuffer();
+          ByteBuffer bb = null;
+          if (tailBuffer != null) {
+            bb = tailBuffer.getByteBufferDup();
+            // TODO: remove the copy after ORC-158 and ORC-197
+            // if (bb.isDirect()) {
+              ByteBuffer dupBb = tailBuffer.getByteBufferDup(); // Don't mess with the cached object.
+              bb = ByteBuffer.allocate(dupBb.remaining());
+              bb.put(dupBb);
+              bb.flip();
+            // }
+          } else {
+            // TODO: add the ability to extractFileTail to read from multiple buffers?
+            MemoryBuffer[] tailBufferArray = tailBuffers.getMultipleBuffers();
+            int totalSize = 0;
+            for (MemoryBuffer buf : tailBufferArray) {
+              totalSize += buf.getByteBufferRaw().remaining();
+            }
+            bb = ByteBuffer.allocate(totalSize);
+            for (MemoryBuffer buf : tailBufferArray) {
+              bb.put(buf.getByteBufferDup());
+            }
+            bb.flip();
+          }
+          OrcTail orcTail = ReaderImpl.extractFileTail(bb);
+          FileTail tail = orcTail.getFileTail();
+          stats = orcTail.getStripeStatisticsProto();
+          stripes = new ArrayList<>(tail.getFooter().getStripesCount());
+          for (OrcProto.StripeInformation stripeProto : tail.getFooter().getStripesList()) {
+            stripes.add(new ReaderImpl.StripeInformationImpl(stripeProto));
+          }
+          return new OrcFileMetadata(
+              fileKey, tail.getFooter(), tail.getPostscript(), stats, stripes);
+        } finally {
+          // We don't need the buffer anymore.
+          metadataCache.decRefBuffer(tailBuffers);
+          counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
+        }
       }
+      counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
     }
     ensureOrcReader();
-    // We assume this call doesn't touch HDFS because everything is already 
read; don't add time.
-    metadata = new OrcFileMetadata(fileKey, orcReader);
-    if (fileKey == null || metadataCache == null) return metadata;
-    return metadataCache.putFileMetadata(metadata);
+    ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter();
+    if (hasCache) {
+      tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb);
+      metadataCache.decRefBuffer(tailBuffers); // We don't use the cache's copy of the buffer.
+    }
+    FileTail ft = orcReader.getFileTail();
+    return new OrcFileMetadata(fileKey, ft.getFooter(), ft.getPostscript(),
+        orcReader.getOrcProtoStripeStatistics(), orcReader.getStripes());
+  }
+
+  private OrcProto.StripeFooter buildStripeFooter(
+      List<DiskRange> bcs, int len, CompressionCodec codec, int bufferSize) 
throws IOException {
+    return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream(
+        "footer", bcs, len, codec, bufferSize));
   }
 
   /**
    * Reads the metadata for all stripes in the file.
    */
   private ArrayList<OrcStripeMetadata> readStripesMetadata(
-      boolean[] globalInc, boolean[] sargColumns) throws IOException {
+      boolean[] includes, boolean[] sargColumns) throws IOException {
     ArrayList<OrcStripeMetadata> result = new 
ArrayList<OrcStripeMetadata>(stripeRgs.length);
     boolean hasFileId = this.fileKey != null;
     OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileKey, 0, 0) : null;
     for (int stripeIxMod = 0; stripeIxMod < stripeRgs.length; ++stripeIxMod) {
-      OrcStripeMetadata value = null;
       int stripeIx = stripeIxMod + stripeIxFrom;
-      if (hasFileId && metadataCache != null) {
-        stripeKey.stripeIx = stripeIx;
-        value = metadataCache.getStripeMetadata(stripeKey);
-      }
-      if (value == null || !value.hasAllIndexes(globalInc)) {
-        counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
-        ensureMetadataReader();
-        StripeInformation si = fileMetadata.getStripes().get(stripeIx);
-        if (value == null) {
-          long startTime = counters.startTimeCounter();
-          value = new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0),
-              metadataReader, si, globalInc, sargColumns, orcReader.getSchema(),
-              orcReader.getWriterVersion());
-          counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
-          if (hasFileId && metadataCache != null) {
-            value = metadataCache.putStripeMetadata(value);
-            if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
-              LlapIoImpl.ORC_LOGGER.trace("Caching stripe {} metadata with 
includes: {}",
-                  stripeKey.stripeIx, DebugUtils.toString(globalInc));
+      stripeKey.stripeIx = stripeIx;
+      StripeInformation si = fileMetadata.getStripes().get(stripeIx);
+      OrcProto.StripeFooter footer = getStripeFooterFromCacheOrDisk(si, stripeKey);
+      OrcStripeMetadata osm = createOrcStripeMetadataObject(
+          stripeIx, si, footer, includes, sargColumns);
+
+      ensureDataReader();
+      OrcIndex index = osm.getIndex();
+      stripeReader.readIndexStreams(index, si, footer.getStreamsList(), includes, sargColumns);
+      result.add(osm);
+      consumer.setStripeMetadata(osm);
+    }
+    return result;
+  }
+
+  private OrcStripeMetadata createOrcStripeMetadataObject(int stripeIx, StripeInformation si,
+      OrcProto.StripeFooter footer, boolean[] includes, boolean[] sargColumns) throws IOException {
+    Stream.Kind[] bks = sargColumns == null ? null : new Stream.Kind[includes.length];
+    BloomFilterIndex[] bis = sargColumns == null ? null : new BloomFilterIndex[includes.length];
+    return new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0), footer,
+        new OrcIndex(new RowIndex[includes.length], bks, bis), si);
+  }
+
+  private OrcProto.StripeFooter getStripeFooterFromCacheOrDisk(
+      StripeInformation si, OrcBatchKey stripeKey) throws IOException {
+    boolean hasCache = fileKey != null && metadataCache != null;
+    if (hasCache) {
+      LlapBufferOrBuffers footerBuffers = metadataCache.getStripeTail(stripeKey);
+      if (footerBuffers != null) {
+        try {
+          counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
+          ensureCodecFromFileMetadata();
+          MemoryBuffer footerBuffer = footerBuffers.getSingleBuffer();
+          if (footerBuffer != null) {
+            ByteBuffer bb = footerBuffer.getByteBufferDup();
+            return buildStripeFooter(Lists.<DiskRange>newArrayList(new 
BufferChunk(bb, 0)),
+                bb.remaining(), codec, 
fileMetadata.getCompressionBufferSize());
+          } else {
+            MemoryBuffer[] footerBufferArray = 
footerBuffers.getMultipleBuffers();
+            int pos = 0;
+            List<DiskRange> bcs = new ArrayList<>(footerBufferArray.length);
+            for (MemoryBuffer buf : footerBufferArray) {
+              ByteBuffer bb = buf.getByteBufferDup();
+              bcs.add(new BufferChunk(bb, pos));
+              pos += bb.remaining();
             }
+            return buildStripeFooter(bcs, pos, codec, fileMetadata.getCompressionBufferSize());
           }
+        } finally {
+          metadataCache.decRefBuffer(footerBuffers);
         }
-        // We might have got an old value from cache; recheck it has indexes.
-        if (!value.hasAllIndexes(globalInc)) {
-          if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
-            LlapIoImpl.ORC_LOGGER.trace("Updating indexes in stripe {} 
metadata for includes: {}",
-                stripeKey.stripeIx, DebugUtils.toString(globalInc));
-          }
-          updateLoadedIndexes(value, si, globalInc, sargColumns);
-        }
-      } else {
-        counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
       }
-      result.add(value);
-      consumer.setStripeMetadata(value);
+      counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
     }
-    return result;
+    long offset = si.getOffset() + si.getIndexLength() + si.getDataLength();
+    long startTime = counters.startTimeCounter();
+    ensureRawDataReader(true);
+    // TODO: add this to metadatareader in ORC - SI => metadata buffer, not just metadata.
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reading [" + offset + ", "
+          + (offset + si.getFooterLength()) + ") based on " + si);
+    }
+    DiskRangeList footerRange = rawDataReader.readFileData(
+        new DiskRangeList(offset, offset + si.getFooterLength()), 0, false);
+    // LOG.error("Got " + RecordReaderUtils.stringifyDiskRanges(footerRange));
+    counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+    assert footerRange.next == null; // Can only happen w/zcr for a single input buffer.
+    if (hasCache) {
+      LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(stripeKey, footerRange.getData());
+      metadataCache.decRefBuffer(cacheBuf); // We don't use this one.
+    }
+    ByteBuffer bb = footerRange.getData();
+    return buildStripeFooter(Lists.<DiskRange>newArrayList(new BufferChunk(bb, 
0)),
+        bb.remaining(), orcReader.getCodec(), orcReader.getCompressionSize());
   }
 
-  private void ensureMetadataReader() throws IOException {
+  private void ensureRawDataReader(boolean isOpen) throws IOException {
     ensureOrcReader();
-    if (metadataReader != null) return;
+    if (rawDataReader != null) {
+      if (!isRawDataReaderOpen && isOpen) {
+        long startTime = counters.startTimeCounter();
+        rawDataReader.open();
+        counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+      }
+      return;
+    }
     long startTime = counters.startTimeCounter();
     boolean useZeroCopy = (daemonConf != null) && 
OrcConf.USE_ZEROCOPY.getBoolean(daemonConf);
-    metadataReader = RecordReaderUtils.createDefaultDataReader(
-        DataReaderProperties.builder()
-        .withBufferSize(orcReader.getCompressionSize())
+    rawDataReader = RecordReaderUtils.createDefaultDataReader(
+        DataReaderProperties.builder().withBufferSize(orcReader.getCompressionSize())
         .withCompression(orcReader.getCompressionKind())
-        .withFileSystem(fs)
-        .withPath(path)
+        .withFileSystem(fs).withPath(path)
         .withTypeCount(orcReader.getSchema().getMaximumId() + 1)
         .withZeroCopy(useZeroCopy)
         .build());
+
+    if (isOpen) {
+      rawDataReader.open();
+      isRawDataReaderOpen = true;
+    }
     counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
   }
 
@@ -781,7 +857,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   }
 
   /**
-   * Determine which stripes to read for a split. Populates stripeIxFrom and readState.
+   * Determine which stripes to read for a split. Populates stripeIxFrom and stripeRgs.
    */
   public void determineStripesToRead() {
     // The unit of caching for ORC is (rg x column) (see OrcBatchKey).
@@ -837,8 +913,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     }
 
     public DataWrapperForOrc() throws IOException {
-      ensureMetadataReader();
-      this.orcDataReader = metadataReader.clone();
+      ensureRawDataReader(false);
+      this.orcDataReader = rawDataReader.clone();
     }
 
     @Override
@@ -853,7 +929,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       }
       if (gotAllData.value) return result;
       return (metadataCache == null) ? result
-          : metadataCache.getIncompleteCbs(fileKey, result, baseOffset, factory, gotAllData);
+          : metadataCache.getIncompleteCbs(fileKey, result, baseOffset, gotAllData);
     }
 
     @Override
@@ -887,9 +963,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     @Override
     public void close() throws IOException {
       orcDataReader.close();
-      if (metadataReader != null) {
-        metadataReader.close();
-      }
     }
 
     @Override

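The reader-side pattern for stripe footers mirrors the file tail: only the serialized OrcProto.StripeFooter bytes are cached, and the protobuf is rebuilt on every use while the cached buffers are pinned. A simplified sketch of the cache-hit path of getStripeFooterFromCacheOrDisk above (counters, zero-copy and error handling omitted; it assumes the same imports as OrcEncodedDataReader, and the readCachedFooter name is illustrative):

  // Sketch only: rebuild a stripe footer from one or many pinned cache buffers.
  static OrcProto.StripeFooter readCachedFooter(MetadataCache cache, OrcBatchKey stripeKey,
      CompressionCodec codec, int bufferSize) throws IOException {
    MetadataCache.LlapBufferOrBuffers buffers = cache.getStripeTail(stripeKey);
    if (buffers == null) return null;          // Miss: read the footer range from disk, putStripeTail().
    try {
      MemoryBuffer single = buffers.getSingleBuffer();
      MemoryBuffer[] parts = (single != null)
          ? new MemoryBuffer[] { single } : buffers.getMultipleBuffers();
      List<DiskRange> ranges = new ArrayList<>(parts.length);
      int len = 0;
      for (MemoryBuffer part : parts) {
        ByteBuffer bb = part.getByteBufferDup();
        ranges.add(new BufferChunk(bb, len));  // Footer chunks are laid out back to back.
        len += bb.remaining();
      }
      // Same decode path as buildStripeFooter() in the hunk above.
      return OrcProto.StripeFooter.parseFrom(
          InStream.createCodedInputStream("footer", ranges, len, codec, bufferSize));
    } finally {
      cache.decRefBuffer(buffers);             // Unpin; parsing is done before the buffers are released.
    }
  }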
http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
new file mode 100644
index 0000000..9d7951e
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
@@ -0,0 +1,537 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import org.apache.hadoop.hive.common.FileUtils;
+
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
+import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer;
+import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump;
+import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
+import org.apache.hadoop.hive.llap.cache.MemoryManager;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
+
+public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
+  private final ConcurrentHashMap<Object, LlapBufferOrBuffers> metadata =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentHashMap<Object, OrcFileEstimateErrors> 
estimateErrors;
+  private final MemoryManager memoryManager;
+  private final LowLevelCachePolicy policy;
+  private final EvictionAwareAllocator allocator;
+  private final LlapDaemonCacheMetrics metrics;
+
+  public MetadataCache(EvictionAwareAllocator allocator, MemoryManager memoryManager,
+      LowLevelCachePolicy policy, boolean useEstimateCache, LlapDaemonCacheMetrics metrics) {
+    this.memoryManager = memoryManager;
+    this.allocator = allocator;
+    this.policy = policy;
+    this.metrics = metrics;
+    this.estimateErrors = useEstimateCache
+        ? new ConcurrentHashMap<Object, OrcFileEstimateErrors>() : null;
+  }
+
+  public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset) {
+    if (estimateErrors == null) return;
+    OrcFileEstimateErrors errorData = estimateErrors.get(fileKey);
+    boolean isNew = false;
+    // We should technically update memory usage if updating the old object, but we don't do it
+    // for now; there is no mechanism to properly notify the cache policy/etc. wrt parallel evicts.
+    if (errorData == null) {
+      errorData = new OrcFileEstimateErrors(fileKey);
+      for (DiskRange range : ranges) {
+        errorData.addError(range.getOffset(), range.getLength(), baseOffset);
+      }
+      long memUsage = errorData.estimateMemoryUsage();
+      memoryManager.reserveMemory(memUsage);
+      OrcFileEstimateErrors old = estimateErrors.putIfAbsent(fileKey, errorData);
+      if (old != null) {
+        errorData = old;
+        memoryManager.releaseMemory(memUsage);
+        policy.notifyLock(errorData);
+      } else {
+        isNew = true;
+        policy.cache(errorData, Priority.NORMAL);
+      }
+    } else {
+      policy.notifyLock(errorData);
+    }
+    if (!isNew) {
+      for (DiskRange range : ranges) {
+        errorData.addError(range.getOffset(), range.getLength(), baseOffset);
+      }
+    }
+    policy.notifyUnlock(errorData);
+  }
+
+  public DiskRangeList getIncompleteCbs(
+      Object fileKey, DiskRangeList ranges, long baseOffset, BooleanRef gotAllData) {
+    if (estimateErrors == null) return ranges;
+    OrcFileEstimateErrors errors = estimateErrors.get(fileKey);
+    if (errors == null) return ranges;
+    policy.notifyLock(errors);
+    policy.notifyUnlock(errors); // Never locked for eviction; Java object.
+    return errors.getIncompleteCbs(ranges, baseOffset, gotAllData);
+  }
+
+  public void notifyEvicted(LlapMetadataBuffer<?> buffer) {
+    LlapBufferOrBuffers removed = metadata.remove(buffer.getKey());
+    if (removed == null) return;
+    if (removed.getSingleBuffer() != null) {
+      assert removed.getSingleBuffer() == buffer;
+      return;
+    }
+    discardMultiBuffer(removed);
+  }
+
+  public void notifyEvicted(OrcFileEstimateErrors buffer) {
+    estimateErrors.remove(buffer.getFileKey());
+  }
+
+  @Override
+  public String debugDumpForOom() {
+    StringBuilder sb = new StringBuilder();
+    debugDumpShort(sb);
+    return sb.toString();
+  }
+
+  @Override
+  public void debugDumpShort(StringBuilder sb) {
+    // TODO: perhaps add counters for separate things and multiple buffer cases.
+    sb.append("\nMetadata cache state: ").append(metadata.size()).append(
+        " files and stripes, ").append(estimateErrors.size()).append(" files w/ORC estimate");
+  }
+
+  @Override
+  public LlapBufferOrBuffers getFileMetadata(Object fileKey) {
+    return getInternal(fileKey);
+  }
+
+  public LlapBufferOrBuffers getStripeTail(OrcBatchKey stripeKey) {
+    return getInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx));
+  }
+
+  private LlapBufferOrBuffers getInternal(Object key) {
+    LlapBufferOrBuffers result = metadata.get(key);
+    if (result == null) return null;
+    if (!lockBuffer(result, true)) {
+      // No need to discard the buffer we cannot lock - eviction takes care of that.
+      metadata.remove(key, result);
+      return null;
+    }
+    return result;
+  }
+
+  @Override
+  public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer) {
+    return putInternal(fileKey, tailBuffer);
+  }
+
+  public LlapBufferOrBuffers putStripeTail(OrcBatchKey stripeKey, ByteBuffer tailBuffer) {
+    return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer);
+  }
+
+
+  @Override
+  public LlapBufferOrBuffers putFileMetadata(
+      Object fileKey, int length, InputStream is) throws IOException {
+    LlapBufferOrBuffers result = null;
+    while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
+      LlapBufferOrBuffers oldVal = metadata.get(fileKey);
+      if (oldVal == null) {
+        result = wrapBbForFile(result, fileKey, length, is);
+        if (!lockBuffer(result, false)) {
+          throw new AssertionError("Cannot lock a newly created value " + 
result);
+        }
+        oldVal = metadata.putIfAbsent(fileKey, result);
+        if (oldVal == null) {
+          cacheInPolicy(result); // Cached successfully, add to policy.
+          return result;
+        }
+      }
+      if (lockOldVal(fileKey, result, oldVal)) {
+        return oldVal;
+      }
+      // We found some old value but couldn't incRef it; remove it.
+      metadata.remove(fileKey, oldVal);
+    }
+  }
+
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result,
+      Object fileKey, int length, InputStream stream) throws IOException {
+    if (result != null) return result;
+    int maxAlloc = allocator.getMaxAllocation();
+    LlapMetadataBuffer<Object>[] largeBuffers = null;
+    if (maxAlloc < length) {
+      largeBuffers = new LlapMetadataBuffer[length / maxAlloc];
+      for (int i = 0; i < largeBuffers.length; ++i) {
+        largeBuffers[i] = new LlapMetadataBuffer<Object>(fileKey);
+      }
+      allocator.allocateMultiple(largeBuffers, maxAlloc, null);
+      for (int i = 0; i < largeBuffers.length; ++i) {
+        readIntoCacheBuffer(stream, maxAlloc, largeBuffers[i]);
+      }
+    }
+    int smallSize = length % maxAlloc;
+    if (smallSize == 0) {
+      return new LlapMetadataBuffers(largeBuffers);
+    } else {
+      LlapMetadataBuffer<Object>[] smallBuffer = new LlapMetadataBuffer[1];
+      smallBuffer[0] = new LlapMetadataBuffer(fileKey);
+      allocator.allocateMultiple(smallBuffer, length, null);
+      readIntoCacheBuffer(stream, smallSize, smallBuffer[0]);
+      if (largeBuffers == null) {
+        return smallBuffer[0]; // This is the overwhelmingly common case.
+      } else {
+        LlapMetadataBuffer<Object>[] cacheData = new 
LlapMetadataBuffer[largeBuffers.length + 1];
+        System.arraycopy(largeBuffers, 0, cacheData, 0, largeBuffers.length);
+        cacheData[largeBuffers.length] = smallBuffer[0];
+        return new LlapMetadataBuffers<Object>(cacheData);
+      }
+    }
+  }
+
+  private static void readIntoCacheBuffer(
+      InputStream stream, int length, MemoryBuffer dest) throws IOException {
+    ByteBuffer bb = dest.getByteBufferRaw();
+    int pos = bb.position();
+    bb.limit(pos + length);
+    // TODO: SeekableInputStream.readFully eventually calls a Hadoop method that used to be
+    //       buggy in 2.7 and also anyway just does a copy for a direct buffer. Do a copy here.
+    // ((SeekableInputStream)stream).readFully(bb);
+    FileUtils.readFully(stream, length, bb);
+    bb.position(pos);
+  }
+
+  private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer) {
+    LlapBufferOrBuffers result = null;
+    while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
+      LlapBufferOrBuffers oldVal = metadata.get(key);
+      if (oldVal == null) {
+        result = wrapBb(result, key, tailBuffer);
+        oldVal = metadata.putIfAbsent(key, result);
+        if (oldVal == null) {
+          cacheInPolicy(result); // Cached successfully, add to policy.
+          return result;
+        }
+      }
+      if (lockOldVal(key, result, oldVal)) {
+        return oldVal;
+      }
+      // We found some old value but couldn't incRef it; remove it.
+      metadata.remove(key, oldVal);
+    }
+  }
+
+  private void cacheInPolicy(LlapBufferOrBuffers buffers) {
+    LlapAllocatorBuffer singleBuffer = buffers.getSingleLlapBuffer();
+    if (singleBuffer != null) {
+      policy.cache(singleBuffer, Priority.HIGH);
+      return;
+    }
+    for (LlapAllocatorBuffer buffer : buffers.getMultipleLlapBuffers()) {
+      policy.cache(buffer, Priority.HIGH);
+    }
+  }
+
+  private <T extends LlapBufferOrBuffers> boolean lockOldVal(Object key, T 
newVal, T oldVal) {
+    if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.CACHE_LOGGER.trace("Trying to cache when metadata is already 
cached for" +
+          " {}; old {}, new {}", key, oldVal, newVal);
+    }
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Locking {} due to cache collision", 
oldVal);
+    }
+    if (lockBuffer(oldVal, true)) {
+      // We found an old, valid block for this key in the cache.
+      if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+        LlapIoImpl.LOCKING_LOGGER.trace("Unlocking {} due to cache collision 
with {}",
+            newVal, oldVal);
+      }
+
+      if (newVal != null) {
+        unlockBuffer(newVal, false);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public void decRefBuffer(MemoryBufferOrBuffers buffer) {
+    if (!(buffer instanceof LlapBufferOrBuffers)) {
+      throw new AssertionError(buffer.getClass());
+    }
+    unlockBuffer((LlapBufferOrBuffers)buffer, true);
+  }
+
+  private <T> LlapBufferOrBuffers wrapBb(
+      LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer) {
+    if (result != null) return result;
+    if (tailBuffer.remaining() <= allocator.getMaxAllocation()) {
+      // The common case by far.
+      return wrapSmallBb(new LlapMetadataBuffer<T>(key), tailBuffer);
+    } else {
+      int allocCount = determineAllocCount(tailBuffer);
+      @SuppressWarnings("unchecked")
+      LlapMetadataBuffer<T>[] results = new LlapMetadataBuffer[allocCount];
+      for (int i = 0; i < allocCount; ++i) {
+        results[i] = new LlapMetadataBuffer<T>(key);
+      }
+      wrapLargeBb(results, tailBuffer);
+      return new LlapMetadataBuffers<T>(results);
+    }
+  }
+
+  private <T extends LlapAllocatorBuffer> T wrapSmallBb(T result, ByteBuffer 
tailBuffer) {
+    // Note: we pass in null factory because we allocate objects here. We 
could also pass a
+    //       per-call factory that would set fileKey; or set it after put.
+    allocator.allocateMultiple(new MemoryBuffer[] { result }, 
tailBuffer.remaining(), null);
+    return putBufferToDest(tailBuffer.duplicate(), result);
+  }
+
+  private <T extends LlapAllocatorBuffer> void wrapLargeBb(T[] results, 
ByteBuffer tailBuffer) {
+    // Note: we pass in null factory because we allocate objects here. We 
could also pass a
+    //       per-call factory that would set fileKey; or set it after put.
+    allocator.allocateMultiple(results, allocator.getMaxAllocation(), null);
+    ByteBuffer src = tailBuffer.duplicate();
+    int pos = src.position(), remaining = src.remaining();
+    for (int i = 0; i < results.length; ++i) {
+      T result = results[i];
+      int toPut = Math.min(remaining, result.getByteBufferRaw().remaining());
+      assert toPut > 0;
+      src.position(pos);
+      src.limit(pos + toPut);
+      pos += toPut;
+      remaining -= toPut;
+      putBufferToDest(src, result);
+    }
+  }
+
+  private <T extends LlapAllocatorBuffer> T putBufferToDest(ByteBuffer src, T 
result) {
+    ByteBuffer dest = result.getByteBufferRaw();
+    int startPos = dest.position();
+    dest.put(src);
+    int newPos = dest.position();
+    dest.position(startPos);
+    dest.limit(newPos);
+    boolean canLock = lockOneBuffer(result, false);
+    assert canLock;
+    return result;
+  }
+
+  public int determineAllocCount(ByteBuffer tailBuffer) {
+    int total = tailBuffer.remaining(), maxAlloc = allocator.getMaxAllocation();
+    return total / maxAlloc + ((total % maxAlloc) > 0 ? 1 : 0);
+  }
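
For intuition, the count above is just a ceiling division over the allocator's maximum allocation. A small worked example (hypothetical sizes; 16 MB is only the allocator's usual default maximum, nothing this patch changes):

    int total = 40 << 20, maxAlloc = 16 << 20;   // a 40 MB ORC tail, 16 MB max allocation
    int allocCount = total / maxAlloc + ((total % maxAlloc) > 0 ? 1 : 0);   // 2 + 1 == 3 buffers
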
+
+  private boolean lockBuffer(LlapBufferOrBuffers buffers, boolean doNotifyPolicy) {
+    LlapAllocatorBuffer buffer = buffers.getSingleLlapBuffer();
+    if (buffer != null) {
+      return lockOneBuffer(buffer, doNotifyPolicy);
+    }
+    LlapAllocatorBuffer[] bufferArray = buffers.getMultipleLlapBuffers();
+    for (int i = 0; i < bufferArray.length; ++i) {
+      if (lockOneBuffer(bufferArray[i], doNotifyPolicy)) continue;
+      for (int j = 0; j < i; ++j) {
+        unlockSingleBuffer(bufferArray[j], true);
+      }
+      discardMultiBuffer(buffers);
+      return false;
+    }
+    return true;
+  }
+
+  private void discardMultiBuffer(LlapBufferOrBuffers removed) {
+    long memoryFreed = 0;
+    for (LlapAllocatorBuffer buf : removed.getMultipleLlapBuffers()) {
+      long memUsage = buf.getMemoryUsage();
+      // We cannot just deallocate the buffer, as it can hypothetically have users.
+      int result = buf.invalidate();
+      switch (result) {
+      case LlapAllocatorBuffer.INVALIDATE_ALREADY_INVALID: continue; // Nothing to do.
+      case LlapAllocatorBuffer.INVALIDATE_FAILED: {
+        // Someone is using this buffer; eventually, it will be evicted.
+        continue;
+      }
+      case LlapAllocatorBuffer.INVALIDATE_OK: {
+        memoryFreed += memUsage;
+        allocator.deallocateEvicted(buf);
+        break;
+      }
+      default: throw new AssertionError(result);
+      }
+    }
+    memoryManager.releaseMemory(memoryFreed);
+  }
+
+  public boolean lockOneBuffer(LlapAllocatorBuffer buffer, boolean doNotifyPolicy) {
+    int rc = buffer.incRef();
+    if (rc > 0) {
+      metrics.incrCacheNumLockedBuffers();
+    }
+    if (doNotifyPolicy && rc == 1) {
+      // We have just locked a buffer that wasn't previously locked.
+      policy.notifyLock(buffer);
+    }
+    return rc > 0;
+  }
+
+  private void unlockBuffer(LlapBufferOrBuffers buffers, boolean isCached) {
+    LlapAllocatorBuffer singleBuffer = buffers.getSingleLlapBuffer();
+    if (singleBuffer != null) {
+      unlockSingleBuffer(singleBuffer, isCached);
+      return;
+    }
+    for (LlapAllocatorBuffer buffer : buffers.getMultipleLlapBuffers()) {
+      unlockSingleBuffer(buffer, isCached);
+    }
+  }
+
+  private void unlockSingleBuffer(LlapAllocatorBuffer buffer, boolean isCached) {
+    boolean isLastDecref = (buffer.decRef() == 0);
+    if (isLastDecref) {
+      if (isCached) {
+        policy.notifyUnlock(buffer);
+      } else {
+        allocator.deallocate(buffer);
+      }
+    }
+    metrics.decrCacheNumLockedBuffers();
+  }
+
+  private final static class StripeKey {
+    private final Object fileKey;
+    private final int stripeIx;
+
+    public StripeKey(Object fileKey, int stripeIx) {
+      this.fileKey = fileKey;
+      this.stripeIx = stripeIx;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      return (prime + ((fileKey == null) ? 0 : fileKey.hashCode())) * prime + stripeIx;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (!(obj instanceof StripeKey)) return false;
+      StripeKey other = (StripeKey)obj;
+      return ((fileKey == null) == (other.fileKey == null))
+          && (fileKey == null || fileKey.equals(other.fileKey)) && (stripeIx == other.stripeIx);
+    }
+  }
+
+  public static interface LlapBufferOrBuffers extends MemoryBufferOrBuffers {
+    LlapAllocatorBuffer getSingleLlapBuffer();
+    LlapAllocatorBuffer[] getMultipleLlapBuffers();
+  }
+
+
+  public final static class LlapMetadataBuffer<T>
+      extends LlapAllocatorBuffer implements LlapBufferOrBuffers {
+    private final T key;
+
+    public LlapMetadataBuffer(T key) {
+      this.key = key;
+    }
+
+    @Override
+    public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
+      evictionDispatcher.notifyEvicted(this);
+    }
+
+    public T getKey() {
+      return key;
+    }
+
+    @Override
+    public LlapAllocatorBuffer getSingleBuffer() {
+      return this;
+    }
+
+    @Override
+    public LlapAllocatorBuffer[] getMultipleBuffers() {
+      return null;
+    }
+
+
+    @Override
+    public LlapAllocatorBuffer getSingleLlapBuffer() {
+      return this;
+    }
+
+    @Override
+    public LlapAllocatorBuffer[] getMultipleLlapBuffers() {
+      return null;
+    }
+  }
+
+  public final static class LlapMetadataBuffers<T> implements LlapBufferOrBuffers {
+    private final LlapMetadataBuffer<T>[] buffers;
+
+    public LlapMetadataBuffers(LlapMetadataBuffer<T>[] buffers) {
+      this.buffers = buffers;
+    }
+
+    @Override
+    public LlapAllocatorBuffer getSingleBuffer() {
+      return null;
+    }
+
+    @Override
+    public LlapAllocatorBuffer[] getMultipleBuffers() {
+      return buffers;
+    }
+
+    @Override
+    public LlapAllocatorBuffer getSingleLlapBuffer() {
+      return null;
+    }
+
+    @Override
+    public LlapAllocatorBuffer[] getMultipleLlapBuffers() {
+      return buffers;
+    }
+  }
+}
+
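
To make the intended usage concrete, here is a rough consumer-side sketch. The getFileMetadata lookup is assumed from the parts of MetadataCache above this excerpt, and the lookup is expected to return the entry with a reference already taken, so it must always be paired with decRefBuffer:

    static long cachedTailSize(MetadataCache cache, Object fileKey) {
      MemoryBufferOrBuffers tail = cache.getFileMetadata(fileKey); // assumed lookup method
      if (tail == null) return 0; // not cached, or already evicted
      try {
        LlapBufferOrBuffers buffers = (LlapBufferOrBuffers) tail;
        LlapAllocatorBuffer single = buffers.getSingleLlapBuffer();
        if (single != null) {
          // Small tail: a single allocator buffer holds the whole thing.
          return single.getByteBufferRaw().remaining();
        }
        long total = 0;
        // Large tail: split across several buffers of at most maxAlloc bytes each.
        for (LlapAllocatorBuffer b : buffers.getMultipleLlapBuffers()) {
          total += b.getByteBufferRaw().remaining();
        }
        return total;
      } finally {
        cache.decRefBuffer(tail); // releases the reference the lookup took
      }
    }
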

http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
index dc053ee..6cf9563 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
-import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
 import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
 import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
 import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
@@ -61,8 +60,8 @@ public class OrcFileEstimateErrors extends LlapCacheableBuffer {
     }
   }
 
-  public DiskRangeList getIncompleteCbs(DiskRangeList ranges, long baseOffset,
-      DiskRangeListFactory factory, BooleanRef gotAllData) {
+  public DiskRangeList getIncompleteCbs(
+      DiskRangeList ranges, long baseOffset, BooleanRef gotAllData) {
     DiskRangeList prev = ranges.prev;
     if (prev == null) {
       prev = new MutateHelper(ranges);

http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
index b9d7a77..8af161f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
@@ -18,31 +18,22 @@
 
 package org.apache.hadoop.hive.llap.io.metadata;
 
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
-import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
-import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
-import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
-import org.apache.hadoop.hive.ql.io.SyntheticFileId;
+
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.FileMetadata;
 import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.StripeStatistics;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.ReaderImpl;
 
 /** ORC file metadata. Currently contains some duplicate info due to how different parts
  * of ORC use different info. Ideally we would get rid of protobuf structs in code beyond reading,
  * or instead use protobuf structs everywhere instead of the mix of things like now.
  */
-public final class OrcFileMetadata extends LlapCacheableBuffer
-    implements FileMetadata, ConsumerFileMetadata {
+public final class OrcFileMetadata implements FileMetadata, ConsumerFileMetadata {
   private final List<StripeInformation> stripes;
   private final List<Integer> versionList;
   private final List<OrcProto.StripeStatistics> stripeStats;
@@ -58,91 +49,22 @@ public final class OrcFileMetadata extends LlapCacheableBuffer
   private final long numberOfRows;
   private final boolean isOriginalFormat;
 
-  private final int estimatedMemUsage;
-
-  private final static HashMap<Class<?>, ObjectEstimator> SIZE_ESTIMATORS;
-  private final static ObjectEstimator SIZE_ESTIMATOR;
-  static {
-    OrcFileMetadata ofm = createDummy(new SyntheticFileId());
-    SIZE_ESTIMATORS = IncrementalObjectSizeEstimator.createEstimators(ofm);
-    IncrementalObjectSizeEstimator.addEstimator(
-        "com.google.protobuf.LiteralByteString", SIZE_ESTIMATORS);
-    // Add long for the regular file ID estimation.
-    IncrementalObjectSizeEstimator.createEstimators(Long.class, SIZE_ESTIMATORS);
-    SIZE_ESTIMATOR = SIZE_ESTIMATORS.get(OrcFileMetadata.class);
-  }
-
-  @VisibleForTesting
-  public static OrcFileMetadata createDummy(Object fileKey) {
-    OrcFileMetadata ofm = new OrcFileMetadata(fileKey);
-    ofm.stripes.add(new ReaderImpl.StripeInformationImpl(
-        OrcProto.StripeInformation.getDefaultInstance()));
-    ofm.fileStats.add(OrcProto.ColumnStatistics.getDefaultInstance());
-    ofm.stripeStats.add(OrcProto.StripeStatistics.newBuilder().addColStats(createStatsDummy()).build());
-    ofm.types.add(OrcProto.Type.newBuilder().addFieldNames("a").addSubtypes(0).build());
-    ofm.versionList.add(0);
-    return ofm;
-  }
-
-  static OrcProto.ColumnStatistics.Builder createStatsDummy() {
-    return OrcProto.ColumnStatistics.newBuilder().setBucketStatistics(
-            OrcProto.BucketStatistics.newBuilder().addCount(0)).setStringStatistics(
-            OrcProto.StringStatistics.newBuilder().setMaximum("zzz"));
-  }
-
-  // Ctor for memory estimation and tests
-  private OrcFileMetadata(Object fileKey) {
-    this.fileKey = fileKey;
-    stripes = new ArrayList<StripeInformation>();
-    versionList = new ArrayList<Integer>();
-    fileStats = new ArrayList<>();
-    stripeStats = new ArrayList<>();
-    types = new ArrayList<>();
-    writerVersionNum = metadataSize = compressionBufferSize = rowIndexStride = 0;
-    contentLength = numberOfRows = 0;
-    estimatedMemUsage = 0;
-    isOriginalFormat = false;
-    compressionKind = CompressionKind.NONE;
-  }
-
-  public OrcFileMetadata(Object fileKey, Reader reader) {
+  public OrcFileMetadata(Object fileKey, OrcProto.Footer footer, OrcProto.PostScript ps,
+      List<StripeStatistics> stats, List<StripeInformation> stripes) {
+    this.stripeStats = stats;
+    this.compressionKind = CompressionKind.valueOf(ps.getCompression().name());
+    this.compressionBufferSize = (int)ps.getCompressionBlockSize();
+    this.stripes = stripes;
+    this.isOriginalFormat = OrcInputFormat.isOriginal(footer);
+    this.writerVersionNum = ps.getWriterVersion();
+    this.versionList = ps.getVersionList();
+    this.metadataSize = (int) ps.getMetadataLength();
+    this.types = footer.getTypesList();
+    this.rowIndexStride = footer.getRowIndexStride();
+    this.contentLength = footer.getContentLength();
+    this.numberOfRows = footer.getNumberOfRows();
+    this.fileStats = footer.getStatisticsList();
     this.fileKey = fileKey;
-    this.stripeStats = reader.getOrcProtoStripeStatistics();
-    this.compressionKind = reader.getCompressionKind();
-    this.compressionBufferSize = reader.getCompressionSize();
-    this.stripes = reader.getStripes();
-    this.isOriginalFormat = OrcInputFormat.isOriginal(reader);
-    this.writerVersionNum = reader.getWriterVersion().getId();
-    this.versionList = reader.getVersionList();
-    this.metadataSize = reader.getMetadataSize();
-    this.types = reader.getTypes();
-    this.rowIndexStride = reader.getRowIndexStride();
-    this.contentLength = reader.getContentLength();
-    this.numberOfRows = reader.getNumberOfRows();
-    this.fileStats = reader.getOrcProtoFileStatistics();
-
-    this.estimatedMemUsage = SIZE_ESTIMATOR.estimate(this, SIZE_ESTIMATORS);
-  }
-
-  // LlapCacheableBuffer
-  @Override
-  public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
-    evictionDispatcher.notifyEvicted(this);
-  }
-
-  @Override
-  protected int invalidate() {
-    return INVALIDATE_OK; // relies on GC, so it can always be evicted now.
-  }
-
-  @Override
-  public long getMemoryUsage() {
-    return estimatedMemUsage;
-  }
-
-  @Override
-  protected boolean isLocked() {
-    return false;
   }
 
   // FileMetadata
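
As a minimal sketch of what the slimmed-down class now depends on, these are the kinds of plain values the new constructor copies out of the protobuf Footer/PostScript (default protobuf instances used here purely for illustration, not code from the patch):

    OrcProto.PostScript ps = OrcProto.PostScript.getDefaultInstance();
    OrcProto.Footer footer = OrcProto.Footer.getDefaultInstance();
    CompressionKind kind = CompressionKind.valueOf(ps.getCompression().name());
    int compressionBufferSize = (int) ps.getCompressionBlockSize();
    int metadataSize = (int) ps.getMetadataLength();
    long numberOfRows = footer.getNumberOfRows();
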

http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
deleted file mode 100644
index 601b622..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.io.metadata;
-
-import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.hive.common.io.DiskRange;
-import org.apache.hadoop.hive.common.io.DiskRangeList;
-import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
-import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
-import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump;
-import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
-import org.apache.hadoop.hive.llap.cache.MemoryManager;
-import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
-import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
-
-public class OrcMetadataCache implements LlapOomDebugDump {
-  private final ConcurrentHashMap<Object, OrcFileMetadata> metadata = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<OrcBatchKey, OrcStripeMetadata> stripeMetadata =
-      new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<Object, OrcFileEstimateErrors> estimateErrors;
-  private final MemoryManager memoryManager;
-  private final LowLevelCachePolicy policy;
-
-  public OrcMetadataCache(MemoryManager memoryManager, LowLevelCachePolicy policy,
-      boolean useEstimateCache) {
-    this.memoryManager = memoryManager;
-    this.policy = policy;
-    this.estimateErrors = useEstimateCache
-        ? new ConcurrentHashMap<Object, OrcFileEstimateErrors>() : null;
-  }
-
-  public OrcFileMetadata putFileMetadata(OrcFileMetadata metaData) {
-    long memUsage = metaData.getMemoryUsage();
-    memoryManager.reserveMemory(memUsage);
-    OrcFileMetadata val = metadata.putIfAbsent(metaData.getFileKey(), metaData);
-    // See OrcFileMetadata; it is always unlocked, so we just "touch" it here to simulate use.
-    return touchOnPut(metaData, val, memUsage);
-  }
-
-  public OrcStripeMetadata putStripeMetadata(OrcStripeMetadata metaData) {
-    long memUsage = metaData.getMemoryUsage();
-    memoryManager.reserveMemory(memUsage);
-    OrcStripeMetadata val = stripeMetadata.putIfAbsent(metaData.getKey(), metaData);
-    // See OrcStripeMetadata; it is always unlocked, so we just "touch" it here to simulate use.
-    return touchOnPut(metaData, val, memUsage);
-  }
-
-  private <T extends LlapCacheableBuffer> T touchOnPut(T newVal, T oldVal, long memUsage) {
-    if (oldVal == null) {
-      oldVal = newVal;
-      policy.cache(oldVal, Priority.HIGH);
-    } else {
-      memoryManager.releaseMemory(memUsage);
-      policy.notifyLock(oldVal);
-    }
-    policy.notifyUnlock(oldVal);
-    return oldVal;
-  }
-
-
-  public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset) {
-    if (estimateErrors == null) return;
-    OrcFileEstimateErrors errorData = estimateErrors.get(fileKey);
-    boolean isNew = false;
-    // We should technically update memory usage if updating the old object, but we don't do it
-    // for now; there is no mechanism to properly notify the cache policy/etc. wrt parallel evicts.
-    if (errorData == null) {
-      errorData = new OrcFileEstimateErrors(fileKey);
-      for (DiskRange range : ranges) {
-        errorData.addError(range.getOffset(), range.getLength(), baseOffset);
-      }
-      long memUsage = errorData.estimateMemoryUsage();
-      memoryManager.reserveMemory(memUsage);
-      OrcFileEstimateErrors old = estimateErrors.putIfAbsent(fileKey, errorData);
-      if (old != null) {
-        errorData = old;
-        memoryManager.releaseMemory(memUsage);
-        policy.notifyLock(errorData);
-      } else {
-        isNew = true;
-        policy.cache(errorData, Priority.NORMAL);
-      }
-    }
-    if (!isNew) {
-      for (DiskRange range : ranges) {
-        errorData.addError(range.getOffset(), range.getLength(), baseOffset);
-      }
-    }
-    policy.notifyUnlock(errorData);
-  }
-
-  public OrcStripeMetadata getStripeMetadata(OrcBatchKey stripeKey) throws IOException {
-    return touchOnGet(stripeMetadata.get(stripeKey));
-  }
-
-  public OrcFileMetadata getFileMetadata(Object fileKey) throws IOException {
-    return touchOnGet(metadata.get(fileKey));
-  }
-
-  private <T extends LlapCacheableBuffer> T touchOnGet(T result) {
-    if (result != null) {
-      policy.notifyLock(result);
-      policy.notifyUnlock(result); // Never locked for eviction; Java object.
-    }
-    return result;
-  }
-
-  public DiskRangeList getIncompleteCbs(Object fileKey, DiskRangeList ranges, long baseOffset,
-      DiskRangeListFactory factory, BooleanRef gotAllData) {
-    if (estimateErrors == null) return ranges;
-    OrcFileEstimateErrors errors = estimateErrors.get(fileKey);
-    if (errors == null) return ranges;
-    return errors.getIncompleteCbs(ranges, baseOffset, factory, gotAllData);
-  }
-
-  public void notifyEvicted(OrcFileMetadata buffer) {
-    metadata.remove(buffer.getFileKey());
-    // See OrcFileMetadata - we don't clear the object, it will be GCed when released by users.
-  }
-
-  public void notifyEvicted(OrcStripeMetadata buffer) {
-    stripeMetadata.remove(buffer.getKey());
-    // See OrcStripeMetadata - we don't clear the object, it will be GCed when released by users.
-  }
-
-  public void notifyEvicted(OrcFileEstimateErrors buffer) {
-    estimateErrors.remove(buffer.getFileKey());
-  }
-
-  @Override
-  public String debugDumpForOom() {
-    StringBuilder sb = new StringBuilder();
-    debugDumpShort(sb);
-    return sb.toString();
-  }
-
-  @Override
-  public void debugDumpShort(StringBuilder sb) {
-    sb.append("\nORC metadata cache state: ").append(metadata.size()).append(" files, ")
-      .append(stripeMetadata.size()).append(" stripes, ").append(estimateErrors.size())
-      .append(" files w/ORC estimate");
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
index 4565d11..92b9c8f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
@@ -19,115 +19,30 @@ package org.apache.hadoop.hive.llap.io.metadata;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 
-import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
-import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
-import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
-import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
-import org.apache.hadoop.hive.ql.io.SyntheticFileId;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
-import org.apache.orc.DataReader;
 import org.apache.orc.OrcProto;
 import org.apache.orc.OrcProto.RowIndexEntry;
 import org.apache.orc.StripeInformation;
-import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.OrcIndex;
 
-public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerStripeMetadata {
-  private final TypeDescription schema;
+public class OrcStripeMetadata implements ConsumerStripeMetadata {
   private final OrcBatchKey stripeKey;
   private final List<OrcProto.ColumnEncoding> encodings;
   private final List<OrcProto.Stream> streams;
   private final String writerTimezone;
   private final long rowCount;
   private OrcIndex rowIndex;
-  private OrcFile.WriterVersion writerVersion;
-
-  private final int estimatedMemUsage;
-
-  private final static HashMap<Class<?>, ObjectEstimator> SIZE_ESTIMATORS;
-  private final static ObjectEstimator SIZE_ESTIMATOR;
-  static {
-    OrcStripeMetadata osm = createDummy(new SyntheticFileId());
-    SIZE_ESTIMATORS = IncrementalObjectSizeEstimator.createEstimators(osm);
-    IncrementalObjectSizeEstimator.addEstimator(
-        "com.google.protobuf.LiteralByteString", SIZE_ESTIMATORS);
-    // Add long for the regular file ID estimation.
-    IncrementalObjectSizeEstimator.createEstimators(Long.class, SIZE_ESTIMATORS);
-    SIZE_ESTIMATOR = SIZE_ESTIMATORS.get(OrcStripeMetadata.class);
-  }
 
-  public OrcStripeMetadata(OrcBatchKey stripeKey, DataReader mr, StripeInformation stripe,
-                           boolean[] includes, boolean[] sargColumns, TypeDescription schema,
-                           OrcFile.WriterVersion writerVersion) throws IOException {
-    this.schema = schema;
+  public OrcStripeMetadata(OrcBatchKey stripeKey, OrcProto.StripeFooter footer,
+      OrcIndex orcIndex, StripeInformation stripe) throws IOException {
     this.stripeKey = stripeKey;
-    OrcProto.StripeFooter footer = mr.readStripeFooter(stripe);
     streams = footer.getStreamsList();
     encodings = footer.getColumnsList();
     writerTimezone = footer.getWriterTimezone();
     rowCount = stripe.getNumberOfRows();
-    rowIndex = mr.readRowIndex(stripe, schema, footer, true, includes, null,
-        sargColumns, writerVersion, null, null);
-
-    estimatedMemUsage = SIZE_ESTIMATOR.estimate(this, SIZE_ESTIMATORS);
-    this.writerVersion = writerVersion;
-  }
-
-  private OrcStripeMetadata(Object id) {
-    stripeKey = new OrcBatchKey(id, 0, 0);
-    encodings = new ArrayList<>();
-    streams = new ArrayList<>();
-    writerTimezone = "";
-    schema = TypeDescription.fromString("struct<x:int>");
-    rowCount = estimatedMemUsage = 0;
-  }
-
-  @VisibleForTesting
-  public static OrcStripeMetadata createDummy(Object id) {
-    OrcStripeMetadata dummy = new OrcStripeMetadata(id);
-    dummy.encodings.add(OrcProto.ColumnEncoding.getDefaultInstance());
-    dummy.streams.add(OrcProto.Stream.getDefaultInstance());
-    OrcProto.RowIndex ri = OrcProto.RowIndex.newBuilder().addEntry(
-        OrcProto.RowIndexEntry.newBuilder().addPositions(1).setStatistics(
-            OrcFileMetadata.createStatsDummy())).build();
-    OrcProto.BloomFilterIndex bfi = OrcProto.BloomFilterIndex.newBuilder().addBloomFilter(
-        OrcProto.BloomFilter.newBuilder().addBitset(0)).build();
-    dummy.rowIndex = new OrcIndex(
-        new OrcProto.RowIndex[] { ri },
-        new OrcProto.Stream.Kind[] { OrcProto.Stream.Kind.BLOOM_FILTER_UTF8 },
-        new OrcProto.BloomFilterIndex[] { bfi });
-    return dummy;
-  }
-
-  public boolean hasAllIndexes(boolean[] includes) {
-    for (int i = 0; i < includes.length; ++i) {
-      if (includes[i] && rowIndex.getRowGroupIndex()[i] == null) return false;
-    }
-    return true;
-  }
-
-  public void loadMissingIndexes(DataReader mr, StripeInformation stripe, boolean[] includes,
-      boolean[] sargColumns) throws IOException {
-    // Do not lose the old indexes. Create a super set includes
-    OrcProto.RowIndex[] existing = getRowIndexes();
-    boolean superset[] = new boolean[Math.max(existing.length, includes.length)];
-    for (int i = 0; i < includes.length; i++) {
-      superset[i] = includes[i];
-    }
-    for (int i = 0; i < existing.length; i++) {
-      superset[i] = superset[i] || (existing[i] != null);
-    }
-    // TODO: should we save footer to avoid a read here?
-    rowIndex = mr.readRowIndex(stripe, schema, null, true, superset,
-        rowIndex.getRowGroupIndex(),
-        sargColumns, writerVersion, rowIndex.getBloomFilterKinds(),
-        rowIndex.getBloomFilterIndex());
-    // TODO: theoretically, we should re-estimate memory usage here and update memory manager
+    rowIndex = orcIndex;
   }
 
   public int getStripeIx() {
@@ -157,25 +72,6 @@ public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerSt
   public String getWriterTimezone() {
     return writerTimezone;
   }
-  @Override
-  public long getMemoryUsage() {
-    return estimatedMemUsage;
-  }
-
-  @Override
-  public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
-    evictionDispatcher.notifyEvicted(this);
-  }
-
-  @Override
-  protected int invalidate() {
-    return INVALIDATE_OK;
-  }
-
-  @Override
-  protected boolean isLocked() {
-    return false;
-  }
 
   public OrcBatchKey getKey() {
     return stripeKey;
@@ -199,4 +95,15 @@ public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerSt
   public boolean supportsRowIndexes() {
     return true;
   }
+
+  public OrcIndex getIndex() {
+    return rowIndex;
+  }
+
+  @Override
+  public String toString() {
+    return "OrcStripeMetadata [stripeKey=" + stripeKey + ", rowCount="
+        + rowCount + ", writerTimezone=" + writerTimezone + ", encodings="
+        + encodings + ", streams=" + streams + ", rowIndex=" + rowIndex + "]";
+  }
 }
