http://git-wip-us.apache.org/repos/asf/hbase/blob/ec92a8a7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f03c205..ac846b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -29,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -41,6 +41,7 @@ import java.util.NavigableSet; import java.util.RandomAccess; import java.util.Set; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; @@ -69,7 +70,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellScanner; @@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DroppedSnapshotException; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; @@ -93,7 +92,6 @@ import org.apache.hadoop.hbase.ShareableMemory; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagRewriteCell; -import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.backup.HFileArchiver; @@ -112,7 +110,7 @@ import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; @@ -123,8 +121,6 @@ import org.apache.hadoop.hbase.filter.FilterWrapper; import org.apache.hadoop.hbase.filter.IncompatibleFilterException; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.io.hfile.BlockCache; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.ipc.CallerDisconnectedException; import org.apache.hadoop.hbase.ipc.RpcCallContext; @@ -148,6 
+144,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; @@ -169,7 +166,6 @@ import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.EncryptionTest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.Pair; @@ -199,6 +195,7 @@ import com.google.protobuf.RpcController; import com.google.protobuf.Service; import com.google.protobuf.TextFormat; +@SuppressWarnings("deprecation") @InterfaceAudience.Private public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region { private static final Log LOG = LogFactory.getLog(HRegion.class); @@ -207,18 +204,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi "hbase.hregion.scan.loadColumnFamiliesOnDemand"; /** - * Longest time we'll wait on a sequenceid. - * Sequenceid comes up out of the WAL subsystem. WAL subsystem can go bad or a test might use - * it without cleaning up previous usage properly; generally, a WAL roll is needed. The timeout - * is for a latch in WALKey. There is no global accounting of outstanding WALKeys; intentionally - * to avoid contention, but it makes it so if an abort or problem, we could be stuck waiting - * on the WALKey latch. Revisit. - */ - private final int maxWaitForSeqId; - private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms"; - private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000; - - /** * This is the global default value for durability. All tables/mutations not * defining a durability or using USE_DEFAULT will default to this value. */ @@ -282,7 +267,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final Counter checkAndMutateChecksPassed = new Counter(); final Counter checkAndMutateChecksFailed = new Counter(); - //Number of requests + // Number of requests final Counter readRequestsCount = new Counter(); final Counter filteredReadRequestsCount = new Counter(); final Counter writeRequestsCount = new Counter(); @@ -357,7 +342,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ private boolean disallowWritesInRecovering = false; - // when a region is in recovering state, it can only accept writes not reads + // When a region is in recovering state, it can only accept writes not reads private volatile boolean recovering = false; private volatile Optional<ConfigurationManager> configurationManager; @@ -374,7 +359,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // We achieve this by synchronizing on the scannerReadPoints object. 
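The hunk just below trims a blank line inside HRegion's smallest-read-point loop. As a refresher, that loop amounts to the minimal sketch that follows; ReadPointTracker and mvccReadPoint are illustrative stand-ins, not HRegion's actual members:

    // Start from the mvcc read point and walk every outstanding scanner's
    // read point, keeping the minimum. Synchronizing on the map mirrors the
    // synchronized(scannerReadPoints) block in the diff: no scanner can
    // register while the minimum is being computed.
    import java.util.concurrent.ConcurrentHashMap;

    class ReadPointTracker {
      private final ConcurrentHashMap<Long, Long> scannerReadPoints = new ConcurrentHashMap<>();
      private volatile long mvccReadPoint;

      long getSmallestReadPoint() {
        synchronized (scannerReadPoints) {
          long minimum = mvccReadPoint;
          for (long readPoint : scannerReadPoints.values()) {
            if (readPoint < minimum) {
              minimum = readPoint;
            }
          }
          return minimum;
        }
      }
    }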
synchronized(scannerReadPoints) { minimumReadPoint = mvcc.getReadPoint(); - for (Long readPoint: this.scannerReadPoints.values()) { if (readPoint < minimumReadPoint) { minimumReadPoint = readPoint; } @@ -674,7 +658,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION); - this.maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID); this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true); this.htableDescriptor = htd; this.rsServices = rsServices; @@ -1183,7 +1166,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ public void setRecovering(boolean newState) { boolean wasRecovering = this.recovering; - // before we flip the recovering switch (enabling reads) we should write the region open + // Before we flip the recovering switch (enabling reads) we should write the region open // event to WAL if needed if (wal != null && getRegionServerServices() != null && !writestate.readOnly && wasRecovering && !newState) { @@ -2051,7 +2034,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * Should the store be flushed because it is old enough. * <p> - * Every FlushPolicy should call this to determine whether a store is old enough to flush(except + * Every FlushPolicy should call this to determine whether a store is old enough to flush (except * that you always flush all stores). Otherwise the {@link #shouldFlush()} method will always * returns true which will make a lot of flush requests. */ @@ -2152,19 +2135,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * for say installing a bulk loaded file just ahead of the last hfile that was * the result of this flush, etc. * - * @param wal - * Null if we're NOT to go via wal. - * @param myseqid - * The seqid to use if <code>wal</code> is null writing out flush - * file. - * @param storesToFlush - * The list of stores to flush. + * @param wal Null if we're NOT to go via wal. + * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file. + * @param storesToFlush The list of stores to flush. * @return object describing the flush's state - * @throws IOException - * general io exceptions - * @throws DroppedSnapshotException - * Thrown when replay of wal is required because a Snapshot was not - * properly persisted. + * @throws IOException general io exceptions + * @throws DroppedSnapshotException Thrown when replay of WAL is required. */ protected FlushResult internalFlushcache(final WAL wal, final long myseqid, final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker) @@ -2188,65 +2164,48 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throw new IOException("Aborting flush because server is aborted..."); } final long startTime = EnvironmentEdgeManager.currentTime(); - // If nothing to flush, return, but we need to safely update the region sequence id + // If nothing to flush, return, but return with a valid unused sequenceId. + // It's needed by bulk upload IIRC. It flushes until no edits in memory so it can insert a + // bulk loaded file between memory and existing hfiles. It wants a good sequenceId that belongs + // to no other edit that it can use to associate with the bulk load. Hence this little dance below + // to go get one.
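The "little dance" the new comment describes is worth spelling out. Here is a toy model of the begin()/completeAndWait() protocol, standing in for the real MultiVersionConcurrencyControl (which completes entries in queue order and can block):

    class ToyMvcc {
      private long writePoint;
      private long readPoint;

      // Hand out the next write number; the real mvcc also queues a WriteEntry.
      synchronized long begin() {
        return ++writePoint;
      }

      // Mark the transaction done and wait until the read point has advanced
      // past it; with a single writer we can advance the read point directly.
      synchronized void completeAndWait(long writeNumber) {
        if (writeNumber > readPoint) {
          readPoint = writeNumber;
        }
      }

      synchronized long getReadPoint() {
        return readPoint;
      }
    }

Getting a valid, unused sequence id is then just:

    ToyMvcc mvcc = new ToyMvcc();
    long unusedSeqId = mvcc.begin();   // nothing is written under this number
    mvcc.completeAndWait(unusedSeqId); // now readers are past it

which is exactly the shape of the new getNextSequenceId() further down.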
if (this.memstoreSize.get() <= 0) { - // Take an update lock because am about to change the sequence id and we want the sequence id - // to be at the border of the empty memstore. - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; + // Take an update lock so no edits can come into memory just yet. this.updatesLock.writeLock().lock(); + WriteEntry writeEntry = null; try { if (this.memstoreSize.get() <= 0) { // Presume that if there are still no edits in the memstore, then there are no edits for // this region out in the WAL subsystem so no need to do any trickery clearing out - // edits in the WAL system. Up the sequence number so the resulting flush id is for - // sure just beyond the last appended region edit (useful as a marker when bulk loading, - // etc.). NOTE: The writeEntry write number is NOT in the WAL.. there is no WAL writing - // here. + // edits in the WAL sub-system. Up the sequence number so the resulting flush id is for + // sure just beyond the last appended region edit and not associated with any edit + // (useful as marker when bulk loading, etc.). + FlushResult flushResult = null; if (wal != null) { writeEntry = mvcc.begin(); long flushOpSeqId = writeEntry.getWriteNumber(); - FlushResult flushResult = new FlushResultImpl( - FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, - flushOpSeqId, - "Nothing to flush", - writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker)); - // TODO: Lets see if we hang here, if there is a scenario where an outstanding reader - // with a read point is in advance of this write point. + flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, + flushOpSeqId, "Nothing to flush", + writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker)); mvcc.completeAndWait(writeEntry); + // Set to null so we don't complete it again down in finally block. writeEntry = null; return new PrepareFlushResult(flushResult, myseqid); } else { - return new PrepareFlushResult( - new FlushResultImpl( - FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, - "Nothing to flush", - false), - myseqid); + return new PrepareFlushResult(new FlushResultImpl( + FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush", false), myseqid); } } } finally { - this.updatesLock.writeLock().unlock(); if (writeEntry != null) { + // If writeEntry is non-null, this operation failed; the mvcc transaction failed... + // but complete it anyways so it doesn't block the mvcc queue. mvcc.complete(writeEntry); } + this.updatesLock.writeLock().unlock(); } } - - if (LOG.isInfoEnabled()) { - // Log a fat line detailing what is being flushed. - StringBuilder perCfExtras = null; - if (!isAllFamilies(storesToFlush)) { - perCfExtras = new StringBuilder(); - for (Store store: storesToFlush) { - perCfExtras.append("; ").append(store.getColumnFamilyName()); - perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize())); - } - } - LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + - " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) + - ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") + - ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + myseqid)); - } + logFatLineOnFlush(storesToFlush, myseqid); // Stop updates while we snapshot the memstore of all of these regions' stores. We only have // to do this for a moment. It is quick. 
We also set the memstore size to zero here before we // allow updates again so its value will represent the size of the updates received @@ -2257,8 +2216,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.setStatus("Obtaining lock to block concurrent updates"); // block waiting for the lock for internal flush this.updatesLock.writeLock().lock(); - status.setStatus("Preparing to flush by snapshotting stores in " + - getRegionInfo().getEncodedName()); + status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName()); long totalFlushableSizeOfFlushableStores = 0; Set<byte[]> flushedFamilyNames = new HashSet<byte[]>(); @@ -2280,109 +2238,117 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // will be in advance of this sequence id. long flushedSeqId = HConstants.NO_SEQNUM; byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes(); - - long trxId = 0; - MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin(); try { - try { - if (wal != null) { - Long earliestUnflushedSequenceIdForTheRegion = + if (wal != null) { + Long earliestUnflushedSequenceIdForTheRegion = wal.startCacheFlush(encodedRegionName, flushedFamilyNames); - if (earliestUnflushedSequenceIdForTheRegion == null) { - // This should never happen. This is how startCacheFlush signals flush cannot proceed. - String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing."; - status.setStatus(msg); - return new PrepareFlushResult( + if (earliestUnflushedSequenceIdForTheRegion == null) { + // This should never happen. This is how startCacheFlush signals flush cannot proceed. + String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing."; + status.setStatus(msg); + return new PrepareFlushResult( new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false), myseqid); - } - flushOpSeqId = getNextSequenceId(wal); - // Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit - flushedSeqId = - earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM? - flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1; - } else { - // use the provided sequence Id as WAL is not being used for this flush. - flushedSeqId = flushOpSeqId = myseqid; } + flushOpSeqId = getNextSequenceId(wal); + // Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit + flushedSeqId = + earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM? + flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1; + } else { + // use the provided sequence Id as WAL is not being used for this flush. 
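The "back up 1" arithmetic a few lines up is easy to misread, so here it is in isolation; NO_SEQNUM is -1 in HConstants, and the caller has already null-checked the value returned by startCacheFlush():

    // Last 'flushed' edit: if the WAL reports nothing unflushed for this
    // region, the flush's own sequence id serves; otherwise it is one below
    // the oldest edit still sitting in the memstore.
    static long computeFlushedSeqId(long earliestUnflushedSeqId, long flushOpSeqId) {
      final long NO_SEQNUM = -1L; // HConstants.NO_SEQNUM
      return earliestUnflushedSeqId == NO_SEQNUM ? flushOpSeqId : earliestUnflushedSeqId - 1;
    }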
+ flushedSeqId = flushOpSeqId = myseqid; + } - for (Store s : storesToFlush) { - totalFlushableSizeOfFlushableStores += s.getFlushableSize(); - storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId)); - committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL - storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize()); - } + for (Store s : storesToFlush) { + totalFlushableSizeOfFlushableStores += s.getFlushableSize(); + storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId)); + committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL + storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize()); + } - // write the snapshot start to WAL - if (wal != null && !writestate.readOnly) { - FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, + // write the snapshot start to WAL + if (wal != null && !writestate.readOnly) { + FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); - // no sync. Sync is below where we do not hold the updates lock - trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, false, mvcc); - } - - // Prepare flush (take a snapshot) - for (StoreFlushContext flush : storeFlushCtxs.values()) { - flush.prepare(); - } - } catch (IOException ex) { - if (wal != null) { - if (trxId > 0) { // check whether we have already written START_FLUSH to WAL - try { - FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, - getRegionInfo(), flushOpSeqId, committedFiles); - WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, false, mvcc); - } catch (Throwable t) { - LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + - StringUtils.stringifyException(t)); - // ignore this since we will be aborting the RS with DSE. - } - } - // we have called wal.startCacheFlush(), now we have to abort it - wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); - throw ex; // let upper layers deal with it. - } - } finally { - this.updatesLock.writeLock().unlock(); - } - String s = "Finished memstore snapshotting " + this + - ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores; - status.setStatus(s); - if (LOG.isTraceEnabled()) LOG.trace(s); - // sync unflushed WAL changes - // see HBASE-8208 for details - if (wal != null) { - try { - wal.sync(); // ensure that flush marker is sync'ed - } catch (IOException ioe) { - wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); - throw ioe; - } + // No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH + WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc); } - // wait for all in-progress transactions to commit to WAL before - // we can start the flush. This prevents - // uncommitted transactions from being written into HFiles. - // We have to block before we start the flush, otherwise keys that - // were removed via a rollbackMemstore could be written to Hfiles. - mvcc.completeAndWait(writeEntry); - // set writeEntry to null to prevent mvcc.complete from being called again inside finally - // block - writeEntry = null; - } finally { - if (writeEntry != null) { - // In case of failure just mark current writeEntry as complete. 
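The error path that used to live inline here moves into doAbortFlushToWAL below. Its contract, reduced to a sketch with pared-down stand-in types (MarkerWal is not a real HBase interface):

    interface MarkerWal {
      void writeAbortFlushMarker() throws Exception;
      void abortCacheFlush(byte[] encodedRegionName);
    }

    static void abortFlush(MarkerWal wal, byte[] encodedRegionName) {
      if (wal == null) return;
      try {
        // Best effort: a failure here is only logged, since the regionserver
        // will be aborted with a DroppedSnapshotException shortly anyway.
        wal.writeAbortFlushMarker();
      } catch (Throwable t) {
        System.err.println("Failed writing ABORT_FLUSH marker: " + t);
      }
      // startCacheFlush() was already called, so it must always be undone.
      wal.abortCacheFlush(encodedRegionName);
    }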
- mvcc.complete(writeEntry); + // Prepare flush (take a snapshot) + for (StoreFlushContext flush : storeFlushCtxs.values()) { + flush.prepare(); } + } catch (IOException ex) { + doAbortFlushToWAL(wal, flushOpSeqId, committedFiles); + throw ex; + } finally { + this.updatesLock.writeLock().unlock(); } + String s = "Finished memstore snapshotting " + this + ", syncing WAL and waiting on mvcc, " + + "flushsize=" + totalFlushableSizeOfFlushableStores; + status.setStatus(s); + doSyncOfUnflushedWALChanges(wal, getRegionInfo()); return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushOpSeqId, flushedSeqId, totalFlushableSizeOfFlushableStores); } /** - * @param families + * Utility method broken out of internalPrepareFlushCache so that method is smaller. + */ + private void logFatLineOnFlush(final Collection<Store> storesToFlush, final long sequenceId) { + if (!LOG.isInfoEnabled()) { + return; + } + // Log a fat line detailing what is being flushed. + StringBuilder perCfExtras = null; + if (!isAllFamilies(storesToFlush)) { + perCfExtras = new StringBuilder(); + for (Store store: storesToFlush) { + perCfExtras.append("; ").append(store.getColumnFamilyName()); + perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize())); + } + } + LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + + " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) + + ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") + + ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId)); + } + + private void doAbortFlushToWAL(final WAL wal, final long flushOpSeqId, + final Map<byte[], List<Path>> committedFiles) { + if (wal == null) return; + try { + FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, + getRegionInfo(), flushOpSeqId, committedFiles); + WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, + mvcc); + } catch (Throwable t) { + LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + + StringUtils.stringifyException(t)); + // ignore this since we will be aborting the RS with DSE. + } + // we have called wal.startCacheFlush(), now we have to abort it + wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); + } + + /** + * Sync unflushed WAL changes. See HBASE-8208 for details + */ + private static void doSyncOfUnflushedWALChanges(final WAL wal, final HRegionInfo hri) + throws IOException { + if (wal == null) { + return; + } + try { + wal.sync(); // ensure that flush marker is sync'ed + } catch (IOException ioe) { + wal.abortCacheFlush(hri.getEncodedNameAsBytes()); + throw ioe; + } + } + + /** * @return True if passed Set is all families in the region. 
*/ private boolean isAllFamilies(final Collection<Store> families) { @@ -2400,8 +2366,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH, getRegionInfo(), -1, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR)); try { - WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, true, mvcc); + WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc); return true; } catch (IOException e) { LOG.warn(getRegionInfo().getEncodedName() + " : " @@ -2471,8 +2436,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // write flush marker to WAL. If fail, we should throw DroppedSnapshotException FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); - WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, true, mvcc); + WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc); } } catch (Throwable t) { // An exception here means that the snapshot was not persisted. @@ -2485,8 +2449,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); - WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, false, mvcc); + WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc); } catch (Throwable ex) { LOG.warn(getRegionInfo().getEncodedName() + " : " + "failed writing ABORT_FLUSH marker to WAL", ex); @@ -2557,15 +2520,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ @VisibleForTesting protected long getNextSequenceId(final WAL wal) throws IOException { - // TODO: For review. Putting an empty edit in to get a sequenceid out will not work if the - // WAL is banjaxed... if it has gotten an exception and the WAL has not yet been rolled or - // aborted. In this case, we'll just get stuck here. For now, until HBASE-12751, just have - // a timeout. May happen in tests after we tightened the semantic via HBASE-14317. - // Also, the getSequenceId blocks on a latch. There is no global list of outstanding latches - // so if an abort or stop, there is no way to call them in. - WALKey key = this.appendEmptyEdit(wal); - mvcc.complete(key.getWriteEntry()); - return key.getSequenceId(this.maxWaitForSeqId); + WriteEntry we = mvcc.begin(); + mvcc.completeAndWait(we); + return we.getWriteNumber(); } ////////////////////////////////////////////////////////////////////////////// @@ -2754,13 +2711,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * accumulating status codes and tracking the index at which processing * is proceeding. 
*/ - private abstract static class BatchOperationInProgress<T> { + private abstract static class BatchOperation<T> { T[] operations; int nextIndexToProcess = 0; OperationStatus[] retCodeDetails; WALEdit[] walEditsFromCoprocessors; - public BatchOperationInProgress(T[] operations) { + public BatchOperation(T[] operations) { this.operations = operations; this.retCodeDetails = new OperationStatus[operations.length]; this.walEditsFromCoprocessors = new WALEdit[operations.length]; @@ -2780,7 +2737,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private static class MutationBatch extends BatchOperationInProgress<Mutation> { + private static class MutationBatch extends BatchOperation<Mutation> { private long nonceGroup; private long nonce; public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) { @@ -2820,7 +2777,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> { + private static class ReplayBatch extends BatchOperation<MutationReplay> { private long replaySeqId = 0; public ReplayBatch(MutationReplay[] operations, long seqId) { super(operations); @@ -2906,7 +2863,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * OperationStatusCode and the exceptionMessage if any. * @throws IOException */ - OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException { + OperationStatus[] batchMutate(BatchOperation<?> batchOp) throws IOException { boolean initialized = false; Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE; startRegionOperation(op); @@ -2920,11 +2877,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!initialized) { this.writeRequestsCount.add(batchOp.operations.length); if (!batchOp.isInReplay()) { - doPreMutationHook(batchOp); + doPreBatchMutateHook(batchOp); } initialized = true; } - long addedSize = doMiniBatchMutation(batchOp); + long addedSize = doMiniBatchMutate(batchOp); long newSize = this.addAndGetGlobalMemstoreSize(addedSize); if (isFlushSize(newSize)) { requestFlush(); @@ -2936,8 +2893,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return batchOp.retCodeDetails; } - - private void doPreMutationHook(BatchOperationInProgress<?> batchOp) + private void doPreBatchMutateHook(BatchOperation<?> batchOp) throws IOException { /* Run coprocessor pre hook outside of locks to avoid deadlock */ WALEdit walEdit = new WALEdit(); @@ -2976,103 +2932,58 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + /** + * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)} + * In here we also handle replay of edits on region recover. + * @return Change in size brought about by applying <code>batchOp</code> + */ @SuppressWarnings("unchecked") - private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException { - boolean isInReplay = batchOp.isInReplay(); - // variable to note if all Put items are for the same CF -- metrics related + // TODO: This needs a rewrite. Doesn't have to be this long. 
St.Ack 20160120 + private long doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException { + boolean replay = batchOp.isInReplay(); + // Variable to note if all Put items are for the same CF -- metrics related boolean putsCfSetConsistent = true; - //The set of columnFamilies first seen for Put. - Set<byte[]> putsCfSet = null; - // variable to note if all Delete items are for the same CF -- metrics related + // Variable to note if all Delete items are for the same CF -- metrics related boolean deletesCfSetConsistent = true; - //The set of columnFamilies first seen for Delete. + // The set of columnFamilies first seen for Put. + Set<byte[]> putsCfSet = null; + // The set of columnFamilies first seen for Delete. Set<byte[]> deletesCfSet = null; - - long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE; - WALEdit walEdit = new WALEdit(isInReplay); - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; - long txid = 0; - boolean doRollBackMemstore = false; + long currentNonceGroup = HConstants.NO_NONCE; + long currentNonce = HConstants.NO_NONCE; + WALEdit walEdit = new WALEdit(replay); boolean locked = false; - - /** Keep track of the locks we hold so we can release them in finally clause */ - List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); // reference family maps directly so coprocessors can mutate them if desired Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length]; // We try to set up a batch in the range [firstIndex,lastIndexExclusive) int firstIndex = batchOp.nextIndexToProcess; int lastIndexExclusive = firstIndex; boolean success = false; - int noOfPuts = 0, noOfDeletes = 0; - WALKey walKey = null; - long mvccNum = 0; + int noOfPuts = 0; + int noOfDeletes = 0; + WriteEntry writeEntry = null; + /** Keep track of the locks we hold so we can release them in finally clause */ + List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); try { - // ------------------------------------ - // STEP 1. Try to acquire as many locks as we can, and ensure - // we acquire at least one. - // ---------------------------------- + // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. int numReadyToWrite = 0; long now = EnvironmentEdgeManager.currentTime(); while (lastIndexExclusive < batchOp.operations.length) { - Mutation mutation = batchOp.getMutation(lastIndexExclusive); - boolean isPutMutation = mutation instanceof Put; - - Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap(); - // store the family map reference to allow for mutations - familyMaps[lastIndexExclusive] = familyMap; - - // skip anything that "ran" already - if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - lastIndexExclusive++; - continue; - } - - try { - if (isPutMutation) { - // Check the families in the put. If bad, skip this one. 
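Stepping back, STEP 1's locking policy above is: validate each mutation (now via checkBatchOp), then try for its row lock, and stop growing the batch at the first lock that cannot be had. A schematic version, with the lock type left generic (HBase's RowLock is the real one) and validation elided:

    // tryLock returns null when the row is contended; the loop keeps every
    // lock it wins and bails at the first miss, so the batch covers a
    // contiguous prefix of the remaining operations.
    static <L> int acquireBatch(java.util.List<byte[]> rows,
        java.util.function.Function<byte[], L> tryLock, java.util.List<L> acquired) {
      int ready = 0;
      for (byte[] row : rows) {
        L lock = tryLock.apply(row);
        if (lock == null) {
          break; // stop acquiring more rows for this batch
        }
        acquired.add(lock);
        ready++;
      }
      return ready; // caller processes these, then comes back for the rest
    }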
- if (isInReplay) { - removeNonExistentColumnFamilyForReplay(familyMap); - } else { - checkFamilies(familyMap.keySet()); - } - checkTimestamps(mutation.getFamilyCellMap(), now); - } else { - prepareDelete((Delete) mutation); - } - checkRow(mutation.getRow(), "doMiniBatchMutation"); - } catch (NoSuchColumnFamilyException nscf) { - LOG.warn("No such column family in batch mutation", nscf); - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.BAD_FAMILY, nscf.getMessage()); - lastIndexExclusive++; - continue; - } catch (FailedSanityCheckException fsce) { - LOG.warn("Batch Mutation did not pass sanity check", fsce); - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); - lastIndexExclusive++; - continue; - } catch (WrongRegionException we) { - LOG.warn("Batch mutation had a row that does not belong to this region", we); - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); + if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now)) { lastIndexExclusive++; continue; } - - // If we haven't got any rows in our batch, we should block to - // get the next one. + Mutation mutation = batchOp.getMutation(lastIndexExclusive); + // If we haven't got any rows in our batch, we should block to get the next one. RowLock rowLock = null; try { rowLock = getRowLock(mutation.getRow(), true); } catch (IOException ioe) { - LOG.warn("Failed getting lock in batch put, row=" - + Bytes.toStringBinary(mutation.getRow()), ioe); + LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); } if (rowLock == null) { // We failed to grab another lock - break; // stop acquiring more rows for this batch + break; // Stop acquiring more rows for this batch } else { acquiredRowLocks.add(rowLock); } @@ -3080,9 +2991,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi lastIndexExclusive++; numReadyToWrite++; - if (isPutMutation) { + if (mutation instanceof Put) { // If Column Families stay consistent through out all of the - // individual puts then metrics can be reported as a mutliput across + // individual puts then metrics can be reported as a multiput across // column families in the first put. if (putsCfSet == null) { putsCfSet = mutation.getFamilyCellMap().keySet(); @@ -3100,23 +3011,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - // we should record the timestamp only after we have acquired the rowLock, + // We've now grabbed as many mutations off the list as we can + + // STEP 2. Update any LATEST_TIMESTAMP timestamps + // We should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp now = EnvironmentEdgeManager.currentTime(); byte[] byteNow = Bytes.toBytes(now); // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? - if (numReadyToWrite <= 0) return 0L; - - // We've now grabbed as many mutations off the list as we can + if (numReadyToWrite <= 0) { + return 0L; + } - // ------------------------------------ - // STEP 2. 
Update any LATEST_TIMESTAMP timestamps - // ---------------------------------- - for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) { + for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) { // skip invalid if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) continue; + != OperationStatusCode.NOT_RUN) { + // lastIndexExclusive was incremented above. + continue; + } Mutation mutation = batchOp.getMutation(i); if (mutation instanceof Put) { @@ -3133,16 +3047,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi locked = true; // calling the pre CP hook for batch mutation - if (!isInReplay && coprocessorHost != null) { + if (!replay && coprocessorHost != null) { MiniBatchOperationInProgress<Mutation> miniBatchOp = new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); - if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; + if (coprocessorHost.preBatchMutate(miniBatchOp)) { + return 0L; + } } - // ------------------------------------ // STEP 3. Build WAL edit - // ---------------------------------- Durability durability = Durability.USE_DEFAULT; for (int i = firstIndex; i < lastIndexExclusive; i++) { // Skip puts that were determined to be invalid during preprocessing @@ -3160,26 +3074,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi continue; } - long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i); + long nonceGroup = batchOp.getNonceGroup(i); + long nonce = batchOp.getNonce(i); // In replay, the batch may contain multiple nonces. If so, write WALEdit for each. // Given how nonces are originally written, these should be contiguous. // They don't have to be, it will still work, just write more WALEdits than needed. if (nonceGroup != currentNonceGroup || nonce != currentNonce) { - if (walEdit.size() > 0) { - assert isInReplay; - if (!isInReplay) { - throw new IOException("Multiple nonces per batch and not in replay"); - } - // txid should always increase, so having the one from the last call is ok. - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), now, m.getClusterIds(), - currentNonceGroup, currentNonce, mvcc); - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, - walEdit, true); - walEdit = new WALEdit(isInReplay); - walKey = null; - } + // Write what we have so far for nonces out to WAL + appendCurrentNonces(m, replay, walEdit, now, currentNonceGroup, currentNonce); + walEdit = new WALEdit(replay); currentNonceGroup = nonceGroup; currentNonce = nonce; } @@ -3194,107 +3097,83 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi addFamilyMapToWALEdit(familyMaps[i], walEdit); } - // ------------------------- - // STEP 4. Append the final edit to WAL. Do not sync wal. - // ------------------------- + // STEP 4. Append the final edit to WAL and sync. 
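STEP 4's contract, in miniature: the append returns a txid, a non-zero txid must be synced at the chosen durability, and on failure the WALKey's mvcc write entry has to be completed so it cannot wedge the mvcc queue. A sketch with stand-in types, not the real WAL interface:

    interface SketchWal {
      long append(Object key, Object edit) throws java.io.IOException;
      void sync(long txid) throws java.io.IOException;
    }

    static long appendAndSync(SketchWal wal, Object key, Object edit,
        Runnable completeMvccEntry) throws java.io.IOException {
      try {
        long txid = wal.append(key, edit);
        if (txid != 0) {
          wal.sync(txid); // durability handling elided
        }
        return txid;
      } catch (java.io.IOException ioe) {
        completeMvccEntry.run(); // unblock transactions queued behind us
        throw ioe;
      }
    }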
Mutation mutation = batchOp.getMutation(firstIndex); - if (isInReplay) { + WALKey walKey = null; + if (replay) { // use wal key from the original walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); - long replaySeqId = batchOp.getReplaySequenceId(); - walKey.setOrigLogSeqNum(replaySeqId); - } - if (walEdit.size() > 0) { - if (!isInReplay) { - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId()); + } + // Not sure what is going on here when replay is going on... does the below append get + // called for replayed edits? Am afraid to change it without test. + if (!walEdit.isEmpty()) { + if (!replay) { + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); } - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); + // TODO: Use the doAppend methods below... complicated by the replay stuff above. + try { + long txid = + this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); + if (txid != 0) sync(txid, durability); + writeEntry = walKey.getWriteEntry(); + } catch (IOException ioe) { + if (walKey != null) mvcc.complete(walKey.getWriteEntry()); + throw ioe; + } } - // ------------------------------------ - // Acquire the latest mvcc number - // ---------------------------------- if (walKey == null) { - // If this is a skip wal operation just get the read point from mvcc - walKey = this.appendEmptyEdit(this.wal); - } - if (!isInReplay) { - writeEntry = walKey.getWriteEntry(); - mvccNum = writeEntry.getWriteNumber(); - } else { - mvccNum = batchOp.getReplaySequenceId(); + // If no walKey, then skipping WAL or some such. Being an mvcc transaction so sequenceid. + writeEntry = mvcc.begin(); } - // ------------------------------------ // STEP 5. Write back to memstore - // Write to memstore. It is ok to write to memstore - // first without syncing the WAL because we do not roll - // forward the memstore MVCC. The MVCC will be moved up when - // the complete operation is done. These changes are not yet - // visible to scanners till we update the MVCC. The MVCC is - // moved only when the sync is complete. - // ---------------------------------- long addedSize = 0; for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { continue; } - doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote - addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay); + addedSize += applyFamilyMapToMemstore(familyMaps[i], replay, + replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); + } + + // STEP 6. Complete mvcc. + if (replay) { + this.mvcc.advanceTo(batchOp.getReplaySequenceId()); + } else if (writeEntry != null/*Can be null if in replay mode*/) { + mvcc.completeAndWait(writeEntry); + writeEntry = null; } - // ------------------------------- - // STEP 6. Release row locks, etc. 
- // ------------------------------- + // STEP 7. Release row locks, etc. if (locked) { this.updatesLock.readLock().unlock(); locked = false; } releaseRowLocks(acquiredRowLocks); - // ------------------------- - // STEP 7. Sync wal. - // ------------------------- - if (txid != 0) { - syncOrDefer(txid, durability); - } - - doRollBackMemstore = false; // calling the post CP hook for batch mutation - if (!isInReplay && coprocessorHost != null) { + if (!replay && coprocessorHost != null) { MiniBatchOperationInProgress<Mutation> miniBatchOp = new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutate(miniBatchOp); } - // ------------------------------------------------------------------ - // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. - // ------------------------------------------------------------------ - if (writeEntry != null) { - mvcc.completeAndWait(writeEntry); - writeEntry = null; - } else if (isInReplay) { - // ensure that the sequence id of the region is at least as big as orig log seq id - mvcc.advanceTo(mvccNum); - } - for (int i = firstIndex; i < lastIndexExclusive; i ++) { if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; } } - // ------------------------------------ - // STEP 9. Run coprocessor post hooks. This should be done after the wal is + // STEP 8. Run coprocessor post hooks. This should be done after the wal is // synced so that the coprocessor contract is adhered to. - // ------------------------------------ - if (!isInReplay && coprocessorHost != null) { + if (!replay && coprocessorHost != null) { for (int i = firstIndex; i < lastIndexExclusive; i++) { // only for successful puts if (batchOp.retCodeDetails[i].getOperationStatusCode() @@ -3313,18 +3192,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi success = true; return addedSize; } finally { - // if the wal sync was unsuccessful, remove keys from memstore - if (doRollBackMemstore) { - for (int j = 0; j < familyMaps.length; j++) { - for(List<Cell> cells:familyMaps[j].values()) { - rollbackMemstore(cells); - } - } - if (writeEntry != null) mvcc.complete(writeEntry); - } else if (writeEntry != null) { - mvcc.completeAndWait(writeEntry); - } - + // Call complete rather than completeAndWait because we probably had error if walKey != null + if (writeEntry != null) mvcc.complete(writeEntry); if (locked) { this.updatesLock.readLock().unlock(); } @@ -3369,6 +3238,88 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + private void appendCurrentNonces(final Mutation mutation, final boolean replay, + final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce) + throws IOException { + if (walEdit.isEmpty()) return; + if (!replay) throw new IOException("Multiple nonces per batch and not in replay"); + WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), now, mutation.getClusterIds(), + currentNonceGroup, currentNonce, mvcc); + this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); + // Complete the mvcc transaction started down in append else it will block others + this.mvcc.complete(walKey.getWriteEntry()); + } + + private boolean checkBatchOp(BatchOperation<?> batchOp, final int lastIndexExclusive, 
+ final Map<byte[], List<Cell>>[] familyMaps, final long now) + throws IOException { + boolean skip = false; + // Skip anything that "ran" already + if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() + != OperationStatusCode.NOT_RUN) { + return true; + } + Mutation mutation = batchOp.getMutation(lastIndexExclusive); + Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap(); + // store the family map reference to allow for mutations + familyMaps[lastIndexExclusive] = familyMap; + + try { + if (mutation instanceof Put) { + // Check the families in the put. If bad, skip this one. + if (batchOp.isInReplay()) { + removeNonExistentColumnFamilyForReplay(familyMap); + } else { + checkFamilies(familyMap.keySet()); + } + checkTimestamps(mutation.getFamilyCellMap(), now); + } else { + prepareDelete((Delete)mutation); + } + checkRow(mutation.getRow(), "doMiniBatchMutation"); + } catch (NoSuchColumnFamilyException nscf) { + LOG.warn("No such column family in batch mutation", nscf); + batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( + OperationStatusCode.BAD_FAMILY, nscf.getMessage()); + skip = true; + } catch (FailedSanityCheckException fsce) { + LOG.warn("Batch Mutation did not pass sanity check", fsce); + batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( + OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); + skip = true; + } catch (WrongRegionException we) { + LOG.warn("Batch mutation had a row that does not belong to this region", we); + batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( + OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); + skip = true; + } + return skip; + } + + /** + * During replay, there could exist column families which are removed between region server + * failure and replay + */ + private void removeNonExistentColumnFamilyForReplay(final Map<byte[], List<Cell>> familyMap) { + List<byte[]> nonExistentList = null; + for (byte[] family : familyMap.keySet()) { + if (!this.htableDescriptor.hasFamily(family)) { + if (nonExistentList == null) { + nonExistentList = new ArrayList<byte[]>(); + } + nonExistentList.add(family); + } + } + if (nonExistentList != null) { + for (byte[] family : nonExistentList) { + // Perhaps schema was changed between crash and replay + LOG.info("No family for " + Bytes.toString(family) + " omitting from replay."); + familyMap.remove(family); + } + } + } + /** * Returns effective durability from the passed durability and * the table descriptor. @@ -3377,93 +3328,82 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return d == Durability.USE_DEFAULT ? this.durability : d; } - //TODO, Think that gets/puts and deletes should be refactored a bit so that - //the getting of the lock happens before, so that you would just pass it into - //the methods.
So in the case of checkAndMutate you could just do lockRow, - //get, put, unlockRow or something - @Override public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, - CompareOp compareOp, ByteArrayComparable comparator, Mutation w, + CompareOp compareOp, ByteArrayComparable comparator, Mutation mutation, boolean writeToWAL) throws IOException{ + checkMutationType(mutation, row); + return doCheckAndRowMutate(row, family, qualifier, compareOp, comparator, null, + mutation, writeToWAL); + } + + @Override + public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, + CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm, + boolean writeToWAL) + throws IOException { + return doCheckAndRowMutate(row, family, qualifier, compareOp, comparator, rm, null, + writeToWAL); + } + + /** + * checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has + * switches in the few places where there is deviation. + */ + private boolean doCheckAndRowMutate(byte [] row, byte [] family, byte [] qualifier, + CompareOp compareOp, ByteArrayComparable comparator, RowMutations rowMutations, + Mutation mutation, boolean writeToWAL) + throws IOException { + // Could do the below checks but seems wacky with two callers only. Just comment out for now. + // One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't + // need these commented out checks. + // if (rowMutations == null && mutation == null) throw new DoNotRetryIOException("Both null"); + // if (rowMutations != null && mutation != null) throw new DoNotRetryIOException("Both set"); checkReadOnly(); - //TODO, add check for value length or maybe even better move this to the - //client if this becomes a global setting + // TODO, add check for value length also move this check to the client checkResources(); - boolean isPut = w instanceof Put; - if (!isPut && !(w instanceof Delete)) - throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " + - "be Put or Delete"); - if (!Bytes.equals(row, w.getRow())) { - throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " + - "getRow must match the passed row"); - } - startRegionOperation(); try { Get get = new Get(row); checkFamily(family); get.addColumn(family, qualifier); - // Lock row - note that doBatchMutate will relock this row if called RowLock rowLock = getRowLock(get.getRow()); - // wait for all previous transactions to complete (with lock held) - mvcc.await(); try { - if (this.getCoprocessorHost() != null) { + if (mutation != null && this.getCoprocessorHost() != null) { + // Call coprocessor. Boolean processed = null; - if (w instanceof Put) { + if (mutation instanceof Put) { processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family, - qualifier, compareOp, comparator, (Put) w); - } else if (w instanceof Delete) { + qualifier, compareOp, comparator, (Put)mutation); + } else if (mutation instanceof Delete) { processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family, - qualifier, compareOp, comparator, (Delete) w); + qualifier, compareOp, comparator, (Delete)mutation); } if (processed != null) { return processed; } } + // NOTE: We used to wait here until mvcc caught up: mvcc.await(); + // Supposition is that now all changes are done under row locks, then when we go to read, + // we'll get the latest on this row. 
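The match computation that follows has three cases. Condensed to plain byte arrays, with an IntPredicate standing in for the CompareOp switch:

    // 1) No cell at all matches a null/empty expected value.
    // 2) An existing-but-empty cell also matches a null/empty expected value.
    // 3) Otherwise a single cell's value is compared under the operator.
    static boolean checkMatches(byte[] stored, byte[] expected,
        java.util.Comparator<byte[]> valueComparator,
        java.util.function.IntPredicate opMatches) {
      boolean valueIsNull = expected == null || expected.length == 0;
      if (stored == null) {
        return valueIsNull;
      }
      if (stored.length == 0 && valueIsNull) {
        return true;
      }
      return !valueIsNull && opMatches.test(valueComparator.compare(stored, expected));
    }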
List<Cell> result = get(get, false); - - boolean valueIsNull = comparator.getValue() == null || - comparator.getValue().length == 0; + boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; boolean matches = false; long cellTs = 0; if (result.size() == 0 && valueIsNull) { matches = true; - } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && - valueIsNull) { + } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { matches = true; cellTs = result.get(0).getTimestamp(); } else if (result.size() == 1 && !valueIsNull) { Cell kv = result.get(0); cellTs = kv.getTimestamp(); int compareResult = CellComparator.compareValue(kv, comparator); - switch (compareOp) { - case LESS: - matches = compareResult < 0; - break; - case LESS_OR_EQUAL: - matches = compareResult <= 0; - break; - case EQUAL: - matches = compareResult == 0; - break; - case NOT_EQUAL: - matches = compareResult != 0; - break; - case GREATER_OR_EQUAL: - matches = compareResult >= 0; - break; - case GREATER: - matches = compareResult > 0; - break; - default: - throw new RuntimeException("Unknown Compare op " + compareOp.name()); - } + matches = matches(compareOp, compareResult); } - //If matches put the new put or delete the new delete + // If matches put the new put or delete the new delete if (matches) { // We have acquired the row lock already. If the system clock is NOT monotonically // non-decreasing (see HBASE-14070) we should make sure that the mutation has a @@ -3472,16 +3412,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long now = EnvironmentEdgeManager.currentTime(); long ts = Math.max(now, cellTs); // ensure write is not eclipsed byte[] byteTs = Bytes.toBytes(ts); - - if (w instanceof Put) { - updateCellTimestamps(w.getFamilyCellMap().values(), byteTs); + if (mutation != null) { + if (mutation instanceof Put) { + updateCellTimestamps(mutation.getFamilyCellMap().values(), byteTs); + } + // And else 'delete' is not needed since it already does a second get, and sets the + // timestamp from get (see prepareDeleteTimestamps). + } else { + for (Mutation m: rowMutations.getMutations()) { + if (m instanceof Put) { + updateCellTimestamps(m.getFamilyCellMap().values(), byteTs); + } + } + // And else 'delete' is not needed since it already does a second get, and sets the + // timestamp from get (see prepareDeleteTimestamps). + } + // All edits for the given row (across all column families) must happen atomically. + if (mutation != null) { + doBatchMutate(mutation); + } else { + mutateRow(rowMutations); } - // else delete is not needed since it already does a second get, and sets the timestamp - // from get (see prepareDeleteTimestamps). - - // All edits for the given row (across all column families) must - // happen atomically. - doBatchMutate(w); this.checkAndMutateChecksPassed.increment(); return true; } @@ -3495,113 +3446,54 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - //TODO, Think that gets/puts and deletes should be refactored a bit so that - //the getting of the lock happens before, so that you would just pass it into - //the methods. 
So in the case of checkAndMutate you could just do lockRow, - //get, put, unlockRow or something + private void checkMutationType(final Mutation mutation, final byte [] row) + throws DoNotRetryIOException { + boolean isPut = mutation instanceof Put; + if (!isPut && !(mutation instanceof Delete)) { + throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must be Put or Delete"); + } + if (!Bytes.equals(row, mutation.getRow())) { + throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's getRow must match"); + } + } - @Override - public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, - CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm, - boolean writeToWAL) throws IOException { - checkReadOnly(); - //TODO, add check for value length or maybe even better move this to the - //client if this becomes a global setting - checkResources(); + private boolean matches(final CompareOp compareOp, final int compareResult) { + boolean matches = false; + switch (compareOp) { + case LESS: + matches = compareResult < 0; + break; + case LESS_OR_EQUAL: + matches = compareResult <= 0; + break; + case EQUAL: + matches = compareResult == 0; + break; + case NOT_EQUAL: + matches = compareResult != 0; + break; + case GREATER_OR_EQUAL: + matches = compareResult >= 0; + break; + case GREATER: + matches = compareResult > 0; + break; + default: + throw new RuntimeException("Unknown Compare op " + compareOp.name()); + } + return matches; + } - startRegionOperation(); - try { - Get get = new Get(row); - checkFamily(family); - get.addColumn(family, qualifier); - // Lock row - note that doBatchMutate will relock this row if called - RowLock rowLock = getRowLock(get.getRow()); - // wait for all previous transactions to complete (with lock held) - mvcc.await(); - try { - List<Cell> result = get(get, false); - - boolean valueIsNull = comparator.getValue() == null || - comparator.getValue().length == 0; - boolean matches = false; - long cellTs = 0; - if (result.size() == 0 && valueIsNull) { - matches = true; - } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && - valueIsNull) { - matches = true; - cellTs = result.get(0).getTimestamp(); - } else if (result.size() == 1 && !valueIsNull) { - Cell kv = result.get(0); - cellTs = kv.getTimestamp(); - int compareResult = CellComparator.compareValue(kv, comparator); - switch (compareOp) { - case LESS: - matches = compareResult < 0; - break; - case LESS_OR_EQUAL: - matches = compareResult <= 0; - break; - case EQUAL: - matches = compareResult == 0; - break; - case NOT_EQUAL: - matches = compareResult != 0; - break; - case GREATER_OR_EQUAL: - matches = compareResult >= 0; - break; - case GREATER: - matches = compareResult > 0; - break; - default: - throw new RuntimeException("Unknown Compare op " + compareOp.name()); - } - } - //If matches put the new put or delete the new delete - if (matches) { - // We have acquired the row lock already. If the system clock is NOT monotonically - // non-decreasing (see HBASE-14070) we should make sure that the mutation has a - // larger timestamp than what was observed via Get. doBatchMutate already does this, but - // there is no way to pass the cellTs. See HBASE-14054. 
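One subtlety carried through the dedup above is the timestamp clamp for non-monotonic clocks (HBASE-14070). Isolated for clarity:

    // If the system clock stepped backwards, 'now' can be older than the
    // cell we just compared against; taking the max guarantees the new
    // write is not eclipsed by the existing cell.
    static long clampWriteTimestamp(long now, long observedCellTs) {
      return Math.max(now, observedCellTs);
    }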
-  @Override
-  public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
-      CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
-      boolean writeToWAL) throws IOException {
-    checkReadOnly();
-    //TODO, add check for value length or maybe even better move this to the
-    //client if this becomes a global setting
-    checkResources();
-    startRegionOperation();
-    try {
-      Get get = new Get(row);
-      checkFamily(family);
-      get.addColumn(family, qualifier);
-      // Lock row - note that doBatchMutate will relock this row if called
-      RowLock rowLock = getRowLock(get.getRow());
-      // wait for all previous transactions to complete (with lock held)
-      mvcc.await();
-      try {
-        List<Cell> result = get(get, false);
-
-        boolean valueIsNull = comparator.getValue() == null ||
-          comparator.getValue().length == 0;
-        boolean matches = false;
-        long cellTs = 0;
-        if (result.size() == 0 && valueIsNull) {
-          matches = true;
-        } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
-            valueIsNull) {
-          matches = true;
-          cellTs = result.get(0).getTimestamp();
-        } else if (result.size() == 1 && !valueIsNull) {
-          Cell kv = result.get(0);
-          cellTs = kv.getTimestamp();
-          int compareResult = CellComparator.compareValue(kv, comparator);
-          switch (compareOp) {
-          case LESS:
-            matches = compareResult < 0;
-            break;
-          case LESS_OR_EQUAL:
-            matches = compareResult <= 0;
-            break;
-          case EQUAL:
-            matches = compareResult == 0;
-            break;
-          case NOT_EQUAL:
-            matches = compareResult != 0;
-            break;
-          case GREATER_OR_EQUAL:
-            matches = compareResult >= 0;
-            break;
-          case GREATER:
-            matches = compareResult > 0;
-            break;
-          default:
-            throw new RuntimeException("Unknown Compare op " + compareOp.name());
-          }
-        }
-        //If matches put the new put or delete the new delete
-        if (matches) {
-          // We have acquired the row lock already. If the system clock is NOT monotonically
-          // non-decreasing (see HBASE-14070) we should make sure that the mutation has a
-          // larger timestamp than what was observed via Get. doBatchMutate already does this, but
-          // there is no way to pass the cellTs. See HBASE-14054.
-          long now = EnvironmentEdgeManager.currentTime();
-          long ts = Math.max(now, cellTs); // ensure write is not eclipsed
-          byte[] byteTs = Bytes.toBytes(ts);
-
-          for (Mutation w : rm.getMutations()) {
-            if (w instanceof Put) {
-              updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
-            }
-            // else delete is not needed since it already does a second get, and sets the timestamp
-            // from get (see prepareDeleteTimestamps).
-          }
-
-          // All edits for the given row (across all column families) must
-          // happen atomically.
-          mutateRow(rm);
-          this.checkAndMutateChecksPassed.increment();
-          return true;
-        }
-        this.checkAndMutateChecksFailed.increment();
-        return false;
-      } finally {
-        rowLock.release();
-      }
-    } finally {
-      closeRegionOperation();
-    }
-  }
-
-  private void doBatchMutate(Mutation mutation) throws IOException {
-    // Currently this is only called for puts and deletes, so no nonces.
-    OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
-    if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
-      throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
-    } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
-      throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
-    }
-  }
+  private void doBatchMutate(Mutation mutation) throws IOException {
+    // Currently this is only called for puts and deletes, so no nonces.
+    OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
+    if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
+      throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
+    } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
+      throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
+    }
+  }
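
doBatchMutate() funnels a single Put or Delete through the general batchMutate() machinery and rethrows the two interesting status codes as exceptions. A hedged sketch of what a caller holding a region reference would otherwise do by hand (assumes the public batchMutate(Mutation[]) overload and a LOG handle):

    OperationStatus[] statuses = region.batchMutate(new Mutation[] { put, delete });
    for (OperationStatus status : statuses) {
      if (status.getOperationStatusCode() != OperationStatusCode.SUCCESS) {
        // e.g. SANITY_CHECK_FAILURE or BAD_FAMILY; the message says what was rejected
        LOG.warn("Mutation rejected: " + status.getExceptionMsg());
      }
    }
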

   /**
    * Complete taking the snapshot on the region. Writes the region info and adds references to the
@@ -3663,40 +3555,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
     // Check if we have any work to do and early out otherwise
     // Update these checks as more logic is added here
-
     if (m.getTTL() == Long.MAX_VALUE) {
       return;
     }

     // From this point we know we have some work to do
-
     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
       List<Cell> cells = e.getValue();
       assert cells instanceof RandomAccess;
       int listSize = cells.size();
       for (int i = 0; i < listSize; i++) {
         Cell cell = cells.get(i);
-        List<Tag> newTags = new ArrayList<Tag>();
-        Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell);
-
-        // Carry forward existing tags
-
-        while (tagIterator.hasNext()) {
-
-          // Add any filters or tag specific rewrites here
-
-          newTags.add(tagIterator.next());
-        }
-
-        // Cell TTL handling
-
-        // Check again if we need to add a cell TTL because early out logic
-        // above may change when there are more tag based features in core.
-        if (m.getTTL() != Long.MAX_VALUE) {
-          // Add a cell TTL tag
-          newTags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
-        }
-
+        List<Tag> newTags = TagUtil.carryForwardTags(null, cell);
+        newTags = TagUtil.carryForwardTTLTag(newTags, m.getTTL());
         // Rewrite the cell with the updated set of tags
         cells.set(i, new TagRewriteCell(cell, TagUtil.fromList(newTags)));
       }
@@ -3772,49 +3643,64 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * should already have locked updatesLock.readLock(). This also does
    * <b>not</b> check the families for validity.
    *
-   * @param familyMap Map of kvs per family
-   * @param mvccNum The MVCC for this transaction.
-   * @param isInReplay true when adding replayed KVs into memstore
-   * @return the additional memory usage of the memstore caused by the
-   * new entries.
+   * @param familyMap Map of Cells by family
+   * @return the additional memory usage of the memstore caused by the new entries.
    */
-  private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
-    long mvccNum, boolean isInReplay) throws IOException {
+  private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap, boolean replay,
+      long sequenceId)
+  throws IOException {
     long size = 0;
-
     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
       byte[] family = e.getKey();
       List<Cell> cells = e.getValue();
       assert cells instanceof RandomAccess;
-      Store store = getStore(family);
-      int listSize = cells.size();
-      for (int i=0; i < listSize; i++) {
+      size += applyToMemstore(getStore(family), cells, false, replay, sequenceId);
+    }
+    return size;
+  }
+
+  /**
+   * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
+   * set; when set we will run operations that make sense in the increment/append scenario but
+   * that do not make sense otherwise.
+   * @return Memstore change in size on insert of these Cells.
+   * @see #applyToMemstore(Store, Cell, long)
+   */
+  private long applyToMemstore(final Store store, final List<Cell> cells,
+      final boolean delta, boolean replay, long sequenceId)
+  throws IOException {
+    // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
+    long size = 0;
+    boolean upsert = delta && store.getFamily().getMaxVersions() == 1;
+    int count = cells.size();
+    if (upsert) {
+      size += store.upsert(cells, getSmallestReadPoint());
+    } else {
+      for (int i = 0; i < count; i++) {
         Cell cell = cells.get(i);
-        if (cell.getSequenceId() == 0 || isInReplay) {
-          CellUtil.setSequenceId(cell, mvccNum);
+        // TODO: This looks wrong.. checking for sequenceid of zero is expensive!!!!! St.Ack
+        // When is it zero anyways? When replay? Then just rely on that flag.
+        if (cell.getSequenceId() == 0 || replay) {
+          CellUtil.setSequenceId(cell, sequenceId);
         }
         size += store.add(cell);
       }
     }
-
-    return size;
-  }
+    return size;
+  }
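
Note the upsert branch in applyToMemstore() above: it only fires for delta operations (increment/append) against a family that keeps a single version, letting the MemStore replace the old cell in place instead of stacking another version for a later compaction to clean up. A hedged example of declaring such a family (admin handle and table name are hypothetical):

    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("counters"));
    HColumnDescriptor hcd = new HColumnDescriptor("f");
    hcd.setMaxVersions(1); // VERSIONS => 1 makes increments eligible for the upsert path
    htd.addFamily(hcd);
    admin.createTable(htd);
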

   /**
-   * Remove all the keys listed in the map from the memstore. This method is
-   * called when a Put/Delete has updated memstore but subsequently fails to update
-   * the wal. This method is then invoked to rollback the memstore.
+   * @return Memstore change in size on insert of these Cells.
+   * @see #applyToMemstore(Store, List, boolean, boolean, long)
    */
-  private void rollbackMemstore(List<Cell> memstoreCells) {
-    int kvsRolledback = 0;
-
-    for (Cell cell : memstoreCells) {
-      byte[] family = CellUtil.cloneFamily(cell);
-      Store store = getStore(family);
-      store.rollback(cell);
-      kvsRolledback++;
+  private long applyToMemstore(final Store store, final Cell cell, long sequenceId)
+  throws IOException {
+    // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
+    if (store == null) {
+      checkFamily(CellUtil.cloneFamily(cell));
+      // Unreachable because checkFamily will throw exception
     }
-    LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
+    return store.add(cell);
   }

   @Override
@@ -3824,30 +3710,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }

-  /**
-   * During replay, there could exist column families which are removed between region server
-   * failure and replay
-   */
-  private void removeNonExistentColumnFamilyForReplay(
-      final Map<byte[], List<Cell>> familyMap) {
-    List<byte[]> nonExistentList = null;
-    for (byte[] family : familyMap.keySet()) {
-      if (!this.htableDescriptor.hasFamily(family)) {
-        if (nonExistentList == null) {
-          nonExistentList = new ArrayList<byte[]>();
-        }
-        nonExistentList.add(family);
-      }
-    }
-    if (nonExistentList != null) {
-      for (byte[] family : nonExistentList) {
-        // Perhaps schema was changed between crash and replay
-        LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
-        familyMap.remove(family);
-      }
-    }
-  }
-
   @Override
   public void checkTimestamps(final Map<byte[], List<Cell>> familyMap, long now)
       throws FailedSanityCheckException {
@@ -5490,12 +5352,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return true;
     } finally {
       if (wal != null && !storeFiles.isEmpty()) {
-        // write a bulk load event when not all hfiles are loaded
+        // Write a bulk load event when not all hfiles are loaded
         try {
           WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
               this.getRegionInfo().getTable(),
               ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
-          WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
+          WALUtil.writeBulkLoadMarkerAndSync(this.wal, getTableDesc(), getRegionInfo(),
               loadDescriptor, mvcc);
         } catch (IOException ioe) {
           if (this.rsServices != null) {
@@ -5593,7 +5455,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // getSmallestReadPoint, before scannerReadPoints is updated.
       IsolationLevel isolationLevel = scan.getIsolationLevel();
       synchronized(scannerReadPoints) {
-        this.readPt = getReadpoint(isolationLevel);
+        this.readPt = getReadPoint(isolationLevel);
         scannerReadPoints.put(this, this.readPt);
       }

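The renamed getReadPoint() pins every new scanner to an mvcc read point chosen from the scan's isolation level: the default READ_COMMITTED uses the region's current read point, while READ_UNCOMMITTED reads up to the latest assigned sequence id. A hedged client-side illustration (the table handle is hypothetical):

    Scan scan = new Scan();
    scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); // default is READ_COMMITTED
    try (ResultScanner scanner = table.getScanner(scan)) {
      for (Result r : scanner) {
        // may observe edits whose mvcc transactions have not yet completed
      }
    }
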
@@ -5758,7 +5620,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       // As the data is obtained from two independent heaps, we need to
       // ensure that result list is sorted, because Result relies on that.
-      Collections.sort(results, comparator);
+      sort(results, comparator);
       return moreValues;
     }

@@ -6876,7 +6738,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   @Override
   public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
       long nonceGroup, long nonce) throws IOException {
-
     for (byte[] row : processor.getRowsToLock()) {
       checkRow(row, "processRowsWithLocks");
     }
@@ -6884,23 +6745,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       checkReadOnly();
     }
     checkResources();
-
     startRegionOperation();
     WALEdit walEdit = new WALEdit();

-    // 1. Run pre-process hook
-    try {
-      processor.preProcess(this, walEdit);
-    } catch (IOException e) {
-      closeRegionOperation();
-      throw e;
-    }
+    // STEP 1. Run pre-process hook
+    preProcess(processor, walEdit);

     // Short circuit the read only case
     if (processor.readOnly()) {
       try {
         long now = EnvironmentEdgeManager.currentTime();
-        doProcessRowWithTimeout(
-            processor, now, this, null, null, timeout);
+        doProcessRowWithTimeout(processor, now, this, null, null, timeout);
         processor.postProcess(this, walEdit, true);
       } finally {
         closeRegionOperation();
@@ -6908,118 +6762,81 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return;
     }

-    MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
     boolean locked;
-    boolean walSyncSuccessful = false;
     List<RowLock> acquiredRowLocks;
     long addedSize = 0;
     List<Mutation> mutations = new ArrayList<Mutation>();
     Collection<byte[]> rowsToLock = processor.getRowsToLock();
-    long mvccNum = 0;
-    WALKey walKey = null;
+    // This is assigned by mvcc either explicitly in the below or in the guts of the WAL append
+    // when it assigns the edit a sequenceid (A.K.A the mvcc write number).
+    WriteEntry writeEntry = null;
     try {
-      // 2. Acquire the row lock(s)
+      // STEP 2. Acquire the row lock(s)
       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
       for (byte[] row : rowsToLock) {
         // Attempt to lock all involved rows, throw if any lock times out
         // use a writer lock for mixed reads and writes
         acquiredRowLocks.add(getRowLock(row));
       }
-      // 3. Region lock
+      // STEP 3. Region lock
       lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
       locked = true;
-
+      boolean success = false;
       long now = EnvironmentEdgeManager.currentTime();
       try {
-        // 4. Let the processor scan the rows, generate mutations and add
-        // waledits
-        doProcessRowWithTimeout(
-            processor, now, this, mutations, walEdit, timeout);
-
+        // STEP 4. Let the processor scan the rows, generate mutations and add waledits
+        doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
         if (!mutations.isEmpty()) {
-
-          // 5. Call the preBatchMutate hook
+          // STEP 5. Call the preBatchMutate hook
          processor.preBatchMutate(this, walEdit);
-          long txid = 0;
-          // 6. Append no sync
+          // STEP 6. Append and sync if walEdit has data to write out.
           if (!walEdit.isEmpty()) {
-            // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-            walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
-              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
-              processor.getClusterIds(), nonceGroup, nonce, mvcc);
-            txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
-                walKey, walEdit, false);
-          }
-          if(walKey == null){
-            // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
-            // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
-            walKey = this.appendEmptyEdit(this.wal);
+            writeEntry = doWALAppend(walEdit, getEffectiveDurability(processor.useDurability()),
+                processor.getClusterIds(), now, nonceGroup, nonce);
+          } else {
+            // We are here if WAL is being skipped.
+            writeEntry = this.mvcc.begin();
           }
-          // 7. Start mvcc transaction
-          writeEntry = walKey.getWriteEntry();
-          mvccNum = walKey.getSequenceId();
-
-
-
-          // 8. Apply to memstore
+          // STEP 7. Apply to memstore
+          long sequenceId = writeEntry.getWriteNumber();
           for (Mutation m : mutations) {
-            // Handle any tag based cell features
+            // Handle any tag based cell features.
+            // TODO: Do we need to call rewriteCellTags down in applyToMemstore()? Why not before
+            // so tags go into WAL?
             rewriteCellTags(m.getFamilyCellMap(), m);
-
             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
               Cell cell = cellScanner.current();
-              CellUtil.setSequenceId(cell, mvccNum);
-              Store store = getStore(cell);
-              if (store == null) {
-                checkFamily(CellUtil.cloneFamily(cell));
-                // unreachable
+              if (walEdit.isEmpty()) {
+                // If walEdit is empty, we put nothing in WAL. WAL stamps Cells with sequence id.
+                // If no WAL, need to stamp it here.
+                CellUtil.setSequenceId(cell, sequenceId);
               }
-              addedSize += store.add(cell);
+              Store store = getStore(cell);
+              addedSize += applyToMemstore(store, cell, sequenceId);
             }
           }
+          // STEP 8. Complete mvcc.
+          mvcc.completeAndWait(writeEntry);
+          writeEntry = null;
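
STEPs 6 through 8 are the crux of the reworked write path: the WAL append (or mvcc.begin() when the WAL is skipped) hands back a WriteEntry whose write number is the edit's sequence id, and the edit becomes visible to readers only when completeAndWait() returns. A minimal sketch of that lifecycle, using the same mvcc calls the diff relies on:

    WriteEntry e = mvcc.begin();          // or handed back by the WAL append
    try {
      long seqId = e.getWriteNumber();    // stamp cells with this sequence id
      // ... apply the stamped cells to the memstore ...
      mvcc.completeAndWait(e);            // block until the edit is readable
      e = null;
    } finally {
      if (e != null) mvcc.complete(e);    // error path: just retire the entry
    }
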

-          // 9. Release region lock
+          // STEP 9. Release region lock
           if (locked) {
             this.updatesLock.readLock().unlock();
             locked = false;
           }

-          // 10. Release row lock(s)
+          // STEP 10. Release row lock(s)
           releaseRowLocks(acquiredRowLocks);

-          // 11. Sync edit log
-          if (txid != 0) {
-            syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
-          }
-          walSyncSuccessful = true;
-          // 12. call postBatchMutate hook
+          // STEP 11. call postBatchMutate hook
           processor.postBatchMutate(this);
         }
+        success = true;
       } finally {
-        // TODO: Make this method look like all other methods that are doing append/sync and
-        // memstore rollback such as append and doMiniBatchMutation. Currently it is a little
-        // different. Make them all share same code!
-        if (!mutations.isEmpty() && !walSyncSuccessful) {
-          LOG.warn("Wal sync failed. Roll back " + mutations.size() +
-              " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
-              processor.getRowsToLock().iterator().next()) + "...");
-          for (Mutation m : mutations) {
-            for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
-              Cell cell = cellScanner.current();
-              getStore(cell).rollback(cell);
-            }
-          }
-          if (writeEntry != null) {
-            mvcc.complete(writeEntry);
-            writeEntry = null;
-          }
-        }
-        // 13. Roll mvcc forward
-        if (writeEntry != null) {
-          mvcc.completeAndWait(writeEntry);
-        }
+        // Call complete rather than completeAndWait because we probably had error if walKey != null
+        if (writeEntry != null) mvcc.complete(writeEntry);
         if (locked) {
           this.updatesLock.readLock().unlock();
         }
@@ -7027,18 +6844,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         releaseRowLocks(acquiredRowLocks);
       }

-      // 14. Run post-process hook
-      processor.postProcess(this, walEdit, walSyncSuccessful);
-
+      // 12. Run post-process hook
+      processor.postProcess(this, walEdit, success);
     } finally {
       closeRegionOperation();
-      if (!mutations.isEmpty() &&
-          isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
+      if (!mutations.isEmpty() && isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
         requestFlush();
       }
     }
   }

+  private void preProcess(final RowProcessor<?,?> processor, final WALEdit walEdit)
+  throws IOException {
+    try {
+      processor.preProcess(this, walEdit);
+    } catch (IOException e) {
+      closeRegionOperation();
+      throw e;
+    }
+  }
+
   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
       final long now,
       final HRegion region,
@@ -7089,500 +6914,400 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }

-  /**
-   * @return The passed-in {@code tags} but with the tags from {@code cell} added.
-   */
-  private static List<Tag> carryForwardTags(final Cell cell, final List<Tag> tags) {
-    if (cell.getTagsLength() <= 0) return tags;
-    List<Tag> newTags = tags == null? new ArrayList<Tag>(): /*Append Tags*/tags;
-    Iterator<Tag> i = CellUtil.tagsIterator(cell);
-    while (i.hasNext()) newTags.add(i.next());
-    return newTags;
+  public Result append(Append append) throws IOException {
+    return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }

-  /**
-   * Run a Get against passed in <code>store</code> on passed <code>row</code>, etc.
-   * @return Get result.
-   */
-  private List<Cell> doGet(final Store store, final byte [] row,
-      final Map.Entry<byte[], List<Cell>> family, final TimeRange tr)
-  throws IOException {
-    // Sort the cells so that they match the order that they
-    // appear in the Get results. Otherwise, we won't be able to
-    // find the existing values if the cells are not specified
-    // in order by the client since cells are in an array list.
-    Collections.sort(family.getValue(), store.getComparator());
-    // Get previous values for all columns in this family
-    Get get = new Get(row);
-    for (Cell cell : family.getValue()) {
-      get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
-    }
-    if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax());
-    return get(get, false);
+  @Override
+  public Result append(Append mutation, long nonceGroup, long nonce) throws IOException {
+    return doDelta(Operation.APPEND, mutation, nonceGroup, nonce, mutation.isReturnResults());
   }

-  public Result append(Append append) throws IOException {
-    return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  public Result increment(Increment increment) throws IOException {
+    return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }

-  // TODO: There's a lot of boiler plate code identical to increment.
-  // We should refactor append and increment as local get-mutate-put
-  // transactions, so all stores only go through one code path for puts.
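
With doGet() and the hand-rolled append() implementation gone, Append and Increment both route through the shared doDelta() path whose javadoc follows. A hedged client-side example of the two delta operations (table and column names are hypothetical):

    // Append concatenates bytes onto the stored value; Increment adds to a counter.
    Append append = new Append(row);
    append.add(fam, qual, Bytes.toBytes("-suffix"));
    table.append(append);

    Increment inc = new Increment(row);
    inc.addColumn(fam, counterQual, 1L);
    Result result = table.increment(inc);
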
-  @Override
-  public Result append(Append mutate, long nonceGroup, long nonce) throws IOException {
-    Operation op = Operation.APPEND;
-    byte[] row = mutate.getRow();
-    checkRow(row, op.toString());
-    checkFamilies(mutate.getFamilyCellMap().keySet());
-    boolean flush = false;
-    Durability durability = getEffectiveDurability(mutate.getDurability());
-    boolean writeToWAL = durability != Durability.SKIP_WAL;
-    WALEdit walEdits = null;
-    List<Cell> allKVs = new ArrayList<Cell>(mutate.size());
-    Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
-    long size = 0;
-    long txid = 0;
+  public Result increment(Increment mutation, long nonceGroup, long nonce)
+  throws IOException {
+    return doDelta(Operation.INCREMENT, mutation, nonceGroup, nonce, mutation.isReturnResults());
+  }
+
+  /**
+   * Add "deltas" to Cells. Deltas are increments or appends. Switch on <code>op</code>.
+   *
+   * <p>If increment, add deltas to current values or if an append, then
+   * append the deltas to the current Cell values.
+   *
+   * <p>Append and Increment code paths are mostly the same. They differ in just a few pl
<TRUNCATED>