HBASE-15236 Inconsistent cell reads over multiple bulk-loaded HFiles. In
KeyValueHeap, if two cells are the same, i.e. have the same key and timestamp,
then instead of directly using the seq id to determine the newer one, we should
use StoreFile.Comparators.SEQ_ID, because that is what is used to determine the
order of hfiles. In this patch, we assign each scanner an order based on its
index in the SEQ_ID-sorted storefiles list, which is then used in KeyValueHeap
to disambiguate between identical cells. Renames getSequenceID() in the
KeyValueScanner interface to getScannerOrder(). Testing: Adds unit test to
TestKeyValueHeap. Manual testing: three cases (tables t, t2, t3 in the jira
description), single region, 2 hfiles with the same seq id, timestamps and
duplicate KVs. Made sure that the returned kv was the same for get and scan. (Apekshit)

Change-Id: I22600c91c0a51fb63eb17db73472839d2f13957c

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/11740570
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/11740570
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/11740570

Branch: refs/heads/hbase-12439
Commit: 11740570c1440254a76fae67d318c6a852cb56b8
Parents: a6e2967
Author: Apekshit <apeksha...@gmail.com>
Authored: Tue Feb 23 00:31:18 2016 -0800
Committer: stack <st...@apache.org>
Committed: Mon May 9 16:57:06 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/KeyValueHeap.java |  20 +-
 .../hbase/regionserver/KeyValueScanner.java     |  12 +-
 .../hbase/regionserver/MemStoreScanner.java     |   7 +-
 .../hbase/regionserver/SegmentScanner.java      |  35 +--
 .../hadoop/hbase/regionserver/StoreFile.java    |   6 +-
 .../hbase/regionserver/StoreFileReader.java     |  43 +--
 .../hbase/regionserver/StoreFileScanner.java    |  38 ++-
 .../hadoop/hbase/regionserver/StoreScanner.java |   5 +-
 .../hbase/util/CollectionBackedScanner.java     |   5 +-
 .../hbase/regionserver/TestKeyValueHeap.java    | 269 +++++++------------
 .../hbase/regionserver/TestStoreFile.java       |   2 +-
 .../regionserver/compactions/TestCompactor.java |   2 +-
 .../compactions/TestStripeCompactionPolicy.java |   3 +-
 13 files changed, 212 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/11740570/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index 89fc8fb..9ece14b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -189,17 +189,10 @@ public class KeyValueHeap extends 
NonReversedNonLazyKeyValueScanner
       if (comparison != 0) {
         return comparison;
       } else {
-        // Since both the keys are exactly the same, we break the tie in favor
-        // of the key which came latest.
-        long leftSequenceID = left.getSequenceID();
-        long rightSequenceID = right.getSequenceID();
-        if (leftSequenceID > rightSequenceID) {
-          return -1;
-        } else if (leftSequenceID < rightSequenceID) {
-          return 1;
-        } else {
-          return 0;
-        }
+        // Since both the keys are exactly the same, we break the tie in favor 
of higher ordered
+        // scanner since it'll have newer data. Since higher value should come 
first, we reverse
+        // sort here.
+        return Long.compare(right.getScannerOrder(), left.getScannerOrder());
       }
     }
     /**
@@ -406,8 +399,11 @@ public class KeyValueHeap extends 
NonReversedNonLazyKeyValueScanner
     return this.heap;
   }
 
+  /**
+   * @see KeyValueScanner#getScannerOrder()
+   */
   @Override
-  public long getSequenceID() {
+  public long getScannerOrder() {
     return 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/11740570/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
index ed86a83..44b081b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
@@ -70,13 +70,13 @@ public interface KeyValueScanner extends Shipper, Closeable 
{
   boolean reseek(Cell key) throws IOException;
 
   /**
-   * Get the sequence id associated with this KeyValueScanner. This is required
-   * for comparing multiple files to find out which one has the latest data.
-   * The default implementation for this would be to return 0. A file having
-   * lower sequence id will be considered to be the older one.
+   * Get the order of this KeyValueScanner. This is only relevant for StoreFileScanners and
+   * memstore scanners (other scanners simply return 0). It is used to decide which scanner
+   * has the latest data when the same cell is found in several of them. StoreFileScanners are
+   * ordered from 0 (oldest) to newest in increasing order. MemStoreScanner (and SegmentScanner,
+   * by default) return Long.MAX_VALUE since the memstore always contains the freshest data.
    */
-  // TODO: Implement SequenceId Interface instead.
-  long getSequenceID();
+  long getScannerOrder();
 
   /**
    * Close the KeyValue scanner.

http://git-wip-us.apache.org/repos/asf/hbase/blob/11740570/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
index dfcec25..01a7ff3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
@@ -186,11 +186,12 @@ public class MemStoreScanner extends 
NonLazyKeyValueScanner {
   }
 
   /**
-   * MemStoreScanner returns max value as sequence id because it will
-   * always have the latest data among all files.
+   * MemStoreScanner returns Long.MAX_VALUE because it will always have the 
latest data among all
+   * scanners.
+   * @see KeyValueScanner#getScannerOrder()
    */
   @Override
-  public synchronized long getSequenceID() {
+  public long getScannerOrder() {
     return Long.MAX_VALUE;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/11740570/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index b5aabb8..45f72d83 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -33,7 +33,12 @@ import org.apache.hadoop.hbase.client.Scan;
 @InterfaceAudience.Private
 public class SegmentScanner implements KeyValueScanner {
 
-  private long sequenceID = Long.MAX_VALUE;
+  /**
+   * Order of this scanner relative to other scanners. See
+   * {@link KeyValueScanner#getScannerOrder()}.
+   */
+  private long scannerOrder;
+  private static final long DEFAULT_SCANNER_ORDER = Long.MAX_VALUE;
 
   // the observed structure
   private final Segment segment;
@@ -52,6 +57,13 @@ public class SegmentScanner implements KeyValueScanner {
   private Cell last = null;
 
   protected SegmentScanner(Segment segment, long readPoint) {
+    this(segment, readPoint, DEFAULT_SCANNER_ORDER);
+  }
+
+  /**
+   * @param scannerOrder see {@link KeyValueScanner#getScannerOrder()}.
+   */
+  protected SegmentScanner(Segment segment, long readPoint, long scannerOrder) 
{
     this.segment = segment;
     this.readPoint = readPoint;
     iter = segment.iterator();
@@ -59,6 +71,7 @@ public class SegmentScanner implements KeyValueScanner {
     current = getNext();
     //increase the reference count so the underlying structure will not be 
de-allocated
     this.segment.incScannerCount();
+    this.scannerOrder = scannerOrder;
   }
 
   /**
@@ -208,14 +221,11 @@ public class SegmentScanner implements KeyValueScanner {
   }
 
   /**
-   * Get the sequence id associated with this KeyValueScanner. This is required
-   * for comparing multiple files (or memstore segments) scanners to find out
-   * which one has the latest data.
-   *
+   * @see KeyValueScanner#getScannerOrder()
    */
   @Override
-  public long getSequenceID() {
-    return sequenceID;
+  public long getScannerOrder() {
+    return scannerOrder;
   }
 
   /**
@@ -297,15 +307,6 @@ public class SegmentScanner implements KeyValueScanner {
   }
 
   /**
-   * Set the sequence id of the scanner.
-   * This is used to determine an order between memory segment scanners.
-   * @param x a unique sequence id
-   */
-  public void setSequenceID(long x) {
-    sequenceID = x;
-  }
-
-  /**
    * Returns whether the given scan should seek in this segment
    * @return whether the given scan should seek in this segment
    */
@@ -321,7 +322,7 @@ public class SegmentScanner implements KeyValueScanner {
   @Override
   public String toString() {
     String res = "Store segment scanner of type "+this.getClass().getName()+"; 
";
-    res += "sequence id "+getSequenceID()+"; ";
+    res += "Scanner order " + getScannerOrder() + "; ";
     res += getSegment().toString();
     return res;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/11740570/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index b14c6b6..46f0cb8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -157,7 +157,8 @@ public class StoreFile {
     Bytes.toBytes("BULKLOAD_TIMESTAMP");
 
   /**
-   * Map of the metadata entries in the corresponding HFile
+   * Map of the metadata entries in the corresponding HFile. Populated when 
Reader is opened
+   * after which it is not modified again.
    */
   private Map<byte[], byte[]> metadataMap;
 
@@ -237,6 +238,7 @@ public class StoreFile {
     this.fileInfo = other.fileInfo;
     this.cacheConf = other.cacheConf;
     this.cfBloomType = other.cfBloomType;
+    this.metadataMap = other.metadataMap;
   }
 
   /**
@@ -370,7 +372,7 @@ public class StoreFile {
     if (startPos != -1) {
       bulkLoadedHFile = true;
     }
-    return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
+    return bulkLoadedHFile || (metadataMap != null && 
metadataMap.containsKey(BULKLOAD_TIME_KEY));
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hbase/blob/11740570/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
index 4e7e829..e890381 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
@@ -114,19 +114,27 @@ public class StoreFileReader {
   }
 
   /**
-   * Get a scanner to scan over this StoreFile. Do not use
-   * this overload if using this scanner for compactions.
+   * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} 
by setting
+   * {@code isCompaction} to false, {@code readPt} to 0 and {@code 
scannerOrder} to 0.
+   * Do not use this overload if using this scanner for compactions.
    *
-   * @param cacheBlocks should this scanner cache blocks?
-   * @param pread use pread (for highly concurrent small readers)
-   * @return a scanner
+   * @see #getStoreFileScanner(boolean, boolean, boolean, long, long)
+   */
+  public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean 
pread) {
+    // 0 is passed as readpoint because this method is only used by test
+    // where StoreFile is directly operated upon
+    return getStoreFileScanner(cacheBlocks, pread, false, 0, 0);
+  }
+
+  /**
+   * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} 
by setting
+   * {@code scannerOrder} to 0.
+   *
+   * @see #getStoreFileScanner(boolean, boolean, boolean, long, long)
    */
-  public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
-                                             boolean pread) {
-    return getStoreFileScanner(cacheBlocks, pread, false,
-      // 0 is passed as readpoint because this method is only used by test
-      // where StoreFile is directly operated upon
-      0);
+  public StoreFileScanner getStoreFileScanner(
+      boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt) {
+    return getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, 0);
   }
 
   /**
@@ -135,16 +143,17 @@ public class StoreFileReader {
    * @param cacheBlocks should this scanner cache blocks?
    * @param pread use pread (for highly concurrent small readers)
    * @param isCompaction is scanner being used for compaction?
+   * @param scannerOrder Order of this scanner relative to other scanners. See
+   *  {@link KeyValueScanner#getScannerOrder()}.
    * @return a scanner
    */
-  public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
-                                             boolean pread,
-                                             boolean isCompaction, long 
readPt) {
+  public StoreFileScanner getStoreFileScanner(
+      boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt, 
long scannerOrder) {
     // Increment the ref count
     refCount.incrementAndGet();
-    return new StoreFileScanner(this,
-                               getScanner(cacheBlocks, pread, isCompaction),
-                               !isCompaction, reader.hasMVCCInfo(), readPt);
+    return new StoreFileScanner(
+        this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, 
reader.hasMVCCInfo(),
+        readPt, scannerOrder);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/11740570/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index e7f8f88..abade0e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -23,6 +23,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -65,17 +66,37 @@ public class StoreFileScanner implements KeyValueScanner {
 
   private long readPt;
 
+  // Order of this scanner relative to other scanners when duplicate key-value 
is found.
+  // Higher values means scanner has newer data.
+  private long scannerOrder;
+
   /**
    * Implements a {@link KeyValueScanner} on top of the specified {@link 
HFileScanner}
-   * @param hfs HFile scanner
+   * @param useMVCC If true, scanner will filter out updates with MVCC larger 
than {@code readPt}.
+   * @param readPt MVCC value to use to filter out the updates newer than this 
scanner.
+   * @param hasMVCC Set to true if underlying store file reader has MVCC info.
    */
   public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean 
useMVCC,
       boolean hasMVCC, long readPt) {
+    this (reader, hfs, useMVCC, hasMVCC, readPt, 0);
+  }
+
+  /**
+   * Implements a {@link KeyValueScanner} on top of the specified {@link 
HFileScanner}
+   * @param useMVCC If true, scanner will filter out updates with MVCC larger 
than {@code readPt}.
+   * @param readPt MVCC value to use to filter out the updates newer than this 
scanner.
+   * @param hasMVCC Set to true if underlying store file reader has MVCC info.
+   * @param scannerOrder Order of the scanner relative to other scanners.
+   *   See {@link KeyValueScanner#getScannerOrder()}.
+   */
+  public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean 
useMVCC,
+      boolean hasMVCC, long readPt, long scannerOrder) {
     this.readPt = readPt;
     this.reader = reader;
     this.hfs = hfs;
     this.enforceMVCC = useMVCC;
     this.hasMVCCInfo = hasMVCC;
+    this.scannerOrder = scannerOrder;
   }
 
   boolean isPrimaryReplica() {
@@ -115,11 +136,13 @@ public class StoreFileScanner implements KeyValueScanner {
       ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws 
IOException {
     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
         files.size());
-    for (StoreFile file : files) {
-      StoreFileReader r = file.createReader(canUseDrop);
+    List<StoreFile> sortedFiles = new ArrayList<>(files);
+    Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID); // index i = scanner order
+    for (int i = 0; i < sortedFiles.size(); i++) {
+      StoreFileReader r = sortedFiles.get(i).createReader(canUseDrop);
       r.setReplicaStoreFile(isPrimaryReplica);
       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
-          isCompaction, readPt);
+          isCompaction, readPt, i);
       scanner.setScanQueryMatcher(matcher);
       scanners.add(scanner);
     }
@@ -303,9 +326,12 @@ public class StoreFileScanner implements KeyValueScanner {
     return s.next();
   }
 
+  /**
+   * @see KeyValueScanner#getScannerOrder()
+   */
   @Override
-  public long getSequenceID() {
-    return reader.getSequenceID();
+  public long getScannerOrder() {
+    return scannerOrder;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/11740570/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index ecae787..7ebae94 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -887,8 +887,11 @@ public class StoreScanner extends 
NonReversedNonLazyKeyValueScanner
     return false;
   }
 
+  /**
+   * @see KeyValueScanner#getScannerOrder()
+   */
   @Override
-  public long getSequenceID() {
+  public long getScannerOrder() {
     return 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/11740570/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
index 4720880..3f05969 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
@@ -116,8 +116,11 @@ public class CollectionBackedScanner extends 
NonReversedNonLazyKeyValueScanner {
     return false;
   }
 
+  /**
+   * @see 
org.apache.hadoop.hbase.regionserver.KeyValueScanner#getScannerOrder()
+   */
   @Override
-  public long getSequenceID() {
+  public long getScannerOrder() {
     return 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/11740570/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
index aff40c1..b030c74 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
@@ -38,178 +38,105 @@ import org.junit.experimental.categories.Category;
 
 @Category({RegionServerTests.class, SmallTests.class})
 public class TestKeyValueHeap extends HBaseTestCase {
-  private static final boolean PRINT = false;
-
-  List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
-
-  private byte[] row1;
-  private byte[] fam1;
-  private byte[] col1;
-  private byte[] data;
+  private byte[] row1 = Bytes.toBytes("row1");
+  private byte[] fam1 = Bytes.toBytes("fam1");
+  private byte[] col1 = Bytes.toBytes("col1");
+  private byte[] data = Bytes.toBytes("data");
+
+  private byte[] row2 = Bytes.toBytes("row2");
+  private byte[] fam2 = Bytes.toBytes("fam2");
+  private byte[] col2 = Bytes.toBytes("col2");
+
+  private byte[] col3 = Bytes.toBytes("col3");
+  private byte[] col4 = Bytes.toBytes("col4");
+  private byte[] col5 = Bytes.toBytes("col5");
+
+  // Variable name encoding. kv<row#><fam#><col#>
+  Cell kv111 = new KeyValue(row1, fam1, col1, data);
+  Cell kv112 = new KeyValue(row1, fam1, col2, data);
+  Cell kv113 = new KeyValue(row1, fam1, col3, data);
+  Cell kv114 = new KeyValue(row1, fam1, col4, data);
+  Cell kv115 = new KeyValue(row1, fam1, col5, data);
+  Cell kv121 = new KeyValue(row1, fam2, col1, data);
+  Cell kv122 = new KeyValue(row1, fam2, col2, data);
+  Cell kv211 = new KeyValue(row2, fam1, col1, data);
+  Cell kv212 = new KeyValue(row2, fam1, col2, data);
+  Cell kv213 = new KeyValue(row2, fam1, col3, data);
+
+  TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212));
+  TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112));
+  TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, 
kv213));
+
+  List<KeyValueScanner> scanners = new 
ArrayList<KeyValueScanner>(Arrays.asList(s1, s2, s3));
+
+  /*
+   * Uses {@code scanners} to build a KeyValueHeap, iterates over it and 
asserts that returned
+   * Cells are same as {@code expected}.
+   * @return List of Cells returned from scanners.
+   */
+  public List<Cell> assertCells(List<Cell> expected, List<KeyValueScanner> 
scanners)
+      throws IOException {
+    //Creating KeyValueHeap
+    KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR);
 
-  private byte[] row2;
-  private byte[] fam2;
-  private byte[] col2;
+    List<Cell> actual = new ArrayList<>();
+    while(kvh.peek() != null){
+      actual.add(kvh.next());
+    }
 
-  private byte[] col3;
-  private byte[] col4;
-  private byte[] col5;
+    assertEquals(expected, actual);
+    return actual;
+  }
 
   @Before
   public void setUp() throws Exception {
     super.setUp();
-    data = Bytes.toBytes("data");
-    row1 = Bytes.toBytes("row1");
-    fam1 = Bytes.toBytes("fam1");
-    col1 = Bytes.toBytes("col1");
-    row2 = Bytes.toBytes("row2");
-    fam2 = Bytes.toBytes("fam2");
-    col2 = Bytes.toBytes("col2");
-    col3 = Bytes.toBytes("col3");
-    col4 = Bytes.toBytes("col4");
-    col5 = Bytes.toBytes("col5");
   }
 
   @Test
   public void testSorted() throws IOException{
     //Cases that need to be checked are:
-    //1. The "smallest" KeyValue is in the same scanners as current
+    //1. The "smallest" Cell is in the same scanners as current
     //2. Current scanner gets empty
 
-    List<Cell> l1 = new ArrayList<Cell>();
-    l1.add(new KeyValue(row1, fam1, col5, data));
-    l1.add(new KeyValue(row2, fam1, col1, data));
-    l1.add(new KeyValue(row2, fam1, col2, data));
-    scanners.add(new Scanner(l1));
-
-    List<Cell> l2 = new ArrayList<Cell>();
-    l2.add(new KeyValue(row1, fam1, col1, data));
-    l2.add(new KeyValue(row1, fam1, col2, data));
-    scanners.add(new Scanner(l2));
-
-    List<Cell> l3 = new ArrayList<Cell>();
-    l3.add(new KeyValue(row1, fam1, col3, data));
-    l3.add(new KeyValue(row1, fam1, col4, data));
-    l3.add(new KeyValue(row1, fam2, col1, data));
-    l3.add(new KeyValue(row1, fam2, col2, data));
-    l3.add(new KeyValue(row2, fam1, col3, data));
-    scanners.add(new Scanner(l3));
-
-    List<KeyValue> expected = new ArrayList<KeyValue>();
-    expected.add(new KeyValue(row1, fam1, col1, data));
-    expected.add(new KeyValue(row1, fam1, col2, data));
-    expected.add(new KeyValue(row1, fam1, col3, data));
-    expected.add(new KeyValue(row1, fam1, col4, data));
-    expected.add(new KeyValue(row1, fam1, col5, data));
-    expected.add(new KeyValue(row1, fam2, col1, data));
-    expected.add(new KeyValue(row1, fam2, col2, data));
-    expected.add(new KeyValue(row2, fam1, col1, data));
-    expected.add(new KeyValue(row2, fam1, col2, data));
-    expected.add(new KeyValue(row2, fam1, col3, data));
+    List<Cell> expected = Arrays.asList(
+        kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213);
 
-    //Creating KeyValueHeap
-    KeyValueHeap kvh =
-      new KeyValueHeap(scanners, CellComparator.COMPARATOR);
-
-    List<Cell> actual = new ArrayList<Cell>();
-    while(kvh.peek() != null){
-      actual.add(kvh.next());
-    }
-
-    assertEquals(expected.size(), actual.size());
-    for(int i=0; i<expected.size(); i++){
-      assertEquals(expected.get(i), actual.get(i));
-      if(PRINT){
-        System.out.println("expected " +expected.get(i)+
-            "\nactual   " +actual.get(i) +"\n");
-      }
-    }
+    List<Cell> actual = assertCells(expected, scanners);
 
     //Check if result is sorted according to Comparator
     for(int i=0; i<actual.size()-1; i++){
       int ret = CellComparator.COMPARATOR.compare(actual.get(i), 
actual.get(i+1));
       assertTrue(ret < 0);
     }
-
   }
 
   @Test
   public void testSeek() throws IOException {
     //Cases:
-    //1. Seek KeyValue that is not in scanner
+    //1. Seek Cell that is not in scanner
     //2. Check that smallest that is returned from a seek is correct
 
-    List<Cell> l1 = new ArrayList<Cell>();
-    l1.add(new KeyValue(row1, fam1, col5, data));
-    l1.add(new KeyValue(row2, fam1, col1, data));
-    l1.add(new KeyValue(row2, fam1, col2, data));
-    scanners.add(new Scanner(l1));
-
-    List<Cell> l2 = new ArrayList<Cell>();
-    l2.add(new KeyValue(row1, fam1, col1, data));
-    l2.add(new KeyValue(row1, fam1, col2, data));
-    scanners.add(new Scanner(l2));
-
-    List<Cell> l3 = new ArrayList<Cell>();
-    l3.add(new KeyValue(row1, fam1, col3, data));
-    l3.add(new KeyValue(row1, fam1, col4, data));
-    l3.add(new KeyValue(row1, fam2, col1, data));
-    l3.add(new KeyValue(row1, fam2, col2, data));
-    l3.add(new KeyValue(row2, fam1, col3, data));
-    scanners.add(new Scanner(l3));
-
-    List<KeyValue> expected = new ArrayList<KeyValue>();
-    expected.add(new KeyValue(row2, fam1, col1, data));
+    List<Cell> expected = Arrays.asList(kv211);
 
     //Creating KeyValueHeap
     KeyValueHeap kvh =
       new KeyValueHeap(scanners, CellComparator.COMPARATOR);
 
-    KeyValue seekKv = new KeyValue(row2, fam1, null, null);
+    Cell seekKv = new KeyValue(row2, fam1, null, null);
     kvh.seek(seekKv);
 
-    List<Cell> actual = new ArrayList<Cell>();
-    actual.add(kvh.peek());
-
-    assertEquals(expected.size(), actual.size());
-    for(int i=0; i<expected.size(); i++){
-      assertEquals(expected.get(i), actual.get(i));
-      if(PRINT){
-        System.out.println("expected " +expected.get(i)+
-            "\nactual   " +actual.get(i) +"\n");
-      }
-    }
+    List<Cell> actual = Arrays.asList(kvh.peek());
 
+    assertEquals("Expected = " + Arrays.toString(expected.toArray())
+        + "\n Actual = " + Arrays.toString(actual.toArray()), expected, 
actual);
   }
 
   @Test
   public void testScannerLeak() throws IOException {
     // Test for unclosed scanners (HBASE-1927)
 
-    List<Cell> l1 = new ArrayList<Cell>();
-    l1.add(new KeyValue(row1, fam1, col5, data));
-    l1.add(new KeyValue(row2, fam1, col1, data));
-    l1.add(new KeyValue(row2, fam1, col2, data));
-    Scanner s1 = new Scanner(l1);
-    scanners.add(s1);
-
-    List<Cell> l2 = new ArrayList<Cell>();
-    l2.add(new KeyValue(row1, fam1, col1, data));
-    l2.add(new KeyValue(row1, fam1, col2, data));
-    Scanner s2 = new Scanner(l2);
-    scanners.add(s2);
-
-    List<Cell> l3 = new ArrayList<Cell>();
-    l3.add(new KeyValue(row1, fam1, col3, data));
-    l3.add(new KeyValue(row1, fam1, col4, data));
-    l3.add(new KeyValue(row1, fam2, col1, data));
-    l3.add(new KeyValue(row1, fam2, col2, data));
-    l3.add(new KeyValue(row2, fam1, col3, data));
-    Scanner s3 = new Scanner(l3);
-    scanners.add(s3);
-
-    List<Cell> l4 = new ArrayList<Cell>();
-    Scanner s4 = new Scanner(l4);
+    TestScanner s4 = new TestScanner(new ArrayList<Cell>());
     scanners.add(s4);
 
     //Creating KeyValueHeap
@@ -225,7 +152,7 @@ public class TestKeyValueHeap extends HBaseTestCase {
     assertTrue(kvh.scannersForDelayedClose.contains(s4));
     kvh.close();
     for(KeyValueScanner scanner : scanners) {
-      assertTrue(((Scanner)scanner).isClosed());
+      assertTrue(((TestScanner)scanner).isClosed());
     }
   }
 
@@ -233,38 +160,19 @@ public class TestKeyValueHeap extends HBaseTestCase {
   public void testScannerException() throws IOException {
     // Test for NPE issue when exception happens in scanners (HBASE-13835)
 
-    List<Cell> l1 = new ArrayList<Cell>();
-    l1.add(new KeyValue(row1, fam1, col5, data));
-    l1.add(new KeyValue(row2, fam1, col1, data));
-    l1.add(new KeyValue(row2, fam1, col2, data));
-    SeekScanner s1 = new SeekScanner(l1);
-    scanners.add(s1);
-
-    List<Cell> l2 = new ArrayList<Cell>();
-    l2.add(new KeyValue(row1, fam1, col1, data));
-    l2.add(new KeyValue(row1, fam1, col2, data));
-    SeekScanner s2 = new SeekScanner(l2);
-    scanners.add(s2);
-
-    List<Cell> l3 = new ArrayList<Cell>();
-    l3.add(new KeyValue(row1, fam1, col3, data));
-    l3.add(new KeyValue(row1, fam1, col4, data));
-    l3.add(new KeyValue(row1, fam2, col1, data));
-    l3.add(new KeyValue(row1, fam2, col2, data));
-    l3.add(new KeyValue(row2, fam1, col3, data));
-    SeekScanner s3 = new SeekScanner(l3);
-    scanners.add(s3);
-
-    List<Cell> l4 = new ArrayList<Cell>();
-    SeekScanner s4 = new SeekScanner(l4);
-    scanners.add(s4);
+    TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212));
+    TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112));
+    TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, 
kv122, kv213));
+    TestScanner s4 = new SeekTestScanner(new ArrayList<Cell>());
+
+    List<KeyValueScanner> scanners = new 
ArrayList<KeyValueScanner>(Arrays.asList(s1, s2, s3, s4));
 
     // Creating KeyValueHeap
     KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR);
 
     try {
       for (KeyValueScanner scanner : scanners) {
-        ((SeekScanner) scanner).setRealSeekDone(false);
+        ((SeekTestScanner) scanner).setRealSeekDone(false);
       }
       while (kvh.next() != null);
       // The pollRealKV should throw IOE.
@@ -276,20 +184,47 @@ public class TestKeyValueHeap extends HBaseTestCase {
     // It implies there is no NPE thrown from kvh.close() if getting here
     for (KeyValueScanner scanner : scanners) {
       // Verify that close is called and only called once for each scanner
-      assertTrue(((SeekScanner) scanner).isClosed());
-      assertEquals(((SeekScanner) scanner).getClosedNum(), 1);
+      assertTrue(((SeekTestScanner) scanner).isClosed());
+      assertEquals(((SeekTestScanner) scanner).getClosedNum(), 1);
+    }
+  }
+
+  @Test
+  public void testPriorityId() throws IOException {
+    Cell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa"));
+    Cell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb"));
+    {
+      TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 
1);
+      TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2);
+      List<Cell> expected = Arrays.asList(kv111, kv112, kv113B, kv113A);
+      assertCells(expected, new 
ArrayList<KeyValueScanner>(Arrays.asList(scan1, scan2)));
+    }
+    {
+      TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 
2);
+      TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 1);
+      List<Cell> expected = Arrays.asList(kv111, kv112, kv113A, kv113B);
+      assertCells(expected, new 
ArrayList<KeyValueScanner>(Arrays.asList(scan1, scan2)));
     }
   }
 
-  private static class Scanner extends CollectionBackedScanner {
-    private Iterator<Cell> iter;
-    private Cell current;
+  private static class TestScanner extends CollectionBackedScanner {
     private boolean closed = false;
+    private long scannerOrder = 0;
 
-    public Scanner(List<Cell> list) {
+    public TestScanner(List<Cell> list) {
       super(list);
     }
 
+    public TestScanner(List<Cell> list, long scannerOrder) {
+      this(list);
+      this.scannerOrder = scannerOrder;
+    }
+
+    @Override
+    public long getScannerOrder() {
+      return scannerOrder;
+    }
+
     @Override
     public void close(){
       closed = true;
@@ -300,11 +235,11 @@ public class TestKeyValueHeap extends HBaseTestCase {
     }
   }
 
-  private static class SeekScanner extends Scanner {
+  private static class SeekTestScanner extends TestScanner {
     private int closedNum = 0;
     private boolean realSeekDone = true;
 
-    public SeekScanner(List<Cell> list) {
+    public SeekTestScanner(List<Cell> list) {
       super(list);
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/11740570/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
index d8acd44..ab0c173 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
@@ -211,7 +211,7 @@ public class TestStoreFile extends HBaseTestCase {
     when(hcd.getName()).thenReturn(cf);
     when(store.getFamily()).thenReturn(hcd);
     StoreFileScanner scanner =
-        new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 
0);
+        new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 
0, 0);
     Scan scan = new Scan();
     scan.setColumnFamilyTimeRange(cf, 0, 1);
     assertFalse(scanner.shouldUseScanner(scan, store, 0));

http://git-wip-us.apache.org/repos/asf/hbase/blob/11740570/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
index 7707116..670a8d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
@@ -64,7 +64,7 @@ public class TestCompactor {
     when(r.length()).thenReturn(1L);
     when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
     when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
-    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), 
anyLong()))
+    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), 
anyLong(), anyLong()))
         .thenReturn(mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
     when(sf.createReader()).thenReturn(r);

http://git-wip-us.apache.org/repos/asf/hbase/blob/11740570/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 160deb3..d8770e0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -750,7 +750,8 @@ public class TestStripeCompactionPolicy {
     when(r.length()).thenReturn(size);
     when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
     when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
-    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), 
anyLong())).thenReturn(
+    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), 
anyLong(), anyLong()))
+        .thenReturn(
       mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
     when(sf.createReader(anyBoolean())).thenReturn(r);

Reply via email to