Repository: phoenix Updated Branches: refs/heads/4.8-HBase-0.98 d9a54cf84 -> c8f2f48df
PHOENIX-3237 Automatic rebuild of disabled index will fail if indexes of two tables are disabled at the same time Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c8f2f48d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c8f2f48d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c8f2f48d Branch: refs/heads/4.8-HBase-0.98 Commit: c8f2f48dfcb574168f8ac1534e74792279b54b73 Parents: d9a54cf Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Wed Sep 14 13:51:44 2016 +0530 Committer: Ankit Singhal <ankitsingha...@gmail.com> Committed: Wed Sep 14 13:51:44 2016 +0530 ---------------------------------------------------------------------- .../end2end/index/MutableIndexFailureIT.java | 239 +++++++++++-------- .../coprocessor/MetaDataRegionObserver.java | 6 +- 2 files changed, 143 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8f2f48d/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index 8870d8f..e3d4c75 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -49,7 +49,6 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTableType; -import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -128,6 +127,9 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { } public void helpTestWriteFailureDisablesIndex() throws Exception { + String secondTableName = fullTableName + "_2"; + String secondIndexName = indexName + "_2"; + String secondFullIndexName = fullIndexName + "_2"; Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped)); try (Connection conn = driver.connect(url, props)) { @@ -139,6 +141,8 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { } conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions); + conn.createStatement().execute("CREATE TABLE " + secondTableName + + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions); query = "SELECT * FROM " + fullTableName; rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); @@ -146,34 +150,26 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { FAIL_WRITE = false; conn.createStatement().execute( "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); + conn.createStatement().execute( + "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + secondIndexName + " ON " + secondTableName + " (v1) INCLUDE (v2)"); query = "SELECT * FROM " + fullIndexName; rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); // Verify the metadata for index is correct. - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName, + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName+"%", new String[] { PTableType.INDEX.toString() }); assertTrue(rs.next()); assertEquals(indexName, rs.getString(3)); assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); + assertTrue(rs.next()); + assertEquals(secondIndexName, rs.getString(3)); + assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); assertFalse(rs.next()); - - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); - stmt.setString(1, "a"); - stmt.setString(2, "x"); - stmt.setString(3, "1"); - stmt.execute(); - stmt.setString(1, "b"); - stmt.setString(2, "y"); - stmt.setString(3, "2"); - stmt.execute(); - stmt.setString(1, "c"); - stmt.setString(2, "z"); - stmt.setString(3, "3"); - stmt.execute(); - conn.commit(); - + initializeTable(conn, fullTableName); + initializeTable(conn, secondTableName); + query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName; rs = conn.createStatement().executeQuery("EXPLAIN " + query); String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER " @@ -192,31 +188,8 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { assertFalse(rs.next()); FAIL_WRITE = true; - - stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); - // Insert new row - stmt.setString(1, "d"); - stmt.setString(2, "d"); - stmt.setString(3, "4"); - stmt.execute(); - // Update existing row - stmt.setString(1, "a"); - stmt.setString(2, "x2"); - stmt.setString(3, "2"); - stmt.execute(); - // Delete existing row - stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE k=?"); - stmt.setString(1, "b"); - stmt.execute(); - try { - conn.commit(); - fail(); - } catch (SQLException e) { - System.out.println(); - } catch(Exception e) { - System.out.println(); - } - + updateTable(conn, fullTableName); + updateTable(conn, secondTableName); // Verify the metadata for index is correct. rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName, new String[] { PTableType.INDEX.toString() }); @@ -236,13 +209,18 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { // would have succeeded while the index writes would have failed. if (!transactional) { // Verify UPSERT on data table still work after index is disabled - stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); + stmt.setString(1, "a3"); + stmt.setString(2, "x3"); + stmt.setString(3, "3"); + stmt.execute(); + conn.commit(); + stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)"); stmt.setString(1, "a3"); stmt.setString(2, "x3"); stmt.setString(3, "3"); stmt.execute(); conn.commit(); - // Verify previous writes succeeded to data table query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName; rs = conn.createStatement().executeQuery("EXPLAIN " + query); @@ -267,25 +245,18 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { // re-enable index table FAIL_WRITE = false; - - boolean isActive = false; - if (!transactional) { - int maxTries = 3, nTries = 0; - do { - Thread.sleep(15 * 1000); // sleep 15 secs - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName, - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){ - isActive = true; - break; - } - } while(++nTries < maxTries); - assertTrue(isActive); - } + waitForIndexToBeActive(conn,indexName); + waitForIndexToBeActive(conn,secondIndexName); // Verify UPSERT on data table still work after index table is recreated - stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); + stmt.setString(1, "a3"); + stmt.setString(2, "x4"); + stmt.setString(3, "4"); + stmt.execute(); + conn.commit(); + + stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)"); stmt.setString(1, "a3"); stmt.setString(2, "x4"); stmt.setString(3, "4"); @@ -293,48 +264,116 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { conn.commit(); // verify index table has correct data - query = "SELECT /*+ INDEX(" + indexName + ") */ k,v1 FROM " + fullTableName; - rs = conn.createStatement().executeQuery("EXPLAIN " + query); - expectedPlan = " OVER " - + (localIndex - ? Bytes.toString(SchemaUtil - .getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped).getName()) - : SchemaUtil.getPhysicalTableName(fullIndexName.getBytes(), isNamespaceMapped).getNameAsString()); - String explainPlan = QueryUtil.getExplainPlan(rs); - assertTrue(explainPlan.contains(expectedPlan)); - rs = conn.createStatement().executeQuery(query); - if (transactional) { // failed commit does not get retried - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("x", rs.getString(2)); - assertTrue(rs.next()); - assertEquals("a3", rs.getString(1)); - assertEquals("x4", rs.getString(2)); - assertTrue(rs.next()); - assertEquals("b", rs.getString(1)); - assertEquals("y", rs.getString(2)); - assertTrue(rs.next()); - assertEquals("c", rs.getString(1)); - assertEquals("z", rs.getString(2)); - assertFalse(rs.next()); - } else { // failed commit eventually succeeds - assertTrue(rs.next()); - assertEquals("d", rs.getString(1)); - assertEquals("d", rs.getString(2)); - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("x2", rs.getString(2)); - assertTrue(rs.next()); - assertEquals("a3", rs.getString(1)); - assertEquals("x4", rs.getString(2)); + validateDataWithIndex(conn, fullTableName, fullIndexName); + validateDataWithIndex(conn, secondTableName, secondFullIndexName); + } + } + + private void waitForIndexToBeActive(Connection conn, String index) throws InterruptedException, SQLException { + boolean isActive = false; + if (!transactional) { + int maxTries = 4, nTries = 0; + do { + Thread.sleep(15 * 1000); // sleep 15 secs + ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), index, + new String[] { PTableType.INDEX.toString() }); assertTrue(rs.next()); - assertEquals("c", rs.getString(1)); - assertEquals("z", rs.getString(2)); - assertFalse(rs.next()); - } + if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) { + isActive = true; + break; + } + } while (++nTries < maxTries); + assertTrue(isActive); + } + } + + private void initializeTable(Connection conn, String tableName) throws SQLException { + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)"); + stmt.setString(1, "a"); + stmt.setString(2, "x"); + stmt.setString(3, "1"); + stmt.execute(); + stmt.setString(1, "b"); + stmt.setString(2, "y"); + stmt.setString(3, "2"); + stmt.execute(); + stmt.setString(1, "c"); + stmt.setString(2, "z"); + stmt.setString(3, "3"); + stmt.execute(); + conn.commit(); + + } + + private void validateDataWithIndex(Connection conn, String tableName, String indexName) throws SQLException { + String query = "SELECT /*+ INDEX(" + indexName + ") */ k,v1 FROM " + tableName; + ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query); + String expectedPlan = " OVER " + + (localIndex + ? Bytes.toString( + SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped).getName()) + : SchemaUtil.getPhysicalTableName(indexName.getBytes(), isNamespaceMapped).getNameAsString()); + String explainPlan = QueryUtil.getExplainPlan(rs); + assertTrue(explainPlan.contains(expectedPlan)); + rs = conn.createStatement().executeQuery(query); + if (transactional) { // failed commit does not get retried + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("x", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("a3", rs.getString(1)); + assertEquals("x4", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("b", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("c", rs.getString(1)); + assertEquals("z", rs.getString(2)); + assertFalse(rs.next()); + } else { // failed commit eventually succeeds + assertTrue(rs.next()); + assertEquals("d", rs.getString(1)); + assertEquals("d", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("x2", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("a3", rs.getString(1)); + assertEquals("x4", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("c", rs.getString(1)); + assertEquals("z", rs.getString(2)); + assertFalse(rs.next()); } } + private void updateTable(Connection conn, String tableName) throws SQLException { + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)"); + // Insert new row + stmt.setString(1, "d"); + stmt.setString(2, "d"); + stmt.setString(3, "4"); + stmt.execute(); + // Update existing row + stmt.setString(1, "a"); + stmt.setString(2, "x2"); + stmt.setString(3, "2"); + stmt.execute(); + // Delete existing row + stmt = conn.prepareStatement("DELETE FROM " + tableName + " WHERE k=?"); + stmt.setString(1, "b"); + stmt.execute(); + try { + conn.commit(); + fail(); + } catch (SQLException e) { + System.out.println(); + } catch (Exception e) { + System.out.println(); + } + + } + public static class FailingRegionObserver extends SimpleRegionObserver { @Override public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8f2f48d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index 5e60e56..66a5dcb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -257,9 +257,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver { PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES); byte[] indexStat = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES); - if ((dataTable == null || dataTable.length == 0) - || (indexStat == null || indexStat.length == 0)) { + if ((dataTable == null || dataTable.length == 0) || (indexStat == null || indexStat.length == 0) + || (dataPTable != null + && Bytes.compareTo(dataPTable.getName().getBytes(), dataTable) != 0)) { // data table name can't be empty + // we need to build indexes of same data table. so skip other indexes for this task. continue; }