HIVE-10777 : LLAP: add per-fragment and per-table cache details (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2024c962 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2024c962 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2024c962 Branch: refs/heads/llap Commit: 2024c962d28d9bf777e8e32364d2685f8ff2cba3 Parents: 2611ddb Author: Sergey Shelukhin <ser...@apache.org> Authored: Tue May 26 13:27:39 2015 -0700 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Tue May 26 13:27:39 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 1 + .../llap/counters/LowLevelCacheCounters.java | 26 ++++ .../hive/llap/io/api/cache/LowLevelCache.java | 21 ++- .../hive/llap/cache/LowLevelCacheImpl.java | 65 ++++++++- .../llap/counters/QueryFragmentCounters.java | 108 ++++++++++++--- .../hive/llap/io/api/impl/LlapInputFormat.java | 22 ++- .../llap/io/decode/OrcEncodedDataConsumer.java | 2 + .../llap/io/encoded/OrcEncodedDataReader.java | 133 ++++++++++++++++--- .../org/apache/hadoop/hive/llap/DebugUtils.java | 2 +- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 3 +- .../hive/ql/io/orc/EncodedReaderImpl.java | 84 +++++++----- .../apache/hadoop/hive/ql/io/orc/InStream.java | 12 +- .../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 2 +- .../apache/hadoop/hive/ql/io/orc/Reader.java | 4 +- .../hadoop/hive/ql/io/orc/ReaderImpl.java | 6 +- 15 files changed, 396 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f4a70b2..818aee3 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2166,6 +2166,7 @@ public class HiveConf extends Configuration { LLAP_ORC_CACHE_ALLOCATE_DIRECT("hive.llap.io.cache.direct", true, ""), LLAP_USE_LRFU("hive.llap.io.use.lrfu", false, ""), LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.01f, ""), + LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true, ""), LLAP_AUTO_ALLOW_UBER("hive.llap.auto.allow.uber", true, "Whether or not to allow the planner to run vertices in the AM"), LLAP_AUTO_ENFORCE_TREE("hive.llap.auto.enforce.tree", true, http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java b/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java new file mode 100644 index 0000000..d862a83 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap.counters; + +public interface LowLevelCacheCounters { + void recordCacheHit(long bytesHit); + void recordCacheMiss(long bytesMissed); + void recordAllocBytes(long bytesWasted, long bytesAllocated); + void recordHdfsTime(long timeUs); + long startTimeCounter(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java index 2c35c50..fcc7eed 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java @@ -22,7 +22,9 @@ import java.util.List; import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.common.DiskRangeList; +import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper; import org.apache.hadoop.hive.llap.cache.Allocator; +import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters; public interface LowLevelCache { public enum Priority { @@ -30,6 +32,15 @@ public interface LowLevelCache { HIGH } + public class CacheListHelper extends DiskRangeListMutateHelper { + public CacheListHelper(DiskRangeList head) { + super(head); + } + + /** Workaround for Java's limitations, used to return stuff from getFileData. */ + public boolean didGetAllData; + } + /** * Gets file data for particular offsets. The range list is modified in place; it is then * returned (since the list head could have changed). Ranges are replaced with cached ranges. @@ -49,6 +60,9 @@ public interface LowLevelCache { * Some sort of InvalidCacheChunk could be placed to avoid them. 
TODO * @param base base offset for the ranges (stripe/stream offset in case of ORC). */ + DiskRangeList getFileData(long fileId, DiskRangeList range, long baseOffset, + CacheChunkFactory factory, LowLevelCacheCounters qfCounters); + DiskRangeList getFileData( long fileId, DiskRangeList range, long baseOffset, CacheChunkFactory factory); @@ -57,8 +71,11 @@ public interface LowLevelCache { * @return null if all data was put; bitmask indicating which chunks were not put otherwise; * the replacement chunks from cache are updated directly in the array. */ - long[] putFileData( - long fileId, DiskRange[] ranges, LlapMemoryBuffer[] chunks, long base, Priority priority); + long[] putFileData(long fileId, DiskRange[] ranges, LlapMemoryBuffer[] chunks, + long base, Priority priority, LowLevelCacheCounters qfCounters); + + long[] putFileData(long fileId, DiskRange[] ranges, LlapMemoryBuffer[] chunks, + long base, Priority priority); Allocator getAllocator(); http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java index 249ed56..8c65844 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.common.DiskRangeList; import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper; import org.apache.hadoop.hive.llap.DebugUtils; +import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters; import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; import 
org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; @@ -75,33 +76,66 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump { @Override public DiskRangeList getFileData( long fileId, DiskRangeList ranges, long baseOffset, CacheChunkFactory factory) { + return getFileData(fileId, ranges, baseOffset, factory, null); + } + + @Override + public DiskRangeList getFileData(long fileId, DiskRangeList ranges, long baseOffset, + CacheChunkFactory factory, LowLevelCacheCounters qfCounters) { if (ranges == null) return null; + DiskRangeList prev = ranges.prev; FileCache subCache = cache.get(fileId); if (subCache == null || !subCache.incRef()) { - metrics.incrCacheRequestedBytes(ranges.getTotalLength()); + long totalMissed = ranges.getTotalLength(); + metrics.incrCacheRequestedBytes(totalMissed); + if (qfCounters != null) { + qfCounters.recordCacheMiss(totalMissed); + } + if (prev != null && prev instanceof CacheListHelper) { + ((CacheListHelper)prev).didGetAllData = false; + } return ranges; } + CacheListHelper resultObj = null; try { - DiskRangeList prev = ranges.prev; if (prev == null) { prev = new DiskRangeListMutateHelper(ranges); + } else if (prev instanceof CacheListHelper) { + resultObj = (CacheListHelper)prev; + resultObj.didGetAllData = true; } DiskRangeList current = ranges; while (current != null) { metrics.incrCacheRequestedBytes(current.getLength()); // We assume ranges in "ranges" are non-overlapping; thus, we will save next in advance. DiskRangeList next = current.next; - getOverlappingRanges(baseOffset, current, subCache.cache, factory); + getOverlappingRanges(baseOffset, current, subCache.cache, factory, resultObj); current = next; } - return prev.next; } finally { subCache.decRef(); } + if (qfCounters != null) { + DiskRangeList current = prev.next; + long bytesHit = 0, bytesMissed = 0; + while (current != null) { + // This assumes no ranges passed to cache to fetch have data beforehand. 
+ if (current.hasData()) { + bytesHit += current.getLength(); + } else { + bytesMissed += current.getLength(); + } + current = current.next; + } + qfCounters.recordCacheHit(bytesHit); + qfCounters.recordCacheMiss(bytesMissed); + } + return prev.next; } private void getOverlappingRanges(long baseOffset, DiskRangeList currentNotCached, - ConcurrentSkipListMap<Long, LlapDataBuffer> cache, CacheChunkFactory factory) { + ConcurrentSkipListMap<Long, LlapDataBuffer> cache, CacheChunkFactory factory, + CacheListHelper resultObj) { long absOffset = currentNotCached.getOffset() + baseOffset; if (!doAssumeGranularBlocks) { // This currently only happens in tests. See getFileData comment on the interface. @@ -127,6 +161,7 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump { if (!lockBuffer(buffer, true)) { // If we cannot lock, remove this from cache and continue. matches.remove(); + resultObj.didGetAllData = false; continue; } long cacheOffset = e.getKey(); @@ -137,19 +172,24 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump { cacheEnd = cacheOffset + buffer.declaredCachedLength; DiskRangeList currentCached = factory.createCacheChunk(buffer, cacheOffset - baseOffset, cacheEnd - baseOffset); - currentNotCached = addCachedBufferToIter(currentNotCached, currentCached); + currentNotCached = addCachedBufferToIter(currentNotCached, currentCached, resultObj); metrics.incrCacheHitBytes(Math.min(requestedLength, currentCached.getLength())); } + if (currentNotCached != null) { + assert !currentNotCached.hasData(); + resultObj.didGetAllData = false; + } } /** * Adds cached buffer to buffer list. * @param currentNotCached Pointer to the list node where we are inserting. * @param currentCached The cached buffer found for this node, to insert. + * @param resultObj * @return The new currentNotCached pointer, following the cached buffer insertion. 
*/ private DiskRangeList addCachedBufferToIter( - DiskRangeList currentNotCached, DiskRangeList currentCached) { + DiskRangeList currentNotCached, DiskRangeList currentCached, CacheListHelper resultObj) { if (currentNotCached.getOffset() >= currentCached.getOffset()) { if (currentNotCached.getEnd() <= currentCached.getEnd()) { // we assume it's always "==" now // Replace the entire current DiskRange with new cached range. @@ -161,6 +201,8 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump { return currentNotCached; } } else { + // There's some part of current buffer that is not cached. + resultObj.didGetAllData = false; assert currentNotCached.getOffset() < currentCached.getOffset() || currentNotCached.prev == null || currentNotCached.prev.getEnd() <= currentCached.getOffset(); @@ -192,6 +234,12 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump { @Override public long[] putFileData(long fileId, DiskRange[] ranges, LlapMemoryBuffer[] buffers, long baseOffset, Priority priority) { + return putFileData(fileId, ranges, buffers, baseOffset, priority, null); + } + + @Override + public long[] putFileData(long fileId, DiskRange[] ranges, LlapMemoryBuffer[] buffers, + long baseOffset, Priority priority, LowLevelCacheCounters qfCounters) { long[] result = null; assert buffers.length == ranges.length; FileCache subCache = getOrAddFileSubCache(fileId); @@ -211,6 +259,9 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump { if (oldVal == null) { // Cached successfully, add to policy. 
cachePolicy.cache(buffer, priority); + if (qfCounters != null) { + qfCounters.recordAllocBytes(buffer.byteBuffer.remaining(), buffer.allocSize); + } break; } if (DebugUtils.isTraceCachingEnabled()) { http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java index bc7c2a8..7658b03 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java @@ -17,32 +17,53 @@ */ package org.apache.hadoop.hive.llap.counters; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLongArray; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; /** * Per query counters. 
*/ -public class QueryFragmentCounters { +public class QueryFragmentCounters implements LowLevelCacheCounters { + private final boolean doUseTimeCounters; public static enum Counter { NUM_VECTOR_BATCHES, NUM_DECODED_BATCHES, SELECTED_ROWGROUPS, NUM_ERRORS, - ROWS_EMITTED + ROWS_EMITTED, + METADATA_CACHE_HIT, + METADATA_CACHE_MISS, + CACHE_HIT_BYTES, + CACHE_MISS_BYTES, + ALLOCATED_BYTES, + ALLOCATED_USED_BYTES, + TOTAL_IO_TIME_US, + DECODE_TIME_US, + HDFS_TIME_US, + CONSUMER_TIME_US } - private String appId; - private Map<String, Long> counterMap; - - public QueryFragmentCounters() { - this("Not Specified"); + public static enum Desc { + TABLE } - public QueryFragmentCounters(String applicationId) { - this.appId = applicationId; - this.counterMap = new ConcurrentHashMap<>(); + private final AtomicLongArray fixedCounters; + private final String[] descs; + + public QueryFragmentCounters(Configuration conf) { + fixedCounters = new AtomicLongArray(Counter.values().length); + descs = new String[Desc.values().length]; + doUseTimeCounters = HiveConf.getBoolVar(conf, ConfVars.LLAP_ORC_ENABLE_TIME_COUNTERS); + if (!doUseTimeCounters) { + setCounter(Counter.TOTAL_IO_TIME_US, -1); + setCounter(Counter.DECODE_TIME_US, -1); + setCounter(Counter.HDFS_TIME_US, -1); + setCounter(Counter.HDFS_TIME_US, -1); + } } public void incrCounter(Counter counter) { @@ -50,20 +71,67 @@ public class QueryFragmentCounters { } public void incrCounter(Counter counter, long delta) { - if (counterMap.containsKey(counter.name())) { - long val = counterMap.get(counter.name()); - counterMap.put(counter.name(), val + delta); - } else { - setCounter(counter, delta); - } + fixedCounters.addAndGet(counter.ordinal(), delta); + } + + @Override + public final long startTimeCounter() { + return (doUseTimeCounters ? 
System.nanoTime() : 0); + } + + public void incrTimeCounter(Counter counter, long startTime) { + if (!doUseTimeCounters) return; + fixedCounters.addAndGet(counter.ordinal(), System.nanoTime() - startTime); } public void setCounter(Counter counter, long value) { - counterMap.put(counter.name(), value); + fixedCounters.set(counter.ordinal(), value); + } + + public void setDesc(Desc key, String desc) { + descs[key.ordinal()] = desc; + } + + @Override + public void recordCacheHit(long bytesHit) { + incrCounter(Counter.CACHE_HIT_BYTES, bytesHit); + } + + @Override + public void recordCacheMiss(long bytesMissed) { + incrCounter(Counter.CACHE_MISS_BYTES, bytesMissed); + } + + @Override + public void recordAllocBytes(long bytesUsed, long bytesAllocated) { + incrCounter(Counter.ALLOCATED_USED_BYTES, bytesUsed); + incrCounter(Counter.ALLOCATED_BYTES, bytesAllocated); + } + + @Override + public void recordHdfsTime(long startTime) { + incrTimeCounter(Counter.HDFS_TIME_US, startTime); } @Override public String toString() { - return "ApplicationId: " + appId + " Counters: " + counterMap; + // We rely on NDC information in the logs to map counters to attempt. + // If that is not available, appId should either be passed in, or extracted from NDC. 
+ StringBuilder sb = new StringBuilder("Fragment counters for ["); + for (int i = 0; i < descs.length; ++i) { + if (i != 0) { + sb.append(", "); + } + sb.append(descs[i]); + } + sb.append("]: [ "); + for (int i = 0; i < fixedCounters.length(); ++i) { + if (i != 0) { + sb.append(", "); + } + sb.append(Counter.values()[i].name()).append("=").append(fixedCounters.get(i)); + } + sb.append(" ]"); + return sb.toString(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index 1cf5158..a4f69da 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.llap.Consumer; import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.DebugUtils; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter; import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer; import org.apache.hadoop.hive.llap.io.decode.ReadPipeline; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -111,13 +112,14 @@ public class LlapInputFormat private boolean isDone = false, isClosed = false; private ConsumerFeedback<ColumnVectorBatch> feedback; private final QueryFragmentCounters counters; + private long firstReturnTime; public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols) { this.split = split; this.columnIds = includedCols; this.sarg = SearchArgumentFactory.createFromConf(job); this.columnNames = ColumnProjectionUtils.getReadColumnNames(job); - 
this.counters = new QueryFragmentCounters(); + this.counters = new QueryFragmentCounters(job); try { rbCtx = new VectorizedRowBatchCtx(); rbCtx.init(job, split); @@ -150,7 +152,13 @@ public class LlapInputFormat feedback.stop(); throw new IOException(e); } - if (cvb == null) return false; + if (cvb == null) { + if (isFirst) { + firstReturnTime = counters.startTimeCounter(); + } + counters.incrTimeCounter(Counter.CONSUMER_TIME_US, firstReturnTime); + return false; + } if (columnIds.size() != cvb.cols.length) { throw new RuntimeException("Unexpected number of columns, VRB has " + columnIds.size() + " included, but the reader returned " + cvb.cols.length); @@ -162,6 +170,9 @@ public class LlapInputFormat } value.selectedInUse = false; value.size = cvb.size; + if (isFirst) { + firstReturnTime = counters.startTimeCounter(); + } return true; } @@ -191,7 +202,8 @@ public class LlapInputFormat } ColumnVectorBatch nextCvb() throws InterruptedException, IOException { - if (lastCvb != null) { + boolean isFirst = (lastCvb == null); + if (!isFirst) { feedback.returnData(lastCvb); } synchronized (pendingData) { @@ -244,7 +256,7 @@ public class LlapInputFormat LlapIoImpl.LOG.info("close called; closed " + isClosed + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size()); } - LlapIoImpl.LOG.info("QueryFragmentCounters: " + counters); + LlapIoImpl.LOG.info(counters); // This is where counters are logged! 
feedback.stop(); rethrowErrorIfAny(); } @@ -263,7 +275,6 @@ public class LlapInputFormat LlapIoImpl.LOG.info("setDone called; closed " + isClosed + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size()); } - LlapIoImpl.LOG.info("DONE: QueryFragmentCounters: " + counters); synchronized (pendingData) { isDone = true; pendingData.notifyAll(); @@ -291,7 +302,6 @@ public class LlapInputFormat LlapIoImpl.LOG.info("setError called; closed " + isClosed + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size()); assert t != null; - LlapIoImpl.LOG.info("ERROR: QueryFragmentCounters: " + counters); synchronized (pendingData) { pendingError = t; pendingData.notifyAll(); http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index c6ff498..c16be38 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -70,6 +70,7 @@ public class OrcEncodedDataConsumer @Override protected void decodeBatch(OrcEncodedColumnBatch batch, Consumer<ColumnVectorBatch> downstreamConsumer) { + long startTime = counters.startTimeCounter(); int currentStripeIndex = batch.batchKey.stripeIx; boolean sameStripe = currentStripeIndex == previousStripeIndex; @@ -116,6 +117,7 @@ public class OrcEncodedDataConsumer downstreamConsumer.consumeData(cvb); counters.incrCounter(QueryFragmentCounters.Counter.ROWS_EMITTED, batchSize); } + counters.incrTimeCounter(QueryFragmentCounters.Counter.DECODE_TIME_US, startTime); counters.incrCounter(QueryFragmentCounters.Counter.NUM_VECTOR_BATCHES, 
maxBatchesRG); counters.incrCounter(QueryFragmentCounters.Counter.NUM_DECODED_BATCHES); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 13292a6..d76316a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -18,6 +18,7 @@ import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.DebugUtils; import org.apache.hadoop.hive.llap.cache.Cache; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer; import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; @@ -28,6 +29,8 @@ import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; +import org.apache.hadoop.hive.ql.exec.DDLTask; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.orc.CompressionKind; import org.apache.hadoop.hive.ql.io.orc.EncodedReader; import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl; @@ -126,13 +129,18 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> @Override protected Void callInternal() throws IOException { + long startTime = counters.startTimeCounter(); if 
(LlapIoImpl.LOGL.isInfoEnabled()) { - LlapIoImpl.LOG.info("Processing split for " + split.getPath()); + LlapIoImpl.LOG.info("Processing data for " + split.getPath()); } - if (processStop()) return null; + if (processStop()) { + recordReaderTime(startTime); + return null; + } + counters.setDesc(QueryFragmentCounters.Desc.TABLE, getDbAndTableName(split.getPath())); orcReader = null; // 1. Get file metadata from cache, or create the reader and read it. - // Disable filesystem caching for now; Tez closes it and FS cache will fix all that + // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that fs = split.getPath().getFileSystem(conf); fileId = determineFileId(fs, split); @@ -147,12 +155,14 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> // 2. Determine which stripes to read based on the split. determineStripesToRead(); } catch (Throwable t) { + recordReaderTime(startTime); consumer.setError(t); return null; } if (readState.length == 0) { consumer.setDone(); + recordReaderTime(startTime); return null; // No data to read. } @@ -184,10 +194,15 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } catch (Throwable t) { cleanupReaders(); consumer.setError(t); + recordReaderTime(startTime); return null; } - if (processStop()) return null; + if (processStop()) { + cleanupReaders(); + recordReaderTime(startTime); + return null; + } // 4. Get data from high-level cache. // If some cols are fully in cache, this will also give us the modified list of columns to @@ -201,6 +216,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> // produceDataFromCache handles its own cleanup. consumer.setError(t); cleanupReaders(); + recordReaderTime(startTime); return null; } } @@ -208,13 +224,14 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> // 5. Create encoded data reader. 
// In case if we have high-level cache, we will intercept the data and add it there; // otherwise just pass the data directly to the consumer. - Consumer<OrcEncodedColumnBatch> dataConsumer = - (cache == null) ? this.consumer : this; + Consumer<OrcEncodedColumnBatch> dataConsumer = (cache == null) ? this.consumer : this; try { ensureOrcReader(); - stripeReader = orcReader.encodedReader(fileId, lowLevelCache, dataConsumer); + // Reader creating updates HDFS counters, don't do it here. + stripeReader = orcReader.encodedReader(fileId, lowLevelCache, counters, dataConsumer); } catch (Throwable t) { consumer.setError(t); + recordReaderTime(startTime); cleanupReaders(); return null; } @@ -223,7 +240,11 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> // TODO: I/O threadpool could be here - one thread per stripe; for now, linear. OrcBatchKey stripeKey = new OrcBatchKey(fileId, -1, 0); for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) { - if (processStop()) return null; + if (processStop()) { + cleanupReaders(); + recordReaderTime(startTime); + return null; + } int stripeIx = stripeIxFrom + stripeIxMod; boolean[][] colRgs = null; boolean[] stripeIncludes = null; @@ -241,7 +262,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> colRgs = readState[stripeIxMod]; // 6.1. Determine the columns to read (usually the same as requested). - if (cols == null || cols.size() == colRgs.length) { + if (cache == null || cols == null || cols.size() == colRgs.length) { cols = columnIds; stripeIncludes = globalIncludes; } else { @@ -252,15 +273,21 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } // 6.2. Ensure we have stripe metadata. We might have read it before for RG filtering. 
+ boolean isFoundInCache = false; if (stripeMetadatas != null) { stripeMetadata = stripeMetadatas.get(stripeIxMod); } else { stripeKey.stripeIx = stripeIx; stripeMetadata = metadataCache.getStripeMetadata(stripeKey); - if (stripeMetadata == null) { + isFoundInCache = (stripeMetadata != null); + if (!isFoundInCache) { + counters.incrCounter(Counter.METADATA_CACHE_MISS); ensureMetadataReader(); - stripeMetadata = metadataCache.putStripeMetadata(new OrcStripeMetadata( - stripeKey, metadataReader, stripe, stripeIncludes, sargColumns)); + long startTimeHdfs = counters.startTimeCounter(); + stripeMetadata = new OrcStripeMetadata( + stripeKey, metadataReader, stripe, stripeIncludes, sargColumns); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTimeHdfs); + stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata); if (DebugUtils.isTraceOrcEnabled()) { LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx + " metadata with includes: " + DebugUtils.toString(stripeIncludes)); @@ -274,15 +301,24 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx + " metadata for includes: " + DebugUtils.toString(stripeIncludes)); } + assert isFoundInCache; + counters.incrCounter(Counter.METADATA_CACHE_MISS); ensureMetadataReader(); updateLoadedIndexes(stripeMetadata, stripe, stripeIncludes, sargColumns); + } else if (isFoundInCache) { + counters.incrCounter(Counter.METADATA_CACHE_HIT); } } catch (Throwable t) { consumer.setError(t); cleanupReaders(); + recordReaderTime(startTime); + return null; + } + if (processStop()) { + cleanupReaders(); + recordReaderTime(startTime); return null; } - if (processStop()) return null; // 6.3. Finally, hand off to the stripe reader to produce the data. // This is a sync call that will feed data to the consumer. 
@@ -296,11 +332,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } catch (Throwable t) { consumer.setError(t); cleanupReaders(); + recordReaderTime(startTime); return null; } } // Done with all the things. + recordReaderTime(startTime); dataConsumer.setDone(); if (DebugUtils.isTraceMttEnabled()) { LlapIoImpl.LOG.info("done processing " + split); @@ -311,6 +349,53 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> return null; } + private void recordReaderTime(long startTime) { + counters.incrTimeCounter(Counter.TOTAL_IO_TIME_US, startTime); + } + + private static String getDbAndTableName(Path path) { + // Ideally, we'd get this from split; however, split doesn't contain any such thing and it's + // actually pretty hard to get cause even split generator only uses paths. We only need this + // for metrics; therefore, brace for BLACK MAGIC! + String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR); + int dbIx = -1; + // Try to find the default db postfix; don't check two last components - at least there + // should be a table and file (we could also try to throw away partition/bucket/acid stuff). + for (int i = 0; i < parts.length - 2; ++i) { + if (!parts[i].endsWith(DDLTask.DATABASE_PATH_SUFFIX)) continue; + if (dbIx >= 0) { + dbIx = -1; // Let's not guess. + break; + } + dbIx = i; + } + if (dbIx >= 0) { + return parts[dbIx].substring(0, parts[dbIx].length() - 3) + "." + parts[dbIx + 1]; + } + + // Just go from the back and throw away everything we think is wrong; skip last item, the file. 
+ boolean isInPartFields = false; + for (int i = parts.length - 2; i >= 0; --i) { + String p = parts[i]; + boolean isPartField = p.contains("="); + if ((isInPartFields && !isPartField) || (!isPartField && !p.startsWith(AcidUtils.BASE_PREFIX) + && !p.startsWith(AcidUtils.DELTA_PREFIX) && !p.startsWith(AcidUtils.BUCKET_PREFIX))) { + dbIx = i - 1; + break; + } + isInPartFields = isPartField; + } + // If we found something before we ran out of components, use it. + if (dbIx >= 0) { + String dbName = parts[dbIx]; + if (dbName.endsWith(DDLTask.DATABASE_PATH_SUFFIX)) { + dbName = dbName.substring(0, dbName.length() - 3); + } + return dbName + "." + parts[dbIx + 1]; + } + return "unknown"; + } + private void validateFileMetadata() throws IOException { if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return; int bufferSize = fileMetadata.getCompressionBufferSize(); @@ -371,7 +456,9 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> // we pre-allocate the array and never remove entries; so readers should be safe. 
synchronized (stripeMetadata) { if (stripeMetadata.hasAllIndexes(stripeIncludes)) return; + long startTime = counters.startTimeCounter(); stripeMetadata.loadMissingIndexes(metadataReader, stripe, stripeIncludes, sargColumns); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); } } @@ -404,8 +491,10 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> if (DebugUtils.isTraceOrcEnabled()) { LOG.info("Creating reader for " + path + " (" + split.getPath() + ")"); } + long startTime = counters.startTimeCounter(); ReaderOptions opts = OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata); orcReader = OrcFile.createReader(path, opts); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); } /** @@ -413,8 +502,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> */ private OrcFileMetadata getOrReadFileMetadata() throws IOException { OrcFileMetadata metadata = metadataCache.getFileMetadata(fileId); - if (metadata != null) return metadata; + if (metadata != null) { + counters.incrCounter(Counter.METADATA_CACHE_HIT); + return metadata; + } + counters.incrCounter(Counter.METADATA_CACHE_MISS); ensureOrcReader(); + // We assume this call doesn't touch HDFS because everything is already read; don't add time. 
metadata = new OrcFileMetadata(fileId, orcReader); return metadataCache.putFileMetadata(metadata); } @@ -430,11 +524,14 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> stripeKey.stripeIx = stripeIxMod + stripeIxFrom; OrcStripeMetadata value = metadataCache.getStripeMetadata(stripeKey); if (value == null || !value.hasAllIndexes(globalInc)) { + counters.incrCounter(Counter.METADATA_CACHE_MISS); ensureMetadataReader(); StripeInformation si = fileMetadata.getStripes().get(stripeKey.stripeIx); if (value == null) { - value = metadataCache.putStripeMetadata( - new OrcStripeMetadata(stripeKey, metadataReader, si, globalInc, sargColumns)); + long startTime = counters.startTimeCounter(); + value = new OrcStripeMetadata(stripeKey, metadataReader, si, globalInc, sargColumns); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); + value = metadataCache.putStripeMetadata(value); if (DebugUtils.isTraceOrcEnabled()) { LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx + " metadata with includes: " + DebugUtils.toString(globalInc)); @@ -448,6 +545,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> } updateLoadedIndexes(value, si, globalInc, sargColumns); } + } else { + counters.incrCounter(Counter.METADATA_CACHE_HIT); } result.add(value); consumer.setStripeMetadata(value); @@ -458,7 +557,9 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> private void ensureMetadataReader() throws IOException { ensureOrcReader(); if (metadataReader != null) return; + long startTime = counters.startTimeCounter(); metadataReader = orcReader.metadata(); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java 
index fc58cf3..8b61fe4 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java @@ -36,7 +36,7 @@ public class DebugUtils { } public static boolean isTraceRangesEnabled() { - return true; // TODO: temporary, should be hardcoded false + return false; } public static boolean isTraceLockingEnabled() { http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 04c89ae..7f92dda 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4526,6 +4526,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { * @param database * Database. */ + public static final String DATABASE_PATH_SUFFIX = ".db"; private void makeLocationQualified(Database database) throws HiveException { if (database.isSetLocationUri()) { database.setLocationUri(Utilities.getQualifiedPath(conf, new Path(database.getLocationUri()))); @@ -4534,7 +4535,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { // Location is not set we utilize METASTOREWAREHOUSE together with database name database.setLocationUri( Utilities.getQualifiedPath(conf, new Path(HiveConf.getVar(conf, HiveConf.ConfVars.METASTOREWAREHOUSE), - database.getName().toLowerCase() + ".db"))); + database.getName().toLowerCase() + DATABASE_PATH_SUFFIX))); } } http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java index 
e931d09..9c4d29e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java @@ -31,10 +31,12 @@ import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper; import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper; import org.apache.hadoop.hive.llap.Consumer; import org.apache.hadoop.hive.llap.DebugUtils; +import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer; import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; +import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.CacheListHelper; import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey; import org.apache.hadoop.hive.ql.io.orc.InStream.TrackedCacheChunk; import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding; @@ -62,17 +64,18 @@ import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper; * 3) Additionally, we keep an extra +1 refcount "for the fetching thread". That way, if we send * the block to processor, and the latter decrefs it, the block won't be evicted when we want * to reuse it for some other RG, forcing us to do an extra disk read or cache lookup. - * 4) As we read (we always read RGs in order, and assume they are stored in physical order in the - * file, plus that RGs are not shared between streams, AND that we read each stream from the + * 4) As we read (we always read RGs in order, assume they are stored in physical order in the + * file, plus that CBs are not shared between streams, AND that we read each stream from the * beginning), we note which blocks cannot possibly be reused anymore (next RG starts in the * next CB). We decref for the refcount from (3) in such case. 
* 5) Given that RG end boundary in ORC is an estimate, so we can request data from cache and then * not use it, at the end we go thru all the blocks, and release those not released by (4). * For dictionary case: - * 1) We have a separate refcount on the ColumnBuffer object we send to the processor. In the above - * case, it's always 1, so when processor is done it goes directly to decrefing cache buffers. + * 1) We have a separate refcount on the ColumnBuffer object we send to the processor (in the + * non-dictionary case, it's always 1, so when processor is done it goes directly to decrefing + * cache buffers). * 2) In the dictionary case, it's increased per RG, and processors don't touch cache buffers if - * they do not happen to decref this counter to 0. + * they do not happen to decref this refcount to 0. * 3) This is done because dictionary can have many buffers; decrefing all of them for all RGs * is more expensive; plus, decrefing in cache may be more expensive due to cache policy/etc. */ @@ -123,33 +126,37 @@ public class EncodedReaderImpl implements EncodedReader { } }); private final long fileId; - private final FSDataInputStream file; + private FSDataInputStream file; private final CompressionCodec codec; private final int bufferSize; private final List<OrcProto.Type> types; - private final ZeroCopyReaderShim zcr; + private ZeroCopyReaderShim zcr; private final long rowIndexStride; private final LowLevelCache cache; - private final ByteBufferAllocatorPool pool; + private ByteBufferAllocatorPool pool; // For now, one consumer for all calls. 
private final Consumer<OrcEncodedColumnBatch> consumer; - + private final LowLevelCacheCounters qfCounters; + private final FileSystem fs; + private final Path path; + private final boolean useZeroCopy; public EncodedReaderImpl(FileSystem fileSystem, Path path, long fileId, boolean useZeroCopy, List<OrcProto.Type> types, CompressionCodec codec, int bufferSize, long strideRate, - LowLevelCache cache, Consumer<OrcEncodedColumnBatch> consumer) - throws IOException { + LowLevelCache cache, LowLevelCacheCounters qfCounters, + Consumer<OrcEncodedColumnBatch> consumer) throws IOException { this.fileId = fileId; - this.file = fileSystem.open(path); + this.fs = fileSystem; + this.path = path; this.codec = codec; this.types = types; this.bufferSize = bufferSize; - this.pool = useZeroCopy ? new ByteBufferAllocatorPool() : null; - this.zcr = useZeroCopy ? RecordReaderUtils.createZeroCopyShim(file, codec, pool) : null; this.rowIndexStride = strideRate; this.cache = cache; + this.qfCounters = qfCounters; this.consumer = consumer; - if (zcr != null && !cache.getAllocator().isDirectAlloc()) { + this.useZeroCopy = useZeroCopy; + if (useZeroCopy && !cache.getAllocator().isDirectAlloc()) { throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache " + "buffers; either disable zero-copy or enable direct cache allocation"); } @@ -271,7 +278,6 @@ public class EncodedReaderImpl implements EncodedReader { ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length]; boolean[] includedRgs = null; boolean isCompressed = (codec != null); - DiskRangeListMutateHelper toRead = null; DiskRangeListCreateHelper listToRead = new DiskRangeListCreateHelper(); boolean hasIndexOnlyCols = false; for (OrcProto.Stream stream : streamList) { @@ -335,27 +341,35 @@ public class EncodedReaderImpl implements EncodedReader { } // 2. Now, read all of the ranges from cache or disk. 
- toRead = new DiskRangeListMutateHelper(listToRead.get()); + CacheListHelper toRead = new CacheListHelper(listToRead.get()); if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled()) && LOG.isInfoEnabled()) { LOG.info("Resulting disk ranges to read (file " + fileId + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); } - cache.getFileData(fileId, toRead.next, stripeOffset, InStream.CC_FACTORY); + cache.getFileData(fileId, toRead.next, stripeOffset, InStream.CC_FACTORY, qfCounters); if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled()) && LOG.isInfoEnabled()) { LOG.info("Disk ranges after cache (file " + fileId + ", base offset " + stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); } - // Force direct buffers if we will be decompressing to direct cache. - RecordReaderUtils.readDiskRanges( - file, zcr, stripeOffset, toRead.next, cache.getAllocator().isDirectAlloc()); - - if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled()) - && LOG.isInfoEnabled()) { - LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + stripeOffset - + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); + if (!toRead.didGetAllData) { + long startTime = qfCounters.startTimeCounter(); + if (this.file == null) { + this.file = fs.open(path); + this.pool = useZeroCopy ? new ByteBufferAllocatorPool() : null; + this.zcr = useZeroCopy ? RecordReaderUtils.createZeroCopyShim(file, codec, pool) : null; + } + // Force direct buffers if we will be decompressing to direct cache. 
+ RecordReaderUtils.readDiskRanges( + file, zcr, stripeOffset, toRead.next, cache.getAllocator().isDirectAlloc()); + qfCounters.recordHdfsTime(startTime); + if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled()) + && LOG.isInfoEnabled()) { + LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + stripeOffset + + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); + } } // 3. For uncompressed case, we need some special processing before read. @@ -366,7 +380,7 @@ public class EncodedReaderImpl implements EncodedReader { for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) { StreamContext sctx = ctx.streams[streamIx]; DiskRangeList newIter = InStream.preReadUncompressedStream(fileId, stripeOffset, - iter, sctx.offset, sctx.offset + sctx.length, zcr, cache); + iter, sctx.offset, sctx.offset + sctx.length, zcr, cache, qfCounters); if (newIter != null) { iter = newIter; } @@ -421,7 +435,7 @@ public class EncodedReaderImpl implements EncodedReader { long unlockUntilCOffset = sctx.offset + sctx.length; DiskRangeList lastCached = InStream.readEncodedStream(fileId, stripeOffset, iter, sctx.offset, sctx.offset + sctx.length, zcr, codec, bufferSize, cache, - sctx.stripeLevelStream, unlockUntilCOffset, sctx.offset); + sctx.stripeLevelStream, unlockUntilCOffset, sctx.offset, qfCounters); if (lastCached != null) { iter = lastCached; } @@ -444,7 +458,7 @@ public class EncodedReaderImpl implements EncodedReader { boolean isStartOfStream = sctx.bufferIter == null; DiskRangeList lastCached = InStream.readEncodedStream(fileId, stripeOffset, (isStartOfStream ? 
iter : sctx.bufferIter), cOffset, endCOffset, zcr, codec, - bufferSize, cache, cb, unlockUntilCOffset, sctx.offset); + bufferSize, cache, cb, unlockUntilCOffset, sctx.offset, qfCounters); if (lastCached != null) { sctx.bufferIter = iter = lastCached; // Reset iter just to ensure it's valid } @@ -519,11 +533,13 @@ public class EncodedReaderImpl implements EncodedReader { @Override public void close() throws IOException { - try { - file.close(); - } catch (IOException ex) { - // Tez might have closed our filesystem. Log and ignore error. - LOG.info("Failed to close file; ignoring: " + ex.getMessage()); + if (file != null) { + try { + file.close(); + } catch (IOException ex) { + // Tez might have closed our filesystem. Log and ignore error. + LOG.info("Failed to close file; ignoring: " + ex.getMessage()); + } } if (pool != null) { pool.clear(); http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java index b7633ea..d0295d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.common.DiskRangeList; import org.apache.hadoop.hive.llap.DebugUtils; +import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer; import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; @@ -701,6 +702,7 @@ public abstract class InStream extends InputStream { * @param unlockUntilCOffset The offset until which the buffers can be unlocked in cache, as * they will not 
be used in future calls (see the class comment in * EncodedReaderImpl about refcounts). + * @param qfCounters * @return Last buffer cached during decomrpession. Cache buffers are never removed from * the master list, so they are safe to keep as iterators for various streams. */ @@ -709,7 +711,7 @@ public abstract class InStream extends InputStream { public static DiskRangeList readEncodedStream(long fileId, long baseOffset, DiskRangeList start, long cOffset, long endCOffset, ZeroCopyReaderShim zcr, CompressionCodec codec, int bufferSize, LowLevelCache cache, StreamBuffer streamBuffer, long unlockUntilCOffset, - long streamOffset) throws IOException { + long streamOffset, LowLevelCacheCounters qfCounters) throws IOException { if (streamBuffer.cacheBuffers == null) { streamBuffer.cacheBuffers = new ArrayList<LlapMemoryBuffer>(); } else { @@ -784,7 +786,7 @@ public abstract class InStream extends InputStream { // 6. Finally, put uncompressed data to cache. long[] collisionMask = cache.putFileData( - fileId, cacheKeys, targetBuffers, baseOffset, Priority.NORMAL); + fileId, cacheKeys, targetBuffers, baseOffset, Priority.NORMAL, qfCounters); processCacheCollisions( cache, collisionMask, toDecompress, targetBuffers, streamBuffer.cacheBuffers); @@ -892,10 +894,12 @@ public abstract class InStream extends InputStream { * to handle just for this case. * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our * allocator. Uncompressed case is not mainline though so let's not complicate it. 
+ * @param qfCounters */ public static DiskRangeList preReadUncompressedStream(long fileId, long baseOffset, DiskRangeList start, long streamOffset, long streamEnd, - ZeroCopyReaderShim zcr, LowLevelCache cache) throws IOException { + ZeroCopyReaderShim zcr, LowLevelCache cache, LowLevelCacheCounters qfCounters) + throws IOException { if (streamOffset == streamEnd) return null; List<UncompressedCacheChunk> toCache = null; List<ByteBuffer> toRelease = null; @@ -1038,7 +1042,7 @@ public abstract class InStream extends InputStream { // 6. Finally, put uncompressed data to cache. long[] collisionMask = cache.putFileData( - fileId, cacheKeys, targetBuffers, baseOffset, Priority.NORMAL); + fileId, cacheKeys, targetBuffers, baseOffset, Priority.NORMAL, qfCounters); processCacheCollisions(cache, collisionMask, toCache, targetBuffers, null); return lastUncompressed; http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index fa78703..40675c6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -48,7 +48,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit { private boolean hasBase; private final List<Long> deltas = new ArrayList<Long>(); private OrcFile.WriterVersion writerVersion; - private transient Long fileId; + private Long fileId; private long projColsUncompressedSize; static final int HAS_FILEID_FLAG = 8; http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java index 5390c8c..1eb0dec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.hadoop.hive.llap.Consumer; +import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters; import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey; import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch; @@ -318,7 +319,8 @@ public interface Reader { MetadataReader metadata() throws IOException; EncodedReader encodedReader(long fileId, LowLevelCache lowLevelCache, - Consumer<OrcEncodedColumnBatch> consumer) throws IOException; + LowLevelCacheCounters qfCounters, Consumer<OrcEncodedColumnBatch> consumer) + throws IOException; List<Integer> getVersionList(); http://git-wip-us.apache.org/repos/asf/hive/blob/2024c962/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index eb349da..8041ba8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.Consumer; +import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters; import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey; import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch; @@ -716,10 +717,11 @@ public class ReaderImpl implements Reader { @Override public 
EncodedReader encodedReader(long fileId, LowLevelCache lowLevelCache, - Consumer<OrcEncodedColumnBatch> consumer) throws IOException { + LowLevelCacheCounters qfCounters, Consumer<OrcEncodedColumnBatch> consumer) + throws IOException { boolean useZeroCopy = (conf != null) && (HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY)); return new EncodedReaderImpl(fileSystem, path, fileId, useZeroCopy, types, - codec, bufferSize, rowIndexStride, lowLevelCache, consumer); + codec, bufferSize, rowIndexStride, lowLevelCache, qfCounters, consumer); } @Override