Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.0 421ad309b -> fd757a055
PHOENIX-2657 Transactionally deleted cells become visible after few hours Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9d3e8efc Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9d3e8efc Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9d3e8efc Branch: refs/heads/4.x-HBase-1.0 Commit: 9d3e8efc5ee54bfdc114843308300d372a74ae00 Parents: 421ad30 Author: James Taylor <jtay...@salesforce.com> Authored: Fri Feb 12 09:38:43 2016 -0800 Committer: James Taylor <jtay...@salesforce.com> Committed: Fri Feb 12 11:05:26 2016 -0800 ---------------------------------------------------------------------- .../apache/phoenix/filter/SkipScanFilter.java | 47 +++++++++++++------- .../phoenix/index/PhoenixIndexBuilder.java | 4 +- .../index/PhoenixTransactionalIndexer.java | 5 ++- 3 files changed, 38 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/9d3e8efc/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java index 00320ce..c966d91 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java @@ -43,8 +43,6 @@ import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ScanUtil.BytesComparator; import org.apache.phoenix.util.SchemaUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.base.Objects; import com.google.common.collect.Lists; @@ -63,8 +61,6 @@ import com.google.common.hash.Hashing; * @since 0.1 */ public class SkipScanFilter extends FilterBase implements Writable { - private static final Logger logger = LoggerFactory.getLogger(SkipScanFilter.class); - private enum Terminate {AT, AFTER}; // Conjunctive normal form of or-ed ranges or point lookups private List<List<KeyRange>> slots; @@ -72,6 +68,7 @@ public class SkipScanFilter extends FilterBase implements Writable { private int[] slotSpan; // schema of the row key private RowKeySchema schema; + private boolean includeMultipleVersions; // current position for each slot private int[] position; // buffer used for skip hint @@ -94,19 +91,27 @@ public class SkipScanFilter extends FilterBase implements Writable { public SkipScanFilter() { } + public SkipScanFilter(SkipScanFilter filter, boolean includeMultipleVersions) { + this(filter.slots, filter.slotSpan, filter.schema, includeMultipleVersions); + } + public SkipScanFilter(List<List<KeyRange>> slots, RowKeySchema schema) { this(slots, ScanUtil.getDefaultSlotSpans(slots.size()), schema); } public SkipScanFilter(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema schema) { - init(slots, slotSpan, schema); + this(slots, slotSpan, schema, false); + } + + private SkipScanFilter(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema schema, boolean includeMultipleVersions) { + init(slots, slotSpan, schema, includeMultipleVersions); } public void setOffset(int offset) { this.offset = offset; } - private void init(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema schema) { + private void init(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema schema, boolean includeMultipleVersions) { for (List<KeyRange> ranges : slots) { if (ranges.isEmpty()) { throw new IllegalStateException(); @@ -117,9 +122,10 @@ public class SkipScanFilter extends FilterBase implements Writable { this.schema = schema; this.maxKeyLength = SchemaUtil.getMaxKeyLength(schema, slots); this.position = new int[slots.size()]; - startKey = new byte[maxKeyLength]; - endKey = new byte[maxKeyLength]; - endKeyLength = 0; + this.startKey = new byte[maxKeyLength]; + this.endKey = new byte[maxKeyLength]; + this.endKeyLength = 0; + this.includeMultipleVersions = includeMultipleVersions; } // Exposed for testing. @@ -345,15 +351,20 @@ public class SkipScanFilter extends FilterBase implements Writable { return i; } + private ReturnCode getIncludeReturnCode() { + return includeMultipleVersions ? ReturnCode.INCLUDE : ReturnCode.INCLUDE_AND_NEXT_COL; + } + @edu.umd.cs.findbugs.annotations.SuppressWarnings( value="QBA_QUESTIONABLE_BOOLEAN_ASSIGNMENT", justification="Assignment designed to work this way.") private ReturnCode navigate(final byte[] currentKey, final int offset, final int length, Terminate terminate) { int nSlots = slots.size(); + // First check to see if we're in-range until we reach our end key if (endKeyLength > 0) { if (Bytes.compareTo(currentKey, offset, length, endKey, 0, endKeyLength) < 0) { - return ReturnCode.INCLUDE_AND_NEXT_COL; + return getIncludeReturnCode(); } // If key range of last slot is a single key, we can increment our position @@ -485,7 +496,7 @@ public class SkipScanFilter extends FilterBase implements Writable { // up to the upper range of our last slot. We do this for ranges and single keys // since we potentially have multiple key values for the same row key. setEndKey(ptr, minOffset, i); - return ReturnCode.INCLUDE_AND_NEXT_COL; + return getIncludeReturnCode(); } private boolean allTrailingNulls(int i) { @@ -559,9 +570,14 @@ public class SkipScanFilter extends FilterBase implements Writable { RowKeySchema schema = new RowKeySchema(); schema.readFields(in); int andLen = in.readInt(); + boolean includeMultipleVersions = false; + if (andLen < 0) { + andLen = -andLen; + includeMultipleVersions = true; + } int[] slotSpan = new int[andLen]; List<List<KeyRange>> slots = Lists.newArrayListWithExpectedSize(andLen); - for (int i=0; i<andLen; i++) { + for (int i = 0; i < andLen; i++) { int orLenWithSlotSpan = in.readInt(); int orLen = orLenWithSlotSpan; /* @@ -582,15 +598,16 @@ public class SkipScanFilter extends FilterBase implements Writable { orClause.add(range); } } - this.init(slots, slotSpan, schema); + this.init(slots, slotSpan, schema, includeMultipleVersions); } @Override public void write(DataOutput out) throws IOException { assert(slots.size() == slotSpan.length); schema.write(out); - out.writeInt(slots.size()); - for (int i = 0; i < slots.size(); i++) { + int nSlots = slots.size(); + out.writeInt(this.includeMultipleVersions ? -nSlots : nSlots); + for (int i = 0; i < nSlots; i++) { List<KeyRange> orLen = slots.get(i); int span = slotSpan[i]; int orLenWithSlotSpan = -( ( (span << KEY_RANGE_LENGTH_BITS) | orLen.size() ) + 1); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9d3e8efc/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index fed8dcf..aa3ea51 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; @@ -90,10 +91,9 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder { } if (maintainers.isEmpty()) return; Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values())); - scan.setRaw(true); ScanRanges scanRanges = ScanRanges.createPointLookup(keys); scanRanges.initializeScan(scan); - scan.setFilter(scanRanges.getSkipScanFilter()); + scan.setFilter(new SkipScanFilter(scanRanges.getSkipScanFilter(),true)); HRegion region = env.getRegion(); RegionScanner scanner = region.getScanner(scan); // Run through the scanner using internal nextRaw method http://git-wip-us.apache.org/repos/asf/phoenix/blob/9d3e8efc/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 2925b09..0e91dd6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -53,6 +53,7 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.MultiMutation; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.IndexUpdate; @@ -229,7 +230,6 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1); scanRanges.initializeScan(scan); - scan.setFilter(scanRanges.getSkipScanFilter()); TableName tableName = env.getRegion().getRegionInfo().getTable(); HTableInterface htable = env.getTable(tableName); txTable = new TransactionAwareHTable(htable); @@ -237,9 +237,12 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { // For rollback, we need to see all versions, including // the last committed version as there may be multiple // checkpointed versions. + SkipScanFilter filter = scanRanges.getSkipScanFilter(); if (isRollback) { + filter = new SkipScanFilter(filter,true); tx.setVisibility(VisibilityLevel.SNAPSHOT_ALL); } + scan.setFilter(filter); currentScanner = txTable.getScanner(scan); } if (isRollback) {