PHOENIX-4173 Ensure that the rebuild fails if an index that transitions back to 
disabled while rebuilding


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6c5bc3bb
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6c5bc3bb
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6c5bc3bb

Branch: refs/heads/master
Commit: 6c5bc3bba7732357bf3fc4ab39e7fda10e97539e
Parents: 28aebd6
Author: James Taylor <jamestay...@apache.org>
Authored: Wed Sep 6 12:46:34 2017 -0700
Committer: James Taylor <jamestay...@apache.org>
Committed: Wed Sep 6 18:24:51 2017 -0700

----------------------------------------------------------------------
 .../end2end/index/PartialIndexRebuilderIT.java  | 151 ++++++++++++++++++-
 1 file changed, 143 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c5bc3bb/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index cacf0fa..067f50f 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -30,7 +31,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -38,10 +39,13 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PTable;
@@ -634,6 +638,94 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    private final static CountDownLatch WAIT_FOR_REBUILD_TO_START = new 
CountDownLatch(1);
+    private final static CountDownLatch WAIT_FOR_INDEX_WRITE = new 
CountDownLatch(1);
+
+    
+    @Test
+    public void testDisableIndexDuringRebuild() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, 
tableName);
+        final String fullIndexName = SchemaUtil.getTableName(schemaName, 
indexName);
+        PTableKey key = new PTableKey(null,fullTableName);
+        final MyClock clock = new MyClock(1000);
+        EnvironmentEdgeManager.injectEdge(clock);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PMetaData metaCache = 
conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + 
"(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) 
COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            clock.time += 100;
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON 
" + fullTableName + " (v1, v2) INCLUDE (v3)");
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES('a','a','0','x')");
+            conn.commit();
+            clock.time += 100;
+            try (HTableInterface metaTable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES))
 {
+                // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the 
partial index rebuilder from triggering
+                IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, 
PIndexState.DISABLE);
+                clock.time += 100;
+                long disableTime = clock.currentTime();
+                // Set some values while index disabled
+                conn.createStatement().execute("UPSERT INTO " + fullTableName 
+ " VALUES('b','bb', '11','yy')");
+                conn.commit();
+                clock.time += 100;
+                assertTrue(hasDisabledIndex(metaCache, key));
+                conn.createStatement().execute("UPSERT INTO " + fullTableName 
+ " VALUES('a','ccc','222','zzz')");
+                conn.commit();
+                clock.time += 100;
+                conn.createStatement().execute("UPSERT INTO " + fullTableName 
+ " VALUES('a','dddd','3333','zzzz')");
+                conn.commit();
+                clock.time += 100;
+                // Will cause partial index rebuilder to be triggered
+                IndexUtil.updateIndexState(fullIndexName, disableTime, 
metaTable, PIndexState.DISABLE);
+                final CountDownLatch doneSignal = new CountDownLatch(1);
+                advanceClockUntilPartialRebuildStarts(fullIndexName, clock, 
doneSignal);
+                // Set some values while index is in INACTIVE state
+                clock.time += 100;
+                conn.createStatement().execute("UPSERT INTO " + fullTableName 
+ " VALUES('a','eeeee','44444','zzzzz')");
+                conn.commit();
+                clock.time += 100;
+                conn.createStatement().execute("UPSERT INTO " + fullTableName 
+ " VALUES('a','fffff','55555','zzzzzz')");
+                conn.commit();
+                doneSignal.await(30, TimeUnit.SECONDS);
+                // Install coprocessor that will simulate an index write 
failure during index rebuild
+                addWriteFailingCoprocessor(conn,fullIndexName);
+                clock.time += WAIT_AFTER_DISABLED;
+                doneSignal.await(30, TimeUnit.SECONDS);
+                WAIT_FOR_REBUILD_TO_START.await(30, TimeUnit.SECONDS);
+                // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the 
partial index rebuilder from triggering
+                IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, 
PIndexState.DISABLE);
+                   clock.time += 100;
+                   disableTime = clock.currentTime();
+                   // Set some values while index disabled
+                   conn.createStatement().execute("UPSERT INTO " + 
fullTableName + " VALUES('b','bbbbb', '11','yy')");
+                   conn.commit();
+                   clock.time += 100;
+                   conn.createStatement().execute("UPSERT INTO " + 
fullTableName + " VALUES('a','cccccc','222','zzz')");
+                   conn.commit();
+                   clock.time += 100;
+                   conn.createStatement().execute("UPSERT INTO " + 
fullTableName + " VALUES('a','ddddddd','3333','zzzz')");
+                   conn.commit();
+                   clock.time += 100;
+                   // Simulates another write failure. Should cause current 
run of rebuilder to fail and retry again later
+                   IndexUtil.updateIndexState(fullIndexName, disableTime, 
metaTable, PIndexState.DISABLE);
+                removeWriteFailingCoprocessor(conn,fullIndexName);
+                   WAIT_FOR_INDEX_WRITE.countDown();
+            }
+            // Original rebuilder should have failed
+            
+            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
+            clock.time += WAIT_AFTER_DISABLED * 2;
+            // Enough time has passed, so rebuild will start now
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
+            clock.time += 100;
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+        } finally {
+            EnvironmentEdgeManager.injectEdge(null);
+        }
+    }
+
     @Test
     public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable {
         String schemaName = generateUniqueName();
@@ -751,15 +843,58 @@ public class PartialIndexRebuilderIT extends 
BaseUniqueNamesOwnClusterIT {
         t.start();
     }
     
-    public static class DelayingRegionObserver extends SimpleRegionObserver {
-        @Override
-        public void 
preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, 
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
-            try {
-                Thread.sleep(Math.abs(RAND.nextInt()) % 10);
-            } catch (InterruptedException e) {
+    private static void addWriteFailingCoprocessor(Connection conn, String 
tableName) throws Exception {
+        int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
+        ConnectionQueryServices services = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
+        HTableDescriptor descriptor = 
services.getTableDescriptor(Bytes.toBytes(tableName));
+        descriptor.addCoprocessor(WriteFailingRegionObserver.class.getName(), 
null, priority, null);
+        int numTries = 10;
+        try (HBaseAdmin admin = services.getAdmin()) {
+            admin.modifyTable(Bytes.toBytes(tableName), descriptor);
+            while 
(!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
+                    && numTries > 0) {
+                numTries--;
+                if (numTries == 0) {
+                    throw new Exception(
+                            "Check to detect if delaying co-processor was 
added failed after "
+                                    + numTries + " retries.");
+                }
+                Thread.sleep(1000);
             }
-            
         }
     }
     
+    private static void removeWriteFailingCoprocessor(Connection conn, String 
tableName) throws Exception {
+        ConnectionQueryServices services = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
+        HTableDescriptor descriptor = 
services.getTableDescriptor(Bytes.toBytes(tableName));
+        
descriptor.removeCoprocessor(WriteFailingRegionObserver.class.getName());
+        int numTries = 10;
+        try (HBaseAdmin admin = services.getAdmin()) {
+            admin.modifyTable(Bytes.toBytes(tableName), descriptor);
+            while 
(!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
+                    && numTries > 0) {
+                numTries--;
+                if (numTries == 0) {
+                    throw new Exception(
+                            "Check to detect if delaying co-processor was 
removed failed after "
+                                    + numTries + " retries.");
+                }
+                Thread.sleep(1000);
+            }
+        }
+    }
+    
+    public static class WriteFailingRegionObserver extends 
SimpleRegionObserver {
+        @Override
+        public void 
postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, 
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+               WAIT_FOR_REBUILD_TO_START.countDown();
+               try {
+                               WAIT_FOR_INDEX_WRITE.await(30, 
TimeUnit.SECONDS);
+                       } catch (InterruptedException e) {
+                               Thread.interrupted();
+                               throw new IOException(e);
+                       }
+        }
+    }
+
 }

Reply via email to