HBASE-14463 Severe performance downgrade when parallel reading a single key from BucketCache (Yu Li)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/263a0adf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/263a0adf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/263a0adf

Branch: refs/heads/hbase-12439
Commit: 263a0adf79105b9dc166e21c3f5159ade6e2d0a7
Parents: 6ec4a96
Author: tedyu <yuzhih...@gmail.com>
Authored: Fri Nov 6 13:12:10 2015 -0800
Committer: tedyu <yuzhih...@gmail.com>
Committed: Fri Nov 6 13:12:10 2015 -0800

----------------------------------------------------------------------
 .../hbase/io/hfile/bucket/BucketCache.java      |  52 +++-----
 .../hadoop/hbase/util/IdReadWriteLock.java      |  91 +++++++++++++
 .../hadoop/hbase/io/hfile/CacheTestUtils.java   |  19 ++-
 .../hbase/io/hfile/bucket/TestBucketCache.java  |   9 +-
 .../hadoop/hbase/util/TestIdReadWriteLock.java  | 127 +++++++++++++++++++
 5 files changed, 258 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
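
Context for the patch below: previously every block offset in BucketCache was
guarded by an exclusive IdLock, so N clients reading the same hot block all
serialized on one lock. This change switches to a per-offset
ReentrantReadWriteLock: readers share the lock, and only eviction takes it
exclusively. A minimal standalone sketch of that pattern (illustrative only,
not HBase code):

  import java.util.concurrent.locks.ReentrantReadWriteLock;

  // Illustrative sketch, not HBase code: one lock guarding one "hot block".
  // With a plain exclusive lock the readers below would run one at a time;
  // with the read lock they overlap.
  public class HotBlockLockSketch {
    private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();

    public static void main(String[] args) throws InterruptedException {
      Runnable reader = new Runnable() {
        @Override
        public void run() {
          for (int i = 0; i < 1000; i++) {
            LOCK.readLock().lock(); // shared: does not block other readers
            try {
              // simulate deserializing the cached block
            } finally {
              LOCK.readLock().unlock();
            }
          }
        }
      };
      Thread[] readers = new Thread[8];
      for (int i = 0; i < readers.length; i++) {
        readers[i] = new Thread(reader, "reader-" + i);
        readers[i].start();
      }
      for (Thread t : readers) {
        t.join();
      }
    }
  }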


http://git-wip-us.apache.org/repos/asf/hbase/blob/263a0adf/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 5eb6f8f..718e92a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -68,7 +69,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ConcurrentIndex;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.IdReadWriteLock;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -180,14 +181,11 @@ public class BucketCache implements BlockCache, HeapSize {
   private volatile long ioErrorStartTime = -1;
 
   /**
-   * A "sparse lock" implementation allowing to lock on a particular block
-   * identified by offset. The purpose of this is to avoid freeing the block
-   * which is being read.
-   *
-   * TODO:We could extend the IdLock to IdReadWriteLock for better.
+   * A ReentrantReadWriteLock to lock on a particular block identified by offset.
+   * The purpose of this is to avoid freeing the block which is being read.
    */
   @VisibleForTesting
-  final IdLock offsetLock = new IdLock();
+  final IdReadWriteLock offsetLock = new IdReadWriteLock();
 
   private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
      new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
@@ -412,9 +410,9 @@ public class BucketCache implements BlockCache, HeapSize {
     BucketEntry bucketEntry = backingMap.get(key);
     if (bucketEntry != null) {
       long start = System.nanoTime();
-      IdLock.Entry lockEntry = null;
+      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
       try {
-        lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
+        lock.readLock().lock();
         // We can not read here even if backingMap does contain the given key because its offset
         // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check
         // existence here.
@@ -442,9 +440,7 @@ public class BucketCache implements BlockCache, HeapSize {
         LOG.error("Failed reading block " + key + " from bucket cache", ioex);
         checkIOErrorIsTolerated();
       } finally {
-        if (lockEntry != null) {
-          offsetLock.releaseLockEntry(lockEntry);
-        }
+        lock.readLock().unlock();
       }
     }
     if (!repeat && updateCacheMetrics) {
@@ -484,21 +480,16 @@ public class BucketCache implements BlockCache, HeapSize {
         return false;
       }
     }
-    IdLock.Entry lockEntry = null;
+    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
     try {
-      lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
+      lock.writeLock().lock();
       if (backingMap.remove(cacheKey, bucketEntry)) {
         blockEvicted(cacheKey, bucketEntry, removedBlock == null);
       } else {
         return false;
       }
-    } catch (IOException ie) {
-      LOG.warn("Failed evicting block " + cacheKey);
-      return false;
     } finally {
-      if (lockEntry != null) {
-        offsetLock.releaseLockEntry(lockEntry);
-      }
+      lock.writeLock().unlock();
     }
     cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
     return true;
@@ -527,9 +518,9 @@ public class BucketCache implements BlockCache, HeapSize {
         return false;
       }
     }
-    IdLock.Entry lockEntry = null;
+    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
     try {
-      lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
+      lock.writeLock().lock();
       int refCount = bucketEntry.refCount.get();
       if(refCount == 0) {
         if (backingMap.remove(cacheKey, bucketEntry)) {
@@ -553,13 +544,8 @@ public class BucketCache implements BlockCache, HeapSize {
           bucketEntry.markedForEvict = true;
         }
       }
-    } catch (IOException ie) {
-      LOG.warn("Failed evicting block " + cacheKey);
-      return false;
     } finally {
-      if (lockEntry != null) {
-        offsetLock.releaseLockEntry(lockEntry);
-      }
+      lock.writeLock().unlock();
     }
     cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
     return true;
@@ -909,18 +895,14 @@ public class BucketCache implements BlockCache, HeapSize {
           heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
         } else if (bucketEntries[i] != null){
           // Block should have already been evicted. Remove it and free space.
-          IdLock.Entry lockEntry = null;
+          ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
           try {
-            lockEntry = offsetLock.getLockEntry(bucketEntries[i].offset());
+            lock.writeLock().lock();
             if (backingMap.remove(key, bucketEntries[i])) {
               blockEvicted(key, bucketEntries[i], false);
             }
-          } catch (IOException e) {
-            LOG.warn("failed to free space for " + key, e);
           } finally {
-            if (lockEntry != null) {
-              offsetLock.releaseLockEntry(lockEntry);
-            }
+            lock.writeLock().unlock();
           }
         }
       }
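
Note on the hunks above: the read path (getBlock) now takes the shared read
lock, while the eviction paths take the write lock, and the old
catch (IOException) blocks disappear because IdReadWriteLock.getLock never
throws (IdLock.getLockEntry could be interrupted while waiting and surface
that as an IOException). The resulting eviction pattern, simplified from the
diff (here acquiring the lock just before the try, the usual idiom, so a
failed acquire can never reach the finally):

  ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
  lock.writeLock().lock(); // exclusive: waits for in-flight readers to release
  try {
    if (backingMap.remove(cacheKey, bucketEntry)) {
      blockEvicted(cacheKey, bucketEntry, removedBlock == null);
    }
  } finally {
    lock.writeLock().unlock();
  }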

http://git-wip-us.apache.org/repos/asf/hbase/blob/263a0adf/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
new file mode 100644
index 0000000..7dc6fbf
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Allows multiple concurrent clients to lock on a numeric id with ReentrantReadWriteLock. The
+ * intended usage for read lock is as follows:
+ *
+ * <pre>
+ * ReentrantReadWriteLock lock = idReadWriteLock.getLock(id);
+ * try {
+ *   lock.readLock().lock();
+ *   // User code.
+ * } finally {
+ *   lock.readLock().unlock();
+ * }
+ * </pre>
+ *
+ * For write lock, use lock.writeLock()
+ */
+@InterfaceAudience.Private
+public class IdReadWriteLock {
+  // The number of locks we want to easily support. It's not a maximum.
+  private static final int NB_CONCURRENT_LOCKS = 1000;
+  // The pool to get entry from, entries are mapped by weak reference to make it able to be
+  // garbage-collected asap
+  private final WeakObjectPool<Long, ReentrantReadWriteLock> lockPool =
+      new WeakObjectPool<Long, ReentrantReadWriteLock>(
+          new WeakObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
+            @Override
+            public ReentrantReadWriteLock createObject(Long id) {
+              return new ReentrantReadWriteLock();
+            }
+          }, NB_CONCURRENT_LOCKS);
+
+  /**
+   * Get the ReentrantReadWriteLock corresponding to the given id
+   * @param id an arbitrary number to identify the lock
+   */
+  public ReentrantReadWriteLock getLock(long id) {
+    lockPool.purge();
+    ReentrantReadWriteLock readWriteLock = lockPool.get(id);
+    return readWriteLock;
+  }
+
+  /** For testing */
+  @VisibleForTesting
+  int purgeAndGetEntryPoolSize() {
+    System.gc();
+    Threads.sleep(200);
+    lockPool.purge();
+    return lockPool.size();
+  }
+
+  @VisibleForTesting
+  public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
+    for (ReentrantReadWriteLock readWriteLock;;) {
+      readWriteLock = lockPool.get(id);
+      if (readWriteLock != null) {
+        synchronized (readWriteLock) {
+          if (readWriteLock.getQueueLength() >= numWaiters) {
+            return;
+          }
+        }
+      }
+      Thread.sleep(50);
+    }
+  }
+}
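
IdReadWriteLock delegates the id-to-lock mapping to WeakObjectPool, an
existing HBase utility, so that all callers using the same id share one
ReentrantReadWriteLock while unused locks stay eligible for garbage
collection. A simplified, self-contained stand-in for that pooling idea
(hypothetical sketch, not the actual WeakObjectPool API):

  import java.lang.ref.WeakReference;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.locks.ReentrantReadWriteLock;

  // Hypothetical sketch of weak-reference lock pooling; WeakObjectPool
  // itself has a different API.
  public class WeakLockPoolSketch {
    private final ConcurrentHashMap<Long, WeakReference<ReentrantReadWriteLock>> pool =
        new ConcurrentHashMap<Long, WeakReference<ReentrantReadWriteLock>>();

    public ReentrantReadWriteLock getLock(long id) {
      for (;;) {
        WeakReference<ReentrantReadWriteLock> ref = pool.get(id);
        ReentrantReadWriteLock existing = (ref == null) ? null : ref.get();
        if (existing != null) {
          return existing; // share the lock every other caller of this id sees
        }
        ReentrantReadWriteLock candidate = new ReentrantReadWriteLock();
        WeakReference<ReentrantReadWriteLock> newRef =
            new WeakReference<ReentrantReadWriteLock>(candidate);
        if (ref == null) {
          if (pool.putIfAbsent(id, newRef) == null) {
            return candidate; // installed our lock; caller's strong ref keeps it alive
          }
        } else if (pool.replace(id, ref, newRef)) {
          return candidate; // old referent was collected; swapped in a fresh lock
        }
        // lost a race with another thread; retry
      }
    }
  }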

http://git-wip-us.apache.org/repos/asf/hbase/blob/263a0adf/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index 96179fd..69671e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -187,7 +187,11 @@ public class CacheTestUtils {
         public void doAnAction() throws Exception {
           ByteArrayCacheable returned = (ByteArrayCacheable) toBeTested
               .getBlock(key, false, false, true);
-          assertArrayEquals(buf, returned.buf);
+          if (returned != null) {
+            assertArrayEquals(buf, returned.buf);
+          } else {
+            Thread.sleep(10);
+          }
           totalQueries.incrementAndGet();
         }
       };
@@ -196,6 +200,19 @@ public class CacheTestUtils {
       ctx.addThread(t);
     }
 
+    // add a thread to periodically evict and re-cache the block
+    final long blockEvictPeriod = 50;
+    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
+      @Override
+      public void doAnAction() throws Exception {
+        toBeTested.evictBlock(key);
+        toBeTested.cacheBlock(key, bac);
+        Thread.sleep(blockEvictPeriod);
+      }
+    };
+    t.setDaemon(true);
+    ctx.addThread(t);
+
     ctx.startThreads();
     while (totalQueries.get() < numQueries && ctx.shouldRun()) {
       Thread.sleep(10);
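
The null handling added to doAnAction above is required because of the new
evict/re-cache thread: a reader can call getBlock in the window between
evictBlock and cacheBlock, so a null return is a legitimate transient miss
rather than a test failure. From a caller's point of view the pattern is
roughly (sketch only, not the actual HBase read path):

  Cacheable block = toBeTested.getBlock(key, false, false, true);
  if (block == null) {
    // concurrently evicted: fall back (re-read and re-cache, or retry)
  }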

http://git-wip-us.apache.org/repos/asf/hbase/blob/263a0adf/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 70d21dd..54dd8e5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
@@ -35,7 +36,6 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.IdLock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -174,7 +174,7 @@ public class TestBucketCache {
 
   @Test
   public void testCacheMultiThreadedSingleKey() throws Exception {
-    CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, NUM_THREADS, NUM_QUERIES);
+    CacheTestUtils.hammerSingleKey(cache, BLOCK_SIZE, 2 * NUM_THREADS, 2 * NUM_QUERIES);
   }
 
   @Test
@@ -199,7 +199,8 @@ public class TestBucketCache {
     cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
         new byte[10]));
     long lockId = cache.backingMap.get(cacheKey).offset();
-    IdLock.Entry lockEntry = cache.offsetLock.getLockEntry(lockId);
+    ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
+    lock.writeLock().lock();
     Thread evictThread = new Thread("evict-block") {
 
       @Override
@@ -213,7 +214,7 @@ public class TestBucketCache {
     cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
     cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
         new byte[10]));
-    cache.offsetLock.releaseLockEntry(lockEntry);
+    lock.writeLock().unlock();
     evictThread.join();
     assertEquals(1L, cache.getBlockCount());
     assertTrue(cache.getCurrentSize() > 0L);

http://git-wip-us.apache.org/repos/asf/hbase/blob/263a0adf/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
new file mode 100644
index 0000000..66f6d4b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MiscTests.class, MediumTests.class})
+// Medium as it creates 100 threads; seems better to run it isolated
+public class TestIdReadWriteLock {
+
+  private static final Log LOG = LogFactory.getLog(TestIdReadWriteLock.class);
+
+  private static final int NUM_IDS = 16;
+  private static final int NUM_THREADS = 128;
+  private static final int NUM_SECONDS = 15;
+
+  private IdReadWriteLock idLock = new IdReadWriteLock();
+
+  private Map<Long, String> idOwner = new ConcurrentHashMap<Long, String>();
+
+  private class IdLockTestThread implements Callable<Boolean> {
+
+    private String clientId;
+
+    public IdLockTestThread(String clientId) {
+      this.clientId = clientId;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      Thread.currentThread().setName(clientId);
+      Random rand = new Random();
+      long endTime = System.currentTimeMillis() + NUM_SECONDS * 1000;
+      while (System.currentTimeMillis() < endTime) {
+        long id = rand.nextInt(NUM_IDS);
+        boolean readLock = rand.nextBoolean();
+
+        ReentrantReadWriteLock readWriteLock = idLock.getLock(id);
+        Lock lock = readLock ? readWriteLock.readLock() : readWriteLock.writeLock();
+        try {
+          lock.lock();
+          int sleepMs = 1 + rand.nextInt(4);
+          String owner = idOwner.get(id);
+          if (owner != null && LOG.isDebugEnabled()) {
+            LOG.debug((readLock ? "Read" : "Write") + "lock of Id " + id + " already taken by "
+                + owner + ", we are " + clientId);
+          }
+
+          idOwner.put(id, clientId);
+          Thread.sleep(sleepMs);
+          idOwner.remove(id);
+
+        } finally {
+          lock.unlock();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Release " + (readLock ? "Read" : "Write") + " lock of 
Id" + id + ", we are "
+                + clientId);
+          }
+        }
+      }
+      return true;
+    }
+
+  }
+
+  @Test(timeout = 60000)
+  public void testMultipleClients() throws Exception {
+    ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS);
+    try {
+      ExecutorCompletionService<Boolean> ecs =
+          new ExecutorCompletionService<Boolean>(exec);
+      for (int i = 0; i < NUM_THREADS; ++i)
+        ecs.submit(new IdLockTestThread("client_" + i));
+      for (int i = 0; i < NUM_THREADS; ++i) {
+        Future<Boolean> result = ecs.take();
+        assertTrue(result.get());
+      }
+      // make sure the entry pool will be cleared after GC and purge call
+      int entryPoolSize = idLock.purgeAndGetEntryPoolSize();
+      LOG.debug("Size of entry pool after gc and purge: " + entryPoolSize);
+      assertEquals(0, entryPoolSize);
+    } finally {
+      exec.shutdown();
+      exec.awaitTermination(5000, TimeUnit.MILLISECONDS);
+    }
+  }
+
+
+}
+
