PHOENIX-4095 Prevent index from getting out of sync with data table during partial rebuild


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/572d242d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/572d242d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/572d242d

Branch: refs/heads/4.x-HBase-1.2
Commit: 572d242dca11c93fcc9d2dca0bf5c836a05ce36b
Parents: 9219bc2
Author: James Taylor <jamestay...@apache.org>
Authored: Thu Aug 17 15:47:11 2017 -0700
Committer: James Taylor <jamestay...@apache.org>
Committed: Thu Aug 17 17:02:22 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/ConcurrentMutationsIT.java  |   3 +-
 .../phoenix/end2end/OutOfOrderMutationsIT.java  |  19 +-
 .../end2end/index/PartialIndexRebuilderIT.java  | 197 +++++++++--
 .../apache/phoenix/util/IndexScrutinyIT.java    |  89 +++++
 .../org/apache/phoenix/util/TestUtilIT.java     |  89 -----
 .../UngroupedAggregateRegionObserver.java       |   5 +
 .../org/apache/phoenix/hbase/index/Indexer.java | 171 ++++------
 .../hbase/index/covered/NonTxIndexBuilder.java  |  93 +----
 .../covered/example/CoveredColumnIndexer.java   |   5 +-
 .../hbase/index/util/IndexManagementUtil.java   |  94 +++++
 .../hbase/index/covered/CoveredColumnsTest.java |  46 +++
 .../index/covered/LocalTableStateTest.java      | 293 ++++++++++++++++
 .../index/covered/NonTxIndexBuilderTest.java    | 341 +++++++++++++++++++
 .../hbase/index/covered/TestCoveredColumns.java |  46 ---
 .../index/covered/TestLocalTableState.java      | 293 ----------------
 .../index/covered/TestNonTxIndexBuilder.java    | 335 ------------------
 .../org/apache/phoenix/util/IndexScrutiny.java  | 144 ++++++++
 .../java/org/apache/phoenix/util/TestUtil.java  | 109 +-----
 18 files changed, 1289 insertions(+), 1083 deletions(-)
----------------------------------------------------------------------
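
A note on the test-utility churn in the summary above: TestUtil.scrutinizeIndex is promoted to a new IndexScrutiny utility class, and TestUtilIT is renamed to IndexScrutinyIT to match. A minimal usage sketch, assuming a test base class that supplies getUrl() and using placeholder table/index names:

    // Hypothetical names "T" and "T_IDX". scrutinizeIndex compares every
    // data table row against its index rows, throws an AssertionError
    // describing the first mismatch, and otherwise returns the data table
    // row count.
    try (Connection conn = DriverManager.getConnection(getUrl())) {
        long rowCount = IndexScrutiny.scrutinizeIndex(conn, "T", "T_IDX");
        assertEquals(expectedRowCount, rowCount);
    }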


http://git-wip-us.apache.org/repos/asf/phoenix/blob/572d242d/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
index d3e3761..e674d8f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.IndexScrutiny;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -274,7 +275,7 @@ public class ConcurrentMutationsIT extends BaseUniqueNamesOwnClusterIT {
         }
         
         assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
-        long actualRowCount = TestUtil.scrutinizeIndex(conn, tableName, indexName);
+        long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
         assertEquals(nRows, actualRowCount);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/572d242d/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java
index 5cdc1ee..e8adf6b 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OutOfOrderMutationsIT.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.util.IndexScrutiny;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -91,7 +92,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
         conn = DriverManager.getConnection(getUrl(), props);
         
-        TestUtil.scrutinizeIndex(conn, tableName, indexName);        
+        IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);        
         assertNoTimeStampAt(conn, indexName, 1030);
         conn.close();
         
@@ -175,7 +176,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
         conn = DriverManager.getConnection(getUrl(), props);
         
-        TestUtil.scrutinizeIndex(conn, tableName, indexName);        
+        IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);        
         
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName);
         assertTrue(rs.next());
@@ -265,7 +266,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
 
-        TestUtil.scrutinizeIndex(conn, tableName, indexName);        
+        IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);        
         
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName);
         assertTrue(rs.next());
@@ -331,7 +332,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
 
-        TestUtil.scrutinizeIndex(conn, tableName, indexName);        
+        IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);        
         
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts FROM " + tableName);
         assertTrue(rs.next());
@@ -393,7 +394,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
 
-        TestUtil.scrutinizeIndex(conn, tableName, indexName);        
+        IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);        
         
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName);
         assertTrue(rs.next());
@@ -458,7 +459,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
 
-        TestUtil.scrutinizeIndex(conn, tableName, indexName);        
+        IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);        
         
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName);
         assertTrue(rs.next());
@@ -523,7 +524,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
 
-        TestUtil.scrutinizeIndex(conn, tableName, indexName);        
+        IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);        
         
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ ts,v FROM " + tableName);
         assertTrue(rs.next());
@@ -587,7 +588,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
 
-        long rowCount = TestUtil.scrutinizeIndex(conn, tableName, indexName);
+        long rowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
         assertEquals(0,rowCount);
         
         conn.close();
@@ -640,7 +641,7 @@ public class OutOfOrderMutationsIT extends ParallelStatsDisabledIT {
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
         TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
 
-        long rowCount = TestUtil.scrutinizeIndex(conn, tableName, indexName);
+        long rowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
         assertEquals(0,rowCount);
         
         conn.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/572d242d/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index 8c43e3f..bc0dda8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -18,44 +18,203 @@
 package org.apache.phoenix.end2end.index;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.SQLException;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PMetaData;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexScrutiny;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.Repeat;
+import org.apache.phoenix.util.RunUntilFailure;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import com.google.common.collect.Maps;
 
+@RunWith(RunUntilFailure.class)
 public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
+    private static final Random RAND = new Random(5);
+
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString());
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
         serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "30000"); // give up rebuilding after 30 seconds
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(1000));
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(2000));
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
     }
 
+    private static boolean mutateRandomly(Connection conn, String fullTableName, int nRows) throws Exception {
+        return mutateRandomly(conn, fullTableName, nRows, false);
+    }
+    
+    private static boolean hasInactiveIndex(PMetaData metaCache, PTableKey key) throws TableNotFoundException {
+        PTable table = metaCache.getTableRef(key).getTable();
+        for (PTable index : table.getIndexes()) {
+            if (index.getIndexState() == PIndexState.INACTIVE) {
+                return true;
+            }
+        }
+        return false;
+    }
+    
+    private static boolean isAllActiveIndex(PMetaData metaCache, PTableKey key) throws TableNotFoundException {
+        PTable table = metaCache.getTableRef(key).getTable();
+        for (PTable index : table.getIndexes()) {
+            if (index.getIndexState() != PIndexState.ACTIVE) {
+                return false;
+            }
+        }
+        return true;
+    }
+    
+    private static boolean mutateRandomly(Connection conn, String fullTableName, int nRows, boolean checkForInactive) throws SQLException, InterruptedException {
+        PTableKey key = new PTableKey(null,fullTableName);
+        PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+        boolean hasInactiveIndex = false;
+        int batchSize = checkForInactive && !isAllActiveIndex(metaCache, key) ? 1 : 200;
+        for (int i = 0; i < 10000; i++) {
+            int pk = Math.abs(RAND.nextInt()) % nRows;
+            int v1 = Math.abs(RAND.nextInt()) % nRows;
+            int v2 = Math.abs(RAND.nextInt()) % nRows;
+            if (checkForInactive && hasInactiveIndex(metaCache, key)) {
+                checkForInactive = false;
+                hasInactiveIndex = true;
+                batchSize = 200;
+            }
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES(" + pk + "," + v1 + "," + v2 + ")");
+            if (i % batchSize == 0) {
+                conn.commit();
+                if (checkForInactive) Thread.sleep(100);
+            }
+        }
+        conn.commit();
+        for (int i = 0; i < 10000; i++) {
+            int pk = Math.abs(RAND.nextInt()) % nRows;
+            if (checkForInactive && hasInactiveIndex(metaCache, key)) {
+                checkForInactive = false;
+                hasInactiveIndex = true;
+                batchSize = 200;
+            }
+            conn.createStatement().execute("DELETE FROM " + fullTableName + " 
WHERE k= " + pk);
+            if (i % batchSize == 0) {
+                conn.commit();
+            }
+        }
+        conn.commit();
+        for (int i = 0; i < 10000; i++) {
+            int pk = Math.abs(RAND.nextInt()) % nRows;
+            int v1 = Math.abs(RAND.nextInt()) % nRows;
+            int v2 = Math.abs(RAND.nextInt()) % nRows;
+            if (checkForInactive && hasInactiveIndex(metaCache, key)) {
+                checkForInactive = false;
+                hasInactiveIndex = true;
+                batchSize = 200;
+            }
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES(" + pk + "," + v1 + "," + v2 + ")");
+            if (i % batchSize == 0) {
+                conn.commit();
+            }
+        }
+        conn.commit();
+        return hasInactiveIndex;
+    }
+    
+    @Test
+    @Repeat(20)
+    public void testDeleteAndUpsertAfterFailure() throws Throwable {
+        final int nRows = 10;
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k INTEGER PRIMARY KEY, v1 INTEGER, v2 INTEGER) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+            mutateRandomly(conn, fullTableName, nRows);
+            long disableTS = EnvironmentEdgeManager.currentTimeMillis();
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
+            mutateRandomly(conn, fullTableName, nRows);
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            
+            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+            assertEquals(nRows,actualRowCount);
+       }
+    }
+    
     @Test
+    public void testWriteWhileRebuilding() throws Throwable {
+        final int nRows = 10;
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k INTEGER PRIMARY KEY, v1 INTEGER, v2 INTEGER) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+            mutateRandomly(conn, fullTableName, nRows);
+            long disableTS = EnvironmentEdgeManager.currentTimeMillis();
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
+            mutateRandomly(conn, fullTableName, nRows);
+            final boolean[] hasInactiveIndex = new boolean[1];
+            final CountDownLatch doneSignal = new CountDownLatch(1);
+            Runnable r = new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        Connection conn = DriverManager.getConnection(getUrl());
+                        hasInactiveIndex[0] = mutateRandomly(conn, fullTableName, nRows, true);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    } finally {
+                        doneSignal.countDown();
+                    }
+                }
+                
+            };
+            Thread t = new Thread(r);
+            t.setDaemon(true);
+            t.start();
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            doneSignal.await(120, TimeUnit.SECONDS);
+            assertTrue(hasInactiveIndex[0]);
+            
+            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+            assertEquals(nRows,actualRowCount);
+       }
+    }
+
+      @Test
     public void testMultiVersionsAfterFailure() throws Throwable {
         String schemaName = generateUniqueName();
         String tableName = generateUniqueName();
@@ -77,10 +236,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee')");
             conn.commit();
             TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
 
-            TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
     }
     
@@ -106,10 +263,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
             conn.commit();
             TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
 
-            TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
     }
     
@@ -135,10 +290,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)");
             conn.commit();
             TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
 
-            TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
     }
     
@@ -162,10 +315,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("DELETE FROM " + fullTableName);
             conn.commit();
             TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
 
-            TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
        }
     }
     
@@ -189,10 +340,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','b')");
             conn.commit();
             TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
 
-            TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
     }
     
@@ -220,10 +369,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
                 conn2.commit();
             }
             TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
 
-            TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
     }
     
@@ -251,10 +398,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
                 conn2.commit();
             }
             TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
 
-            TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
     }
     
@@ -282,10 +427,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
                 conn2.commit();
             }
             TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
-            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
 
-            TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/572d242d/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
new file mode 100644
index 0000000..a703294
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
@@ -0,0 +1,89 @@
+package org.apache.phoenix.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.junit.Test;
+
+public class IndexScrutinyIT extends ParallelStatsDisabledIT {
+    @Test
+    public void testRowCountIndexScrutiny() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, 
STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc')");
+            conn.commit();
+            
+            int count = conn.createStatement().executeUpdate("DELETE FROM " + fullIndexName + " WHERE \":K\"='a' AND \"0:V\"='ccc'");
+            assertEquals(1,count);
+            conn.commit();
+            try {
+                IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+                fail();
+            } catch (AssertionError e) {
+                assertEquals(e.getMessage(),"Expected data table row count to match expected:<2> but was:<1>");
+            }
+        }
+    }
+    @Test
+    public void testExtraRowIndexScrutiny() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, 
STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v) INCLUDE (v2)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb','0')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc','1')");
+            conn.commit();
+            
+            conn.createStatement().executeUpdate("UPSERT INTO " + 
fullIndexName + " VALUES ('bbb','x','0')");
+            conn.commit();
+            try {
+                IndexScrutiny.scrutinizeIndex(conn, fullTableName, 
fullIndexName);
+                fail();
+            } catch (AssertionError e) {
+                assertEquals(e.getMessage(),"Expected to find PK in data 
table: ('x')");
+            }
+        }
+    }
+    
+    @Test
+    public void testValueIndexScrutiny() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, 
STORE_NULLS=true");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v) INCLUDE (v2)");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb','0')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc','1')");
+            conn.commit();
+            
+            conn.createStatement().executeUpdate("UPSERT INTO " + 
fullIndexName + " VALUES ('ccc','a','2')");
+            conn.commit();
+            try {
+                IndexScrutiny.scrutinizeIndex(conn, fullTableName, 
fullIndexName);
+                fail();
+            } catch (AssertionError e) {
+                assertEquals(e.getMessage(),"Expected equality for V2, but 
'2'!='1'");
+            }
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/572d242d/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java
deleted file mode 100644
index d26d9f6..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/util/TestUtilIT.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package org.apache.phoenix.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-
-import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
-import org.junit.Test;
-
-public class TestUtilIT extends ParallelStatsDisabledIT {
-    @Test
-    public void testRowCountIndexScrutiny() throws Throwable {
-        String schemaName = generateUniqueName();
-        String tableName = generateUniqueName();
-        String indexName = generateUniqueName();
-        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
-        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, 
STORE_NULLS=true");
-            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v)");
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb')");
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc')");
-            conn.commit();
-            
-            int count = conn.createStatement().executeUpdate("DELETE FROM " + fullIndexName + " WHERE \":K\"='a' AND \"0:V\"='ccc'");
-            assertEquals(1,count);
-            conn.commit();
-            try {
-                TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
-                fail();
-            } catch (AssertionError e) {
-                assertEquals(e.getMessage(),"Expected data table row count to match expected:<2> but was:<1>");
-            }
-        }
-    }
-    @Test
-    public void testExtraRowIndexScrutiny() throws Throwable {
-        String schemaName = generateUniqueName();
-        String tableName = generateUniqueName();
-        String indexName = generateUniqueName();
-        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
-        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, 
STORE_NULLS=true");
-            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v) INCLUDE (v2)");
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb','0')");
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc','1')");
-            conn.commit();
-            
-            conn.createStatement().executeUpdate("UPSERT INTO " + 
fullIndexName + " VALUES ('bbb','x','0')");
-            conn.commit();
-            try {
-                TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
-                fail();
-            } catch (AssertionError e) {
-                assertEquals(e.getMessage(),"Expected to find PK in data 
table: ('x')");
-            }
-        }
-    }
-    
-    @Test
-    public void testValueIndexScrutiny() throws Throwable {
-        String schemaName = generateUniqueName();
-        String tableName = generateUniqueName();
-        String indexName = generateUniqueName();
-        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
-        String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, 
STORE_NULLS=true");
-            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v) INCLUDE (v2)");
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('b','bb','0')");
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','ccc','1')");
-            conn.commit();
-            
-            conn.createStatement().executeUpdate("UPSERT INTO " + 
fullIndexName + " VALUES ('ccc','a','2')");
-            conn.commit();
-            try {
-                TestUtil.scrutinizeIndex(conn, fullTableName, fullIndexName);
-                fail();
-            } catch (AssertionError e) {
-                assertEquals(e.getMessage(),"Expected equality for V2, but 
'2'!='1'");
-            }
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/572d242d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index a07b5d0..298f9e9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -897,6 +898,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
                                             PDataType.TRUE_BYTES);
                                     mutations.add(put);
+                                    // Since we're replaying existing mutations, it makes no sense to write them to the wal
+                                    put.setDurability(Durability.SKIP_WAL);
                                 }
                                 put.add(cell);
                             } else {
@@ -907,6 +910,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
                                             PDataType.TRUE_BYTES);
                                     mutations.add(del);
+                                    // Since we're replaying existing mutations, it makes no sense to write them to the wal
+                                    del.setDurability(Durability.SKIP_WAL);
                                 }
                                 del.addDeleteMarker(cell);
                             }
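
The two hunks above cut WAL traffic during the partial rebuild: the rebuilder replays mutations that are already durable in the data table, so logging the replayed Puts and Deletes a second time buys nothing; if the server dies, the rebuild simply runs again. A minimal sketch of the pattern as applied to the Put branch (rowKey and mutations stand in for the surrounding method's state):

    Put put = new Put(rowKey); // rowKey is a placeholder
    put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
    // replayed data is already durable in the data table, so skip the WAL
    put.setDurability(Durability.SKIP_WAL);
    mutations.add(put);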

http://git-wip-us.apache.org/repos/asf/phoenix/blob/572d242d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index d017bc1..02b7a0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -123,6 +123,11 @@ public class Indexer extends BaseRegionObserver {
   protected IndexBuildManager builder;
   private LockManager lockManager;
 
+  // Hack to get around not being able to save any state between
+  // coprocessor calls. TODO: remove after HBASE-18127 when available
+  private ThreadLocal<Collection<Pair<Mutation, byte[]>>> indexUpdates =
+          new ThreadLocal<Collection<Pair<Mutation, byte[]>>>();
+  
   /** Configuration key for the {@link IndexBuilder} to use */
   public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
 
@@ -483,9 +488,12 @@ public class Indexer extends BaseRegionObserver {
               }
     
               // Only copy mutations if we found duplicate rows
-              // (which is pretty much never for Phoenix)
+              // which only occurs when we're partially rebuilding
+              // the index (since we'll potentially have both a
+              // Put and a Delete mutation for the same row).
               if (copyMutations) {
                   // Add the mutation to the batch set
+
                   ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
                   MultiMutation stored = mutationsMap.get(row);
                   // we haven't seen this row before, so add it
@@ -508,6 +516,9 @@ public class Indexer extends BaseRegionObserver {
           miniBatchOp.setWalEdit(0, edit);
       }
   
+      if (copyMutations) {
+          mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations);
+      }
 
       // get the current span, or just use a null-span to avoid a bunch of if statements
       try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
@@ -531,42 +542,29 @@ public class Indexer extends BaseRegionObserver {
           metricSource.updateIndexPrepareTime(duration);
           current.addTimelineAnnotation("Built index updates, doing preStep");
           TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
-
-          // write them, either to WAL or the index tables
-          doPre(indexUpdates, edit, durability);
+          if (!indexUpdates.isEmpty()) {
+              setIndexUpdates(c, indexUpdates);
+              // write index updates to WAL
+              if (durability != Durability.SKIP_WAL) {
+                  // we have all the WAL durability, so we just update the WAL entry and move on
+                  for (Pair<Mutation, byte[]> entry : indexUpdates) {
+                    edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));
+                  }              
+              }
+          }
       }
   }
 
-  /**
-   * Add the index updates to the WAL, or write to the index table, if the WAL has been disabled
-   * @return <tt>true</tt> if the WAL has been updated.
-   * @throws IOException
-   */
-  private boolean doPre(Collection<Pair<Mutation, byte[]>> indexUpdates, final WALEdit edit,
-      final Durability durability) throws IOException {
-    // no index updates, so we are done
-    if (indexUpdates == null || indexUpdates.size() == 0) {
-      return false;
-    }
-
-    // if writing to wal is disabled, we never see the WALEdit updates down the way, so do the index
-    // update right away
-    if (durability == Durability.SKIP_WAL) {
-      try {
-        this.writer.write(indexUpdates, false);
-        return false;
-      } catch (Throwable e) {
-        LOG.error("Failed to update index with entries:" + indexUpdates, e);
-        IndexManagementUtil.rethrowIndexingException(e);
-      }
-    }
-
-    // we have all the WAL durability, so we just update the WAL entry and move on
-    for (Pair<Mutation, byte[]> entry : indexUpdates) {
-      edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));
-    }
-
-    return true;
+  private void setIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c, Collection<Pair<Mutation, byte[]>> indexUpdates) {
+      this.indexUpdates.set(indexUpdates);
+  }
+  
+  private Collection<Pair<Mutation, byte[]>> getIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c) {
+      return this.indexUpdates.get();
+  }
+  
+  private void removeIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c) {
+      this.indexUpdates.remove();
   }
 
   @Override
@@ -589,8 +587,7 @@ public class Indexer extends BaseRegionObserver {
           if (success) { // if miniBatchOp was successfully written, write index updates
               //each batch operation, only the first one will have anything useful, so we can just grab that
               Mutation mutation = miniBatchOp.getOperation(0);
-              WALEdit edit = miniBatchOp.getWalEdit(0);
-              doPost(edit, mutation, mutation.getDurability());
+              doPost(c, mutation);
           }
        } finally {
            long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
@@ -604,22 +601,21 @@ public class Indexer extends BaseRegionObserver {
        }
   }
 
-  private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException {
-    try {
-      doPostWithExceptions(edit, m, durability);
-      return;
-    } catch (Throwable e) {
-      rethrowIndexingException(e);
+  private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, Mutation m) throws IOException {
+      try {
+        doPostWithExceptions(c,m);
+        return;
+      } catch (Throwable e) {
+        rethrowIndexingException(e);
+      }
+      throw new RuntimeException(
+          "Somehow didn't complete the index update, but didn't return 
succesfully either!");
     }
-    throw new RuntimeException(
-        "Somehow didn't complete the index update, but didn't return 
succesfully either!");
-  }
 
-  private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability)
-          throws Exception {
+  private void doPostWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c, Mutation m)
+          throws IOException {
       //short circuit, if we don't need to do any work
-      if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m) || edit == null) {
-          // already did the index update in prePut, so we are done
+      if (!this.builder.isEnabled(m)) {
           return;
       }
 
@@ -631,59 +627,38 @@ public class Indexer extends BaseRegionObserver {
           }
           long start = EnvironmentEdgeManager.currentTimeMillis();
 
-          // there is a little bit of excess here- we iterate all the non-indexed kvs for this check first
-          // and then do it again later when getting out the index updates. This should be pretty minor
-          // though, compared to the rest of the runtime
-          IndexedKeyValue ikv = getFirstIndexedKeyValue(edit);
-
-          /*
-           * early exit - we have nothing to write, so we don't need to do anything else. NOTE: we don't
-           * release the WAL Rolling lock (INDEX_UPDATE_LOCK) since we never take it in doPre if there are
-           * no index updates.
-           */
-          if (ikv == null) {
+          
+          Collection<Pair<Mutation, byte[]>> indexUpdates = getIndexUpdates(c);
+          if (indexUpdates == null) {
               return;
           }
 
-          /*
-           * only write the update if we haven't already seen this batch. We only want to write the batch
-           * once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can
-           * lead to writing all the index updates for each Put/Delete).
-           */
-          if (!ikv.getBatchFinished()) {
-              Collection<Pair<Mutation, byte[]>> indexUpdates = 
extractIndexUpdate(edit);
-
-              // the WAL edit is kept in memory and we already specified the factory when we created the
-              // references originally - therefore, we just pass in a null factory here and use the ones
-              // already specified on each reference
-              try {
-                         current.addTimelineAnnotation("Actually doing index update for first time");
-                  Collection<Pair<Mutation, byte[]>> localUpdates =
-                          new ArrayList<Pair<Mutation, byte[]>>();
-                  Collection<Pair<Mutation, byte[]>> remoteUpdates =
-                          new ArrayList<Pair<Mutation, byte[]>>();
-                         for (Pair<Mutation, byte[]> mutation : indexUpdates) {
-                                 if (Bytes.toString(mutation.getSecond()).equals(
-                                                 environment.getRegion().getTableDesc().getNameAsString())) {
-                                         localUpdates.add(mutation);
-                                 } else {
-                          remoteUpdates.add(mutation);
-                                 }
-                         }
-                  if(!remoteUpdates.isEmpty()) {
-                      writer.writeAndKillYourselfOnFailure(remoteUpdates, false);
-                  }
-                  if(!localUpdates.isEmpty()) {
-                      writer.writeAndKillYourselfOnFailure(localUpdates, true);
-                  }
-              } finally {                  // With a custom kill policy, we may throw instead of kill the server.
-                  // Without doing this in a finally block (at least with the mini cluster),
-                  // the region server never goes down.
-
-                  // mark the batch as having been written. In the single-update case, this never gets check
-                  // again, but in the batch case, we will check it again (see above).
-                  ikv.markBatchFinished();
+          // the WAL edit is kept in memory and we already specified the factory when we created the
+          // references originally - therefore, we just pass in a null factory here and use the ones
+          // already specified on each reference
+          try {
+                 current.addTimelineAnnotation("Actually doing index update for first time");
+              Collection<Pair<Mutation, byte[]>> localUpdates =
+                      new ArrayList<Pair<Mutation, byte[]>>();
+              Collection<Pair<Mutation, byte[]>> remoteUpdates =
+                      new ArrayList<Pair<Mutation, byte[]>>();
+                 for (Pair<Mutation, byte[]> mutation : indexUpdates) {
+                         if (Bytes.toString(mutation.getSecond()).equals(
+                                         environment.getRegion().getTableDesc().getNameAsString())) {
+                                 localUpdates.add(mutation);
+                         } else {
+                      remoteUpdates.add(mutation);
+                         }
+                 }
+              if(!remoteUpdates.isEmpty()) {
+                  writer.writeAndKillYourselfOnFailure(remoteUpdates, false);
+              }
+              if(!localUpdates.isEmpty()) {
+                  writer.writeAndKillYourselfOnFailure(localUpdates, true);
               }
+          } finally { // With a custom kill policy, we may throw instead of kill the server.
+              // mark the batch as having been written so it won't be written again
+              removeIndexUpdates(c);
           }
 
           long duration = EnvironmentEdgeManager.currentTimeMillis() - start;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/572d242d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 8d2bd83..c013b5b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -12,24 +12,17 @@ package org.apache.phoenix.hbase.index.covered;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
@@ -39,7 +32,6 @@ import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
 import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 
 import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
 
 /**
  * Build covered indexes for phoenix updates.
@@ -94,76 +86,27 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
      * @throws IOException
      */
     private void batchMutationAndAddUpdates(IndexUpdateManager manager, LocalTableState state, Mutation m, IndexMetaData indexMetaData) throws IOException {
-        // split the mutation into timestamp-based batches
-        Collection<Batch> batches = createTimestampBatchesFromMutation(m);
-
-        // go through each batch of keyvalues and build separate index entries for each
-        boolean cleanupCurrentState = !indexMetaData.isImmutableRows();
-        for (Batch batch : batches) {
-            /*
-             * We have to split the work between the cleanup and the update for each group because when we update the
-             * current state of the row for the current batch (appending the mutations for the current batch) the next
-             * group will see that as the current state, which will can cause the a delete and a put to be created for
-             * the next group.
-             */
-            if (addMutationsForBatch(manager, batch, state, 
cleanupCurrentState, indexMetaData)) {
-                cleanupCurrentState = false;
-            }
-        }
-    }
-
-    /**
-     * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any {@link KeyValue} with a timestamp
-     * == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called.
-     * 
-     * @param m
-     *            {@link Mutation} from which to extract the {@link KeyValue}s
-     * @return the mutation, broken into batches and sorted in ascending order (smallest first)
-     */
-    protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) 
{
-        Map<Long, Batch> batches = new HashMap<Long, Batch>();
+        // The cells of a mutation are broken up into time stamp batches prior 
to this call (in Indexer).
+        long ts = 
m.getFamilyCellMap().values().iterator().next().iterator().next().getTimestamp();
+        Batch batch = new Batch(ts);
         for (List<Cell> family : m.getFamilyCellMap().values()) {
-            List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family);
-            createTimestampBatchesFromKeyValues(familyKVs, batches);
-        }
-        // sort the batches
-        List<Batch> sorted = new ArrayList<Batch>(batches.values());
-        Collections.sort(sorted, new Comparator<Batch>() {
-            @Override
-            public int compare(Batch o1, Batch o2) {
-                return Longs.compare(o1.getTimestamp(), o2.getTimestamp());
+            List<KeyValue> kvs = KeyValueUtil.ensureKeyValues(family);
+            for (KeyValue kv : kvs) {
+                batch.add(kv);
+                assert(ts == kv.getTimestamp());
             }
-        });
-        return sorted;
-    }
-
-    /**
-     * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any {@link KeyValue} with a
-     * timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called.
-     * 
-     * @param kvs
-     *            {@link KeyValue}s to break into batches
-     * @param batches
-     *            to update with the given kvs
-     */
-    protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs, Map<Long, Batch> batches) {
-        long now = EnvironmentEdgeManager.currentTime();
-        byte[] nowBytes = Bytes.toBytes(now);
+        }
 
-        // batch kvs by timestamp
-        for (KeyValue kv : kvs) {
-            long ts = kv.getTimestamp();
-            // override the timestamp to the current time, so the index and primary tables match
-            // all the keys with LATEST_TIMESTAMP will then be put into the same batch
-            if (kv.updateLatestStamp(nowBytes)) {
-                ts = now;
-            }
-            Batch batch = batches.get(ts);
-            if (batch == null) {
-                batch = new Batch(ts);
-                batches.put(ts, batch);
-            }
-            batch.add(kv);
+        // build the index entries for the single timestamp batch
+        boolean cleanupCurrentState = !indexMetaData.isImmutableRows();
+        /*
+         * We have to split the work between the cleanup and the update for each group because when we update the
+         * current state of the row for the current batch (appending the mutations for the current batch) the next
+         * group will see that as the current state, which can cause a delete and a put to be created for
+         * the next group.
+         */
+        if (addMutationsForBatch(manager, batch, state, cleanupCurrentState, indexMetaData)) {
+            cleanupCurrentState = false;
         }
     }
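
For context on this hunk: batchMutationAndAddUpdates no longer splits a
mutation's cells into timestamp batches itself. It now assumes the Indexer
has already flattened each incoming mutation so that all of its cells carry
a single timestamp, which is what the assert above enforces. A minimal
sketch of that invariant (the helper class and method name are illustrative,
not part of this commit):

    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Mutation;

    public final class SingleTimestampInvariant {
        private SingleTimestampInvariant() {}

        // Returns the mutation's timestamp, asserting that every cell agrees on it.
        public static long singleTimestampOf(Mutation m) {
            long ts = m.getFamilyCellMap().values().iterator().next()
                    .iterator().next().getTimestamp();
            for (List<Cell> family : m.getFamilyCellMap().values()) {
                for (Cell cell : family) {
                    assert ts == cell.getTimestamp() : "mutation spans multiple timestamps";
                }
            }
            return ts;
        }
    }

If that assert ever fired, it would mean a multi-timestamp mutation leaked
past the flattening step in the Indexer (see flattenMutationsByTimestamp in
IndexManagementUtil below).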
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/572d242d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
index c830e54..925bcbb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
@@ -40,6 +40,7 @@ import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
 import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 
 /**
  * Index maintainer that maintains multiple indexes based on '{@link ColumnGroup}s'. Each group is a
@@ -141,7 +142,7 @@ public class CoveredColumnIndexer extends NonTxIndexBuilder {
       }
 
       // do the usual thing as for deletes
-      Collection<Batch> timeBatch = createTimestampBatchesFromMutation(p);
+      Collection<Batch> timeBatch = IndexManagementUtil.createTimestampBatchesFromMutation(p);
       LocalTableState state = new LocalTableState(env, localTable, p);
       for (Batch entry : timeBatch) {
        //just set the timestamp on the table - it already has all the future state
@@ -158,7 +159,7 @@ public class CoveredColumnIndexer extends NonTxIndexBuilder {
    */
   private Collection<Batch>  batchByRow(Collection<KeyValue> filtered) {
     Map<Long, Batch> batches = new HashMap<Long, Batch>();
-    createTimestampBatchesFromKeyValues(filtered, batches);
+    IndexManagementUtil.createTimestampBatchesFromKeyValues(filtered, batches);
     return batches.values();
   }
 }
\ No newline at end of file
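
The example indexer keeps its multi-timestamp handling by delegating to the
batching helpers that moved into IndexManagementUtil. A hedged sketch of the
call shape (row, family, and qualifier values are illustrative): one Batch
comes back per distinct timestamp, sorted ascending.

    import java.util.Collection;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.covered.Batch;
    import org.apache.phoenix.hbase.index.util.IndexManagementUtil;

    public class TimestampBatchSketch {
        public static void main(String[] args) {
            Put p = new Put(Bytes.toBytes("row"));
            p.add(Bytes.toBytes("fam"), Bytes.toBytes("q1"), 20L, Bytes.toBytes("v1"));
            p.add(Bytes.toBytes("fam"), Bytes.toBytes("q2"), 10L, Bytes.toBytes("v2"));
            // One Batch per distinct timestamp, smallest timestamp first.
            Collection<Batch> batches = IndexManagementUtil.createTimestampBatchesFromMutation(p);
            for (Batch b : batches) {
                System.out.println(b.getTimestamp() + " -> " + b.getKvs().size() + " kv(s)");
            }
        }
    }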

http://git-wip-us.apache.org/repos/asf/phoenix/blob/572d242d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index 6582c8a..697caef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -18,22 +18,39 @@
 package org.apache.phoenix.hbase.index.util;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.builder.IndexBuildingFailureException;
+import org.apache.phoenix.hbase.index.covered.Batch;
 import org.apache.phoenix.hbase.index.covered.data.LazyValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
 
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+
 /**
  * Utility class to help manage indexes
  */
@@ -189,4 +206,81 @@ public class IndexManagementUtil {
             conf.setInt(key, value);
         }
     }
+
+    /**
+     * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Timestamps are
+     * assumed to have already been resolved (no {@link HConstants#LATEST_TIMESTAMP}).
+     * 
+     * @param kvs {@link KeyValue}s to break into batches
+     * @param batches to update with the given kvs
+     */
+    public static void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs, Map<Long, Batch> batches) {
+        // batch kvs by timestamp
+        for (KeyValue kv : kvs) {
+            long ts = kv.getTimestamp();
+            Batch batch = batches.get(ts);
+            if (batch == null) {
+                batch = new Batch(ts);
+                batches.put(ts, batch);
+            }
+            batch.add(kv);
+        }
+    }
+
+    /**
+     * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Timestamps are
+     * assumed to have already been resolved (no {@link HConstants#LATEST_TIMESTAMP}).
+     * 
+     * @param m {@link Mutation} from which to extract the {@link KeyValue}s
+     * @return the mutation, broken into batches and sorted in ascending order (smallest first)
+     */
+    public static Collection<Batch> createTimestampBatchesFromMutation(Mutation m) {
+        Map<Long, Batch> batches = new HashMap<Long, Batch>();
+        for (List<Cell> family : m.getFamilyCellMap().values()) {
+            List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family);
+            createTimestampBatchesFromKeyValues(familyKVs, batches);
+        }
+        // sort the batches
+        List<Batch> sorted = new ArrayList<Batch>(batches.values());
+        Collections.sort(sorted, new Comparator<Batch>() {
+            @Override
+            public int compare(Batch o1, Batch o2) {
+                return Longs.compare(o1.getTimestamp(), o2.getTimestamp());
+            }
+        });
+        return sorted;
+    }
+
+    public static Collection<? extends Mutation> flattenMutationsByTimestamp(Collection<? extends Mutation> mutations) {
+        List<Mutation> flattenedMutations = Lists.newArrayListWithExpectedSize(mutations.size() * 10);
+        for (Mutation m : mutations) {
+            byte[] row = m.getRow();
+            Collection<Batch> batches = createTimestampBatchesFromMutation(m);
+            for (Batch batch : batches) {
+                Mutation mWithSameTS;
+                Cell firstCell = batch.getKvs().get(0);
+                if (KeyValue.Type.codeToType(firstCell.getTypeByte()) == KeyValue.Type.Put) {
+                    mWithSameTS = new Put(row);
+                } else {
+                    mWithSameTS = new Delete(row);
+                }
+                if (m.getAttributesMap() != null) {
+                    for (Map.Entry<String, byte[]> entry : m.getAttributesMap().entrySet()) {
+                        mWithSameTS.setAttribute(entry.getKey(), entry.getValue());
+                    }
+                }
+                for (Cell cell : batch.getKvs()) {
+                    byte[] fam = CellUtil.cloneFamily(cell);
+                    List<Cell> famCells = mWithSameTS.getFamilyCellMap().get(fam);
+                    if (famCells == null) {
+                        famCells = Lists.newArrayList();
+                        mWithSameTS.getFamilyCellMap().put(fam, famCells);
+                    }
+                    famCells.add(cell);
+                }
+                flattenedMutations.add(mWithSameTS);
+            }
+        }
+        return flattenedMutations;
+    }
 }
\ No newline at end of file
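
flattenMutationsByTimestamp is what lets the Indexer honor the builder's
one-timestamp-per-mutation contract: a Put or Delete whose cells span
several timestamps is split into one mutation per timestamp, with the
original attributes copied onto each piece. A minimal sketch under those
assumptions (class name and values are hypothetical):

    import java.util.Collections;

    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.util.IndexManagementUtil;

    public class FlattenSketch {
        public static void main(String[] args) {
            byte[] row = Bytes.toBytes("r1");
            byte[] fam = Bytes.toBytes("fam");
            Put put = new Put(row);
            put.add(fam, Bytes.toBytes("a"), 10L, Bytes.toBytes("va")); // cell at ts=10
            put.add(fam, Bytes.toBytes("b"), 20L, Bytes.toBytes("vb")); // cell at ts=20
            // Expect two single-timestamp Puts: one at ts=10, one at ts=20.
            for (Mutation m : IndexManagementUtil.flattenMutationsByTimestamp(
                    Collections.singletonList(put))) {
                System.out.println(m);
            }
        }
    }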

http://git-wip-us.apache.org/repos/asf/phoenix/blob/572d242d/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsTest.java
new file mode 100644
index 0000000..db7d838
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.junit.Test;
+
+public class CoveredColumnsTest {
+
+  private static final byte[] fam = Bytes.toBytes("fam");
+  private static final byte[] qual = Bytes.toBytes("qual");
+
+  @Test
+  public void testCovering() {
+    ColumnReference ref = new ColumnReference(fam, qual);
+    CoveredColumns columns = new CoveredColumns();
+    assertEquals("Should have only found a single column to cover", 1, columns
+        .findNonCoveredColumns(Arrays.asList(ref)).size());
+
+    columns.addColumn(ref);
+    assertEquals("Shouldn't have any columns to cover", 0,
+      columns.findNonCoveredColumns(Arrays.asList(ref)).size());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/572d242d/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
new file mode 100644
index 0000000..dcf330f
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
+import org.apache.phoenix.hbase.index.covered.data.LocalTable;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.scanner.Scanner;
+import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ *
+ */
+public class LocalTableStateTest {
+
+  private static final byte[] row = Bytes.toBytes("row");
+  private static final byte[] fam = Bytes.toBytes("fam");
+  private static final byte[] qual = Bytes.toBytes("qual");
+  private static final byte[] val = Bytes.toBytes("val");
+  private static final long ts = 10;
+  private static final IndexMetaData indexMetaData = new IndexMetaData() {
+
+    @Override
+    public boolean isImmutableRows() {
+        return false;
+    }
+
+    @Override
+    public boolean ignoreNewerMutations() {
+        return false;
+    }
+      
+  };
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCorrectOrderingWithLazyLoadingColumns() throws Exception {
+    Put m = new Put(row);
+    m.add(fam, qual, ts, val);
+    // setup mocks
+    Configuration conf = new Configuration(false);
+    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+    Mockito.when(env.getConfiguration()).thenReturn(conf);
+
+    Region region = Mockito.mock(Region.class);
+    Mockito.when(env.getRegion()).thenReturn(region);
+    RegionScanner scanner = Mockito.mock(RegionScanner.class);
+    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
+    final byte[] stored = Bytes.toBytes("stored-value");
+    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
+        KeyValue kv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
+        kv.setSequenceId(0);
+        list.add(kv);
+        return false;
+      }
+    });
+
+
+    LocalHBaseState state = new LocalTable(env);
+    LocalTableState table = new LocalTableState(env, state, m);
+    //add the kvs from the mutation
+    table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
+
+    // setup the lookup
+    ColumnReference col = new ColumnReference(fam, qual);
+    table.setCurrentTimestamp(ts);
+    //check that our value still shows up first on scan, even though this is a lazy load
+    Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+    Scanner s = p.getFirst();
+    assertEquals("Didn't get the pending mutation's value first", m.get(fam, qual).get(0), s.next());
+  }
+
+  public static final class ScannerCreatedException extends RuntimeException {
+      ScannerCreatedException(String msg) {
+          super(msg);
+      }
+  }
+  
+  @Test(expected = ScannerCreatedException.class)
+  public void testScannerForMutableRows() throws Exception {
+      IndexMetaData indexMetaData = new IndexMetaData() {
+
+          @Override
+          public boolean isImmutableRows() {
+              return false;
+          }
+
+          @Override
+          public boolean ignoreNewerMutations() {
+              return false;
+          }
+            
+        };
+    Put m = new Put(row);
+    m.add(fam, qual, ts, val);
+    // setup mocks
+    Configuration conf = new Configuration(false);
+    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+    Mockito.when(env.getConfiguration()).thenReturn(conf);
+
+    Region region = Mockito.mock(Region.class);
+    Mockito.when(env.getRegion()).thenReturn(region);
+    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable"));
+
+    LocalHBaseState state = new LocalTable(env);
+    LocalTableState table = new LocalTableState(env, state, m);
+    //add the kvs from the mutation
+    table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
+
+    // setup the lookup
+    ColumnReference col = new ColumnReference(fam, qual);
+    table.setCurrentTimestamp(ts);
+    table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+  }
+
+  @Test
+  public void testNoScannerForImmutableRows() throws Exception {
+      IndexMetaData indexMetaData = new IndexMetaData() {
+
+          @Override
+          public boolean isImmutableRows() {
+              return true;
+          }
+
+          @Override
+          public boolean ignoreNewerMutations() {
+              return false;
+          }
+            
+        };
+    Put m = new Put(row);
+    m.add(fam, qual, ts, val);
+    // setup mocks
+    Configuration conf = new Configuration(false);
+    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+    Mockito.when(env.getConfiguration()).thenReturn(conf);
+
+    Region region = Mockito.mock(Region.class);
+    Mockito.when(env.getRegion()).thenReturn(region);
+    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenThrow(new ScannerCreatedException("Should not open scanner when data is immutable"));
+
+    LocalHBaseState state = new LocalTable(env);
+    LocalTableState table = new LocalTableState(env, state, m);
+    //add the kvs from the mutation
+    table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
+
+    // setup the lookup
+    ColumnReference col = new ColumnReference(fam, qual);
+    table.setCurrentTimestamp(ts);
+    //check that our value still shows up first on scan, even though this is a lazy load
+    Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+    Scanner s = p.getFirst();
+    assertEquals("Didn't get the pending mutation's value first", m.get(fam, qual).get(0), s.next());
+  }
+
+  /**
+   * Test that we correctly roll back the state of a keyvalue
+   * @throws Exception
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCorrectRollback() throws Exception {
+    Put m = new Put(row);
+    m.add(fam, qual, ts, val);
+    // setup mocks
+    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+
+    Region region = Mockito.mock(Region.class);
+    Mockito.when(env.getRegion()).thenReturn(region);
+    RegionScanner scanner = Mockito.mock(RegionScanner.class);
+    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
+    final byte[] stored = Bytes.toBytes("stored-value");
+    final KeyValue storedKv = new KeyValue(row, fam, qual, ts, Type.Put, stored);
+    storedKv.setSequenceId(2);
+    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
+
+        list.add(storedKv);
+        return false;
+      }
+    });
+    LocalHBaseState state = new LocalTable(env);
+    LocalTableState table = new LocalTableState(env, state, m);
+    // add the kvs from the mutation
+    KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0));
+    kv.setSequenceId(0);
+    table.addPendingUpdates(kv);
+
+    // setup the lookup
+    ColumnReference col = new ColumnReference(fam, qual);
+    table.setCurrentTimestamp(ts);
+    // check that the value is there
+    Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+    Scanner s = p.getFirst();
+    assertEquals("Didn't get the pending mutation's value first", kv, s.next());
+
+    // rollback that value
+    table.rollback(Arrays.asList(kv));
+    p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+    s = p.getFirst();
+    assertEquals("Didn't correctly rollback the row - still found it!", null, s.next());
+    Mockito.verify(env, Mockito.times(1)).getRegion();
+    Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testOnlyLoadsRequestedColumns() throws Exception {
+    // setup mocks
+    RegionCoprocessorEnvironment env = Mockito.mock(RegionCoprocessorEnvironment.class);
+
+    Region region = Mockito.mock(Region.class);
+    Mockito.when(env.getRegion()).thenReturn(region);
+    RegionScanner scanner = Mockito.mock(RegionScanner.class);
+    Mockito.when(region.getScanner(Mockito.any(Scan.class))).thenReturn(scanner);
+    final KeyValue storedKv =
+        new KeyValue(row, fam, qual, ts, Type.Put, Bytes.toBytes("stored-value"));
+    storedKv.setSequenceId(2);
+    Mockito.when(scanner.next(Mockito.any(List.class))).thenAnswer(new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        List<KeyValue> list = (List<KeyValue>) invocation.getArguments()[0];
+
+        list.add(storedKv);
+        return false;
+      }
+    });
+    LocalHBaseState state = new LocalTable(env);
+    Put pendingUpdate = new Put(row);
+    pendingUpdate.add(fam, qual, ts, val);
+    LocalTableState table = new LocalTableState(env, state, pendingUpdate);
+
+    // do the lookup for the given column
+    ColumnReference col = new ColumnReference(fam, qual);
+    table.setCurrentTimestamp(ts);
+    // check that the value is there
+    Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+    Scanner s = p.getFirst();
+    // make sure it read the table the one time
+    assertEquals("Didn't get the stored keyvalue!", storedKv, s.next());
+
+    // on the second lookup it shouldn't access the underlying table again - the cached columns
+    // should know they are done
+    p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData);
+    s = p.getFirst();
+    assertEquals("Lost already loaded update!", storedKv, s.next());
+    Mockito.verify(env, Mockito.times(1)).getRegion();
+    Mockito.verify(region, Mockito.times(1)).getScanner(Mockito.any(Scan.class));
+  }
+
+  // TODO add test here for making sure multiple column references with the same column family don't
+  // cause an infinite loop
+  // cause an infinite loop
+}
