Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 63779600d -> 2c38afffd
PHOENIX-4156 Fix flapping MutableIndexFailureIT Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2c38afff Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2c38afff Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2c38afff Branch: refs/heads/4.x-HBase-1.1 Commit: 2c38afffd1a9363bc7892706eb69bc476a634e08 Parents: 6377960 Author: Samarth Jain <sama...@apache.org> Authored: Tue Sep 5 13:51:26 2017 -0700 Committer: Samarth Jain <sama...@apache.org> Committed: Tue Sep 5 13:51:26 2017 -0700 ---------------------------------------------------------------------- .../end2end/index/MutableIndexFailureIT.java | 79 +++++++++++++++++--- .../coprocessor/MetaDataRegionObserver.java | 12 ++- .../org/apache/phoenix/query/QueryServices.java | 1 + .../phoenix/query/QueryServicesOptions.java | 1 + 4 files changed, 82 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c38afff/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index 0abd5ae..5797819 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -44,15 +44,21 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.MetaDataRegionObserver; +import 
org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.execute.CommitException; import org.apache.phoenix.hbase.index.write.IndexWriterUtils; import org.apache.phoenix.index.PhoenixIndexFailurePolicy; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.util.MetaDataUtil; @@ -102,6 +108,10 @@ public class MutableIndexFailureIT extends BaseTest { private final boolean throwIndexWriteFailure; private String schema = generateUniqueName(); private List<CommitException> exceptions = Lists.newArrayList(); + private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment; + private static final int forwardOverlapMs = 1000; + private static final int disableTimestampThresholdMs = 10000; + private static final int numRpcRetries = 2; public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, Boolean rebuildIndexOnWriteFailure, boolean failRebuildTask, Boolean throwIndexWriteFailure) { this.transactional = transactional; @@ -130,15 +140,27 @@ public class MutableIndexFailureIT extends BaseTest { serverProps.put(IndexWriterUtils.INDEX_WRITER_RPC_PAUSE, "5000"); serverProps.put("data.tx.snapshot.dir", "/tmp"); serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE)); - serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString()); - 
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "4000"); - serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "30000"); // give up rebuilding after 30 seconds // need to override rpc retries otherwise test doesn't pass - serverProps.put(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, Long.toString(1)); - serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(1000)); + serverProps.put(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, Long.toString(numRpcRetries)); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(forwardOverlapMs)); + /* + * Effectively disable running the index rebuild task by having an infinite delay + * because we want to control its execution ourselves + */ + serverProps.put(QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, Long.toString(Long.MAX_VALUE)); + serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, Long.toString(disableTimestampThresholdMs)); Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); NUM_SLAVES_BASE = 4; setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + indexRebuildTaskRegionEnvironment = + (RegionCoprocessorEnvironment) getUtility() + .getRSForFirstRegionInTable( + PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) + .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) + .get(0).getCoprocessorHost() + .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName()); + MetaDataRegionObserver.initRebuildIndexConnectionProps( + indexRebuildTaskRegionEnvironment.getConfiguration()); } @Parameters(name = 
"MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},rebuildIndexOnWriteFailure={4},failRebuildTask={5},throwIndexWriteFailure={6}") // name is used by failsafe as file name in reports @@ -170,6 +192,43 @@ public class MutableIndexFailureIT extends BaseTest { ); } + private void runRebuildTask(Connection conn) throws InterruptedException, SQLException { + BuildIndexScheduleTask task = + new MetaDataRegionObserver.BuildIndexScheduleTask( + indexRebuildTaskRegionEnvironment); + dumpStateOfIndexes(conn, fullTableName, true); + task.run(); + dumpStateOfIndexes(conn, fullTableName, false); + Thread.sleep(forwardOverlapMs + 100); + if (failRebuildTask) { + Thread.sleep(disableTimestampThresholdMs + 100); + } + dumpStateOfIndexes(conn, fullTableName, true); + task.run(); + dumpStateOfIndexes(conn, fullTableName, false); + } + + private static final void dumpStateOfIndexes(Connection conn, String tableName, + boolean beforeRebuildTaskRun) throws SQLException { + PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class); + PTable table = phxConn.getTable(new PTableKey(phxConn.getTenantId(), tableName)); + List<PTable> indexes = table.getIndexes(); + String s = beforeRebuildTaskRun ? 
"before rebuild run" : "after rebuild run"; + System.out.println("************Index state in connection " + s + "******************"); + for (PTable idx : indexes) { + System.out.println( + "Index Name: " + idx.getName().getString() + " State: " + idx.getIndexState() + + " Disable timestamp: " + idx.getIndexDisableTimestamp()); + } + System.out.println("************Index state from server " + s + "******************"); + table = PhoenixRuntime.getTableNoCache(phxConn, fullTableName); + for (PTable idx : table.getIndexes()) { + System.out.println( + "Index Name: " + idx.getName().getString() + " State: " + idx.getIndexState() + + " Disable timestamp: " + idx.getIndexDisableTimestamp()); + } + } + @Test public void testIndexWriteFailure() throws Exception { String secondIndexName = "B_" + FailingRegionObserver.FAIL_INDEX_NAME; @@ -284,8 +343,9 @@ public class MutableIndexFailureIT extends BaseTest { // re-enable index table FailingRegionObserver.FAIL_WRITE = false; if (rebuildIndexOnWriteFailure) { + runRebuildTask(conn); // wait for index to be rebuilt automatically - waitForIndexRebuild(conn,fullIndexName, PIndexState.ACTIVE); + checkStateAfterRebuild(conn, fullIndexName, PIndexState.ACTIVE); } else { // simulate replaying failed mutation replayMutations(); @@ -305,7 +365,8 @@ public class MutableIndexFailureIT extends BaseTest { // Wait for index to be rebuilt automatically. This should fail because // we haven't flipped the FAIL_WRITE flag to false and as a result this // should cause index rebuild to fail too. 
- waitForIndexRebuild(conn, fullIndexName, PIndexState.DISABLE); + runRebuildTask(conn); + checkStateAfterRebuild(conn, fullIndexName, PIndexState.DISABLE); // verify that the index was marked as disabled and the index disable // timestamp set to 0 String q = @@ -325,9 +386,9 @@ public class MutableIndexFailureIT extends BaseTest { } } - private void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException { + private void checkStateAfterRebuild(Connection conn, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException { if (!transactional) { - TestUtil.waitForIndexRebuild(conn, fullIndexName, expectedIndexState); + assertTrue(TestUtil.checkIndexState(conn,fullIndexName, expectedIndexState, 0l)); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c38afff/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index 5bfc2e2..e42aca2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -87,6 +87,7 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.UpgradeUtil; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -104,6 +105,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver { private static Map<PName, Long> batchExecutedPerTableMap = new HashMap<PName, Long>(); @GuardedBy("MetaDataRegionObserver.class") private static Properties rebuildIndexConnectionProps; 
+ // Added for test purposes + private long initialRebuildTaskDelay; @Override public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c, @@ -135,6 +138,10 @@ public class MetaDataRegionObserver extends BaseRegionObserver { config.getLong( QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL); + initialRebuildTaskDelay = + config.getLong( + QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, + QueryServicesOptions.DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY); } @Override @@ -190,7 +197,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { initRebuildIndexConnectionProps(e.getEnvironment().getConfiguration()); // starts index rebuild schedule work BuildIndexScheduleTask task = new BuildIndexScheduleTask(e.getEnvironment()); - executor.scheduleWithFixedDelay(task, 10000, rebuildIndexTimeInterval, TimeUnit.MILLISECONDS); + executor.scheduleWithFixedDelay(task, initialRebuildTaskDelay, rebuildIndexTimeInterval, TimeUnit.MILLISECONDS); } catch (ClassNotFoundException ex) { LOG.error("BuildIndexScheduleTask cannot start!", ex); } @@ -537,7 +544,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver { put); } - private static synchronized void initRebuildIndexConnectionProps(Configuration config) { + @VisibleForTesting + public static synchronized void initRebuildIndexConnectionProps(Configuration config) { if (rebuildIndexConnectionProps == null) { Properties props = new Properties(); long indexRebuildQueryTimeoutMs = http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c38afff/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 2df7bf0..0c3b25b 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -140,6 +140,7 @@ public interface QueryServices extends SQLCloseable { // Time interval to check if there is an index needs to be rebuild public static final String INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB = "phoenix.index.failure.handling.rebuild.interval"; + public static final String INDEX_REBUILD_TASK_INITIAL_DELAY = "phoenix.index.rebuild.task.initial.delay"; public static final String INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE = "phoenix.index.rebuild.batch.perTable"; // If index disable timestamp is older than this threshold, then index rebuild task won't attempt to rebuild it http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c38afff/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 80e8674..4ff65db 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -187,6 +187,7 @@ public class QueryServicesOptions { public static final boolean DEFAULT_INDEX_FAILURE_DISABLE_INDEX = true; public static final boolean DEFAULT_INDEX_FAILURE_THROW_EXCEPTION = true; public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 60000; // 60 secs + public static final long DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY = 10000; // 10 secs public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_BACKWARD_TIME = 1; // 1 ms public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME = 60000 * 3; // 3 mins // 30 min rpc timeout * 5 tries, with 2100ms total pause time 
between retries