HIVE-15665 : LLAP: OrcFileMetadata objects in cache can impact heap usage (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
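In short: rather than caching deserialized OrcFileMetadata/OrcStripeMetadata objects on the Java heap, LLAP now caches the serialized ORC file tail and stripe footers in allocator-managed buffers and re-parses them on access. A minimal sketch of the resulting read path, using names from this patch; copyOut, parseTail and readTailFromFile are hypothetical stand-ins for the copy-out and ReaderImpl.extractFileTail logic in OrcEncodedDataReader:

    // Sketch only, not the patch itself; see getFileFooterFromCacheOrDisk() below.
    LlapBufferOrBuffers tailBuffers = metadataCache.getFileMetadata(fileKey); // incRefs on hit
    if (tailBuffers != null) {
      try {
        ByteBuffer bb = copyOut(tailBuffers);      // copy out; never mutate the cached buffer(s)
        return parseTail(bb);                      // e.g. ReaderImpl.extractFileTail(bb)
      } finally {
        metadataCache.decRefBuffer(tailBuffers);   // bytes are copied; release the cache pin
      }
    }
    ByteBuffer fromDisk = readTailFromFile();      // e.g. orcReader.getSerializedFileFooter()
    metadataCache.decRefBuffer(metadataCache.putFileMetadata(fileKey, fromDisk)); // don't keep the pin
    return parseTail(fromDisk);

The cache thus only hands out pinned byte buffers; the heap cost of parsed metadata is bounded by the readers currently using it rather than by the cache contents.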
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/50fb6f3c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/50fb6f3c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/50fb6f3c

Branch: refs/heads/hive-14535
Commit: 50fb6f3cb4651be5a1e0c0bae1f59f193f2c7e09
Parents: 4a4ae12
Author: sergey <ser...@apache.org>
Authored: Fri Sep 15 12:09:11 2017 -0700
Committer: sergey <ser...@apache.org>
Committed: Fri Sep 15 12:09:11 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   6 -
 .../hive/llap/cache/EvictionDispatcher.java     |  27 +-
 .../hive/llap/io/api/impl/LlapIoImpl.java       |  42 +-
 .../llap/io/decode/OrcColumnVectorProducer.java |   6 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   | 345 +++++++-----
 .../hive/llap/io/metadata/MetadataCache.java    | 537 +++++++++++++++++++
 .../llap/io/metadata/OrcFileEstimateErrors.java |   5 +-
 .../hive/llap/io/metadata/OrcFileMetadata.java  | 114 +---
 .../hive/llap/io/metadata/OrcMetadataCache.java | 163 ------
 .../llap/io/metadata/OrcStripeMetadata.java     | 123 +----
 .../io/metadata/ParquetMetadataCacheImpl.java   | 353 ------------
 .../TestIncrementalObjectSizeEstimator.java     |   4 +-
 .../hive/llap/cache/TestOrcMetadataCache.java   | 159 ++++--
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  11 +
 .../hive/ql/io/orc/encoded/EncodedReader.java   |  14 +
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 534 ++++++++++++++----
 .../hadoop/hive/ql/io/orc/encoded/Reader.java   |   4 +
 .../hive/ql/io/orc/encoded/ReaderImpl.java      |  11 +-
 .../clientpositive/llap/orc_llap_counters.q.out |  98 ++--
 .../llap/orc_llap_counters1.q.out               |  11 +-
 .../clientpositive/llap/orc_ppd_basic.q.out     | 147 ++---
 .../llap/orc_ppd_schema_evol_3a.q.out           | 156 +++---
 .../hive/common/io/FileMetadataCache.java       |   8 +-
 23 files changed, 1616 insertions(+), 1262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b22a834..8a906ce 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3010,12 +3010,6 @@ public class HiveConf extends Configuration {
     LLAP_ALLOCATOR_MAX_ALLOC("hive.llap.io.allocator.alloc.max", "16Mb", new SizeValidator(),
         "Maximum allocation possible from LLAP buddy allocator. For ORC, should be as large as\n" +
         "the largest expected ORC compression buffer size. Must be a power of 2."),
-    @Deprecated
-    LLAP_IO_METADATA_FRACTION("hive.llap.io.metadata.fraction", 0.1f,
-        "Temporary setting for on-heap metadata cache fraction of xmx, set to avoid potential\n" +
-        "heap problems on very large datasets when on-heap metadata cache takes over\n" +
-        "everything. -1 managed metadata and data together (which is more flexible). This\n" +
-        "setting will be removed (in effect become -1) once ORC metadata cache is moved off-heap."),
     LLAP_ALLOCATOR_ARENA_COUNT("hive.llap.io.allocator.arena.count", 8,
         "Arena count for LLAP low-level cache; cache will be allocated in the steps of\n" +
         "(size/arena_count) bytes. This size must be <= 1Gb and >= max allocation; if it is\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
index c5248ce..b226906 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
@@ -19,11 +19,8 @@ package org.apache.hadoop.hive.llap.cache;
 
 import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.LlapSerDeDataBuffer;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileEstimateErrors;
-import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
-import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
-import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
-import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl;
-import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl.LlapFileMetadataBuffer;
+import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
+import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapMetadataBuffer;
 
 /**
  * Eviction dispatcher - uses double dispatch to route eviction notifications to correct caches.
@@ -31,17 +28,13 @@ import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl.LlapFile
 public final class EvictionDispatcher implements EvictionListener, LlapOomDebugDump {
   private final LowLevelCache dataCache;
   private final SerDeLowLevelCacheImpl serdeCache;
-  private final OrcMetadataCache metadataCache;
+  private final MetadataCache metadataCache;
   private final EvictionAwareAllocator allocator;
-  // TODO# temporary, will be merged with OrcMetadataCache after HIVE-15665.
-  private final ParquetMetadataCacheImpl parquetMetadataCache;
 
   public EvictionDispatcher(LowLevelCache dataCache, SerDeLowLevelCacheImpl serdeCache,
-      OrcMetadataCache metadataCache, EvictionAwareAllocator allocator,
-      ParquetMetadataCacheImpl parquetMetadataCache) {
+      MetadataCache metadataCache, EvictionAwareAllocator allocator) {
     this.dataCache = dataCache;
     this.metadataCache = metadataCache;
-    this.parquetMetadataCache = parquetMetadataCache;
     this.serdeCache = serdeCache;
     this.allocator = allocator;
   }
@@ -51,10 +44,6 @@ public final class EvictionDispatcher implements EvictionListener, LlapOomDebugD
     buffer.notifyEvicted(this); // This will call one of the specific notifyEvicted overloads.
   }
 
-  public void notifyEvicted(LlapFileMetadataBuffer buffer) {
-    this.parquetMetadataCache.notifyEvicted(buffer);
-  }
-
   public void notifyEvicted(LlapSerDeDataBuffer buffer) {
     serdeCache.notifyEvicted(buffer);
     allocator.deallocateEvicted(buffer);
@@ -65,12 +54,10 @@ public final class EvictionDispatcher implements EvictionListener, LlapOomDebugD
     allocator.deallocateEvicted(buffer);
   }
 
-  public void notifyEvicted(OrcFileMetadata buffer) {
-    metadataCache.notifyEvicted(buffer);
-  }
-
-  public void notifyEvicted(OrcStripeMetadata buffer) {
+  public void notifyEvicted(LlapMetadataBuffer<?> buffer) {
     metadataCache.notifyEvicted(buffer);
+    // Note: the metadata cache may deallocate additional buffers, but not this one.
+ allocator.deallocateEvicted(buffer); } public void notifyEvicted(OrcFileEstimateErrors buffer) { http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index f42622b..77c8ade 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit; import javax.management.ObjectName; -import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon; import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,8 +60,7 @@ import org.apache.hadoop.hive.llap.io.api.LlapIo; import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer; import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer; import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer; -import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; -import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl; +import org.apache.hadoop.hive.llap.io.metadata.MetadataCache; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; import org.apache.hadoop.hive.llap.metrics.MetricsUtils; @@ -72,8 +70,6 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hive.common.util.FixedSizedObjectPool; @@ -132,7 +128,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { LOG.info("Started llap daemon metrics with displayName: {} sessionId: {}", displayName, sessionId); - OrcMetadataCache metadataCache = null; + MetadataCache metadataCache = null; SerDeLowLevelCacheImpl serdeCache = null; // TODO: extract interface when needed BufferUsageManager bufferManagerOrc = null, bufferManagerGeneric = null; boolean isEncodeEnabled = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_ENABLED); @@ -141,30 +137,12 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU); long totalMemorySize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE); int minAllocSize = (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC); - float metadataFraction = HiveConf.getFloatVar(conf, ConfVars.LLAP_IO_METADATA_FRACTION); - long metaMem = 0; - // TODO: this split a workaround until HIVE-15665. - // Technically we don't have to do it for on-heap data cache but we'd do for testing. - boolean isSplitCache = metadataFraction > 0f; - if (isSplitCache) { - metaMem = (long)(LlapDaemon.getTotalHeapSize() * metadataFraction); - } LowLevelCachePolicy cachePolicy = useLrfu ? new LowLevelLrfuCachePolicy( minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy(); // Allocator uses memory manager to request memory, so create the manager next. 
LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager( totalMemorySize, cachePolicy, cacheMetrics); - LowLevelCachePolicy metaCachePolicy = null; - LowLevelCacheMemoryManager metaMemManager = null; - if (isSplitCache) { - metaCachePolicy = useLrfu ? new LowLevelLrfuCachePolicy( - minAllocSize, metaMem, conf) : new LowLevelFifoCachePolicy(); - metaMemManager = new LowLevelCacheMemoryManager(metaMem, metaCachePolicy, cacheMetrics); - } else { - metaCachePolicy = cachePolicy; - metaMemManager = memManager; - } - cacheMetrics.setCacheCapacityTotal(totalMemorySize + metaMem); + cacheMetrics.setCacheCapacityTotal(totalMemorySize); // Cache uses allocator to allocate and deallocate, create allocator and then caches. BuddyAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics); this.allocator = allocator; @@ -179,18 +157,12 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> { } boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE); - metadataCache = new OrcMetadataCache(metaMemManager, metaCachePolicy, useGapCache); - // TODO# temporary, see comments there - ParquetMetadataCacheImpl parquetMc = new ParquetMetadataCacheImpl( - allocator, memManager, cachePolicy, cacheMetrics); - fileMetadataCache = parquetMc; + metadataCache = new MetadataCache( + allocator, memManager, cachePolicy, useGapCache, cacheMetrics); + fileMetadataCache = metadataCache; // And finally cache policy uses cache to notify it of eviction. The cycle is complete! EvictionDispatcher e = new EvictionDispatcher( - dataCache, serdeCache, metadataCache, allocator, parquetMc); - if (isSplitCache) { - metaCachePolicy.setEvictionListener(e); - metaCachePolicy.setParentDebugDumper(e); - } + dataCache, serdeCache, metadataCache, allocator); cachePolicy.setEvictionListener(e); cachePolicy.setParentDebugDumper(e); http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java index 6edd84b..373af76 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java @@ -33,7 +33,7 @@ import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader; -import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; +import org.apache.hadoop.hive.llap.io.metadata.MetadataCache; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; @@ -52,7 +52,7 @@ import org.apache.orc.OrcConf; public class OrcColumnVectorProducer implements ColumnVectorProducer { - private final OrcMetadataCache metadataCache; + private final MetadataCache metadataCache; private final LowLevelCache lowLevelCache; private final BufferUsageManager bufferManager; private final Configuration conf; @@ -63,7 +63,7 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer { // TODO: should 
this rather use a threadlocal for NUMA affinity? private final FixedSizedObjectPool<IoTrace> tracePool; - public OrcColumnVectorProducer(OrcMetadataCache metadataCache, + public OrcColumnVectorProducer(MetadataCache metadataCache, LowLevelCache lowLevelCache, BufferUsageManager bufferManager, Configuration conf, LlapDaemonCacheMetrics cacheMetrics, LlapDaemonIOMetrics ioMetrics, FixedSizedObjectPool<IoTrace> tracePool) { http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index b5db302..2e47a56 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -26,11 +26,21 @@ import java.util.Collections; import java.util.List; import org.apache.hadoop.hive.llap.counters.LlapIOCounters; +import org.apache.orc.CompressionCodec; +import org.apache.orc.OrcProto.BloomFilterIndex; +import org.apache.orc.OrcProto.FileTail; +import org.apache.orc.OrcProto.RowIndex; +import org.apache.orc.OrcProto.Stream; +import org.apache.orc.OrcProto.StripeStatistics; import org.apache.orc.TypeDescription; +import org.apache.orc.impl.BufferChunk; import org.apache.orc.impl.DataReaderProperties; +import org.apache.orc.impl.InStream; import org.apache.orc.impl.OrcIndex; import org.apache.orc.impl.OrcTail; +import org.apache.orc.impl.ReaderImpl; import org.apache.orc.impl.SchemaEvolution; +import org.apache.orc.impl.WriterImpl; import org.apache.tez.common.counters.TezCounters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +68,8 @@ import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; -import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; +import org.apache.hadoop.hive.llap.io.metadata.MetadataCache; +import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapBufferOrBuffers; import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; import org.apache.hadoop.hive.ql.exec.DDLTask; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -87,6 +98,8 @@ import org.apache.hive.common.util.FixedSizedObjectPool; import org.apache.orc.OrcProto; import org.apache.tez.common.CallableWithNdc; +import com.google.common.collect.Lists; + /** * This produces EncodedColumnBatch via ORC EncodedDataImpl. 
* It serves as Consumer for EncodedColumnBatch too, for the high-level cache scenario where @@ -135,7 +148,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } }; - private final OrcMetadataCache metadataCache; + private final MetadataCache metadataCache; private final LowLevelCache lowLevelCache; private final BufferUsageManager bufferManager; private final Configuration daemonConf, jobConf; @@ -153,8 +166,10 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> private OrcFileMetadata fileMetadata; private Path path; private Reader orcReader; - private DataReader metadataReader; + private DataReader rawDataReader; + private boolean isRawDataReaderOpen = false; private EncodedReader stripeReader; + private CompressionCodec codec; private Object fileKey; private FileSystem fs; /** @@ -166,12 +181,12 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> @SuppressWarnings("unused") private volatile boolean isPaused = false; - boolean[] globalIncludes = null; + boolean[] globalIncludes = null, sargColumns = null; private final IoTrace trace; private Pool<IoTrace> tracePool; public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager, - OrcMetadataCache metadataCache, Configuration daemonConf, Configuration jobConf, + MetadataCache metadataCache, Configuration daemonConf, Configuration jobConf, FileSplit split, List<Integer> columnIds, SearchArgument sarg, String[] columnNames, OrcEncodedDataConsumer consumer, QueryFragmentCounters counters, TypeDescription readerSchema, Pool<IoTrace> tracePool) @@ -206,7 +221,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> fileKey = determineFileId(fs, split, HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID), HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID)); - fileMetadata = getOrReadFileMetadata(); + fileMetadata = getFileFooterFromCacheOrDisk(); if (readerSchema == null) { readerSchema = fileMetadata.getSchema(); } @@ -288,7 +303,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> // 3. Apply SARG if needed, and otherwise determine what RGs to read. int stride = fileMetadata.getRowIndexStride(); ArrayList<OrcStripeMetadata> stripeMetadatas = null; - boolean[] sargColumns = null; try { if (sarg != null && stride != 0) { // TODO: move this to a common method @@ -307,7 +321,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> stripeMetadatas = readStripesMetadata(globalIncludes, sargColumns); } - // Now, apply SARG if any; w/o sarg, this will just initialize readState. + // Now, apply SARG if any; w/o sarg, this will just initialize stripeRgs. boolean hasData = determineRgsToRead(globalIncludes, stride, stripeMetadatas); if (!hasData) { consumer.setDone(); @@ -327,11 +341,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> // 4. Create encoded data reader. try { - ensureOrcReader(); - // Reader creating updates HDFS counters, don't do it here. 
- DataWrapperForOrc dw = new DataWrapperForOrc(); - stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace); - stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled()); + ensureDataReader(); } catch (Throwable t) { handleReaderError(startTime, t); return null; @@ -349,16 +359,15 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> int stripeIx = stripeIxFrom + stripeIxMod; boolean[] rgs = null; OrcStripeMetadata stripeMetadata = null; - StripeInformation stripe; + StripeInformation si; try { - stripe = fileMetadata.getStripes().get(stripeIx); - - LlapIoImpl.ORC_LOGGER.trace("Reading stripe {}: {}, {}", stripeIx, stripe.getOffset(), - stripe.getLength()); - trace.logReadingStripe(stripeIx, stripe.getOffset(), stripe.getLength()); + si = fileMetadata.getStripes().get(stripeIx); + LlapIoImpl.ORC_LOGGER.trace("Reading stripe {}: {}, {}", stripeIx, si.getOffset(), + si.getLength()); + trace.logReadingStripe(stripeIx, si.getOffset(), si.getLength()); rgs = stripeRgs[stripeIxMod]; if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { - LlapIoImpl.ORC_LOGGER.trace("readState[{}]: {}", stripeIxMod, Arrays.toString(rgs)); + LlapIoImpl.ORC_LOGGER.trace("stripeRgs[{}]: {}", stripeIxMod, Arrays.toString(rgs)); } // We assume that NO_RGS value is only set from SARG filter and for all columns; // intermediate changes for individual columns will unset values in the array. @@ -367,47 +376,18 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> if (rgs == RecordReaderImpl.SargApplier.READ_NO_RGS) continue; // 6.2. Ensure we have stripe metadata. We might have read it before for RG filtering. - boolean isFoundInCache = false; if (stripeMetadatas != null) { stripeMetadata = stripeMetadatas.get(stripeIxMod); } else { - if (hasFileId && metadataCache != null) { - stripeKey.stripeIx = stripeIx; - stripeMetadata = metadataCache.getStripeMetadata(stripeKey); - } - isFoundInCache = (stripeMetadata != null); - if (!isFoundInCache) { - counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS); - ensureMetadataReader(); - long startTimeHdfs = counters.startTimeCounter(); - stripeMetadata = new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0), - metadataReader, stripe, globalIncludes, sargColumns, - orcReader.getSchema(), orcReader.getWriterVersion()); - counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTimeHdfs); - if (hasFileId && metadataCache != null) { - OrcStripeMetadata newMetadata = metadataCache.putStripeMetadata(stripeMetadata); - isFoundInCache = newMetadata != stripeMetadata; // May be cached concurrently. 
- stripeMetadata = newMetadata; - if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { - LlapIoImpl.ORC_LOGGER.trace("Caching stripe {} metadata with includes: {}", - stripeKey.stripeIx, DebugUtils.toString(globalIncludes)); - } - } - } + stripeKey.stripeIx = stripeIx; + OrcProto.StripeFooter footer = getStripeFooterFromCacheOrDisk(si, stripeKey); + stripeMetadata = createOrcStripeMetadataObject( + stripeIx, si, footer, globalIncludes, sargColumns); + ensureDataReader(); + stripeReader.readIndexStreams(stripeMetadata.getIndex(), + si, footer.getStreamsList(), globalIncludes, sargColumns); consumer.setStripeMetadata(stripeMetadata); } - if (!stripeMetadata.hasAllIndexes(globalIncludes)) { - if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { - LlapIoImpl.ORC_LOGGER.trace("Updating indexes in stripe {} metadata for includes: {}", - stripeKey.stripeIx, DebugUtils.toString(globalIncludes)); - } - assert isFoundInCache; // If it's not fresh from the cache, indexes should be present. - counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS); - ensureMetadataReader(); - updateLoadedIndexes(stripeMetadata, stripe, globalIncludes, sargColumns); - } else if (isFoundInCache) { - counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT); - } } catch (Throwable t) { handleReaderError(startTime, t); return null; @@ -424,7 +404,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> // consumer. It is potentially holding locked buffers, and must perform its own cleanup. // Also, currently readEncodedColumns is not stoppable. The consumer will discard the // data it receives for one stripe. We could probably interrupt it, if it checked that. - stripeReader.readEncodedColumns(stripeIx, stripe, stripeMetadata.getRowIndexes(), + stripeReader.readEncodedColumns(stripeIx, si, stripeMetadata.getRowIndexes(), stripeMetadata.getEncodings(), stripeMetadata.getStreams(), globalIncludes, rgs, consumer); } catch (Throwable t) { @@ -452,6 +432,14 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> tracePool.offer(trace); } + private void ensureDataReader() throws IOException { + ensureOrcReader(); + // Reader creation updates HDFS counters, don't do it here. + DataWrapperForOrc dw = new DataWrapperForOrc(); + stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace); + stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled()); + } + private void recordReaderTime(long startTime) { counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime); } @@ -544,22 +532,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } /** - * In case if stripe metadata in cache does not have all indexes for current query, load - * the missing one. This is a temporary cludge until real metadata cache becomes available. - */ - private void updateLoadedIndexes(OrcStripeMetadata stripeMetadata, - StripeInformation stripe, boolean[] stripeIncludes, boolean[] sargColumns) throws IOException { - // We only synchronize on write for now - design of metadata cache is very temporary; - // we pre-allocate the array and never remove entries; so readers should be safe. - synchronized (stripeMetadata) { - if (stripeMetadata.hasAllIndexes(stripeIncludes)) return; - long startTime = counters.startTimeCounter(); - stripeMetadata.loadMissingIndexes(metadataReader, stripe, stripeIncludes, sargColumns); - counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime); - } - } - - /** * Closes the stripe readers (on error). 
*/ private void cleanupReaders() { @@ -570,9 +542,9 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> // Ignore. } } - if (metadataReader != null) { + if (rawDataReader != null && isRawDataReaderOpen) { try { - metadataReader.close(); + rawDataReader.close(); } catch (IOException ex) { // Ignore. } @@ -604,90 +576,194 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } /** + * Ensure codec is created for the split, to decode values from cache. Can only be called + * after initializing fileMetadata. + */ + private void ensureCodecFromFileMetadata() { + if (codec != null) return; + codec = WriterImpl.createCodec(fileMetadata.getCompressionKind()); + } + + /** * Gets file metadata for the split from cache, or reads it from the file. */ - private OrcFileMetadata getOrReadFileMetadata() throws IOException { - OrcFileMetadata metadata = null; - if (fileKey != null && metadataCache != null) { - metadata = metadataCache.getFileMetadata(fileKey); - if (metadata != null) { - counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT); - return metadata; - } else { - counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS); + private OrcFileMetadata getFileFooterFromCacheOrDisk() throws IOException { + LlapBufferOrBuffers tailBuffers = null; + List<StripeStatistics> stats = null; + List<StripeInformation> stripes = null; + boolean hasCache = fileKey != null && metadataCache != null; + if (hasCache) { + tailBuffers = metadataCache.getFileMetadata(fileKey); + if (tailBuffers != null) { + try { + MemoryBuffer tailBuffer = tailBuffers.getSingleBuffer(); + ByteBuffer bb = null; + if (tailBuffer != null) { + bb = tailBuffer.getByteBufferDup(); + // TODO: remove the copy after ORC-158 and ORC-197 + // if (bb.isDirect()) { + ByteBuffer dupBb = tailBuffer.getByteBufferDup(); // Don't mess with the cached object. + bb = ByteBuffer.allocate(dupBb.remaining()); + bb.put(dupBb); + bb.flip(); + // } + } else { + // TODO: add the ability to extractFileTail to read from multiple buffers? + MemoryBuffer[] tailBufferArray = tailBuffers.getMultipleBuffers(); + int totalSize = 0; + for (MemoryBuffer buf : tailBufferArray) { + totalSize += buf.getByteBufferRaw().remaining(); + } + bb = ByteBuffer.allocate(totalSize); + for (MemoryBuffer buf : tailBufferArray) { + bb.put(buf.getByteBufferDup()); + } + bb.flip(); + } + OrcTail orcTail = ReaderImpl.extractFileTail(bb); + FileTail tail = orcTail.getFileTail(); + stats = orcTail.getStripeStatisticsProto(); + stripes = new ArrayList<>(tail.getFooter().getStripesCount()); + for (OrcProto.StripeInformation stripeProto : tail.getFooter().getStripesList()) { + stripes.add(new ReaderImpl.StripeInformationImpl(stripeProto)); + } + return new OrcFileMetadata( + fileKey, tail.getFooter(), tail.getPostscript(), stats, stripes); + } finally { + // We don't need the buffer anymore. + metadataCache.decRefBuffer(tailBuffers); + counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT); + } } + counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS); } ensureOrcReader(); - // We assume this call doesn't touch HDFS because everything is already read; don't add time. 
- metadata = new OrcFileMetadata(fileKey, orcReader); - if (fileKey == null || metadataCache == null) return metadata; - return metadataCache.putFileMetadata(metadata); + ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter(); + if (hasCache) { + tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb); + metadataCache.decRefBuffer(tailBuffers); // We don't use the cache's copy of the buffer. + } + FileTail ft = orcReader.getFileTail(); + return new OrcFileMetadata(fileKey, ft.getFooter(), ft.getPostscript(), + orcReader.getOrcProtoStripeStatistics(), orcReader.getStripes()); + } + + private OrcProto.StripeFooter buildStripeFooter( + List<DiskRange> bcs, int len, CompressionCodec codec, int bufferSize) throws IOException { + return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream( + "footer", bcs, len, codec, bufferSize)); } /** * Reads the metadata for all stripes in the file. */ private ArrayList<OrcStripeMetadata> readStripesMetadata( - boolean[] globalInc, boolean[] sargColumns) throws IOException { + boolean[] includes, boolean[] sargColumns) throws IOException { ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(stripeRgs.length); boolean hasFileId = this.fileKey != null; OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileKey, 0, 0) : null; for (int stripeIxMod = 0; stripeIxMod < stripeRgs.length; ++stripeIxMod) { - OrcStripeMetadata value = null; int stripeIx = stripeIxMod + stripeIxFrom; - if (hasFileId && metadataCache != null) { - stripeKey.stripeIx = stripeIx; - value = metadataCache.getStripeMetadata(stripeKey); - } - if (value == null || !value.hasAllIndexes(globalInc)) { - counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS); - ensureMetadataReader(); - StripeInformation si = fileMetadata.getStripes().get(stripeIx); - if (value == null) { - long startTime = counters.startTimeCounter(); - value = new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0), - metadataReader, si, globalInc, sargColumns, orcReader.getSchema(), - orcReader.getWriterVersion()); - counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime); - if (hasFileId && metadataCache != null) { - value = metadataCache.putStripeMetadata(value); - if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { - LlapIoImpl.ORC_LOGGER.trace("Caching stripe {} metadata with includes: {}", - stripeKey.stripeIx, DebugUtils.toString(globalInc)); + stripeKey.stripeIx = stripeIx; + StripeInformation si = fileMetadata.getStripes().get(stripeIx); + OrcProto.StripeFooter footer = getStripeFooterFromCacheOrDisk(si, stripeKey); + OrcStripeMetadata osm = createOrcStripeMetadataObject( + stripeIx, si, footer, includes, sargColumns); + + ensureDataReader(); + OrcIndex index = osm.getIndex(); + stripeReader.readIndexStreams(index, si, footer.getStreamsList(), includes, sargColumns); + result.add(osm); + consumer.setStripeMetadata(osm); + } + return result; + } + + private OrcStripeMetadata createOrcStripeMetadataObject(int stripeIx, StripeInformation si, + OrcProto.StripeFooter footer, boolean[] includes, boolean[] sargColumns) throws IOException { + Stream.Kind[] bks = sargColumns == null ? null : new Stream.Kind[includes.length]; + BloomFilterIndex[] bis = sargColumns == null ? 
+        null : new BloomFilterIndex[includes.length];
+    return new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0), footer,
+        new OrcIndex(new RowIndex[includes.length], bks, bis), si);
+  }
+
+  private OrcProto.StripeFooter getStripeFooterFromCacheOrDisk(
+      StripeInformation si, OrcBatchKey stripeKey) throws IOException {
+    boolean hasCache = fileKey != null && metadataCache != null;
+    if (hasCache) {
+      LlapBufferOrBuffers footerBuffers = metadataCache.getStripeTail(stripeKey);
+      if (footerBuffers != null) {
+        try {
+          counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
+          ensureCodecFromFileMetadata();
+          MemoryBuffer footerBuffer = footerBuffers.getSingleBuffer();
+          if (footerBuffer != null) {
+            ByteBuffer bb = footerBuffer.getByteBufferDup();
+            return buildStripeFooter(Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
+                bb.remaining(), codec, fileMetadata.getCompressionBufferSize());
+          } else {
+            MemoryBuffer[] footerBufferArray = footerBuffers.getMultipleBuffers();
+            int pos = 0;
+            List<DiskRange> bcs = new ArrayList<>(footerBufferArray.length);
+            for (MemoryBuffer buf : footerBufferArray) {
+              ByteBuffer bb = buf.getByteBufferDup();
+              bcs.add(new BufferChunk(bb, pos));
+              pos += bb.remaining();
+            }
+            return buildStripeFooter(bcs, pos, codec, fileMetadata.getCompressionBufferSize());
+          }
+        } finally {
+          metadataCache.decRefBuffer(footerBuffers);
+        }
+      }
+      counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
+    }
+    long offset = si.getOffset() + si.getIndexLength() + si.getDataLength();
+    long startTime = counters.startTimeCounter();
+    ensureRawDataReader(true);
+    // TODO: add this to metadatareader in ORC - SI => metadata buffer, not just metadata.
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reading [" + offset + ", "
+          + (offset + si.getFooterLength()) + ") based on " + si);
+    }
+    DiskRangeList footerRange = rawDataReader.readFileData(
+        new DiskRangeList(offset, offset + si.getFooterLength()), 0, false);
+    // LOG.error("Got " + RecordReaderUtils.stringifyDiskRanges(footerRange));
+    counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
+    assert footerRange.next == null; // Can only happen w/zcr for a single input buffer.
+    if (hasCache) {
+      LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(stripeKey, footerRange.getData());
+      metadataCache.decRefBuffer(cacheBuf); // We don't use this one.
+ } + ByteBuffer bb = footerRange.getData(); + return buildStripeFooter(Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), + bb.remaining(), orcReader.getCodec(), orcReader.getCompressionSize()); } - private void ensureMetadataReader() throws IOException { + private void ensureRawDataReader(boolean isOpen) throws IOException { ensureOrcReader(); - if (metadataReader != null) return; + if (rawDataReader != null) { + if (!isRawDataReaderOpen && isOpen) { + long startTime = counters.startTimeCounter(); + rawDataReader.open(); + counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime); + } + return; + } long startTime = counters.startTimeCounter(); boolean useZeroCopy = (daemonConf != null) && OrcConf.USE_ZEROCOPY.getBoolean(daemonConf); - metadataReader = RecordReaderUtils.createDefaultDataReader( - DataReaderProperties.builder() - .withBufferSize(orcReader.getCompressionSize()) + rawDataReader = RecordReaderUtils.createDefaultDataReader( + DataReaderProperties.builder().withBufferSize(orcReader.getCompressionSize()) .withCompression(orcReader.getCompressionKind()) - .withFileSystem(fs) - .withPath(path) + .withFileSystem(fs).withPath(path) .withTypeCount(orcReader.getSchema().getMaximumId() + 1) .withZeroCopy(useZeroCopy) .build()); + + if (isOpen) { + rawDataReader.open(); + isRawDataReaderOpen = true; + } counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime); } @@ -781,7 +857,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } /** - * Determine which stripes to read for a split. Populates stripeIxFrom and readState. + * Determine which stripes to read for a split. Populates stripeIxFrom and stripeRgs. */ public void determineStripesToRead() { // The unit of caching for ORC is (rg x column) (see OrcBatchKey). @@ -837,8 +913,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } public DataWrapperForOrc() throws IOException { - ensureMetadataReader(); - this.orcDataReader = metadataReader.clone(); + ensureRawDataReader(false); + this.orcDataReader = rawDataReader.clone(); } @Override @@ -853,7 +929,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } if (gotAllData.value) return result; return (metadataCache == null) ? result - : metadataCache.getIncompleteCbs(fileKey, result, baseOffset, factory, gotAllData); + : metadataCache.getIncompleteCbs(fileKey, result, baseOffset, gotAllData); } @Override @@ -887,9 +963,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> @Override public void close() throws IOException { orcDataReader.close(); - if (metadataReader != null) { - metadataReader.close(); - } } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java new file mode 100644 index 0000000..9d7951e --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java @@ -0,0 +1,537 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.io.metadata; + +import org.apache.hadoop.hive.common.FileUtils; + +import org.apache.hadoop.hive.common.io.FileMetadataCache; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers; + +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; +import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; +import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator; +import org.apache.hadoop.hive.llap.cache.EvictionDispatcher; +import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer; +import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump; +import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy; +import org.apache.hadoop.hive.llap.cache.MemoryManager; +import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; +import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey; + +public class MetadataCache implements LlapOomDebugDump, FileMetadataCache { + private final ConcurrentHashMap<Object, LlapBufferOrBuffers> metadata = + new ConcurrentHashMap<>(); + + private final ConcurrentHashMap<Object, OrcFileEstimateErrors> estimateErrors; + private final MemoryManager memoryManager; + private final LowLevelCachePolicy policy; + private final EvictionAwareAllocator allocator; + private final LlapDaemonCacheMetrics metrics; + + public MetadataCache(EvictionAwareAllocator allocator, MemoryManager memoryManager, + LowLevelCachePolicy policy, boolean useEstimateCache, LlapDaemonCacheMetrics metrics) { + this.memoryManager = memoryManager; + this.allocator = allocator; + this.policy = policy; + this.metrics = metrics; + this.estimateErrors = useEstimateCache + ? new ConcurrentHashMap<Object, OrcFileEstimateErrors>() : null; + } + + public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset) { + if (estimateErrors == null) return; + OrcFileEstimateErrors errorData = estimateErrors.get(fileKey); + boolean isNew = false; + // We should technically update memory usage if updating the old object, but we don't do it + // for now; there is no mechanism to properly notify the cache policy/etc. wrt parallel evicts. 
+ if (errorData == null) { + errorData = new OrcFileEstimateErrors(fileKey); + for (DiskRange range : ranges) { + errorData.addError(range.getOffset(), range.getLength(), baseOffset); + } + long memUsage = errorData.estimateMemoryUsage(); + memoryManager.reserveMemory(memUsage); + OrcFileEstimateErrors old = estimateErrors.putIfAbsent(fileKey, errorData); + if (old != null) { + errorData = old; + memoryManager.releaseMemory(memUsage); + policy.notifyLock(errorData); + } else { + isNew = true; + policy.cache(errorData, Priority.NORMAL); + } + } else { + policy.notifyLock(errorData); + } + if (!isNew) { + for (DiskRange range : ranges) { + errorData.addError(range.getOffset(), range.getLength(), baseOffset); + } + } + policy.notifyUnlock(errorData); + } + + public DiskRangeList getIncompleteCbs( + Object fileKey, DiskRangeList ranges, long baseOffset, BooleanRef gotAllData) { + if (estimateErrors == null) return ranges; + OrcFileEstimateErrors errors = estimateErrors.get(fileKey); + if (errors == null) return ranges; + policy.notifyLock(errors); + policy.notifyUnlock(errors); // Never locked for eviction; Java object. + return errors.getIncompleteCbs(ranges, baseOffset, gotAllData); + } + + public void notifyEvicted(LlapMetadataBuffer<?> buffer) { + LlapBufferOrBuffers removed = metadata.remove(buffer.getKey()); + if (removed == null) return; + if (removed.getSingleBuffer() != null) { + assert removed.getSingleBuffer() == buffer; + return; + } + discardMultiBuffer(removed); + } + + public void notifyEvicted(OrcFileEstimateErrors buffer) { + estimateErrors.remove(buffer.getFileKey()); + } + + @Override + public String debugDumpForOom() { + StringBuilder sb = new StringBuilder(); + debugDumpShort(sb); + return sb.toString(); + } + + @Override + public void debugDumpShort(StringBuilder sb) { + // TODO: perhaps add counters for separate things and multiple buffer cases. + sb.append("\nMetadata cache state: ").append(metadata.size()).append( + " files and stripes, ").append(estimateErrors.size()).append(" files w/ORC estimate"); + } + + @Override + public LlapBufferOrBuffers getFileMetadata(Object fileKey) { + return getInternal(fileKey); + } + + public LlapBufferOrBuffers getStripeTail(OrcBatchKey stripeKey) { + return getInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx)); + } + + private LlapBufferOrBuffers getInternal(Object key) { + LlapBufferOrBuffers result = metadata.get(key); + if (result == null) return null; + if (!lockBuffer(result, true)) { + // No need to discard the buffer we cannot lock - eviction takes care of that. + metadata.remove(key, result); + return null; + } + return result; + } + + @Override + public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer) { + return putInternal(fileKey, tailBuffer); + } + + public LlapBufferOrBuffers putStripeTail(OrcBatchKey stripeKey, ByteBuffer tailBuffer) { + return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer); + } + + + @Override + public LlapBufferOrBuffers putFileMetadata( + Object fileKey, int length, InputStream is) throws IOException { + LlapBufferOrBuffers result = null; + while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value). 
+      LlapBufferOrBuffers oldVal = metadata.get(fileKey);
+      if (oldVal == null) {
+        result = wrapBbForFile(result, fileKey, length, is);
+        if (!lockBuffer(result, false)) {
+          throw new AssertionError("Cannot lock a newly created value " + result);
+        }
+        oldVal = metadata.putIfAbsent(fileKey, result);
+        if (oldVal == null) {
+          cacheInPolicy(result); // Cached successfully, add to policy.
+          return result;
+        }
+      }
+      if (lockOldVal(fileKey, result, oldVal)) {
+        return oldVal;
+      }
+      // We found some old value but couldn't incRef it; remove it.
+      metadata.remove(fileKey, oldVal);
+    }
+  }
+
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result,
+      Object fileKey, int length, InputStream stream) throws IOException {
+    if (result != null) return result;
+    int maxAlloc = allocator.getMaxAllocation();
+    LlapMetadataBuffer<Object>[] largeBuffers = null;
+    if (maxAlloc < length) {
+      largeBuffers = new LlapMetadataBuffer[length / maxAlloc];
+      for (int i = 0; i < largeBuffers.length; ++i) {
+        largeBuffers[i] = new LlapMetadataBuffer<Object>(fileKey);
+      }
+      allocator.allocateMultiple(largeBuffers, maxAlloc, null);
+      for (int i = 0; i < largeBuffers.length; ++i) {
+        readIntoCacheBuffer(stream, maxAlloc, largeBuffers[i]);
+      }
+    }
+    int smallSize = length % maxAlloc;
+    if (smallSize == 0) {
+      return new LlapMetadataBuffers(largeBuffers);
+    } else {
+      LlapMetadataBuffer<Object>[] smallBuffer = new LlapMetadataBuffer[1];
+      smallBuffer[0] = new LlapMetadataBuffer(fileKey);
+      allocator.allocateMultiple(smallBuffer, smallSize, null);
+      readIntoCacheBuffer(stream, smallSize, smallBuffer[0]);
+      if (largeBuffers == null) {
+        return smallBuffer[0]; // This is the overwhelmingly common case.
+      } else {
+        LlapMetadataBuffer<Object>[] cacheData = new LlapMetadataBuffer[largeBuffers.length + 1];
+        System.arraycopy(largeBuffers, 0, cacheData, 0, largeBuffers.length);
+        cacheData[largeBuffers.length] = smallBuffer[0];
+        return new LlapMetadataBuffers<Object>(cacheData);
+      }
+    }
+  }
+
+  private static void readIntoCacheBuffer(
+      InputStream stream, int length, MemoryBuffer dest) throws IOException {
+    ByteBuffer bb = dest.getByteBufferRaw();
+    int pos = bb.position();
+    bb.limit(pos + length);
+    // TODO: SeekableInputStream.readFully eventually calls a Hadoop method that used to be
+    // buggy in 2.7 and also anyway just does a copy for a direct buffer. Do a copy here.
+    // ((SeekableInputStream)stream).readFully(bb);
+    FileUtils.readFully(stream, length, bb);
+    bb.position(pos);
+  }
+
+  private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer) {
+    LlapBufferOrBuffers result = null;
+    while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
+      LlapBufferOrBuffers oldVal = metadata.get(key);
+      if (oldVal == null) {
+        result = wrapBb(result, key, tailBuffer);
+        oldVal = metadata.putIfAbsent(key, result);
+        if (oldVal == null) {
+          cacheInPolicy(result); // Cached successfully, add to policy.
+          return result;
+        }
+      }
+      if (lockOldVal(key, result, oldVal)) {
+        return oldVal;
+      }
+      // We found some old value but couldn't incRef it; remove it.
+ metadata.remove(key, oldVal); + } + } + + private void cacheInPolicy(LlapBufferOrBuffers buffers) { + LlapAllocatorBuffer singleBuffer = buffers.getSingleLlapBuffer(); + if (singleBuffer != null) { + policy.cache(singleBuffer, Priority.HIGH); + return; + } + for (LlapAllocatorBuffer buffer : buffers.getMultipleLlapBuffers()) { + policy.cache(buffer, Priority.HIGH); + } + } + + private <T extends LlapBufferOrBuffers> boolean lockOldVal(Object key, T newVal, T oldVal) { + if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) { + LlapIoImpl.CACHE_LOGGER.trace("Trying to cache when metadata is already cached for" + + " {}; old {}, new {}", key, oldVal, newVal); + } + if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { + LlapIoImpl.LOCKING_LOGGER.trace("Locking {} due to cache collision", oldVal); + } + if (lockBuffer(oldVal, true)) { + // We found an old, valid block for this key in the cache. + if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) { + LlapIoImpl.LOCKING_LOGGER.trace("Unlocking {} due to cache collision with {}", + newVal, oldVal); + } + + if (newVal != null) { + unlockBuffer(newVal, false); + } + return true; + } + return false; + } + + @Override + public void decRefBuffer(MemoryBufferOrBuffers buffer) { + if (!(buffer instanceof LlapBufferOrBuffers)) { + throw new AssertionError(buffer.getClass()); + } + unlockBuffer((LlapBufferOrBuffers)buffer, true); + } + + private <T> LlapBufferOrBuffers wrapBb( + LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer) { + if (result != null) return result; + if (tailBuffer.remaining() <= allocator.getMaxAllocation()) { + // The common case by far. + return wrapSmallBb(new LlapMetadataBuffer<T>(key), tailBuffer); + } else { + int allocCount = determineAllocCount(tailBuffer); + @SuppressWarnings("unchecked") + LlapMetadataBuffer<T>[] results = new LlapMetadataBuffer[allocCount]; + for (int i = 0; i < allocCount; ++i) { + results[i] = new LlapMetadataBuffer<T>(key); + } + wrapLargeBb(results, tailBuffer); + return new LlapMetadataBuffers<T>(results); + } + } + + private <T extends LlapAllocatorBuffer> T wrapSmallBb(T result, ByteBuffer tailBuffer) { + // Note: we pass in null factory because we allocate objects here. We could also pass a + // per-call factory that would set fileKey; or set it after put. + allocator.allocateMultiple(new MemoryBuffer[] { result }, tailBuffer.remaining(), null); + return putBufferToDest(tailBuffer.duplicate(), result); + } + + private <T extends LlapAllocatorBuffer> void wrapLargeBb(T[] results, ByteBuffer tailBuffer) { + // Note: we pass in null factory because we allocate objects here. We could also pass a + // per-call factory that would set fileKey; or set it after put. 
+    allocator.allocateMultiple(results, allocator.getMaxAllocation(), null);
+    ByteBuffer src = tailBuffer.duplicate();
+    int pos = src.position(), remaining = src.remaining();
+    for (int i = 0; i < results.length; ++i) {
+      T result = results[i];
+      int toPut = Math.min(remaining, result.getByteBufferRaw().remaining());
+      assert toPut > 0;
+      src.position(pos);
+      src.limit(pos + toPut);
+      pos += toPut;
+      remaining -= toPut;
+      putBufferToDest(src, result);
+    }
+  }
+
+  private <T extends LlapAllocatorBuffer> T putBufferToDest(ByteBuffer src, T result) {
+    ByteBuffer dest = result.getByteBufferRaw();
+    int startPos = dest.position();
+    dest.put(src);
+    int newPos = dest.position();
+    dest.position(startPos);
+    dest.limit(newPos);
+    boolean canLock = lockOneBuffer(result, false);
+    assert canLock;
+    return result;
+  }
+
+  public int determineAllocCount(ByteBuffer tailBuffer) {
+    int total = tailBuffer.remaining(), maxAlloc = allocator.getMaxAllocation();
+    return total / maxAlloc + ((total % maxAlloc) > 0 ? 1 : 0);
+  }
+
+  private boolean lockBuffer(LlapBufferOrBuffers buffers, boolean doNotifyPolicy) {
+    LlapAllocatorBuffer buffer = buffers.getSingleLlapBuffer();
+    if (buffer != null) {
+      return lockOneBuffer(buffer, doNotifyPolicy);
+    }
+    LlapAllocatorBuffer[] bufferArray = buffers.getMultipleLlapBuffers();
+    for (int i = 0; i < bufferArray.length; ++i) {
+      if (lockOneBuffer(bufferArray[i], doNotifyPolicy)) continue;
+      for (int j = 0; j < i; ++j) {
+        unlockSingleBuffer(bufferArray[j], true);
+      }
+      discardMultiBuffer(buffers);
+      return false;
+    }
+    return true;
+  }
+
+  private void discardMultiBuffer(LlapBufferOrBuffers removed) {
+    long memoryFreed = 0;
+    for (LlapAllocatorBuffer buf : removed.getMultipleLlapBuffers()) {
+      long memUsage = buf.getMemoryUsage();
+      // We cannot just deallocate the buffer, as it can hypothetically have users.
+      int result = buf.invalidate();
+      switch (result) {
+      case LlapAllocatorBuffer.INVALIDATE_ALREADY_INVALID: continue; // Nothing to do.
+      case LlapAllocatorBuffer.INVALIDATE_FAILED: {
+        // Someone is using this buffer; eventually, it will be evicted.
+        continue;
+      }
+      case LlapAllocatorBuffer.INVALIDATE_OK: {
+        memoryFreed += memUsage;
+        allocator.deallocateEvicted(buf);
+        break;
+      }
+      default: throw new AssertionError(result);
+      }
+    }
+    memoryManager.releaseMemory(memoryFreed);
+  }
+
+  public boolean lockOneBuffer(LlapAllocatorBuffer buffer, boolean doNotifyPolicy) {
+    int rc = buffer.incRef();
+    if (rc > 0) {
+      metrics.incrCacheNumLockedBuffers();
+    }
+    if (doNotifyPolicy && rc == 1) {
+      // We have just locked a buffer that wasn't previously locked.
+ policy.notifyLock(buffer); + } + return rc > 0; + } + + private void unlockBuffer(LlapBufferOrBuffers buffers, boolean isCached) { + LlapAllocatorBuffer singleBuffer = buffers.getSingleLlapBuffer(); + if (singleBuffer != null) { + unlockSingleBuffer(singleBuffer, isCached); + return; + } + for (LlapAllocatorBuffer buffer : buffers.getMultipleLlapBuffers()) { + unlockSingleBuffer(buffer, isCached); + } + } + + private void unlockSingleBuffer(LlapAllocatorBuffer buffer, boolean isCached) { + boolean isLastDecref = (buffer.decRef() == 0); + if (isLastDecref) { + if (isCached) { + policy.notifyUnlock(buffer); + } else { + allocator.deallocate(buffer); + } + } + metrics.decrCacheNumLockedBuffers(); + } + + private final static class StripeKey { + private final Object fileKey; + private final int stripeIx; + + public StripeKey(Object fileKey, int stripeIx) { + this.fileKey = fileKey; + this.stripeIx = stripeIx; + } + + @Override + public int hashCode() { + final int prime = 31; + return (prime + ((fileKey == null) ? 0 : fileKey.hashCode())) * prime + stripeIx; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof StripeKey)) return false; + StripeKey other = (StripeKey)obj; + return ((fileKey == null) == (other.fileKey == null)) + && (fileKey == null || fileKey.equals(other.fileKey)) && (stripeIx == other.stripeIx); + } + } + + public static interface LlapBufferOrBuffers extends MemoryBufferOrBuffers { + LlapAllocatorBuffer getSingleLlapBuffer(); + LlapAllocatorBuffer[] getMultipleLlapBuffers(); + } + + + public final static class LlapMetadataBuffer<T> + extends LlapAllocatorBuffer implements LlapBufferOrBuffers { + private final T key; + + public LlapMetadataBuffer(T key) { + this.key = key; + } + + @Override + public void notifyEvicted(EvictionDispatcher evictionDispatcher) { + evictionDispatcher.notifyEvicted(this); + } + + public T getKey() { + return key; + } + + @Override + public LlapAllocatorBuffer getSingleBuffer() { + return this; + } + + @Override + public LlapAllocatorBuffer[] getMultipleBuffers() { + return null; + } + + + @Override + public LlapAllocatorBuffer getSingleLlapBuffer() { + return this; + } + + @Override + public LlapAllocatorBuffer[] getMultipleLlapBuffers() { + return null; + } + } + + public final static class LlapMetadataBuffers<T> implements LlapBufferOrBuffers { + private final LlapMetadataBuffer<T>[] buffers; + + public LlapMetadataBuffers(LlapMetadataBuffer<T>[] buffers) { + this.buffers = buffers; + } + + @Override + public LlapAllocatorBuffer getSingleBuffer() { + return null; + } + + @Override + public LlapAllocatorBuffer[] getMultipleBuffers() { + return buffers; + } + + @Override + public LlapAllocatorBuffer getSingleLlapBuffer() { + return null; + } + + @Override + public LlapAllocatorBuffer[] getMultipleLlapBuffers() { + return buffers; + } + } +} + http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java index dc053ee..6cf9563 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java @@ -23,7 +23,6 @@ import 
java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
-import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
@@ -61,8 +60,8 @@ public class OrcFileEstimateErrors extends LlapCacheableBuffer
}
}
- public DiskRangeList getIncompleteCbs(DiskRangeList ranges, long baseOffset,
- DiskRangeListFactory factory, BooleanRef gotAllData) {
+ public DiskRangeList getIncompleteCbs(
+ DiskRangeList ranges, long baseOffset, BooleanRef gotAllData) {
DiskRangeList prev = ranges.prev;
if (prev == null) {
prev = new MutateHelper(ranges);

http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
index b9d7a77..8af161f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
@@ -18,31 +18,22 @@
package org.apache.hadoop.hive.llap.io.metadata;
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
-import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
-import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
-import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
-import org.apache.hadoop.hive.ql.io.SyntheticFileId;
+
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.orc.CompressionKind;
import org.apache.orc.FileMetadata;
import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.StripeStatistics;
import org.apache.orc.OrcUtils;
import org.apache.orc.StripeInformation;
import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.ReaderImpl;
/** ORC file metadata. Currently contains some duplicate info due to how different parts
* of ORC use different info. Ideally we would either get rid of protobuf structs in code
* beyond reading, or use protobuf structs everywhere, rather than the current mix.
*/ -public final class OrcFileMetadata extends LlapCacheableBuffer - implements FileMetadata, ConsumerFileMetadata { +public final class OrcFileMetadata implements FileMetadata, ConsumerFileMetadata { private final List<StripeInformation> stripes; private final List<Integer> versionList; private final List<OrcProto.StripeStatistics> stripeStats; @@ -58,91 +49,22 @@ public final class OrcFileMetadata extends LlapCacheableBuffer private final long numberOfRows; private final boolean isOriginalFormat; - private final int estimatedMemUsage; - - private final static HashMap<Class<?>, ObjectEstimator> SIZE_ESTIMATORS; - private final static ObjectEstimator SIZE_ESTIMATOR; - static { - OrcFileMetadata ofm = createDummy(new SyntheticFileId()); - SIZE_ESTIMATORS = IncrementalObjectSizeEstimator.createEstimators(ofm); - IncrementalObjectSizeEstimator.addEstimator( - "com.google.protobuf.LiteralByteString", SIZE_ESTIMATORS); - // Add long for the regular file ID estimation. - IncrementalObjectSizeEstimator.createEstimators(Long.class, SIZE_ESTIMATORS); - SIZE_ESTIMATOR = SIZE_ESTIMATORS.get(OrcFileMetadata.class); - } - - @VisibleForTesting - public static OrcFileMetadata createDummy(Object fileKey) { - OrcFileMetadata ofm = new OrcFileMetadata(fileKey); - ofm.stripes.add(new ReaderImpl.StripeInformationImpl( - OrcProto.StripeInformation.getDefaultInstance())); - ofm.fileStats.add(OrcProto.ColumnStatistics.getDefaultInstance()); - ofm.stripeStats.add(OrcProto.StripeStatistics.newBuilder().addColStats(createStatsDummy()).build()); - ofm.types.add(OrcProto.Type.newBuilder().addFieldNames("a").addSubtypes(0).build()); - ofm.versionList.add(0); - return ofm; - } - - static OrcProto.ColumnStatistics.Builder createStatsDummy() { - return OrcProto.ColumnStatistics.newBuilder().setBucketStatistics( - OrcProto.BucketStatistics.newBuilder().addCount(0)).setStringStatistics( - OrcProto.StringStatistics.newBuilder().setMaximum("zzz")); - } - - // Ctor for memory estimation and tests - private OrcFileMetadata(Object fileKey) { - this.fileKey = fileKey; - stripes = new ArrayList<StripeInformation>(); - versionList = new ArrayList<Integer>(); - fileStats = new ArrayList<>(); - stripeStats = new ArrayList<>(); - types = new ArrayList<>(); - writerVersionNum = metadataSize = compressionBufferSize = rowIndexStride = 0; - contentLength = numberOfRows = 0; - estimatedMemUsage = 0; - isOriginalFormat = false; - compressionKind = CompressionKind.NONE; - } - - public OrcFileMetadata(Object fileKey, Reader reader) { + public OrcFileMetadata(Object fileKey, OrcProto.Footer footer, OrcProto.PostScript ps, + List<StripeStatistics> stats, List<StripeInformation> stripes) { + this.stripeStats = stats; + this.compressionKind = CompressionKind.valueOf(ps.getCompression().name()); + this.compressionBufferSize = (int)ps.getCompressionBlockSize(); + this.stripes = stripes; + this.isOriginalFormat = OrcInputFormat.isOriginal(footer); + this.writerVersionNum = ps.getWriterVersion(); + this.versionList = ps.getVersionList(); + this.metadataSize = (int) ps.getMetadataLength(); + this.types = footer.getTypesList(); + this.rowIndexStride = footer.getRowIndexStride(); + this.contentLength = footer.getContentLength(); + this.numberOfRows = footer.getNumberOfRows(); + this.fileStats = footer.getStatisticsList(); this.fileKey = fileKey; - this.stripeStats = reader.getOrcProtoStripeStatistics(); - this.compressionKind = reader.getCompressionKind(); - this.compressionBufferSize = reader.getCompressionSize(); - this.stripes = reader.getStripes(); 
- this.isOriginalFormat = OrcInputFormat.isOriginal(reader); - this.writerVersionNum = reader.getWriterVersion().getId(); - this.versionList = reader.getVersionList(); - this.metadataSize = reader.getMetadataSize(); - this.types = reader.getTypes(); - this.rowIndexStride = reader.getRowIndexStride(); - this.contentLength = reader.getContentLength(); - this.numberOfRows = reader.getNumberOfRows(); - this.fileStats = reader.getOrcProtoFileStatistics(); - - this.estimatedMemUsage = SIZE_ESTIMATOR.estimate(this, SIZE_ESTIMATORS); - } - - // LlapCacheableBuffer - @Override - public void notifyEvicted(EvictionDispatcher evictionDispatcher) { - evictionDispatcher.notifyEvicted(this); - } - - @Override - protected int invalidate() { - return INVALIDATE_OK; // relies on GC, so it can always be evicted now. - } - - @Override - public long getMemoryUsage() { - return estimatedMemUsage; - } - - @Override - protected boolean isLocked() { - return false; } // FileMetadata http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java deleted file mode 100644 index 601b622..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.llap.io.metadata; - -import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer; - -import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.hadoop.hive.common.io.DiskRange; -import org.apache.hadoop.hive.common.io.DiskRangeList; -import org.apache.hadoop.hive.common.io.DataCache.BooleanRef; -import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory; -import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump; -import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy; -import org.apache.hadoop.hive.llap.cache.MemoryManager; -import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; -import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey; - -public class OrcMetadataCache implements LlapOomDebugDump { - private final ConcurrentHashMap<Object, OrcFileMetadata> metadata = new ConcurrentHashMap<>(); - private final ConcurrentHashMap<OrcBatchKey, OrcStripeMetadata> stripeMetadata = - new ConcurrentHashMap<>(); - private final ConcurrentHashMap<Object, OrcFileEstimateErrors> estimateErrors; - private final MemoryManager memoryManager; - private final LowLevelCachePolicy policy; - - public OrcMetadataCache(MemoryManager memoryManager, LowLevelCachePolicy policy, - boolean useEstimateCache) { - this.memoryManager = memoryManager; - this.policy = policy; - this.estimateErrors = useEstimateCache - ? new ConcurrentHashMap<Object, OrcFileEstimateErrors>() : null; - } - - public OrcFileMetadata putFileMetadata(OrcFileMetadata metaData) { - long memUsage = metaData.getMemoryUsage(); - memoryManager.reserveMemory(memUsage); - OrcFileMetadata val = metadata.putIfAbsent(metaData.getFileKey(), metaData); - // See OrcFileMetadata; it is always unlocked, so we just "touch" it here to simulate use. - return touchOnPut(metaData, val, memUsage); - } - - public OrcStripeMetadata putStripeMetadata(OrcStripeMetadata metaData) { - long memUsage = metaData.getMemoryUsage(); - memoryManager.reserveMemory(memUsage); - OrcStripeMetadata val = stripeMetadata.putIfAbsent(metaData.getKey(), metaData); - // See OrcStripeMetadata; it is always unlocked, so we just "touch" it here to simulate use. - return touchOnPut(metaData, val, memUsage); - } - - private <T extends LlapCacheableBuffer> T touchOnPut(T newVal, T oldVal, long memUsage) { - if (oldVal == null) { - oldVal = newVal; - policy.cache(oldVal, Priority.HIGH); - } else { - memoryManager.releaseMemory(memUsage); - policy.notifyLock(oldVal); - } - policy.notifyUnlock(oldVal); - return oldVal; - } - - - public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset) { - if (estimateErrors == null) return; - OrcFileEstimateErrors errorData = estimateErrors.get(fileKey); - boolean isNew = false; - // We should technically update memory usage if updating the old object, but we don't do it - // for now; there is no mechanism to properly notify the cache policy/etc. wrt parallel evicts. 
- if (errorData == null) { - errorData = new OrcFileEstimateErrors(fileKey); - for (DiskRange range : ranges) { - errorData.addError(range.getOffset(), range.getLength(), baseOffset); - } - long memUsage = errorData.estimateMemoryUsage(); - memoryManager.reserveMemory(memUsage); - OrcFileEstimateErrors old = estimateErrors.putIfAbsent(fileKey, errorData); - if (old != null) { - errorData = old; - memoryManager.releaseMemory(memUsage); - policy.notifyLock(errorData); - } else { - isNew = true; - policy.cache(errorData, Priority.NORMAL); - } - } - if (!isNew) { - for (DiskRange range : ranges) { - errorData.addError(range.getOffset(), range.getLength(), baseOffset); - } - } - policy.notifyUnlock(errorData); - } - - public OrcStripeMetadata getStripeMetadata(OrcBatchKey stripeKey) throws IOException { - return touchOnGet(stripeMetadata.get(stripeKey)); - } - - public OrcFileMetadata getFileMetadata(Object fileKey) throws IOException { - return touchOnGet(metadata.get(fileKey)); - } - - private <T extends LlapCacheableBuffer> T touchOnGet(T result) { - if (result != null) { - policy.notifyLock(result); - policy.notifyUnlock(result); // Never locked for eviction; Java object. - } - return result; - } - - public DiskRangeList getIncompleteCbs(Object fileKey, DiskRangeList ranges, long baseOffset, - DiskRangeListFactory factory, BooleanRef gotAllData) { - if (estimateErrors == null) return ranges; - OrcFileEstimateErrors errors = estimateErrors.get(fileKey); - if (errors == null) return ranges; - return errors.getIncompleteCbs(ranges, baseOffset, factory, gotAllData); - } - - public void notifyEvicted(OrcFileMetadata buffer) { - metadata.remove(buffer.getFileKey()); - // See OrcFileMetadata - we don't clear the object, it will be GCed when released by users. - } - - public void notifyEvicted(OrcStripeMetadata buffer) { - stripeMetadata.remove(buffer.getKey()); - // See OrcStripeMetadata - we don't clear the object, it will be GCed when released by users. 
- } - - public void notifyEvicted(OrcFileEstimateErrors buffer) { - estimateErrors.remove(buffer.getFileKey()); - } - - @Override - public String debugDumpForOom() { - StringBuilder sb = new StringBuilder(); - debugDumpShort(sb); - return sb.toString(); - } - - @Override - public void debugDumpShort(StringBuilder sb) { - sb.append("\nORC metadata cache state: ").append(metadata.size()).append(" files, ") - .append(stripeMetadata.size()).append(" stripes, ").append(estimateErrors.size()) - .append(" files w/ORC estimate"); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/50fb6f3c/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java index 4565d11..92b9c8f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java @@ -19,115 +19,30 @@ package org.apache.hadoop.hive.llap.io.metadata; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator; -import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator; -import org.apache.hadoop.hive.llap.cache.EvictionDispatcher; -import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer; -import org.apache.hadoop.hive.ql.io.SyntheticFileId; -import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey; -import org.apache.orc.DataReader; import org.apache.orc.OrcProto; import org.apache.orc.OrcProto.RowIndexEntry; import org.apache.orc.StripeInformation; -import org.apache.orc.TypeDescription; import org.apache.orc.impl.OrcIndex; -public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerStripeMetadata { - private final TypeDescription schema; +public class OrcStripeMetadata implements ConsumerStripeMetadata { private final OrcBatchKey stripeKey; private final List<OrcProto.ColumnEncoding> encodings; private final List<OrcProto.Stream> streams; private final String writerTimezone; private final long rowCount; private OrcIndex rowIndex; - private OrcFile.WriterVersion writerVersion; - - private final int estimatedMemUsage; - - private final static HashMap<Class<?>, ObjectEstimator> SIZE_ESTIMATORS; - private final static ObjectEstimator SIZE_ESTIMATOR; - static { - OrcStripeMetadata osm = createDummy(new SyntheticFileId()); - SIZE_ESTIMATORS = IncrementalObjectSizeEstimator.createEstimators(osm); - IncrementalObjectSizeEstimator.addEstimator( - "com.google.protobuf.LiteralByteString", SIZE_ESTIMATORS); - // Add long for the regular file ID estimation. 
- IncrementalObjectSizeEstimator.createEstimators(Long.class, SIZE_ESTIMATORS); - SIZE_ESTIMATOR = SIZE_ESTIMATORS.get(OrcStripeMetadata.class); - } - public OrcStripeMetadata(OrcBatchKey stripeKey, DataReader mr, StripeInformation stripe, - boolean[] includes, boolean[] sargColumns, TypeDescription schema, - OrcFile.WriterVersion writerVersion) throws IOException { - this.schema = schema; + public OrcStripeMetadata(OrcBatchKey stripeKey, OrcProto.StripeFooter footer, + OrcIndex orcIndex, StripeInformation stripe) throws IOException { this.stripeKey = stripeKey; - OrcProto.StripeFooter footer = mr.readStripeFooter(stripe); streams = footer.getStreamsList(); encodings = footer.getColumnsList(); writerTimezone = footer.getWriterTimezone(); rowCount = stripe.getNumberOfRows(); - rowIndex = mr.readRowIndex(stripe, schema, footer, true, includes, null, - sargColumns, writerVersion, null, null); - - estimatedMemUsage = SIZE_ESTIMATOR.estimate(this, SIZE_ESTIMATORS); - this.writerVersion = writerVersion; - } - - private OrcStripeMetadata(Object id) { - stripeKey = new OrcBatchKey(id, 0, 0); - encodings = new ArrayList<>(); - streams = new ArrayList<>(); - writerTimezone = ""; - schema = TypeDescription.fromString("struct<x:int>"); - rowCount = estimatedMemUsage = 0; - } - - @VisibleForTesting - public static OrcStripeMetadata createDummy(Object id) { - OrcStripeMetadata dummy = new OrcStripeMetadata(id); - dummy.encodings.add(OrcProto.ColumnEncoding.getDefaultInstance()); - dummy.streams.add(OrcProto.Stream.getDefaultInstance()); - OrcProto.RowIndex ri = OrcProto.RowIndex.newBuilder().addEntry( - OrcProto.RowIndexEntry.newBuilder().addPositions(1).setStatistics( - OrcFileMetadata.createStatsDummy())).build(); - OrcProto.BloomFilterIndex bfi = OrcProto.BloomFilterIndex.newBuilder().addBloomFilter( - OrcProto.BloomFilter.newBuilder().addBitset(0)).build(); - dummy.rowIndex = new OrcIndex( - new OrcProto.RowIndex[] { ri }, - new OrcProto.Stream.Kind[] { OrcProto.Stream.Kind.BLOOM_FILTER_UTF8 }, - new OrcProto.BloomFilterIndex[] { bfi }); - return dummy; - } - - public boolean hasAllIndexes(boolean[] includes) { - for (int i = 0; i < includes.length; ++i) { - if (includes[i] && rowIndex.getRowGroupIndex()[i] == null) return false; - } - return true; - } - - public void loadMissingIndexes(DataReader mr, StripeInformation stripe, boolean[] includes, - boolean[] sargColumns) throws IOException { - // Do not lose the old indexes. Create a super set includes - OrcProto.RowIndex[] existing = getRowIndexes(); - boolean superset[] = new boolean[Math.max(existing.length, includes.length)]; - for (int i = 0; i < includes.length; i++) { - superset[i] = includes[i]; - } - for (int i = 0; i < existing.length; i++) { - superset[i] = superset[i] || (existing[i] != null); - } - // TODO: should we save footer to avoid a read here? 
- rowIndex = mr.readRowIndex(stripe, schema, null, true, superset, - rowIndex.getRowGroupIndex(), - sargColumns, writerVersion, rowIndex.getBloomFilterKinds(), - rowIndex.getBloomFilterIndex()); - // TODO: theoretically, we should re-estimate memory usage here and update memory manager + rowIndex = orcIndex; } public int getStripeIx() { @@ -157,25 +72,6 @@ public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerSt public String getWriterTimezone() { return writerTimezone; } - @Override - public long getMemoryUsage() { - return estimatedMemUsage; - } - - @Override - public void notifyEvicted(EvictionDispatcher evictionDispatcher) { - evictionDispatcher.notifyEvicted(this); - } - - @Override - protected int invalidate() { - return INVALIDATE_OK; - } - - @Override - protected boolean isLocked() { - return false; - } public OrcBatchKey getKey() { return stripeKey; @@ -199,4 +95,15 @@ public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerSt public boolean supportsRowIndexes() { return true; } + + public OrcIndex getIndex() { + return rowIndex; + } + + @Override + public String toString() { + return "OrcStripeMetadata [stripeKey=" + stripeKey + ", rowCount=" + + rowCount + ", writerTimezone=" + writerTimezone + ", encodings=" + + encodings + ", streams=" + streams + ", rowIndex=" + rowIndex + "]"; + } }
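For reference, the tail-buffer chunking in MetadataCache above (determineAllocCount() plus the copy loop feeding putBufferToDest()) amounts to a ceiling division followed by position/limit slicing of the source buffer. Below is a minimal, self-contained sketch of that slicing logic, using plain java.nio heap buffers in place of LLAP allocator buffers; the class name and constants are illustrative only, not part of the patch.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class TailBufferChunkingSketch {
  // Ceiling division, as in determineAllocCount(): how many fixed-size
  // allocations are needed to hold the remaining bytes.
  static int determineAllocCount(ByteBuffer tailBuffer, int maxAlloc) {
    int total = tailBuffer.remaining();
    return total / maxAlloc + ((total % maxAlloc) > 0 ? 1 : 0);
  }

  // Walks the source with position/limit, copying at most maxAlloc bytes into
  // each destination chunk. The source is duplicated so the caller's
  // position/limit are left untouched, mirroring the copy loop in the patch.
  static List<ByteBuffer> chunk(ByteBuffer tailBuffer, int maxAlloc) {
    ByteBuffer src = tailBuffer.duplicate();
    int pos = src.position(), remaining = src.remaining();
    List<ByteBuffer> chunks = new ArrayList<>(determineAllocCount(tailBuffer, maxAlloc));
    while (remaining > 0) {
      int toPut = Math.min(remaining, maxAlloc);
      src.position(pos);
      src.limit(pos + toPut);
      ByteBuffer dest = ByteBuffer.allocate(toPut); // stands in for an allocator buffer
      dest.put(src);
      dest.flip(); // expose [0, toPut), like the position/limit reset in putBufferToDest()
      chunks.add(dest);
      pos += toPut;
      remaining -= toPut;
    }
    return chunks;
  }

  public static void main(String[] args) {
    ByteBuffer tail = ByteBuffer.wrap(new byte[10]); // e.g. a 10-byte file tail
    System.out.println(determineAllocCount(tail, 4)); // 3 = ceil(10 / 4)
    System.out.println(chunk(tail, 4).size());        // 3 chunks of 4, 4 and 2 bytes
  }
}

Capping each chunk at the allocator's max allocation is what allows serialized file tails to live in the same buddy-allocator arenas as cached data, so metadata evictions go through the same policy and memory accounting as the rest of the cache.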