HBASE-17819 Reduce the heap overhead for BucketCache.

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3f7222df
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3f7222df
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3f7222df

Branch: refs/heads/HBASE-19064
Commit: 3f7222df3699ef11d0782628c8358ad5d0ce108b
Parents: 9ea1a7d
Author: anoopsamjohn <anoopsamj...@gmail.com>
Authored: Sun Mar 25 16:36:30 2018 +0530
Committer: anoopsamjohn <anoopsamj...@gmail.com>
Committed: Sun Mar 25 16:36:30 2018 +0530

----------------------------------------------------------------------
 .../apache/hadoop/hbase/util/UnsafeAccess.java  |  2 +-
 .../hbase/io/hfile/bucket/BucketCache.java      | 83 +++++++++++++++++---
 .../io/hfile/bucket/ByteBufferIOEngine.java     |  5 ++
 .../hadoop/hbase/io/hfile/bucket/IOEngine.java  |  9 +++
 .../bucket/UnsafeSharedMemoryBucketEntry.java   | 81 +++++++++++++++++++
 5 files changed, 167 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3f7222df/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
index feaa9e6..486f81b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
@@ -37,7 +37,7 @@ public final class UnsafeAccess {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(UnsafeAccess.class);
 
-  static final Unsafe theUnsafe;
+  public static final Unsafe theUnsafe;
 
   /** The offset to the first element in a byte array. */
   public static final long BYTE_ARRAY_BASE_OFFSET;

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f7222df/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index e9129d2..673057c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
 import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
+import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -517,7 +518,7 @@ public class BucketCache implements BlockCache, HeapSize {
             cacheStats.ioHit(timeTaken);
           }
           if (cachedBlock.getMemoryType() == MemoryType.SHARED) {
-            bucketEntry.refCount.incrementAndGet();
+            bucketEntry.incrementRefCountAndGet();
           }
           bucketEntry.access(accessCount.incrementAndGet());
           if (this.ioErrorStartTime > 0) {
@@ -610,8 +611,8 @@ public class BucketCache implements BlockCache, HeapSize {
     ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
     try {
       lock.writeLock().lock();
-      int refCount = bucketEntry.refCount.get();
-      if(refCount == 0) {
+      int refCount = bucketEntry.getRefCount();
+      if (refCount == 0) {
         if (backingMap.remove(cacheKey, bucketEntry)) {
           blockEvicted(cacheKey, bucketEntry, removedBlock == null);
         } else {
@@ -630,7 +631,7 @@ public class BucketCache implements BlockCache, HeapSize {
                 + " readers. Can not be freed now. Hence will mark this"
                 + " for evicting at a later point");
           }
-          bucketEntry.markedForEvict = true;
+          bucketEntry.markForEvict();
         }
       }
     } finally {
@@ -728,7 +729,7 @@ public class BucketCache implements BlockCache, HeapSize {
       // this set is small around O(Handler Count) unless something else is 
wrong
       Set<Integer> inUseBuckets = new HashSet<Integer>();
       for (BucketEntry entry : backingMap.values()) {
-        if (entry.refCount.get() != 0) {
+        if (entry.getRefCount() != 0) {
           inUseBuckets.add(bucketAllocator.getBucketIndex(entry.offset()));
         }
       }
@@ -1275,9 +1276,6 @@ public class BucketCache implements BlockCache, HeapSize {
     byte deserialiserIndex;
     private volatile long accessCounter;
     private BlockPriority priority;
-    // Set this when we were not able to forcefully evict the block
-    private volatile boolean markedForEvict;
-    private AtomicInteger refCount = new AtomicInteger(0);
 
     /**
      * Time this block was cached.  Presumes we are created just before we are 
added to the cache.
@@ -1342,6 +1340,63 @@ public class BucketCache implements BlockCache, HeapSize 
{
     public long getCachedTime() {
       return cachedTime;
     }
+
+    protected int getRefCount() {
+      return 0;
+    }
+
+    protected int incrementRefCountAndGet() {
+      return 0;
+    }
+
+    protected int decrementRefCountAndGet() {
+      return 0;
+    }
+
+    protected boolean isMarkedForEvict() {
+      return false;
+    }
+
+    protected void markForEvict() {
+      // noop;
+    }
+  }
+
+  static class SharedMemoryBucketEntry extends BucketEntry {
+    private static final long serialVersionUID = -2187147283772338481L;
+
+    // Set this when we were not able to forcefully evict the block
+    private volatile boolean markedForEvict;
+    private AtomicInteger refCount = new AtomicInteger(0);
+
+    SharedMemoryBucketEntry(long offset, int length, long accessCounter, 
boolean inMemory) {
+      super(offset, length, accessCounter, inMemory);
+    }
+
+    @Override
+    protected int getRefCount() {
+      return this.refCount.get();
+    }
+
+    @Override
+    protected int incrementRefCountAndGet() {
+      return this.refCount.incrementAndGet();
+    }
+
+    @Override
+    protected int decrementRefCountAndGet() {
+      return this.refCount.decrementAndGet();
+    }
+
+    @Override
+    protected boolean isMarkedForEvict() {
+      return this.markedForEvict;
+    }
+
+    @Override
+    protected void markForEvict() {
+      this.markedForEvict = true;
+    }
   }
 
   /**
@@ -1431,7 +1486,11 @@ public class BucketCache implements BlockCache, HeapSize 
{
       // This cacheable thing can't be serialized
       if (len == 0) return null;
       long offset = bucketAllocator.allocateBlock(len);
-      BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, 
inMemory);
+      BucketEntry bucketEntry = ioEngine.usesSharedMemory()
+          ? UnsafeAvailChecker.isAvailable()
+              ? new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, 
inMemory)
+              : new SharedMemoryBucketEntry(offset, len, accessCounter, 
inMemory)
+          : new BucketEntry(offset, len, accessCounter, inMemory);
       bucketEntry.setDeserialiserReference(data.getDeserializer(), 
deserialiserMap);
       try {
         if (data instanceof HFileBlock) {
@@ -1573,8 +1632,8 @@ public class BucketCache implements BlockCache, HeapSize {
     if (block.getMemoryType() == MemoryType.SHARED) {
       BucketEntry bucketEntry = backingMap.get(cacheKey);
       if (bucketEntry != null) {
-        int refCount = bucketEntry.refCount.decrementAndGet();
-        if (bucketEntry.markedForEvict && refCount == 0) {
+        int refCount = bucketEntry.decrementRefCountAndGet();
+        if (refCount == 0 && bucketEntry.isMarkedForEvict()) {
           forceEvict(cacheKey);
         }
       }
@@ -1585,7 +1644,7 @@ public class BucketCache implements BlockCache, HeapSize {
   public int getRefCount(BlockCacheKey cacheKey) {
     BucketEntry bucketEntry = backingMap.get(cacheKey);
     if (bucketEntry != null) {
-      return bucketEntry.refCount.get();
+      return bucketEntry.getRefCount();
     }
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f7222df/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
index 9f4ffba..3b832fe 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
@@ -103,6 +103,11 @@ public class ByteBufferIOEngine implements IOEngine {
   }
 
   @Override
+  public boolean usesSharedMemory() {
+    return true;
+  }
+
+  @Override
   public Cacheable read(long offset, int length, 
CacheableDeserializer<Cacheable> deserializer)
       throws IOException {
     ByteBuff dstBuffer = bufferArray.asSubByteBuff(offset, length);

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f7222df/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
index 61bde6b..87f71a5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
@@ -38,6 +38,15 @@ public interface IOEngine {
   boolean isPersistent();
 
   /**
+   * An IOEngine uses shared memory when a Cacheable read from it refers to the 
same
+   * memory area that the engine itself uses for caching it.
+   * @return true when the IOEngine uses shared memory.
+   */
+  default boolean usesSharedMemory() {
+    return false;
+  }
+
+  /**
    * Transfers data from IOEngine to a Cacheable object.
    * @param length How many bytes to be read from the offset
    * @param offset The offset in the IO engine where the first byte to be read

http://git-wip-us.apache.org/repos/asf/hbase/blob/3f7222df/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java
new file mode 100644
index 0000000..5d93e97
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UnsafeSharedMemoryBucketEntry.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
+import org.apache.hadoop.hbase.util.UnsafeAccess;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import sun.misc.Unsafe;
+
+@InterfaceAudience.Private
+public class UnsafeSharedMemoryBucketEntry extends BucketEntry {
+  private static final long serialVersionUID = 707544024564058801L;
+
+  // We are just doing what AtomicInteger does for the atomic 
incrementAndGet/decrementAndGet.
+  // We are avoiding the need to have a field of AtomicInteger type and have it 
as just int type.
+  // We would like to reduce the heap overhead per object of this type as much 
as possible.
+  // Doing this direct Unsafe usage saves us 16 bytes per object,
+  // i.e. just using 4 bytes for the int type rather than the 20 bytes required for an 
AtomicInteger (16 bytes)
+  // and the 4-byte reference to it.
+  private static final Unsafe unsafe = UnsafeAccess.theUnsafe;
+  private static final long refCountOffset;
+
+  static {
+    try {
+      refCountOffset = unsafe
+          
.objectFieldOffset(UnsafeSharedMemoryBucketEntry.class.getDeclaredField("refCount"));
+    } catch (Exception ex) {
+      throw new Error(ex);
+    }
+  }
+
+  // Set this when we were not able to forcefully evict the block
+  private volatile boolean markedForEvict;
+  private volatile int refCount = 0;
+
+  public UnsafeSharedMemoryBucketEntry(long offset, int length, long 
accessCounter,
+      boolean inMemory) {
+    super(offset, length, accessCounter, inMemory);
+  }
+
+  @Override
+  protected int getRefCount() {
+    return this.refCount;
+  }
+
+  @Override
+  protected int incrementRefCountAndGet() {
+    return unsafe.getAndAddInt(this, refCountOffset, 1) + 1;
+  }
+
+  @Override
+  protected int decrementRefCountAndGet() {
+    return unsafe.getAndAddInt(this, refCountOffset, -1) - 1;
+  }
+
+  @Override
+  protected boolean isMarkedForEvict() {
+    return this.markedForEvict;
+  }
+
+  @Override
+  protected void markForEvict() {
+    this.markedForEvict = true;
+  }
+}
\ No newline at end of file

Reply via email to