HBASE-16768 Inconsistent results from the Append/Increment (ChiaPing Tsai)

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/96d34f2a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/96d34f2a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/96d34f2a

Branch: refs/heads/hbase-12439
Commit: 96d34f2a79bf977d83f0b9814b253669e6c6e671
Parents: 2c7211e
Author: tedyu <yuzhih...@gmail.com>
Authored: Fri Oct 7 00:59:27 2016 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Fri Oct 7 00:59:27 2016 -0700

----------------------------------------------------------------------
 .../hbase/regionserver/AbstractMemStore.java    |  9 +++
 .../hadoop/hbase/regionserver/HRegion.java      | 51 ++++++++--------
 .../hadoop/hbase/regionserver/HStore.java       | 10 ++++
 .../hadoop/hbase/regionserver/MemStore.java     |  7 +++
 .../apache/hadoop/hbase/regionserver/Store.java |  7 +++
 .../hadoop/hbase/client/TestFromClientSide.java | 61 ++++++++++++++++++++
 6 files changed, 121 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/96d34f2a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index aa6576f..5544251 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -97,6 +97,15 @@ public abstract class AbstractMemStore implements MemStore {
    */
   public abstract void updateLowestUnflushedSequenceIdInWAL(boolean 
onlyIfMoreRecent);
 
+  @Override
+  public long add(Iterable<Cell> cells) {
+    long size = 0;
+    for (Cell cell : cells) {
+      size += add(cell);
+    }
+    return size;
+  }
+  
   /**
    * Write an update
    * @param cell the cell to be added

http://git-wip-us.apache.org/repos/asf/hbase/blob/96d34f2a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 757ddab..d1684a3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3256,8 +3256,15 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
         if (batchOp.retCodeDetails[i].getOperationStatusCode() != 
OperationStatusCode.NOT_RUN) {
           continue;
         }
-        addedSize += applyFamilyMapToMemstore(familyMaps[i], replay,
+        // We need to update the sequence id for following reasons.
+        // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId 
won't stamp sequence id.
+        // 2) If no WAL, FSWALEntry won't be used
+        boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() 
== Durability.SKIP_WAL;
+        if (updateSeqId) {
+          this.updateSequenceId(familyMaps[i].values(),
             replay? batchOp.getReplaySequenceId(): 
writeEntry.getWriteNumber());
+        }
+        addedSize += applyFamilyMapToMemstore(familyMaps[i]);
       }
 
       // STEP 6. Complete mvcc.
@@ -3673,6 +3680,16 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     }
   }
 
+  private void updateSequenceId(final Iterable<List<Cell>> cellItr, final long 
sequenceId)
+      throws IOException {
+    for (List<Cell> cells: cellItr) {
+      if (cells == null) return;
+      for (Cell cell : cells) {
+        CellUtil.setSequenceId(cell, sequenceId);
+      }
+    }
+  }
+
   @Override
   public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final 
byte[] now)
       throws IOException {
@@ -3783,15 +3800,14 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * @param familyMap Map of Cells by family
    * @return the additional memory usage of the memstore caused by the new 
entries.
    */
-  private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap, 
boolean replay,
-      long sequenceId)
+  private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap)
   throws IOException {
     long size = 0;
     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
       byte[] family = e.getKey();
       List<Cell> cells = e.getValue();
       assert cells instanceof RandomAccess;
-      size += applyToMemstore(getStore(family), cells, false, replay, 
sequenceId);
+      size += applyToMemstore(getStore(family), cells, false);
     }
     return size;
   }
@@ -3803,34 +3819,22 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * @return Memstore change in size on insert of these Cells.
    * @see #applyToMemstore(Store, Cell, long)
    */
-  private long applyToMemstore(final Store store, final List<Cell> cells,
-      final boolean delta, boolean replay, long sequenceId)
+  private long applyToMemstore(final Store store, final List<Cell> cells, 
final boolean delta)
   throws IOException {
     // Any change in how we update Store/MemStore needs to also be done in 
other applyToMemstore!!!!
-    long size = 0;
     boolean upsert = delta && store.getFamily().getMaxVersions() == 1;
-    int count = cells.size();
     if (upsert) {
-      size += store.upsert(cells, getSmallestReadPoint());
+      return store.upsert(cells, getSmallestReadPoint());
     } else {
-      for (int i = 0; i < count; i++) {
-        Cell cell = cells.get(i);
-        // TODO: This looks wrong.. checking for sequenceid of zero is 
expensive!!!!! St.Ack
-        // When is it zero anyways? When replay? Then just rely on that flag.
-        if (cell.getSequenceId() == 0 || replay) {
-          CellUtil.setSequenceId(cell, sequenceId);
-        }
-        size += store.add(cell);
-      }
+      return store.add(cells);
     }
-    return size;
   }
 
   /**
    * @return Memstore change in size on insert of these Cells.
    * @see #applyToMemstore(Store, List, boolean, boolean, long)
    */
-  private long applyToMemstore(final Store store, final Cell cell, long 
sequenceId)
+  private long applyToMemstore(final Store store, final Cell cell)
   throws IOException {
     // Any change in how we update Store/MemStore needs to also be done in 
other applyToMemstore!!!!
     if (store == null) {
@@ -7045,7 +7049,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
                 CellUtil.setSequenceId(cell, sequenceId);
               }
               Store store = getStore(cell);
-              addedSize += applyToMemstore(store, cell, sequenceId);
+              addedSize += applyToMemstore(store, cell);
             }
           }
           // STEP 8. Complete mvcc.
@@ -7231,12 +7235,11 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           // transaction.
           recordMutationWithoutWal(mutation.getFamilyCellMap());
           writeEntry = mvcc.begin();
+          updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber());
         }
         // Now write to MemStore. Do it a column family at a time.
-        long sequenceId = writeEntry.getWriteNumber();
         for (Map.Entry<Store, List<Cell>> e: forMemStore.entrySet()) {
-          accumulatedResultSize +=
-              applyToMemstore(e.getKey(), e.getValue(), true, false, 
sequenceId);
+          accumulatedResultSize += applyToMemstore(e.getKey(), e.getValue(), 
true);
         }
         mvcc.completeAndWait(writeEntry);
         if (rsServices != null && rsServices.getNonceManager() != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/96d34f2a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index ce5c91d..e9c05c7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -635,6 +635,16 @@ public class HStore implements Store {
   }
 
   @Override
+  public long add(final Iterable<Cell> cells) {
+    lock.readLock().lock();
+    try {
+      return memstore.add(cells);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
   public long timeOfOldestEdit() {
     return memstore.timeOfOldestEdit();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/96d34f2a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index d52b863..bcaf3a2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -74,6 +74,13 @@ public interface MemStore extends HeapSize {
   long add(final Cell cell);
 
   /**
+   * Write the updates
+   * @param cells
+   * @return approximate size of the passed cell.
+   */
+  long add(Iterable<Cell> cells);
+
+  /**
    * @return Oldest timestamp of all the Cells in the MemStore
    */
   long timeOfOldestEdit();

http://git-wip-us.apache.org/repos/asf/hbase/blob/96d34f2a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 2d8b1a4..7159502 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -151,6 +151,13 @@ public interface Store extends HeapSize, 
StoreConfigInformation, PropagatingConf
   long add(Cell cell);
 
   /**
+   * Adds the specified value to the memstore
+   * @param cells
+   * @return memstore size delta
+   */
+  long add(Iterable<Cell> cells);
+
+  /**
    * When was the last edit done in the memstore
    */
   long timeOfOldestEdit();

http://git-wip-us.apache.org/repos/asf/hbase/blob/96d34f2a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 6bd9ccd..80337a2 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -33,6 +33,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -4477,6 +4478,66 @@ public class TestFromClientSide {
     assertEquals(r.getColumnLatestCell(FAMILY, QUALIFIERS[0]).getTimestamp(),
         r.getColumnLatestCell(FAMILY, QUALIFIERS[2]).getTimestamp());
   }
+  private List<Result> doAppend(final boolean walUsed) throws IOException {
+    LOG.info("Starting testAppend, walUsed is " + walUsed);
+    final TableName TABLENAME = TableName.valueOf(walUsed ? 
"testAppendWithWAL" : "testAppendWithoutWAL");
+    Table t = TEST_UTIL.createTable(TABLENAME, FAMILY);
+    final byte[] row1 = Bytes.toBytes("c");
+    final byte[] row2 = Bytes.toBytes("b");
+    final byte[] row3 = Bytes.toBytes("a");
+    final byte[] qual = Bytes.toBytes("qual");
+    Put put_0 = new Put(row2);
+    put_0.addColumn(FAMILY, qual, Bytes.toBytes("put"));
+    Put put_1 = new Put(row3);
+    put_1.addColumn(FAMILY, qual, Bytes.toBytes("put"));
+    Append append_0 = new Append(row1);
+    append_0.add(FAMILY, qual, Bytes.toBytes("i"));
+    Append append_1 = new Append(row1);
+    append_1.add(FAMILY, qual, Bytes.toBytes("k"));
+    Append append_2 = new Append(row1);
+    append_2.add(FAMILY, qual, Bytes.toBytes("e"));
+    if (!walUsed) {
+      append_2.setDurability(Durability.SKIP_WAL);
+    }
+    Append append_3 = new Append(row1);
+    append_3.add(FAMILY, qual, Bytes.toBytes("a"));
+    Scan s = new Scan();
+    s.setCaching(1);
+    t.append(append_0);
+    t.put(put_0);
+    t.put(put_1);
+    List<Result> results = new LinkedList<>();
+    try (ResultScanner scanner = t.getScanner(s)) {
+      t.append(append_1);
+      t.append(append_2);
+      t.append(append_3);
+      for (Result r : scanner) {
+        results.add(r);
+      }
+    }
+    TEST_UTIL.deleteTable(TABLENAME);
+    return results;
+  }
+
+  @Test
+  public void testAppendWithoutWAL() throws Exception {
+    List<Result> resultsWithWal = doAppend(true);
+    List<Result> resultsWithoutWal = doAppend(false);
+    assertEquals(resultsWithWal.size(), resultsWithoutWal.size());
+    for (int i = 0; i != resultsWithWal.size(); ++i) {
+      Result resultWithWal = resultsWithWal.get(i);
+      Result resultWithoutWal = resultsWithoutWal.get(i);
+      assertEquals(resultWithWal.rawCells().length, 
resultWithoutWal.rawCells().length);
+      for (int j = 0; j != resultWithWal.rawCells().length; ++j) {
+        Cell cellWithWal = resultWithWal.rawCells()[j];
+        Cell cellWithoutWal = resultWithoutWal.rawCells()[j];
+        assertTrue(Bytes.equals(CellUtil.cloneRow(cellWithWal), 
CellUtil.cloneRow(cellWithoutWal)));
+        assertTrue(Bytes.equals(CellUtil.cloneFamily(cellWithWal), 
CellUtil.cloneFamily(cellWithoutWal)));
+        assertTrue(Bytes.equals(CellUtil.cloneQualifier(cellWithWal), 
CellUtil.cloneQualifier(cellWithoutWal)));
+        assertTrue(Bytes.equals(CellUtil.cloneValue(cellWithWal), 
CellUtil.cloneValue(cellWithoutWal)));
+      }
+    }
+  }
 
   @Test
   public void testClientPoolRoundRobin() throws IOException {

Reply via email to