Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.1 63779600d -> 2c38afffd


PHOENIX-4156 Fix flapping MutableIndexFailureIT


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2c38afff
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2c38afff
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2c38afff

Branch: refs/heads/4.x-HBase-1.1
Commit: 2c38afffd1a9363bc7892706eb69bc476a634e08
Parents: 6377960
Author: Samarth Jain <sama...@apache.org>
Authored: Tue Sep 5 13:51:26 2017 -0700
Committer: Samarth Jain <sama...@apache.org>
Committed: Tue Sep 5 13:51:26 2017 -0700

----------------------------------------------------------------------
 .../end2end/index/MutableIndexFailureIT.java    | 79 +++++++++++++++++---
 .../coprocessor/MetaDataRegionObserver.java     | 12 ++-
 .../org/apache/phoenix/query/QueryServices.java |  1 +
 .../phoenix/query/QueryServicesOptions.java     |  1 +
 4 files changed, 82 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c38afff/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 0abd5ae..5797819 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -44,15 +44,21 @@ import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
+import 
org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.execute.CommitException;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -102,6 +108,10 @@ public class MutableIndexFailureIT extends BaseTest {
     private final boolean throwIndexWriteFailure;
     private String schema = generateUniqueName();
     private List<CommitException> exceptions = Lists.newArrayList();
+    private static RegionCoprocessorEnvironment 
indexRebuildTaskRegionEnvironment;
+    private static final int forwardOverlapMs = 1000;
+    private static final int disableTimestampThresholdMs = 10000;
+    private static final int numRpcRetries = 2;
 
     public MutableIndexFailureIT(boolean transactional, boolean localIndex, 
boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, Boolean 
rebuildIndexOnWriteFailure, boolean failRebuildTask, Boolean 
throwIndexWriteFailure) {
         this.transactional = transactional;
@@ -130,15 +140,27 @@ public class MutableIndexFailureIT extends BaseTest {
         serverProps.put(IndexWriterUtils.INDEX_WRITER_RPC_PAUSE, "5000");
         serverProps.put("data.tx.snapshot.dir", "/tmp");
         serverProps.put("hbase.balancer.period", 
String.valueOf(Integer.MAX_VALUE));
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, 
Boolean.TRUE.toString());
-        
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, 
"4000");
-        
serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, 
"30000"); // give up rebuilding after 30 seconds
         // need to override rpc retries otherwise test doesn't pass
-        serverProps.put(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, 
Long.toString(1));
-        
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB,
 Long.toString(1000));
+        serverProps.put(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, 
Long.toString(numRpcRetries));
+        
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB,
 Long.toString(forwardOverlapMs));
+        /*
+         * Effectively disable running the index rebuild task by having an 
infinite delay
+         * because we want to control its execution ourselves
+         */
+        serverProps.put(QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, 
Long.toString(Long.MAX_VALUE));
+        
serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, 
Long.toString(disableTimestampThresholdMs));
         Map<String, String> clientProps = 
Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, 
Boolean.TRUE.toString());
         NUM_SLAVES_BASE = 4;
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
new ReadOnlyProps(clientProps.entrySet().iterator()));
+        indexRebuildTaskRegionEnvironment =
+                (RegionCoprocessorEnvironment) getUtility()
+                        .getRSForFirstRegionInTable(
+                            
PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                        
.getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                        .get(0).getCoprocessorHost()
+                        
.findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
+        MetaDataRegionObserver.initRebuildIndexConnectionProps(
+            indexRebuildTaskRegionEnvironment.getConfiguration());
     }
 
     @Parameters(name = 
"MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},rebuildIndexOnWriteFailure={4},failRebuildTask={5},throwIndexWriteFailure={6}")
 // name is used by failsafe as file name in reports
@@ -170,6 +192,43 @@ public class MutableIndexFailureIT extends BaseTest {
         );
     }
 
+    private void runRebuildTask(Connection conn) throws InterruptedException, 
SQLException { // runs the index rebuild task by hand, twice, printing index state around each run
+        BuildIndexScheduleTask task =
+                new MetaDataRegionObserver.BuildIndexScheduleTask(
+                        indexRebuildTaskRegionEnvironment);
+        dumpStateOfIndexes(conn, fullTableName, true);
+        task.run(); // invoked directly on the calling thread rather than through an executor
+        dumpStateOfIndexes(conn, fullTableName, false);
+        Thread.sleep(forwardOverlapMs + 100); // wait just past the forward-overlap time configured in setup
+        if (failRebuildTask) {
+            Thread.sleep(disableTimestampThresholdMs + 100); // also wait just past the disable-timestamp threshold; presumably so the next run gives up on the index - confirm
+        }
+        dumpStateOfIndexes(conn, fullTableName, true);
+        task.run(); // second run, after the waits above
+        dumpStateOfIndexes(conn, fullTableName, false);
+    }
+
+    private static final void dumpStateOfIndexes(Connection conn, String 
tableName,
+            boolean beforeRebuildTaskRun) throws SQLException { // debug aid: prints each index's state and disable timestamp to stdout
+        PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
+        PTable table = phxConn.getTable(new PTableKey(phxConn.getTenantId(), 
tableName)); // first view: the table as held by this connection
+        List<PTable> indexes = table.getIndexes();
+        String s = beforeRebuildTaskRun ? "before rebuild run" : "after 
rebuild run";
+        System.out.println("************Index state in connection " + s + 
"******************");
+        for (PTable idx : indexes) {
+            System.out.println(
+                "Index Name: " + idx.getName().getString() + " State: " + 
idx.getIndexState()
+                        + " Disable timestamp: " + 
idx.getIndexDisableTimestamp());
+        }
+        System.out.println("************Index state from server  " + s + 
"******************");
+        table = PhoenixRuntime.getTableNoCache(phxConn, tableName); // second view: use the tableName argument (static method; must not reach for the test's fullTableName)
+        for (PTable idx : table.getIndexes()) {
+            System.out.println(
+                "Index Name: " + idx.getName().getString() + " State: " + 
idx.getIndexState()
+                        + " Disable timestamp: " + 
idx.getIndexDisableTimestamp());
+        }
+    }
+
     @Test
     public void testIndexWriteFailure() throws Exception {
         String secondIndexName = "B_" + FailingRegionObserver.FAIL_INDEX_NAME;
@@ -284,8 +343,9 @@ public class MutableIndexFailureIT extends BaseTest {
                 // re-enable index table
                 FailingRegionObserver.FAIL_WRITE = false;
                 if (rebuildIndexOnWriteFailure) {
+                    runRebuildTask(conn);
                     // wait for index to be rebuilt automatically
-                    waitForIndexRebuild(conn,fullIndexName, 
PIndexState.ACTIVE);
+                    checkStateAfterRebuild(conn, fullIndexName, 
PIndexState.ACTIVE);
                 } else {
                     // simulate replaying failed mutation
                     replayMutations();
@@ -305,7 +365,8 @@ public class MutableIndexFailureIT extends BaseTest {
                 // Wait for index to be rebuilt automatically. This should 
fail because
                 // we haven't flipped the FAIL_WRITE flag to false and as a 
result this
                 // should cause index rebuild to fail too.
-                waitForIndexRebuild(conn, fullIndexName, PIndexState.DISABLE);
+                runRebuildTask(conn);
+                checkStateAfterRebuild(conn, fullIndexName, 
PIndexState.DISABLE);
                 // verify that the index was marked as disabled and the index 
disable
                 // timestamp set to 0
                 String q =
@@ -325,9 +386,9 @@ public class MutableIndexFailureIT extends BaseTest {
         }
     }
 
-    private void waitForIndexRebuild(Connection conn, String fullIndexName, 
PIndexState expectedIndexState) throws InterruptedException, SQLException {
+    private void checkStateAfterRebuild(Connection conn, String fullIndexName, 
PIndexState expectedIndexState) throws InterruptedException, SQLException {
         if (!transactional) {
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, 
expectedIndexState);
+            assertTrue(TestUtil.checkIndexState(conn,fullIndexName, 
expectedIndexState, 0l));
         }
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c38afff/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 5bfc2e2..e42aca2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -87,6 +87,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.UpgradeUtil;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -104,6 +105,8 @@ public class MetaDataRegionObserver extends 
BaseRegionObserver {
     private static Map<PName, Long> batchExecutedPerTableMap = new 
HashMap<PName, Long>();
     @GuardedBy("MetaDataRegionObserver.class")
     private static Properties rebuildIndexConnectionProps;
+    // Added for test purposes
+    private long initialRebuildTaskDelay;
 
     @Override
     public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -135,6 +138,10 @@ public class MetaDataRegionObserver extends 
BaseRegionObserver {
                 config.getLong(
                     
QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB,
                     
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL);
+        initialRebuildTaskDelay =
+                config.getLong(
+                    QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY,
+                    
QueryServicesOptions.DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY);
     }
     
     @Override
@@ -190,7 +197,7 @@ public class MetaDataRegionObserver extends 
BaseRegionObserver {
             
initRebuildIndexConnectionProps(e.getEnvironment().getConfiguration());
             // starts index rebuild schedule work
             BuildIndexScheduleTask task = new 
BuildIndexScheduleTask(e.getEnvironment());
-            executor.scheduleWithFixedDelay(task, 10000, 
rebuildIndexTimeInterval, TimeUnit.MILLISECONDS);
+            executor.scheduleWithFixedDelay(task, initialRebuildTaskDelay, 
rebuildIndexTimeInterval, TimeUnit.MILLISECONDS);
         } catch (ClassNotFoundException ex) {
             LOG.error("BuildIndexScheduleTask cannot start!", ex);
         }
@@ -537,7 +544,8 @@ public class MetaDataRegionObserver extends 
BaseRegionObserver {
                                put);
        }
 
-    private static synchronized void 
initRebuildIndexConnectionProps(Configuration config) {
+    @VisibleForTesting
+    public static synchronized void 
initRebuildIndexConnectionProps(Configuration config) {
         if (rebuildIndexConnectionProps == null) {
             Properties props = new Properties();
             long indexRebuildQueryTimeoutMs =

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c38afff/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 2df7bf0..0c3b25b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -140,6 +140,7 @@ public interface QueryServices extends SQLCloseable {
     // Time interval to check if there is an index needs to be rebuild
     public static final String INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB =
         "phoenix.index.failure.handling.rebuild.interval";
+    public static final String INDEX_REBUILD_TASK_INITIAL_DELAY = 
"phoenix.index.rebuild.task.initial.delay";
     
     public static final String 
INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE = 
"phoenix.index.rebuild.batch.perTable";
     // If index disable timestamp is older than this threshold, then index 
rebuild task won't attempt to rebuild it

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c38afff/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 80e8674..4ff65db 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -187,6 +187,7 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_INDEX_FAILURE_DISABLE_INDEX = true;
     public static final boolean DEFAULT_INDEX_FAILURE_THROW_EXCEPTION = true;
     public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 
60000; // 60 secs
+    public static final long DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY = 10000; 
// 10 secs
     public static final long 
DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_BACKWARD_TIME = 1; // 1 ms
     public static final long 
DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME = 60000 * 3; // 3 
mins
     // 30 min rpc timeout * 5 tries, with 2100ms total pause time between 
retries

Reply via email to