Repository: hive
Updated Branches:
  refs/heads/master 4f45536f1 -> 9f5a3e3d8


HIVE-16180 : LLAP: Native memory leak in EncodedReader (Sergey Shelukhin/Prasanth Jayachandran, reviewed by Prasanth Jayachandran/Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9f5a3e3d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9f5a3e3d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9f5a3e3d

Branch: refs/heads/master
Commit: 9f5a3e3d89db7d6f4754eb345ad9abb6997857e1
Parents: 4f45536
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Tue Mar 21 13:55:27 2017 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Tue Mar 21 13:55:27 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/cache/SimpleAllocator.java |   5 +-
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 188 ++++++++++++-------
 2 files changed, 126 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9f5a3e3d/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
index d8f59d1..51eb34e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
@@ -32,9 +32,10 @@ public final class SimpleAllocator implements Allocator, BuddyAllocatorMXBean {
   private final boolean isDirect;
   private static Field cleanerField;
   static {
-    ByteBuffer tmp = ByteBuffer.allocateDirect(1);
     try {
-      cleanerField = tmp.getClass().getDeclaredField("cleaner");
+      // TODO: To make it work for JDK9 use CleanerUtil from https://issues.apache.org/jira/browse/HADOOP-12760
+      final Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
+      cleanerField = dbClazz.getDeclaredField("cleaner");
       cleanerField.setAccessible(true);
     } catch (Throwable t) {
       LlapIoImpl.LOG.warn("Cannot initialize DirectByteBuffer cleaner", t);
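
For reference, the cleaner-release pattern this hunk (and the matching EncodedReaderImpl change below) relies on can be shown as a small standalone sketch. The helper class and method names are illustrative only, not part of the patch, and it assumes a pre-JDK9 runtime where java.nio.DirectByteBuffer exposes a sun.misc.Cleaner through its private "cleaner" field:

import java.lang.reflect.Field;
import java.nio.ByteBuffer;

import sun.misc.Cleaner;

// Illustrative helper (not part of the patch): frees a direct buffer's native
// memory eagerly instead of waiting for GC to run the buffer's Cleaner.
final class DirectBufferCleanerSketch {
  private static Field cleanerField;
  static {
    try {
      // Same lookup the patch performs: DirectByteBuffer is package-private,
      // hence Class.forName instead of a plain cast.
      Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
      cleanerField = dbClazz.getDeclaredField("cleaner");
      cleanerField.setAccessible(true);
    } catch (Throwable t) {
      cleanerField = null; // fall back to GC-driven cleanup
    }
  }

  static void free(ByteBuffer buf) {
    if (buf == null || !buf.isDirect() || cleanerField == null) return;
    try {
      Cleaner cleaner = (Cleaner) cleanerField.get(buf);
      if (cleaner != null) {
        cleaner.clean(); // releases the native allocation immediately
      }
    } catch (Throwable t) {
      // reflection failed; leave the buffer for GC
    }
  }
}

If any step of the reflection fails, the field stays null and cleanup silently degrades to GC, which is the same fallback the patch's catch blocks allow.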

http://git-wip-us.apache.org/repos/asf/hive/blob/9f5a3e3d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 0ac3ec5..19f3451 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -18,9 +18,12 @@
 package org.apache.hadoop.hive.ql.io.orc.encoded;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.IdentityHashMap;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -43,11 +46,13 @@ import org.apache.orc.impl.RecordReaderUtils;
 import org.apache.orc.impl.StreamName;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.impl.BufferChunk;
-import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
 import org.apache.orc.OrcProto;
 
+import sun.misc.Cleaner;
+
+
 /**
  * Encoded reader implementation.
  *
@@ -80,6 +85,17 @@ import org.apache.orc.OrcProto;
  */
 class EncodedReaderImpl implements EncodedReader {
  public static final Logger LOG = LoggerFactory.getLogger(EncodedReaderImpl.class);
+  private static Field cleanerField;
+  static {
+    try {
+      // TODO: To make it work for JDK9 use CleanerUtil from https://issues.apache.org/jira/browse/HADOOP-12760
+      final Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
+      cleanerField = dbClazz.getDeclaredField("cleaner");
+      cleanerField.setAccessible(true);
+    } catch (Throwable t) {
+      cleanerField = null;
+    }
+  }
   private static final Object POOLS_CREATION_LOCK = new Object();
   private static Pools POOLS;
   private static class Pools {
@@ -303,15 +319,35 @@ class EncodedReaderImpl implements EncodedReader {
       }
     }
 
+    // TODO: the memory release could be optimized - we could release original buffers after we
+    //       are fully done with each original buffer from disk. For now release all at the end;
+    //       it doesn't increase the total amount of memory we hold, just the duration a bit.
+    //       This is much simpler - we can just remember original ranges after reading them, and
+    //       release them at the end. In a few cases where it's easy to determine that a buffer
+    //       can be freed in advance, we remove it from the map.
+    IdentityHashMap<ByteBuffer, Boolean> toRelease = null;
     if (!isAllInCache.value) {
       if (!isDataReaderOpen) {
         this.dataReader.open();
         isDataReaderOpen = true;
       }
      dataReader.readFileData(toRead.next, stripeOffset, cacheWrapper.getAllocator().isDirectAlloc());
+      toRelease = new IdentityHashMap<>();
+      DiskRangeList drl = toRead.next;
+      while (drl != null) {
+        if (drl instanceof BufferChunk) {
+          toRelease.put(drl.getData(), true);
+        }
+        drl = drl.next;
+      }
     }
 
     // 3. For uncompressed case, we need some special processing before read.
+    //    Basically, we are trying to create artificial, consistent ranges to cache, as there are
+    //    no CBs in an uncompressed file. At the end of this processing, the list would contain
+    //    either cache buffers, or buffers allocated by us and not cached (if we are only reading
+    //    parts of the data for some ranges and don't want to cache it). Both are represented by
+    //    CacheChunks, so the list is just CacheChunk-s from that point on.
    DiskRangeList iter = toRead.next;  // Keep "toRead" list for future use, don't extract().
     if (codec == null) {
       for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
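
A side note on the data structure introduced above: the original disk buffers are tracked in an IdentityHashMap rather than a HashMap or List because ByteBuffer.equals() and hashCode() compare buffer contents, so two distinct buffers holding equal bytes would collide under value semantics. A self-contained sketch (illustrative only, not part of the patch) showing the difference:

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class IdentityTrackingSketch {
  public static void main(String[] args) {
    // Two distinct direct buffers with identical (all-zero) contents.
    ByteBuffer a = ByteBuffer.allocateDirect(16);
    ByteBuffer b = ByteBuffer.allocateDirect(16);

    Map<ByteBuffer, Boolean> byValue = new HashMap<>();
    byValue.put(a, true);
    byValue.put(b, true);
    // ByteBuffer.equals() compares remaining content, so the second put
    // collapses onto the first entry: only one buffer appears tracked.
    System.out.println("HashMap size: " + byValue.size());            // 1

    Map<ByteBuffer, Boolean> byIdentity = new IdentityHashMap<>();
    byIdentity.put(a, true);
    byIdentity.put(b, true);
    // Identity semantics keep both instances, which is what the reader
    // needs in order to release each original buffer exactly once.
    System.out.println("IdentityHashMap size: " + byIdentity.size()); // 2

    // Removing by reference mirrors toRelease.remove(compressed) below.
    System.out.println(byIdentity.remove(a) != null);                 // true
  }
}
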
@@ -326,6 +362,12 @@ class EncodedReaderImpl implements EncodedReader {
           }
         }
       }
+      // Release buffers as we are done with all the streams... also see toRelease comment.
+      // With uncompressed streams, we know we are done earlier.
+      if (toRelease != null) {
+        releaseBuffers(toRelease.keySet(), true);
+        toRelease = null;
+      }
       if (isTracingEnabled) {
         LOG.trace("Disk ranges after pre-read (file " + fileKey + ", base 
offset "
             + stripeOffset + "): " + 
RecordReaderUtils.stringifyDiskRanges(toRead.next));
@@ -383,7 +425,7 @@ class EncodedReaderImpl implements EncodedReader {
                 long unlockUntilCOffset = sctx.offset + sctx.length;
                DiskRangeList lastCached = readEncodedStream(stripeOffset, iter,
                    sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream,
-                    unlockUntilCOffset, sctx.offset);
+                    unlockUntilCOffset, sctx.offset, toRelease);
                 if (lastCached != null) {
                   iter = lastCached;
                 }
@@ -411,7 +453,7 @@ class EncodedReaderImpl implements EncodedReader {
               boolean isStartOfStream = sctx.bufferIter == null;
               DiskRangeList lastCached = readEncodedStream(stripeOffset,
                  (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb,
-                  unlockUntilCOffset, sctx.offset);
+                  unlockUntilCOffset, sctx.offset, toRelease);
               if (lastCached != null) {
                 sctx.bufferIter = iter = lastCached;
               }
@@ -438,6 +480,10 @@ class EncodedReaderImpl implements EncodedReader {
 
     // Release the unreleased buffers. See class comment about refcounts.
     releaseInitialRefcounts(toRead.next);
+    // Release buffers as we are done with all the streams... also see toRelease comment.
+    if (toRelease != null) {
+      releaseBuffers(toRelease.keySet(), true);
+    }
     releaseCacheChunksIntoObjectPool(toRead.next);
   }
 
@@ -605,8 +651,8 @@ class EncodedReaderImpl implements EncodedReader {
   *         the master list, so they are safe to keep as iterators for various streams.
   */
  public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, long cOffset,
-      long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset)
-          throws IOException {
+      long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset,
+      IdentityHashMap<ByteBuffer, Boolean> toRelease) throws IOException {
     if (csd.getCacheBuffers() == null) {
       csd.setCacheBuffers(new ArrayList<MemoryBuffer>());
     } else {
@@ -615,10 +661,10 @@ class EncodedReaderImpl implements EncodedReader {
     if (cOffset == endCOffset) return null;
     boolean isCompressed = codec != null;
     List<ProcCacheChunk> toDecompress = null;
-    List<ByteBuffer> toRelease = null;
     List<IncompleteCb> badEstimates = null;
+    List<ByteBuffer> toReleaseCopies = null;
     if (isCompressed) {
-      toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>();
+      toReleaseCopies = new ArrayList<>();
       toDecompress = new ArrayList<>();
       badEstimates = new ArrayList<>();
     }
@@ -636,8 +682,8 @@ class EncodedReaderImpl implements EncodedReader {
    // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
    try {
      lastUncompressed = isCompressed ?
-          prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
-              unlockUntilCOffset, current, csd, toRelease, toDecompress, badEstimates)
+          prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset, unlockUntilCOffset,
+              current, csd, toRelease, toReleaseCopies, toDecompress, badEstimates)
        : prepareRangesForUncompressedRead(
            cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, csd);
     } catch (Exception ex) {
@@ -657,7 +703,10 @@ class EncodedReaderImpl implements EncodedReader {
       assert result == null; // We don't expect conflicts from bad estimates.
     }
 
-    if (toDecompress == null || toDecompress.isEmpty()) return lastUncompressed; // Nothing to do.
+    if (toDecompress == null || toDecompress.isEmpty()) {
+      releaseBuffers(toReleaseCopies, false);
+      return lastUncompressed; // Nothing to do.
+    }
 
     // 3. Allocate the buffers, prepare cache keys.
    // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
@@ -690,21 +739,18 @@ class EncodedReaderImpl implements EncodedReader {
       cacheWrapper.reuseBuffer(chunk.getBuffer());
     }
 
-    // 5. Release original compressed buffers to zero-copy reader if needed.
-    if (toRelease != null) {
-      assert dataReader.isTrackingDiskRanges();
-      for (ByteBuffer buffer : toRelease) {
-        dataReader.releaseBuffer(buffer);
-      }
-    }
+    // 5. Release the copies we made directly to the cleaner.
+    releaseBuffers(toReleaseCopies, false);
 
     // 6. Finally, put uncompressed data to cache.
     if (fileKey != null) {
-      long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
+      long[] collisionMask = cacheWrapper.putFileData(
+          fileKey, cacheKeys, targetBuffers, baseOffset);
      processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers());
    }

-    // 7. It may happen that we know we won't use some compression buffers anymore.
+    // 7. It may happen that we know we won't use some cache buffers anymore (the alternative
+    //    is that we will use the same buffers for other streams in separate calls).
     //    Release initial refcounts.
     for (ProcCacheChunk chunk : toDecompress) {
       ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, chunk);
@@ -713,9 +759,11 @@ class EncodedReaderImpl implements EncodedReader {
     return lastUncompressed;
   }
 
+  /** Subset of readEncodedStream specific to compressed streams, separate to avoid long methods. */
  private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
-      long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData,
-      List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress,
+      long streamOffset, long unlockUntilCOffset, DiskRangeList current,
+      ColumnStreamData columnStreamData, IdentityHashMap<ByteBuffer, Boolean> toRelease,
+      List<ByteBuffer> toReleaseCopies, List<ProcCacheChunk> toDecompress,
       List<IncompleteCb> badEstimates) throws IOException {
     if (cOffset > current.getOffset()) {
      // Target compression block is in the middle of the range; slice the range in two.
@@ -762,8 +810,8 @@ class EncodedReaderImpl implements EncodedReader {
           throw new RuntimeException(msg);
         }
         BufferChunk bc = (BufferChunk)current;
-        ProcCacheChunk newCached = addOneCompressionBuffer(
-            bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease, badEstimates);
+        ProcCacheChunk newCached = addOneCompressionBuffer(bc, columnStreamData.getCacheBuffers(),
+            toDecompress, toRelease, toReleaseCopies, badEstimates);
         lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
         next = (newCached != null) ? newCached.next : null;
         currentOffset = (next != null) ? next.getOffset() : -1;
@@ -777,9 +825,12 @@ class EncodedReaderImpl implements EncodedReader {
     return lastUncompressed;
   }
 
+  /** Subset of readEncodedStream specific to uncompressed streams, separate to avoid long methods. */
  private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset,
-      long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData)
-          throws IOException {
+      long streamOffset, long unlockUntilCOffset, DiskRangeList current,
+      ColumnStreamData columnStreamData) throws IOException {
+    // Note: we are called after preReadUncompressedStream, so it doesn't have to do nearly as much
+    //       as prepareRangesForCompressedRead does; e.g. every buffer is already a CacheChunk.
     long currentOffset = cOffset;
     CacheChunk lastUncompressed = null;
     boolean isFirst = true;
@@ -819,11 +870,10 @@ class EncodedReaderImpl implements EncodedReader {
   * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our
   * allocator. Uncompressed case is not mainline though so let's not complicate it.
   */
-  private DiskRangeList preReadUncompressedStream(long baseOffset,
-      DiskRangeList start, long streamOffset, long streamEnd) throws IOException {
+  private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start,
+      long streamOffset, long streamEnd) throws IOException {
    if (streamOffset == streamEnd) return null;
    List<UncompressedCacheChunk> toCache = null;
-    List<ByteBuffer> toRelease = null;

    // 1. Find our bearings in the stream.
    DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd);
@@ -860,9 +910,6 @@ class EncodedReaderImpl implements EncodedReader {
       if (current.getOffset() >= partEnd) {
        continue; // We have no data at all for this part of the stream (could be unneeded), skip.
       }
-      if (toRelease == null && dataReader.isTrackingDiskRanges()) {
-        toRelease = new ArrayList<ByteBuffer>();
-      }
       // We have some disk buffers... see if we have entire part, etc.
      UncompressedCacheChunk candidateCached = null; // We will cache if we have the entire part.
       DiskRangeList next = current;
@@ -877,21 +924,15 @@ class EncodedReaderImpl implements EncodedReader {
         current = next;
         if (noMoreDataForPart) break; // Done with this part.
 
-        boolean wasSplit = false;
         if (current.getEnd() > partEnd) {
           // If the current buffer contains multiple parts, split it.
           current = current.split(partEnd);
-          wasSplit = true;
         }
         if (isTracingEnabled) {
           LOG.trace("Processing uncompressed file data at ["
               + current.getOffset() + ", " + current.getEnd() + ")");
         }
         BufferChunk curBc = (BufferChunk)current;
-        if (!wasSplit && toRelease != null) {
-          toRelease.add(curBc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part?
-        }
-
         // Track if we still have the entire part.
         long hadEntirePartTo = hasEntirePartTo;
        // We have data until the end of current block if we had it until the beginning.
@@ -952,15 +993,7 @@ class EncodedReaderImpl implements EncodedReader {
       ++ix;
     }
 
-    // 5. Release original compressed buffers to zero-copy reader if needed.
-    if (toRelease != null) {
-      assert dataReader.isTrackingDiskRanges();
-      for (ByteBuffer buf : toRelease) {
-        dataReader.releaseBuffer(buf);
-      }
-    }
-
-    // 6. Finally, put uncompressed data to cache.
+    // 5. Put uncompressed data to cache.
     if (fileKey != null) {
      long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
       processCacheCollisions(collisionMask, toCache, targetBuffers, null);
@@ -969,7 +1002,6 @@ class EncodedReaderImpl implements EncodedReader {
     return lastUncompressed;
   }
 
-
   private int determineUncompressedPartSize() {
    // We will break the uncompressed data in the cache in the chunks that are the size
    // of the prevalent ORC compression buffer (the default), or maximum allocation (since we
@@ -1178,7 +1210,8 @@ class EncodedReaderImpl implements EncodedReader {
    */
   private ProcCacheChunk addOneCompressionBuffer(BufferChunk current,
       List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress,
-      List<ByteBuffer> toRelease, List<IncompleteCb> badEstimates) throws IOException {
+      IdentityHashMap<ByteBuffer, Boolean> toRelease, List<ByteBuffer> toReleaseCopies,
+      List<IncompleteCb> badEstimates) throws IOException {
     ByteBuffer slice = null;
     ByteBuffer compressed = current.getChunk();
     long cbStartOffset = current.getOffset();
@@ -1201,12 +1234,8 @@ class EncodedReaderImpl implements EncodedReader {
       // Simple case - CB fits entirely in the disk range.
       slice = compressed.slice();
       slice.limit(chunkLength);
-      ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed,
+      return addOneCompressionBlockByteBuffer(slice, isUncompressed,
          cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
-      if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
-        toRelease.add(compressed);
-      }
-      return cc;
    }
    if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
      badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0));
@@ -1216,6 +1245,7 @@ class EncodedReaderImpl implements EncodedReader {
    // TODO: we could remove extra copy for isUncompressed case by copying directly to cache.
     // We need to consolidate 2 or more buffers into one to decompress.
     ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
+    toReleaseCopies.add(copy); // We will always release copies at the end.
     int remaining = chunkLength - compressed.remaining();
     int originalPos = compressed.position();
     copy.put(compressed);
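
The copy registered in toReleaseCopies above exists because a compression block can span several disk-read buffers and has to be consolidated into one contiguous buffer before decompression; that temporary copy is then released explicitly instead of being left to GC. A simplified, hypothetical sketch of the consolidate-then-track idea (not the actual addOneCompressionBuffer logic; it assumes totalLength equals the combined remaining bytes of the parts):

import java.nio.ByteBuffer;
import java.util.List;

final class ConsolidateSketch {
  // Copies the given pieces into one buffer and remembers that buffer so the
  // caller can release it once decompression is done (cf. toReleaseCopies).
  static ByteBuffer consolidate(List<ByteBuffer> parts, int totalLength,
      boolean isDirect, List<ByteBuffer> toReleaseCopies) {
    ByteBuffer copy = isDirect
        ? ByteBuffer.allocateDirect(totalLength) : ByteBuffer.allocate(totalLength);
    toReleaseCopies.add(copy); // always released at the end of the read
    for (ByteBuffer part : parts) {
      copy.put(part.duplicate()); // duplicate() leaves the source position intact
    }
    copy.flip();
    return copy;
  }
}
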
@@ -1224,12 +1254,8 @@ class EncodedReaderImpl implements EncodedReader {
     }
     DiskRangeList next = current.next;
     current.removeSelf();
-    if (dataReader.isTrackingDiskRanges()) {
-      if (originalPos == 0) {
-        dataReader.releaseBuffer(compressed); // We copied the entire buffer.
-      } else {
-        toRelease.add(compressed); // There might be slices depending on this buffer.
-      }
+    if (originalPos == 0 && toRelease.remove(compressed)) {
+      releaseBuffer(compressed, true);
     }
 
     int extraChunkCount = 0;
@@ -1246,15 +1272,15 @@ class EncodedReaderImpl implements EncodedReader {
         copy.put(slice);
        ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed,
            cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers);
-        if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
-          dataReader.releaseBuffer(compressed); // We copied the entire buffer.
-        }
+        if (compressed.remaining() <= 0 && toRelease.remove(compressed)) {
+          releaseBuffer(compressed, true); // We copied the entire buffer.
+        } // else there's more data to process; will be handled in next call.
        return cc;
      }
      remaining -= compressed.remaining();
-      copy.put(compressed);
-      if (dataReader.isTrackingDiskRanges()) {
-        dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+      copy.put(compressed); // TODO: move into the if below; account for release call
+      if (toRelease.remove(compressed)) {
+        releaseBuffer(compressed, true); // We copied the entire buffer.
       }
       DiskRangeList tmp = next;
       next = next.hasContiguousNext() ? next.next : null;
@@ -1270,6 +1296,38 @@ class EncodedReaderImpl implements EncodedReader {
     }
   }
 
+  private void releaseBuffers(Collection<ByteBuffer> toRelease, boolean isFromDataReader) {
+    if (toRelease == null) return;
+    for (ByteBuffer buf : toRelease) {
+      releaseBuffer(buf, isFromDataReader);
+    }
+  }
+
+  private void releaseBuffer(ByteBuffer bb, boolean isFromDataReader) {
+    if (isTracingEnabled) {
+      LOG.trace("Releasing the buffer " + System.identityHashCode(bb));
+    }
+    if (isFromDataReader && dataReader.isTrackingDiskRanges()) {
+      dataReader.releaseBuffer(bb);
+      return;
+    }
+    Field localCf = cleanerField;
+    if (!bb.isDirect() || localCf == null) return;
+    try {
+      Cleaner cleaner = (Cleaner) localCf.get(bb);
+      if (cleaner != null) {
+        cleaner.clean();
+      } else {
+        LOG.debug("Unable to clean a buffer using cleaner - no cleaner");
+      }
+    } catch (Exception e) {
+      // leave it for GC to clean up
+      LOG.warn("Unable to clean direct buffers using Cleaner.");
+      cleanerField = null;
+    }
+  }
+
+
   private IncompleteCb addIncompleteCompressionBuffer(
       long cbStartOffset, DiskRangeList target, int extraChunkCount) {
     IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
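
Finally, the TODO repeated in both static initializers points at CleanerUtil from HADOOP-12760 because on JDK 9+ the private "cleaner" field holds a jdk.internal.ref.Cleaner and can no longer be cast to sun.misc.Cleaner. A rough sketch of the direction that follow-up takes (not part of this commit; assumes JDK 9+, where sun.misc.Unsafe.invokeCleaner(ByteBuffer) is available):

import java.lang.reflect.Field;
import java.nio.ByteBuffer;

// Sketch only, not part of this commit: on JDK 9+ a direct buffer's native
// memory can be freed through Unsafe.invokeCleaner instead of the reflective
// field access used above.
final class Jdk9CleanerSketch {
  private static final sun.misc.Unsafe UNSAFE;
  static {
    try {
      Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
      f.setAccessible(true);
      UNSAFE = (sun.misc.Unsafe) f.get(null);
    } catch (ReflectiveOperationException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  static void free(ByteBuffer buf) {
    if (buf != null && buf.isDirect()) {
      // Throws IllegalArgumentException for sliced/duplicated buffers, so only
      // the originally allocated buffer should be passed here.
      UNSAFE.invokeCleaner(buf);
    }
  }
}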
