HBASE-16698 Performance issue: handlers stuck waiting for CountDownLatch inside WALKey#getWriteEntry under high writing workload
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a7a4e17f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a7a4e17f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a7a4e17f

Branch: refs/heads/branch-1
Commit: a7a4e17f1d04d389f87ad22da96d72cd3be050d9
Parents: 33e89fa
Author: Yu Li <l...@apache.org>
Authored: Thu Oct 20 15:32:59 2016 +0800
Committer: Yu Li <l...@apache.org>
Committed: Thu Oct 20 15:32:59 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java | 86 +++++++++++++++-----
 .../hbase/regionserver/wal/FSWALEntry.java | 21 +++--
 .../org/apache/hadoop/hbase/wal/WALKey.java | 44 +++++++---
 .../hadoop/hbase/regionserver/TestHRegion.java | 7 +-
 4 files changed, 115 insertions(+), 43 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/a7a4e17f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c18564d..ca37eb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -62,6 +62,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

 import org.apache.commons.lang.RandomStringUtils;
@@ -206,6 +207,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 public static final String
LOAD_CFS_ON_DEMAND_CONFIG_KEY = "hbase.hregion.scan.loadColumnFamiliesOnDemand"; + /** Config key for using mvcc pre-assign feature for put */ + public static final String HREGION_MVCC_PRE_ASSIGN = "hbase.hregion.mvcc.preassign"; + public static final boolean DEFAULT_HREGION_MVCC_PRE_ASSIGN = true; + /** * Longest time we'll wait on a sequenceid. * Sequenceid comes up out of the WAL subsystem. WAL subsystem can go bad or a test might use @@ -604,6 +609,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final Durability durability; private final boolean regionStatsEnabled; + // flag and lock for MVCC preassign + private final boolean mvccPreAssign; + private final ReentrantLock preAssignMvccLock; + /** * HRegion constructor. This constructor should only be used for testing and * extensions. Instances of HRegion should be instantiated with the @@ -753,6 +762,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi false : conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + + // get mvcc pre-assign flag and lock + this.mvccPreAssign = conf.getBoolean(HREGION_MVCC_PRE_ASSIGN, DEFAULT_HREGION_MVCC_PRE_ASSIGN); + if (this.mvccPreAssign) { + this.preAssignMvccLock = new ReentrantLock(); + } else { + this.preAssignMvccLock = null; + } } void setHTableSpecificConf() { @@ -2576,7 +2593,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // a timeout. May happen in tests after we tightened the semantic via HBASE-14317. // Also, the getSequenceId blocks on a latch. There is no global list of outstanding latches // so if an abort or stop, there is no way to call them in. 
- WALKey key = this.appendEmptyEdit(wal); + WALKey key = this.appendEmptyEdit(wal, null); mvcc.complete(key.getWriteEntry()); return key.getSequenceId(this.maxWaitForSeqId); } @@ -3283,25 +3300,44 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); long replaySeqId = batchOp.getReplaySequenceId(); walKey.setOrigLogSeqNum(replaySeqId); - } - if (walEdit.size() > 0) { - if (!isInReplay) { - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); + if (walEdit.size() > 0) { + txid = + this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); + } + } else { + try { + if (mvccPreAssign) { + preAssignMvccLock.lock(); + writeEntry = mvcc.begin(); + } + if (walEdit.size() > 0) { + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, + mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); + if (mvccPreAssign) { + walKey.setPreAssignedWriteEntry(writeEntry); + } + txid = + this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); + } else { + // If this is a skip wal operation just get the read point from mvcc + walKey = this.appendEmptyEdit(this.wal, writeEntry); + } + } finally { + if (mvccPreAssign) { + preAssignMvccLock.unlock(); + } } - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); } // ------------------------------------ // Acquire the latest mvcc number // ---------------------------------- - if (walKey == null) { - // If this is a skip wal operation just get the read point from mvcc - walKey = this.appendEmptyEdit(this.wal); - } if (!isInReplay) { - writeEntry = walKey.getWriteEntry(); + if (writeEntry == null) { + // we need to wait for mvcc to be assigned here if not preassigned + writeEntry = walKey.getWriteEntry(); + } mvccNum = writeEntry.getWriteNumber(); } else { mvccNum = batchOp.getReplaySequenceId(); @@ -3324,7 +3360,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // We need to update the sequence id for following reasons. // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id. // 2) If no WAL, FSWALEntry won't be used - boolean updateSeqId = isInReplay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL; + // 3) If mvcc preassigned, the asynchronous append may still hasn't run to + // FSWALEntry#stampRegionSequenceId and the cell seqId will be 0. 
So we need to update + // before apply to memstore to avoid scan return incorrect value + boolean updateSeqId = isInReplay + || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL || mvccPreAssign; if (updateSeqId) { updateSequenceId(familyMaps[i].values(), mvccNum); } @@ -7206,7 +7246,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if(walKey == null){ // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId - walKey = this.appendEmptyEdit(this.wal); + walKey = this.appendEmptyEdit(this.wal, null); } // 7. Start mvcc transaction @@ -7531,7 +7571,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean updateSeqId = false; if (walKey == null) { // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned - walKey = this.appendEmptyEdit(this.wal); + walKey = this.appendEmptyEdit(this.wal, null); // If no WAL, FSWALEntry won't be used and no update for sequence id updateSeqId = true; } @@ -7770,7 +7810,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true); } else { // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned - walKey = this.appendEmptyEdit(this.wal); + walKey = this.appendEmptyEdit(this.wal, null); // If no WAL, FSWALEntry won't be used and no update for sequence id updateSeqId = true; } @@ -7991,9 +8031,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 46 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + 47 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + - 5 * Bytes.SIZEOF_BOOLEAN); + 6 * Bytes.SIZEOF_BOOLEAN); // woefully out of date - currently 
missing: // 1 x HashMap - coprocessorServiceHandlers @@ -8575,15 +8615,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore * the WALEdit append later. * @param wal + * @param writeEntry Preassigned writeEntry, if any * @return Return the key used appending with no sync and no append. * @throws IOException */ - private WALKey appendEmptyEdit(final WAL wal) throws IOException { + private WALKey appendEmptyEdit(final WAL wal, WriteEntry writeEntry) throws IOException { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. @SuppressWarnings("deprecation") WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC()); + if (writeEntry != null) { + key.setPreAssignedWriteEntry(writeEntry); + } // Call append but with an empty WALEdit. The returned sequence id will not be associated // with any edit and we can be sure it went in after all outstanding appends. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/a7a4e17f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 7f3eb61..f55e185 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -111,11 +111,16 @@ class FSWALEntry extends Entry { */ long stampRegionSequenceId() throws IOException { long regionSequenceId = WALKey.NO_SEQUENCE_ID; - MultiVersionConcurrencyControl mvcc = getKey().getMvcc(); - MultiVersionConcurrencyControl.WriteEntry we = null; - - if (mvcc != null) { - we = mvcc.begin(); + WALKey key = getKey(); + MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry(); + boolean preAssigned = (we != null); + if (!preAssigned) { + MultiVersionConcurrencyControl mvcc = key.getMvcc(); + if (mvcc != null) { + we = mvcc.begin(); + } + } + if (we != null) { regionSequenceId = we.getWriteNumber(); } @@ -126,9 +131,9 @@ class FSWALEntry extends Entry { } // This has to stay in this order - WALKey key = getKey(); - key.setLogSeqNum(regionSequenceId); - key.setWriteEntry(we); + if (!preAssigned) { + key.setWriteEntry(we); + } return regionSequenceId; } http://git-wip-us.apache.org/repos/asf/hbase/blob/a7a4e17f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index 05acd72..585c8f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -33,6 +33,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -94,6 +95,10 @@ public class WALKey implements SequenceId, Comparable<WALKey> { */ @InterfaceAudience.Private // For internal use only. public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException { + if (this.preAssignedWriteEntry != null) { + // don't wait for seqNumAssignedLatch if writeEntry is preassigned + return this.preAssignedWriteEntry; + } try { this.seqNumAssignedLatch.await(); } catch (InterruptedException ie) { @@ -114,7 +119,12 @@ public class WALKey implements SequenceId, Comparable<WALKey> { @InterfaceAudience.Private // For internal use only. public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) { + assert this.writeEntry == null : "Non-null writeEntry when trying to set one"; this.writeEntry = writeEntry; + // Set our sequenceid now using WriteEntry. 
+ if (this.writeEntry != null) { + this.logSeqNum = this.writeEntry.getWriteNumber(); + } this.seqNumAssignedLatch.countDown(); } @@ -196,6 +206,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> { private long nonce = HConstants.NO_NONCE; private MultiVersionConcurrencyControl mvcc; private MultiVersionConcurrencyControl.WriteEntry writeEntry; + private MultiVersionConcurrencyControl.WriteEntry preAssignedWriteEntry = null; public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>()); // visible for deprecated HLogKey @@ -360,17 +371,6 @@ public class WALKey implements SequenceId, Comparable<WALKey> { } /** - * Allow that the log sequence id to be set post-construction - * Only public for org.apache.hadoop.hbase.regionserver.wal.FSWALEntry - * @param sequence - */ - @InterfaceAudience.Private - public void setLogSeqNum(final long sequence) { - this.logSeqNum = sequence; - - } - - /** * Used to set original seq Id for WALKey during wal replay * @param seqId */ @@ -666,4 +666,24 @@ public class WALKey implements SequenceId, Comparable<WALKey> { this.origLogSeqNum = walKey.getOrigSequenceNumber(); } } -} + + /** + * @return The preassigned writeEntry, if any + */ + @InterfaceAudience.Private // For internal use only. + public MultiVersionConcurrencyControl.WriteEntry getPreAssignedWriteEntry() { + return this.preAssignedWriteEntry; + } + + /** + * Preassign writeEntry + * @param writeEntry the entry to assign + */ + @InterfaceAudience.Private // For internal use only. 
+ public void setPreAssignedWriteEntry(WriteEntry writeEntry) { + if (writeEntry != null) { + this.preAssignedWriteEntry = writeEntry; + this.logSeqNum = writeEntry.getWriteNumber(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a7a4e17f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 7cf76fc..6ba0351 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -6200,8 +6200,11 @@ public class TestHRegion { @Override public Long answer(InvocationOnMock invocation) throws Throwable { WALKey key = invocation.getArgumentAt(2, WALKey.class); - MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(); - key.setWriteEntry(we); + MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry(); + if (we == null) { + we = key.getMvcc().begin(); + key.setWriteEntry(we); + } return 1L; }