Repository: phoenix Updated Branches: refs/heads/master ad52201e0 -> dd5642ff5
PHOENIX-4170 Remove rebuildIndexOnFailure param from MutableIndexFailureIT Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/dd5642ff Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/dd5642ff Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/dd5642ff Branch: refs/heads/master Commit: dd5642ff55cbc829765114d0be051cb48081e4a6 Parents: ad52201 Author: Samarth Jain <sama...@apache.org> Authored: Wed Sep 6 17:54:01 2017 -0700 Committer: Samarth Jain <sama...@apache.org> Committed: Wed Sep 6 17:54:01 2017 -0700 ---------------------------------------------------------------------- .../end2end/BaseClientManagedTimeIT.java | 2 +- .../phoenix/end2end/BaseHBaseManagedTimeIT.java | 2 +- .../org/apache/phoenix/end2end/NotQueryIT.java | 1 - .../end2end/ParallelStatsDisabledIT.java | 2 +- .../phoenix/end2end/ParallelStatsEnabledIT.java | 2 +- .../end2end/index/MutableIndexFailureIT.java | 82 +++++++------------- .../apache/phoenix/rpc/PhoenixServerRpcIT.java | 5 +- .../phoenix/query/QueryServicesOptions.java | 4 +- .../java/org/apache/phoenix/query/BaseTest.java | 73 +++++++++-------- 9 files changed, 75 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/dd5642ff/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java index dee7200..ee038e6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java @@ -74,6 +74,6 @@ public abstract class BaseClientManagedTimeIT extends BaseTest { @AfterClass public static void doTeardown() throws Exception { - dropNonSystemTables(); + dropNonSystemTables(true); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/dd5642ff/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseHBaseManagedTimeIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseHBaseManagedTimeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseHBaseManagedTimeIT.java index 7439b1d..367eb22 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseHBaseManagedTimeIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseHBaseManagedTimeIT.java @@ -59,7 +59,7 @@ public abstract class BaseHBaseManagedTimeIT extends BaseTest { @AfterClass public static void doTeardown() throws Exception { - dropNonSystemTables(); + dropNonSystemTables(true); } @After http://git-wip-us.apache.org/repos/asf/phoenix/blob/dd5642ff/phoenix-core/src/it/java/org/apache/phoenix/end2end/NotQueryIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NotQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NotQueryIT.java index 9a285ff..148d14f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NotQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NotQueryIT.java @@ -38,7 +38,6 @@ import java.sql.ResultSet; import java.util.Collection; import java.util.Properties; -import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.junit.Test; import org.junit.runner.RunWith; http://git-wip-us.apache.org/repos/asf/phoenix/blob/dd5642ff/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java index fb980a3..0a1fbd8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java @@ -39,6 +39,6 @@ public abstract class ParallelStatsDisabledIT extends BaseTest { @AfterClass public static void tearDownMiniCluster() throws Exception { - BaseTest.tearDownMiniClusterIfBeyondThreshold(); + BaseTest.dropNonSystemTables(false); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/dd5642ff/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java index a62d50d..ea2d9db 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java @@ -47,6 +47,6 @@ public abstract class ParallelStatsEnabledIT extends BaseTest { @AfterClass public static void tearDownMiniCluster() throws Exception { - BaseTest.tearDownMiniClusterIfBeyondThreshold(); + BaseTest.dropNonSystemTables(false); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/dd5642ff/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index 1f425cf..dbac5a9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -101,7 +100,6 @@ public class MutableIndexFailureIT extends BaseTest { private final String tableDDLOptions; private final boolean isNamespaceMapped; private final boolean leaveIndexActiveOnFailure; - private final boolean rebuildIndexOnWriteFailure; private final boolean failRebuildTask; private final boolean throwIndexWriteFailure; private String schema = generateUniqueName(); @@ -111,12 +109,11 @@ public class MutableIndexFailureIT extends BaseTest { private static final int disableTimestampThresholdMs = 10000; private static final int numRpcRetries = 2; - public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, Boolean rebuildIndexOnWriteFailure, boolean failRebuildTask, Boolean throwIndexWriteFailure) { + public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, boolean failRebuildTask, Boolean throwIndexWriteFailure) { this.transactional = transactional; this.localIndex = localIndex; this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "") + (disableIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE + "=" + disableIndexOnWriteFailure)) - + (rebuildIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE + "=" + rebuildIndexOnWriteFailure)) + (throwIndexWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.THROW_INDEX_WRITE_FAILURE + "=" + throwIndexWriteFailure)); this.tableName = FailingRegionObserver.FAIL_TABLE_NAME; this.indexName = "A_" + FailingRegionObserver.FAIL_INDEX_NAME; @@ -124,7 +121,6 @@ public class MutableIndexFailureIT extends BaseTest { this.fullIndexName = SchemaUtil.getTableName(schema, indexName); this.isNamespaceMapped = isNamespaceMapped; this.leaveIndexActiveOnFailure = ! (disableIndexOnWriteFailure == null ? QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX : disableIndexOnWriteFailure); - this.rebuildIndexOnWriteFailure = ! Boolean.FALSE.equals(rebuildIndexOnWriteFailure); this.failRebuildTask = failRebuildTask; this.throwIndexWriteFailure = ! Boolean.FALSE.equals(throwIndexWriteFailure); } @@ -161,31 +157,31 @@ public class MutableIndexFailureIT extends BaseTest { indexRebuildTaskRegionEnvironment.getConfiguration()); } - @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},rebuildIndexOnWriteFailure={4},failRebuildTask={5},throwIndexWriteFailure={6}") // name is used by failsafe as file name in reports + @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},failRebuildTask={4},throwIndexWriteFailure={5}") // name is used by failsafe as file name in reports public static List<Object[]> data() { return Arrays.asList(new Object[][] { - { false, false, false, true, true, false, false}, - { false, false, true, true, true, false, null}, - { false, false, true, true, true, false, true}, - { false, false, false, true, true, false, null}, - { true, false, false, true, true, false, null}, - { true, false, true, true, true, false, null}, - { false, true, true, true, true, false, null}, - { false, true, false, null, null, false, null}, - { true, true, false, true, null, false, null}, - { true, true, true, null, true, false, null}, + { false, false, false, true, false, false}, + { false, false, true, true, false, null}, + { false, false, true, true, false, true}, + { false, false, false, true, false, null}, + { true, false, false, true, false, null}, + { true, false, true, true, false, null}, + { false, true, true, true, false, null}, + { false, true, false, null, false, null}, + { true, true, false, true, false, null}, + { true, true, true, null, false, null}, - { false, false, false, false, true, false, null}, - { false, true, false, false, null, false, null}, - { false, false, false, false, false, false, null}, - { false, false, false, true, true, false, null}, - { false, false, false, true, true, false, null}, - { false, true, false, true, true, false, null}, - { false, true, false, true, true, false, null}, - { false, false, false, true, true, true, null}, - { false, false, true, true, true, true, null}, - { false, false, false, true, true, true, false}, - { false, false, true, true, true, true, false}, + { false, false, false, false, false, null}, + { false, true, false, false, false, null}, + { false, false, false, false, false, null}, + { false, false, false, true, false, null}, + { false, false, false, true, false, null}, + { false, true, false, true, false, null}, + { false, true, false, true, false, null}, + { false, false, false, true, true, null}, + { false, false, true, true, true, null}, + { false, false, false, true, true, false}, + { false, false, true, true, true, false}, } ); } @@ -340,15 +336,9 @@ public class MutableIndexFailureIT extends BaseTest { if (!failRebuildTask) { // re-enable index table FailingRegionObserver.FAIL_WRITE = false; - if (rebuildIndexOnWriteFailure) { - runRebuildTask(conn); - // wait for index to be rebuilt automatically - checkStateAfterRebuild(conn, fullIndexName, PIndexState.ACTIVE); - } else { - // simulate replaying failed mutation - replayMutations(); - } - + runRebuildTask(conn); + // wait for index to be rebuilt automatically + checkStateAfterRebuild(conn, fullIndexName, PIndexState.ACTIVE); // Verify UPSERT on data table still works after index table is caught up PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); stmt.setString(1, "a3"); @@ -449,25 +439,7 @@ public class MutableIndexFailureIT extends BaseTest { assertFalse(rs.next()); } } - - private void replayMutations() throws SQLException { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - for (int i = 0; i < exceptions.size(); i++) { - CommitException e = exceptions.get(i); - long ts = e.getServerTimestamp(); - props.setProperty(PhoenixRuntime.REPLAY_AT_ATTRIB, Long.toString(ts)); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - if (i == 0) { - updateTable(conn, false); - } else if (i == 1) { - updateTableAgain(conn, false); - } else { - fail(); - } - } - } - } - + private void updateTable(Connection conn, boolean commitShouldFail) throws SQLException { PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); // Insert new row http://git-wip-us.apache.org/repos/asf/phoenix/blob/dd5642ff/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java index d8a9ed4..1c18667 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java @@ -143,8 +143,9 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT { TestPhoenixIndexRpcSchedulerFactory.reset(); createIndex(conn, indexName + "_1"); - // verify that that index queue is used and only once (during Upsert Select on server to build the index) - Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class)); + // Verify that that index queue is not used since running upsert select on server side has been disabled + // See PHOENIX-4171 + Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(), Mockito.never()).dispatch(Mockito.any(CallRunner.class)); } finally { conn.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dd5642ff/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 4ff65db..feaf5dd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -329,8 +329,8 @@ public class QueryServicesOptions { // 4.10, psql and CSVBulkLoad // expects binary data to be base 64 // encoded - // RS -> RS calls for upsert select statements are enabled by default - public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = true; + // RS -> RS calls for upsert select statements are disabled by default + public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = false; private final Configuration config; http://git-wip-us.apache.org/repos/asf/phoenix/blob/dd5642ff/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index fe6d847..65b5ab0 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -175,8 +175,9 @@ public abstract class BaseTest { private static final int dropTableTimeout = 300; // 5 mins should be long enough. private static final ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("DROP-TABLE-BASETEST" + "-thread-%s").build(); + private static final int DROP_HTABLE_SERVICE_THREADS = 10; private static final ExecutorService dropHTableService = Executors - .newSingleThreadExecutor(factory); + .newFixedThreadPool(DROP_HTABLE_SERVICE_THREADS, factory); static { ImmutableMap.Builder<String,String> builder = ImmutableMap.builder(); @@ -473,9 +474,9 @@ public abstract class BaseTest { } } - protected static void dropNonSystemTables() throws Exception { + protected static void dropNonSystemTables(boolean wait) throws Exception { try { - disableAndDropNonSystemTables(); + disableAndDropNonSystemTables(wait); } finally { destroyDriver(); } @@ -1486,52 +1487,56 @@ public abstract class BaseTest { /** * Disable and drop all the tables except SYSTEM.CATALOG and SYSTEM.SEQUENCE */ - private static void disableAndDropNonSystemTables() throws Exception { + private static void disableAndDropNonSystemTables(boolean wait) throws Exception { if (driver == null) return; - HBaseAdmin admin = driver.getConnectionQueryServices(null, null).getAdmin(); - try { + try (HBaseAdmin admin = driver.getConnectionQueryServices(null, null).getAdmin()) { HTableDescriptor[] tables = admin.listTables(); + Integer currentTableCount = null; + if (!wait) { + /* + * We don't want to drop a table that may be getting created asynchronously. So we + * need to cap the tables we are going to drop. Fortunately, we have the table + * counter available with us that tells us the name of the last table that was + * created. + */ + currentTableCount = NAME_SUFFIX.get(); + } for (HTableDescriptor table : tables) { String schemaName = SchemaUtil.getSchemaNameFromFullName(table.getName()); if (!QueryConstants.SYSTEM_SCHEMA_NAME.equals(schemaName)) { - disableAndDropTable(admin, table.getTableName()); + if (currentTableCount == null || (!table.getTableName().getNameAsString() + .contains(currentTableCount + ""))) { + disableAndDropTable(admin, table.getTableName(), wait); + } } } - } finally { - admin.close(); } } - private static void disableAndDropTable(final HBaseAdmin admin, final TableName tableName) - throws Exception { + private static void disableAndDropTable(final HBaseAdmin admin, final TableName tableName, + boolean wait) throws Exception { Future<Void> future = null; - boolean success = false; try { - try { - future = dropHTableService.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - if (admin.isTableEnabled(tableName)) { - admin.disableTable(tableName); - admin.deleteTable(tableName); - } - return null; + future = dropHTableService.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + if (admin.isTableEnabled(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); } - }); + return null; + } + }); + if (wait) { future.get(dropTableTimeout, TimeUnit.SECONDS); - success = true; - } catch (TimeoutException e) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT) - .setMessage( - "Not able to disable and delete table " + tableName.getNameAsString() - + " in " + dropTableTimeout + " seconds.").build().buildException(); - } catch (Exception e) { - throw e; - } - } finally { - if (future != null && !success) { - future.cancel(true); } + } catch (TimeoutException e) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT) + .setMessage("Not able to disable and delete table " + + tableName.getNameAsString() + " in " + dropTableTimeout + " seconds.") + .build().buildException(); + } catch (Throwable e) { + logger.error("Exception caught when dropping table: " + tableName, e); } }