PHOENIX-4173 Ensure that the rebuild fails if an index transitions back to disabled while rebuilding
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6c5bc3bb Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6c5bc3bb Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6c5bc3bb Branch: refs/heads/master Commit: 6c5bc3bba7732357bf3fc4ab39e7fda10e97539e Parents: 28aebd6 Author: James Taylor <jamestay...@apache.org> Authored: Wed Sep 6 12:46:34 2017 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Wed Sep 6 18:24:51 2017 -0700 ---------------------------------------------------------------------- .../end2end/index/PartialIndexRebuilderIT.java | 151 ++++++++++++++++++- 1 file changed, 143 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/6c5bc3bb/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java index cacf0fa..067f50f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; @@ -30,7 +31,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import 
org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -38,10 +39,13 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PMetaData; import org.apache.phoenix.schema.PTable; @@ -634,6 +638,94 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { } } + private final static CountDownLatch WAIT_FOR_REBUILD_TO_START = new CountDownLatch(1); + private final static CountDownLatch WAIT_FOR_INDEX_WRITE = new CountDownLatch(1); + + + @Test + public void testDisableIndexDuringRebuild() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + PTableKey key = new PTableKey(null,fullTableName); + final MyClock clock = new MyClock(1000); + EnvironmentEdgeManager.injectEdge(clock); + try (Connection conn = DriverManager.getConnection(getUrl())) { + PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache(); + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true"); + 
clock.time += 100; + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2) INCLUDE (v3)"); + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0','x')"); + conn.commit(); + clock.time += 100; + try (HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) { + // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering + IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE); + clock.time += 100; + long disableTime = clock.currentTime(); + // Set some values while index disabled + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb', '11','yy')"); + conn.commit(); + clock.time += 100; + assertTrue(hasDisabledIndex(metaCache, key)); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','222','zzz')"); + conn.commit(); + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','dddd','3333','zzzz')"); + conn.commit(); + clock.time += 100; + // Will cause partial index rebuilder to be triggered + IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE); + final CountDownLatch doneSignal = new CountDownLatch(1); + advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal); + // Set some values while index is in INACTIVE state + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')"); + conn.commit(); + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')"); + conn.commit(); + doneSignal.await(30, TimeUnit.SECONDS); + // Install coprocessor that will simulate an index write failure during index rebuild + 
addWriteFailingCoprocessor(conn,fullIndexName); + clock.time += WAIT_AFTER_DISABLED; + doneSignal.await(30, TimeUnit.SECONDS); + WAIT_FOR_REBUILD_TO_START.await(30, TimeUnit.SECONDS); + // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering + IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE); + clock.time += 100; + disableTime = clock.currentTime(); + // Set some values while index disabled + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bbbbb', '11','yy')"); + conn.commit(); + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','cccccc','222','zzz')"); + conn.commit(); + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ddddddd','3333','zzzz')"); + conn.commit(); + clock.time += 100; + // Simulates another write failure. Should cause current run of rebuilder to fail and retry again later + IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE); + removeWriteFailingCoprocessor(conn,fullIndexName); + WAIT_FOR_INDEX_WRITE.countDown(); + } + // Original rebuilder should have failed + + advanceClockUntilPartialRebuildStarts(fullIndexName, clock); + clock.time += WAIT_AFTER_DISABLED * 2; + // Enough time has passed, so rebuild will start now + TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE); + clock.time += 100; + IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName); + } finally { + EnvironmentEdgeManager.injectEdge(null); + } + } + @Test public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable { String schemaName = generateUniqueName(); @@ -751,15 +843,58 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { t.start(); } - public static class DelayingRegionObserver extends SimpleRegionObserver { - @Override - public void 
preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { - try { - Thread.sleep(Math.abs(RAND.nextInt()) % 10); - } catch (InterruptedException e) { + private static void addWriteFailingCoprocessor(Connection conn, String tableName) throws Exception { + int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100; + ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName)); + descriptor.addCoprocessor(WriteFailingRegionObserver.class.getName(), null, priority, null); + int numTries = 10; + try (HBaseAdmin admin = services.getAdmin()) { + admin.modifyTable(Bytes.toBytes(tableName), descriptor); + while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor) + && numTries > 0) { + numTries--; + if (numTries == 0) { + throw new Exception( + "Check to detect if delaying co-processor was added failed after " + + numTries + " retries."); + } + Thread.sleep(1000); } - } } + private static void removeWriteFailingCoprocessor(Connection conn, String tableName) throws Exception { + ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName)); + descriptor.removeCoprocessor(WriteFailingRegionObserver.class.getName()); + int numTries = 10; + try (HBaseAdmin admin = services.getAdmin()) { + admin.modifyTable(Bytes.toBytes(tableName), descriptor); + while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor) + && numTries > 0) { + numTries--; + if (numTries == 0) { + throw new Exception( + "Check to detect if delaying co-processor was removed failed after " + + numTries + " retries."); + } + Thread.sleep(1000); + } + } + } + + public static class WriteFailingRegionObserver extends SimpleRegionObserver { + @Override + 
public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + WAIT_FOR_REBUILD_TO_START.countDown(); + try { + WAIT_FOR_INDEX_WRITE.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.interrupted(); + throw new IOException(e); + } + } + } + }