HBASE-14575 Relax region read lock for compactions (Nick and Ted)

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d8b30b89
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d8b30b89
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d8b30b89

Branch: refs/heads/hbase-12439
Commit: d8b30b892561126b02bd5d27dbe11a01f97908d1
Parents: 92e178d
Author: tedyu <yuzhih...@gmail.com>
Authored: Tue Dec 1 16:44:59 2015 -0800
Committer: tedyu <yuzhih...@gmail.com>
Committed: Tue Dec 1 16:44:59 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 84 ++++++++++++++++++--
 .../regionserver/TestHRegionServerBulkLoad.java | 73 ++++++++++++++---
 2 files changed, 136 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d8b30b89/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 7bf4855..557edd9 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1793,8 +1793,80 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     }
     MonitoredTask status = null;
     boolean requestNeedsCancellation = true;
-    // block waiting for the lock for compaction
-    lock.readLock().lock();
+    /*
+     * We are trying to remove / relax the region read lock for compaction.
+     * Let's see what are the potential race conditions among the operations 
(user scan,
+     * region split, region close and region bulk load).
+     * 
+     *  user scan ---> region read lock
+     *  region split --> region close first --> region write lock
+     *  region close --> region write lock
+     *  region bulk load --> region write lock
+     *  
+     * read lock is compatible with read lock. ---> no problem with user 
scan/read
+     * region bulk load does not cause problem for compaction (no consistency 
problem, store lock
+     *  will help the store file accounting).
+     * They can run almost concurrently at the region level.
+     * 
+     * The only remaining race condition is between the region close and 
compaction.
+     * So we will evaluate, below, how region close interferes with compaction 
if compaction does
+     * not acquire region read lock.
+     * 
+     * Here are the steps for compaction:
+     * 1. obtain list of StoreFile's
+     * 2. create StoreFileScanner's based on list from #1
+     * 3. perform compaction and save resulting files under tmp dir
+     * 4. swap in compacted files
+     * 
+     * #1 is guarded by store lock. This patch does not change this --> no 
worse or better
+     * For #2, we obtain smallest read point (for region) across all the 
Scanners (for both default
+     * compactor and stripe compactor).
+     * The read points are for user scans. Region keeps the read points for 
all currently open
+     * user scanners.
+     * Compaction needs to know the smallest read point so that during 
re-write of the hfiles,
+     * it can remove the mvcc points for the cells if their mvccs are older 
than the smallest
+     * since they are not needed anymore.
+     * This will not conflict with compaction.
+     * For #3, it can be performed in parallel to other operations.
+     * For #4 bulk load and compaction don't conflict with each other on the 
region level
+     *   (for multi-family atomicity). 
+     * Region close and compaction are guarded pretty well by the 'writestate'.
+     * In HRegion#doClose(), we have :
+     * synchronized (writestate) {
+     *   // Disable compacting and flushing by background threads for this
+     *   // region.
+     *   canFlush = !writestate.readOnly;
+     *   writestate.writesEnabled = false;
+     *   LOG.debug("Closing " + this + ": disabling compactions & flushes");
+     *   waitForFlushesAndCompactions();
+     * }
+     * waitForFlushesAndCompactions() would wait for writestate.compacting to 
come down to 0.
+     * and in HRegion.compact()
+     *  try {
+     *    synchronized (writestate) {
+     *    if (writestate.writesEnabled) {
+     *      wasStateSet = true;
+     *      ++writestate.compacting;
+     *    } else {
+     *      String msg = "NOT compacting region " + this + ". Writes 
disabled.";
+     *      LOG.info(msg);
+     *      status.abort(msg);
+     *      return false;
+     *    }
+     *  }
+     * Also in compactor.performCompaction():
+     * check periodically to see if a system stop is requested
+     * if (closeCheckInterval > 0) {
+     *   bytesWritten += len;
+     *   if (bytesWritten > closeCheckInterval) {
+     *     bytesWritten = 0;
+     *     if (!store.areWritesEnabled()) {
+     *       progress.cancel();
+     *       return false;
+     *     }
+     *   }
+     * }
+     */
     try {
       byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
       if (stores.get(cf) != store) {
@@ -1852,12 +1924,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       status.markComplete("Compaction complete");
       return true;
     } finally {
-      try {
-        if (requestNeedsCancellation) 
store.cancelRequestedCompaction(compaction);
-        if (status != null) status.cleanup();
-      } finally {
-        lock.readLock().unlock();
-      }
+      if (requestNeedsCancellation) 
store.cancelRequestedCompaction(compaction);
+      if (status != null) status.cleanup();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8b30b89/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index 22e91f0..87cbab7 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -17,7 +17,17 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import com.google.common.collect.Lists;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +52,9 @@ import org.apache.hadoop.hbase.client.RpcRetryingCaller;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -60,24 +73,20 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Lists;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
 /**
  * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
  * the region server's bullkLoad functionality.
  */
+@RunWith(Parameterized.class)
 @Category({RegionServerTests.class, LargeTests.class})
 public class TestHRegionServerBulkLoad {
   private static final Log LOG = 
LogFactory.getLog(TestHRegionServerBulkLoad.class);
@@ -85,6 +94,7 @@ public class TestHRegionServerBulkLoad {
   private final static Configuration conf = UTIL.getConfiguration();
   private final static byte[] QUAL = Bytes.toBytes("qual");
   private final static int NUM_CFS = 10;
+  private int sleepDuration;
   public static int BLOCKSIZE = 64 * 1024;
   public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
 
@@ -94,6 +104,24 @@ public class TestHRegionServerBulkLoad {
       families[i] = Bytes.toBytes(family(i));
     }
   }
+  @Parameters
+  public static final Collection<Object[]> parameters() {
+    int[] sleepDurations = new int[] { 0, 30000 };
+    List<Object[]> configurations = new ArrayList<Object[]>();
+    for (int i : sleepDurations) {
+      configurations.add(new Object[] { i });
+    }
+    return configurations;
+  }
+
+  public TestHRegionServerBulkLoad(int duration) {
+    this.sleepDuration = duration;
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf.setInt("hbase.rpc.timeout", 10 * 1000);
+  }
 
   /**
    * Create a rowkey compatible with
@@ -189,8 +217,8 @@ public class TestHRegionServerBulkLoad {
       caller.callWithRetries(callable, Integer.MAX_VALUE);
 
       // Periodically do compaction to reduce the number of open file handles.
-      if (numBulkLoads.get() % 10 == 0) {
-        // 10 * 50 = 500 open file handles!
+      if (numBulkLoads.get() % 5 == 0) {
+        // 5 * 50 = 250 open file handles!
         callable = new RegionServerCallable<Void>(conn, tableName, 
Bytes.toBytes("aaa")) {
           @Override
           public Void call(int callTimeout) throws Exception {
@@ -211,6 +239,23 @@ public class TestHRegionServerBulkLoad {
     }
   }
 
+  public static class MyObserver extends BaseRegionObserver {
+    static int sleepDuration;
+    @Override
+    public InternalScanner 
preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
+        final Store store, final InternalScanner scanner, final ScanType 
scanType)
+            throws IOException {
+      try {
+        Thread.sleep(sleepDuration);
+      } catch (InterruptedException ie) {
+        IOException ioe = new InterruptedIOException();
+        ioe.initCause(ie);
+        throw ioe;
+      }
+      return scanner;
+    }
+  }
+
   /**
    * Thread that does full scans of the table looking for any partially
    * completed rows.
@@ -278,6 +323,8 @@ public class TestHRegionServerBulkLoad {
     try {
       LOG.info("Creating table " + table);
       HTableDescriptor htd = new HTableDescriptor(table);
+      htd.addCoprocessor(MyObserver.class.getName());
+      MyObserver.sleepDuration = this.sleepDuration;
       for (int i = 0; i < 10; i++) {
         htd.addFamily(new HColumnDescriptor(family(i)));
       }
@@ -348,7 +395,7 @@ public class TestHRegionServerBulkLoad {
   public static void main(String args[]) throws Exception {
     try {
       Configuration c = HBaseConfiguration.create();
-      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
+      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
       test.setConf(c);
       test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 
* 1000, 50);
     } finally {

Reply via email to