HBASE-14919 Refactoring for in-memory flush and compaction Signed-off-by: stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/25dfc112 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/25dfc112 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/25dfc112 Branch: refs/heads/hbase-12439 Commit: 25dfc112dd76134a9a3ce1f2e88c4075aef76557 Parents: a975408 Author: eshcar <esh...@yahoo-inc.com> Authored: Mon Feb 8 23:35:02 2016 +0200 Committer: stack <st...@apache.org> Committed: Thu Feb 11 10:39:01 2016 -0800 ---------------------------------------------------------------------- .../hbase/regionserver/AbstractMemStore.java | 497 +++++++++++ .../hadoop/hbase/regionserver/CellSet.java | 183 ++++ .../hbase/regionserver/CellSkipListSet.java | 185 ---- .../hbase/regionserver/DefaultMemStore.java | 859 +------------------ .../hadoop/hbase/regionserver/HStore.java | 22 +- .../hbase/regionserver/ImmutableSegment.java | 72 ++ .../regionserver/ImmutableSegmentAdapter.java | 107 +++ .../hadoop/hbase/regionserver/MemStore.java | 16 +- .../hbase/regionserver/MemStoreScanner.java | 348 ++++++++ .../hbase/regionserver/MemStoreSnapshot.java | 13 +- .../regionserver/MutableCellSetSegment.java | 153 ++++ .../MutableCellSetSegmentScanner.java | 258 ++++++ .../hbase/regionserver/MutableSegment.java | 57 ++ .../hadoop/hbase/regionserver/Segment.java | 218 +++++ .../hbase/regionserver/SegmentFactory.java | 89 ++ .../hbase/regionserver/SegmentScanner.java | 152 ++++ .../hbase/regionserver/StoreFlushContext.java | 2 +- .../apache/hadoop/hbase/io/TestHeapSize.java | 49 +- .../hbase/regionserver/TestCellSkipListSet.java | 13 +- .../hbase/regionserver/TestDefaultMemStore.java | 133 +-- .../hbase/regionserver/TestHMobStore.java | 29 +- .../hadoop/hbase/regionserver/TestHRegion.java | 150 ++-- .../regionserver/TestMemStoreChunkPool.java | 29 +- .../hadoop/hbase/regionserver/TestStore.java | 11 +- 24 files changed, 2380 insertions(+), 1265 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java new file mode 100644 index 0000000..18d2f8a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -0,0 +1,497 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.SortedSet; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * An abstract class, which implements the behaviour shared by all concrete memstore instances. + */ +@InterfaceAudience.Private +public abstract class AbstractMemStore implements MemStore { + + private static final long NO_SNAPSHOT_ID = -1; + + private final Configuration conf; + private final CellComparator comparator; + + // active segment absorbs write operations + private volatile MutableSegment active; + // Snapshot of memstore. Made for flusher. 
+ private volatile ImmutableSegment snapshot; + protected volatile long snapshotId; + // Used to track when to flush + private volatile long timeOfOldestEdit; + + public final static long FIXED_OVERHEAD = ClassSize.align( + ClassSize.OBJECT + + (4 * ClassSize.REFERENCE) + + (2 * Bytes.SIZEOF_LONG)); + + public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + + 2 * (ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER + + ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP)); + + + protected AbstractMemStore(final Configuration conf, final CellComparator c) { + this.conf = conf; + this.comparator = c; + resetCellSet(); + this.snapshot = SegmentFactory.instance().createImmutableSegment(conf, c, 0); + this.snapshotId = NO_SNAPSHOT_ID; + } + + protected void resetCellSet() { + // Reset heap to not include any keys + this.active = SegmentFactory.instance().createMutableSegment( + conf, comparator, DEEP_OVERHEAD); + this.timeOfOldestEdit = Long.MAX_VALUE; + } + + /* + * Calculate how the MemStore size has changed. Includes overhead of the + * backing Map. + * @param cell + * @param notPresent True if the cell was NOT present in the set. + * @return change in size + */ + static long heapSizeChange(final Cell cell, final boolean notPresent) { + return notPresent ? 
ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + + CellUtil.estimatedHeapSizeOf(cell)) : 0; + } + + /** + * Updates the wal with the lowest sequence id (oldest entry) that is still in memory + * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or + * only if it is greater than the previous sequence id + */ + public abstract void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent); + + /** + * Write an update + * @param cell the cell to be added + * @return approximate size of the passed cell & newly added cell which maybe different than the + * passed-in cell + */ + @Override + public long add(Cell cell) { + Cell toAdd = maybeCloneWithAllocator(cell); + return internalAdd(toAdd); + } + + /** + * Update or insert the specified Cells. + * <p> + * For each Cell, insert into MemStore. This will atomically upsert the + * value for that row/family/qualifier. If a Cell did already exist, + * it will then be removed. + * <p> + * Currently the memstoreTS is kept at 0 so as each insert happens, it will + * be immediately visible. May want to change this so it is atomic across + * all Cells. + * <p> + * This is called under row lock, so Get operations will still see updates + * atomically. Scans will only see each Cell update as atomic. + * + * @param cells the cells to be updated + * @param readpoint readpoint below which we can safely remove duplicate KVs + * @return change in memstore size + */ + @Override + public long upsert(Iterable<Cell> cells, long readpoint) { + long size = 0; + for (Cell cell : cells) { + size += upsert(cell, readpoint); + } + return size; + } + + /** + * @return Oldest timestamp of all the Cells in the MemStore + */ + @Override + public long timeOfOldestEdit() { + return timeOfOldestEdit; + } + + + /** + * Write a delete + * @param deleteCell the cell to be deleted + * @return approximate size of the passed key and value. 
+ */ + @Override + public long delete(Cell deleteCell) { + Cell toAdd = maybeCloneWithAllocator(deleteCell); + long s = internalAdd(toAdd); + return s; + } + + /** + * An override on snapshot so the no arg version of the method implies zero seq num, + * like for cases without wal + */ + public MemStoreSnapshot snapshot() { + return snapshot(0); + } + + /** + * The passed snapshot was successfully persisted; it can be let go. + * @param id Id of the snapshot to clean out. + * @see MemStore#snapshot(long) + */ + @Override + public void clearSnapshot(long id) throws UnexpectedStateException { + if (this.snapshotId != id) { + throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed " + + id); + } + // OK. Passed in snapshot is same as current snapshot. If not-empty, + // create a new snapshot and let the old one go. + Segment oldSnapshot = this.snapshot; + if (!this.snapshot.isEmpty()) { + this.snapshot = SegmentFactory.instance().createImmutableSegment( + getComparator(), 0); + } + this.snapshotId = NO_SNAPSHOT_ID; + oldSnapshot.close(); + } + + /** + * Get the entire heap usage for this MemStore not including keys in the + * snapshot. + */ + @Override + public long heapSize() { + return getActive().getSize(); + } + + /** + * On flush, how much memory we will clear from the active cell set. + * + * @return size of data that is going to be flushed from active set + */ + @Override + public long getFlushableSize() { + long snapshotSize = getSnapshot().getSize(); + return snapshotSize > 0 ? snapshotSize : keySize(); + } + + + /** + * @return a list containing a single memstore scanner. 
+ */ + @Override + public List<KeyValueScanner> getScanners(long readPt) throws IOException { + return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(this, readPt)); + } + + @Override + public long getSnapshotSize() { + return getSnapshot().getSize(); + } + + @Override + public String toString() { + StringBuffer buf = new StringBuffer(); + int i = 1; + try { + for (Segment segment : getListOfSegments()) { + buf.append("Segment (" + i + ") " + segment.toString() + "; "); + i++; + } + } catch (IOException e){ + return e.toString(); + } + return buf.toString(); + } + + protected void rollbackInSnapshot(Cell cell) { + // If the key is in the snapshot, delete it. We should not update + // this.size, because that tracks the size of only the memstore and + // not the snapshot. The flush of this snapshot to disk has not + // yet started because Store.flush() waits for all rwcc transactions to + // commit before starting the flush to disk. + snapshot.rollback(cell); + } + + protected void rollbackInActive(Cell cell) { + // If the key is in the memstore, delete it. Update this.size. + long sz = active.rollback(cell); + if (sz != 0) { + setOldestEditTimeToNow(); + } + } + + protected Configuration getConfiguration() { + return conf; + } + + protected void dump(Log log) { + active.dump(log); + snapshot.dump(log); + } + + + /** + * Inserts the specified Cell into MemStore and deletes any existing + * versions of the same row/family/qualifier as the specified Cell. + * <p> + * First, the specified Cell is inserted into the Memstore. + * <p> + * If there are any existing Cell in this MemStore with the same row, + * family, and qualifier, they are removed. + * <p> + * Callers must hold the read lock. 
+ * + * @param cell the cell to be updated + * @param readpoint readpoint below which we can safely remove duplicate KVs + * @return change in size of MemStore + */ + private long upsert(Cell cell, long readpoint) { + // Add the Cell to the MemStore + // Use the internalAdd method here since we (a) already have a lock + // and (b) cannot safely use the MSLAB here without potentially + // hitting OOME - see TestMemStore.testUpsertMSLAB for a + // test that triggers the pathological case if we don't avoid MSLAB + // here. + long addedSize = internalAdd(cell); + + // Get the Cells for the row/family/qualifier regardless of timestamp. + // For this case we want to clean up any other puts + Cell firstCell = KeyValueUtil.createFirstOnRow( + cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), + cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + SortedSet<Cell> ss = active.tailSet(firstCell); + Iterator<Cell> it = ss.iterator(); + // versions visible to oldest scanner + int versionsVisible = 0; + while (it.hasNext()) { + Cell cur = it.next(); + + if (cell == cur) { + // ignore the one just put in + continue; + } + // check that this is the row and column we are interested in, otherwise bail + if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) { + // only remove Puts that concurrent scanners cannot possibly see + if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && + cur.getSequenceId() <= readpoint) { + if (versionsVisible >= 1) { + // if we get here we have seen at least one version visible to the oldest scanner, + // which means we can prove that no scanner will see this version + + // false means there was a change, so give us the size. 
+ long delta = heapSizeChange(cur, true); + addedSize -= delta; + active.incSize(-delta); + it.remove(); + setOldestEditTimeToNow(); + } else { + versionsVisible++; + } + } + } else { + // past the row or column, done + break; + } + } + return addedSize; + } + + /* + * @param a + * @param b + * @return Return lowest of a or b or null if both a and b are null + */ + protected Cell getLowest(final Cell a, final Cell b) { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + return comparator.compareRows(a, b) <= 0? a: b; + } + + /* + * @param key Find row that follows this one. If null, return first. + * @param set Set to look in for a row beyond <code>row</code>. + * @return Next row or null if none found. If one found, will be a new + * KeyValue -- can be destroyed by subsequent calls to this method. + */ + protected Cell getNextRow(final Cell key, + final NavigableSet<Cell> set) { + Cell result = null; + SortedSet<Cell> tail = key == null? set: set.tailSet(key); + // Iterate until we fall into the next row; i.e. move off current row + for (Cell cell: tail) { + if (comparator.compareRows(cell, key) <= 0) { + continue; + } + // Note: Not suppressing deletes or expired cells. Needs to be handled + // by higher up functions. + result = cell; + break; + } + return result; + } + + /** + * Given the specs of a column, update it, first by inserting a new record, + * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS + * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying + * store will ensure that the insert/delete each are atomic. A scanner/reader will either + * get the new value, or the old value and all readers will eventually only see the new + * value after the old was removed. 
+ */ + @VisibleForTesting + @Override + public long updateColumnValue(byte[] row, byte[] family, byte[] qualifier, + long newValue, long now) { + Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier); + // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit. + Cell snc = snapshot.getFirstAfter(firstCell); + if(snc != null) { + // is there a matching Cell in the snapshot? + if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) { + if (snc.getTimestamp() == now) { + now += 1; + } + } + } + // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary. + // But the timestamp should also be max(now, mostRecentTsInMemstore) + + // so we cant add the new Cell w/o knowing what's there already, but we also + // want to take this chance to delete some cells. So two loops (sad) + + SortedSet<Cell> ss = getActive().tailSet(firstCell); + for (Cell cell : ss) { + // if this isnt the row we are interested in, then bail: + if (!CellUtil.matchingColumn(cell, family, qualifier) + || !CellUtil.matchingRow(cell, firstCell)) { + break; // rows dont match, bail. + } + + // if the qualifier matches and it's a put, just RM it out of the active. + if (cell.getTypeByte() == KeyValue.Type.Put.getCode() && + cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) { + now = cell.getTimestamp(); + } + } + + // create or update (upsert) a new Cell with + // 'now' and a 0 memstoreTS == immediately visible + List<Cell> cells = new ArrayList<Cell>(1); + cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))); + return upsert(cells, 1L); + } + + private Cell maybeCloneWithAllocator(Cell cell) { + return active.maybeCloneWithAllocator(cell); + } + + /** + * Internal version of add() that doesn't clone Cells with the + * allocator, and doesn't take the lock. 
+ * + * Callers should ensure they already have the read lock taken + */ + private long internalAdd(final Cell toAdd) { + long s = active.add(toAdd); + setOldestEditTimeToNow(); + checkActiveSize(); + return s; + } + + private void setOldestEditTimeToNow() { + if (timeOfOldestEdit == Long.MAX_VALUE) { + timeOfOldestEdit = EnvironmentEdgeManager.currentTime(); + } + } + + protected long keySize() { + return heapSize() - DEEP_OVERHEAD; + } + + protected CellComparator getComparator() { + return comparator; + } + + protected MutableSegment getActive() { + return active; + } + + protected ImmutableSegment getSnapshot() { + return snapshot; + } + + protected AbstractMemStore setSnapshot(ImmutableSegment snapshot) { + this.snapshot = snapshot; + return this; + } + + protected void setSnapshotSize(long snapshotSize) { + getSnapshot().setSize(snapshotSize); + } + + /** + * Check whether anything need to be done based on the current active set size + */ + protected abstract void checkActiveSize(); + + /** + * Returns a list of Store segment scanners, one per each store segment + * @param readPt the version number required to initialize the scanners + * @return a list of Store segment scanners, one per each store segment + */ + protected abstract List<SegmentScanner> getListOfScanners(long readPt) throws IOException; + + /** + * Returns an ordered list of segments from most recent to oldest in memstore + * @return an ordered list of segments from most recent to oldest in memstore + */ + protected abstract List<Segment> getListOfSegments() throws IOException; + + public long getActiveSize() { + return getActive().getSize(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java new file mode 100644 index 0000000..4433302 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java @@ -0,0 +1,183 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.NavigableSet; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A {@link java.util.Set} of {@link Cell}s, where an add will overwrite the entry if already + * exists in the set. The call to add returns true if no value in the backing map or false if + * there was an entry with same key (though value may be different). + * implementation is tolerant of concurrent get and set and won't throw + * ConcurrentModificationException when iterating. 
+ */ +@InterfaceAudience.Private +public class CellSet implements NavigableSet<Cell> { + // Implemented on top of a {@link java.util.concurrent.ConcurrentSkipListMap} + // Differ from CSLS in one respect, where CSLS does "Adds the specified element to this set if it + // is not already present.", this implementation "Adds the specified element to this set EVEN + // if it is already present overwriting what was there previous". + // Otherwise, has same attributes as ConcurrentSkipListSet + private final ConcurrentNavigableMap<Cell, Cell> delegatee; + + CellSet(final CellComparator c) { + this.delegatee = new ConcurrentSkipListMap<Cell, Cell>(c); + } + + CellSet(final ConcurrentNavigableMap<Cell, Cell> m) { + this.delegatee = m; + } + + public Cell ceiling(Cell e) { + throw new UnsupportedOperationException("Not implemented"); + } + + public Iterator<Cell> descendingIterator() { + return this.delegatee.descendingMap().values().iterator(); + } + + public NavigableSet<Cell> descendingSet() { + throw new UnsupportedOperationException("Not implemented"); + } + + public Cell floor(Cell e) { + throw new UnsupportedOperationException("Not implemented"); + } + + public SortedSet<Cell> headSet(final Cell toElement) { + return headSet(toElement, false); + } + + public NavigableSet<Cell> headSet(final Cell toElement, + boolean inclusive) { + return new CellSet(this.delegatee.headMap(toElement, inclusive)); + } + + public Cell higher(Cell e) { + throw new UnsupportedOperationException("Not implemented"); + } + + public Iterator<Cell> iterator() { + return this.delegatee.values().iterator(); + } + + public Cell lower(Cell e) { + throw new UnsupportedOperationException("Not implemented"); + } + + public Cell pollFirst() { + throw new UnsupportedOperationException("Not implemented"); + } + + public Cell pollLast() { + throw new UnsupportedOperationException("Not implemented"); + } + + public SortedSet<Cell> subSet(Cell fromElement, Cell toElement) { + throw new 
UnsupportedOperationException("Not implemented"); + } + + public NavigableSet<Cell> subSet(Cell fromElement, + boolean fromInclusive, Cell toElement, boolean toInclusive) { + throw new UnsupportedOperationException("Not implemented"); + } + + public SortedSet<Cell> tailSet(Cell fromElement) { + return tailSet(fromElement, true); + } + + public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) { + return new CellSet(this.delegatee.tailMap(fromElement, inclusive)); + } + + public Comparator<? super Cell> comparator() { + throw new UnsupportedOperationException("Not implemented"); + } + + public Cell first() { + return this.delegatee.get(this.delegatee.firstKey()); + } + + public Cell last() { + return this.delegatee.get(this.delegatee.lastKey()); + } + + public boolean add(Cell e) { + return this.delegatee.put(e, e) == null; + } + + public boolean addAll(Collection<? extends Cell> c) { + throw new UnsupportedOperationException("Not implemented"); + } + + public void clear() { + this.delegatee.clear(); + } + + public boolean contains(Object o) { + //noinspection SuspiciousMethodCalls + return this.delegatee.containsKey(o); + } + + public boolean containsAll(Collection<?> c) { + throw new UnsupportedOperationException("Not implemented"); + } + + public boolean isEmpty() { + return this.delegatee.isEmpty(); + } + + public boolean remove(Object o) { + return this.delegatee.remove(o) != null; + } + + public boolean removeAll(Collection<?> c) { + throw new UnsupportedOperationException("Not implemented"); + } + + public boolean retainAll(Collection<?> c) { + throw new UnsupportedOperationException("Not implemented"); + } + + public Cell get(Cell kv) { + return this.delegatee.get(kv); + } + + public int size() { + return this.delegatee.size(); + } + + public Object[] toArray() { + throw new UnsupportedOperationException("Not implemented"); + } + + public <T> T[] toArray(T[] a) { + throw new UnsupportedOperationException("Not implemented"); + } +} 
http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java deleted file mode 100644 index e9941b3..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.util.Collection; -import java.util.Comparator; -import java.util.Iterator; -import java.util.NavigableSet; -import java.util.SortedSet; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * A {@link java.util.Set} of {@link Cell}s implemented on top of a - * {@link java.util.concurrent.ConcurrentSkipListMap}. 
Works like a - * {@link java.util.concurrent.ConcurrentSkipListSet} in all but one regard: - * An add will overwrite if already an entry for the added key. In other words, - * where CSLS does "Adds the specified element to this set if it is not already - * present.", this implementation "Adds the specified element to this set EVEN - * if it is already present overwriting what was there previous". The call to - * add returns true if no value in the backing map or false if there was an - * entry with same key (though value may be different). - * <p>Otherwise, - * has same attributes as ConcurrentSkipListSet: e.g. tolerant of concurrent - * get and set and won't throw ConcurrentModificationException when iterating. - */ -@InterfaceAudience.Private -public class CellSkipListSet implements NavigableSet<Cell> { - private final ConcurrentNavigableMap<Cell, Cell> delegatee; - - CellSkipListSet(final CellComparator c) { - this.delegatee = new ConcurrentSkipListMap<Cell, Cell>(c); - } - - CellSkipListSet(final ConcurrentNavigableMap<Cell, Cell> m) { - this.delegatee = m; - } - - public Cell ceiling(Cell e) { - throw new UnsupportedOperationException("Not implemented"); - } - - public Iterator<Cell> descendingIterator() { - return this.delegatee.descendingMap().values().iterator(); - } - - public NavigableSet<Cell> descendingSet() { - throw new UnsupportedOperationException("Not implemented"); - } - - public Cell floor(Cell e) { - throw new UnsupportedOperationException("Not implemented"); - } - - public SortedSet<Cell> headSet(final Cell toElement) { - return headSet(toElement, false); - } - - public NavigableSet<Cell> headSet(final Cell toElement, - boolean inclusive) { - return new CellSkipListSet(this.delegatee.headMap(toElement, inclusive)); - } - - public Cell higher(Cell e) { - throw new UnsupportedOperationException("Not implemented"); - } - - public Iterator<Cell> iterator() { - return this.delegatee.values().iterator(); - } - - public Cell lower(Cell e) { - throw 
new UnsupportedOperationException("Not implemented"); - } - - public Cell pollFirst() { - throw new UnsupportedOperationException("Not implemented"); - } - - public Cell pollLast() { - throw new UnsupportedOperationException("Not implemented"); - } - - public SortedSet<Cell> subSet(Cell fromElement, Cell toElement) { - throw new UnsupportedOperationException("Not implemented"); - } - - public NavigableSet<Cell> subSet(Cell fromElement, - boolean fromInclusive, Cell toElement, boolean toInclusive) { - throw new UnsupportedOperationException("Not implemented"); - } - - public SortedSet<Cell> tailSet(Cell fromElement) { - return tailSet(fromElement, true); - } - - public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) { - return new CellSkipListSet(this.delegatee.tailMap(fromElement, inclusive)); - } - - public Comparator<? super Cell> comparator() { - throw new UnsupportedOperationException("Not implemented"); - } - - public Cell first() { - return this.delegatee.get(this.delegatee.firstKey()); - } - - public Cell last() { - return this.delegatee.get(this.delegatee.lastKey()); - } - - public boolean add(Cell e) { - return this.delegatee.put(e, e) == null; - } - - public boolean addAll(Collection<? 
extends Cell> c) { - throw new UnsupportedOperationException("Not implemented"); - } - - public void clear() { - this.delegatee.clear(); - } - - public boolean contains(Object o) { - //noinspection SuspiciousMethodCalls - return this.delegatee.containsKey(o); - } - - public boolean containsAll(Collection<?> c) { - throw new UnsupportedOperationException("Not implemented"); - } - - public boolean isEmpty() { - return this.delegatee.isEmpty(); - } - - public boolean remove(Object o) { - return this.delegatee.remove(o) != null; - } - - public boolean removeAll(Collection<?> c) { - throw new UnsupportedOperationException("Not implemented"); - } - - public boolean retainAll(Collection<?> c) { - throw new UnsupportedOperationException("Not implemented"); - } - - public Cell get(Cell kv) { - return this.delegatee.get(kv); - } - - public int size() { - return this.delegatee.size(); - } - - public Object[] toArray() { - throw new UnsupportedOperationException("Not implemented"); - } - - public <T> T[] toArray(T[] a) { - throw new UnsupportedOperationException("Not implemented"); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index f61d871..82d40b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -19,35 +19,22 @@ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; import 
java.util.List; -import java.util.NavigableSet; -import java.util.SortedSet; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.util.ByteRange; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.hbase.util.CollectionBackedScanner; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.htrace.Trace; /** * The MemStore holds in-memory modifications to the Store. Modifications @@ -66,40 +53,8 @@ import org.apache.htrace.Trace; * in KV size. */ @InterfaceAudience.Private -public class DefaultMemStore implements MemStore { +public class DefaultMemStore extends AbstractMemStore { private static final Log LOG = LogFactory.getLog(DefaultMemStore.class); - static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled"; - private static final boolean USEMSLAB_DEFAULT = true; - static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class"; - - private Configuration conf; - - // MemStore. Use a CellSkipListSet rather than SkipListSet because of the - // better semantics. The Map will overwrite if passed a key it already had - // whereas the Set will not add new Cell if key is same though value might be - // different. Value is not important -- just make sure always same - // reference passed. 
- volatile CellSkipListSet cellSet; - - // Snapshot of memstore. Made for flusher. - volatile CellSkipListSet snapshot; - - final CellComparator comparator; - - // Used to track own heapSize - final AtomicLong size; - private volatile long snapshotSize; - - // Used to track when to flush - volatile long timeOfOldestEdit = Long.MAX_VALUE; - - TimeRangeTracker timeRangeTracker; - TimeRangeTracker snapshotTimeRangeTracker; - - volatile MemStoreLAB allocator; - volatile MemStoreLAB snapshotAllocator; - volatile long snapshotId; - volatile boolean tagsPresent; /** * Default constructor. Used for tests. @@ -112,183 +67,54 @@ public class DefaultMemStore implements MemStore { * Constructor. * @param c Comparator */ - public DefaultMemStore(final Configuration conf, - final CellComparator c) { - this.conf = conf; - this.comparator = c; - this.cellSet = new CellSkipListSet(c); - this.snapshot = new CellSkipListSet(c); - timeRangeTracker = new TimeRangeTracker(); - snapshotTimeRangeTracker = new TimeRangeTracker(); - this.size = new AtomicLong(DEEP_OVERHEAD); - this.snapshotSize = 0; - if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { - String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); - this.allocator = ReflectionUtils.instantiateWithCustomCtor(className, - new Class[] { Configuration.class }, new Object[] { conf }); - } else { - this.allocator = null; - } + public DefaultMemStore(final Configuration conf, final CellComparator c) { + super(conf, c); } void dump() { - for (Cell cell: this.cellSet) { - LOG.info(cell); - } - for (Cell cell: this.snapshot) { - LOG.info(cell); - } + super.dump(LOG); } /** * Creates a snapshot of the current memstore. 
* Snapshot must be cleared by call to {@link #clearSnapshot(long)} + * @param flushOpSeqId the sequence id that is attached to the flush operation in the wal */ @Override - public MemStoreSnapshot snapshot() { + public MemStoreSnapshot snapshot(long flushOpSeqId) { // If snapshot currently has entries, then flusher failed or didn't call // cleanup. Log a warning. - if (!this.snapshot.isEmpty()) { + if (!getSnapshot().isEmpty()) { LOG.warn("Snapshot called again without clearing previous. " + "Doing nothing. Another ongoing flush or did we fail last attempt?"); } else { this.snapshotId = EnvironmentEdgeManager.currentTime(); - this.snapshotSize = keySize(); - if (!this.cellSet.isEmpty()) { - this.snapshot = this.cellSet; - this.cellSet = new CellSkipListSet(this.comparator); - this.snapshotTimeRangeTracker = this.timeRangeTracker; - this.timeRangeTracker = new TimeRangeTracker(); - // Reset heap to not include any keys - this.size.set(DEEP_OVERHEAD); - this.snapshotAllocator = this.allocator; - // Reset allocator so we get a fresh buffer for the new memstore - if (allocator != null) { - String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); - this.allocator = ReflectionUtils.instantiateWithCustomCtor(className, - new Class[] { Configuration.class }, new Object[] { conf }); - } - timeOfOldestEdit = Long.MAX_VALUE; + if (!getActive().isEmpty()) { + ImmutableSegment immutableSegment = SegmentFactory.instance(). + createImmutableSegment(getConfiguration(), getActive()); + setSnapshot(immutableSegment); + setSnapshotSize(keySize()); + resetCellSet(); } } - MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize, - this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator), - this.tagsPresent); - this.tagsPresent = false; - return memStoreSnapshot; - } - - /** - * The passed snapshot was successfully persisted; it can be let go. 
- * @param id Id of the snapshot to clean out. - * @throws UnexpectedStateException - * @see #snapshot() - */ - @Override - public void clearSnapshot(long id) throws UnexpectedStateException { - MemStoreLAB tmpAllocator = null; - if (this.snapshotId != id) { - throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed " - + id); - } - // OK. Passed in snapshot is same as current snapshot. If not-empty, - // create a new snapshot and let the old one go. - if (!this.snapshot.isEmpty()) { - this.snapshot = new CellSkipListSet(this.comparator); - this.snapshotTimeRangeTracker = new TimeRangeTracker(); - } - this.snapshotSize = 0; - this.snapshotId = -1; - if (this.snapshotAllocator != null) { - tmpAllocator = this.snapshotAllocator; - this.snapshotAllocator = null; - } - if (tmpAllocator != null) { - tmpAllocator.close(); - } - } - - @Override - public long getFlushableSize() { - return this.snapshotSize > 0 ? this.snapshotSize : keySize(); - } + return new MemStoreSnapshot(this.snapshotId, getSnapshot()); - @Override - public long getSnapshotSize() { - return this.snapshotSize; } - /** - * Write an update - * @param cell - * @return approximate size of the passed Cell. - */ @Override - public long add(Cell cell) { - Cell toAdd = maybeCloneWithAllocator(cell); - return internalAdd(toAdd); + protected List<SegmentScanner> getListOfScanners(long readPt) throws IOException { + List<SegmentScanner> list = new ArrayList<SegmentScanner>(2); + list.add(0, getActive().getSegmentScanner(readPt)); + list.add(1, getSnapshot().getSegmentScanner(readPt)); + return list; } @Override - public long timeOfOldestEdit() { - return timeOfOldestEdit; - } - - private boolean addToCellSet(Cell e) { - boolean b = this.cellSet.add(e); - // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call. - // When we use ACL CP or Visibility CP which deals with Tags during - // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. 
We do not - // parse the byte[] to identify the tags length. - if(e.getTagsLength() > 0) { - tagsPresent = true; - } - setOldestEditTimeToNow(); - return b; - } - - private boolean removeFromCellSet(Cell e) { - boolean b = this.cellSet.remove(e); - setOldestEditTimeToNow(); - return b; - } - - void setOldestEditTimeToNow() { - if (timeOfOldestEdit == Long.MAX_VALUE) { - timeOfOldestEdit = EnvironmentEdgeManager.currentTime(); - } - } - - /** - * Internal version of add() that doesn't clone Cells with the - * allocator, and doesn't take the lock. - * - * Callers should ensure they already have the read lock taken - */ - private long internalAdd(final Cell toAdd) { - long s = heapSizeChange(toAdd, addToCellSet(toAdd)); - timeRangeTracker.includeTimestamp(toAdd); - this.size.addAndGet(s); - return s; - } - - private Cell maybeCloneWithAllocator(Cell cell) { - if (allocator == null) { - return cell; - } - - int len = KeyValueUtil.length(cell); - ByteRange alloc = allocator.allocateBytes(len); - if (alloc == null) { - // The allocation was too large, allocator decided - // not to do anything with it. - return cell; - } - assert alloc.getBytes() != null; - KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset()); - KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len); - newKv.setSequenceId(cell.getSequenceId()); - return newKv; + protected List<Segment> getListOfSegments() throws IOException { + List<Segment> list = new ArrayList<Segment>(2); + list.add(0, getActive()); + list.add(1, getSnapshot()); + return list; } /** @@ -301,39 +127,8 @@ public class DefaultMemStore implements MemStore { */ @Override public void rollback(Cell cell) { - // If the key is in the snapshot, delete it. We should not update - // this.size, because that tracks the size of only the memstore and - // not the snapshot. 
The flush of this snapshot to disk has not - // yet started because Store.flush() waits for all rwcc transactions to - // commit before starting the flush to disk. - Cell found = this.snapshot.get(cell); - if (found != null && found.getSequenceId() == cell.getSequenceId()) { - this.snapshot.remove(cell); - long sz = heapSizeChange(cell, true); - this.snapshotSize -= sz; - } - // If the key is in the memstore, delete it. Update this.size. - found = this.cellSet.get(cell); - if (found != null && found.getSequenceId() == cell.getSequenceId()) { - removeFromCellSet(cell); - long s = heapSizeChange(cell, true); - this.size.addAndGet(-s); - } - } - - /** - * Write a delete - * @param deleteCell - * @return approximate size of the passed key and value. - */ - @Override - public long delete(Cell deleteCell) { - long s = 0; - Cell toAdd = maybeCloneWithAllocator(deleteCell); - s += heapSizeChange(toAdd, addToCellSet(toAdd)); - timeRangeTracker.includeTimestamp(toAdd); - this.size.addAndGet(s); - return s; + rollbackInSnapshot(cell); + rollbackInActive(cell); } /** @@ -342,604 +137,29 @@ public class DefaultMemStore implements MemStore { * @return Next row or null if none found. */ Cell getNextRow(final Cell cell) { - return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot)); - } - - /* - * @param a - * @param b - * @return Return lowest of a or b or null if both a and b are null - */ - private Cell getLowest(final Cell a, final Cell b) { - if (a == null) { - return b; - } - if (b == null) { - return a; - } - return comparator.compareRows(a, b) <= 0? a: b; + return getLowest( + getNextRow(cell, getActive().getCellSet()), + getNextRow(cell, getSnapshot().getCellSet())); } - /* - * @param key Find row that follows this one. If null, return first. - * @param map Set to look in for a row beyond <code>row</code>. - * @return Next row or null if none found. If one found, will be a new - * KeyValue -- can be destroyed by subsequent calls to this method. 
- */ - private Cell getNextRow(final Cell key, - final NavigableSet<Cell> set) { - Cell result = null; - SortedSet<Cell> tail = key == null? set: set.tailSet(key); - // Iterate until we fall into the next row; i.e. move off current row - for (Cell cell: tail) { - if (comparator.compareRows(cell, key) <= 0) - continue; - // Note: Not suppressing deletes or expired cells. Needs to be handled - // by higher up functions. - result = cell; - break; - } - return result; + @Override public void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent) { } /** - * Only used by tests. TODO: Remove - * - * Given the specs of a column, update it, first by inserting a new record, - * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS - * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying - * store will ensure that the insert/delete each are atomic. A scanner/reader will either - * get the new value, or the old value and all readers will eventually only see the new - * value after the old was removed. - * - * @param row - * @param family - * @param qualifier - * @param newValue - * @param now - * @return Timestamp + * @return Total memory occupied by this MemStore. */ @Override - public long updateColumnValue(byte[] row, - byte[] family, - byte[] qualifier, - long newValue, - long now) { - Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier); - // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit. - SortedSet<Cell> snSs = snapshot.tailSet(firstCell); - if (!snSs.isEmpty()) { - Cell snc = snSs.first(); - // is there a matching Cell in the snapshot? - if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) { - if (snc.getTimestamp() == now) { - // poop, - now += 1; - } - } - } - - // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary. 
- // But the timestamp should also be max(now, mostRecentTsInMemstore) - - // so we cant add the new Cell w/o knowing what's there already, but we also - // want to take this chance to delete some cells. So two loops (sad) - - SortedSet<Cell> ss = cellSet.tailSet(firstCell); - for (Cell cell : ss) { - // if this isnt the row we are interested in, then bail: - if (!CellUtil.matchingColumn(cell, family, qualifier) - || !CellUtil.matchingRow(cell, firstCell)) { - break; // rows dont match, bail. - } - - // if the qualifier matches and it's a put, just RM it out of the cellSet. - if (cell.getTypeByte() == KeyValue.Type.Put.getCode() && - cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) { - now = cell.getTimestamp(); - } - } - - // create or update (upsert) a new Cell with - // 'now' and a 0 memstoreTS == immediately visible - List<Cell> cells = new ArrayList<Cell>(1); - cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))); - return upsert(cells, 1L); - } - - /** - * Update or insert the specified KeyValues. - * <p> - * For each KeyValue, insert into MemStore. This will atomically upsert the - * value for that row/family/qualifier. If a KeyValue did already exist, - * it will then be removed. - * <p> - * This is called under row lock, so Get operations will still see updates - * atomically. Scans will only see each KeyValue update as atomic. - * - * @param readpoint readpoint below which we can safely remove duplicate KVs - * @return change in memstore size - */ - @Override - public long upsert(Iterable<Cell> cells, long readpoint) { - long size = 0; - for (Cell cell : cells) { - size += upsert(cell, readpoint); - } - return size; - } - - /** - * Inserts the specified KeyValue into MemStore and deletes any existing - * versions of the same row/family/qualifier as the specified KeyValue. - * <p> - * First, the specified KeyValue is inserted into the Memstore. 
- * <p> - * If there are any existing KeyValues in this MemStore with the same row, - * family, and qualifier, they are removed. - * <p> - * Callers must hold the read lock. - * @param readpoint Smallest outstanding readpoint; below which we can remove duplicate Cells. - * @return change in size of MemStore - */ - private long upsert(Cell cell, long readpoint) { - // Add the Cell to the MemStore - // Use the internalAdd method here since we (a) already have a lock - // and (b) cannot safely use the MSLAB here without potentially - // hitting OOME - see TestMemStore.testUpsertMSLAB for a - // test that triggers the pathological case if we don't avoid MSLAB - // here. - long addedSize = internalAdd(cell); - - // Get the Cells for the row/family/qualifier regardless of timestamp. - // For this case we want to clean up any other puts - Cell firstCell = KeyValueUtil.createFirstOnRow( - cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), - cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), - cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - SortedSet<Cell> ss = cellSet.tailSet(firstCell); - Iterator<Cell> it = ss.iterator(); - // Versions visible to oldest scanner. - int versionsVisible = 0; - while ( it.hasNext() ) { - Cell cur = it.next(); - - if (cell == cur) { - // ignore the one just put in - continue; - } - // check that this is the row and column we are interested in, otherwise bail - if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) { - // only remove Puts that concurrent scanners cannot possibly see - if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && - cur.getSequenceId() <= readpoint) { - if (versionsVisible >= 1) { - // if we get here we have seen at least one version visible to the oldest scanner, - // which means we can prove that no scanner will see this version - - // false means there was a change, so give us the size. 
- long delta = heapSizeChange(cur, true); - addedSize -= delta; - this.size.addAndGet(-delta); - it.remove(); - setOldestEditTimeToNow(); - } else { - versionsVisible++; - } - } - } else { - // past the row or column, done - break; - } - } - return addedSize; - } - - /** - * @return scanner on memstore and snapshot in this order. - */ - @Override - public List<KeyValueScanner> getScanners(long readPt) { - return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt)); - } - - /** - * Check if this memstore may contain the required keys - * @param scan scan - * @param store holds reference to cf - * @param oldestUnexpiredTS - * @return False if the key definitely does not exist in this Memstore - */ - public boolean shouldSeek(Scan scan, Store store, long oldestUnexpiredTS) { - byte[] cf = store.getFamily().getName(); - TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf); - if (timeRange == null) { - timeRange = scan.getTimeRange(); - } - return (timeRangeTracker.includesTimeRange(timeRange) || - snapshotTimeRangeTracker.includesTimeRange(timeRange)) - && (Math.max(timeRangeTracker.getMaximumTimestamp(), - snapshotTimeRangeTracker.getMaximumTimestamp()) >= - oldestUnexpiredTS); - } - - /* - * MemStoreScanner implements the KeyValueScanner. - * It lets the caller scan the contents of a memstore -- both current - * map and snapshot. - * This behaves as if it were a real scanner but does not maintain position. - */ - protected class MemStoreScanner extends NonLazyKeyValueScanner { - // Next row information for either cellSet or snapshot - private Cell cellSetNextRow = null; - private Cell snapshotNextRow = null; - - // last iterated Cells for cellSet and snapshot (to restore iterator state after reseek) - private Cell cellSetItRow = null; - private Cell snapshotItRow = null; - - // iterator based scanning. 
- private Iterator<Cell> cellSetIt; - private Iterator<Cell> snapshotIt; - - // The cellSet and snapshot at the time of creating this scanner - private CellSkipListSet cellSetAtCreation; - private CellSkipListSet snapshotAtCreation; - - // the pre-calculated Cell to be returned by peek() or next() - private Cell theNext; - - // The allocator and snapshot allocator at the time of creating this scanner - volatile MemStoreLAB allocatorAtCreation; - volatile MemStoreLAB snapshotAllocatorAtCreation; - - // A flag represents whether could stop skipping Cells for MVCC - // if have encountered the next row. Only used for reversed scan - private boolean stopSkippingCellsIfNextRow = false; - - private long readPoint; - - /* - Some notes... - - So memstorescanner is fixed at creation time. this includes pointers/iterators into - existing kvset/snapshot. during a snapshot creation, the kvset is null, and the - snapshot is moved. since kvset is null there is no point on reseeking on both, - we can save us the trouble. During the snapshot->hfile transition, the memstore - scanner is re-created by StoreScanner#updateReaders(). StoreScanner should - potentially do something smarter by adjusting the existing memstore scanner. - - But there is a greater problem here, that being once a scanner has progressed - during a snapshot scenario, we currently iterate past the kvset then 'finish' up. - if a scan lasts a little while, there is a chance for new entries in kvset to - become available but we will never see them. This needs to be handled at the - StoreScanner level with coordination with MemStoreScanner. - - Currently, this problem is only partly managed: during the small amount of time - when the StoreScanner has not yet created a new MemStoreScanner, we will miss - the adds to kvset in the MemStoreScanner. 
- */ - - MemStoreScanner(long readPoint) { - super(); - - this.readPoint = readPoint; - cellSetAtCreation = cellSet; - snapshotAtCreation = snapshot; - if (allocator != null) { - this.allocatorAtCreation = allocator; - this.allocatorAtCreation.incScannerCount(); - } - if (snapshotAllocator != null) { - this.snapshotAllocatorAtCreation = snapshotAllocator; - this.snapshotAllocatorAtCreation.incScannerCount(); - } - if (Trace.isTracing() && Trace.currentSpan() != null) { - Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner"); - } - } - - /** - * Lock on 'this' must be held by caller. - * @param it - * @return Next Cell - */ - private Cell getNext(Iterator<Cell> it) { - Cell startCell = theNext; - Cell v = null; - try { - while (it.hasNext()) { - v = it.next(); - if (v.getSequenceId() <= this.readPoint) { - return v; - } - if (stopSkippingCellsIfNextRow && startCell != null - && comparator.compareRows(v, startCell) > 0) { - return null; - } - } - - return null; - } finally { - if (v != null) { - // in all cases, remember the last Cell iterated to - if (it == snapshotIt) { - snapshotItRow = v; - } else { - cellSetItRow = v; - } - } - } - } - - /** - * Set the scanner at the seek key. - * Must be called only once: there is no thread safety between the scanner - * and the memStore. - * @param key seek value - * @return false if the key is null or if there is no data - */ - @Override - public synchronized boolean seek(Cell key) { - if (key == null) { - close(); - return false; - } - // kvset and snapshot will never be null. - // if tailSet can't find anything, SortedSet is empty (not null). - cellSetIt = cellSetAtCreation.tailSet(key).iterator(); - snapshotIt = snapshotAtCreation.tailSet(key).iterator(); - cellSetItRow = null; - snapshotItRow = null; - - return seekInSubLists(key); - } - - - /** - * (Re)initialize the iterators after a seek or a reseek. 
- */ - private synchronized boolean seekInSubLists(Cell key){ - cellSetNextRow = getNext(cellSetIt); - snapshotNextRow = getNext(snapshotIt); - - // Calculate the next value - theNext = getLowest(cellSetNextRow, snapshotNextRow); - - // has data - return (theNext != null); - } - - - /** - * Move forward on the sub-lists set previously by seek. - * @param key seek value (should be non-null) - * @return true if there is at least one KV to read, false otherwise - */ - @Override - public synchronized boolean reseek(Cell key) { - /* - See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. - This code is executed concurrently with flush and puts, without locks. - Two points must be known when working on this code: - 1) It's not possible to use the 'kvTail' and 'snapshot' - variables, as they are modified during a flush. - 2) The ideal implementation for performance would use the sub skip list - implicitly pointed by the iterators 'kvsetIt' and - 'snapshotIt'. Unfortunately the Java API does not offer a method to - get it. So we remember the last keys we iterated to and restore - the reseeked set to at least that point. 
- */ - cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator(); - snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator(); - - return seekInSubLists(key); - } - - - @Override - public synchronized Cell peek() { - //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest()); - return theNext; - } - - @Override - public synchronized Cell next() { - if (theNext == null) { - return null; - } - - final Cell ret = theNext; - - // Advance one of the iterators - if (theNext == cellSetNextRow) { - cellSetNextRow = getNext(cellSetIt); - } else { - snapshotNextRow = getNext(snapshotIt); - } - - // Calculate the next value - theNext = getLowest(cellSetNextRow, snapshotNextRow); - - //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint(); - //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " + - // getLowest() + " threadpoint=" + readpoint); - return ret; - } - - /* - * Returns the lower of the two key values, or null if they are both null. - * This uses comparator.compare() to compare the KeyValue using the memstore - * comparator. - */ - private Cell getLowest(Cell first, Cell second) { - if (first == null && second == null) { - return null; - } - if (first != null && second != null) { - int compare = comparator.compare(first, second); - return (compare <= 0 ? first : second); - } - return (first != null ? first : second); - } - - /* - * Returns the higher of the two cells, or null if they are both null. - * This uses comparator.compare() to compare the Cell using the memstore - * comparator. - */ - private Cell getHighest(Cell first, Cell second) { - if (first == null && second == null) { - return null; - } - if (first != null && second != null) { - int compare = comparator.compare(first, second); - return (compare > 0 ? first : second); - } - return (first != null ? 
first : second); - } - - public synchronized void close() { - this.cellSetNextRow = null; - this.snapshotNextRow = null; - - this.cellSetIt = null; - this.snapshotIt = null; - - if (allocatorAtCreation != null) { - this.allocatorAtCreation.decScannerCount(); - this.allocatorAtCreation = null; - } - if (snapshotAllocatorAtCreation != null) { - this.snapshotAllocatorAtCreation.decScannerCount(); - this.snapshotAllocatorAtCreation = null; - } - - this.cellSetItRow = null; - this.snapshotItRow = null; - } - - /** - * MemStoreScanner returns max value as sequence id because it will - * always have the latest data among all files. - */ - @Override - public long getSequenceID() { - return Long.MAX_VALUE; - } - - @Override - public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { - return shouldSeek(scan, store, oldestUnexpiredTS); - } - - /** - * Seek scanner to the given key first. If it returns false(means - * peek()==null) or scanner's peek row is bigger than row of given key, seek - * the scanner to the previous row of given key - */ - @Override - public synchronized boolean backwardSeek(Cell key) { - seek(key); - if (peek() == null || comparator.compareRows(peek(), key) > 0) { - return seekToPreviousRow(key); - } - return true; - } - - /** - * Separately get the KeyValue before the specified key from kvset and - * snapshotset, and use the row of higher one as the previous row of - * specified key, then seek to the first KeyValue of previous row - */ - @Override - public synchronized boolean seekToPreviousRow(Cell originalKey) { - boolean keepSeeking = false; - Cell key = originalKey; - do { - Cell firstKeyOnRow = CellUtil.createFirstOnRow(key); - SortedSet<Cell> cellHead = cellSetAtCreation.headSet(firstKeyOnRow); - Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last(); - SortedSet<Cell> snapshotHead = snapshotAtCreation - .headSet(firstKeyOnRow); - Cell snapshotBeforeRow = snapshotHead.isEmpty() ? 
null : snapshotHead - .last(); - Cell lastCellBeforeRow = getHighest(cellSetBeforeRow, snapshotBeforeRow); - if (lastCellBeforeRow == null) { - theNext = null; - return false; - } - Cell firstKeyOnPreviousRow = CellUtil.createFirstOnRow(lastCellBeforeRow); - this.stopSkippingCellsIfNextRow = true; - seek(firstKeyOnPreviousRow); - this.stopSkippingCellsIfNextRow = false; - if (peek() == null - || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) { - keepSeeking = true; - key = firstKeyOnPreviousRow; - continue; - } else { - keepSeeking = false; - } - } while (keepSeeking); - return true; - } - - @Override - public synchronized boolean seekToLastRow() { - Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation - .last(); - Cell second = snapshotAtCreation.isEmpty() ? null - : snapshotAtCreation.last(); - Cell higherCell = getHighest(first, second); - if (higherCell == null) { - return false; - } - Cell firstCellOnLastRow = CellUtil.createFirstOnRow(higherCell); - if (seek(firstCellOnLastRow)) { - return true; - } else { - return seekToPreviousRow(higherCell); - } - - } - } - - public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT - + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN); - - public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + - ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) + - (2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP)); - - /* - * Calculate how the MemStore size has changed. Includes overhead of the - * backing Map. - * @param cell - * @param notpresent True if the cell was NOT present in the set. - * @return Size - */ - static long heapSizeChange(final Cell cell, final boolean notpresent) { - return notpresent ? 
ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY - + CellUtil.estimatedHeapSizeOf(cell)) : 0; - } - - private long keySize() { - return heapSize() - DEEP_OVERHEAD; + public long size() { + return heapSize(); } /** - * Get the entire heap usage for this MemStore not including keys in the - * snapshot. + * Check whether anything need to be done based on the current active set size + * Nothing need to be done for the DefaultMemStore */ @Override - public long heapSize() { - return size.get(); - } - - @Override - public long size() { - return heapSize(); + protected void checkActiveSize() { + return; } /** @@ -978,9 +198,6 @@ public class DefaultMemStore implements MemStore { LOG.info("memstore2 estimated size=" + size); final int seconds = 30; LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); - for (int i = 0; i < seconds; i++) { - // Thread.sleep(1000); - } LOG.info("Exiting."); } http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index c65326a..5c29fb4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -18,6 +18,13 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + import java.io.IOException; import java.io.InterruptedIOException; import java.net.InetSocketAddress; @@ -91,13 +98,6 @@ import 
org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableCollection; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - /** * A Store holds a column family in a Region. Its a memstore and a set of zero * or more StoreFiles, which stretch backwards over time. @@ -1636,7 +1636,7 @@ public class HStore implements Store { this.lock.readLock().unlock(); } - LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction" + (request.isAllFiles() ? " (all files)" : "")); this.region.reportCompactionRequestStart(request.isMajor()); @@ -1990,8 +1990,6 @@ public class HStore implements Store { } /** - * Used in tests. TODO: Remove - * * Updates the value for the given row/family/qualifier. This function will always be seen as * atomic by other readers because it only puts a single KV to memstore. Thus no read/write * control necessary. 
@@ -2002,6 +2000,7 @@ public class HStore implements Store { * @return memstore size delta * @throws IOException */ + @VisibleForTesting public long updateColumnValue(byte [] row, byte [] f, byte [] qualifier, long newValue) throws IOException { @@ -2055,7 +2054,8 @@ public class HStore implements Store { */ @Override public void prepare() { - this.snapshot = memstore.snapshot(); + // passing the current sequence number of the wal - to allow bookkeeping in the memstore + this.snapshot = memstore.snapshot(cacheFlushSeqNum); this.cacheFlushCount = snapshot.getCellsCount(); this.cacheFlushSize = snapshot.getSize(); committedFiles = new ArrayList<Path>(1); http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java new file mode 100644 index 0000000..cfcd81e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -0,0 +1,72 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment}, + * and is not needed for a {@link MutableSegment}. Specifically, the method + * {@link ImmutableSegment#getKeyValueScanner()} builds a special scanner for the + * {@link MemStoreSnapshot} object. + * In addition, this class overrides methods that are not likely to be supported by an immutable + * segment, e.g. {@link Segment#rollback(Cell)} and {@link Segment#getCellSet()}, which + * can be very inefficient. + */ +@InterfaceAudience.Private +public abstract class ImmutableSegment extends Segment { + + public ImmutableSegment(Segment segment) { + super(segment); + } + + /** + * Removes the given cell from this segment. + * By default an immutable segment cannot roll back: nothing is removed and 0 is returned. + * It may be invoked by tests in specific cases where it is known to be supported (see + * {@link ImmutableSegmentAdapter}). + */ + @Override + public long rollback(Cell cell) { + return 0; + } + + /** + * Returns a set of all the cells in the segment. + * The implementation of this method might be very inefficient for some immutable segments + * that do not maintain a cell set. Therefore by default this method is not supported. + * It may be invoked by tests in specific cases where it is known to be supported (see + * {@link ImmutableSegmentAdapter}). + */ + @Override + public CellSet getCellSet() { + throw new NotImplementedException("Immutable Segment does not support this operation by " + + "default"); + } + + /** + * Builds a special scanner for the MemStoreSnapshot object that may be different than the + * general segment scanner. 
+ * @return a special scanner for the MemStoreSnapshot object + */ + public abstract KeyValueScanner getKeyValueScanner(); + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java new file mode 100644 index 0000000..058865a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java @@ -0,0 +1,107 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.CollectionBackedScanner; + +/** + * This segment is adapting a mutable segment making it into an immutable segment. 
+ * This is used when a mutable segment is moved to being a snapshot or pushed into a compaction + * pipeline that consists only of immutable segments. + * The compaction may generate a different type of immutable segment. + */ +@InterfaceAudience.Private +public class ImmutableSegmentAdapter extends ImmutableSegment { + + final private MutableSegment adaptee; + + public ImmutableSegmentAdapter(MutableSegment segment) { + super(segment); + this.adaptee = segment; + } + + @Override + public KeyValueScanner getKeyValueScanner() { + return new CollectionBackedScanner(adaptee.getCellSet(), adaptee.getComparator()); + } + + @Override + public SegmentScanner getSegmentScanner(long readPoint) { + return adaptee.getSegmentScanner(readPoint); + } + + @Override + public boolean isEmpty() { + return adaptee.isEmpty(); + } + + @Override + public int getCellsCount() { + return adaptee.getCellsCount(); + } + + @Override + public long add(Cell cell) { + return adaptee.add(cell); + } + + @Override + public Cell getFirstAfter(Cell cell) { + return adaptee.getFirstAfter(cell); + } + + @Override + public void close() { + adaptee.close(); + } + + @Override + public Cell maybeCloneWithAllocator(Cell cell) { + return adaptee.maybeCloneWithAllocator(cell); + } + + @Override + public Segment setSize(long size) { + adaptee.setSize(size); + return this; + } + + @Override + public long getSize() { + return adaptee.getSize(); + } + + @Override + public long rollback(Cell cell) { + return adaptee.rollback(cell); + } + + @Override + public CellSet getCellSet() { + return adaptee.getCellSet(); + } + + @Override + public void dump(Log log) { + adaptee.dump(log); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/25dfc112/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index e9f8103..a10ccd9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -17,10 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; /** @@ -41,10 +42,19 @@ public interface MemStore extends HeapSize { MemStoreSnapshot snapshot(); /** + * Creates a snapshot of the current memstore. Snapshot must be cleared by call to + * {@link #clearSnapshot(long)}. + * @param flushOpSeqId the current sequence number of the wal; to be attached to the flushed + * segment + * @return {@link MemStoreSnapshot} + */ + MemStoreSnapshot snapshot(long flushOpSeqId); + + /** * Clears the current snapshot of the Memstore. * @param id * @throws UnexpectedStateException - * @see #snapshot() + * @see #snapshot(long) */ void clearSnapshot(long id) throws UnexpectedStateException; @@ -128,7 +138,7 @@ public interface MemStore extends HeapSize { * @return scanner over the memstore. This might include scanner over the snapshot when one is * present. */ - List<KeyValueScanner> getScanners(long readPt); + List<KeyValueScanner> getScanners(long readPt) throws IOException; /** * @return Total memory occupied by this MemStore.