This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch 5.2 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push: new 97c63c1cae PHOENIX-7287 Leverage bloom filters for multi-key point lookups (#1923) 97c63c1cae is described below commit 97c63c1cae90a826dc44b23dbc53954961206ecc Author: Kadir Ozdemir <37155482+kadiro...@users.noreply.github.com> AuthorDate: Tue Jul 9 18:04:47 2024 -0700 PHOENIX-7287 Leverage bloom filters for multi-key point lookups (#1923) --- .../org/apache/phoenix/compile/ScanRanges.java | 2 +- .../execute/PhoenixTxIndexMutationGenerator.java | 2 +- .../org/apache/phoenix/filter/PagingFilter.java | 109 +++++---- .../org/apache/phoenix/filter/SkipScanFilter.java | 48 +++- .../java/org/apache/phoenix/util/ScanUtil.java | 44 ++++ .../coprocessor/GlobalIndexRegionScanner.java | 4 +- .../coprocessor/IndexRepairRegionScanner.java | 2 +- .../phoenix/coprocessor/PagingRegionScanner.java | 260 ++++++++++++++++----- .../coprocessor/UncoveredIndexRegionScanner.java | 2 +- .../hbase/index/covered/data/CachedLocalTable.java | 2 +- .../java/org/apache/phoenix/end2end/InListIT.java | 38 ++- .../org/apache/phoenix/end2end/TableTTLIT.java | 3 +- .../apache/phoenix/compile/WhereCompilerTest.java | 14 +- .../filter/SkipScanFilterIntersectTest.java | 2 +- .../apache/phoenix/filter/SkipScanFilterTest.java | 4 +- .../phoenix/query/ParallelIteratorsSplitTest.java | 4 +- 16 files changed, 391 insertions(+), 149 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ScanRanges.java index e9de7b75a6..1667bf1058 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ScanRanges.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ScanRanges.java @@ -198,7 +198,7 @@ public class ScanRanges { ranges = ranges.subList(0, boundSlotCount); slotSpan = Arrays.copyOf(slotSpan, boundSlotCount); } - this.filter = new SkipScanFilter(ranges, slotSpan, this.schema); + this.filter = new 
SkipScanFilter(ranges, slotSpan, this.schema, isPointLookup); } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java index 91f0c92f4c..0b4423ba0f 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java @@ -199,7 +199,7 @@ public class PhoenixTxIndexMutationGenerator { // checkpointed versions. SkipScanFilter filter = scanRanges.getSkipScanFilter(); if (isRollback) { - filter = new SkipScanFilter(filter,true); + filter = new SkipScanFilter(filter,true, false); indexMetaData.getTransactionContext().setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL); } scan.setFilter(filter); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/PagingFilter.java b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/PagingFilter.java index 988efddf6a..eb9eb3b426 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/PagingFilter.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/PagingFilter.java @@ -35,27 +35,54 @@ import org.apache.hadoop.io.Writable; import org.apache.phoenix.util.EnvironmentEdgeManager; /** - * This is a top level Phoenix filter which is injected to a scan at the server side. If the scan has - * already a filter then PagingFilter wraps it. This filter for server paging. It makes sure that - * the scan does not take more than pageSizeInMs. + * This is a top level Phoenix filter which is injected to a scan at the server side. If the scan + * already has a filter then PagingFilter wraps it. This filter is for server pagination. It makes + * sure that the scan does not take more than pageSizeInMs. + * + * PagingRegionScanner initializes PagingFilter before retrieving a row. 
The state of PagingFilter + * consists of three variables startTime, isStopped, and currentCell. During this + * initialization, starTime is set to the current time, isStopped to false, and currentCell to null. + * + * PagingFilter implements the paging state machine in three filter methods that are + * hasFilterRow(), filterAllRemaining(), and filterRowKey(). These methods are called in the + * following order for each row: hasFilterRow(), filterAllRemaining(), filterRowKey(), and + * filterAllRemaining(). Please note that filterAllRemaining() is called twice (before and after + * filterRowKey()). Sometimes, filterAllRemaining() is called multiple times back to back. + * + * In hasFilterRow(), if currentCell is not null, meaning that at least one row has been + * scanned, and it is time to page out, then PagingFilter sets isStopped to true. + * + * In filterAllRemaining(), PagingFilter returns true if isStopped is true. Returning true from this + * method causes the HBase region scanner to signal the caller (that is PagingRegionScanner in this + * case) that there are no more rows to scan by returning false from the next() call. In that case, + * PagingRegionScanner checks if PagingFilter is stopped. If PagingFilter is stopped, then it means + * the last next() call paged out rather than the scan operation reached at its last row. + * Please note it is crucial that PagingFilter returns true in the first filterAllRemaining() call + * for a given row. This allows to the HBase region scanner to resume the scanning rows when the + * next() method is called even though the region scanner already signaled the caller that there + * were no more rows to scan. PagingRegionScanner leverages this behavior to resume the scan + * operation using the same scanner instead closing the current one and starting a new scanner. If + * this specific HBase region scanner behavior changes, it will cause server paging test failures. 
+ * To fix them, the PagingRegionScanner code needs to change such that PagingRegionScanner needs to + * create a new scanner with adjusted start row to resume the scan operation after PagingFilter + * stops. + * + * If the scan operation has not been terminated by PageFilter, HBase subsequently calls + * filterRowKey(). In this method, PagingFilter records the last row that is scanned. + * */ public class PagingFilter extends FilterBase implements Writable { - private enum State { - INITIAL, STARTED, TIME_TO_STOP, STOPPED - } - State state; private long pageSizeMs; private long startTime; // tracks the row we last visited private Cell currentCell; + private boolean isStopped; private Filter delegate = null; public PagingFilter() { - init(); } public PagingFilter(Filter delegate, long pageSizeMs) { - init(); this.delegate = delegate; this.pageSizeMs = pageSizeMs; } @@ -77,38 +104,33 @@ public class PagingFilter extends FilterBase implements Writable { } public boolean isStopped() { - return state == State.STOPPED; + return isStopped; } public void init() { - state = State.INITIAL; + isStopped = false; currentCell = null; + startTime = EnvironmentEdgeManager.currentTimeMillis(); } @Override - public void reset() throws IOException { - long currentTime = EnvironmentEdgeManager.currentTimeMillis(); - // reset can be called multiple times for the same row sometimes even before we have - // scanned even one row. The order in which it is called is not very predictable. - // So we need to ensure that we have seen at least one row before we page. - // The currentCell != null check ensures that. 
- if (state == State.STARTED && currentCell != null - && currentTime - startTime >= pageSizeMs) { - state = State.TIME_TO_STOP; - } - if (delegate != null) { - delegate.reset(); - return; + public boolean hasFilterRow() { + if (currentCell != null + && EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) { + isStopped = true; } - super.reset(); + return true; } @Override - public Cell getNextCellHint(Cell currentKV) throws IOException { + public boolean filterAllRemaining() throws IOException { + if (isStopped) { + return true; + } if (delegate != null) { - return delegate.getNextCellHint(currentKV); + return delegate.filterAllRemaining(); } - return super.getNextCellHint(currentKV); + return super.filterAllRemaining(); } @Override @@ -121,37 +143,24 @@ public class PagingFilter extends FilterBase implements Writable { } @Override - public boolean filterAllRemaining() throws IOException { - if (state == State.TIME_TO_STOP) { - state = State.STOPPED; - return true; - } - if (state == State.STOPPED) { - return true; - } + public void reset() throws IOException { if (delegate != null) { - return delegate.filterAllRemaining(); + delegate.reset(); + return; } - return super.filterAllRemaining(); + super.reset(); } @Override - /** - * This is called once for every row in the beginning. 
- */ - public boolean hasFilterRow() { - if (state == State.INITIAL) { - startTime = EnvironmentEdgeManager.currentTimeMillis(); - state = State.STARTED; + public Cell getNextCellHint(Cell currentKV) throws IOException { + if (delegate != null) { + return delegate.getNextCellHint(currentKV); } - return true; + return super.getNextCellHint(currentKV); } @Override public boolean filterRow() throws IOException { - if (state == State.TIME_TO_STOP) { - return true; - } if (delegate != null) { return delegate.filterRow(); } @@ -201,7 +210,6 @@ public class PagingFilter extends FilterBase implements Writable { @Override public ReturnCode filterKeyValue(Cell v) throws IOException { - if (delegate != null) { return delegate.filterKeyValue(v); } @@ -210,7 +218,6 @@ public class PagingFilter extends FilterBase implements Writable { @Override public Filter.ReturnCode filterCell(Cell c) throws IOException { - currentCell = c; if (delegate != null) { return delegate.filterCell(c); } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java index 30728d366c..a9cb711653 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -79,6 +80,7 @@ public class SkipScanFilter extends FilterBase implements Writable { private int endKeyLength; private boolean isDone; private int offset; + private boolean isMultiKeyPointLookup; private Map<ImmutableBytesWritable, Cell> nextCellHintMap = new HashMap<ImmutableBytesWritable, Cell>(); @@ -91,27 +93,42 @@ public class SkipScanFilter extends FilterBase implements Writable { public 
SkipScanFilter() { } - public SkipScanFilter(SkipScanFilter filter, boolean includeMultipleVersions) { - this(filter.slots, filter.slotSpan, filter.schema, includeMultipleVersions); + public SkipScanFilter(SkipScanFilter filter, boolean includeMultipleVersions, + boolean isMultiKeyPointLookup) { + this(filter.slots, filter.slotSpan, filter.schema, includeMultipleVersions, + isMultiKeyPointLookup); } - public SkipScanFilter(List<List<KeyRange>> slots, RowKeySchema schema) { - this(slots, ScanUtil.getDefaultSlotSpans(slots.size()), schema); + public SkipScanFilter(List<List<KeyRange>> slots, RowKeySchema schema, boolean isMultiKeyPointLookup) { + this(slots, ScanUtil.getDefaultSlotSpans(slots.size()), schema, isMultiKeyPointLookup); } - public SkipScanFilter(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema schema) { - this(slots, slotSpan, schema, false); + public SkipScanFilter(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema schema, + boolean isMultiKeyPointLookup) { + this(slots, slotSpan, schema, false, isMultiKeyPointLookup); } - private SkipScanFilter(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema schema, boolean includeMultipleVersions) { - init(slots, slotSpan, schema, includeMultipleVersions); + private SkipScanFilter(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema schema, + boolean includeMultipleVersions, boolean isMultiKeyPointLookup) { + init(slots, slotSpan, schema, includeMultipleVersions, isMultiKeyPointLookup); } public void setOffset(int offset) { this.offset = offset; } + public int getOffset() { + return offset; + } + public boolean isMultiKeyPointLookup() { + return isMultiKeyPointLookup; + } + + public List<KeyRange> getPointLookupKeyRanges() { + return isMultiKeyPointLookup ? 
slots.get(0) : Collections.emptyList(); + } - private void init(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema schema, boolean includeMultipleVersions) { + private void init(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema schema, + boolean includeMultipleVersions, boolean isPointLookup) { for (List<KeyRange> ranges : slots) { if (ranges.isEmpty()) { throw new IllegalStateException(); @@ -126,6 +143,7 @@ public class SkipScanFilter extends FilterBase implements Writable { this.endKey = new byte[maxKeyLength]; this.endKeyLength = 0; this.includeMultipleVersions = includeMultipleVersions; + this.isMultiKeyPointLookup = isPointLookup; } // Exposed for testing. @@ -194,7 +212,7 @@ public class SkipScanFilter extends FilterBase implements Writable { public SkipScanFilter intersect(byte[] lowerInclusiveKey, byte[] upperExclusiveKey) { List<List<KeyRange>> newSlots = Lists.newArrayListWithCapacity(slots.size()); if (intersect(lowerInclusiveKey, upperExclusiveKey, newSlots)) { - return new SkipScanFilter(newSlots, slotSpan, schema); + return new SkipScanFilter(newSlots, slotSpan, schema, isMultiKeyPointLookup); } return null; } @@ -618,7 +636,14 @@ public class SkipScanFilter extends FilterBase implements Writable { orClause.add(range); } } - this.init(slots, slotSpan, schema, includeMultipleVersions); + try { + boolean isPointLookup = in.readBoolean(); + this.init(slots, slotSpan, schema, includeMultipleVersions, isPointLookup); + } catch (IOException e) { + // Reached the end of the stream before reading the boolean field. 
The client can be + // an older client + this.init(slots, slotSpan, schema, includeMultipleVersions, false); + } } @Override @@ -636,6 +661,7 @@ public class SkipScanFilter extends FilterBase implements Writable { range.write(out); } } + out.writeBoolean(isMultiKeyPointLookup); } @Override diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java index c978acf6f6..13a8aeda8e 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -1637,6 +1637,50 @@ public class ScanUtil { return null; } + public static SkipScanFilter removeSkipScanFilterFromFilterList(FilterList filterList) { + Iterator<Filter> filterIterator = filterList.getFilters().iterator(); + while (filterIterator.hasNext()) { + Filter filter = filterIterator.next(); + if (filter instanceof SkipScanFilter + && ((SkipScanFilter) filter).isMultiKeyPointLookup()) { + filterIterator.remove(); + return (SkipScanFilter) filter; + } else if (filter instanceof FilterList) { + SkipScanFilter skipScanFilter = removeSkipScanFilterFromFilterList((FilterList) filter); + if (skipScanFilter != null) { + return skipScanFilter; + } + } + } + return null; + } + public static SkipScanFilter removeSkipScanFilter(Scan scan) { + Filter filter = scan.getFilter(); + if (filter != null) { + PagingFilter pagingFilter = null; + if (filter instanceof PagingFilter) { + pagingFilter = (PagingFilter) filter; + filter = pagingFilter.getDelegateFilter(); + if (filter == null) { + return null; + } + } + if (filter instanceof SkipScanFilter + && ((SkipScanFilter) filter).isMultiKeyPointLookup()) { + if (pagingFilter != null) { + pagingFilter.setDelegateFilter(null); + scan.setFilter(pagingFilter); + } else { + scan.setFilter(null); + } + return (SkipScanFilter) filter; + } else if (filter instanceof FilterList) { + return 
removeSkipScanFilterFromFilterList((FilterList) filter); + } + } + return null; + } + /** * Verify whether the given row key is in the scan boundaries i.e. scan start and end keys. * diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java index 5af9950aa6..5706916cae 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java @@ -1099,7 +1099,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax()); scanRanges.initializeScan(indexScan); SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter(); - indexScan.setFilter(new SkipScanFilter(skipScanFilter, true)); + indexScan.setFilter(new SkipScanFilter(skipScanFilter, true, true)); indexScan.setRaw(true); indexScan.readAllVersions(); indexScan.setCacheBlocks(false); @@ -1502,7 +1502,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { ScanRanges scanRanges = ScanRanges.createPointLookup(keys); scanRanges.initializeScan(incrScan); SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter(); - incrScan.setFilter(new SkipScanFilter(skipScanFilter, true)); + incrScan.setFilter(new SkipScanFilter(skipScanFilter, true, true)); //putting back the min time to 0 for index and data reads incrScan.setTimeRange(0, scan.getTimeRange().getMax()); scan.setTimeRange(0, scan.getTimeRange().getMax()); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java index 74c8e2edf6..19707bd533 100644 --- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java @@ -152,7 +152,7 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner { dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax()); scanRanges.initializeScan(dataScan); SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter(); - dataScan.setFilter(new SkipScanFilter(skipScanFilter, true)); + dataScan.setFilter(new SkipScanFilter(skipScanFilter, true, true)); dataScan.setRaw(true); dataScan.readAllVersions(); dataScan.setCacheBlocks(false); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java index 965cd370c9..719ce43731 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java @@ -23,11 +23,16 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.filter.PagingFilter; +import org.apache.phoenix.filter.SkipScanFilter; +import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ScanUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,82 +44,225 @@ import org.slf4j.LoggerFactory; * to be filtered out), 
PagingFilter stops the HBase region scanner and sets its state * to STOPPED. In this case, the HBase region scanner next() returns false and * PagingFilter#isStopped() returns true. PagingRegionScanner is responsible for detecting - * PagingFilter has stopped the scanner, and then closing the current HBase region scanner, - * starting a new one to resume the scan operation and returning a dummy result to signal to + * PagingFilter has stopped the scanner, and returning a dummy result to signal to * Phoenix client to resume the scan operation by skipping this dummy result and calling * ResultScanner#next(). + * + * PagingRegionScanner also converts a multi-key point lookup scan into N single point lookup + * scans to allow individual scan to leverage HBase bloom filter. This conversion is done within + * the MultiKeyPointLookup inner class. */ public class PagingRegionScanner extends BaseRegionScanner { + private static final Logger LOGGER = LoggerFactory.getLogger(PagingRegionScanner.class); private Region region; private Scan scan; private PagingFilter pagingFilter; + private MultiKeyPointLookup multiKeyPointLookup = null; + private boolean initialized = false; - private static final Logger LOGGER = LoggerFactory.getLogger(PagingRegionScanner.class); + private class MultiKeyPointLookup { + private SkipScanFilter skipScanFilter; + private List<KeyRange> pointLookupRanges = null; + private int lookupPosition = 0; + private byte[] lookupKeyPrefix = null; + private long pageSizeMs; - public PagingRegionScanner(Region region, RegionScanner scanner, Scan scan) { - super(scanner); - this.region = region; - this.scan = scan; - pagingFilter = ScanUtil.getPhoenixPagingFilter(scan); - if (pagingFilter != null) { - pagingFilter.init(); + private MultiKeyPointLookup(SkipScanFilter skipScanFilter) throws IOException { + this.skipScanFilter = skipScanFilter; + pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan); + pointLookupRanges = 
skipScanFilter.getPointLookupKeyRanges(); + lookupPosition = findLookupPosition(scan.getStartRow()); + if (skipScanFilter.getOffset() > 0) { + lookupKeyPrefix = new byte[skipScanFilter.getOffset()]; + System.arraycopy(scan.getStartRow(), 0, lookupKeyPrefix, 0, + skipScanFilter.getOffset()); + } + // A point lookup scan does not need to have a paging filter + if (pagingFilter != null) { + scan.setFilter(pagingFilter.getDelegateFilter()); + } } - } - private boolean next(List<Cell> results, boolean raw) throws IOException { - try { - byte[] adjustedStartRowKey = - scan.getAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY); - byte[] adjustedStartRowKeyIncludeBytes = - scan.getAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY_INCLUDE); - // If scanners at higher level needs to re-scan the data that were already scanned - // earlier, they can provide adjusted new start rowkey for the scan and whether to - // include it. - // If they are set as the scan attributes, close the scanner, reopen it with - // updated start rowkey and whether to include it. Update mvcc read point from the - // previous scanner and set it back to the new scanner to maintain the read - // consistency for the given region. - // Once done, continue the scan operation and reset the attributes. 
- if (adjustedStartRowKey != null && adjustedStartRowKeyIncludeBytes != null) { - long mvccReadPoint = delegate.getMvccReadPoint(); - delegate.close(); - scan.withStartRow(adjustedStartRowKey, - Bytes.toBoolean(adjustedStartRowKeyIncludeBytes)); - PackagePrivateFieldAccessor.setMvccReadPoint(scan, mvccReadPoint); - delegate = region.getScanner(scan); - scan.setAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY, null); - scan.setAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY_INCLUDE, null); + private int findLookupPosition(byte[] startRowKey) { + for (int i = 0; i < pointLookupRanges.size(); i++) { + byte[] rowKey = pointLookupRanges.get(i).getLowerRange(); + if (Bytes.compareTo(startRowKey, skipScanFilter.getOffset(), + startRowKey.length - skipScanFilter.getOffset(), rowKey, 0, + rowKey.length) <= 0) { + return i; + } } - if (pagingFilter != null) { - pagingFilter.init(); + return pointLookupRanges.size(); + } + + private boolean verifyStartRowKey(byte[] startRowKey) { + // The startRowKey may not be one of the point lookup keys. This happens when + // the region moves and the HBase client adjusts the scan start row key. + lookupPosition = findLookupPosition(startRowKey); + if (lookupPosition == pointLookupRanges.size()) { + return false; } - boolean hasMore = raw ? delegate.nextRaw(results) : delegate.next(results); - if (pagingFilter == null) { - return hasMore; + byte[] rowKey = pointLookupRanges.get(lookupPosition++).getLowerRange(); + scan.withStopRow(rowKey, true); + scan.withStopRow(rowKey, true); + return true; + } + + private RegionScanner getNewScanner() throws IOException { + if (lookupPosition >= pointLookupRanges.size()) { + return null; } - if (!hasMore) { - // There is no more row from the HBase region scanner. 
We need to check if PageFilter - // has stopped the region scanner - if (pagingFilter.isStopped()) { - if (results.isEmpty()) { - byte[] rowKey = pagingFilter.getCurrentRowKeyToBeExcluded(); - LOGGER.info("Page filter stopped, generating dummy key {} ", - Bytes.toStringBinary(rowKey)); + byte[] rowKey = pointLookupRanges.get(lookupPosition++).getLowerRange(); + byte[] adjustedRowKey = rowKey; + if (lookupKeyPrefix != null) { + int len = rowKey.length + lookupKeyPrefix.length; + adjustedRowKey = new byte[len]; + System.arraycopy(lookupKeyPrefix, 0, adjustedRowKey, 0, + lookupKeyPrefix.length); + System.arraycopy(rowKey, 0, adjustedRowKey, lookupKeyPrefix.length, + rowKey.length); + } + scan.withStartRow(adjustedRowKey, true); + scan.withStopRow(adjustedRowKey, true); + return region.getScanner(scan); + } + + private boolean hasMore() { + return lookupPosition < pointLookupRanges.size(); + } + private boolean next(List<Cell> results, boolean raw, RegionScanner scanner) + throws IOException { + try { + long startTime = EnvironmentEdgeManager.currentTimeMillis(); + while (true) { + if (raw ? scanner.nextRaw(results) : scanner.next(results)) { + // Since each scan is supposed to return only one row (even when the + // start and stop row key are not the same, which happens after region + // moves or when there are delete markers in the table), this should not + // happen + LOGGER.warn("Each scan is supposed to return only one row, scan " + scan + + ", region " + region); + } + if (!results.isEmpty()) { + return hasMore(); + } + // The scanner returned an empty result. This means that one of the rows + // has been deleted. 
+ if (!hasMore()) { + return false; + } + + if (EnvironmentEdgeManager.currentTimeMillis() - startTime > pageSizeMs) { + byte[] rowKey = pointLookupRanges.get(lookupPosition - 1).getLowerRange(); ScanUtil.getDummyResult(rowKey, results); + return true; + } + + RegionScanner regionScanner = getNewScanner(); + if (regionScanner == null) { + return false; } - return true; + scanner.close(); + scanner = regionScanner; } + } catch (Exception e) { + lookupPosition--; + throw e; + } finally { + scanner.close(); + } + } + } + + public PagingRegionScanner(Region region, RegionScanner scanner, Scan scan) { + super(scanner); + this.region = region; + this.scan = scan; + pagingFilter = ScanUtil.getPhoenixPagingFilter(scan); + } + + void init() throws IOException { + if (initialized) { + return; + } + TableDescriptor tableDescriptor = region.getTableDescriptor(); + BloomType bloomFilterType = tableDescriptor.getColumnFamilies()[0].getBloomFilterType(); + if (bloomFilterType == BloomType.ROW) { + // Check if the scan is a multi-point-lookup scan if so remove it from the scan + SkipScanFilter skipScanFilter = ScanUtil.removeSkipScanFilter(scan); + if (skipScanFilter != null) { + multiKeyPointLookup = new MultiKeyPointLookup(skipScanFilter); + } + } + initialized = true; + } + + private boolean next(List<Cell> results, boolean raw) throws IOException { + init(); + if (pagingFilter != null) { + pagingFilter.init(); + } + byte[] adjustedStartRowKey = + scan.getAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY); + byte[] adjustedStartRowKeyIncludeBytes = + scan.getAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY_INCLUDE); + // If scanners at higher level needs to re-scan the data that were already scanned + // earlier, they can provide adjusted new start row key for the scan and whether to + // include it. + // If they are set as the scan attributes, close the scanner, reopen it with + // updated start row key and whether to include it. 
Update mvcc read point from the + // previous scanner and set it back to the new scanner to maintain the read + // consistency for the given region. + // Once done, continue the scan operation and reset the attributes. + if (adjustedStartRowKey != null && adjustedStartRowKeyIncludeBytes != null) { + long mvccReadPoint = delegate.getMvccReadPoint(); + delegate.close(); + scan.withStartRow(adjustedStartRowKey, + Bytes.toBoolean(adjustedStartRowKeyIncludeBytes)); + PackagePrivateFieldAccessor.setMvccReadPoint(scan, mvccReadPoint); + if (multiKeyPointLookup != null + && !multiKeyPointLookup.verifyStartRowKey(adjustedStartRowKey)) { return false; - } else { - // We got a row from the HBase scanner within the configured time (i.e., the page size). We need to - // start a new page on the next next() call. - return true; } - } catch (Exception e) { - if (pagingFilter != null) { - pagingFilter.init(); + delegate = region.getScanner(scan); + scan.setAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY, null); + scan.setAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY_INCLUDE, null); + + } else { + if (multiKeyPointLookup != null) { + RegionScanner regionScanner = multiKeyPointLookup.getNewScanner(); + if (regionScanner == null) { + return false; + } + delegate.close(); + delegate = regionScanner; + } + } + + if (multiKeyPointLookup != null) { + return multiKeyPointLookup.next(results, raw, delegate); + } + boolean hasMore = raw ? delegate.nextRaw(results) : delegate.next(results); + if (pagingFilter == null) { + return hasMore; + } + if (!hasMore) { + // There is no more row from the HBase region scanner. 
We need to check if + // PagingFilter has stopped the region scanner + if (pagingFilter.isStopped()) { + if (results.isEmpty()) { + byte[] rowKey = pagingFilter.getCurrentRowKeyToBeExcluded(); + LOGGER.info("Page filter stopped, generating dummy key {} ", + Bytes.toStringBinary(rowKey)); + ScanUtil.getDummyResult(rowKey, results); + } + return true; } - throw e; + return false; + } else { + // We got a row from the HBase scanner within the configured time (i.e., + // the page size). We need to start a new page on the next next() call. + return true; } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java index a9dd460599..ec61489af7 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java @@ -198,7 +198,7 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner { dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax()); scanRanges.initializeScan(dataScan); SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter(); - dataScan.setFilter(new SkipScanFilter(skipScanFilter, false)); + dataScan.setFilter(new SkipScanFilter(skipScanFilter, false, true)); dataScan.setAttribute(SERVER_PAGE_SIZE_MS, Bytes.toBytes(Long.valueOf(pageSizeMs))); return dataScan; diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java index eeab9f4264..60b8b5a936 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java 
@@ -162,7 +162,7 @@ public class CachedLocalTable implements LocalHBaseState { */ long timestamp = getMaxTimestamp(dataTableMutationsWithSameRowKeyAndTimestamp); scan.setTimeRange(0, timestamp); - scan.setFilter(new SkipScanFilter(skipScanFilter, true)); + scan.setFilter(new SkipScanFilter(skipScanFilter, true, true)); } else { assert scan.isRaw(); scan.readVersions(1); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java index d154ecdfd5..04416fa2fe 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java @@ -18,6 +18,7 @@ package org.apache.phoenix.end2end; import static java.util.Collections.singletonList; +import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN; import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; @@ -172,10 +173,13 @@ public class InListIT extends ParallelStatsDisabledIT { private static String prefix = generateUniqueName(); boolean checkMaxSkipScanCardinality = true; + boolean bloomFilter = true; - public InListIT(boolean param) throws Exception { + public InListIT(boolean param1, boolean param2) throws Exception { // Setup max skip scan size appropriate for the tests. - checkMaxSkipScanCardinality = param; + checkMaxSkipScanCardinality = param1; + // Run tests with and without bloom filter + bloomFilter = param2; Map<String, String> DEFAULT_PROPERTIES = new HashMap<String, String>() {{ put(QueryServices.MAX_IN_LIST_SKIP_SCAN_SIZE, checkMaxSkipScanCardinality ?
String.valueOf(15) : String.valueOf(-1)); }}; @@ -184,10 +188,13 @@ public class InListIT extends ParallelStatsDisabledIT { } - @Parameterized.Parameters(name="checkMaxSkipScanCardinality = {0}") - public static synchronized Collection<Boolean[]> data() { - return Arrays.asList(new Boolean[][] { - { false },{ true } + @Parameterized.Parameters(name="checkMaxSkipScanCardinality = {0}, bloomFilter = {1}") + public static synchronized Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {true, true}, + {true, false}, + {false, true}, + {false, false} }); } @@ -272,7 +279,8 @@ public class InListIT extends ParallelStatsDisabledIT { * @param isMultiTenant whether or not the table needs a tenant_id column * @return the final DDL statement */ - private static String createTableDDL(String tableName, PDataType pkType, int saltBuckets, boolean isMultiTenant) { + private static String createTableDDL(String tableName, PDataType pkType, int saltBuckets, + boolean isMultiTenant, boolean isBloomFilterEnabled) { StringBuilder ddlBuilder = new StringBuilder(); ddlBuilder.append("CREATE TABLE ").append(tableName).append(" ( "); @@ -302,7 +310,12 @@ public class InListIT extends ParallelStatsDisabledIT { if(isMultiTenant) { ddlBuilder.append("MULTI_TENANT=true"); } - + if (isBloomFilterEnabled) { + if (saltBuckets != 0 || isMultiTenant) { + ddlBuilder.append(", "); + } + ddlBuilder.append("BLOOMFILTER='ROW'"); + } return ddlBuilder.toString(); } @@ -316,9 +329,12 @@ public class InListIT extends ParallelStatsDisabledIT { * @param saltBuckets the number of salt buckets if the table is salted, otherwise 0 * @return the table or view name that should be used to access the created table */ - private static String initializeAndGetTable(Connection baseConn, Connection conn, boolean isMultiTenant, PDataType pkType, int saltBuckets) throws SQLException { + private static String initializeAndGetTable(Connection baseConn, Connection conn, + boolean isMultiTenant, PDataType 
pkType, int saltBuckets, boolean isBloomFilterEnabled) + throws SQLException { String tableName = getTableName(isMultiTenant, pkType, saltBuckets); - String tableDDL = createTableDDL(tableName, pkType, saltBuckets, isMultiTenant); + String tableDDL = createTableDDL(tableName, pkType, saltBuckets, + isMultiTenant, isBloomFilterEnabled); baseConn.createStatement().execute(tableDDL); // if requested, create a tenant specific view and return the view name instead @@ -440,7 +456,7 @@ public class InListIT extends ParallelStatsDisabledIT { // use a different table with a unique name for each variation String tableName = initializeAndGetTable(baseConn, conn, isMultiTenant, pkType, - saltBuckets); + saltBuckets, bloomFilter); // upsert the given data for (String upsertBody : DEFAULT_UPSERT_BODIES) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java index 59dea4e367..43a8ab1a60 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java @@ -173,7 +173,8 @@ public class TableTTLIT extends BaseTest { int flushCounter = RAND.nextInt(maxFlushCounter) + 1; int maskingCounter = RAND.nextInt(maxMaskingCounter) + 1; int verificationCounter = RAND.nextInt(maxVerificationCounter) + 1; - for (int i = 0; i < 500; i++) { + int maxIterationCount = multiCF ? 
250 : 500; + for (int i = 0; i < maxIterationCount; i++) { if (flushCounter-- == 0) { injectEdge.incrementValue(1000); LOG.info("Flush " + i + " current time: " + injectEdge.currentTime()); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java index 6e4d17121c..7fa819fad6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java @@ -139,7 +139,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { ImmutableList.of(Arrays.asList( pointRange("i1"), pointRange("i2"))), - SchemaUtil.VAR_BINARY_SCHEMA), + SchemaUtil.VAR_BINARY_SCHEMA, false), singleKVFilter( or(constantComparison(CompareOperator.EQUAL,id,"i1"), and(constantComparison(CompareOperator.EQUAL,id,"i2"), @@ -682,7 +682,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { pointRange(tenantId1), pointRange(tenantId2), pointRange(tenantId3))), - plan.getTableRef().getTable().getRowKeySchema()), + plan.getTableRef().getTable().getRowKeySchema(), false), filter); } @@ -705,7 +705,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { pointRange(tenantId1), pointRange(tenantId2), pointRange(tenantId3))), - plan.getTableRef().getTable().getRowKeySchema()), + plan.getTableRef().getTable().getRowKeySchema(), false), filter); byte[] startRow = PVarchar.INSTANCE.toBytes(tenantId1); @@ -738,7 +738,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { Arrays.asList( pointRange(tenantId,entityId1), pointRange(tenantId,entityId2))), - SchemaUtil.VAR_BINARY_SCHEMA), + SchemaUtil.VAR_BINARY_SCHEMA, false), filter); } @@ -768,7 +768,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { true, Bytes.toBytes(entityId2), true, SortOrder.ASC))), - plan.getTableRef().getTable().getRowKeySchema()), + 
plan.getTableRef().getTable().getRowKeySchema(), false), filter); } @@ -792,7 +792,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { pointRange(tenantId1, entityId), pointRange(tenantId2, entityId), pointRange(tenantId3, entityId))), - SchemaUtil.VAR_BINARY_SCHEMA), + SchemaUtil.VAR_BINARY_SCHEMA, false), filter); } @Test @@ -846,7 +846,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { pointRange(tenantId1, entityId2), pointRange(tenantId2, entityId1), pointRange(tenantId2, entityId2))), - SchemaUtil.VAR_BINARY_SCHEMA), + SchemaUtil.VAR_BINARY_SCHEMA, false), filter); } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterIntersectTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterIntersectTest.java index d3597b78ed..1e5314d264 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterIntersectTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterIntersectTest.java @@ -55,7 +55,7 @@ public class SkipScanFilterIntersectTest { public SkipScanFilterIntersectTest(List<List<KeyRange>> slots, RowKeySchema schema, byte[] lowerInclusiveKey, byte[] upperExclusiveKey, List<List<KeyRange>> expectedNewSlots) { - this.filter = new SkipScanFilter(slots, schema); + this.filter = new SkipScanFilter(slots, schema, false); this.lowerInclusiveKey = lowerInclusiveKey; this.upperExclusiveKey = upperExclusiveKey; this.expectedNewSlots = expectedNewSlots; diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java index ceeaa6bcdd..1e02c9de9d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java @@ -95,9 +95,9 @@ public class SkipScanFilterTest extends TestCase { }, width <= 0, SortOrder.getDefault()); } 
if(slotSpans==null) { - skipper = new SkipScanFilter(cnf, builder.build()); + skipper = new SkipScanFilter(cnf, builder.build(), false); } else { - skipper = new SkipScanFilter(cnf, slotSpans,builder.build()); + skipper = new SkipScanFilter(cnf, slotSpans,builder.build(), false); } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java index 8433906f37..b6e367a77b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java @@ -291,7 +291,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { } private static Collection<?> foreach(ScanRanges scanRanges, int[] widths, KeyRange[] expectedSplits) { - SkipScanFilter filter = new SkipScanFilter(scanRanges.getRanges(), buildSchema(widths)); + SkipScanFilter filter = new SkipScanFilter(scanRanges.getRanges(), buildSchema(widths), false); Scan scan = new Scan().setFilter(filter).withStartRow(KeyRange.UNBOUND).withStopRow(KeyRange.UNBOUND, true); List<Object> ret = Lists.newArrayList(); ret.add(new Object[] {scan, scanRanges, Arrays.<KeyRange>asList(expectedSplits)}); @@ -301,7 +301,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { private static Collection<?> foreach(KeyRange[][] ranges, int[] widths, KeyRange[] expectedSplits) { RowKeySchema schema = buildSchema(widths); List<List<KeyRange>> slots = Lists.transform(Lists.newArrayList(ranges), ARRAY_TO_LIST); - SkipScanFilter filter = new SkipScanFilter(slots, schema); + SkipScanFilter filter = new SkipScanFilter(slots, schema, false); // Always set start and stop key to max to verify we are using the information in skipscan // filter over the scan's KMIN and KMAX. Scan scan = new Scan().setFilter(filter);