HBASE-16768 Inconsistent results from the Append/Increment (ChiaPing Tsai)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/96d34f2a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/96d34f2a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/96d34f2a

Branch: refs/heads/hbase-12439
Commit: 96d34f2a79bf977d83f0b9814b253669e6c6e671
Parents: 2c7211e
Author: tedyu <yuzhih...@gmail.com>
Authored: Fri Oct 7 00:59:27 2016 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Fri Oct 7 00:59:27 2016 -0700

----------------------------------------------------------------------
 .../hbase/regionserver/AbstractMemStore.java    |  9 +++
 .../hadoop/hbase/regionserver/HRegion.java      | 51 ++++++++--------
 .../hadoop/hbase/regionserver/HStore.java       | 10 ++++
 .../hadoop/hbase/regionserver/MemStore.java     |  7 +++
 .../apache/hadoop/hbase/regionserver/Store.java |  7 +++
 .../hadoop/hbase/client/TestFromClientSide.java | 61 ++++++++++++++++++++
 6 files changed, 121 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/96d34f2a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index aa6576f..5544251 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -97,6 +97,15 @@ public abstract class AbstractMemStore implements MemStore {
    */
   public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
 
+  @Override
+  public long add(Iterable<Cell> cells) {
+    long size = 0; // accumulates the size deltas returned by add(Cell)
+    for (Cell cell : cells) {
+      size += add(cell);
+    }
+    return size;
+  }
+
   /**
    * Write an update
    * @param cell the cell to be added
http://git-wip-us.apache.org/repos/asf/hbase/blob/96d34f2a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 757ddab..d1684a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3256,8 +3256,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
           continue;
         }
-        addedSize += applyFamilyMapToMemstore(familyMaps[i], replay,
+        // We need to update the sequence id for following reasons.
+        // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
+        // 2) If the WAL is skipped (incl. USE_DEFAULT on a SKIP_WAL table), FSWALEntry won't be used
+        boolean updateSeqId = replay || getEffectiveDurability(batchOp.getMutation(i).getDurability()) == Durability.SKIP_WAL;
+        if (updateSeqId) {
+          this.updateSequenceId(familyMaps[i].values(),
             replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
+        }
+        addedSize += applyFamilyMapToMemstore(familyMaps[i]);
       }
 
       // STEP 6. Complete mvcc.
@@ -3673,6 +3680,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  private void updateSequenceId(final Iterable<List<Cell>> cellItr, final long sequenceId)
+      throws IOException {
+    for (List<Cell> cells: cellItr) {
+      if (cells == null) continue; // skip only this absent list; keep stamping the remaining ones
+      for (Cell cell : cells) {
+        CellUtil.setSequenceId(cell, sequenceId);
+      }
+    }
+  }
+
   @Override
   public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
       throws IOException {
@@ -3783,15 +3800,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param familyMap Map of Cells by family
    * @return the additional memory usage of the memstore caused by the new entries.
    */
-  private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap, boolean replay,
-      long sequenceId)
+  private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap)
   throws IOException {
     long size = 0;
     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
       byte[] family = e.getKey();
       List<Cell> cells = e.getValue();
       assert cells instanceof RandomAccess;
-      size += applyToMemstore(getStore(family), cells, false, replay, sequenceId);
+      size += applyToMemstore(getStore(family), cells, false);
     }
     return size;
   }
@@ -3803,34 +3819,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return Memstore change in size on insert of these Cells.
    * @see #applyToMemstore(Store, Cell, long)
    */
-  private long applyToMemstore(final Store store, final List<Cell> cells,
-      final boolean delta, boolean replay, long sequenceId)
+  private long applyToMemstore(final Store store, final List<Cell> cells, final boolean delta)
   throws IOException {
     // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
-    long size = 0;
     boolean upsert = delta && store.getFamily().getMaxVersions() == 1;
-    int count = cells.size();
     if (upsert) {
-      size += store.upsert(cells, getSmallestReadPoint());
+      return store.upsert(cells, getSmallestReadPoint());
     } else {
-      for (int i = 0; i < count; i++) {
-        Cell cell = cells.get(i);
-        // TODO: This looks wrong.. checking for sequenceid of zero is expensive!!!!! St.Ack
-        // When is it zero anyways? When replay? Then just rely on that flag.
-        if (cell.getSequenceId() == 0 || replay) {
-          CellUtil.setSequenceId(cell, sequenceId);
-        }
-        size += store.add(cell);
-      }
+      return store.add(cells);
     }
-    return size;
   }
 
   /**
    * @return Memstore change in size on insert of these Cells.
    * @see #applyToMemstore(Store, List, boolean, boolean, long)
    */
-  private long applyToMemstore(final Store store, final Cell cell, long sequenceId)
+  private long applyToMemstore(final Store store, final Cell cell)
   throws IOException {
     // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
     if (store == null) {
@@ -7045,7 +7049,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                 CellUtil.setSequenceId(cell, sequenceId);
               }
               Store store = getStore(cell);
-              addedSize += applyToMemstore(store, cell, sequenceId);
+              addedSize += applyToMemstore(store, cell);
             }
           }
           // STEP 8. Complete mvcc.
@@ -7231,12 +7235,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           // transaction.
           recordMutationWithoutWal(mutation.getFamilyCellMap());
           writeEntry = mvcc.begin();
+          updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber());
         }
         // Now write to MemStore. Do it a column family at a time.
-        long sequenceId = writeEntry.getWriteNumber();
         for (Map.Entry<Store, List<Cell>> e: forMemStore.entrySet()) {
-          accumulatedResultSize +=
-              applyToMemstore(e.getKey(), e.getValue(), true, false, sequenceId);
+          accumulatedResultSize += applyToMemstore(e.getKey(), e.getValue(), true);
         }
         mvcc.completeAndWait(writeEntry);
         if (rsServices != null && rsServices.getNonceManager() != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/96d34f2a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index ce5c91d..e9c05c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -635,6 +635,16 @@ public class HStore implements Store {
   }
 
   @Override
+  public long add(final Iterable<Cell> cells) {
+    lock.readLock().lock();
+    try {
+      return memstore.add(cells);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
   public long timeOfOldestEdit() {
     return memstore.timeOfOldestEdit();
   }
http://git-wip-us.apache.org/repos/asf/hbase/blob/96d34f2a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index d52b863..bcaf3a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -74,6 +74,13 @@ public interface MemStore extends HeapSize {
   long add(final Cell cell);
 
   /**
+   * Write the updates
+   * @param cells the cells to be added
+   * @return approximate size of the passed cells.
+   */
+  long add(Iterable<Cell> cells);
+
+  /**
    * @return Oldest timestamp of all the Cells in the MemStore
    */
   long timeOfOldestEdit();
http://git-wip-us.apache.org/repos/asf/hbase/blob/96d34f2a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 2d8b1a4..7159502 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -151,6 +151,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
   long add(Cell cell);
 
   /**
+   * Adds the specified values to the memstore
+   * @param cells the cells to be added
+   * @return memstore size delta
+   */
+  long add(Iterable<Cell> cells);
+
+  /**
    * When was the last edit done in the memstore
    */
   long timeOfOldestEdit();
http://git-wip-us.apache.org/repos/asf/hbase/blob/96d34f2a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 6bd9ccd..80337a2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -33,6 +33,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -4477,6 +4478,66 @@ public class TestFromClientSide {
     assertEquals(r.getColumnLatestCell(FAMILY, QUALIFIERS[0]).getTimestamp(),
         r.getColumnLatestCell(FAMILY, QUALIFIERS[2]).getTimestamp());
   }
+  private List<Result> doAppend(final boolean walUsed) throws IOException { // returns the rows read by a scanner opened before append_1..append_3 are issued
+    LOG.info("Starting testAppend, walUsed is " + walUsed);
+    final TableName TABLENAME = TableName.valueOf(walUsed ? "testAppendWithWAL" : "testAppendWithoutWAL");
+    Table t = TEST_UTIL.createTable(TABLENAME, FAMILY); // NOTE(review): t is never closed, and deleteTable below is skipped if any call in between throws
+    final byte[] row1 = Bytes.toBytes("c"); // sorts after "a" and "b"; presumably so the scanner reaches this row only after the appends below
+    final byte[] row2 = Bytes.toBytes("b");
+    final byte[] row3 = Bytes.toBytes("a");
+    final byte[] qual = Bytes.toBytes("qual");
+    Put put_0 = new Put(row2);
+    put_0.addColumn(FAMILY, qual, Bytes.toBytes("put"));
+    Put put_1 = new Put(row3);
+    put_1.addColumn(FAMILY, qual, Bytes.toBytes("put"));
+    Append append_0 = new Append(row1);
+    append_0.add(FAMILY, qual, Bytes.toBytes("i"));
+    Append append_1 = new Append(row1);
+    append_1.add(FAMILY, qual, Bytes.toBytes("k"));
+    Append append_2 = new Append(row1);
+    append_2.add(FAMILY, qual, Bytes.toBytes("e"));
+    if (!walUsed) { // the only behavioral difference between the two runs
+      append_2.setDurability(Durability.SKIP_WAL);
+    }
+    Append append_3 = new Append(row1);
+    append_3.add(FAMILY, qual, Bytes.toBytes("a"));
+    Scan s = new Scan();
+    s.setCaching(1); // presumably one row per scanner RPC, so row "c" is fetched after the appends below; confirm
+    t.append(append_0);
+    t.put(put_0);
+    t.put(put_1);
+    List<Result> results = new LinkedList<>();
+    try (ResultScanner scanner = t.getScanner(s)) { // scanner opened before append_1..append_3 run
+      t.append(append_1);
+      t.append(append_2);
+      t.append(append_3);
+      for (Result r : scanner) {
+        results.add(r);
+      }
+    }
+    TEST_UTIL.deleteTable(TABLENAME);
+    return results;
+  }
+
+  @Test
+  public void testAppendWithoutWAL() throws Exception { // runs doAppend with and without SKIP_WAL on append_2 and requires identical scan output
+    List<Result> resultsWithWal = doAppend(true);
+    List<Result> resultsWithoutWal = doAppend(false);
+    assertEquals(resultsWithWal.size(), resultsWithoutWal.size());
+    for (int i = 0; i != resultsWithWal.size(); ++i) {
+      Result resultWithWal = resultsWithWal.get(i);
+      Result resultWithoutWal = resultsWithoutWal.get(i);
+      assertEquals(resultWithWal.rawCells().length, resultWithoutWal.rawCells().length);
+      for (int j = 0; j != resultWithWal.rawCells().length; ++j) { // compares row/family/qualifier/value only; timestamps are not checked
+        Cell cellWithWal = resultWithWal.rawCells()[j];
+        Cell cellWithoutWal = resultWithoutWal.rawCells()[j];
+        assertTrue(Bytes.equals(CellUtil.cloneRow(cellWithWal), CellUtil.cloneRow(cellWithoutWal)));
+        assertTrue(Bytes.equals(CellUtil.cloneFamily(cellWithWal), CellUtil.cloneFamily(cellWithoutWal)));
+        assertTrue(Bytes.equals(CellUtil.cloneQualifier(cellWithWal), CellUtil.cloneQualifier(cellWithoutWal)));
+        assertTrue(Bytes.equals(CellUtil.cloneValue(cellWithWal), CellUtil.cloneValue(cellWithoutWal)));
+      }
+    }
+  }
 
   @Test
   public void testClientPoolRoundRobin() throws IOException {