Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.0 421ad309b -> fd757a055


PHOENIX-2657 Transactionally deleted cells become visible after few hours


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9d3e8efc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9d3e8efc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9d3e8efc

Branch: refs/heads/4.x-HBase-1.0
Commit: 9d3e8efc5ee54bfdc114843308300d372a74ae00
Parents: 421ad30
Author: James Taylor <jtay...@salesforce.com>
Authored: Fri Feb 12 09:38:43 2016 -0800
Committer: James Taylor <jtay...@salesforce.com>
Committed: Fri Feb 12 11:05:26 2016 -0800

----------------------------------------------------------------------
 .../apache/phoenix/filter/SkipScanFilter.java   | 47 +++++++++++++-------
 .../phoenix/index/PhoenixIndexBuilder.java      |  4 +-
 .../index/PhoenixTransactionalIndexer.java      |  5 ++-
 3 files changed, 38 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/9d3e8efc/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
index 00320ce..c966d91 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
@@ -43,8 +43,6 @@ import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ScanUtil.BytesComparator;
 import org.apache.phoenix.util.SchemaUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
@@ -63,8 +61,6 @@ import com.google.common.hash.Hashing;
  * @since 0.1
  */
 public class SkipScanFilter extends FilterBase implements Writable {
-    private static final Logger logger = 
LoggerFactory.getLogger(SkipScanFilter.class);
-
     private enum Terminate {AT, AFTER};
     // Conjunctive normal form of or-ed ranges or point lookups
     private List<List<KeyRange>> slots;
@@ -72,6 +68,7 @@ public class SkipScanFilter extends FilterBase implements 
Writable {
     private int[] slotSpan;
     // schema of the row key
     private RowKeySchema schema;
+    private boolean includeMultipleVersions;
     // current position for each slot
     private int[] position;
     // buffer used for skip hint
@@ -94,19 +91,27 @@ public class SkipScanFilter extends FilterBase implements 
Writable {
     public SkipScanFilter() {
     }
 
+    public SkipScanFilter(SkipScanFilter filter, boolean 
includeMultipleVersions) {
+        this(filter.slots, filter.slotSpan, filter.schema, 
includeMultipleVersions);
+    }
+
     public SkipScanFilter(List<List<KeyRange>> slots, RowKeySchema schema) {
         this(slots, ScanUtil.getDefaultSlotSpans(slots.size()), schema);
     }
 
     public SkipScanFilter(List<List<KeyRange>> slots, int[] slotSpan, 
RowKeySchema schema) {
-        init(slots, slotSpan, schema);
+        this(slots, slotSpan, schema, false);
+    }
+    
+    private SkipScanFilter(List<List<KeyRange>> slots, int[] slotSpan, 
RowKeySchema schema, boolean includeMultipleVersions) {
+        init(slots, slotSpan, schema, includeMultipleVersions);
     }
     
     public void setOffset(int offset) {
         this.offset = offset;
     }
 
-    private void init(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema 
schema) {
+    private void init(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema 
schema, boolean includeMultipleVersions) {
         for (List<KeyRange> ranges : slots) {
             if (ranges.isEmpty()) {
                 throw new IllegalStateException();
@@ -117,9 +122,10 @@ public class SkipScanFilter extends FilterBase implements 
Writable {
         this.schema = schema;
         this.maxKeyLength = SchemaUtil.getMaxKeyLength(schema, slots);
         this.position = new int[slots.size()];
-        startKey = new byte[maxKeyLength];
-        endKey = new byte[maxKeyLength];
-        endKeyLength = 0;
+        this.startKey = new byte[maxKeyLength];
+        this.endKey = new byte[maxKeyLength];
+        this.endKeyLength = 0;
+        this.includeMultipleVersions = includeMultipleVersions;
     }
 
     // Exposed for testing.
@@ -345,15 +351,20 @@ public class SkipScanFilter extends FilterBase implements 
Writable {
         return i;
     }
     
+    private ReturnCode getIncludeReturnCode() {
+        return includeMultipleVersions ? ReturnCode.INCLUDE : 
ReturnCode.INCLUDE_AND_NEXT_COL;
+    }
+    
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(
             value="QBA_QUESTIONABLE_BOOLEAN_ASSIGNMENT", 
             justification="Assignment designed to work this way.")
     private ReturnCode navigate(final byte[] currentKey, final int offset, 
final int length, Terminate terminate) {
         int nSlots = slots.size();
+
         // First check to see if we're in-range until we reach our end key
         if (endKeyLength > 0) {
             if (Bytes.compareTo(currentKey, offset, length, endKey, 0, 
endKeyLength) < 0) {
-                return ReturnCode.INCLUDE_AND_NEXT_COL;
+                return getIncludeReturnCode();
             }
 
             // If key range of last slot is a single key, we can increment our 
position
@@ -485,7 +496,7 @@ public class SkipScanFilter extends FilterBase implements 
Writable {
         // up to the upper range of our last slot. We do this for ranges and 
single keys
         // since we potentially have multiple key values for the same row key.
         setEndKey(ptr, minOffset, i);
-        return ReturnCode.INCLUDE_AND_NEXT_COL;
+        return getIncludeReturnCode();
     }
 
     private boolean allTrailingNulls(int i) {
@@ -559,9 +570,14 @@ public class SkipScanFilter extends FilterBase implements 
Writable {
         RowKeySchema schema = new RowKeySchema();
         schema.readFields(in);
         int andLen = in.readInt();
+        boolean includeMultipleVersions = false;
+        if (andLen < 0) {
+            andLen = -andLen;
+            includeMultipleVersions = true;
+        }
         int[] slotSpan = new int[andLen];
         List<List<KeyRange>> slots = 
Lists.newArrayListWithExpectedSize(andLen);
-        for (int i=0; i<andLen; i++) {
+        for (int i = 0; i < andLen; i++) {
             int orLenWithSlotSpan = in.readInt();
             int orLen = orLenWithSlotSpan;
             /*
@@ -582,15 +598,16 @@ public class SkipScanFilter extends FilterBase implements 
Writable {
                 orClause.add(range);
             }
         }
-        this.init(slots, slotSpan, schema);
+        this.init(slots, slotSpan, schema, includeMultipleVersions);
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
         assert(slots.size() == slotSpan.length);
         schema.write(out);
-        out.writeInt(slots.size());
-        for (int i = 0; i < slots.size(); i++) {
+        int nSlots = slots.size();
+        out.writeInt(this.includeMultipleVersions ? -nSlots : nSlots);
+        for (int i = 0; i < nSlots; i++) {
             List<KeyRange> orLen = slots.get(i);
             int span = slotSpan[i];
             int orLenWithSlotSpan = -( ( (span << KEY_RANGE_LENGTH_BITS) | 
orLen.size() ) + 1);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9d3e8efc/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index fed8dcf..aa3ea51 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
@@ -90,10 +91,9 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
         }
         if (maintainers.isEmpty()) return;
         Scan scan = IndexManagementUtil.newLocalStateScan(new 
ArrayList<IndexMaintainer>(maintainers.values()));
-        scan.setRaw(true);
         ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
         scanRanges.initializeScan(scan);
-        scan.setFilter(scanRanges.getSkipScanFilter());
+        scan.setFilter(new 
SkipScanFilter(scanRanges.getSkipScanFilter(),true));
         HRegion region = env.getRegion();
         RegionScanner scanner = region.getScanner(scan);
         // Run through the scanner using internal nextRaw method

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9d3e8efc/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 2925b09..0e91dd6 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -53,6 +53,7 @@ import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.MultiMutation;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
@@ -229,7 +230,6 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
                 
scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), 
QueryConstants.EMPTY_COLUMN_BYTES);
                 ScanRanges scanRanges = 
ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, 
Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, 
KeyRange.EVERYTHING_RANGE, null, true, -1);
                 scanRanges.initializeScan(scan);
-                scan.setFilter(scanRanges.getSkipScanFilter());
                 TableName tableName = 
env.getRegion().getRegionInfo().getTable();
                 HTableInterface htable = env.getTable(tableName);
                 txTable = new TransactionAwareHTable(htable);
@@ -237,9 +237,12 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
                 // For rollback, we need to see all versions, including
                 // the last committed version as there may be multiple
                 // checkpointed versions.
+                SkipScanFilter filter = scanRanges.getSkipScanFilter();
                 if (isRollback) {
+                    filter = new SkipScanFilter(filter,true);
                     tx.setVisibility(VisibilityLevel.SNAPSHOT_ALL);
                 }
+                scan.setFilter(filter);
                 currentScanner = txTable.getScanner(scan);
             }
             if (isRollback) {

Reply via email to