HBASE-16698 Performance issue: handlers stuck waiting for CountDownLatch inside 
WALKey#getWriteEntry under high writing workload


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a7a4e17f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a7a4e17f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a7a4e17f

Branch: refs/heads/branch-1
Commit: a7a4e17f1d04d389f87ad22da96d72cd3be050d9
Parents: 33e89fa
Author: Yu Li <l...@apache.org>
Authored: Thu Oct 20 15:32:59 2016 +0800
Committer: Yu Li <l...@apache.org>
Committed: Thu Oct 20 15:32:59 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 86 +++++++++++++++-----
 .../hbase/regionserver/wal/FSWALEntry.java      | 21 +++--
 .../org/apache/hadoop/hbase/wal/WALKey.java     | 44 +++++++---
 .../hadoop/hbase/regionserver/TestHRegion.java  |  7 +-
 4 files changed, 115 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a7a4e17f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c18564d..ca37eb1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -62,6 +62,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.RandomStringUtils;
@@ -206,6 +207,10 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
     "hbase.hregion.scan.loadColumnFamiliesOnDemand";
 
+  /** Config key for using mvcc pre-assign feature for put */
+  public static final String HREGION_MVCC_PRE_ASSIGN = 
"hbase.hregion.mvcc.preassign";
+  public static final boolean DEFAULT_HREGION_MVCC_PRE_ASSIGN = true;
+
   /**
    * Longest time we'll wait on a sequenceid.
    * Sequenceid comes up out of the WAL subsystem. WAL subsystem can go bad or 
a test might use
@@ -604,6 +609,10 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
   private final Durability durability;
   private final boolean regionStatsEnabled;
 
+  // flag and lock for MVCC preassign
+  private final boolean mvccPreAssign;
+  private final ReentrantLock preAssignMvccLock;
+
   /**
    * HRegion constructor. This constructor should only be used for testing and
    * extensions.  Instances of HRegion should be instantiated with the
@@ -753,6 +762,14 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           false :
           conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
               HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
+
+    // get mvcc pre-assign flag and lock
+    this.mvccPreAssign = conf.getBoolean(HREGION_MVCC_PRE_ASSIGN, 
DEFAULT_HREGION_MVCC_PRE_ASSIGN);
+    if (this.mvccPreAssign) {
+      this.preAssignMvccLock = new ReentrantLock();
+    } else {
+      this.preAssignMvccLock = null;
+    }
   }
 
   void setHTableSpecificConf() {
@@ -2576,7 +2593,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     // a timeout. May happen in tests after we tightened the semantic via 
HBASE-14317.
     // Also, the getSequenceId blocks on a latch. There is no global list of 
outstanding latches
     // so if an abort or stop, there is no way to call them in.
-    WALKey key = this.appendEmptyEdit(wal);
+    WALKey key = this.appendEmptyEdit(wal, null);
     mvcc.complete(key.getWriteEntry());
     return key.getSequenceId(this.maxWaitForSeqId);
   }
@@ -3283,25 +3300,44 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
         long replaySeqId = batchOp.getReplaySequenceId();
         walKey.setOrigLogSeqNum(replaySeqId);
-      }
-      if (walEdit.size() > 0) {
-        if (!isInReplay) {
-        // we use HLogKey here instead of WALKey directly to support legacy 
coprocessors.
-        walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
-            this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
-            mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
+        if (walEdit.size() > 0) {
+          txid =
+              this.wal.append(this.htableDescriptor, this.getRegionInfo(), 
walKey, walEdit, true);
+        }
+      } else {
+        try {
+          if (mvccPreAssign) {
+            preAssignMvccLock.lock();
+            writeEntry = mvcc.begin();
+          }
+          if (walEdit.size() > 0) {
+            // we use HLogKey here instead of WALKey directly to support 
legacy coprocessors.
+            walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+                this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, 
now,
+                mutation.getClusterIds(), currentNonceGroup, currentNonce, 
mvcc);
+            if (mvccPreAssign) {
+              walKey.setPreAssignedWriteEntry(writeEntry);
+            }
+            txid =
+                this.wal.append(this.htableDescriptor, this.getRegionInfo(), 
walKey, walEdit, true);
+          } else {
+            // If this is a skip wal operation just get the read point from 
mvcc
+            walKey = this.appendEmptyEdit(this.wal, writeEntry);
+          }
+        } finally {
+          if (mvccPreAssign) {
+            preAssignMvccLock.unlock();
+          }
         }
-        txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), 
walKey, walEdit, true);
       }
       // ------------------------------------
       // Acquire the latest mvcc number
       // ----------------------------------
-      if (walKey == null) {
-        // If this is a skip wal operation just get the read point from mvcc
-        walKey = this.appendEmptyEdit(this.wal);
-      }
       if (!isInReplay) {
-        writeEntry = walKey.getWriteEntry();
+        if (writeEntry == null) {
+          // we need to wait for mvcc to be assigned here if not preassigned
+          writeEntry = walKey.getWriteEntry();
+        }
         mvccNum = writeEntry.getWriteNumber();
       } else {
         mvccNum = batchOp.getReplaySequenceId();
@@ -3324,7 +3360,11 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
         // We need to update the sequence id for following reasons.
         // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId 
won't stamp sequence id.
         // 2) If no WAL, FSWALEntry won't be used
-        boolean updateSeqId = isInReplay || 
batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
+        // 3) If mvcc preassigned, the asynchronous append may still hasn't 
run to
+        // FSWALEntry#stampRegionSequenceId and the cell seqId will be 0. So 
we need to update
+        // before apply to memstore to avoid scan return incorrect value
+        boolean updateSeqId = isInReplay
+            || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL 
|| mvccPreAssign;
         if (updateSeqId) {
           updateSequenceId(familyMaps[i].values(), mvccNum);
         }
@@ -7206,7 +7246,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           if(walKey == null){
             // since we use wal sequence Id as mvcc, for SKIP_WAL changes we 
need a "faked" WALEdit
             // to get a sequence id assigned which is done by 
FSWALEntry#stampRegionSequenceId
-            walKey = this.appendEmptyEdit(this.wal);
+            walKey = this.appendEmptyEdit(this.wal, null);
           }
 
           // 7. Start mvcc transaction
@@ -7531,7 +7571,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           boolean updateSeqId = false;
           if (walKey == null) {
             // Append a faked WALEdit in order for SKIP_WAL updates to get 
mvcc assigned
-            walKey = this.appendEmptyEdit(this.wal);
+            walKey = this.appendEmptyEdit(this.wal, null);
             // If no WAL, FSWALEntry won't be used and no update for sequence 
id
             updateSeqId = true;
           }
@@ -7770,7 +7810,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
               this.wal.append(this.htableDescriptor, this.getRegionInfo(), 
walKey, walEdits, true);
           } else {
             // Append a faked WALEdit in order for SKIP_WAL updates to get 
mvccNum assigned
-            walKey = this.appendEmptyEdit(this.wal);
+            walKey = this.appendEmptyEdit(this.wal, null);
             // If no WAL, FSWALEntry won't be used and no update for sequence 
id
             updateSeqId = true;
           }
@@ -7991,9 +8031,9 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      46 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+      47 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
       (14 * Bytes.SIZEOF_LONG) +
-      5 * Bytes.SIZEOF_BOOLEAN);
+      6 * Bytes.SIZEOF_BOOLEAN);
 
   // woefully out of date - currently missing:
   // 1 x HashMap - coprocessorServiceHandlers
@@ -8575,15 +8615,19 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * Append a faked WALEdit in order to get a long sequence number and wal 
syncer will just ignore
    * the WALEdit append later.
    * @param wal
+   * @param writeEntry Preassigned writeEntry, if any
    * @return Return the key used appending with no sync and no append.
    * @throws IOException
    */
-  private WALKey appendEmptyEdit(final WAL wal) throws IOException {
+  private WALKey appendEmptyEdit(final WAL wal, WriteEntry writeEntry) throws 
IOException {
     // we use HLogKey here instead of WALKey directly to support legacy 
coprocessors.
     @SuppressWarnings("deprecation")
     WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
       getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null,
       HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC());
+    if (writeEntry != null) {
+      key.setPreAssignedWriteEntry(writeEntry);
+    }
 
     // Call append but with an empty WALEdit.  The returned sequence id will 
not be associated
     // with any edit and we can be sure it went in after all outstanding 
appends.

http://git-wip-us.apache.org/repos/asf/hbase/blob/a7a4e17f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 7f3eb61..f55e185 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -111,11 +111,16 @@ class FSWALEntry extends Entry {
    */
   long stampRegionSequenceId() throws IOException {
     long regionSequenceId = WALKey.NO_SEQUENCE_ID;
-    MultiVersionConcurrencyControl mvcc = getKey().getMvcc();
-    MultiVersionConcurrencyControl.WriteEntry we = null;
-
-    if (mvcc != null) {
-      we = mvcc.begin();
+    WALKey key = getKey();
+    MultiVersionConcurrencyControl.WriteEntry we = 
key.getPreAssignedWriteEntry();
+    boolean preAssigned = (we != null);
+    if (!preAssigned) {
+      MultiVersionConcurrencyControl mvcc = key.getMvcc();
+      if (mvcc != null) {
+        we = mvcc.begin();
+      }
+    }
+    if (we != null) {
       regionSequenceId = we.getWriteNumber();
     }
 
@@ -126,9 +131,9 @@ class FSWALEntry extends Entry {
     }
 
     // This has to stay in this order
-    WALKey key = getKey();
-    key.setLogSeqNum(regionSequenceId);
-    key.setWriteEntry(we);
+    if (!preAssigned) {
+      key.setWriteEntry(we);
+    }
     return regionSequenceId;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a7a4e17f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 05acd72..585c8f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import 
org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -94,6 +95,10 @@ public class WALKey implements SequenceId, 
Comparable<WALKey> {
    */
   @InterfaceAudience.Private // For internal use only.
   public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws 
InterruptedIOException {
+    if (this.preAssignedWriteEntry != null) {
+      // don't wait for seqNumAssignedLatch if writeEntry is preassigned
+      return this.preAssignedWriteEntry;
+    }
     try {
       this.seqNumAssignedLatch.await();
     } catch (InterruptedException ie) {
@@ -114,7 +119,12 @@ public class WALKey implements SequenceId, 
Comparable<WALKey> {
 
   @InterfaceAudience.Private // For internal use only.
   public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry 
writeEntry) {
+    assert this.writeEntry == null : "Non-null writeEntry when trying to set 
one";
     this.writeEntry = writeEntry;
+    // Set our sequenceid now using WriteEntry.
+    if (this.writeEntry != null) {
+      this.logSeqNum = this.writeEntry.getWriteNumber();
+    }
     this.seqNumAssignedLatch.countDown();
   }
 
@@ -196,6 +206,7 @@ public class WALKey implements SequenceId, 
Comparable<WALKey> {
   private long nonce = HConstants.NO_NONCE;
   private MultiVersionConcurrencyControl mvcc;
   private MultiVersionConcurrencyControl.WriteEntry writeEntry;
+  private MultiVersionConcurrencyControl.WriteEntry preAssignedWriteEntry = 
null;
   public static final List<UUID> EMPTY_UUIDS = 
Collections.unmodifiableList(new ArrayList<UUID>());
 
   // visible for deprecated HLogKey
@@ -360,17 +371,6 @@ public class WALKey implements SequenceId, 
Comparable<WALKey> {
   }
 
   /**
-   * Allow that the log sequence id to be set post-construction
-   * Only public for org.apache.hadoop.hbase.regionserver.wal.FSWALEntry
-   * @param sequence
-   */
-  @InterfaceAudience.Private
-  public void setLogSeqNum(final long sequence) {
-    this.logSeqNum = sequence;
-
-  }
-
-  /**
    * Used to set original seq Id for WALKey during wal replay
    * @param seqId
    */
@@ -666,4 +666,24 @@ public class WALKey implements SequenceId, 
Comparable<WALKey> {
       this.origLogSeqNum = walKey.getOrigSequenceNumber();
     }
   }
-}
+
+  /**
+   * @return The preassigned writeEntry, if any
+   */
+  @InterfaceAudience.Private // For internal use only.
+  public MultiVersionConcurrencyControl.WriteEntry getPreAssignedWriteEntry() {
+    return this.preAssignedWriteEntry;
+  }
+
+  /**
+   * Preassign writeEntry
+   * @param writeEntry the entry to assign
+   */
+  @InterfaceAudience.Private // For internal use only.
+  public void setPreAssignedWriteEntry(WriteEntry writeEntry) {
+    if (writeEntry != null) {
+      this.preAssignedWriteEntry = writeEntry;
+      this.logSeqNum = writeEntry.getWriteNumber();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a7a4e17f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 7cf76fc..6ba0351 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -6200,8 +6200,11 @@ public class TestHRegion {
         @Override
         public Long answer(InvocationOnMock invocation) throws Throwable {
           WALKey key = invocation.getArgumentAt(2, WALKey.class);
-          MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
-          key.setWriteEntry(we);
+          MultiVersionConcurrencyControl.WriteEntry we = 
key.getPreAssignedWriteEntry();
+          if (we == null) {
+            we = key.getMvcc().begin();
+            key.setWriteEntry(we);
+          }
           return 1L;
         }
 

Reply via email to