Repository: phoenix Updated Branches: refs/heads/master dd5642ff5 -> 64658fe5a
Revert "PHOENIX-4170 Remove rebuildIndexOnFailure param from MutableIndexFailureIT" This reverts commit dd5642ff55cbc829765114d0be051cb48081e4a6. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/64658fe5 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/64658fe5 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/64658fe5 Branch: refs/heads/master Commit: 64658fe5a64e7089f5208ece25769bf644f96846 Parents: dd5642f Author: Samarth Jain <sama...@apache.org> Authored: Wed Sep 6 18:03:17 2017 -0700 Committer: Samarth Jain <sama...@apache.org> Committed: Wed Sep 6 18:03:17 2017 -0700 ---------------------------------------------------------------------- .../end2end/BaseClientManagedTimeIT.java | 2 +- .../phoenix/end2end/BaseHBaseManagedTimeIT.java | 2 +- .../org/apache/phoenix/end2end/NotQueryIT.java | 1 + .../end2end/ParallelStatsDisabledIT.java | 2 +- .../phoenix/end2end/ParallelStatsEnabledIT.java | 2 +- .../end2end/index/MutableIndexFailureIT.java | 82 +++++++++++++------- .../apache/phoenix/rpc/PhoenixServerRpcIT.java | 5 +- .../phoenix/query/QueryServicesOptions.java | 4 +- .../java/org/apache/phoenix/query/BaseTest.java | 73 ++++++++--------- 9 files changed, 98 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/64658fe5/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java index ee038e6..dee7200 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseClientManagedTimeIT.java @@ -74,6 +74,6 @@ public abstract class BaseClientManagedTimeIT extends BaseTest { @AfterClass public static void doTeardown() throws Exception { - dropNonSystemTables(true); + dropNonSystemTables(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/64658fe5/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseHBaseManagedTimeIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseHBaseManagedTimeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseHBaseManagedTimeIT.java index 367eb22..7439b1d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseHBaseManagedTimeIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseHBaseManagedTimeIT.java @@ -59,7 +59,7 @@ public abstract class BaseHBaseManagedTimeIT extends BaseTest { @AfterClass public static void doTeardown() throws Exception { - dropNonSystemTables(true); + dropNonSystemTables(); } @After http://git-wip-us.apache.org/repos/asf/phoenix/blob/64658fe5/phoenix-core/src/it/java/org/apache/phoenix/end2end/NotQueryIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NotQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NotQueryIT.java index 148d14f..9a285ff 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NotQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NotQueryIT.java @@ -38,6 +38,7 @@ import java.sql.ResultSet; import java.util.Collection; import java.util.Properties; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.junit.Test; import org.junit.runner.RunWith; http://git-wip-us.apache.org/repos/asf/phoenix/blob/64658fe5/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java index 0a1fbd8..fb980a3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java @@ -39,6 +39,6 @@ public abstract class ParallelStatsDisabledIT extends BaseTest { @AfterClass public static void tearDownMiniCluster() throws Exception { - BaseTest.dropNonSystemTables(false); + BaseTest.tearDownMiniClusterIfBeyondThreshold(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/64658fe5/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java index ea2d9db..a62d50d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java @@ -47,6 +47,6 @@ public abstract class ParallelStatsEnabledIT extends BaseTest { @AfterClass public static void tearDownMiniCluster() throws Exception { - BaseTest.dropNonSystemTables(false); + BaseTest.tearDownMiniClusterIfBeyondThreshold(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/64658fe5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index dbac5a9..1f425cf 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.sql.Connection; +import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -100,6 +101,7 @@ public class MutableIndexFailureIT extends BaseTest { private final String tableDDLOptions; private final boolean isNamespaceMapped; private final boolean leaveIndexActiveOnFailure; + private final boolean rebuildIndexOnWriteFailure; private final boolean failRebuildTask; private final boolean throwIndexWriteFailure; private String schema = generateUniqueName(); @@ -109,11 +111,12 @@ public class MutableIndexFailureIT extends BaseTest { private static final int disableTimestampThresholdMs = 10000; private static final int numRpcRetries = 2; - public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, boolean failRebuildTask, Boolean throwIndexWriteFailure) { + public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, Boolean rebuildIndexOnWriteFailure, boolean failRebuildTask, Boolean throwIndexWriteFailure) { this.transactional = transactional; this.localIndex = localIndex; this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "") + (disableIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE + "=" + disableIndexOnWriteFailure)) + + (rebuildIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE + "=" + rebuildIndexOnWriteFailure)) + (throwIndexWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.THROW_INDEX_WRITE_FAILURE + "=" + throwIndexWriteFailure)); this.tableName = FailingRegionObserver.FAIL_TABLE_NAME; this.indexName = "A_" + FailingRegionObserver.FAIL_INDEX_NAME; @@ -121,6 +124,7 @@ public class MutableIndexFailureIT extends BaseTest { this.fullIndexName = SchemaUtil.getTableName(schema, indexName); this.isNamespaceMapped = isNamespaceMapped; this.leaveIndexActiveOnFailure = ! (disableIndexOnWriteFailure == null ? QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX : disableIndexOnWriteFailure); + this.rebuildIndexOnWriteFailure = ! Boolean.FALSE.equals(rebuildIndexOnWriteFailure); this.failRebuildTask = failRebuildTask; this.throwIndexWriteFailure = ! Boolean.FALSE.equals(throwIndexWriteFailure); } @@ -157,31 +161,31 @@ public class MutableIndexFailureIT extends BaseTest { indexRebuildTaskRegionEnvironment.getConfiguration()); } - @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},failRebuildTask={4},throwIndexWriteFailure={5}") // name is used by failsafe as file name in reports + @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},rebuildIndexOnWriteFailure={4},failRebuildTask={5},throwIndexWriteFailure={6}") // name is used by failsafe as file name in reports public static List<Object[]> data() { return Arrays.asList(new Object[][] { - { false, false, false, true, false, false}, - { false, false, true, true, false, null}, - { false, false, true, true, false, true}, - { false, false, false, true, false, null}, - { true, false, false, true, false, null}, - { true, false, true, true, false, null}, - { false, true, true, true, false, null}, - { false, true, false, null, false, null}, - { true, true, false, true, false, null}, - { true, true, true, null, false, null}, + { false, false, false, true, true, false, false}, + { false, false, true, true, true, false, null}, + { false, false, true, true, true, false, true}, + { false, false, false, true, true, false, null}, + { true, false, false, true, true, false, null}, + { true, false, true, true, true, false, null}, + { false, true, true, true, true, false, null}, + { false, true, false, null, null, false, null}, + { true, true, false, true, null, false, null}, + { true, true, true, null, true, false, null}, - { false, false, false, false, false, null}, - { false, true, false, false, false, null}, - { false, false, false, false, false, null}, - { false, false, false, true, false, null}, - { false, false, false, true, false, null}, - { false, true, false, true, false, null}, - { false, true, false, true, false, null}, - { false, false, false, true, true, null}, - { false, false, true, true, true, null}, - { false, false, false, true, true, false}, - { false, false, true, true, true, false}, + { false, false, false, false, true, false, null}, + { false, true, false, false, null, false, null}, + { false, false, false, false, false, false, null}, + { false, false, false, true, true, false, null}, + { false, false, false, true, true, false, null}, + { false, true, false, true, true, false, null}, + { false, true, false, true, true, false, null}, + { false, false, false, true, true, true, null}, + { false, false, true, true, true, true, null}, + { false, false, false, true, true, true, false}, + { false, false, true, true, true, true, false}, } ); } @@ -336,9 +340,15 @@ public class MutableIndexFailureIT extends BaseTest { if (!failRebuildTask) { // re-enable index table FailingRegionObserver.FAIL_WRITE = false; - runRebuildTask(conn); - // wait for index to be rebuilt automatically - checkStateAfterRebuild(conn, fullIndexName, PIndexState.ACTIVE); + if (rebuildIndexOnWriteFailure) { + runRebuildTask(conn); + // wait for index to be rebuilt automatically + checkStateAfterRebuild(conn, fullIndexName, PIndexState.ACTIVE); + } else { + // simulate replaying failed mutation + replayMutations(); + } + // Verify UPSERT on data table still works after index table is caught up PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); stmt.setString(1, "a3"); @@ -439,7 +449,25 @@ public class MutableIndexFailureIT extends BaseTest { assertFalse(rs.next()); } } - + + private void replayMutations() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + for (int i = 0; i < exceptions.size(); i++) { + CommitException e = exceptions.get(i); + long ts = e.getServerTimestamp(); + props.setProperty(PhoenixRuntime.REPLAY_AT_ATTRIB, Long.toString(ts)); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + if (i == 0) { + updateTable(conn, false); + } else if (i == 1) { + updateTableAgain(conn, false); + } else { + fail(); + } + } + } + } + private void updateTable(Connection conn, boolean commitShouldFail) throws SQLException { PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); // Insert new row http://git-wip-us.apache.org/repos/asf/phoenix/blob/64658fe5/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java index 1c18667..d8a9ed4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java @@ -143,9 +143,8 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT { TestPhoenixIndexRpcSchedulerFactory.reset(); createIndex(conn, indexName + "_1"); - // Verify that that index queue is not used since running upsert select on server side has been disabled - // See PHOENIX-4171 - Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(), Mockito.never()).dispatch(Mockito.any(CallRunner.class)); + // verify that that index queue is used and only once (during Upsert Select on server to build the index) + Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class)); } finally { conn.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/64658fe5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index feaf5dd..4ff65db 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -329,8 +329,8 @@ public class QueryServicesOptions { // 4.10, psql and CSVBulkLoad // expects binary data to be base 64 // encoded - // RS -> RS calls for upsert select statements are disabled by default - public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = false; + // RS -> RS calls for upsert select statements are enabled by default + public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = true; private final Configuration config; http://git-wip-us.apache.org/repos/asf/phoenix/blob/64658fe5/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 65b5ab0..fe6d847 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -175,9 +175,8 @@ public abstract class BaseTest { private static final int dropTableTimeout = 300; // 5 mins should be long enough. private static final ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("DROP-TABLE-BASETEST" + "-thread-%s").build(); - private static final int DROP_HTABLE_SERVICE_THREADS = 10; private static final ExecutorService dropHTableService = Executors - .newFixedThreadPool(DROP_HTABLE_SERVICE_THREADS, factory); + .newSingleThreadExecutor(factory); static { ImmutableMap.Builder<String,String> builder = ImmutableMap.builder(); @@ -474,9 +473,9 @@ public abstract class BaseTest { } } - protected static void dropNonSystemTables(boolean wait) throws Exception { + protected static void dropNonSystemTables() throws Exception { try { - disableAndDropNonSystemTables(wait); + disableAndDropNonSystemTables(); } finally { destroyDriver(); } @@ -1487,56 +1486,52 @@ public abstract class BaseTest { /** * Disable and drop all the tables except SYSTEM.CATALOG and SYSTEM.SEQUENCE */ - private static void disableAndDropNonSystemTables(boolean wait) throws Exception { + private static void disableAndDropNonSystemTables() throws Exception { if (driver == null) return; - try (HBaseAdmin admin = driver.getConnectionQueryServices(null, null).getAdmin()) { + HBaseAdmin admin = driver.getConnectionQueryServices(null, null).getAdmin(); + try { HTableDescriptor[] tables = admin.listTables(); - Integer currentTableCount = null; - if (!wait) { - /* - * We don't want to drop a table that may be getting created asynchronously. So we - * need to cap the tables we are going to drop. Fortunately, we have the table - * counter available with us that tells us the name of the last table that was - * created. - */ - currentTableCount = NAME_SUFFIX.get(); - } for (HTableDescriptor table : tables) { String schemaName = SchemaUtil.getSchemaNameFromFullName(table.getName()); if (!QueryConstants.SYSTEM_SCHEMA_NAME.equals(schemaName)) { - if (currentTableCount == null || (!table.getTableName().getNameAsString() - .contains(currentTableCount + ""))) { - disableAndDropTable(admin, table.getTableName(), wait); - } + disableAndDropTable(admin, table.getTableName()); } } + } finally { + admin.close(); } } - private static void disableAndDropTable(final HBaseAdmin admin, final TableName tableName, - boolean wait) throws Exception { + private static void disableAndDropTable(final HBaseAdmin admin, final TableName tableName) + throws Exception { Future<Void> future = null; + boolean success = false; try { - future = dropHTableService.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - if (admin.isTableEnabled(tableName)) { - admin.disableTable(tableName); - admin.deleteTable(tableName); + try { + future = dropHTableService.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + if (admin.isTableEnabled(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + return null; } - return null; - } - }); - if (wait) { + }); future.get(dropTableTimeout, TimeUnit.SECONDS); + success = true; + } catch (TimeoutException e) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT) + .setMessage( + "Not able to disable and delete table " + tableName.getNameAsString() + + " in " + dropTableTimeout + " seconds.").build().buildException(); + } catch (Exception e) { + throw e; + } + } finally { + if (future != null && !success) { + future.cancel(true); } - } catch (TimeoutException e) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT) - .setMessage("Not able to disable and delete table " - + tableName.getNameAsString() + " in " + dropTableTimeout + " seconds.") - .build().buildException(); - } catch (Throwable e) { - logger.error("Exception caught when dropping table: " + tableName, e); } }