Repository: hbase
Updated Branches:
  refs/heads/branch-1.2 e77c57874 -> e1d695038


HBASE-18233 We shouldn't wait for readlock in doMiniBatchMutation in case of deadlock (Allan Yang)

This patch plus a sorting of the batch (HBASE-17924) fixes a regression
in Increment/CheckAndPut-style operations.

Signed-off-by: Allan Yang <allan...@163.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e1d69503
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e1d69503
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e1d69503

Branch: refs/heads/branch-1.2
Commit: e1d6950381daeb46f0218fc1de3256e88cfc0a1f
Parents: e77c578
Author: Michael Stack <st...@apache.org>
Authored: Tue Nov 28 09:14:58 2017 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Tue Nov 28 09:15:04 2017 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  59 ++++++--
 .../hadoop/hbase/regionserver/Region.java       |   4 +-
 .../hadoop/hbase/client/TestMultiParallel.java  | 141 +++++++++++++++++++
 .../hbase/regionserver/TestAtomicOperation.java |   9 ++
 4 files changed, 203 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d69503/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0e73d1c..30202a0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3051,18 +3051,29 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           continue;
         }
 
+
+        //HBASE-18233
         // If we haven't got any rows in our batch, we should block to
-        // get the next one.
+        // get the next one's read lock. We need at least one row to mutate.
+        // If we have got rows, do not block when lock is not available,
+        // so that we can fail fast and go on with the rows with locks in
+        // the batch. By doing this, we can reduce contention and prevent
+        // possible deadlocks.
+        // The unfinished rows in the batch will be detected in batchMutate,
+        // and it will try to finish them by calling doMiniBatchMutation again.
+        boolean shouldBlock = numReadyToWrite == 0;
         RowLock rowLock = null;
         try {
-          rowLock = getRowLock(mutation.getRow(), true);
+          rowLock = getRowLock(mutation.getRow(), true, shouldBlock);
         } catch (IOException ioe) {
           LOG.warn("Failed getting lock in batch put, row="
             + Bytes.toStringBinary(mutation.getRow()), ioe);
         }
         if (rowLock == null) {
-          // We failed to grab another lock
-          break; // stop acquiring more rows for this batch
+          // We failed to grab another lock. Stop acquiring more rows for this
+          // batch and go on with the gotten ones
+          break;
+
         } else {
           acquiredRowLocks.add(rowLock);
         }
@@ -5138,7 +5149,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * Get an exclusive ( write lock ) lock on a given row.
    * @param row Which row to lock.
    * @return A locked RowLock. The lock is exclusive and already aqquired.
-   * @throws IOException
+   * @throws IOException if any error occurred
    */
   public RowLock getRowLock(byte[] row) throws IOException {
     return getRowLock(row, false);
@@ -5152,9 +5163,28 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * started (the calling thread has already acquired the region-close-guard 
lock).
    * @param row The row actions will be performed against
    * @param readLock is the lock reader or writer. True indicates that a 
non-exlcusive
-   *                 lock is requested
+   *          lock is requested
+   * @return A locked RowLock.
+   * @throws IOException if any error occurred
    */
   public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
+    return getRowLock(row, readLock, /* waitForLock= */ true); // waits; throws on timeout
+  }
+
+  /**
+   *
+   * Get a row lock for the specified row. All locks are reentrant.
+   *
+   * Before calling this function make sure that a region operation has already been
+   * started (the calling thread has already acquired the region-close-guard lock).
+   * @param row The row actions will be performed against
+   * @param readLock is the lock reader or writer. True indicates that a non-exclusive
+   *          lock is requested
+   * @param waitForLock if true, wait up to the row lock wait duration; if false, try once
+   * @return A locked RowLock, or null if {@code waitForLock} is false and tryLock failed
+   * @throws IOException if any error occurred, e.g. timed out while waiting for the lock
+   */
+  public RowLock getRowLock(byte[] row, boolean readLock, boolean waitForLock) 
throws IOException {
     // Make sure the row is inside of this region before getting the lock for 
it.
     checkRow(row, "row lock");
     // create an object to use a a key in the row lock map
@@ -5195,12 +5225,25 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           result = rowLockContext.newWriteLock();
         }
       }
-      if (!result.getLock().tryLock(this.rowLockWaitDuration, 
TimeUnit.MILLISECONDS)) {
+      boolean lockAvailable = false;
+      if(waitForLock) {
+        //if waiting for lock, wait for rowLockWaitDuration milliseconds
+        lockAvailable = result.getLock().tryLock(this.rowLockWaitDuration, 
TimeUnit.MILLISECONDS);
+      } else {
+        //if we are not waiting for lock, tryLock() will return immediately 
whether we have got
+        //this lock or not
+        lockAvailable = result.getLock().tryLock();
+      }
+      if(!lockAvailable) {
         if (traceScope != null) {
           traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
         }
         result = null;
-        throw new IOException("Timed out waiting for lock for row: " + rowKey);
+        if(waitForLock) {
+          throw new IOException("Timed out waiting for lock for row: " + 
rowKey);
+        } else {
+          return null;
+        }
       }
       rowLockContext.setThreadName(Thread.currentThread().getName());
       success = true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d69503/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 81fb0b9..dcf151c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -270,14 +270,14 @@ public interface Region extends ConfigurationObserver {
 
   /**
    * Tries to acquire a lock on the given row.
-   * @param waitForLock if true, will block until the lock is available.
-   *        Otherwise, just tries to obtain the lock and returns
-   *        false if unavailable.
-   * @return the row lock if acquired,
-   *   null if waitForLock was false and the lock was not acquired
-   * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
+   * @param readLock if true, a non-exclusive (read) lock is requested; if false,
+   *        an exclusive (write) lock is requested. This call waits for the lock
+   *        to become available rather than failing fast.
+   * @return the acquired row lock
+   * @throws IOException if the lock could not be acquired after waiting
+   *         (for example, because the row lock wait timed out)
    */
-  RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException;
+  RowLock getRowLock(byte[] row, boolean readLock) throws IOException;
 
   /**
    * If the given list of row locks is not null, releases all locks.

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d69503/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 935f6e8..ecd67a1 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -63,6 +64,7 @@ public class TestMultiParallel {
   private static final byte[] QUALIFIER = Bytes.toBytes("qual");
   private static final String FAMILY = "family";
   private static final TableName TEST_TABLE = 
TableName.valueOf("multi_test_table");
+  private static final TableName TEST_TABLE2 = 
TableName.valueOf("multi_test_table2");
   private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
   private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
   private static final byte [][] KEYS = makeKeys();
@@ -728,4 +730,143 @@ public class TestMultiParallel {
       validateEmpty(result);
     }
   }
+
+  /**
+   * Worker thread for testMultiThreadWithRowLocks: once {@code beginLatch} opens, writes
+   * {@code puts} 100 times with autoflush off and one flushCommits() per round, and stores
+   * the error that aborted the run, if any, in {@code throwable}.
+   */
+  private static class MultiThread extends Thread {
+    /** Error that aborted this thread's run, or null; read by the test after endLatch. */
+    public volatile Throwable throwable = null;
+    private final CountDownLatch endLatch;
+    private final CountDownLatch beginLatch;
+    private final List<Put> puts;
+
+    public MultiThread(List<Put> puts, CountDownLatch beginLatch, CountDownLatch endLatch) {
+      this.puts = puts;
+      this.beginLatch = beginLatch;
+      this.endLatch = endLatch;
+    }
+
+    @Override
+    public void run() {
+      HTable table = null;
+      try {
+        table = new HTable(UTIL.getConfiguration(), TEST_TABLE2);
+        table.setAutoFlush(false);
+        beginLatch.await();
+        for (int i = 0; i < 100; i++) {
+          for (Put put : puts) {
+            table.put(put);
+          }
+          table.flushCommits();
+        }
+      } catch (Throwable t) {
+        throwable = t;
+        LOG.warn("Error when put:", t);
+      } finally {
+        if (table != null) {
+          try {
+            table.close();
+          } catch (IOException ioe) {
+            LOG.error("Error when close table", ioe);
+          }
+        }
+        // Count down last, so the waiting test only proceeds once this thread's table
+        // has been closed.
+        endLatch.countDown();
+      }
+    }
+  }
+
+
+  /**
+   * Worker thread for testMultiThreadWithRowLocks: once {@code beginLatch} opens, issues
+   * 100 rounds of single-row increments (by 1) on the row of each Put in {@code puts}, and
+   * stores the error that aborted the run, if any, in {@code throwable}.
+   */
+  private static class IncrementThread extends Thread {
+    /** Error that aborted this thread's run, or null; read by the test after endLatch. */
+    public volatile Throwable throwable = null;
+    private final CountDownLatch endLatch;
+    private final CountDownLatch beginLatch;
+    private final List<Put> puts;
+
+    public IncrementThread(List<Put> puts, CountDownLatch beginLatch, CountDownLatch endLatch) {
+      this.puts = puts;
+      this.beginLatch = beginLatch;
+      this.endLatch = endLatch;
+    }
+
+    @Override
+    public void run() {
+      HTable table = null;
+      try {
+        table = new HTable(UTIL.getConfiguration(), TEST_TABLE2);
+        beginLatch.await();
+        for (int i = 0; i < 100; i++) {
+          for (Put put : puts) {
+            Increment inc = new Increment(put.getRow());
+            inc.addColumn(BYTES_FAMILY, BYTES_FAMILY, 1);
+            table.increment(inc);
+          }
+        }
+      } catch (Throwable t) {
+        throwable = t;
+        LOG.warn("Error when incr:", t);
+      } finally {
+        if (table != null) {
+          try {
+            table.close();
+          } catch (IOException ioe) {
+            LOG.error("Error when close table", ioe);
+          }
+        }
+        // Count down last, so the waiting test only proceeds once this thread's table
+        // has been closed.
+        endLatch.countDown();
+      }
+    }
+  }
+
+  /**
+   * UT for HBASE-18233: batch mutations that lock rows in opposite orders, running
+   * concurrently with increments on the same rows, must not lock each other out.
+   * <p>
+   * Starts four equally sized groups of worker threads against TEST_TABLE2 (batched puts in
+   * ascending and in descending row order, increments in ascending and in descending row
+   * order), releases them together, waits for all of them, and asserts that none failed.
+   * @throws Exception if any error occurred
+   */
+  @Test(timeout=300000)
+  public void testMultiThreadWithRowLocks() throws Exception {
+    // Short client timeouts, so that failing to get a row lock surfaces as an exception in
+    // a worker thread.
+    // NOTE(review): these values are written into the shared UTIL configuration and are not
+    // restored here, so tests that run later in this class presumably see them as well.
+    UTIL.getConfiguration().setInt("hbase.rpc.timeout", 2000);
+    UTIL.getConfiguration().setInt("hbase.client.operation.timeout", 4000);
+    UTIL.getConfiguration().setInt("hbase.client.retries.number", 10);
+
+    UTIL.createTable(TEST_TABLE2, BYTES_FAMILY);
+    List<Put> puts = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      Put put = new Put(Bytes.toBytes(i));
+      put.add(BYTES_FAMILY, BYTES_FAMILY, Bytes.toBytes((long) 0));
+      puts.add(put);
+    }
+    List<Put> reversePuts = new ArrayList<>(puts);
+    Collections.reverse(reversePuts);
+
+    // Four groups of threadsPerGroup threads each. The end latch is sized from the number of
+    // threads actually started; a mismatch would make latch.await() below wait forever.
+    int threadsPerGroup = 3;
+    int numThreads = 4 * threadsPerGroup;
+    CountDownLatch latch = new CountDownLatch(numThreads);
+    CountDownLatch beginLatch = new CountDownLatch(1);
+    List<MultiThread> multiThreads = new ArrayList<>();
+    List<IncrementThread> incThreads = new ArrayList<>();
+    for (int i = 0; i < threadsPerGroup; i++) {
+      MultiThread thread = new MultiThread(reversePuts, beginLatch, latch);
+      thread.start();
+      multiThreads.add(thread);
+    }
+    for (int i = 0; i < threadsPerGroup; i++) {
+      MultiThread thread = new MultiThread(puts, beginLatch, latch);
+      thread.start();
+      multiThreads.add(thread);
+    }
+    for (int i = 0; i < threadsPerGroup; i++) {
+      IncrementThread thread = new IncrementThread(reversePuts, beginLatch, latch);
+      thread.start();
+      incThreads.add(thread);
+    }
+    for (int i = 0; i < threadsPerGroup; i++) {
+      IncrementThread thread = new IncrementThread(puts, beginLatch, latch);
+      thread.start();
+      incThreads.add(thread);
+    }
+    long timeBegin = System.currentTimeMillis();
+    beginLatch.countDown();
+    latch.await();
+    // Elapsed time is informational only, not an error condition.
+    LOG.info("Time took:" + (System.currentTimeMillis() - timeBegin));
+    for (MultiThread thread : multiThreads) {
+      Assert.assertNull("put thread failed: " + thread.throwable, thread.throwable);
+    }
+    for (IncrementThread thread : incThreads) {
+      Assert.assertNull("increment thread failed: " + thread.throwable, thread.throwable);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d69503/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index a02f56a..cd325c0 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -626,6 +626,15 @@ public class TestAtomicOperation {
     }
 
     @Override
+    public RowLock getRowLock(final byte[] row, boolean readLock, boolean waitForLock)
+      throws IOException {
+      if (testStep == TestStep.CHECKANDPUT_STARTED) {
+        latch.countDown();
+      }
+      RowLock rowLock = super.getRowLock(row, readLock, waitForLock);
+      // HRegion returns null when waitForLock is false and the lock is busy. Wrapping that
+      // null would make callers such as doMiniBatchMutation believe the lock was acquired.
+      // NOTE(review): HRegion#getRowLock(byte[], boolean) delegates to this overload, so a
+      // call through the two-argument override may count down and wrap twice; confirm.
+      return rowLock == null ? null : new WrappedRowLock(rowLock);
+    }
+
+    @Override
     public RowLock getRowLock(final byte[] row, boolean readLock) throws 
IOException {
       if (testStep == TestStep.CHECKANDPUT_STARTED) {
         latch.countDown();

Reply via email to