HBASE-14374 Backport parent 'HBASE-14317 Stuck FSHLog' issue to 1.1
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0bf97bac Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0bf97bac Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0bf97bac Branch: refs/heads/branch-1.1 Commit: 0bf97bac2ed564994a0bcda5f1993260bf0b448f Parents: 5b0f30d Author: stack <st...@apache.org> Authored: Wed Sep 23 12:05:25 2015 -0700 Committer: stack <st...@apache.org> Committed: Wed Sep 23 12:05:25 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/HRegion.java | 110 +++++--- .../hadoop/hbase/regionserver/LogRoller.java | 22 +- .../MultiVersionConsistencyControl.java | 24 +- .../regionserver/wal/DamagedWALException.java | 45 +++ .../hadoop/hbase/regionserver/wal/FSHLog.java | 169 ++++++++--- .../hbase/regionserver/wal/FSWALEntry.java | 2 +- .../hadoop/hbase/regionserver/wal/HLogKey.java | 4 + .../regionserver/wal/ProtobufLogReader.java | 9 +- .../regionserver/wal/ProtobufLogWriter.java | 2 +- .../hbase/regionserver/wal/SyncFuture.java | 4 +- .../org/apache/hadoop/hbase/wal/WALKey.java | 31 ++- .../master/TestDistributedLogSplitting.java | 3 +- .../hbase/regionserver/TestFSErrorsExposed.java | 4 +- .../regionserver/TestFailedAppendAndSync.java | 273 ++++++++++++++++++ .../hadoop/hbase/regionserver/TestHRegion.java | 107 +++++-- .../TestMultiVersionConsistencyControl.java | 4 +- .../hbase/regionserver/TestWALLockup.java | 279 +++++++++++++++++++ .../hbase/regionserver/wal/TestLogRolling.java | 15 +- 18 files changed, 970 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e23791a..bc68fbd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; @@ -200,7 +201,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final Log LOG = LogFactory.getLog(HRegion.class); public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY = - "hbase.hregion.scan.loadColumnFamiliesOnDemand"; + "hbase.hregion.scan.loadColumnFamiliesOnDemand"; + + /** + * Longest time we'll wait on a sequenceid. + * Sequenceid comes up out of the WAL subsystem. WAL subsystem can go bad or a test might use + * it without cleanup previous usage properly; generally, a WAL roll is needed. + * Key to use changing the default of 30000ms. + */ + private final int maxWaitForSeqId; + private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms"; + private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000; /** * This is the global default value for durability. All tables/mutations not @@ -233,7 +244,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1. * Its default value is -1L. This default is used as a marker to indicate * that the region hasn't opened yet. 
Once it is opened, it is set to the derived - * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region. + * #openSeqNum, the largest sequence id of all hfiles opened under this Region. * * <p>Control of this sequence is handed off to the WAL implementation. It is responsible * for tagging edits with the correct sequence id since it is responsible for getting the @@ -671,6 +682,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION); + this.maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID); this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true); this.htableDescriptor = htd; this.rsServices = rsServices; @@ -2078,7 +2090,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (this.memstoreSize.get() <= 0) { // Take an update lock because am about to change the sequence id and we want the sequence id // to be at the border of the empty memstore. - MultiVersionConsistencyControl.WriteEntry w = null; + MultiVersionConsistencyControl.WriteEntry writeEntry = null; this.updatesLock.writeLock().lock(); try { if (this.memstoreSize.get() <= 0) { @@ -2089,14 +2101,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // etc.) // wal can be null replaying edits. 
if (wal != null) { - w = mvcc.beginMemstoreInsert(); + writeEntry = mvcc.beginMemstoreInsert(); long flushOpSeqId = getNextSequenceId(wal); FlushResult flushResult = new FlushResultImpl( FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId, "Nothing to flush", writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker)); - w.setWriteNumber(flushOpSeqId); - mvcc.waitForPreviousTransactionsComplete(w); - w = null; + writeEntry.setWriteNumber(flushOpSeqId); + mvcc.waitForPreviousTransactionsComplete(writeEntry); + writeEntry = null; return new PrepareFlushResult(flushResult, myseqid); } else { return new PrepareFlushResult( @@ -2107,8 +2119,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } finally { this.updatesLock.writeLock().unlock(); - if (w != null) { - mvcc.advanceMemstore(w); + if (writeEntry != null) { + mvcc.advanceMemstore(writeEntry); } } } @@ -2131,7 +2143,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // to do this for a moment. It is quick. 
We also set the memstore size to zero here before we // allow updates again so its value will represent the size of the updates received // during flush - MultiVersionConsistencyControl.WriteEntry w = null; + MultiVersionConsistencyControl.WriteEntry writeEntry = null; // We have to take an update lock during snapshot, or else a write could end up in both snapshot // and memstore (makes it difficult to do atomic rows then) status.setStatus("Obtaining lock to block concurrent updates"); @@ -2163,7 +2175,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long trxId = 0; try { try { - w = mvcc.beginMemstoreInsert(); + writeEntry = mvcc.beginMemstoreInsert(); if (wal != null) { Long earliestUnflushedSequenceIdForTheRegion = wal.startCacheFlush(encodedRegionName, flushedFamilyNames); @@ -2236,8 +2248,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { wal.sync(); // ensure that flush marker is sync'ed } catch (IOException ioe) { - LOG.warn("Unexpected exception while wal.sync(), ignoring. Exception: " - + StringUtils.stringifyException(ioe)); + wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); + throw ioe; } } @@ -2246,14 +2258,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // uncommitted transactions from being written into HFiles. // We have to block before we start the flush, otherwise keys that // were removed via a rollbackMemstore could be written to Hfiles. 
- w.setWriteNumber(flushOpSeqId); - mvcc.waitForPreviousTransactionsComplete(w); + writeEntry.setWriteNumber(flushOpSeqId); + mvcc.waitForPreviousTransactionsComplete(writeEntry); // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block - w = null; + writeEntry = null; } finally { - if (w != null) { - // in case of failure just mark current w as complete - mvcc.advanceMemstore(w); + if (writeEntry != null) { + // in case of failure just mark current writeEntry as complete + mvcc.advanceMemstore(writeEntry); } } return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushOpSeqId, @@ -2430,8 +2442,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ @VisibleForTesting protected long getNextSequenceId(final WAL wal) throws IOException { + // TODO: For review. Putting an empty edit in to get a sequenceid out will not work if the + // WAL is banjaxed... if it has gotten an exception and the WAL has not yet been rolled or + // aborted. In this case, we'll just get stuck here. For now, until HBASE-12751, just have + // a timeout. May happen in tests after we tightened the semantic via HBASE-14317. + // Also, the getSequenceId blocks on a latch. 
There is no global list of outstanding latches WALKey key = this.appendEmptyEdit(wal, null); - return key.getSequenceId(); + return key.getSequenceId(maxWaitForSeqId); } ////////////////////////////////////////////////////////////////////////////// @@ -2877,7 +2894,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE; WALEdit walEdit = new WALEdit(isInReplay); - MultiVersionConsistencyControl.WriteEntry w = null; + MultiVersionConsistencyControl.WriteEntry writeEntry = null; long txid = 0; boolean doRollBackMemstore = false; boolean locked = false; @@ -3030,7 +3047,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // ------------------------------------ // Acquire the latest mvcc number // ---------------------------------- - w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { @@ -3145,7 +3162,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, getSequenceId(), true, memstoreCells); } - if(walKey == null){ + if (walKey == null){ // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned walKey = this.appendEmptyEdit(this.wal, memstoreCells); } @@ -3179,9 +3196,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // ------------------------------------------------------------------ // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. 
// ------------------------------------------------------------------ - if (w != null) { - mvcc.completeMemstoreInsertWithSeqNum(w, walKey); - w = null; + if (writeEntry != null) { + mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); + writeEntry = null; } // ------------------------------------ @@ -3210,9 +3227,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { rollbackMemstore(memstoreCells); - } - if (w != null) { - mvcc.completeMemstoreInsertWithSeqNum(w, walKey); + if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry); + } else if (writeEntry != null) { + mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); } if (locked) { @@ -6743,6 +6760,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi processor.postBatchMutate(this); } } finally { + // TODO: Make this method look like all other methods that are doing append/sync and + // memstore rollback such as append and doMiniBatchMutation. Currently it is a little + // different. Make them all share same code! if (!mutations.isEmpty() && !walSyncSuccessful) { LOG.warn("Wal sync failed. Roll back " + mutations.size() + " memstore keyvalues for row(s):" + StringUtils.byteToHexString( @@ -6753,6 +6773,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi getStore(cell).rollback(cell); } } + if (writeEntry != null) { + mvcc.cancelMemstoreInsert(writeEntry); + writeEntry = null; + } } // 13. 
Roll mvcc forward if (writeEntry != null) { @@ -6854,7 +6878,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi startRegionOperation(Operation.APPEND); this.writeRequestsCount.increment(); long mvccNum = 0; - WriteEntry w = null; + WriteEntry writeEntry = null; WALKey walKey = null; RowLock rowLock = null; List<Cell> memstoreCells = new ArrayList<Cell>(); @@ -6875,7 +6899,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // now start my own transaction mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); - w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); long now = EnvironmentEdgeManager.currentTime(); // Process each family for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) { @@ -7068,10 +7092,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { rollbackMemstore(memstoreCells); + if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry); + } else if (writeEntry != null) { + mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); } - if (w != null) { - mvcc.completeMemstoreInsertWithSeqNum(w, walKey); - } + closeRegionOperation(Operation.APPEND); } @@ -7118,7 +7143,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi startRegionOperation(Operation.INCREMENT); this.writeRequestsCount.increment(); RowLock rowLock = null; - WriteEntry w = null; + WriteEntry writeEntry = null; WALKey walKey = null; long mvccNum = 0; List<Cell> memstoreCells = new ArrayList<Cell>(); @@ -7139,7 +7164,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // now start my own transaction mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); - w = 
mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); long now = EnvironmentEdgeManager.currentTime(); // Process each family for (Map.Entry<byte [], List<Cell>> family: @@ -7309,9 +7334,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { rollbackMemstore(memstoreCells); - } - if (w != null) { - mvcc.completeMemstoreInsertWithSeqNum(w, walKey); + if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry); + } else if (writeEntry != null) { + mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); } closeRegionOperation(Operation.INCREMENT); if (this.metricsRegion != null) { @@ -7342,7 +7367,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 45 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + 45 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); @@ -7989,12 +8014,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + @SuppressWarnings("deprecation") WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE); // Call append but with an empty WALEdit. The returned seqeunce id will not be associated // with any edit and we can be sure it went in after all outstanding appends. 
- wal.append(getTableDesc(), getRegionInfo(), key, - WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells); + wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, getSequenceId(), false, + cells); return key; } http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index e767ced..a3b5af3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -104,6 +104,15 @@ public class LogRoller extends HasThread { } @Override + public void interrupt() { + // Wake up if we are waiting on rollLog. For tests. + synchronized (rollLog) { + this.rollLog.notify(); + } + super.interrupt(); + } + + @Override public void run() { while (!server.isStopped()) { long now = System.currentTimeMillis(); @@ -113,7 +122,9 @@ public class LogRoller extends HasThread { if (!periodic) { synchronized (rollLog) { try { - if (!rollLog.get()) rollLog.wait(this.threadWakeFrequency); + if (!rollLog.get()) { + rollLog.wait(this.threadWakeFrequency); + } } catch (InterruptedException e) { // Fall through } @@ -146,12 +157,14 @@ public class LogRoller extends HasThread { } catch (java.net.ConnectException e) { server.abort("Failed log close in log roller", e); } catch (IOException ex) { + LOG.fatal("Aborting", ex); // Abort if we get here. We probably won't recover an IOE. 
HBASE-1132 server.abort("IOE in log roller", RemoteExceptionHandler.checkIOException(ex)); } catch (Exception ex) { - LOG.error("Log rolling failed", ex); - server.abort("Log rolling failed", ex); + final String msg = "Failed rolling WAL; aborting to recover edits!"; + LOG.error(msg, ex); + server.abort(msg, ex); } finally { try { rollLog.set(false); @@ -184,5 +197,4 @@ public class LogRoller extends HasThread { requester); } } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index 96af2c3..fee15dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -79,10 +79,11 @@ public class MultiVersionConsistencyControl { // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers // because each handler could increment sequence num twice and max concurrent in-flight // transactions is the number of RPC handlers. - // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple - // changes touch same row key + // We can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple + // changes touch same row key. // If for any reason, the bumped value isn't reset due to failure situations, we'll reset - // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all + // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all. 
+ // St.Ack 20150901 Where is the reset to NO_WRITE_NUMBER done? return sequenceId.incrementAndGet() + 1000000000; } @@ -128,6 +129,23 @@ public class MultiVersionConsistencyControl { } /** + * Cancel a write insert that failed. + * Removes the write entry without advancing read point or without interfering with write + * entries queued behind us. It is like #advanceMemstore(WriteEntry) only this method + * will move the read point to the sequence id that is in WriteEntry even if it is ridiculous (see + * the trick in HRegion where we call {@link #getPreAssignedWriteNumber(AtomicLong)} just to mark + * it as for special handling). + * @param writeEntry Failed attempt at write. Does cleanup. + */ + public void cancelMemstoreInsert(WriteEntry writeEntry) { + // I'm not clear on how this voodoo all works but setting write number to -1 does NOT advance + // readpoint and gets my little writeEntry completed and removed from queue of outstanding + // events which seems right. St.Ack 20150901. + writeEntry.setWriteNumber(NO_WRITE_NUMBER); + advanceMemstore(writeEntry); + } + + /** * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the * end of this call, the global read point is at least as large as the write point of the passed * in WriteEntry. Thus, the write is visible to MVCC readers. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DamagedWALException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DamagedWALException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DamagedWALException.java new file mode 100644 index 0000000..6c57f56 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DamagedWALException.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Thrown when a failed append or sync on a WAL. + * Thrown when WAL can no longer be used. Roll the WAL. 
+ */ +@SuppressWarnings("serial") +@InterfaceAudience.Private +public class DamagedWALException extends HBaseIOException { + public DamagedWALException() { + super(); + } + + public DamagedWALException(String message) { + super(message); + } + + public DamagedWALException(String message, Throwable cause) { + super(message, cause); + } + + public DamagedWALException(Throwable cause) { + super(cause); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 47bdbc1..6f5b273 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -117,6 +117,16 @@ import com.lmax.disruptor.dsl.ProducerType; * * <p>To read an WAL, call {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, * org.apache.hadoop.fs.Path)}. + * + * <h2>Failure Semantic</h2> + * If an exception on append or sync, roll the WAL because the current WAL is now a lame duck; + * any more appends or syncs will fail also with the same original exception. If we have made + * successful appends to the WAL and we then are unable to sync them, our current semantic is to + * return error to the client that the appends failed but also to abort the current context, + * usually the hosting server. We need to replay the WALs. TODO: Change this semantic. A roll of + * WAL may be sufficient as long as we have flagged client that the append failed. TODO: + * replication may pick up these last edits though they have been marked as failed append (Need to + * keep our own file lengths, not rely on HDFS). 
*/ @InterfaceAudience.Private public class FSHLog implements WAL { @@ -386,7 +396,7 @@ public class FSHLog implements WAL { * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. * Throws an IllegalArgumentException if used to compare paths from different wals. */ - public final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() { + final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() { @Override public int compare(Path o1, Path o2) { long t1 = getFileNumFromFileName(o1); @@ -490,7 +500,7 @@ public class FSHLog implements WAL { prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); // we only correctly differentiate suffices when numeric ones start with '.' if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { - throw new IllegalArgumentException("wal suffix must start with '" + WAL_FILE_NAME_DELIMITER + + throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER + "' but instead was '" + suffix + "'"); } // Now that it exists, set the storage policy for the entire directory of wal files related to @@ -614,7 +624,9 @@ public class FSHLog implements WAL { */ @VisibleForTesting OutputStream getOutputStream() { - return this.hdfs_out.getWrappedStream(); + FSDataOutputStream fsdos = this.hdfs_out; + if (fsdos == null) return null; + return fsdos.getWrappedStream(); } @Override @@ -897,6 +909,19 @@ public class FSHLog implements WAL { } /** + * Used to manufacture race condition reliably. For testing only. + * @see #beforeWaitOnSafePoint() + */ + @VisibleForTesting + protected void afterCreatingZigZagLatch() {} + + /** + * @see #afterCreatingZigZagLatch() + */ + @VisibleForTesting + protected void beforeWaitOnSafePoint() {}; + + /** * Cleans up current writer closing it and then puts in place the passed in * <code>nextWriter</code>. 
* @@ -925,6 +950,7 @@ public class FSHLog implements WAL { SyncFuture syncFuture = null; SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)? null: this.ringBufferEventHandler.attainSafePoint(); + afterCreatingZigZagLatch(); TraceScope scope = Trace.startSpan("FSHFile.replaceWriter"); try { // Wait on the safe point to be achieved. Send in a sync in case nothing has hit the @@ -938,9 +964,9 @@ public class FSHLog implements WAL { syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer()); } } catch (FailedSyncBeforeLogCloseException e) { + // If unflushed/unsynced entries on close, it is reason to abort. if (isUnflushedEntries()) throw e; - // Else, let is pass through to the close. - LOG.warn("Failed last sync but no outstanding unsync edits so falling through to close; " + + LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " + e.getMessage()); } @@ -991,8 +1017,19 @@ public class FSHLog implements WAL { // Let the writer thread go regardless, whether error or not. if (zigzagLatch != null) { zigzagLatch.releaseSafePoint(); - // It will be null if we failed our wait on safe point above. - if (syncFuture != null) blockOnSync(syncFuture); + // syncFuture will be null if we failed our wait on safe point above. Otherwise, if + // latch was obtained successfully, the sync we threw in either trigger the latch or it + // got stamped with an exception because the WAL was damaged and we could not sync. Now + // the write pipeline has been opened up again by releasing the safe point, process the + // syncFuture we got above. This is probably a noop but it may be stale exception from + // when old WAL was in place. Catch it if so. 
+ if (syncFuture != null) { + try { + blockOnSync(syncFuture); + } catch (IOException ioe) { + if (LOG.isTraceEnabled()) LOG.trace("Stale sync exception", ioe); + } + } } } finally { scope.close(); @@ -1047,7 +1084,7 @@ public class FSHLog implements WAL { */ protected Path computeFilename(final long filenum) { if (filenum < 0) { - throw new RuntimeException("wal file number can't be < 0"); + throw new RuntimeException("WAL file number can't be < 0"); } String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix; return new Path(fullPathLogDir, child); @@ -1079,7 +1116,7 @@ public class FSHLog implements WAL { if (fileName == null) throw new IllegalArgumentException("file name can't be null"); if (!ourFiles.accept(fileName)) { throw new IllegalArgumentException("The log file " + fileName + - " doesn't belong to this wal. (" + toString() + ")"); + " doesn't belong to this WAL. (" + toString() + ")"); } final String fileNameString = fileName.toString(); String chompedPath = fileNameString.substring(prefixPathStr.length(), @@ -1170,6 +1207,7 @@ public class FSHLog implements WAL { * @param clusterIds that have consumed the change * @return New log key. */ + @SuppressWarnings("deprecation") protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum, long now, List<UUID> clusterIds, long nonceGroup, long nonce) { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. @@ -1222,6 +1260,7 @@ public class FSHLog implements WAL { */ private class SyncRunner extends HasThread { private volatile long sequence; + // Keep around last exception thrown. Clear on successful sync. private final BlockingQueue<SyncFuture> syncFutures; /** @@ -1340,28 +1379,27 @@ public class FSHLog implements WAL { // while we run. 
TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan()); long start = System.nanoTime(); - Throwable t = null; + Throwable lastException = null; try { Trace.addTimelineAnnotation("syncing writer"); writer.sync(); Trace.addTimelineAnnotation("writer synced"); currentSequence = updateHighestSyncedSequence(currentSequence); } catch (IOException e) { - LOG.error("Error syncing, request close of wal ", e); - t = e; + LOG.error("Error syncing, request close of WAL", e); + lastException = e; } catch (Exception e) { LOG.warn("UNEXPECTED", e); - t = e; + lastException = e; } finally { // reattach the span to the future before releasing. takeSyncFuture.setSpan(scope.detach()); // First release what we 'took' from the queue. - syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t); + syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException); // Can we release other syncs? - syncCount += releaseSyncFutures(currentSequence, t); - if (t != null) { - requestLogRoll(); - } else checkLogRoll(); + syncCount += releaseSyncFutures(currentSequence, lastException); + if (lastException != null) requestLogRoll(); + else checkLogRoll(); } postSync(System.nanoTime() - start, syncCount); } catch (InterruptedException e) { @@ -1410,7 +1448,7 @@ public class FSHLog implements WAL { LOG.warn("HDFS pipeline error detected. " + "Found " + numCurrentReplicas + " replicas but expecting no less than " + this.minTolerableReplication + " replicas. " - + " Requesting close of wal. current pipeline: " + + " Requesting close of WAL. current pipeline: " + Arrays.toString(getPipeLine())); logRollNeeded = true; // If rollWriter is requested, increase consecutiveLogRolls. 
Once it @@ -1439,8 +1477,8 @@ public class FSHLog implements WAL { } } } catch (Exception e) { - LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e + - " still proceeding ahead..."); + LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e + + ", continuing..."); } return logRollNeeded; } @@ -1915,6 +1953,11 @@ public class FSHLog implements WAL { private volatile int syncFuturesCount = 0; private volatile SafePointZigZagLatch zigzagLatch; /** + * Set if we get an exception appending or syncing so that all subsequent appends and syncs + * on this WAL fail until WAL is replaced. + */ + private Exception exception = null; + /** * Object to block on while waiting on safe point. */ private final Object safePointWaiter = new Object(); @@ -1934,17 +1977,32 @@ public class FSHLog implements WAL { } private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) { + // There could be handler-count syncFutures outstanding. for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e); this.syncFuturesCount = 0; } + /** + * @return True if there are still outstanding sync futures + */ + private boolean isOutstandingSyncs() { + for (int i = 0; i < this.syncFuturesCount; i++) { + if (!this.syncFutures[i].isDone()) return true; + } + return false; + } + @Override // We can set endOfBatch in the below method if at end of our this.syncFutures array public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch) throws Exception { // Appends and syncs are coming in order off the ringbuffer. We depend on this fact. We'll // add appends to dfsclient as they come in. Batching appends doesn't give any significant - // benefit on measurement. Handler sync calls we will batch up. + // benefit on measurement. Handler sync calls we will batch up. If we get an exception + // appending an edit, we fail all subsequent appends and syncs with the same exception until + // the WAL is reset. 
It is important that we not short-circuit and exit early this method. + // It is important that we always go through the attainSafePoint on the end. Another thread, + // the log roller may be waiting on a signal from us here and will just hang without it. try { if (truck.hasSyncFuturePayload()) { @@ -1954,19 +2012,29 @@ public class FSHLog implements WAL { } else if (truck.hasFSWALEntryPayload()) { TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload()); try { - append(truck.unloadFSWALEntryPayload()); + FSWALEntry entry = truck.unloadFSWALEntryPayload(); + if (this.exception != null) { + // We got an exception on an earlier attempt at append. Do not let this append + // go through. Fail it but stamp the sequenceid into this append though failed. + // We need to do this to close the latch held down deep in WALKey...that is waiting + // on sequenceid assignment otherwise it will just hang out (The #append method + // called below does this also internally). + entry.stampRegionSequenceId(); + // Return to keep processing events coming off the ringbuffer + return; + } + append(entry); } catch (Exception e) { - // If append fails, presume any pending syncs will fail too; let all waiting handlers - // know of the exception. - cleanupOutstandingSyncsOnException(sequence, e); - // Return to keep processing. + // Failed append. Record the exception. + this.exception = e; + // Return to keep processing events coming off the ringbuffer return; } finally { assert scope == NullScope.INSTANCE || !scope.isDetached(); scope.close(); // append scope is complete } } else { - // They can't both be null. Fail all up to this!!! + // What is this if not an append or sync. Fail all up to this!!! cleanupOutstandingSyncsOnException(sequence, new IllegalStateException("Neither append nor sync")); // Return to keep processing. 
@@ -1985,16 +2053,22 @@ public class FSHLog implements WAL { LOG.trace("Sequence=" + sequence + ", syncCount=" + this.syncFuturesCount); } - // Below expects that the offer 'transfers' responsibility for the outstanding syncs to the - // syncRunner. We should never get an exception in here. HBASE-11145 was because queue - // was sized exactly to the count of user handlers but we could have more if we factor in - // meta handlers doing opens and closes. - int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length; - try { - this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount); - } catch (Exception e) { - cleanupOutstandingSyncsOnException(sequence, e); - throw e; + if (this.exception == null) { + // Below expects that the offer 'transfers' responsibility for the outstanding syncs to + // the syncRunner. We should never get an exception in here. + int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length; + try { + this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount); + } catch (Exception e) { + // Should NEVER get here. + requestLogRoll(); + this.exception = new DamagedWALException("Failed offering sync", e); + } + } + // We may have picked up an exception above trying to offer sync + if (this.exception != null) { + cleanupOutstandingSyncsOnException(sequence, + new DamagedWALException("On sync", this.exception)); } attainSafePoint(sequence); this.syncFuturesCount = 0; @@ -2015,16 +2089,24 @@ public class FSHLog implements WAL { private void attainSafePoint(final long currentSequence) { if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return; // If here, another thread is waiting on us to get to safe point. Don't leave it hanging. + beforeWaitOnSafePoint(); try { // Wait on outstanding syncers; wait for them to finish syncing (unless we've been - // shutdown or unless our latch has been thrown because we have been aborted). 
+ // shutdown or unless our latch has been thrown because we have been aborted or unless + // this WAL is broken and we can't get a sync/append to complete). while (!this.shutdown && this.zigzagLatch.isCocked() && - highestSyncedSequence.get() < currentSequence) { + highestSyncedSequence.get() < currentSequence && + // We could be in here and all syncs are failing or failed. Check for this. Otherwise + // we'll just be stuck here for ever. In other words, ensure there are syncs running. + isOutstandingSyncs()) { synchronized (this.safePointWaiter) { this.safePointWaiter.wait(0, 1); } } - // Tell waiting thread we've attained safe point + // Tell waiting thread we've attained safe point. Can clear this.exception if set here + // because we know that next event through the ringbuffer will be going to a new WAL + // after we do the zigzaglatch dance. + this.exception = null; this.zigzagLatch.safePointAttained(); } catch (InterruptedException e) { LOG.warn("Interrupted ", e); @@ -2096,9 +2178,10 @@ public class FSHLog implements WAL { // Update metrics. postAppend(entry, EnvironmentEdgeManager.currentTime() - start); } catch (Exception e) { - LOG.warn("Could not append.
Requesting close of wal", e); + String msg = "Failed appending " + regionSequenceId + ", requesting roll of WAL"; + LOG.warn(msg, e); requestLogRoll(); - throw e; + throw new DamagedWALException(msg, e); } numEntries.incrementAndGet(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 1ea9d4f..a768660 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.wal.WALKey; * region sequence id (we want to use this later, just before we write the WAL to ensure region * edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit * hence marked 'transient' to underline this fact. It also adds mechanism so we can wait on - * the assign of the region sequence id. See {@link #stampRegionSequenceId()}. + * the assign of the region sequence id. See #stampRegionSequenceId(). 
*/ @InterfaceAudience.Private class FSWALEntry extends Entry { http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index 9fd2a37..d498c7e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -126,6 +126,9 @@ public class HLogKey extends WALKey implements Writable { super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce); } + /** + * @deprecated Don't use these Writables methods. Use PB instead. + */ @Override @Deprecated public void write(DataOutput out) throws IOException { @@ -204,6 +207,7 @@ public class HLogKey extends WALKey implements Writable { in.readByte(); } catch(EOFException e) { // Means it's a very old key, just continue + if (LOG.isTraceEnabled()) LOG.trace(e); } } try { http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 3ed9da0..dc5c9cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -53,11 +53,12 @@ import com.google.protobuf.CodedInputStream; * <TrailerSize> <PB_WAL_COMPLETE_MAGIC> * </p> * The Reader reads meta information (WAL Compression state, WALTrailer, 
etc) in - * {@link ProtobufLogReader#initReader(FSDataInputStream)}. A WALTrailer is an extensible structure + * ProtobufLogReader#initReader(FSDataInputStream). A WALTrailer is an extensible structure * which is appended at the end of the WAL. This is empty for now; it can contain some meta * information such as Region level stats, etc in future. */ -@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG}) +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, + HBaseInterfaceAudience.CONFIG}) public class ProtobufLogReader extends ReaderBase { private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class); // public for WALFactory until we move everything to o.a.h.h.wal @@ -78,8 +79,8 @@ public class ProtobufLogReader extends ReaderBase { protected WALCellCodec.ByteStringUncompressor byteStringUncompressor; protected boolean hasCompression = false; protected boolean hasTagCompression = false; - // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit entry - // in the wal, the inputstream's position is equal to walEditsStopOffset. + // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit + // entry in the wal, the inputstream's position is equal to walEditsStopOffset. 
private long walEditsStopOffset; private boolean trailerPresent; protected WALTrailer trailer; http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index ca80e4c..070ab2c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -159,7 +159,7 @@ public class ProtobufLogWriter extends WriterBase { output.write(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC); this.trailerWritten = true; } catch (IOException ioe) { - LOG.error("Got IOException while writing trailer", ioe); + LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index 62ab458..7de8367 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -42,9 +42,9 @@ import org.apache.htrace.Span; * SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync * call every time a Handler asks for it. * <p> - * SyncFutures are immutable but recycled. Call {@link #reset(long, Span)} before use even + * SyncFutures are immutable but recycled. 
Call #reset(long, Span) before use even * if it the first time, start the sync, then park the 'hitched' thread on a call to - * {@link #get()} + * #get(). */ @InterfaceAudience.Private class SyncFuture { http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index 61c7a97..ab4831c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -30,12 +30,13 @@ import java.util.NavigableMap; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; - +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -49,6 +50,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; + + + // imports for things that haven't moved from regionserver.wal yet. 
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; @@ -285,7 +289,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> { public void setOrigLogSeqNum(final long seqId) { this.origLogSeqNum = seqId; } - + /** * Return a positive long if current WALKey is created from a replay edit * @return original sequence number of the WALEdit @@ -293,16 +297,29 @@ public class WALKey implements SequenceId, Comparable<WALKey> { public long getOrigLogSeqNum() { return this.origLogSeqNum; } - + + @Override + public long getSequenceId() throws IOException { + return getSequenceId(-1); + } + /** - * Wait for sequence number is assigned & return the assigned value + * Wait for sequence number to be assigned & return the assigned value. + * @param maxWaitForSeqId maximum time to wait in milliseconds for sequenceid * @return long the new assigned sequence number * @throws InterruptedException */ - @Override - public long getSequenceId() throws IOException { + public long getSequenceId(final long maxWaitForSeqId) throws IOException { + // TODO: This implementation waiting on a latch is problematic because if a higher level + // determines we should stop or abort, there is not global list of all these blocked WALKeys + // waiting on a sequence id; they can't be cancelled... interrupted. 
See getNextSequenceId try { - this.seqNumAssignedLatch.await(); + if (maxWaitForSeqId < 0) { + this.seqNumAssignedLatch.await(); + } else if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) { + throw new TimeoutIOException("Failed to get sequenceid after " + maxWaitForSeqId + + "ms; WAL system stuck or has gone away?"); + } } catch (InterruptedException ie) { LOG.warn("Thread interrupted waiting for next log sequence number"); InterruptedIOException iie = new InterruptedIOException(); http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index c270951..dde7c5a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -112,6 +112,7 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -910,7 +911,7 @@ public class TestDistributedLogSplitting { } } - @Test(timeout = 300000) + @Ignore ("We don't support DLR anymore") @Test(timeout = 300000) public void testDisallowWritesInRecovering() throws Exception { LOG.info("testDisallowWritesInRecovering"); conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java index 4e97738..b69f672 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; @@ -219,7 +220,8 @@ public class TestFSErrorsExposed { util.getDFSCluster().restartDataNodes(); } finally { - util.getMiniHBaseCluster().killAll(); + MiniHBaseCluster cluster = util.getMiniHBaseCluster(); + if (cluster != null) cluster.killAll(); util.shutdownMiniCluster(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java new file mode 100644 index 0000000..945361f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -0,0 +1,273 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DroppedSnapshotException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALProvider.Writer; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import 
org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.Mockito; +import org.mockito.exceptions.verification.WantedButNotInvoked; + +/** + * Testing sync/append failures. + * Copied from TestHRegion. + */ +@Category({MediumTests.class}) +public class TestFailedAppendAndSync { + private static final Log LOG = LogFactory.getLog(TestFailedAppendAndSync.class); + @Rule public TestName name = new TestName(); + + private static final String COLUMN_FAMILY = "MyCF"; + private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY); + + HRegion region = null; + // Do not run unit tests in parallel (? Why not? It don't work? Why not? St.Ack) + private static HBaseTestingUtility TEST_UTIL; + public static Configuration CONF ; + private String dir; + + // Test names + protected TableName tableName; + + @Before + public void setup() throws IOException { + TEST_UTIL = HBaseTestingUtility.createLocalHTU(); + CONF = TEST_UTIL.getConfiguration(); + // Disable block cache. + CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + dir = TEST_UTIL.getDataTestDir("TestHRegion").toString(); + tableName = TableName.valueOf(name.getMethodName()); + } + + @After + public void tearDown() throws Exception { + EnvironmentEdgeManagerTestHelper.reset(); + LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); + TEST_UTIL.cleanupTestDir(); + } + + String getName() { + return name.getMethodName(); + } + + /** + * Reproduce locking up that happens when we get an exceptions appending and syncing. + * See HBASE-14317. + * First I need to set up some mocks for Server and RegionServerServices. I also need to + * set up a dodgy WAL that will throw an exception when we go to append to it. + */ + @Test (timeout=300000) + public void testLockupAroundBadAssignSync() throws IOException { + final AtomicLong rolls = new AtomicLong(0); + // Dodgy WAL. Will throw exceptions when flags set. 
+ class DodgyFSLog extends FSHLog { + volatile boolean throwSyncException = false; + volatile boolean throwAppendException = false; + + public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf) + throws IOException { + super(fs, root, logDir, conf); + } + + @Override + public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { + byte [][] regions = super.rollWriter(force); + rolls.getAndIncrement(); + return regions; + } + + @Override + protected Writer createWriterInstance(Path path) throws IOException { + final Writer w = super.createWriterInstance(path); + return new Writer() { + @Override + public void close() throws IOException { + w.close(); + } + + @Override + public void sync() throws IOException { + if (throwSyncException) { + throw new IOException("FAKE! Failed to replace a bad datanode..."); + } + w.sync(); + } + + @Override + public void append(Entry entry) throws IOException { + if (throwAppendException) { + throw new IOException("FAKE! Failed to replace a bad datanode..."); + } + w.append(entry); + } + + @Override + public long getLength() throws IOException { + return w.getLength(); + } + }; + } + } + + // Make up mocked server and services. + Server server = mock(Server.class); + when(server.getConfiguration()).thenReturn(CONF); + when(server.isStopped()).thenReturn(false); + when(server.isAborted()).thenReturn(false); + RegionServerServices services = mock(RegionServerServices.class); + // OK. Now I have my mocked up Server and RegionServerServices and my dodgy WAL, go ahead with + // the test. 
+ FileSystem fs = FileSystem.get(CONF); + Path rootDir = new Path(dir + getName()); + DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF); + LogRoller logRoller = new LogRoller(server, services); + logRoller.addWAL(dodgyWAL); + logRoller.start(); + + boolean threwOnSync = false; + boolean threwOnAppend = false; + boolean threwOnBoth = false; + + HRegion region = initHRegion(tableName, null, null, dodgyWAL); + try { + // Get some random bytes. + byte[] value = Bytes.toBytes(getName()); + try { + // First get something into memstore + Put put = new Put(value); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), value); + region.put(put); + } catch (IOException ioe) { + fail(); + } + long rollsCount = rolls.get(); + try { + dodgyWAL.throwAppendException = true; + dodgyWAL.throwSyncException = false; + Put put = new Put(value); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("3"), value); + region.put(put); + } catch (IOException ioe) { + threwOnAppend = true; + } + while (rollsCount == rolls.get()) Threads.sleep(100); + rollsCount = rolls.get(); + + // When we get to here.. we should be ok. A new WAL has been put in place. There were no + // appends to sync. We should be able to continue. + + try { + dodgyWAL.throwAppendException = true; + dodgyWAL.throwSyncException = true; + Put put = new Put(value); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("4"), value); + region.put(put); + } catch (IOException ioe) { + threwOnBoth = true; + } + while (rollsCount == rolls.get()) Threads.sleep(100); + + // Again, all should be good. New WAL and no outstanding unsync'd edits so we should be able + // to just continue. + + // So, should be no abort at this stage. Verify. + Mockito.verify(server, Mockito.atLeast(0)). 
+ abort(Mockito.anyString(), (Throwable)Mockito.anyObject()); + try { + dodgyWAL.throwAppendException = false; + dodgyWAL.throwSyncException = true; + Put put = new Put(value); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("2"), value); + region.put(put); + } catch (IOException ioe) { + threwOnSync = true; + } + // An append in the WAL but the sync failed is a server abort condition. That is our + // current semantic. Verify. It takes a while for abort to be called. Just hang here till it + // happens. If it don't we'll timeout the whole test. That is fine. + while (true) { + try { + Mockito.verify(server, Mockito.atLeast(1)). + abort(Mockito.anyString(), (Throwable)Mockito.anyObject()); + break; + } catch (WantedButNotInvoked t) { + Threads.sleep(1); + } + } + } finally { + // To stop logRoller, its server has to say it is stopped. + Mockito.when(server.isStopped()).thenReturn(true); + if (logRoller != null) logRoller.interrupt(); + if (region != null) { + try { + region.close(true); + } catch (DroppedSnapshotException e) { + LOG.info("On way out; expected!", e); + } + } + if (dodgyWAL != null) dodgyWAL.close(); + assertTrue("The regionserver should have thrown an exception", threwOnBoth); + assertTrue("The regionserver should have thrown an exception", threwOnAppend); + assertTrue("The regionserver should have thrown an exception", threwOnSync); + } + } + + /** + * @return A region on which you must call + * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. 
+ */ + public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal) + throws IOException { + return TEST_UTIL.createLocalHRegion(tableName.getName(), startKey, stopKey, + getName(), CONF, false, Durability.SYNC_WAL, + wal, COLUMN_FAMILY_BYTES); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 6d3b4b2..5add20e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -33,7 +33,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -129,7 +131,13 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem; import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler; -import org.apache.hadoop.hbase.regionserver.wal.*; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; +import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; +import 
org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -147,6 +155,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.wal.WALSplitter; import org.junit.After; import org.junit.Assert; @@ -256,6 +265,8 @@ public class TestHRegion { HRegion.closeHRegion(region); } + + /* * This test is for verifying memstore snapshot size is correctly updated in case of rollback * See HBASE-10845 @@ -335,7 +346,8 @@ public class TestHRegion { // save normalCPHost and replaced by mockedCPHost, which will cancel flush requests RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); - when(mockedCPHost.preFlush(isA(HStore.class), isA(InternalScanner.class))).thenReturn(null); + when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class))). + thenReturn(null); region.setCoprocessorHost(mockedCPHost); region.put(put); region.flush(true); @@ -400,9 +412,18 @@ public class TestHRegion { } catch (DroppedSnapshotException dse) { // What we are expecting region.closing.set(false); // this is needed for the rest of the test to work + } catch (Exception e) { + // What we are expecting + region.closing.set(false); // this is needed for the rest of the test to work } // Make it so all writes succeed from here on out ffs.fault.set(false); + // WAL is bad because of above faulty fs. Roll WAL. 
+ try { + region.getWAL().rollWriter(true); + } catch (Exception e) { + int x = 0; + } // Check sizes. Should still be the one entry. Assert.assertEquals(sizeOfOnePut, region.getMemstoreSize()); // Now add two entries so that on this next flush that fails, we can see if we @@ -418,6 +439,8 @@ public class TestHRegion { region.flush(true); // Make sure our memory accounting is right. Assert.assertEquals(sizeOfOnePut * 2, region.getMemstoreSize()); + } catch (Exception e) { + int x = 0; } finally { HRegion.closeHRegion(region); } @@ -465,12 +488,13 @@ public class TestHRegion { // Now try close on top of a failing flush. region.close(); fail(); - } catch (DroppedSnapshotException dse) { + } catch (IOException dse) { // Expected LOG.info("Expected DroppedSnapshotException"); } finally { // Make it so all writes succeed from here on out so can close clean ffs.fault.set(false); + region.getWAL().rollWriter(true); HRegion.closeHRegion(region); } return null; @@ -898,7 +922,7 @@ public class TestHRegion { // now verify that the flush markers are written wal.shutdown(); - WAL.Reader reader = wals.createReader(fs, DefaultWALProvider.getCurrentFileName(wal), + WAL.Reader reader = WALFactory.createReader(fs, DefaultWALProvider.getCurrentFileName(wal), TEST_UTIL.getConfiguration()); try { List<WAL.Entry> flushDescriptors = new ArrayList<WAL.Entry>(); @@ -1014,8 +1038,7 @@ public class TestHRegion { } } - @Test - @SuppressWarnings("unchecked") + @Test (timeout=60000) public void testFlushMarkersWALFail() throws Exception { // test the cases where the WAL append for flush markers fail. String method = name.getMethodName(); @@ -1027,9 +1050,56 @@ public class TestHRegion { final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration()); FSUtils.setRootDir(walConf, logDir); - final WALFactory wals = new WALFactory(walConf, null, method); - WAL wal = spy(wals.getWAL(tableName.getName())); + // Make up a WAL that we can manipulate at append time. 
+ class FailAppendFlushMarkerWAL extends FSHLog { + volatile FlushAction [] flushActions = null; + + public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf) + throws IOException { + super(fs, root, logDir, conf); + } + + @Override + protected Writer createWriterInstance(Path path) throws IOException { + final Writer w = super.createWriterInstance(path); + return new Writer() { + @Override + public void close() throws IOException { + w.close(); + } + + @Override + public void sync() throws IOException { + w.sync(); + } + + @Override + public void append(Entry entry) throws IOException { + List<Cell> cells = entry.getEdit().getCells(); + if (WALEdit.isMetaEditFamily(cells.get(0))) { + FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0)); + if (desc != null) { + for (FlushAction flushAction: flushActions) { + if (desc.getAction().equals(flushAction)) { + throw new IOException("Failed to append flush marker! " + flushAction); + } + } + } + } + w.append(entry); + } + + @Override + public long getLength() throws IOException { + return w.getLength(); + } + }; + } + } + FailAppendFlushMarkerWAL wal = + new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf), + getName(), walConf); this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family); try { @@ -1040,13 +1110,7 @@ public class TestHRegion { region.put(put); // 1. 
Test case where START_FLUSH throws exception - IsFlushWALMarker isFlushWALMarker = new IsFlushWALMarker(FlushAction.START_FLUSH); - - // throw exceptions if the WalEdit is a start flush action - when(wal.append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(), - (WALEdit)argThat(isFlushWALMarker), (AtomicLong)any(), Mockito.anyBoolean(), - (List<Cell>)any())) - .thenThrow(new IOException("Fail to append flush marker")); + wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH}; // start cache flush will throw exception try { @@ -1058,9 +1122,13 @@ public class TestHRegion { } catch (IOException expected) { // expected } + // The WAL is hosed. It has failed an append and a sync. It has an exception stuck in it + // which it will keep returning until we roll the WAL to prevent any further appends going + // in or syncs succeeding on top of failed appends, a no-no. + wal.rollWriter(true); // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception - isFlushWALMarker.set(FlushAction.COMMIT_FLUSH); + wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH}; try { region.flush(true); @@ -1073,6 +1141,8 @@ public class TestHRegion { } region.close(); + // Roll WAL to clean out any exceptions stuck in it. See note above where we roll WAL. + wal.rollWriter(true); this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family); region.put(put); @@ -1080,7 +1150,7 @@ public class TestHRegion { // 3. Test case where ABORT_FLUSH will throw exception. // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with // DroppedSnapshotException. 
Below COMMIT_FLUSH will cause flush to abort - isFlushWALMarker.set(FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH); + wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH}; try { region.flush(true); @@ -5668,7 +5738,6 @@ public class TestHRegion { putData(startRow, numRows, qualifier, families); int splitRow = startRow + numRows; putData(splitRow, numRows, qualifier, families); - int endRow = splitRow + numRows; region.flush(true); HRegion [] regions = null; http://git-wip-us.apache.org/repos/asf/hbase/blob/0bf97bac/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java index e876a94..77f0230 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java @@ -128,7 +128,5 @@ public class TestMultiVersionConsistencyControl extends TestCase { for (int i = 0; i < n; ++i) { assertTrue(statuses[i].get()); } - } - -} +} \ No newline at end of file