Repository: hbase Updated Branches: refs/heads/0.98 4e4aabb93 -> 3fe903cb7
HBASE-14269 FuzzyRowFilter omits certain rows when multiple fuzzy keys exist (hongbin ma) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3fe903cb Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3fe903cb Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3fe903cb Branch: refs/heads/0.98 Commit: 3fe903cb77c09eb6905dc235b7e7109e3135bbc6 Parents: 4e4aabb Author: tedyu <yuzhih...@gmail.com> Authored: Wed Aug 26 21:02:51 2015 -0700 Committer: tedyu <yuzhih...@gmail.com> Committed: Wed Aug 26 21:02:51 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/filter/FuzzyRowFilter.java | 118 +++++++++---------- .../hadoop/hbase/filter/TestFuzzyRowFilter.java | 1 - .../filter/TestFuzzyRowFilterEndToEnd.java | 94 ++++++++++----- 3 files changed, 126 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe903cb/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java index 7a3f1c7..d6ad3a0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java @@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.filter; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.PriorityQueue; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; @@ -157,83 +157,83 @@ public class FuzzyRowFilter extends FilterBase { @Override public Cell getNextCellHint(Cell currentCell) { - boolean result = true; - if (tracker.needsUpdate()) { - result = tracker.updateTracker(currentCell); - } + boolean result = tracker.updateTracker(currentCell); if (result == false) { done = true; return null; } byte[] nextRowKey = tracker.nextRow(); - // We need to compare nextRowKey with currentCell - int compareResult = - Bytes.compareTo(nextRowKey, 0, nextRowKey.length, currentCell.getRowArray(), - currentCell.getRowOffset(), currentCell.getRowLength()); - if ((reversed && compareResult > 0) || (!reversed && compareResult < 0)) { - // This can happen when we have multilpe filters and some other filter - // returns next row with hint which is larger (smaller for reverse) - // than the current (really?) - result = tracker.updateTracker(currentCell); - if (result == false) { - done = true; - return null; - } else { - nextRowKey = tracker.nextRow(); - } - } return KeyValue.createFirstOnRow(nextRowKey); } /** - * If we have multiple fuzzy keys, row tracker should improve overall performance It calculates - * all next rows (one per every fuzzy key), sort them accordingly (ascending for regular and - * descending for reverse). Next time getNextCellHint is called we check row tracker first and - * return next row from the tracker if it exists, if there are no rows in the tracker we update - * tracker with a current cell and return first row. + * If we have multiple fuzzy keys, row tracker should improve overall performance. It calculates + * all next rows (one per every fuzzy key) and put them (the fuzzy key is bundled) into a priority + * queue so that the smallest row key always appears at queue head, which helps to decide the + * "Next Cell Hint". As scanning going on, the number of candidate rows in the RowTracker will + * remain the size of fuzzy keys until some of the fuzzy keys won't possibly have matches any + * more. */ private class RowTracker { - private final List<byte[]> nextRows; - private int next = -1; + private final PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>> nextRows; + private boolean initialized = false; RowTracker() { - nextRows = new ArrayList<byte[]>(); - } - - boolean needsUpdate() { - return next == -1 || next == nextRows.size(); + nextRows = + new PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>>(fuzzyKeysData.size(), + new Comparator<Pair<byte[], Pair<byte[], byte[]>>>() { + @Override + public int compare(Pair<byte[], Pair<byte[], byte[]>> o1, + Pair<byte[], Pair<byte[], byte[]>> o2) { + int compare = Bytes.compareTo(o1.getFirst(), o2.getFirst()); + if (!isReversed()) { + return compare; + } else { + return -compare; + } + } + }); } byte[] nextRow() { - if (next < 0 || next == nextRows.size()) return null; - return nextRows.get(next++); + if (nextRows.isEmpty()) { + throw new IllegalStateException( + "NextRows should not be empty, make sure to call nextRow() after updateTracker() return true"); + } else { + return nextRows.peek().getFirst(); + } } boolean updateTracker(Cell currentCell) { - nextRows.clear(); - for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) { - byte[] nextRowKeyCandidate = - getNextForFuzzyRule(isReversed(), currentCell.getRowArray(), - currentCell.getRowOffset(), currentCell.getRowLength(), fuzzyData.getFirst(), - fuzzyData.getSecond()); - if (nextRowKeyCandidate == null) { - continue; + if (!initialized) { + for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) { + updateWith(currentCell, fuzzyData); } - nextRows.add(nextRowKeyCandidate); - } - // Sort all next row candidates - Collections.sort(nextRows, new Comparator<byte[]>() { - @Override - public int compare(byte[] o1, byte[] o2) { - if (reversed) { - return -Bytes.compareTo(o1, o2); - } else { - return Bytes.compareTo(o1, o2); - } + initialized = true; + } else { + while (!nextRows.isEmpty() && !lessThan(currentCell, nextRows.peek().getFirst())) { + Pair<byte[], Pair<byte[], byte[]>> head = nextRows.poll(); + Pair<byte[], byte[]> fuzzyData = head.getSecond(); + updateWith(currentCell, fuzzyData); } - }); - next = 0; - return nextRows.size() > 0; + } + return !nextRows.isEmpty(); + } + + boolean lessThan(Cell currentCell, byte[] nextRowKey) { + int compareResult = + Bytes.compareTo(currentCell.getRowArray(), currentCell.getRowOffset(), + currentCell.getRowLength(), nextRowKey, 0, nextRowKey.length); + return (!isReversed() && compareResult < 0) || (isReversed() && compareResult > 0); + } + + void updateWith(Cell currentCell, Pair<byte[], byte[]> fuzzyData) { + byte[] nextRowKeyCandidate = + getNextForFuzzyRule(isReversed(), currentCell.getRowArray(), currentCell.getRowOffset(), + currentCell.getRowLength(), fuzzyData.getFirst(), fuzzyData.getSecond()); + if (nextRowKeyCandidate != null) { + nextRows.add(new Pair<byte[], Pair<byte[], byte[]>>(nextRowKeyCandidate, fuzzyData)); + } } } @@ -394,8 +394,8 @@ public class FuzzyRowFilter extends FilterBase { return SatisfiesCode.YES; } - static SatisfiesCode satisfiesNoUnsafe(boolean reverse, byte[] row, int offset, - int length, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) { + static SatisfiesCode satisfiesNoUnsafe(boolean reverse, byte[] row, int offset, int length, + byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) { if (row == null) { // do nothing, let scan to proceed return SatisfiesCode.YES; http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe903cb/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFuzzyRowFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFuzzyRowFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFuzzyRowFilter.java index 25cbeb2..8799ebf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFuzzyRowFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFuzzyRowFilter.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.filter; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Assert; http://git-wip-us.apache.org/repos/asf/hbase/blob/3fe903cb/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFuzzyRowFilterEndToEnd.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFuzzyRowFilterEndToEnd.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFuzzyRowFilterEndToEnd.java index 1ff49a7..c31bb93 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFuzzyRowFilterEndToEnd.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFuzzyRowFilterEndToEnd.java @@ -59,12 +59,14 @@ import com.google.common.collect.Lists; @Category(MediumTests.class) public class TestFuzzyRowFilterEndToEnd { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static byte fuzzyValue = (byte) 63; private static final Log LOG = LogFactory.getLog(TestFuzzyRowFilterEndToEnd.class); private static int firstPartCardinality = 50; - private static int secondPartCardinality = 40; - private static int colQualifiersTotal = 50; - private static int totalFuzzyKeys = secondPartCardinality / 2; + private static int secondPartCardinality = 50; + private static int thirdPartCardinality = 50; + private static int colQualifiersTotal = 5; + private static int totalFuzzyKeys = thirdPartCardinality / 2; private static String table = "TestFuzzyRowFilterEndToEnd"; @@ -118,25 +120,27 @@ public class TestFuzzyRowFilterEndToEnd { // 4 byte qualifier // 4 byte value - for (int i1 = 0; i1 < firstPartCardinality; i1++) { - if ((i1 % 1000) == 0) LOG.info("put " + i1); + for (int i0 = 0; i0 < firstPartCardinality; i0++) { - for (int i2 = 0; i2 < secondPartCardinality; i2++) { - byte[] rk = new byte[10]; + for (int i1 = 0; i1 < secondPartCardinality; i1++) { - ByteBuffer buf = ByteBuffer.wrap(rk); - buf.clear(); - buf.putShort((short) 2); - buf.putInt(i1); - buf.putInt(i2); - for (int c = 0; c < colQualifiersTotal; c++) { - byte[] cq = new byte[4]; - Bytes.putBytes(cq, 0, Bytes.toBytes(c), 0, 4); + for (int i2 = 0; i2 < thirdPartCardinality; i2++) { + byte[] rk = new byte[10]; - Put p = new Put(rk); - p.setDurability(Durability.SKIP_WAL); - p.add(cf.getBytes(), cq, Bytes.toBytes(c)); - ht.put(p); + ByteBuffer buf = ByteBuffer.wrap(rk); + buf.clear(); + buf.putShort((short) i0); + buf.putInt(i1); + buf.putInt(i2); + for (int c = 0; c < colQualifiersTotal; c++) { + byte[] cq = new byte[4]; + Bytes.putBytes(cq, 0, Bytes.toBytes(c), 0, 4); + + Put p = new Put(rk); + p.setDurability(Durability.SKIP_WAL); + p.add(cf.getBytes(), cq, Bytes.toBytes(c)); + ht.put(p); + } } } } @@ -144,11 +148,12 @@ public class TestFuzzyRowFilterEndToEnd { TEST_UTIL.flush(); // test passes - runTest(ht); + runTest1(ht); + runTest2(ht); } - private void runTest(HTable hTable) throws IOException { + private void runTest1(HTable hTable) throws IOException { // [0, 2, ?, ?, ?, ?, 0, 0, 0, 1] byte[] mask = new byte[] { 0, 0, 1, 1, 1, 1, 0, 0, 0, 0 }; @@ -160,7 +165,7 @@ public class TestFuzzyRowFilterEndToEnd { buf.clear(); buf.putShort((short) 2); for (int j = 0; j < 4; j++) { - buf.put((byte) 63); + buf.put(fuzzyValue); } buf.putInt(i); @@ -168,7 +173,41 @@ public class TestFuzzyRowFilterEndToEnd { list.add(pair); } - int expectedSize = firstPartCardinality * totalFuzzyKeys * colQualifiersTotal; + int expectedSize = secondPartCardinality * totalFuzzyKeys * colQualifiersTotal; + FuzzyRowFilter fuzzyRowFilter0 = new FuzzyRowFilter(list); + // Filters are not stateless - we can't reuse them + FuzzyRowFilter fuzzyRowFilter1 = new FuzzyRowFilter(list); + + // regular test + runScanner(hTable, expectedSize, fuzzyRowFilter0); + // optimized from block cache + runScanner(hTable, expectedSize, fuzzyRowFilter1); + + } + + private void runTest2(HTable hTable) throws IOException { + // [0, 0, ?, ?, ?, ?, 0, 0, 0, 0] , [0, 1, ?, ?, ?, ?, 0, 0, 0, 1]... + + byte[] mask = new byte[] { 0, 0, 1, 1, 1, 1, 0, 0, 0, 0 }; + + List<Pair<byte[], byte[]>> list = new ArrayList<Pair<byte[], byte[]>>(); + + for (int i = 0; i < totalFuzzyKeys; i++) { + byte[] fuzzyKey = new byte[10]; + ByteBuffer buf = ByteBuffer.wrap(fuzzyKey); + buf.clear(); + buf.putShort((short) (i * 2)); + for (int j = 0; j < 4; j++) { + buf.put(fuzzyValue); + } + buf.putInt(i * 2); + + Pair<byte[], byte[]> pair = new Pair<byte[], byte[]>(fuzzyKey, mask); + list.add(pair); + } + + int expectedSize = totalFuzzyKeys * secondPartCardinality * colQualifiersTotal; + FuzzyRowFilter fuzzyRowFilter0 = new FuzzyRowFilter(list); // Filters are not stateless - we can't reuse them FuzzyRowFilter fuzzyRowFilter1 = new FuzzyRowFilter(list); @@ -207,7 +246,7 @@ public class TestFuzzyRowFilterEndToEnd { assertEquals(expectedSize, found); } - + @SuppressWarnings("deprecation") @Test public void testFilterList() throws Exception { @@ -260,7 +299,7 @@ public class TestFuzzyRowFilterEndToEnd { buf.clear(); buf.putShort((short) 2); for (int i = 0; i < 4; i++) - buf.put((byte) 63); + buf.put(fuzzyValue); buf.putInt((short) 1); byte[] mask1 = new byte[] { 0, 0, 1, 1, 1, 1, 0, 0, 0, 0 }; @@ -270,7 +309,7 @@ public class TestFuzzyRowFilterEndToEnd { buf.putShort((short) 2); buf.putInt((short) 2); for (int i = 0; i < 4; i++) - buf.put((byte) 63); + buf.put(fuzzyValue); byte[] mask2 = new byte[] { 0, 0, 0, 0, 0, 0, 1, 1, 1, 1 }; @@ -283,7 +322,8 @@ public class TestFuzzyRowFilterEndToEnd { runScanner(hTable, expectedSize, fuzzyRowFilter1, fuzzyRowFilter2); } - private void runScanner(HTable hTable, int expectedSize, Filter filter1, Filter filter2) throws IOException { + private void runScanner(HTable hTable, int expectedSize, Filter filter1, Filter filter2) + throws IOException { String cf = "f"; Scan scan = new Scan(); scan.addFamily(cf.getBytes());