Repository: hive
Updated Branches:
  refs/heads/master 4f45536f1 -> 9f5a3e3d8


HIVE-16180 : LLAP: Native memory leak in EncodedReader (Sergey Shelukhin/Prasanth Jayachandran, reviewed by Prasanth Jayachandran/Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9f5a3e3d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9f5a3e3d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9f5a3e3d

Branch: refs/heads/master
Commit: 9f5a3e3d89db7d6f4754eb345ad9abb6997857e1
Parents: 4f45536
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Tue Mar 21 13:55:27 2017 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Tue Mar 21 13:55:27 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/cache/SimpleAllocator.java |   5 +-
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 188 ++++++++++++-------
 2 files changed, 126 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9f5a3e3d/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
index d8f59d1..51eb34e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
@@ -32,9 +32,10 @@ public final class SimpleAllocator implements Allocator, BuddyAllocatorMXBean {
   private final boolean isDirect;
   private static Field cleanerField;
   static {
-    ByteBuffer tmp = ByteBuffer.allocateDirect(1);
     try {
-      cleanerField = tmp.getClass().getDeclaredField("cleaner");
+      // TODO: To make it work for JDK9 use CleanerUtil from https://issues.apache.org/jira/browse/HADOOP-12760
+      final Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
+      cleanerField = dbClazz.getDeclaredField("cleaner");
       cleanerField.setAccessible(true);
     } catch (Throwable t) {
       LlapIoImpl.LOG.warn("Cannot initialize DirectByteBuffer cleaner", t);
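For context, the cleaner-via-reflection pattern this hunk introduces (and that EncodedReaderImpl below duplicates) looks like this in isolation. This is a sketch assuming a pre-JDK9 runtime where sun.misc.Cleaner is accessible; the class and method names are illustrative, not from the patch:

    import java.lang.reflect.Field;
    import java.nio.ByteBuffer;

    public class DirectBufferFreer {
      private static Field cleanerField; // java.nio.DirectByteBuffer#cleaner
      static {
        try {
          // Reflect on the class by name, not on an instance, so no throwaway
          // 1-byte direct buffer has to be allocated just to find the field.
          Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
          cleanerField = dbClazz.getDeclaredField("cleaner");
          cleanerField.setAccessible(true);
        } catch (Throwable t) {
          cleanerField = null; // fall back to GC-driven cleanup
        }
      }

      /** Eagerly frees the native memory behind a direct buffer, if possible. */
      public static void free(ByteBuffer buf) {
        if (buf == null || !buf.isDirect() || cleanerField == null) return;
        try {
          sun.misc.Cleaner cleaner = (sun.misc.Cleaner) cleanerField.get(buf);
          if (cleaner != null) cleaner.clean(); // runs the deallocation thunk exactly once
        } catch (Throwable t) {
          // leave the buffer for GC
        }
      }
    }

Note that the removed code reflected on getClass() of a freshly allocated one-byte direct buffer; loading java.nio.DirectByteBuffer by name sidesteps that probe allocation.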
http://git-wip-us.apache.org/repos/asf/hive/blob/9f5a3e3d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 0ac3ec5..19f3451 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -18,9 +18,12 @@
 package org.apache.hadoop.hive.ql.io.orc.encoded;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.IdentityHashMap;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -43,11 +46,13 @@ import org.apache.orc.impl.RecordReaderUtils;
 import org.apache.orc.impl.StreamName;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.impl.BufferChunk;
-import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
 import org.apache.orc.OrcProto;
 
+import sun.misc.Cleaner;
+
+
 /**
  * Encoded reader implementation.
  *
@@ -80,6 +85,17 @@ import org.apache.orc.OrcProto;
  */
 class EncodedReaderImpl implements EncodedReader {
   public static final Logger LOG = LoggerFactory.getLogger(EncodedReaderImpl.class);
+  private static Field cleanerField;
+  static {
+    try {
+      // TODO: To make it work for JDK9 use CleanerUtil from https://issues.apache.org/jira/browse/HADOOP-12760
+      final Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
+      cleanerField = dbClazz.getDeclaredField("cleaner");
+      cleanerField.setAccessible(true);
+    } catch (Throwable t) {
+      cleanerField = null;
+    }
+  }
   private static final Object POOLS_CREATION_LOCK = new Object();
   private static Pools POOLS;
   private static class Pools {
@@ -303,15 +319,35 @@ class EncodedReaderImpl implements EncodedReader {
       }
     }
 
+    // TODO: the memory release could be optimized - we could release original buffers after we
+    // are fully done with each original buffer from disk. For now release all at the end;
+    // it doesn't increase the total amount of memory we hold, just the duration a bit.
+    // This is much simpler - we can just remember original ranges after reading them, and
+    // release them at the end. In a few cases where it's easy to determine that a buffer
+    // can be freed in advance, we remove it from the map.
+    IdentityHashMap<ByteBuffer, Boolean> toRelease = null;
     if (!isAllInCache.value) {
       if (!isDataReaderOpen) {
        this.dataReader.open();
        isDataReaderOpen = true;
      }
      dataReader.readFileData(toRead.next, stripeOffset, cacheWrapper.getAllocator().isDirectAlloc());
+      toRelease = new IdentityHashMap<>();
+      DiskRangeList drl = toRead.next;
+      while (drl != null) {
+        if (drl instanceof BufferChunk) {
+          toRelease.put(drl.getData(), true);
+        }
+        drl = drl.next;
+      }
     }
 
     // 3. For uncompressed case, we need some special processing before read.
+    // Basically, we are trying to create artificial, consistent ranges to cache, as there are
+    // no CBs in an uncompressed file. At the end of this processing, the list would contain
+    // either cache buffers, or buffers allocated by us and not cached (if we are only reading
+    // parts of the data for some ranges and don't want to cache it). Both are represented by
+    // CacheChunks, so the list is just CacheChunk-s from that point on.
     DiskRangeList iter = toRead.next;  // Keep "toRead" list for future use, don't extract().
     if (codec == null) {
       for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
@@ -326,6 +362,12 @@ class EncodedReaderImpl implements EncodedReader {
         }
       }
     }
+    // Release buffers as we are done with all the streams... also see toRelease comment.
+    // With uncompressed streams, we know we are done earlier.
+    if (toRelease != null) {
+      releaseBuffers(toRelease.keySet(), true);
+      toRelease = null;
+    }
     if (isTracingEnabled) {
       LOG.trace("Disk ranges after pre-read (file " + fileKey + ", base offset "
           + stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
@@ -383,7 +425,7 @@ class EncodedReaderImpl implements EncodedReader {
             long unlockUntilCOffset = sctx.offset + sctx.length;
             DiskRangeList lastCached = readEncodedStream(stripeOffset, iter,
                 sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream,
-                unlockUntilCOffset, sctx.offset);
+                unlockUntilCOffset, sctx.offset, toRelease);
             if (lastCached != null) {
               iter = lastCached;
             }
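The new toRelease structure acts as an identity-keyed set of the exact ByteBuffer objects read from disk. A short sketch of why IdentityHashMap rather than, say, HashSet<ByteBuffer> (illustrative, not from the patch):

    import java.nio.ByteBuffer;
    import java.util.IdentityHashMap;

    public class IdentitySetDemo {
      public static void main(String[] args) {
        // ByteBuffer.equals()/hashCode() are content-based, so two distinct
        // buffers holding equal bytes compare as equal; the reader must track
        // the exact objects it owns, hence identity semantics.
        IdentityHashMap<ByteBuffer, Boolean> toRelease = new IdentityHashMap<>();
        ByteBuffer a = ByteBuffer.allocateDirect(8);
        ByteBuffer b = ByteBuffer.allocateDirect(8); // equal contents, different object
        toRelease.put(a, true);
        toRelease.put(b, true);
        System.out.println(toRelease.size());    // 2; a HashSet<ByteBuffer> would hold 1
        System.out.println(toRelease.remove(a)); // true; 'a' was tracked, now removed
      }
    }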
@@ -411,7 +453,7 @@ class EncodedReaderImpl implements EncodedReader {
             boolean isStartOfStream = sctx.bufferIter == null;
             DiskRangeList lastCached = readEncodedStream(stripeOffset,
                 (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb,
-                unlockUntilCOffset, sctx.offset);
+                unlockUntilCOffset, sctx.offset, toRelease);
             if (lastCached != null) {
               sctx.bufferIter = iter = lastCached;
             }
@@ -438,6 +480,10 @@ class EncodedReaderImpl implements EncodedReader {
 
     // Release the unreleased buffers. See class comment about refcounts.
     releaseInitialRefcounts(toRead.next);
+    // Release buffers as we are done with all the streams... also see toRelease comment.
+    if (toRelease != null) {
+      releaseBuffers(toRelease.keySet(), true);
+    }
     releaseCacheChunksIntoObjectPool(toRead.next);
   }
 
@@ -605,8 +651,8 @@ class EncodedReaderImpl implements EncodedReader {
    * the master list, so they are safe to keep as iterators for various streams.
    */
   public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, long cOffset,
-      long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset)
-      throws IOException {
+      long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset,
+      IdentityHashMap<ByteBuffer, Boolean> toRelease) throws IOException {
     if (csd.getCacheBuffers() == null) {
       csd.setCacheBuffers(new ArrayList<MemoryBuffer>());
     } else {
@@ -615,10 +661,10 @@ class EncodedReaderImpl implements EncodedReader {
     if (cOffset == endCOffset) return null;
     boolean isCompressed = codec != null;
     List<ProcCacheChunk> toDecompress = null;
-    List<ByteBuffer> toRelease = null;
     List<IncompleteCb> badEstimates = null;
+    List<ByteBuffer> toReleaseCopies = null;
     if (isCompressed) {
-      toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>();
+      toReleaseCopies = new ArrayList<>();
       toDecompress = new ArrayList<>();
       badEstimates = new ArrayList<>();
     }
@@ -636,8 +682,8 @@ class EncodedReaderImpl implements EncodedReader {
     // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
     try {
       lastUncompressed = isCompressed ?
-          prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
-              unlockUntilCOffset, current, csd, toRelease, toDecompress, badEstimates)
+          prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset, unlockUntilCOffset,
+              current, csd, toRelease, toReleaseCopies, toDecompress, badEstimates)
          : prepareRangesForUncompressedRead(
              cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, csd);
     } catch (Exception ex) {
@@ -657,7 +703,10 @@ class EncodedReaderImpl implements EncodedReader {
       assert result == null; // We don't expect conflicts from bad estimates.
     }
 
-    if (toDecompress == null || toDecompress.isEmpty()) return lastUncompressed; // Nothing to do.
+    if (toDecompress == null || toDecompress.isEmpty()) {
+      releaseBuffers(toReleaseCopies, false);
+      return lastUncompressed; // Nothing to do.
+    }
 
     // 3. Allocate the buffers, prepare cache keys.
     // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
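The early-return hunk above now releases the consolidation copies, which previously relied on GC to reclaim their native memory when there was nothing to decompress. The shape of the fix, reduced to a sketch (names illustrative; the patch releases at explicit points rather than in a finally block):

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public class CopyReleaseDemo {
      static void readChunk(int chunkLength) {
        List<ByteBuffer> toReleaseCopies = new ArrayList<>();
        try {
          ByteBuffer copy = ByteBuffer.allocateDirect(chunkLength);
          toReleaseCopies.add(copy); // record at allocation time, release unconditionally
          // ... consolidate compressed bytes into 'copy', decompress into the cache ...
        } finally {
          for (ByteBuffer b : toReleaseCopies) {
            DirectBufferFreer.free(b); // eager native release; see the cleaner sketch above
          }
        }
      }
    }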
@@ -690,21 +739,18 @@ class EncodedReaderImpl implements EncodedReader {
       cacheWrapper.reuseBuffer(chunk.getBuffer());
     }
 
-    // 5. Release original compressed buffers to zero-copy reader if needed.
-    if (toRelease != null) {
-      assert dataReader.isTrackingDiskRanges();
-      for (ByteBuffer buffer : toRelease) {
-        dataReader.releaseBuffer(buffer);
-      }
-    }
+    // 5. Release the copies we made directly to the cleaner.
+    releaseBuffers(toReleaseCopies, false);
 
     // 6. Finally, put uncompressed data to cache.
     if (fileKey != null) {
-      long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
+      long[] collisionMask = cacheWrapper.putFileData(
+          fileKey, cacheKeys, targetBuffers, baseOffset);
       processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers());
     }
 
-    // 7. It may happen that we know we won't use some compression buffers anymore.
+    // 7. It may happen that we know we won't use some cache buffers anymore (the alternative
+    //    is that we will use the same buffers for other streams in separate calls).
     // Release initial refcounts.
     for (ProcCacheChunk chunk : toDecompress) {
       ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, chunk);
@@ -713,9 +759,11 @@ class EncodedReaderImpl implements EncodedReader {
     return lastUncompressed;
   }
 
+  /** Subset of readEncodedStream specific to compressed streams, separate to avoid long methods. */
   private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
-      long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData,
-      List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress,
+      long streamOffset, long unlockUntilCOffset, DiskRangeList current,
+      ColumnStreamData columnStreamData, IdentityHashMap<ByteBuffer, Boolean> toRelease,
+      List<ByteBuffer> toReleaseCopies, List<ProcCacheChunk> toDecompress,
       List<IncompleteCb> badEstimates) throws IOException {
     if (cOffset > current.getOffset()) {
       // Target compression block is in the middle of the range; slice the range in two.
@@ -762,8 +810,8 @@ class EncodedReaderImpl implements EncodedReader {
         throw new RuntimeException(msg);
       }
       BufferChunk bc = (BufferChunk)current;
-      ProcCacheChunk newCached = addOneCompressionBuffer(
-          bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease, badEstimates);
+      ProcCacheChunk newCached = addOneCompressionBuffer(bc, columnStreamData.getCacheBuffers(),
+          toDecompress, toRelease, toReleaseCopies, badEstimates);
       lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
       next = (newCached != null) ? newCached.next : null;
       currentOffset = (next != null) ? next.getOffset() : -1;
@@ -777,9 +825,12 @@ class EncodedReaderImpl implements EncodedReader {
     return lastUncompressed;
   }
 
+  /** Subset of readEncodedStream specific to uncompressed streams, separate to avoid long methods. */
   private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset,
-      long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData)
-      throws IOException {
+      long streamOffset, long unlockUntilCOffset, DiskRangeList current,
+      ColumnStreamData columnStreamData) throws IOException {
+    // Note: we are called after preReadUncompressedStream, so it doesn't have to do nearly as much
+    // as prepareRangesForCompressedRead does; e.g. every buffer is already a CacheChunk.
     long currentOffset = cOffset;
    CacheChunk lastUncompressed = null;
    boolean isFirst = true;
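prepareRangesForUncompressedRead can stay simple because preReadUncompressedStream has already carved the stream into the "artificial, consistent ranges" described earlier, sized by determineUncompressedPartSize. The boundary arithmetic, as a standalone sketch (assumed behavior; the method name and clamping details are illustrative, not from the patch):

    import java.util.Arrays;

    public class PartBoundaries {
      // Part i spans [bounds[i], bounds[i+1]); interior boundaries are aligned
      // to multiples of partSize, and the ends are clamped to the stream.
      static long[] partBoundaries(long streamOffset, long streamEnd, int partSize) {
        long alignedStart = streamOffset - (streamOffset % partSize); // align down
        int parts = (int) ((streamEnd - alignedStart + partSize - 1) / partSize);
        long[] bounds = new long[parts + 1];
        for (int i = 0; i <= parts; ++i) {
          long b = alignedStart + (long) i * partSize;
          bounds[i] = Math.max(streamOffset, Math.min(streamEnd, b));
        }
        return bounds;
      }

      public static void main(String[] args) {
        // A stream at [100, 250) with 64-byte parts -> [100,128), [128,192), [192,250).
        System.out.println(Arrays.toString(partBoundaries(100, 250, 64)));
      }
    }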
@@ -819,11 +870,10 @@ class EncodedReaderImpl implements EncodedReader {
    * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our
    * allocator. Uncompressed case is not mainline though so let's not complicate it.
    */
-  private DiskRangeList preReadUncompressedStream(long baseOffset,
-      DiskRangeList start, long streamOffset, long streamEnd) throws IOException {
+  private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start,
+      long streamOffset, long streamEnd) throws IOException {
     if (streamOffset == streamEnd) return null;
     List<UncompressedCacheChunk> toCache = null;
-    List<ByteBuffer> toRelease = null;
 
     // 1. Find our bearings in the stream.
     DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd);
@@ -860,9 +910,6 @@ class EncodedReaderImpl implements EncodedReader {
       if (current.getOffset() >= partEnd) {
         continue; // We have no data at all for this part of the stream (could be unneeded), skip.
       }
-      if (toRelease == null && dataReader.isTrackingDiskRanges()) {
-        toRelease = new ArrayList<ByteBuffer>();
-      }
       // We have some disk buffers... see if we have entire part, etc.
       UncompressedCacheChunk candidateCached = null; // We will cache if we have the entire part.
       DiskRangeList next = current;
@@ -877,21 +924,15 @@ class EncodedReaderImpl implements EncodedReader {
         current = next;
         if (noMoreDataForPart) break; // Done with this part.
 
-        boolean wasSplit = false;
         if (current.getEnd() > partEnd) {
           // If the current buffer contains multiple parts, split it.
           current = current.split(partEnd);
-          wasSplit = true;
         }
         if (isTracingEnabled) {
           LOG.trace("Processing uncompressed file data at ["
              + current.getOffset() + ", " + current.getEnd() + ")");
         }
         BufferChunk curBc = (BufferChunk)current;
-        if (!wasSplit && toRelease != null) {
-          toRelease.add(curBc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part?
-        }
-
         // Track if we still have the entire part.
         long hadEntirePartTo = hasEntirePartTo;
         // We have data until the end of current block if we had it until the beginning.
@@ -952,15 +993,7 @@ class EncodedReaderImpl implements EncodedReader {
       ++ix;
     }
 
-    // 5. Release original compressed buffers to zero-copy reader if needed.
-    if (toRelease != null) {
-      assert dataReader.isTrackingDiskRanges();
-      for (ByteBuffer buf : toRelease) {
-        dataReader.releaseBuffer(buf);
-      }
-    }
-
-    // 6. Finally, put uncompressed data to cache.
+    // 5. Put uncompressed data to cache.
     if (fileKey != null) {
       long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
       processCacheCollisions(collisionMask, toCache, targetBuffers, null);
@@ -969,7 +1002,6 @@ class EncodedReaderImpl implements EncodedReader {
     return lastUncompressed;
   }
 
-
   private int determineUncompressedPartSize() {
     // We will break the uncompressed data in the cache in the chunks that are the size
     // of the prevalent ORC compression buffer (the default), or maximum allocation (since we
@@ -1178,7 +1210,8 @@ class EncodedReaderImpl implements EncodedReader {
    */
   private ProcCacheChunk addOneCompressionBuffer(BufferChunk current,
       List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress,
-      List<ByteBuffer> toRelease, List<IncompleteCb> badEstimates) throws IOException {
+      IdentityHashMap<ByteBuffer, Boolean> toRelease, List<ByteBuffer> toReleaseCopies,
+      List<IncompleteCb> badEstimates) throws IOException {
     ByteBuffer slice = null;
     ByteBuffer compressed = current.getChunk();
     long cbStartOffset = current.getOffset();
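The "simple case" in the next hunk carves one compression block out of a larger disk read with slice()/limit(), which copies nothing. That sharing is the crux of the release bookkeeping: the parent buffer must not be freed while a slice of it is still live. A minimal sketch (illustrative, not from the patch):

    import java.nio.ByteBuffer;

    public class SliceDemo {
      // Zero-copy view of one compression block inside a larger read buffer.
      static ByteBuffer sliceBlock(ByteBuffer compressed, int chunkLength) {
        ByteBuffer slice = compressed.slice(); // shares memory, spans position()..limit()
        slice.limit(chunkLength);              // trim the view to exactly one block
        return slice;                          // freeing 'compressed' now would corrupt this
      }

      public static void main(String[] args) {
        ByteBuffer read = ByteBuffer.allocateDirect(1024);
        read.position(100);                    // pretend the block starts at offset 100
        System.out.println(sliceBlock(read, 64).remaining()); // 64
      }
    }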
@@ -1201,12 +1234,8 @@ class EncodedReaderImpl implements EncodedReader {
       // Simple case - CB fits entirely in the disk range.
       slice = compressed.slice();
       slice.limit(chunkLength);
-      ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed,
+      return addOneCompressionBlockByteBuffer(slice, isUncompressed,
           cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
-      if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
-        toRelease.add(compressed);
-      }
-      return cc;
     }
     if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
       badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0));
@@ -1216,6 +1245,7 @@ class EncodedReaderImpl implements EncodedReader {
     // TODO: we could remove extra copy for isUncompressed case by copying directly to cache.
     // We need to consolidate 2 or more buffers into one to decompress.
     ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
+    toReleaseCopies.add(copy); // We will always release copies at the end.
     int remaining = chunkLength - compressed.remaining();
     int originalPos = compressed.position();
     copy.put(compressed);
@@ -1224,12 +1254,8 @@ class EncodedReaderImpl implements EncodedReader {
     }
     DiskRangeList next = current.next;
     current.removeSelf();
-    if (dataReader.isTrackingDiskRanges()) {
-      if (originalPos == 0) {
-        dataReader.releaseBuffer(compressed); // We copied the entire buffer.
-      } else {
-        toRelease.add(compressed); // There might be slices depending on this buffer.
-      }
+    if (originalPos == 0 && toRelease.remove(compressed)) {
+      releaseBuffer(compressed, true);
     }
 
     int extraChunkCount = 0;
@@ -1246,15 +1272,15 @@ class EncodedReaderImpl implements EncodedReader {
         copy.put(slice);
         ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed,
             cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers);
-        if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
-          dataReader.releaseBuffer(compressed); // We copied the entire buffer.
-        }
+        if (compressed.remaining() <= 0 && toRelease.remove(compressed)) {
+          releaseBuffer(compressed, true); // We copied the entire buffer.
+        } // else there's more data to process; will be handled in next call.
        return cc;
      }
      remaining -= compressed.remaining();
-      copy.put(compressed);
-      if (dataReader.isTrackingDiskRanges()) {
-        dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+      copy.put(compressed); // TODO: move into the if below; account for release call
+      if (toRelease.remove(compressed)) {
+        releaseBuffer(compressed, true); // We copied the entire buffer.
       }
       DiskRangeList tmp = next;
       next = next.hasContiguousNext() ? next.next : null;
@@ -1270,6 +1296,38 @@ class EncodedReaderImpl implements EncodedReader {
     }
   }
 
+  private void releaseBuffers(Collection<ByteBuffer> toRelease, boolean isFromDataReader) {
+    if (toRelease == null) return;
+    for (ByteBuffer buf : toRelease) {
+      releaseBuffer(buf, isFromDataReader);
+    }
+  }
+
+  private void releaseBuffer(ByteBuffer bb, boolean isFromDataReader) {
+    if (isTracingEnabled) {
+      LOG.trace("Releasing the buffer " + System.identityHashCode(bb));
+    }
+    if (isFromDataReader && dataReader.isTrackingDiskRanges()) {
+      dataReader.releaseBuffer(bb);
+      return;
+    }
+    Field localCf = cleanerField;
+    if (!bb.isDirect() || localCf == null) return;
+    try {
+      Cleaner cleaner = (Cleaner) localCf.get(bb);
+      if (cleaner != null) {
+        cleaner.clean();
+      } else {
+        LOG.debug("Unable to clean a buffer using cleaner - no cleaner");
+      }
+    } catch (Exception e) {
+      // leave it for GC to clean up
+      LOG.warn("Unable to clean direct buffers using Cleaner.");
+      cleanerField = null;
+    }
+  }
+
+
   private IncompleteCb addIncompleteCompressionBuffer(
       long cbStartOffset, DiskRangeList target, int extraChunkCount) {
     IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
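Both TODOs in this patch defer JDK 9 support to CleanerUtil from HADOOP-12760. For comparison, on JDK 9+ the reflective field access is typically replaced with sun.misc.Unsafe.invokeCleaner, which exists there as a supported method; this sketch shows one possible approach, not something this commit ships:

    import java.lang.reflect.Field;
    import java.nio.ByteBuffer;
    import sun.misc.Unsafe;

    public final class Jdk9BufferFreer {
      private static final Unsafe UNSAFE;
      static {
        try {
          Field f = Unsafe.class.getDeclaredField("theUnsafe");
          f.setAccessible(true);
          UNSAFE = (Unsafe) f.get(null);
        } catch (ReflectiveOperationException e) {
          throw new ExceptionInInitializerError(e);
        }
      }

      public static void free(ByteBuffer buf) {
        if (buf != null && buf.isDirect()) {
          // Throws IllegalArgumentException for slices/duplicates, which have
          // no cleaner of their own - free the original buffer instead.
          UNSAFE.invokeCleaner(buf);
        }
      }
    }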