HBASE-18295  The result contains the cells across different rows

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d215cb49
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d215cb49
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d215cb49

Branch: refs/heads/HBASE-18147
Commit: d215cb495085c5b7e5b27ce2ffdb6c4b4a90b95f
Parents: 1978b78
Author: Chia-Ping Tsai <chia7...@gmail.com>
Authored: Wed Jul 12 02:27:29 2017 +0800
Committer: Chia-Ping Tsai <chia7...@gmail.com>
Committed: Wed Jul 12 02:27:29 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/StoreScanner.java |  46 +++-
 .../hadoop/hbase/regionserver/TestStore.java    | 228 ++++++++++++++++---
 2 files changed, 232 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d215cb49/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 11301d8..1fcb314 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -22,9 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.NavigableSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.ReentrantLock;
@@ -158,6 +156,7 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
   private final ReentrantLock flushLock = new ReentrantLock();
 
   protected final long readPt;
+  private boolean topChanged = false;
 
   // used by the injection framework to test race between StoreScanner 
construction and compaction
   enum StoreScannerCompactionRace {
@@ -606,6 +605,7 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
       int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
       bytesRead += cellSize;
       prevCell = cell;
+      topChanged = false;
       ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
       switch (qcode) {
         case INCLUDE:
@@ -692,10 +692,18 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
           }
           matcher.clearCurrentRow();
           seekOrSkipToNextRow(cell);
+          NextState stateAfterSeekNextRow = needToReturn(outResult);
+          if (stateAfterSeekNextRow != null) {
+            return 
scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
+          }
           break;
 
         case SEEK_NEXT_COL:
           seekOrSkipToNextColumn(cell);
+          NextState stateAfterSeekNextColumn = needToReturn(outResult);
+          if (stateAfterSeekNextColumn != null) {
+            return 
scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
+          }
           break;
 
         case SKIP:
@@ -706,6 +714,10 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
           Cell nextKV = matcher.getNextKeyHint(cell);
           if (nextKV != null) {
             seekAsDirection(nextKV);
+            NextState stateAfterSeekByHint = needToReturn(outResult);
+            if (stateAfterSeekByHint != null) {
+              return 
scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
+            }
           } else {
             heap.next();
           }
@@ -725,6 +737,24 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
     return 
scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
   }
 
+  /**
+   * If the top cell won't be flushed into disk, the new top cell may be
+   * changed after #reopenAfterFlush. Because the older top cell only exist
+   * in the memstore scanner but the memstore scanner is replaced by hfile
+   * scanner after #reopenAfterFlush. If the row of top cell is changed,
+   * we should return the current cells. Otherwise, we may return
+   * the cells across different rows.
+   * @param outResult the cells which are visible for user scan
+   * @return null is the top cell doesn't change. Otherwise, the NextState
+   *         to return
+   */
+  private NextState needToReturn(List<Cell> outResult) {
+    if (!outResult.isEmpty() && topChanged) {
+      return heap.peek() == null ? NextState.NO_MORE_VALUES : 
NextState.MORE_VALUES;
+    }
+    return null;
+  }
+
   private void seekOrSkipToNextRow(Cell cell) throws IOException {
     // If it is a Get Scan, then we know that we are done with this row; there 
are no more
     // rows beyond the current one: don't try to optimize.
@@ -901,13 +931,13 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
     resetKVHeap(this.currentScanners, store.getComparator());
     resetQueryMatcher(lastTop);
     if (heap.peek() == null || store.getComparator().compareRows(lastTop, 
this.heap.peek()) != 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Storescanner.peek() is changed where before = " + 
lastTop.toString() +
-            ",and after = " + heap.peek());
-      }
-      return true;
+      LOG.info("Storescanner.peek() is changed where before = " + 
lastTop.toString() +
+          ",and after = " + heap.peek());
+      topChanged = true;
+    } else {
+      topChanged = false;
     }
-    return false;
+    return topChanged;
   }
 
   private void resetQueryMatcher(Cell lastTopKey) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/d215cb49/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 9a49f07..56a6182 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -106,6 +106,10 @@ import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test class for the Store
@@ -184,12 +188,12 @@ public class TestStore {
 
   @SuppressWarnings("deprecation")
   private Store init(String methodName, Configuration conf, HTableDescriptor 
htd,
-      HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
+      HColumnDescriptor hcd, MyStoreHook hook) throws IOException {
     return init(methodName, conf, htd, hcd, hook, false);
   }
   @SuppressWarnings("deprecation")
   private Store init(String methodName, Configuration conf, HTableDescriptor 
htd,
-      HColumnDescriptor hcd, MyScannerHook hook, boolean switchToPread) throws 
IOException {
+      HColumnDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws 
IOException {
     //Setting up a Store
     Path basedir = new Path(DIR+methodName);
     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@@ -1061,6 +1065,150 @@ public class TestStore {
   }
 
   @Test
+  public void testFlushBeforeCompletingScanWoFilter() throws IOException, 
InterruptedException {
+    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
+    testFlushBeforeCompletingScan(new MyListHook() {
+      @Override
+      public void hook(int currentSize) {
+        if (currentSize == 2) {
+          try {
+            flushStore(store, id++);
+            timeToGoNextRow.set(true);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }, new FilterBase() {
+      @Override
+      public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
+        return ReturnCode.INCLUDE;
+      }
+    });
+  }
+
+  @Test
+  public void testFlushBeforeCompletingScanWithFilter() throws IOException, 
InterruptedException {
+    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
+    testFlushBeforeCompletingScan(new MyListHook() {
+      @Override
+      public void hook(int currentSize) {
+        if (currentSize == 2) {
+          try {
+            flushStore(store, id++);
+            timeToGoNextRow.set(true);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }, new FilterBase() {
+      @Override
+      public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
+        if (timeToGoNextRow.get()) {
+          timeToGoNextRow.set(false);
+          return ReturnCode.NEXT_ROW;
+        } else {
+          return ReturnCode.INCLUDE;
+        }
+      }
+    });
+  }
+
+  @Test
+  public void testFlushBeforeCompletingScanWithFilterHint() throws 
IOException, InterruptedException {
+    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
+    testFlushBeforeCompletingScan(new MyListHook() {
+      @Override
+      public void hook(int currentSize) {
+        if (currentSize == 2) {
+          try {
+            flushStore(store, id++);
+            timeToGetHint.set(true);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }, new FilterBase() {
+      @Override
+      public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
+        if (timeToGetHint.get()) {
+          timeToGetHint.set(false);
+          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
+        } else {
+          return Filter.ReturnCode.INCLUDE;
+        }
+      }
+      @Override
+      public Cell getNextCellHint(Cell currentCell) throws IOException {
+        return currentCell;
+      }
+    });
+  }
+
+  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter)
+          throws IOException, InterruptedException {
+    Configuration conf = HBaseConfiguration.create();
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setMaxVersions(1);
+    byte[] r0 = Bytes.toBytes("row0");
+    byte[] r1 = Bytes.toBytes("row1");
+    byte[] r2 = Bytes.toBytes("row2");
+    byte[] value0 = Bytes.toBytes("value0");
+    byte[] value1 = Bytes.toBytes("value1");
+    byte[] value2 = Bytes.toBytes("value2");
+    MemstoreSize memStoreSize = new MemstoreSize();
+    long ts = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    init(name.getMethodName(), conf, new 
HTableDescriptor(TableName.valueOf(table)), hcd, new MyStoreHook() {
+      @Override
+      public long getSmallestReadPoint(HStore store) {
+        return seqId + 3;
+      }
+    });
+    // The cells having the value0 won't be flushed to disk because the value 
of max version is 1
+    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSize);
+    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSize);
+    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSize);
+    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1), memStoreSize);
+    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1), memStoreSize);
+    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1), memStoreSize);
+    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2), memStoreSize);
+    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2), memStoreSize);
+    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2), memStoreSize);
+    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1), memStoreSize);
+    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1), memStoreSize);
+    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1), memStoreSize);
+    List<Cell> myList = new MyList<>(hook);
+    Scan scan = new Scan()
+            .withStartRow(r1)
+            .setFilter(filter);
+    try (InternalScanner scanner = (InternalScanner) store.getScanner(
+          scan, null, seqId + 3)){
+      // r1
+      scanner.next(myList);
+      assertEquals(3, myList.size());
+      for (Cell c : myList) {
+        byte[] actualValue = CellUtil.cloneValue(c);
+        assertTrue("expected:" + Bytes.toStringBinary(value1)
+          + ", actual:" + Bytes.toStringBinary(actualValue)
+          , Bytes.equals(actualValue, value1));
+      }
+      List<Cell> normalList = new ArrayList<>(3);
+      // r2
+      scanner.next(normalList);
+      assertEquals(3, normalList.size());
+      for (Cell c : normalList) {
+        byte[] actualValue = CellUtil.cloneValue(c);
+        assertTrue("expected:" + Bytes.toStringBinary(value2)
+          + ", actual:" + Bytes.toStringBinary(actualValue)
+          , Bytes.equals(actualValue, value2));
+      }
+    }
+  }
+
+  @Test
   public void testCreateScannerAndSnapshotConcurrently() throws IOException, 
InterruptedException {
     Configuration conf = HBaseConfiguration.create();
     conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
@@ -1120,25 +1268,28 @@ public class TestStore {
   public void testScanWithDoubleFlush() throws IOException {
     Configuration conf = HBaseConfiguration.create();
     // Initialize region
-    MyStore myStore = initMyStore(name.getMethodName(), conf, (final MyStore 
store1) -> {
-      final long tmpId = id++;
-      ExecutorService s = Executors.newSingleThreadExecutor();
-      s.submit(() -> {
+    MyStore myStore = initMyStore(name.getMethodName(), conf, new 
MyStoreHook(){
+      @Override
+      public void getScanners(MyStore store) throws IOException {
+        final long tmpId = id++;
+        ExecutorService s = Executors.newSingleThreadExecutor();
+        s.submit(() -> {
+          try {
+            // flush the store before storescanner updates the scanners from 
store.
+            // The current data will be flushed into files, and the memstore 
will
+            // be clear.
+            // -- phase (4/4)
+            flushStore(store, tmpId);
+          }catch (IOException ex) {
+            throw new RuntimeException(ex);
+          }
+        });
+        s.shutdown();
         try {
-          // flush the store before storescanner updates the scanners from 
store.
-          // The current data will be flushed into files, and the memstore will
-          // be clear.
-          // -- phase (4/4)
-          flushStore(store1, tmpId);
-        }catch (IOException ex) {
-          throw new RuntimeException(ex);
+          // wait for the flush, the thread will be blocked in 
HStore#notifyChangedReadersObservers.
+          s.awaitTermination(3, TimeUnit.SECONDS);
+        } catch (InterruptedException ex) {
         }
-      });
-      s.shutdown();
-      try {
-        // wait for the flush, the thread will be blocked in 
HStore#notifyChangedReadersObservers.
-        s.awaitTermination(3, TimeUnit.SECONDS);
-      } catch (InterruptedException ex) {
       }
     });
     byte[] oldValue = Bytes.toBytes("oldValue");
@@ -1290,7 +1441,7 @@ public class TestStore {
     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
   }
 
-  private MyStore initMyStore(String methodName, Configuration conf, 
MyScannerHook hook)
+  private MyStore initMyStore(String methodName, Configuration conf, 
MyStoreHook hook)
       throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
     HColumnDescriptor hcd = new HColumnDescriptor(family);
@@ -1299,10 +1450,10 @@ public class TestStore {
   }
 
   class MyStore extends HStore {
-    private final MyScannerHook hook;
+    private final MyStoreHook hook;
 
     MyStore(final HRegion region, final HColumnDescriptor family, final 
Configuration confParam,
-        MyScannerHook hook, boolean switchToPread) throws IOException {
+        MyStoreHook hook, boolean switchToPread) throws IOException {
       super(region, family, confParam);
       this.hook = hook;
     }
@@ -1312,13 +1463,22 @@ public class TestStore {
         boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, 
byte[] startRow,
         boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long 
readPt,
         boolean includeMemstoreScanner) throws IOException {
-      hook.hook(this);
+      hook.getScanners(this);
       return super.getScanners(files, cacheBlocks, usePread, isCompaction, 
matcher, startRow, true,
         stopRow, false, readPt, includeMemstoreScanner);
     }
+    @Override
+    public long getSmallestReadPoint() {
+      return hook.getSmallestReadPoint(this);
+    }
   }
-  private interface MyScannerHook {
-    void hook(MyStore store) throws IOException;
+
+  private abstract class MyStoreHook {
+    void getScanners(MyStore store) throws IOException {
+    }
+    long getSmallestReadPoint(HStore store) {
+      return store.getHRegion().getSmallestReadPoint();
+    }
   }
 
   @Test
@@ -1330,12 +1490,7 @@ public class TestStore {
     // Set the lower threshold to invoke the "MERGE" policy
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
-    MyStore store = initMyStore(name.getMethodName(), conf, new 
MyScannerHook() {
-      @Override
-      public void hook(org.apache.hadoop.hbase.regionserver.TestStore.MyStore 
store)
-          throws IOException {
-      }
-    });
+    MyStore store = initMyStore(name.getMethodName(), conf, new MyStoreHook() 
{});
     MemstoreSize memStoreSize = new MemstoreSize();
     long ts = System.currentTimeMillis();
     long seqID = 1l;
@@ -1497,10 +1652,15 @@ public class TestStore {
       }
     }
   }
+
+  interface MyListHook {
+    void hook(int currentSize);
+  }
+
   private static class MyList<T> implements List<T> {
     private final List<T> delegatee = new ArrayList<>();
-    private final Consumer<Integer> hookAtAdd;
-    MyList(final Consumer<Integer> hookAtAdd) {
+    private final MyListHook hookAtAdd;
+    MyList(final MyListHook hookAtAdd) {
       this.hookAtAdd = hookAtAdd;
     }
     @Override
@@ -1523,7 +1683,7 @@ public class TestStore {
 
     @Override
     public boolean add(T e) {
-      hookAtAdd.accept(size());
+      hookAtAdd.hook(size());
       return delegatee.add(e);
     }
 

Reply via email to