Repository: phoenix
Updated Branches:
  refs/heads/txn 16dd8ca15 -> d905a6662


Fix index maintenance for transactional tables


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d905a666
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d905a666
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d905a666

Branch: refs/heads/txn
Commit: d905a6662801a104b33228f184de4dfa9158bad0
Parents: 16dd8ca
Author: James Taylor <[email protected]>
Authored: Wed May 13 11:07:35 2015 -0700
Committer: James Taylor <[email protected]>
Committed: Wed May 13 11:07:35 2015 -0700

----------------------------------------------------------------------
 .../end2end/index/BaseMutableIndexIT.java       |  6 ++-
 .../apache/phoenix/index/IndexMaintainer.java   | 10 +++--
 .../index/PhoenixTransactionalIndexer.java      | 39 +++++++++++++++++++-
 3 files changed, 49 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d905a666/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
index b2f8630..c6aadca 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
@@ -1094,6 +1094,7 @@ public abstract class BaseMutableIndexIT extends BaseHBaseManagedTimeIT {
         }
     }
     
+
     @Test
     public void testUpsertingNullForIndexedColumns() throws Exception {
        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -1107,11 +1108,12 @@ public abstract class BaseMutableIndexIT extends BaseHBaseManagedTimeIT {
                //create a row with value null for indexed column v2
                stmt.executeUpdate("upsert into DEMO values('cc1', null, 'abc')");
                conn.commit();
-            
+               
                //assert values in index table 
                ResultSet rs = stmt.executeQuery("select * from DEMO_IDX");
                assertTrue(rs.next());
                assertEquals(0, Doubles.compare(0, rs.getDouble(1)));
+               assertTrue(rs.wasNull());
                assertEquals("cc1", rs.getString(2));
                assertEquals("abc", rs.getString(3));
                assertFalse(rs.next());
@@ -1121,6 +1123,7 @@ public abstract class BaseMutableIndexIT extends BaseHBaseManagedTimeIT {
                assertTrue(rs.next());
                assertEquals("cc1", rs.getString(1));
                assertEquals(0, Doubles.compare(0, rs.getDouble(2)));
+               assertTrue(rs.wasNull());
                assertEquals("abc", rs.getString(3));
                assertFalse(rs.next());
                
@@ -1152,6 +1155,7 @@ public abstract class BaseMutableIndexIT extends BaseHBaseManagedTimeIT {
                rs = stmt.executeQuery("select * from DEMO_IDX");
                assertTrue(rs.next());
                assertEquals(0, Doubles.compare(0, rs.getDouble(1)));
+               assertTrue(rs.wasNull());
                assertEquals("cc1", rs.getString(2));
                assertEquals("abc", rs.getString(3));
                assertFalse(rs.next());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d905a666/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 3e1cb9c..1be0aa3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -845,15 +845,19 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         // Delete the entire row if any of the indexed columns changed
         DeleteType deleteType = null;
         if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
-            Delete delete;
+            byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary();
+            Delete delete = new Delete(indexRowKey);
             // If table delete was single version, then index delete should be as well
             if (deleteType == DeleteType.SINGLE_VERSION) {
-                delete = new Delete(indexRowKey);
                 for (ColumnReference ref : getAllColumns()) { // FIXME: Keep Set<byte[]> for index CFs?
                     delete.deleteFamilyVersion(ref.getFamily(), ts);
                 }
+                delete.deleteFamilyVersion(emptyCF, ts);
             } else {
-                delete = new Delete(indexRowKey, ts);
+                for (ColumnReference ref : getAllColumns()) { // FIXME: Keep Set<byte[]> for index CFs?
+                    delete.deleteFamily(ref.getFamily(), ts);
+                }
+                delete.deleteFamily(emptyCF, ts);
             }
             delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
             return delete;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d905a666/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 38d6fd1..6f1e28c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -37,7 +37,9 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.hbase.index.MultiMutation;
@@ -50,6 +52,8 @@ import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
@@ -132,6 +136,29 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         }
     }
 
+    private static final String TX_NO_READ_OWN_WRITES = "TX_NO_READ_OWN_WRITES";
+    @Override
+    public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s) {
+        /*
+         * TODO: remove once Tephra gives us a way to not read our own writes.
+         *  Hack to force scan not to read their own writes. Since the mutations have already been
+         *  applied by the time the preBatchMutate hook is called, we need to adjust the max time
+         *  range down by one to prevent us from seeing the current state. Instead, we need to
+         *  see the state right before our Puts have been applied.
+         */
+        byte[] encoded = scan.getAttribute(TX_NO_READ_OWN_WRITES);
+        if (encoded != null) {
+            TimeRange range = scan.getTimeRange();
+            long maxTime = range.getMax();
+            try {
+                scan.setTimeRange(range.getMin(), maxTime == Long.MAX_VALUE ? maxTime : maxTime-1);
+            } catch (IOException e1) {
+                throw new RuntimeException(e1);
+            }
+        }
+        return s;
+    }
+
     private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
         // Collect the set of mutable ColumnReferences so that we can first
         // run a scan to get the current state. We'll need this to delete
@@ -174,6 +201,13 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                     
keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
                 }
                 Scan scan = new Scan();
+                scan.setAttribute(TX_NO_READ_OWN_WRITES, PDataType.TRUE_BYTES); // TODO: remove when Tephra allows this
+                // Project all mutable columns
+                for (ColumnReference ref : mutableColumns) {
+                    scan.addColumn(ref.getFamily(), ref.getQualifier());
+                }
+                // Project empty key value column
+                scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
                 ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
                 scanRanges.initializeScan(scan);
                 scan.setFilter(scanRanges.getSkipScanFilter());
@@ -190,8 +224,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                     TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m, result);
                     Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
                     for (IndexUpdate delete : deletes) {
-                       if (delete.isValid()) 
-                               indexUpdates.add(new Pair<Mutation, 
byte[]>(delete.getUpdate(),delete.getTableName()));
+                        if (delete.isValid()) {
+                            indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
+                        }
                     }
                     state.applyMutation();
                     Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);

Reply via email to