PHOENIX-2890 Extend IndexTool to allow incremental index rebuilds
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/068c1cd9 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/068c1cd9 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/068c1cd9 Branch: refs/heads/4.x-HBase-1.1 Commit: 068c1cd96db6b46e80721cb66be9263e0565f5d1 Parents: c1b8d79 Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Mon Dec 26 12:24:19 2016 +0530 Committer: Ankit Singhal <ankitsingha...@gmail.com> Committed: Mon Dec 26 12:24:19 2016 +0530 ---------------------------------------------------------------------- .../phoenix/end2end/AutomaticRebuildIT.java | 219 +++++++++ .../end2end/IndexToolForPartialBuildIT.java | 298 ++++++++++++ ...olForPartialBuildWithNamespaceEnabledIT.java | 70 +++ .../phoenix/end2end/index/IndexMetadataIT.java | 58 +++ .../end2end/index/MutableIndexFailureIT.java | 8 +- phoenix-core/src/main/antlr3/PhoenixSQL.g | 4 +- .../coprocessor/MetaDataEndpointImpl.java | 9 +- .../coprocessor/MetaDataRegionObserver.java | 291 +++++++----- .../phoenix/exception/SQLExceptionCode.java | 3 +- .../index/PhoenixIndexFailurePolicy.java | 52 +-- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 2 + .../apache/phoenix/jdbc/PhoenixStatement.java | 8 +- .../phoenix/mapreduce/index/IndexTool.java | 455 +++++++++++++------ .../phoenix/mapreduce/index/IndexToolUtil.java | 6 +- .../index/PhoenixIndexImportDirectMapper.java | 2 +- .../index/PhoenixIndexPartialBuildMapper.java | 182 ++++++++ .../util/PhoenixConfigurationUtil.java | 31 ++ .../phoenix/parse/AlterIndexStatement.java | 8 +- .../apache/phoenix/parse/ParseNodeFactory.java | 6 +- .../org/apache/phoenix/query/QueryServices.java | 4 + .../apache/phoenix/schema/MetaDataClient.java | 47 +- .../java/org/apache/phoenix/util/IndexUtil.java | 61 ++- 22 files changed, 1503 insertions(+), 321 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java new file mode 100644 index 0000000..cbb7745 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.StringUtil; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Maps; + +/** + * Tests for the {@link AutomaticRebuildIT} + */ +@RunWith(Parameterized.class) +public class AutomaticRebuildIT extends BaseOwnClusterIT { + + private final 
boolean localIndex; + protected boolean isNamespaceEnabled = false; + protected final String tableDDLOptions; + + public AutomaticRebuildIT(boolean localIndex) { + this.localIndex = localIndex; + StringBuilder optionBuilder = new StringBuilder(); + optionBuilder.append(" SPLIT ON(1,2)"); + this.tableDDLOptions = optionBuilder.toString(); + } + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7); + serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName()); + serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0"); + serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); + serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000"); + serverProps.put("hbase.client.pause", "5000"); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "1000"); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, "5"); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), + new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + @Parameters(name = "localIndex = {0}") + public static Collection<Boolean[]> data() { + return Arrays.asList(new Boolean[][] { { false }, { true } }); + } + + @Test + public void testSecondaryAutomaticRebuildIndex() throws Exception { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName); + final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); + 
props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString()); + props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled)); + final Connection conn = DriverManager.getConnection(getUrl(), props); + Statement stmt = conn.createStatement(); + try { + if (isNamespaceEnabled) { + conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName); + } + stmt.execute(String.format( + "CREATE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s", + fullTableName, tableDDLOptions)); + String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName); + PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); + FailingRegionObserver.FAIL_WRITE = false; + // insert two rows + upsertRow(stmt1, 1000); + upsertRow(stmt1, 2000); + + conn.commit(); + stmt.execute(String.format("CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ", + (localIndex ? "LOCAL" : ""), indxTable, fullTableName)); + FailingRegionObserver.FAIL_WRITE = true; + upsertRow(stmt1, 3000); + upsertRow(stmt1, 4000); + upsertRow(stmt1, 5000); + try { + conn.commit(); + fail(); + } catch (SQLException e) { + } catch (Exception e) { + } + FailingRegionObserver.FAIL_WRITE = false; + ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable, + new String[] { PTableType.INDEX.toString() }); + assertTrue(rs.next()); + assertEquals(indxTable, rs.getString(3)); + String indexState = rs.getString("INDEX_STATE"); + assertEquals(PIndexState.DISABLE.toString(), indexState); + assertFalse(rs.next()); + upsertRow(stmt1, 6000); + upsertRow(stmt1, 7000); + conn.commit(); + int maxTries = 4, nTries = 0; + boolean isInactive = false; + do { + rs = conn.createStatement() + .executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.INDEX_STATE + "," + + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM " + + 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " (" + + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where " + + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and " + + PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'")); + rs.next(); + if (PIndexState.INACTIVE.getSerializedValue().equals(rs.getString(1)) && rs.getLong(2) > 3000) { + isInactive = true; + break; + } + Thread.sleep(10 * 1000); // sleep 10 secs + } while (++nTries < maxTries); + assertTrue(isInactive); + nTries = 0; + boolean isActive = false; + do { + Thread.sleep(15 * 1000); // sleep 15 secs + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable, + new String[] { PTableType.INDEX.toString() }); + assertTrue(rs.next()); + if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) { + isActive = true; + break; + } + } while (++nTries < maxTries); + assertTrue(isActive); + + } finally { + conn.close(); + } + } + + public static void upsertRow(PreparedStatement stmt, int i) throws SQLException { + // insert row + stmt.setInt(1, i); + stmt.setString(2, "uname" + String.valueOf(i)); + stmt.setInt(3, 95050 + i); + stmt.executeUpdate(); + } + + public static class FailingRegionObserver extends SimpleRegionObserver { + public static volatile boolean FAIL_WRITE = false; + public static final String INDEX_NAME = "IDX"; + + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { + if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) { + throw new DoNotRetryIOException(); + } + Mutation operation = miniBatchOp.getOperation(0); + Set<byte[]> keySet = operation.getFamilyMap().keySet(); + for (byte[] family : keySet) { + if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) { + throw new DoNotRetryIOException(); + } + } + } + 
+ } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java new file mode 100644 index 0000000..116c47f --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.StringUtil; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import 
org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * Tests for the {@link IndexToolForPartialBuildIT} + */ +@RunWith(Parameterized.class) +public class IndexToolForPartialBuildIT extends BaseOwnClusterIT { + + private final boolean localIndex; + protected boolean isNamespaceEnabled = false; + protected final String tableDDLOptions; + + public IndexToolForPartialBuildIT(boolean localIndex) { + + this.localIndex = localIndex; + StringBuilder optionBuilder = new StringBuilder(); + optionBuilder.append(" SPLIT ON(1,2)"); + this.tableDDLOptions = optionBuilder.toString(); + } + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7); + serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName()); + serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0"); + serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); + serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000"); + serverProps.put("hbase.client.pause", "5000"); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.FALSE.toString()); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + @Parameters(name="localIndex = {0}") + public static Collection<Boolean[]> data() { + return Arrays.asList(new Boolean[][] { + { false},{ true } + }); + } + + @Test + public void testSecondaryIndex() throws Exception { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName); + final String indxTable = 
String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); + props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString()); + props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled)); + final Connection conn = DriverManager.getConnection(getUrl(), props); + Statement stmt = conn.createStatement(); + try { + if (isNamespaceEnabled) { + conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName); + } + stmt.execute( + String.format("CREATE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s", + fullTableName, tableDDLOptions)); + String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName); + PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); + FailingRegionObserver.FAIL_WRITE = false; + // insert two rows + upsertRow(stmt1, 1000); + upsertRow(stmt1, 2000); + + conn.commit(); + stmt.execute(String.format("CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ", + (localIndex ? 
"LOCAL" : ""), indxTable, fullTableName)); + FailingRegionObserver.FAIL_WRITE = true; + upsertRow(stmt1, 3000); + upsertRow(stmt1, 4000); + upsertRow(stmt1, 5000); + try { + conn.commit(); + fail(); + } catch (SQLException e) {} catch (Exception e) {} + conn.createStatement() + .execute(String.format("ALTER INDEX %s on %s REBUILD ASYNC", indxTable, fullTableName)); + + FailingRegionObserver.FAIL_WRITE = false; + ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable, + new String[] { PTableType.INDEX.toString() }); + assertTrue(rs.next()); + assertEquals(indxTable, rs.getString(3)); + String indexState = rs.getString("INDEX_STATE"); + assertEquals(PIndexState.BUILDING.toString(), indexState); + assertFalse(rs.next()); + upsertRow(stmt1, 6000); + upsertRow(stmt1, 7000); + conn.commit(); + + rs = conn.createStatement() + .executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + "," + + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM " + + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " (" + + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where " + + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and " + + PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'")); + rs.next(); + PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable)); + assertEquals(PIndexState.BUILDING, pindexTable.getIndexState()); + assertEquals(rs.getLong(1), pindexTable.getTimeStamp()); + //assert disabled timestamp + assertEquals(rs.getLong(2), 3000); + + String selectSql = String.format("SELECT LPAD(UPPER(NAME),11,'x')||'_xyz',ID FROM %s", fullTableName); + rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); + String actualExplainPlan = QueryUtil.getExplainPlan(rs); + + // assert we are pulling from data table. 
+ assertExplainPlan(actualExplainPlan, schemaName, dataTableName, null, false, isNamespaceEnabled); + + rs = stmt1.executeQuery(selectSql); + for (int i = 1; i <= 7; i++) { + assertTrue(rs.next()); + assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1)); + } + + // Validate Index table data till disabled timestamp + rs = stmt1.executeQuery(String.format("SELECT * FROM %s", SchemaUtil.getTableName(schemaName, indxTable))); + for (int i = 1; i <= 2; i++) { + assertTrue(rs.next()); + assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1)); + } + assertFalse(rs.next()); + // run the index MR job. + final IndexTool indexingTool = new IndexTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + conf.set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled)); + indexingTool.setConf(conf); + + final String[] cmdArgs = getArgValues(schemaName, dataTableName); + int status = indexingTool.run(cmdArgs); + assertEquals(0, status); + + // insert two more rows + upsertRow(stmt1, 8000); + upsertRow(stmt1, 9000); + conn.commit(); + + // assert we are pulling from index table. 
+ rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); + actualExplainPlan = QueryUtil.getExplainPlan(rs); + assertExplainPlan(actualExplainPlan, schemaName, dataTableName, indxTable, localIndex, isNamespaceEnabled); + + rs = stmt.executeQuery(selectSql); + + for (int i = 1; i <= 9; i++) { + assertTrue(rs.next()); + assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1)); + } + + assertFalse(rs.next()); + + // conn.createStatement().execute(String.format("DROP INDEX %s ON %s", indxTable, fullTableName)); + } finally { + conn.close(); + } + } + + public static void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable, + String indxTable, boolean isLocal, boolean isNamespaceMapped) { + + String expectedExplainPlan = ""; + if (indxTable != null) { + if (isLocal) { + final String localIndexName = SchemaUtil + .getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, dataTable), isNamespaceMapped, + PTableType.INDEX) + .getString(); + expectedExplainPlan = String.format("CLIENT PARALLEL 3-WAY RANGE SCAN OVER %s [1]", localIndexName); + } else { + expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s", + SchemaUtil.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, indxTable), + isNamespaceMapped, PTableType.INDEX)); + } + } else { + expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s", + SchemaUtil.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, dataTable), + isNamespaceMapped, PTableType.TABLE)); + } + assertTrue(actualExplainPlan.contains(expectedExplainPlan)); + } + + public String[] getArgValues(String schemaName, String dataTable) { + final List<String> args = Lists.newArrayList(); + if (schemaName!=null) { + args.add("-s"); + args.add(schemaName); + } + args.add("-dt"); + args.add(dataTable); + args.add("-pr"); + args.add("-op"); + args.add("/tmp/output/partialTable_"+localIndex); + return args.toArray(new String[0]); + } + + public static 
void upsertRow(PreparedStatement stmt, int i) throws SQLException { + // insert row + stmt.setInt(1, i); + stmt.setString(2, "uname" + String.valueOf(i)); + stmt.setInt(3, 95050 + i); + stmt.executeUpdate(); + } + + + public static class FailingRegionObserver extends SimpleRegionObserver { + public static volatile boolean FAIL_WRITE = false; + public static final String INDEX_NAME = "IDX"; + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { + if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) { + throw new DoNotRetryIOException(); + } + Mutation operation = miniBatchOp.getOperation(0); + Set<byte[]> keySet = operation.getFamilyMap().keySet(); + for(byte[] family: keySet) { + if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) { + throw new DoNotRetryIOException(); + } + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java new file mode 100644 index 0000000..4b2371c --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Maps; + +/** + * Runs the {@link IndexToolForPartialBuildIT} tests against a cluster configured with namespace mapping enabled + */ +@RunWith(Parameterized.class) +public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolForPartialBuildIT { + + + public IndexToolForPartialBuildWithNamespaceEnabledIT(boolean localIndex, boolean isNamespaceEnabled) { + super(localIndex); + this.isNamespaceEnabled=isNamespaceEnabled; + } + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7); + serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName()); + serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); + serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000"); + 
serverProps.put("hbase.client.pause", "5000"); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "2000"); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000"); + serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true"); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); + clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true"); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + @Parameters(name="localIndex = {0} , isNamespaceEnabled = {1}") + public static Collection<Boolean[]> data() { + return Arrays.asList(new Boolean[][] { + { false, true},{ true, false } + }); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java index f0c670b..63a6bd6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java @@ -31,6 +31,7 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.sql.Types; import java.util.Properties; @@ -43,10 +44,13 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnAlreadyExistsException; import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableNotFoundException; 
import org.apache.phoenix.schema.types.PDate; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.StringUtil; @@ -216,6 +220,15 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT { assertFalse(rs.next()); assertActiveIndex(conn, INDEX_DATA_SCHEMA, indexDataTable); + + ddl = "ALTER INDEX " + indexName + " ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + indexDataTable + " REBUILD ASYNC"; + conn.createStatement().execute(ddl); + // Verify the metadata for index is correct. + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(INDEX_DATA_SCHEMA), indexName , new String[] {PTableType.INDEX.toString()}); + assertTrue(rs.next()); + assertEquals(indexName , rs.getString(3)); + assertEquals(PIndexState.BUILDING.toString(), rs.getString("INDEX_STATE")); + assertFalse(rs.next()); ddl = "DROP INDEX " + indexName + " ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + indexDataTable; stmt = conn.prepareStatement(ddl); @@ -568,4 +581,49 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT { assertTrue(d2.after(d1)); assertFalse(rs.next()); } + + @Test + public void testAsyncRebuildTimestamp() throws Exception { + long startTimestamp = System.currentTimeMillis(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + String testTable = generateUniqueName(); + + + String ddl = "create table " + testTable + " (k varchar primary key, v1 varchar, v2 varchar, v3 varchar)"; + Statement stmt = conn.createStatement(); + stmt.execute(ddl); + String indexName = "R_ASYNCIND_" + generateUniqueName(); + + ddl = "CREATE INDEX " + indexName + "1 ON " + testTable + " (v1) "; + stmt.execute(ddl); + ddl = "CREATE INDEX " + indexName + "2 ON " + testTable + " (v2) "; + 
stmt.execute(ddl); + ddl = "CREATE INDEX " + indexName + "3 ON " + testTable + " (v3)"; + stmt.execute(ddl); + conn.createStatement().execute("ALTER INDEX "+indexName+"1 ON " + testTable +" DISABLE "); + conn.createStatement().execute("ALTER INDEX "+indexName+"2 ON " + testTable +" REBUILD "); + conn.createStatement().execute("ALTER INDEX "+indexName+"3 ON " + testTable +" REBUILD ASYNC"); + + ResultSet rs = conn.createStatement().executeQuery( + "select table_name, " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " + + "from system.catalog (" + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName() + ") " + + "where " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " !=0 and table_name like 'R_ASYNCIND_%' " + + "order by table_name"); + assertTrue(rs.next()); + assertEquals(indexName + "3", rs.getString(1)); + long asyncTimestamp = rs.getLong(2); + assertTrue("Async timestamp is recent timestamp", asyncTimestamp > startTimestamp); + PTable table = PhoenixRuntime.getTable(conn, indexName+"3"); + assertEquals(table.getTimeStamp(), asyncTimestamp); + assertFalse(rs.next()); + conn.createStatement().execute("ALTER INDEX "+indexName+"3 ON " + testTable +" DISABLE"); + rs = conn.createStatement().executeQuery( + "select table_name, " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " + + "from system.catalog (" + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName() + ") " + + "where " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " !=0 and table_name like 'ASYNCIND_%' " + + "order by table_name" ); + assertFalse(rs.next()); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index 5ec9c24..60e847e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -153,7 +153,7 @@ public class MutableIndexFailureIT extends BaseTest { rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); - FAIL_WRITE = false; + FailingRegionObserver.FAIL_WRITE = false; conn.createStatement().execute( "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)"); conn.createStatement().execute( @@ -193,7 +193,7 @@ public class MutableIndexFailureIT extends BaseTest { assertEquals("z", rs.getString(2)); assertFalse(rs.next()); - FAIL_WRITE = true; + FailingRegionObserver.FAIL_WRITE = true; updateTable(conn, fullTableName); updateTable(conn, secondTableName); // Verify the metadata for index is correct. 
@@ -250,7 +250,7 @@ public class MutableIndexFailureIT extends BaseTest { } // re-enable index table - FAIL_WRITE = false; + FailingRegionObserver.FAIL_WRITE = false; waitForIndexToBeActive(conn,indexName); waitForIndexToBeActive(conn,secondIndexName); @@ -381,6 +381,8 @@ public class MutableIndexFailureIT extends BaseTest { } public static class FailingRegionObserver extends SimpleRegionObserver { + public static volatile boolean FAIL_WRITE = false; + public static final String INDEX_NAME = "IDX"; @Override public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/main/antlr3/PhoenixSQL.g ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g index 3e09766..07a51ce 100644 --- a/phoenix-core/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g @@ -567,8 +567,8 @@ drop_index_node returns [DropIndexStatement ret] // Parse a alter index statement alter_index_node returns [AlterIndexStatement ret] - : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE) - {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText()))); } + : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE) (async=ASYNC)? 
+ {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText())), async!=null); } ; // Parse a trace statement. http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index dc3cbd8..76bda44 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -3296,11 +3296,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso List<Cell> newKVs = tableMetadata.get(0).getFamilyCellMap().get(TABLE_FAMILY_BYTES); Cell newKV = null; int disableTimeStampKVIndex = -1; + int indexStateKVIndex = 0; int index = 0; for(Cell cell : newKVs){ if(Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), INDEX_STATE_BYTES, 0, INDEX_STATE_BYTES.length) == 0){ newKV = cell; + indexStateKVIndex = index; } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), INDEX_DISABLE_TIMESTAMP_BYTES, 0, INDEX_DISABLE_TIMESTAMP_BYTES.length) == 0){ disableTimeStampKVIndex = index; @@ -3378,11 +3380,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if ((currentState == PIndexState.UNUSABLE && newState == PIndexState.ACTIVE) || (currentState == PIndexState.ACTIVE && newState == PIndexState.UNUSABLE)) { newState = PIndexState.INACTIVE; - newKVs.set(0, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES, + newKVs.set(indexStateKVIndex, 
KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue()))); } else if (currentState == PIndexState.INACTIVE && newState == PIndexState.USABLE) { newState = PIndexState.ACTIVE; - newKVs.set(0, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES, + newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue()))); } @@ -3414,7 +3416,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if(dataTableKey != null) { metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey)); } - if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1) { + if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1 + || currentState == PIndexState.DISABLE || newState == PIndexState.BUILDING) { returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index e790b59..a60de03 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -17,12 +17,19 @@ */ package org.apache.phoenix.coprocessor; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; +import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; import java.io.IOException; +import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -45,6 +52,7 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -69,6 +77,7 @@ import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableNotFoundException; @@ -97,6 +106,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD; private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL; private boolean blockWriteRebuildIndex = false; + private static Map<PName, Long> batchExecutedPerTableMap = new HashMap<PName, Long>(); @Override public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c, @@ -125,6 +135,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL); blockWriteRebuildIndex = 
env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); + } @Override @@ -195,9 +206,15 @@ public class MetaDataRegionObserver extends BaseRegionObserver { // running private final static AtomicInteger inProgress = new AtomicInteger(0); RegionCoprocessorEnvironment env; + private long rebuildIndexBatchSize = HConstants.LATEST_TIMESTAMP; + private long configuredBatches = 10; public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) { this.env = env; + this.rebuildIndexBatchSize = env.getConfiguration().getLong( + QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, HConstants.LATEST_TIMESTAMP); + this.configuredBatches = env.getConfiguration().getLong( + QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, configuredBatches); } @Override @@ -228,6 +245,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { PhoenixDatabaseMetaData.INDEX_STATE_BYTES); scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES); + PreparedStatement updateDisabledTimeStampSmt = null; Map<PTable, List<PTable>> dataTableToIndexesMap = null; MetaDataClient client = null; @@ -243,8 +261,13 @@ public class MetaDataRegionObserver extends BaseRegionObserver { Result r = Result.create(results); byte[] disabledTimeStamp = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES); + byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.INDEX_STATE_BYTES); + + if (disabledTimeStamp == null || disabledTimeStamp.length == 0 || (indexState != null + && PIndexState.BUILDING == PIndexState.fromSerializedValue(Bytes.toString(indexState)))) { - if (disabledTimeStamp == null || disabledTimeStamp.length == 0) { + // Don't rebuild the building index , because they are marked for async continue; } @@ -255,8 +278,6 @@ public class 
MetaDataRegionObserver extends BaseRegionObserver { } byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES); - byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.INDEX_STATE_BYTES); if ((dataTable == null || dataTable.length == 0) || (indexState == null || indexState.length == 0)) { // data table name can't be empty continue; @@ -317,109 +338,169 @@ public class MetaDataRegionObserver extends BaseRegionObserver { indexesToPartiallyRebuild.add(indexPTable); } while (hasMore); - if (dataTableToIndexesMap != null) { - long overlapTime = env.getConfiguration().getLong( - QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, - QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME); - for (Map.Entry<PTable, List<PTable>> entry : dataTableToIndexesMap.entrySet()) { - PTable dataPTable = entry.getKey(); - List<PTable> indexesToPartiallyRebuild = entry.getValue(); - try { - long earliestDisableTimestamp = Long.MAX_VALUE; - List<IndexMaintainer> maintainers = Lists - .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size()); - for (PTable index : indexesToPartiallyRebuild) { - long disabledTimeStampVal = index.getIndexDisableTimestamp(); - if (disabledTimeStampVal > 0) { - if (disabledTimeStampVal < earliestDisableTimestamp) { - earliestDisableTimestamp = disabledTimeStampVal; - } - - maintainers.add(index.getIndexMaintainer(dataPTable, conn)); - } - } - // No indexes are disabled, so skip this table - if (earliestDisableTimestamp == Long.MAX_VALUE) { - continue; - } - - long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime); - LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild - + " from timestamp=" + timeStamp); - TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false); - // TODO Need to set high timeout - PostDDLCompiler compiler = new 
PostDDLCompiler(conn); - MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null, - HConstants.LATEST_TIMESTAMP); - Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(), - maintainers); - dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP); - dataTableScan.setCacheBlocks(false); - dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES); - - ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable( - ByteUtil.EMPTY_BYTE_ARRAY); - IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild, - conn); - byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); - - dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); - MutationState mutationState = plan.execute(); - long rowCount = mutationState.getUpdateCount(); - LOG.info(rowCount + " rows of index which are rebuild"); - for (PTable indexPTable : indexesToPartiallyRebuild) { - String indexTableFullName = SchemaUtil.getTableName(indexPTable.getSchemaName() - .getString(), indexPTable.getTableName().getString()); - updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE, PIndexState.ACTIVE); - } - } catch (Exception e) { // Log, but try next table's indexes - LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild - + ". 
Will try again next on next scheduled invocation.", e); - } - } - } - } catch (Throwable t) { - LOG.warn("ScheduledBuildIndexTask failed!", t); - } finally { - inProgress.decrementAndGet(); - if (scanner != null) { - try { - scanner.close(); - } catch (IOException ignored) { - LOG.debug("ScheduledBuildIndexTask can't close scanner.", ignored); - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException ignored) { - LOG.debug("ScheduledBuildIndexTask can't close connection", ignored); - } - } - } + if (dataTableToIndexesMap != null) { + long overlapTime = env.getConfiguration().getLong( + QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, + QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME); + for (Map.Entry<PTable, List<PTable>> entry : dataTableToIndexesMap.entrySet()) { + PTable dataPTable = entry.getKey(); + List<PTable> indexesToPartiallyRebuild = entry.getValue(); + ReadOnlyProps props = new ReadOnlyProps(env.getConfiguration().iterator()); + try (HTableInterface metaTable = env.getTable( + SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props))) { + long earliestDisableTimestamp = Long.MAX_VALUE; + List<IndexMaintainer> maintainers = Lists + .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size()); + for (PTable index : indexesToPartiallyRebuild) { + long disabledTimeStampVal = index.getIndexDisableTimestamp(); + if (disabledTimeStampVal > 0) { + if (disabledTimeStampVal < earliestDisableTimestamp) { + earliestDisableTimestamp = disabledTimeStampVal; + } + + maintainers.add(index.getIndexMaintainer(dataPTable, conn)); + } + } + // No indexes are disabled, so skip this table + if (earliestDisableTimestamp == Long.MAX_VALUE) { + continue; + } + long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime); + LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild + + " from timestamp=" + timeStamp); + + TableRef tableRef = new 
TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false); + // TODO Need to set high timeout + PostDDLCompiler compiler = new PostDDLCompiler(conn); + MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null, + HConstants.LATEST_TIMESTAMP); + Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(), + maintainers); + + long scanEndTime = getTimestampForBatch(timeStamp, + batchExecutedPerTableMap.get(dataPTable.getName())); + dataTableScan.setTimeRange(timeStamp, scanEndTime); + dataTableScan.setCacheBlocks(false); + dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES); + + ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable( + ByteUtil.EMPTY_BYTE_ARRAY); + IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild, + conn); + byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); + dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); + MutationState mutationState = plan.execute(); + long rowCount = mutationState.getUpdateCount(); + LOG.info(rowCount + " rows of index which are rebuild"); + for (PTable indexPTable : indexesToPartiallyRebuild) { + String indexTableFullName = SchemaUtil.getTableName( + indexPTable.getSchemaName().getString(), + indexPTable.getTableName().getString()); + if (scanEndTime == HConstants.LATEST_TIMESTAMP) { + updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE, + PIndexState.ACTIVE); + batchExecutedPerTableMap.remove(dataPTable.getName()); + } else { + + updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime, metaTable); + Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName()); + if (noOfBatches == null) { + noOfBatches = 0l; + } + batchExecutedPerTableMap.put(dataPTable.getName(), ++noOfBatches); + // clearing cache to get the updated + // disabled timestamp + new 
MetaDataClient(conn).updateCache(dataPTable.getSchemaName().getString(), + dataPTable.getTableName().getString()); + new MetaDataClient(conn).updateCache(indexPTable.getSchemaName().getString(), + indexPTable.getTableName().getString()); + LOG.info( + "During Round-robin build: Successfully updated index disabled timestamp for " + + indexTableFullName + " to " + scanEndTime); + } + + } + } catch (Exception e) { // Log, but try next table's + // indexes + LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild + + ". Will try again next on next scheduled invocation.", e); + } + } + } + } catch (Throwable t) { + LOG.warn("ScheduledBuildIndexTask failed!", t); + } finally { + inProgress.decrementAndGet(); + if (scanner != null) { + try { + scanner.close(); + } catch (IOException ignored) { + LOG.debug("ScheduledBuildIndexTask can't close scanner.", ignored); + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException ignored) { + LOG.debug("ScheduledBuildIndexTask can't close connection", ignored); + } + } + } } - } - - private static void updateIndexState(PhoenixConnection conn, String indexTableName, RegionCoprocessorEnvironment env, PIndexState oldState, - PIndexState newState) throws ServiceException, Throwable { - byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); - String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName); - String indexName = SchemaUtil.getTableNameFromFullName(indexTableName); - // Mimic the Put that gets generated by the client on an update of the index state - Put put = new Put(indexTableKey); - put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, - newState.getSerializedBytes()); - if (newState == PIndexState.ACTIVE) { - put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, - PLong.INSTANCE.toBytes(0)); + + private long getTimestampForBatch(long 
disabledTimeStamp, Long noOfBatches) { + if (disabledTimeStamp < 0 || rebuildIndexBatchSize > (HConstants.LATEST_TIMESTAMP + - disabledTimeStamp)) { return HConstants.LATEST_TIMESTAMP; } + long timestampForNextBatch = disabledTimeStamp + rebuildIndexBatchSize; + if (timestampForNextBatch < 0 || timestampForNextBatch > System.currentTimeMillis() + || (noOfBatches != null && noOfBatches > configuredBatches)) { + // if timestampForNextBatch cross current time , then we should + // build the complete index + timestampForNextBatch = HConstants.LATEST_TIMESTAMP; + } + return timestampForNextBatch; } - final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put); - MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null); - MutationCode code = result.getMutationCode(); - if (code == MutationCode.TABLE_NOT_FOUND) { throw new TableNotFoundException(schemaName, indexName); } - if (code == MutationCode.UNALLOWED_TABLE_MUTATION) { throw new SQLExceptionInfo.Builder( - SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION) - .setMessage(" currentState=" + oldState + ". 
requestedState=" + newState).setSchemaName(schemaName) - .setTableName(indexName).build().buildException(); } } + + private static void updateIndexState(PhoenixConnection conn, String indexTableName, + RegionCoprocessorEnvironment env, PIndexState oldState, PIndexState newState) + throws ServiceException, Throwable { + byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); + String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName); + String indexName = SchemaUtil.getTableNameFromFullName(indexTableName); + // Mimic the Put that gets generated by the client on an update of the + // index state + Put put = new Put(indexTableKey); + put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, + newState.getSerializedBytes()); + if (newState == PIndexState.ACTIVE) { + put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0)); + put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0)); + } + final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put); + MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null); + MutationCode code = result.getMutationCode(); + if (code == MutationCode.TABLE_NOT_FOUND) { + throw new TableNotFoundException(schemaName, indexName); + } + if (code == MutationCode.UNALLOWED_TABLE_MUTATION) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION) + .setMessage(" currentState=" + oldState + ". 
requestedState=" + newState).setSchemaName(schemaName) + .setTableName(indexName).build().buildException(); + } + } + + private static void updateDisableTimestamp(PhoenixConnection conn, String indexTableName, + RegionCoprocessorEnvironment env, long disabledTimestamp, HTableInterface metaTable) throws IOException { + byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); + Put put = new Put(indexTableKey); + put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, + PLong.INSTANCE.toBytes(disabledTimestamp)); + metaTable.checkAndPut(indexTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.INDEX_STATE_BYTES, CompareOp.EQUAL, PIndexState.INACTIVE.getSerializedBytes(), + put); + + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index fb4e3c3..fde403c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -421,7 +421,8 @@ public enum SQLExceptionCode { 724, "43M07", "Schema name not allowed!!"), CREATE_SCHEMA_NOT_ALLOWED(725, "43M08", "Cannot create schema because config " + QueryServices.IS_NAMESPACE_MAPPING_ENABLED + " for enabling name space mapping isn't enabled."), INCONSISTENET_NAMESPACE_MAPPING_PROPERTIES( - 726, "43M10", " Inconsistent namespace mapping properites.."); + 726, "43M10", " Inconsistent namespace mapping properites.."), ASYNC_NOT_ALLOWED( + 727, "43M11", " ASYNC option is not allowed.. 
"); private final int errorCode; private final String sqlState; http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index eb73d6b..e515dbb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -35,30 +35,21 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; -import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; -import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; -import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; -import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy; import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy; import org.apache.phoenix.jdbc.PhoenixConnection; import 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; -import org.apache.phoenix.protobuf.ProtobufUtil; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; -import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; @@ -153,47 +144,10 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { String indexTableName = tableTimeElement.getKey(); long minTimeStamp = tableTimeElement.getValue(); // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor. - byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); HTableInterface systemTable = env.getTable(SchemaUtil .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration())); - // Mimic the Put that gets generated by the client on an update of the index state - Put put = new Put(indexTableKey); - if (blockWriteRebuildIndex) - put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, - PIndexState.ACTIVE.getSerializedBytes()); - else - put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, - PIndexState.DISABLE.getSerializedBytes()); - put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, - PLong.INSTANCE.toBytes(minTimeStamp)); - final List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put); - - final Map<byte[], MetaDataResponse> results = - systemTable.coprocessorService(MetaDataService.class, indexTableKey, indexTableKey, - new Batch.Call<MetaDataService, MetaDataResponse>() { - @Override - public MetaDataResponse 
call(MetaDataService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<MetaDataResponse> rpcCallback = - new BlockingRpcCallback<MetaDataResponse>(); - UpdateIndexStateRequest.Builder builder = UpdateIndexStateRequest.newBuilder(); - for (Mutation m : tableMetadata) { - MutationProto mp = ProtobufUtil.toProto(m); - builder.addTableMetadataMutations(mp.toByteString()); - } - instance.updateIndexState(controller, builder.build(), rpcCallback); - if (controller.getFailedOn() != null) { - throw controller.getFailedOn(); - } - return rpcCallback.get(); - } - }); - if (results.isEmpty()) { - throw new IOException("Didn't get expected result size"); - } - MetaDataResponse tmpResponse = results.values().iterator().next(); - MetaDataMutationResult result = MetaDataMutationResult.constructFromProto(tmpResponse); - + MetaDataMutationResult result = IndexUtil.disableIndexWithTimestamp(indexTableName, minTimeStamp, + systemTable, blockWriteRebuildIndex); if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) { LOG.info("Index " + indexTableName + " has been dropped. 
Ignore uncommitted mutations"); continue; http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 54080d1..5142b57 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -279,6 +279,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final String LAST_STATS_UPDATE_TIME = "LAST_STATS_UPDATE_TIME"; public static final byte[] LAST_STATS_UPDATE_TIME_BYTES = Bytes.toBytes(LAST_STATS_UPDATE_TIME); public static final String GUIDE_POST_KEY = "GUIDE_POST_KEY"; + public static final String ASYNC_REBUILD_TIMESTAMP = "ASYNC_REBUILD_TIMESTAMP"; + public static final byte[] ASYNC_REBUILD_TIMESTAMP_BYTES = Bytes.toBytes(ASYNC_REBUILD_TIMESTAMP); public static final String PARENT_TENANT_ID = "PARENT_TENANT_ID"; public static final byte[] PARENT_TENANT_ID_BYTES = Bytes.toBytes(PARENT_TENANT_ID); http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index d57c250..f3c6d30 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -989,8 +989,8 @@ public class PhoenixStatement implements Statement, SQLCloseable { private static class ExecutableAlterIndexStatement extends 
AlterIndexStatement implements CompilableStatement { - public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) { - super(indexTableNode, dataTableName, ifExists, state); + public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) { + super(indexTableNode, dataTableName, ifExists, state, async); } @SuppressWarnings("unchecked") @@ -1302,8 +1302,8 @@ public class PhoenixStatement implements Statement, SQLCloseable { } @Override - public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) { - return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state); + public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) { + return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async); } @Override