Repository: hbase
Updated Branches:
  refs/heads/branch-1 cadc4cf15 -> a6f8214e9


HBASE-15236 Inconsistent cell reads over multiple bulk-loaded HFiles. In 
KeyValueHeap, if two cells are the same, i.e. have the same key and timestamp, 
then instead of directly using the seq id to determine the newer one, we should 
use StoreFile.Comparators.SEQ_ID because that is what is used to determine the 
order of hfiles. In this patch, we assign each scanner an order based on its 
index in the storefiles list sorted by StoreFile.Comparators.SEQ_ID, which is 
then used in KeyValueHeap to disambiguate between identical cells. Renames 
getSequenceID() in the KeyValueScanner interface to getScannerOrder(). 
Testing: Adds a unit test to TestKeyValueHeap. Manual testing: Three cases 
(tables t, t2, t3 in the jira description), single region, 2 hfiles with the 
same seq id, timestamps and duplicate KVs. Made sure that the returned kv was 
the same for get and scan. (Apekshit)

Change-Id: I22600c91c0a51fb63eb17db73472839d2f13957c

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a6f8214e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a6f8214e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a6f8214e

Branch: refs/heads/branch-1
Commit: a6f8214e9b8cce79edcaa12bc026d9ef7f77970f
Parents: cadc4cf
Author: Apekshit <apeksha...@gmail.com>
Authored: Tue Feb 23 00:31:18 2016 -0800
Committer: stack <st...@apache.org>
Committed: Thu May 12 21:31:13 2016 -0700

----------------------------------------------------------------------
 .../hbase/regionserver/DefaultMemStore.java     |   7 +-
 .../hadoop/hbase/regionserver/KeyValueHeap.java |  20 +-
 .../hbase/regionserver/KeyValueScanner.java     |  12 +-
 .../hadoop/hbase/regionserver/StoreFile.java    |  52 ++--
 .../hbase/regionserver/StoreFileScanner.java    |  38 ++-
 .../hadoop/hbase/regionserver/StoreScanner.java |   5 +-
 .../hbase/util/CollectionBackedScanner.java     |   5 +-
 .../hbase/regionserver/TestKeyValueHeap.java    | 268 +++++++------------
 .../hbase/regionserver/TestStoreFile.java       |   4 +-
 .../regionserver/compactions/TestCompactor.java |   2 +-
 .../compactions/TestStripeCompactionPolicy.java |   3 +-
 11 files changed, 198 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a6f8214e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 05041f5..ce793a9 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -939,11 +939,12 @@ public class DefaultMemStore implements MemStore {
     }
 
     /**
-     * MemStoreScanner returns max value as sequence id because it will
-     * always have the latest data among all files.
+     * MemStoreScanner returns Long.MAX_VALUE because it will always have the 
latest data among all
+     * scanners.
+     * @see KeyValueScanner#getScannerOrder()
      */
     @Override
-    public long getSequenceID() {
+    public long getScannerOrder() {
       return Long.MAX_VALUE;
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a6f8214e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index ac76bfd..4b77e179 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -182,17 +182,10 @@ public class KeyValueHeap extends 
NonReversedNonLazyKeyValueScanner
       if (comparison != 0) {
         return comparison;
       } else {
-        // Since both the keys are exactly the same, we break the tie in favor
-        // of the key which came latest.
-        long leftSequenceID = left.getSequenceID();
-        long rightSequenceID = right.getSequenceID();
-        if (leftSequenceID > rightSequenceID) {
-          return -1;
-        } else if (leftSequenceID < rightSequenceID) {
-          return 1;
-        } else {
-          return 0;
-        }
+        // Since both the keys are exactly the same, we break the tie in favor 
of higher ordered
+        // scanner since it'll have newer data. Since higher value should come 
first, we reverse
+        // sort here.
+        return Long.compare(right.getScannerOrder(), left.getScannerOrder());
       }
     }
     /**
@@ -392,8 +385,11 @@ public class KeyValueHeap extends 
NonReversedNonLazyKeyValueScanner
     return this.heap;
   }
 
+  /**
+   * @see KeyValueScanner#getScannerOrder()
+   */
   @Override
-  public long getSequenceID() {
+  public long getScannerOrder() {
     return 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a6f8214e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
index aaff7fd..1757cb6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
@@ -70,13 +70,13 @@ public interface KeyValueScanner {
   boolean reseek(Cell key) throws IOException;
 
   /**
-   * Get the sequence id associated with this KeyValueScanner. This is required
-   * for comparing multiple files to find out which one has the latest data.
-   * The default implementation for this would be to return 0. A file having
-   * lower sequence id will be considered to be the older one.
+   * Get the order of this KeyValueScanner. This is only relevant for 
StoreFileScanners and
+   * MemStoreScanners (other scanners simply return 0). This is required for 
comparing multiple
+   * files to find out which one has the latest data. StoreFileScanners are 
ordered from 0
+   * (oldest) to newest in increasing order. MemStoreScanner gets LONG.max 
since it always
+   * contains freshest data.
    */
-  // TODO: Implement SequenceId Interface instead.
-  long getSequenceID();
+  long getScannerOrder();
 
   /**
    * Close the KeyValue scanner.

http://git-wip-us.apache.org/repos/asf/hbase/blob/a6f8214e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 845a8d2..c48380d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -174,7 +174,8 @@ public class StoreFile {
     Bytes.toBytes("BULKLOAD_TIMESTAMP");
 
   /**
-   * Map of the metadata entries in the corresponding HFile
+   * Map of the metadata entries in the corresponding HFile. Populated when 
Reader is opened
+   * after which it is not modified again.
    */
   private Map<byte[], byte[]> metadataMap;
 
@@ -247,6 +248,7 @@ public class StoreFile {
     this.fileInfo = other.fileInfo;
     this.cacheConf = other.cacheConf;
     this.cfBloomType = other.cfBloomType;
+    this.metadataMap = other.metadataMap;
   }
 
   /**
@@ -380,7 +382,7 @@ public class StoreFile {
     if (startPos != -1) {
       bulkLoadedHFile = true;
     }
-    return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
+    return bulkLoadedHFile || (metadataMap != null && 
metadataMap.containsKey(BULKLOAD_TIME_KEY));
   }
 
   @VisibleForTesting
@@ -1155,37 +1157,43 @@ public class StoreFile {
     }
 
     /**
-     * Get a scanner to scan over this StoreFile. Do not use
-     * this overload if using this scanner for compactions.
-     *
-     * @param cacheBlocks should this scanner cache blocks?
-     * @param pread use pread (for highly concurrent small readers)
-     * @return a scanner
+     * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, 
long)} by setting
+     * {@code isCompaction} to false, {@code readPt} to 0 and {@code 
scannerOrder} to 0.
+     * Do not use this overload if using this scanner for compactions.
+     * @see #getStoreFileScanner(boolean, boolean, boolean, long, long)
      */
-    public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
-                                               boolean pread) {
-      return getStoreFileScanner(cacheBlocks, pread, false,
-        // 0 is passed as readpoint because this method is only used by test
-        // where StoreFile is directly operated upon
-        0);
+    public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean 
pread) {
+      // 0 is passed as readpoint because this method is only used by test
+      // where StoreFile is directly operated upon
+      return getStoreFileScanner(cacheBlocks, pread, false, 0, 0);
+    }
+
+    /**
+     * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, 
long)} by setting
+     * {@code scannerOrder} to 0.
+     * @see #getStoreFileScanner(boolean, boolean, boolean, long, long)
+     */
+    public StoreFileScanner getStoreFileScanner(
+        boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt) 
{
+      return getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, 0);
     }
 
     /**
      * Get a scanner to scan over this StoreFile.
-     *
      * @param cacheBlocks should this scanner cache blocks?
      * @param pread use pread (for highly concurrent small readers)
      * @param isCompaction is scanner being used for compaction?
+     * @param scannerOrder Order of this scanner relative to other scanners. 
See
+     *   {@link KeyValueScanner#getScannerOrder()}.
      * @return a scanner
      */
-    public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
-                                               boolean pread,
-                                               boolean isCompaction, long 
readPt) {
+    public StoreFileScanner getStoreFileScanner(
+        boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt, 
long scannerOrder) {
       // Increment the ref count
       refCount.incrementAndGet();
-      return new StoreFileScanner(this,
-                                 getScanner(cacheBlocks, pread, isCompaction),
-                                 !isCompaction, reader.hasMVCCInfo(), readPt);
+      return new StoreFileScanner(
+          this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, 
reader.hasMVCCInfo(),
+          readPt, scannerOrder);
     }
 
     /**
@@ -1250,7 +1258,7 @@ public class StoreFile {
     /**
      * Check if this storeFile may contain keys within the TimeRange that
      * have not expired (i.e. not older than oldestUnexpiredTS).
-     * @param timeRange the timeRange to restrict
+     * @param tr the timeRange to restrict
      * @param oldestUnexpiredTS the oldest timestamp that is not expired, as
      *          determined by the column family's TTL
      * @return false if queried keys definitely don't exist in this StoreFile

http://git-wip-us.apache.org/repos/asf/hbase/blob/a6f8214e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 4055188..0608756 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -23,6 +23,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -69,17 +70,37 @@ public class StoreFileScanner implements KeyValueScanner {
 
   private long readPt;
 
+  // Order of this scanner relative to other scanners when duplicate key-value 
is found.
+  // Higher values means scanner has newer data.
+  private long scannerOrder;
+
   /**
    * Implements a {@link KeyValueScanner} on top of the specified {@link 
HFileScanner}
-   * @param hfs HFile scanner
+   * @param useMVCC If true, scanner will filter out updates with MVCC larger 
than {@code readPt}.
+   * @param readPt MVCC value to use to filter out the updates newer than this 
scanner.
+   * @param hasMVCC Set to true if underlying store file reader has MVCC info.
    */
   public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean 
useMVCC,
       boolean hasMVCC, long readPt) {
+    this (reader, hfs, useMVCC, hasMVCC, readPt, 0);
+  }
+
+  /**
+   * Implements a {@link KeyValueScanner} on top of the specified {@link 
HFileScanner}
+   * @param useMVCC If true, scanner will filter out updates with MVCC larger 
than {@code readPt}.
+   * @param readPt MVCC value to use to filter out the updates newer than this 
scanner.
+   * @param hasMVCC Set to true if underlying store file reader has MVCC info.
+   * @param scannerOrder Order of the scanner relative to other scanners.
+   *   See {@link KeyValueScanner#getScannerOrder()}.
+   */
+  public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean 
useMVCC,
+      boolean hasMVCC, long readPt, long scannerOrder) {
     this.readPt = readPt;
     this.reader = reader;
     this.hfs = hfs;
     this.enforceMVCC = useMVCC;
     this.hasMVCCInfo = hasMVCC;
+    this.scannerOrder = scannerOrder;
   }
 
   boolean isPrimaryReplica() {
@@ -119,11 +140,13 @@ public class StoreFileScanner implements KeyValueScanner {
       ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws 
IOException {
     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
         files.size());
-    for (StoreFile file : files) {
-      StoreFile.Reader r = file.createReader(canUseDrop);
+    List<StoreFile> sorted_files = new ArrayList<>(files);
+    Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID);
+    for (int i = 0; i < sorted_files.size(); i++) {
+      StoreFile.Reader r = sorted_files.get(i).createReader(canUseDrop);
       r.setReplicaStoreFile(isPrimaryReplica);
       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
-          isCompaction, readPt);
+          isCompaction, readPt, i);
       scanner.setScanQueryMatcher(matcher);
       scanners.add(scanner);
     }
@@ -308,9 +331,12 @@ public class StoreFileScanner implements KeyValueScanner {
     return s.next();
   }
 
+  /**
+   * @see KeyValueScanner#getScannerOrder()
+   */
   @Override
-  public long getSequenceID() {
-    return reader.getSequenceID();
+  public long getScannerOrder() {
+    return scannerOrder;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/a6f8214e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 24b12fb..d8882a0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -906,8 +906,11 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
     return false;
   }
 
+  /**
+   * @see KeyValueScanner#getScannerOrder()
+   */
   @Override
-  public long getSequenceID() {
+  public long getScannerOrder() {
     return 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a6f8214e/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
index 76079a7..5a43765 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
@@ -116,8 +116,11 @@ public class CollectionBackedScanner extends 
NonReversedNonLazyKeyValueScanner {
     return false;
   }
 
+  /**
+   * @see 
org.apache.hadoop.hbase.regionserver.KeyValueScanner#getScannerOrder()
+   */
   @Override
-  public long getSequenceID() {
+  public long getScannerOrder() {
     return 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a6f8214e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
index a54a589..c62df07 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
@@ -30,175 +30,107 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category(SmallTests.class)
 public class TestKeyValueHeap extends HBaseTestCase {
-  private static final boolean PRINT = false;
-
-  List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
-
-  private byte[] row1;
-  private byte[] fam1;
-  private byte[] col1;
-  private byte[] data;
+  private byte[] row1 = Bytes.toBytes("row1");
+  private byte[] fam1 = Bytes.toBytes("fam1");
+  private byte[] col1 = Bytes.toBytes("col1");
+  private byte[] data = Bytes.toBytes("data");
+
+  private byte[] row2 = Bytes.toBytes("row2");
+  private byte[] fam2 = Bytes.toBytes("fam2");
+  private byte[] col2 = Bytes.toBytes("col2");
+
+  private byte[] col3 = Bytes.toBytes("col3");
+  private byte[] col4 = Bytes.toBytes("col4");
+  private byte[] col5 = Bytes.toBytes("col5");
+
+  // Variable name encoding. kv<row#><fam#><col#>
+  Cell kv111 = new KeyValue(row1, fam1, col1, data);
+  Cell kv112 = new KeyValue(row1, fam1, col2, data);
+  Cell kv113 = new KeyValue(row1, fam1, col3, data);
+  Cell kv114 = new KeyValue(row1, fam1, col4, data);
+  Cell kv115 = new KeyValue(row1, fam1, col5, data);
+  Cell kv121 = new KeyValue(row1, fam2, col1, data);
+  Cell kv122 = new KeyValue(row1, fam2, col2, data);
+  Cell kv211 = new KeyValue(row2, fam1, col1, data);
+  Cell kv212 = new KeyValue(row2, fam1, col2, data);
+  Cell kv213 = new KeyValue(row2, fam1, col3, data);
+
+  TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212));
+  TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112));
+  TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, 
kv213));
+
+  List<KeyValueScanner> scanners = new 
ArrayList<KeyValueScanner>(Arrays.asList(s1, s2, s3));
+
+  /*
+   * Uses {@code scanners} to build a KeyValueHeap, iterates over it and 
asserts that returned
+   * Cells are same as {@code expected}.
+   * @return List of Cells returned from scanners.
+   */
+  public List<Cell> assertCells(List<Cell> expected, List<KeyValueScanner> 
scanners)
+      throws IOException {
+    //Creating KeyValueHeap
+    KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR);
 
-  private byte[] row2;
-  private byte[] fam2;
-  private byte[] col2;
+    List<Cell> actual = new ArrayList<>();
+    while(kvh.peek() != null){
+      actual.add(kvh.next());
+    }
 
-  private byte[] col3;
-  private byte[] col4;
-  private byte[] col5;
+    assertEquals(expected, actual);
+    return actual;
+  }
 
   public void setUp() throws Exception {
     super.setUp();
-    data = Bytes.toBytes("data");
-    row1 = Bytes.toBytes("row1");
-    fam1 = Bytes.toBytes("fam1");
-    col1 = Bytes.toBytes("col1");
-    row2 = Bytes.toBytes("row2");
-    fam2 = Bytes.toBytes("fam2");
-    col2 = Bytes.toBytes("col2");
-    col3 = Bytes.toBytes("col3");
-    col4 = Bytes.toBytes("col4");
-    col5 = Bytes.toBytes("col5");
   }
 
   public void testSorted() throws IOException{
     //Cases that need to be checked are:
-    //1. The "smallest" KeyValue is in the same scanners as current
+    //1. The "smallest" Cell is in the same scanners as current
     //2. Current scanner gets empty
 
-    List<Cell> l1 = new ArrayList<Cell>();
-    l1.add(new KeyValue(row1, fam1, col5, data));
-    l1.add(new KeyValue(row2, fam1, col1, data));
-    l1.add(new KeyValue(row2, fam1, col2, data));
-    scanners.add(new Scanner(l1));
-
-    List<Cell> l2 = new ArrayList<Cell>();
-    l2.add(new KeyValue(row1, fam1, col1, data));
-    l2.add(new KeyValue(row1, fam1, col2, data));
-    scanners.add(new Scanner(l2));
-
-    List<Cell> l3 = new ArrayList<Cell>();
-    l3.add(new KeyValue(row1, fam1, col3, data));
-    l3.add(new KeyValue(row1, fam1, col4, data));
-    l3.add(new KeyValue(row1, fam2, col1, data));
-    l3.add(new KeyValue(row1, fam2, col2, data));
-    l3.add(new KeyValue(row2, fam1, col3, data));
-    scanners.add(new Scanner(l3));
-
-    List<KeyValue> expected = new ArrayList<KeyValue>();
-    expected.add(new KeyValue(row1, fam1, col1, data));
-    expected.add(new KeyValue(row1, fam1, col2, data));
-    expected.add(new KeyValue(row1, fam1, col3, data));
-    expected.add(new KeyValue(row1, fam1, col4, data));
-    expected.add(new KeyValue(row1, fam1, col5, data));
-    expected.add(new KeyValue(row1, fam2, col1, data));
-    expected.add(new KeyValue(row1, fam2, col2, data));
-    expected.add(new KeyValue(row2, fam1, col1, data));
-    expected.add(new KeyValue(row2, fam1, col2, data));
-    expected.add(new KeyValue(row2, fam1, col3, data));
-
-    //Creating KeyValueHeap
-    KeyValueHeap kvh =
-      new KeyValueHeap(scanners, KeyValue.COMPARATOR);
-
-    List<Cell> actual = new ArrayList<Cell>();
-    while(kvh.peek() != null){
-      actual.add(kvh.next());
-    }
+    List<Cell> expected = Arrays.asList(
+        kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213);
 
-    assertEquals(expected.size(), actual.size());
-    for(int i=0; i<expected.size(); i++){
-      assertEquals(expected.get(i), actual.get(i));
-      if(PRINT){
-        System.out.println("expected " +expected.get(i)+
-            "\nactual   " +actual.get(i) +"\n");
-      }
-    }
+    List<Cell> actual = assertCells(expected, scanners);
 
     //Check if result is sorted according to Comparator
     for(int i=0; i<actual.size()-1; i++){
       int ret = KeyValue.COMPARATOR.compare(actual.get(i), actual.get(i+1));
       assertTrue(ret < 0);
     }
-
   }
 
   public void testSeek() throws IOException {
     //Cases:
-    //1. Seek KeyValue that is not in scanner
+    //1. Seek Cell that is not in scanner
     //2. Check that smallest that is returned from a seek is correct
 
-    List<Cell> l1 = new ArrayList<Cell>();
-    l1.add(new KeyValue(row1, fam1, col5, data));
-    l1.add(new KeyValue(row2, fam1, col1, data));
-    l1.add(new KeyValue(row2, fam1, col2, data));
-    scanners.add(new Scanner(l1));
-
-    List<Cell> l2 = new ArrayList<Cell>();
-    l2.add(new KeyValue(row1, fam1, col1, data));
-    l2.add(new KeyValue(row1, fam1, col2, data));
-    scanners.add(new Scanner(l2));
-
-    List<Cell> l3 = new ArrayList<Cell>();
-    l3.add(new KeyValue(row1, fam1, col3, data));
-    l3.add(new KeyValue(row1, fam1, col4, data));
-    l3.add(new KeyValue(row1, fam2, col1, data));
-    l3.add(new KeyValue(row1, fam2, col2, data));
-    l3.add(new KeyValue(row2, fam1, col3, data));
-    scanners.add(new Scanner(l3));
-
-    List<KeyValue> expected = new ArrayList<KeyValue>();
-    expected.add(new KeyValue(row2, fam1, col1, data));
+    List<Cell> expected = Arrays.asList(kv211);
 
     //Creating KeyValueHeap
     KeyValueHeap kvh =
       new KeyValueHeap(scanners, KeyValue.COMPARATOR);
 
-    KeyValue seekKv = new KeyValue(row2, fam1, null, null);
+    Cell seekKv = new KeyValue(row2, fam1, null, null);
     kvh.seek(seekKv);
 
-    List<Cell> actual = new ArrayList<Cell>();
-    actual.add(kvh.peek());
-
-    assertEquals(expected.size(), actual.size());
-    for(int i=0; i<expected.size(); i++){
-      assertEquals(expected.get(i), actual.get(i));
-      if(PRINT){
-        System.out.println("expected " +expected.get(i)+
-            "\nactual   " +actual.get(i) +"\n");
-      }
-    }
+    List<Cell> actual = Arrays.asList(kvh.peek());
 
+    assertEquals("Expected = " + Arrays.toString(expected.toArray())
+        + "\n Actual = " + Arrays.toString(actual.toArray()), expected, 
actual);
   }
 
   public void testScannerLeak() throws IOException {
     // Test for unclosed scanners (HBASE-1927)
 
-    List<Cell> l1 = new ArrayList<Cell>();
-    l1.add(new KeyValue(row1, fam1, col5, data));
-    l1.add(new KeyValue(row2, fam1, col1, data));
-    l1.add(new KeyValue(row2, fam1, col2, data));
-    scanners.add(new Scanner(l1));
-
-    List<Cell> l2 = new ArrayList<Cell>();
-    l2.add(new KeyValue(row1, fam1, col1, data));
-    l2.add(new KeyValue(row1, fam1, col2, data));
-    scanners.add(new Scanner(l2));
-
-    List<Cell> l3 = new ArrayList<Cell>();
-    l3.add(new KeyValue(row1, fam1, col3, data));
-    l3.add(new KeyValue(row1, fam1, col4, data));
-    l3.add(new KeyValue(row1, fam2, col1, data));
-    l3.add(new KeyValue(row1, fam2, col2, data));
-    l3.add(new KeyValue(row2, fam1, col3, data));
-    scanners.add(new Scanner(l3));
-
-    List<Cell> l4 = new ArrayList<Cell>();
-    scanners.add(new Scanner(l4));
+    TestScanner s4 = new TestScanner(new ArrayList<Cell>());
+    scanners.add(s4);
 
     //Creating KeyValueHeap
     KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR);
@@ -206,45 +138,26 @@ public class TestKeyValueHeap extends HBaseTestCase {
     while(kvh.next() != null);
 
     for(KeyValueScanner scanner : scanners) {
-      assertTrue(((Scanner)scanner).isClosed());
+      assertTrue(((TestScanner)scanner).isClosed());
     }
   }
 
   public void testScannerException() throws IOException {
     // Test for NPE issue when exception happens in scanners (HBASE-13835)
 
-    List<Cell> l1 = new ArrayList<Cell>();
-    l1.add(new KeyValue(row1, fam1, col5, data));
-    l1.add(new KeyValue(row2, fam1, col1, data));
-    l1.add(new KeyValue(row2, fam1, col2, data));
-    SeekScanner s1 = new SeekScanner(l1);
-    scanners.add(s1);
-
-    List<Cell> l2 = new ArrayList<Cell>();
-    l2.add(new KeyValue(row1, fam1, col1, data));
-    l2.add(new KeyValue(row1, fam1, col2, data));
-    SeekScanner s2 = new SeekScanner(l2);
-    scanners.add(s2);
-
-    List<Cell> l3 = new ArrayList<Cell>();
-    l3.add(new KeyValue(row1, fam1, col3, data));
-    l3.add(new KeyValue(row1, fam1, col4, data));
-    l3.add(new KeyValue(row1, fam2, col1, data));
-    l3.add(new KeyValue(row1, fam2, col2, data));
-    l3.add(new KeyValue(row2, fam1, col3, data));
-    SeekScanner s3 = new SeekScanner(l3);
-    scanners.add(s3);
-
-    List<Cell> l4 = new ArrayList<Cell>();
-    SeekScanner s4 = new SeekScanner(l4);
-    scanners.add(s4);
+    TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212));
+    TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112));
+    TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, 
kv122, kv213));
+    TestScanner s4 = new SeekTestScanner(new ArrayList<Cell>());
+
+    List<KeyValueScanner> scanners = new 
ArrayList<KeyValueScanner>(Arrays.asList(s1, s2, s3, s4));
 
     // Creating KeyValueHeap
     KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR);
 
     try {
       for (KeyValueScanner scanner : scanners) {
-        ((SeekScanner) scanner).setRealSeekDone(false);
+        ((SeekTestScanner) scanner).setRealSeekDone(false);
       }
       while (kvh.next() != null);
       // The pollRealKV should throw IOE.
@@ -256,20 +169,47 @@ public class TestKeyValueHeap extends HBaseTestCase {
     // It implies there is no NPE thrown from kvh.close() if getting here
     for (KeyValueScanner scanner : scanners) {
       // Verify that close is called and only called once for each scanner
-      assertTrue(((SeekScanner) scanner).isClosed());
-      assertEquals(((SeekScanner) scanner).getClosedNum(), 1);
+      assertTrue(((SeekTestScanner) scanner).isClosed());
+      assertEquals(((SeekTestScanner) scanner).getClosedNum(), 1);
     }
   }
 
-  private static class Scanner extends CollectionBackedScanner {
-    private Iterator<Cell> iter;
-    private Cell current;
+  @Test
+  public void testPriorityId() throws IOException {
+    Cell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa"));
+    Cell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb"));
+    {
+      TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 
1);
+      TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2);
+      List<Cell> expected = Arrays.asList(kv111, kv112, kv113B, kv113A);
+      assertCells(expected, new 
ArrayList<KeyValueScanner>(Arrays.asList(scan1, scan2)));
+    }
+    {
+      TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 
2);
+      TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 1);
+      List<Cell> expected = Arrays.asList(kv111, kv112, kv113A, kv113B);
+      assertCells(expected, new 
ArrayList<KeyValueScanner>(Arrays.asList(scan1, scan2)));
+    }
+  }
+
+  private static class TestScanner extends CollectionBackedScanner {
     private boolean closed = false;
+    private long scannerOrder = 0;
 
-    public Scanner(List<Cell> list) {
+    public TestScanner(List<Cell> list) {
       super(list);
     }
 
+    public TestScanner(List<Cell> list, long scannerOrder) {
+      this(list);
+      this.scannerOrder = scannerOrder;
+    }
+
+    @Override
+    public long getScannerOrder() {
+      return scannerOrder;
+    }
+
     @Override
     public void close(){
       closed = true;
@@ -280,11 +220,11 @@ public class TestKeyValueHeap extends HBaseTestCase {
     }
   }
 
-  private static class SeekScanner extends Scanner {
+  private static class SeekTestScanner extends TestScanner {
     private int closedNum = 0;
     private boolean realSeekDone = true;
 
-    public SeekScanner(List<Cell> list) {
+    public SeekTestScanner(List<Cell> list) {
       super(list);
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a6f8214e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
index da4593b..074a22e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
@@ -233,6 +234,7 @@ public class TestStoreFile extends HBaseTestCase {
     assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
   }
 
+  @Test
   public void testEmptyStoreFileRestrictKeyRanges() throws Exception {
     StoreFile.Reader reader = mock(StoreFile.Reader.class);
     Store store = mock(Store.class);
@@ -241,7 +243,7 @@ public class TestStoreFile extends HBaseTestCase {
     when(hcd.getName()).thenReturn(cf);
     when(store.getFamily()).thenReturn(hcd);
     StoreFileScanner scanner =
-        new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0);
+        new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0, 0);
     Scan scan = new Scan();
     scan.setColumnFamilyTimeRange(cf, 0, 1);
     assertFalse(scanner.shouldUseScanner(scan, store, 0));

http://git-wip-us.apache.org/repos/asf/hbase/blob/a6f8214e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
index 6ec4cd4..328f7d9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
@@ -62,7 +62,7 @@ public class TestCompactor {
     when(r.length()).thenReturn(1L);
     when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
     when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
-    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong()))
+    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong()))
         .thenReturn(mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
     when(sf.createReader()).thenReturn(r);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a6f8214e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 4d47840..a9528a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -738,7 +738,8 @@ public class TestStripeCompactionPolicy {
     when(r.length()).thenReturn(size);
     when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
     when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
-    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(
+    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong()))
+        .thenReturn(
       mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
     when(sf.createReader(anyBoolean())).thenReturn(r);

Reply via email to