PHOENIX-2890 Extend IndexTool to allow incremental index rebuilds

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/068c1cd9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/068c1cd9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/068c1cd9

Branch: refs/heads/4.x-HBase-1.1
Commit: 068c1cd96db6b46e80721cb66be9263e0565f5d1
Parents: c1b8d79
Author: Ankit Singhal <ankitsingha...@gmail.com>
Authored: Mon Dec 26 12:24:19 2016 +0530
Committer: Ankit Singhal <ankitsingha...@gmail.com>
Committed: Mon Dec 26 12:24:19 2016 +0530

----------------------------------------------------------------------
 .../phoenix/end2end/AutomaticRebuildIT.java     | 219 +++++++++
 .../end2end/IndexToolForPartialBuildIT.java     | 298 ++++++++++++
 ...olForPartialBuildWithNamespaceEnabledIT.java |  70 +++
 .../phoenix/end2end/index/IndexMetadataIT.java  |  58 +++
 .../end2end/index/MutableIndexFailureIT.java    |   8 +-
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |   4 +-
 .../coprocessor/MetaDataEndpointImpl.java       |   9 +-
 .../coprocessor/MetaDataRegionObserver.java     | 291 +++++++-----
 .../phoenix/exception/SQLExceptionCode.java     |   3 +-
 .../index/PhoenixIndexFailurePolicy.java        |  52 +--
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   2 +
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   8 +-
 .../phoenix/mapreduce/index/IndexTool.java      | 455 +++++++++++++------
 .../phoenix/mapreduce/index/IndexToolUtil.java  |   6 +-
 .../index/PhoenixIndexImportDirectMapper.java   |   2 +-
 .../index/PhoenixIndexPartialBuildMapper.java   | 182 ++++++++
 .../util/PhoenixConfigurationUtil.java          |  31 ++
 .../phoenix/parse/AlterIndexStatement.java      |   8 +-
 .../apache/phoenix/parse/ParseNodeFactory.java  |   6 +-
 .../org/apache/phoenix/query/QueryServices.java |   4 +
 .../apache/phoenix/schema/MetaDataClient.java   |  47 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |  61 ++-
 22 files changed, 1503 insertions(+), 321 deletions(-)
----------------------------------------------------------------------

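For orientation: the workflow this commit enables is to flag a failed index for an incremental (partial) rebuild with the new ALTER INDEX ... REBUILD ASYNC statement (see the PhoenixSQL.g hunk below), then run the IndexTool MapReduce job in partial-rebuild mode to replay only the data-table changes made since the index was disabled. A minimal sketch of driving the tool programmatically, mirroring what IndexToolForPartialBuildIT below does; the schema/table names and output path are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    public class PartialRebuildSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // -s: schema, -dt: data table, -pr: partial-rebuild mode,
            // -op: output path for the job (the same flags the ITs below pass)
            int status = ToolRunner.run(conf, new IndexTool(),
                    new String[] { "-s", "MY_SCHEMA", "-dt", "MY_TABLE", "-pr", "-op", "/tmp/partial_rebuild" });
            System.exit(status);
        }
    }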

http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
new file mode 100644
index 0000000..cbb7745
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for automatic partial rebuild of indexes after a write failure
+ */
+@RunWith(Parameterized.class)
+public class AutomaticRebuildIT extends BaseOwnClusterIT {
+
+       private final boolean localIndex;
+       protected boolean isNamespaceEnabled = false;
+       protected final String tableDDLOptions;
+
+       public AutomaticRebuildIT(boolean localIndex) {
+               this.localIndex = localIndex;
+               StringBuilder optionBuilder = new StringBuilder();
+               optionBuilder.append(" SPLIT ON(1,2)");
+               this.tableDDLOptions = optionBuilder.toString();
+       }
+
+       @BeforeClass
+       public static void doSetup() throws Exception {
+               Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
+               serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+               serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+               serverProps.put("yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
+               serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+               serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
+               serverProps.put("hbase.client.pause", "5000");
+               serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "1000");
+               serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, "5");
+               Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+               setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+                               new ReadOnlyProps(clientProps.entrySet().iterator()));
+       }
+
+       @Parameters(name = "localIndex = {0}")
+       public static Collection<Boolean[]> data() {
+               return Arrays.asList(new Boolean[][] { { false }, { true } });
+       }
+
+       @Test
+       public void testSecondaryAutomaticRebuildIndex() throws Exception {
+               String schemaName = generateUniqueName();
+               String dataTableName = generateUniqueName();
+               String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+               final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME);
+               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+               props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+               props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString());
+               props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
+               final Connection conn = DriverManager.getConnection(getUrl(), props);
+               Statement stmt = conn.createStatement();
+               try {
+                       if (isNamespaceEnabled) {
+                               conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+                       }
+                       stmt.execute(String.format(
+                                       "CREATE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s",
+                                       fullTableName, tableDDLOptions));
+                       String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
+                       PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+                       FailingRegionObserver.FAIL_WRITE = false;
+                       // insert two rows
+                       upsertRow(stmt1, 1000);
+                       upsertRow(stmt1, 2000);
+
+                       conn.commit();
+                       stmt.execute(String.format("CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ",
+                                       (localIndex ? "LOCAL" : ""), indxTable, fullTableName));
+                       FailingRegionObserver.FAIL_WRITE = true;
+                       upsertRow(stmt1, 3000);
+                       upsertRow(stmt1, 4000);
+                       upsertRow(stmt1, 5000);
+                       try {
+                               conn.commit();
+                               fail();
+                       } catch (SQLException e) {
+                               // expected: index writes fail while FAIL_WRITE is set
+                       } catch (Exception e) {
+                               // expected: index writes fail while FAIL_WRITE is set
+                       }
+                       FailingRegionObserver.FAIL_WRITE = false;
+                       ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
+                                       new String[] { PTableType.INDEX.toString() });
+                       assertTrue(rs.next());
+                       assertEquals(indxTable, rs.getString(3));
+                       String indexState = rs.getString("INDEX_STATE");
+                       assertEquals(PIndexState.DISABLE.toString(), indexState);
+                       assertFalse(rs.next());
+                       upsertRow(stmt1, 6000);
+                       upsertRow(stmt1, 7000);
+                       conn.commit();
+                       int maxTries = 4, nTries = 0;
+                       boolean isInactive = false;
+                       do {
+                               rs = conn.createStatement()
+                                               .executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.INDEX_STATE + ","
+                                                               + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM "
+                                                               + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " ("
+                                                               + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where "
+                                                               + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and "
+                                                               + PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'"));
+                               rs.next();
+                               if (PIndexState.INACTIVE.getSerializedValue().equals(rs.getString(1)) && rs.getLong(2) > 3000) {
+                                       isInactive = true;
+                                       break;
+                               }
+                               Thread.sleep(10 * 1000); // sleep 10 secs
+                       } while (++nTries < maxTries);
+                       assertTrue(isInactive);
+                       nTries = 0;
+                       boolean isActive = false;
+                       do {
+                               Thread.sleep(15 * 1000); // sleep 15 secs
+                               rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
+                                               new String[] { PTableType.INDEX.toString() });
+                               assertTrue(rs.next());
+                               if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) {
+                                       isActive = true;
+                                       break;
+                               }
+                       } while (++nTries < maxTries);
+                       assertTrue(isActive);
+
+               } finally {
+                       conn.close();
+               }
+       }
+
+       public static void upsertRow(PreparedStatement stmt, int i) throws SQLException {
+               // insert row
+               stmt.setInt(1, i);
+               stmt.setString(2, "uname" + String.valueOf(i));
+               stmt.setInt(3, 95050 + i);
+               stmt.executeUpdate();
+       }
+
+       public static class FailingRegionObserver extends SimpleRegionObserver {
+               public static volatile boolean FAIL_WRITE = false;
+               public static final String INDEX_NAME = "IDX";
+
+               @Override
+               public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+                               MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+                       if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
+                               throw new DoNotRetryIOException();
+                       }
+                       Mutation operation = miniBatchOp.getOperation(0);
+                       Set<byte[]> keySet = operation.getFamilyMap().keySet();
+                       for (byte[] family : keySet) {
+                               if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
+                                       throw new DoNotRetryIOException();
+                               }
+                       }
+               }
+
+       }
+
+}

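The test above drives the automatic (server-side) rebuild path, which this commit makes batched: BuildIndexScheduleTask in MetaDataRegionObserver now reads two new QueryServices properties, both added by this commit (see the hunks below). A hedged sketch of setting them on the server configuration, using the same values as the test:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.query.QueryServices;

    public class RebuildBatchTuningSketch {
        public static Configuration tunedServerConf() {
            Configuration conf = HBaseConfiguration.create();
            // Read into BuildIndexScheduleTask.rebuildIndexBatchSize
            // (default HConstants.LATEST_TIMESTAMP, i.e. effectively unbatched)
            conf.set(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "1000");
            // Read into BuildIndexScheduleTask.configuredBatches (default 10)
            conf.set(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, "5");
            return conf;
        }
    }
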
http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
new file mode 100644
index 0000000..116c47f
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for partial index rebuilds run via the {@link IndexTool} MapReduce job
+ */
+@RunWith(Parameterized.class)
+public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
+    
+    private final boolean localIndex;
+    protected boolean isNamespaceEnabled = false;
+    protected final String tableDDLOptions;
+    
+    public IndexToolForPartialBuildIT(boolean localIndex) {
+
+        this.localIndex = localIndex;
+        StringBuilder optionBuilder = new StringBuilder();
+        optionBuilder.append(" SPLIT ON(1,2)");
+        this.tableDDLOptions = optionBuilder.toString();
+    }
+    
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
+        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+        serverProps.put("yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
+        serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+        serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
+        serverProps.put("hbase.client.pause", "5000");
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.FALSE.toString());
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+    
+    @Parameters(name="localIndex = {0}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] { { false }, { true } });
+    }
+    
+    @Test
+    public void testSecondaryIndex() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+        props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString());
+        props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
+        final Connection conn = DriverManager.getConnection(getUrl(), props);
+        Statement stmt = conn.createStatement();
+        try {
+            if (isNamespaceEnabled) {
+                conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+            }
+            stmt.execute(
+                    String.format("CREATE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s",
+                            fullTableName, tableDDLOptions));
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            FailingRegionObserver.FAIL_WRITE = false;
+            // insert two rows
+            upsertRow(stmt1, 1000);
+            upsertRow(stmt1, 2000);
+
+            conn.commit();
+            stmt.execute(String.format("CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ",
+                    (localIndex ? "LOCAL" : ""), indxTable, fullTableName));
+            FailingRegionObserver.FAIL_WRITE = true;
+            upsertRow(stmt1, 3000);
+            upsertRow(stmt1, 4000);
+            upsertRow(stmt1, 5000);
+            try {
+                conn.commit();
+                fail();
+            } catch (SQLException e) {
+                // expected: index writes fail while FAIL_WRITE is set
+            } catch (Exception e) {
+                // expected: index writes fail while FAIL_WRITE is set
+            }
+            conn.createStatement()
+                    .execute(String.format("ALTER INDEX %s on %s REBUILD ASYNC", indxTable, fullTableName));
+
+            FailingRegionObserver.FAIL_WRITE = false;
+            ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
+                    new String[] { PTableType.INDEX.toString() });
+            assertTrue(rs.next());
+            assertEquals(indxTable, rs.getString(3));
+            String indexState = rs.getString("INDEX_STATE");
+            assertEquals(PIndexState.BUILDING.toString(), indexState);
+            assertFalse(rs.next());
+            upsertRow(stmt1, 6000);
+            upsertRow(stmt1, 7000);
+            conn.commit();
+
+            rs = conn.createStatement()
+                    .executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + ","
+                            + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM "
+                            + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " ("
+                            + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where "
+                            + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and "
+                            + PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'"));
+            rs.next();
+            PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable));
+            assertEquals(PIndexState.BUILDING, pindexTable.getIndexState());
+            assertEquals(rs.getLong(1), pindexTable.getTimeStamp());
+            // assert disabled timestamp
+            assertEquals(rs.getLong(2), 3000);
+
+            String selectSql = String.format("SELECT LPAD(UPPER(NAME),11,'x')||'_xyz',ID FROM %s", fullTableName);
+            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+
+            // assert we are pulling from data table.
+            assertExplainPlan(actualExplainPlan, schemaName, dataTableName, null, false, isNamespaceEnabled);
+
+            rs = stmt1.executeQuery(selectSql);
+            for (int i = 1; i <= 7; i++) {
+                assertTrue(rs.next());
+                assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1));
+            }
+
+            // Validate Index table data till disabled timestamp
+            rs = stmt1.executeQuery(String.format("SELECT * FROM %s", SchemaUtil.getTableName(schemaName, indxTable)));
+            for (int i = 1; i <= 2; i++) {
+                assertTrue(rs.next());
+                assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1));
+            }
+            assertFalse(rs.next());
+            // run the index MR job.
+            final IndexTool indexingTool = new IndexTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            conf.set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
+            indexingTool.setConf(conf);
+
+            final String[] cmdArgs = getArgValues(schemaName, dataTableName);
+            int status = indexingTool.run(cmdArgs);
+            assertEquals(0, status);
+
+            // insert two more rows
+            upsertRow(stmt1, 8000);
+            upsertRow(stmt1, 9000);
+            conn.commit();
+
+            // assert we are pulling from index table.
+            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertExplainPlan(actualExplainPlan, schemaName, dataTableName, indxTable, localIndex, isNamespaceEnabled);
+
+            rs = stmt.executeQuery(selectSql);
+
+            for (int i = 1; i <= 9; i++) {
+                assertTrue(rs.next());
+                assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1));
+            }
+
+            assertFalse(rs.next());
+
+            // conn.createStatement().execute(String.format("DROP INDEX %s ON %s", indxTable, fullTableName));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    public static void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable,
+            String indxTable, boolean isLocal, boolean isNamespaceMapped) {
+
+        String expectedExplainPlan = "";
+        if (indxTable != null) {
+            if (isLocal) {
+                final String localIndexName = SchemaUtil
+                        .getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, dataTable), isNamespaceMapped, PTableType.INDEX)
+                        .getString();
+                expectedExplainPlan = String.format("CLIENT PARALLEL 3-WAY RANGE SCAN OVER %s [1]", localIndexName);
+            } else {
+                expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s",
+                        SchemaUtil.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, indxTable), isNamespaceMapped, PTableType.INDEX));
+            }
+        } else {
+            expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s",
+                    SchemaUtil.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, dataTable), isNamespaceMapped, PTableType.TABLE));
+        }
+        assertTrue(actualExplainPlan.contains(expectedExplainPlan));
+    }
+
+    public String[] getArgValues(String schemaName, String dataTable) {
+        final List<String> args = Lists.newArrayList();
+        if (schemaName!=null) {
+            args.add("-s");
+            args.add(schemaName);
+        }
+        args.add("-dt");
+        args.add(dataTable);
+        args.add("-pr");
+        args.add("-op");
+        args.add("/tmp/output/partialTable_"+localIndex);
+        return args.toArray(new String[0]);
+    }
+
+    public static void upsertRow(PreparedStatement stmt, int i) throws SQLException {
+        // insert row
+        stmt.setInt(1, i);
+        stmt.setString(2, "uname" + String.valueOf(i));
+        stmt.setInt(3, 95050 + i);
+        stmt.executeUpdate();
+    }
+    
+
+    public static class FailingRegionObserver extends SimpleRegionObserver {
+        public static volatile boolean FAIL_WRITE = false;
+        public static final String INDEX_NAME = "IDX";
+        @Override
+        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+            if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
+                throw new DoNotRetryIOException();
+            }
+            Mutation operation = miniBatchOp.getOperation(0);
+            Set<byte[]> keySet = operation.getFamilyMap().keySet();
+            for(byte[] family: keySet) {
+                if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
+                    throw new DoNotRetryIOException();
+                }
+            }
+        }
+
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
new file mode 100644
index 0000000..4b2371c
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for partial index rebuilds via IndexTool when namespace mapping is enabled
+ */
+@RunWith(Parameterized.class)
+public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolForPartialBuildIT {
+
+    public IndexToolForPartialBuildWithNamespaceEnabledIT(boolean localIndex, boolean isNamespaceEnabled) {
+        super(localIndex);
+        this.isNamespaceEnabled = isNamespaceEnabled;
+    }
+    
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
+        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+        serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+        serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
+        serverProps.put("hbase.client.pause", "5000");
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "2000");
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
+        serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+        clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+    
+    @Parameters(name="localIndex = {0} , isNamespaceEnabled = {1}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] { { false, true }, { true, false } });
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
index f0c670b..63a6bd6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
@@ -31,6 +31,7 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.sql.Types;
 import java.util.Properties;
 
@@ -43,10 +44,13 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnAlreadyExistsException;
 import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
@@ -216,6 +220,15 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
             assertFalse(rs.next());
             
             assertActiveIndex(conn, INDEX_DATA_SCHEMA, indexDataTable);
+            
+            ddl = "ALTER INDEX " + indexName + " ON " + INDEX_DATA_SCHEMA + 
QueryConstants.NAME_SEPARATOR + indexDataTable + " REBUILD ASYNC";
+            conn.createStatement().execute(ddl);
+            // Verify the metadata for index is correct.
+            rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(INDEX_DATA_SCHEMA), indexName , new String[] 
{PTableType.INDEX.toString()});
+            assertTrue(rs.next());
+            assertEquals(indexName , rs.getString(3));
+            assertEquals(PIndexState.BUILDING.toString(), 
rs.getString("INDEX_STATE"));
+            assertFalse(rs.next());
 
             ddl = "DROP INDEX " + indexName + " ON " + INDEX_DATA_SCHEMA + 
QueryConstants.NAME_SEPARATOR + indexDataTable;
             stmt = conn.prepareStatement(ddl);
@@ -568,4 +581,49 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
         assertTrue(d2.after(d1));
         assertFalse(rs.next());
     }
+    
+    @Test
+    public void testAsyncRebuildTimestamp() throws Exception {
+        long startTimestamp = System.currentTimeMillis();
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        String testTable = generateUniqueName();
+
+        String ddl = "create table " + testTable + " (k varchar primary key, v1 varchar, v2 varchar, v3 varchar)";
+        Statement stmt = conn.createStatement();
+        stmt.execute(ddl);
+        String indexName = "R_ASYNCIND_" + generateUniqueName();
+
+        ddl = "CREATE INDEX " + indexName + "1 ON " + testTable + " (v1) ";
+        stmt.execute(ddl);
+        ddl = "CREATE INDEX " + indexName + "2 ON " + testTable + " (v2) ";
+        stmt.execute(ddl);
+        ddl = "CREATE INDEX " + indexName + "3 ON " + testTable + " (v3)";
+        stmt.execute(ddl);
+        conn.createStatement().execute("ALTER INDEX " + indexName + "1 ON " + testTable + " DISABLE ");
+        conn.createStatement().execute("ALTER INDEX " + indexName + "2 ON " + testTable + " REBUILD ");
+        conn.createStatement().execute("ALTER INDEX " + indexName + "3 ON " + testTable + " REBUILD ASYNC");
+
+        ResultSet rs = conn.createStatement().executeQuery(
+            "select table_name, " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " +
+            "from system.catalog (" + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName() + ") " +
+            "where " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " != 0 and table_name like 'R_ASYNCIND_%' " +
+            "order by table_name");
+        assertTrue(rs.next());
+        assertEquals(indexName + "3", rs.getString(1));
+        long asyncTimestamp = rs.getLong(2);
+        assertTrue("Async rebuild timestamp should be more recent than the test start", asyncTimestamp > startTimestamp);
+        PTable table = PhoenixRuntime.getTable(conn, indexName + "3");
+        assertEquals(table.getTimeStamp(), asyncTimestamp);
+        assertFalse(rs.next());
+        conn.createStatement().execute("ALTER INDEX " + indexName + "3 ON " + testTable + " DISABLE");
+        rs = conn.createStatement().executeQuery(
+                "select table_name, " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " +
+                "from system.catalog (" + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName() + ") " +
+                "where " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " != 0 and table_name like 'R_ASYNCIND_%' " +
+                "order by table_name");
+        assertFalse(rs.next());
+    }
 }

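Note the query pattern testAsyncRebuildTimestamp relies on: ASYNC_REBUILD_TIMESTAMP is not a declared SYSTEM.CATALOG column, so it is projected as a dynamic column. A standalone sketch of the same pattern; the connection URL is illustrative:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class AsyncRebuildTimestampQuery {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                // The "(ASYNC_REBUILD_TIMESTAMP BIGINT)" clause declares the
                // dynamic column inline, for this query only
                ResultSet rs = conn.createStatement().executeQuery(
                        "SELECT TABLE_NAME, ASYNC_REBUILD_TIMESTAMP"
                        + " FROM SYSTEM.CATALOG (ASYNC_REBUILD_TIMESTAMP BIGINT)"
                        + " WHERE ASYNC_REBUILD_TIMESTAMP != 0");
                while (rs.next()) {
                    System.out.println(rs.getString(1) + " flagged for async rebuild at " + rs.getLong(2));
                }
            }
        }
    }
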
http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 5ec9c24..60e847e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -153,7 +153,7 @@ public class MutableIndexFailureIT extends BaseTest {
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());
 
-            FAIL_WRITE = false;
+            FailingRegionObserver.FAIL_WRITE = false;
             conn.createStatement().execute(
                     "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + 
indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
             conn.createStatement().execute(
@@ -193,7 +193,7 @@ public class MutableIndexFailureIT extends BaseTest {
             assertEquals("z", rs.getString(2));
             assertFalse(rs.next());
 
-            FAIL_WRITE = true;
+            FailingRegionObserver.FAIL_WRITE = true;
             updateTable(conn, fullTableName);
             updateTable(conn, secondTableName);
             // Verify the metadata for index is correct.
@@ -250,7 +250,7 @@ public class MutableIndexFailureIT extends BaseTest {
             }
 
             // re-enable index table
-            FAIL_WRITE = false;
+            FailingRegionObserver.FAIL_WRITE = false;
             waitForIndexToBeActive(conn,indexName);
             waitForIndexToBeActive(conn,secondIndexName);
 
@@ -381,6 +381,8 @@ public class MutableIndexFailureIT extends BaseTest {
     }
 
     public static class FailingRegionObserver extends SimpleRegionObserver {
+        public static volatile boolean FAIL_WRITE = false;
+        public static final String INDEX_NAME = "IDX";
         @Override
         public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
             if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 3e09766..07a51ce 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -567,8 +567,8 @@ drop_index_node returns [DropIndexStatement ret]
 
 // Parse an alter index statement
 alter_index_node returns [AlterIndexStatement ret]
-    : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE)
-      {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText()))); }
+    : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE) (async=ASYNC)?
+      {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText())), async!=null); }
     ;
 
 // Parse a trace statement.

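The grammar change above makes ASYNC an optional trailing keyword on ALTER INDEX: plain REBUILD keeps its synchronous behavior, while REBUILD ASYNC only flags the index (BUILDING state plus an ASYNC_REBUILD_TIMESTAMP in SYSTEM.CATALOG) for a later IndexTool run, as the ITs above verify. A minimal JDBC sketch of both forms; index and table names are illustrative:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class AlterIndexRebuildSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                Statement stmt = conn.createStatement();
                // Pre-existing behavior: rebuild the index synchronously
                stmt.execute("ALTER INDEX IDX ON MY_SCHEMA.T REBUILD");
                // New in this commit: defer the rebuild to the IndexTool MR job
                stmt.execute("ALTER INDEX IDX ON MY_SCHEMA.T REBUILD ASYNC");
            }
        }
    }
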
http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index dc3cbd8..76bda44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3296,11 +3296,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             List<Cell> newKVs = 
tableMetadata.get(0).getFamilyCellMap().get(TABLE_FAMILY_BYTES);
             Cell newKV = null;
             int disableTimeStampKVIndex = -1;
+            int indexStateKVIndex = 0;
             int index = 0;
             for(Cell cell : newKVs){
                 if(Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                       INDEX_STATE_BYTES, 0, INDEX_STATE_BYTES.length) == 0){
                   newKV = cell;
+                  indexStateKVIndex = index;
                 } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                   INDEX_DISABLE_TIMESTAMP_BYTES, 0, INDEX_DISABLE_TIMESTAMP_BYTES.length) == 0){
                   disableTimeStampKVIndex = index;
@@ -3378,11 +3380,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 if ((currentState == PIndexState.UNUSABLE && newState == PIndexState.ACTIVE)
                         || (currentState == PIndexState.ACTIVE && newState == PIndexState.UNUSABLE)) {
                     newState = PIndexState.INACTIVE;
-                    newKVs.set(0, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
+                    newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                         INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
                 } else if (currentState == PIndexState.INACTIVE && newState == PIndexState.USABLE) {
                     newState = PIndexState.ACTIVE;
-                    newKVs.set(0, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
+                    newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                         INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
                 }
 
@@ -3414,7 +3416,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     if(dataTableKey != null) {
                         metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey));
                     }
-                    if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1) {
+                    if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1
+                            || currentState == PIndexState.DISABLE || newState == PIndexState.BUILDING) {
                         returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index e790b59..a60de03 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -17,12 +17,19 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.io.IOException;
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -45,6 +52,7 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -69,6 +77,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
@@ -97,6 +106,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
     private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD;
     private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL;
     private boolean blockWriteRebuildIndex = false;
+    private static Map<PName, Long> batchExecutedPerTableMap = new HashMap<PName, Long>();
 
     @Override
     public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -125,6 +135,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
             QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL);
         blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
                QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
+
     }
     
     @Override
@@ -195,9 +206,15 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
         // running
         private final static AtomicInteger inProgress = new AtomicInteger(0);
         RegionCoprocessorEnvironment env;
+        private long rebuildIndexBatchSize = HConstants.LATEST_TIMESTAMP;
+        private long configuredBatches = 10;
 
         public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) {
             this.env = env;
+            this.rebuildIndexBatchSize = env.getConfiguration().getLong(
+                    QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, HConstants.LATEST_TIMESTAMP);
+            this.configuredBatches = env.getConfiguration().getLong(
+                    QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, configuredBatches);
         }
 
         @Override
@@ -228,6 +245,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                     PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
+                PreparedStatement updateDisabledTimeStampSmt = null;
 
                 Map<PTable, List<PTable>> dataTableToIndexesMap = null;
                 MetaDataClient client = null;
@@ -243,8 +261,13 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     Result r = Result.create(results);
                    byte[] disabledTimeStamp = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                        PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
+                    byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                            PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
+
+                    if (disabledTimeStamp == null || disabledTimeStamp.length == 0 || (indexState != null
+                            && PIndexState.BUILDING == PIndexState.fromSerializedValue(Bytes.toString(indexState)))) {
 
-                    if (disabledTimeStamp == null || disabledTimeStamp.length == 0) {
+                        // Don't rebuild an index in BUILDING state, because it is marked for an async rebuild
                         continue;
                     }
 
@@ -255,8 +278,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     }
                    byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                        PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
-                    byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                        PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
                    if ((dataTable == null || dataTable.length == 0) || (indexState == null || indexState.length == 0)) {
                         // data table name can't be empty
                         continue;
@@ -317,109 +338,169 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     indexesToPartiallyRebuild.add(indexPTable);
                 } while (hasMore);
 
-                if (dataTableToIndexesMap != null) {
-                    long overlapTime = env.getConfiguration().getLong(
-                            
QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
-                            
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
-                    for (Map.Entry<PTable, List<PTable>> entry : 
dataTableToIndexesMap.entrySet()) {
-                        PTable dataPTable = entry.getKey();
-                        List<PTable> indexesToPartiallyRebuild = 
entry.getValue();
-                        try {
-                            long earliestDisableTimestamp = Long.MAX_VALUE;
-                            List<IndexMaintainer> maintainers = Lists
-                                    
.newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
-                            for (PTable index : indexesToPartiallyRebuild) {
-                                long disabledTimeStampVal = index.getIndexDisableTimestamp();
-                                if (disabledTimeStampVal > 0) {
-                                    if (disabledTimeStampVal < earliestDisableTimestamp) {
-                                        earliestDisableTimestamp = disabledTimeStampVal;
-                                    }
-
-                                    maintainers.add(index.getIndexMaintainer(dataPTable, conn));
-                                }
-                            }
-                            // No indexes are disabled, so skip this table
-                            if (earliestDisableTimestamp == Long.MAX_VALUE) {
-                                continue;
-                            }
-
-                            long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
-                            LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild
-                                    + " from timestamp=" + timeStamp);
-                            TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false);
-                            // TODO Need to set high timeout
-                            PostDDLCompiler compiler = new PostDDLCompiler(conn);
-                            MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null,
-                                    HConstants.LATEST_TIMESTAMP);
-                            Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(),
-                                    maintainers);
-                            dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
-                            dataTableScan.setCacheBlocks(false);
-                            dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
-
-                            ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
-                                    ByteUtil.EMPTY_BYTE_ARRAY);
-                            IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild,
-                                    conn);
-                            byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
-
-                            dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                            MutationState mutationState = plan.execute();
-                            long rowCount = mutationState.getUpdateCount();
-                            LOG.info(rowCount + " rows of index which are rebuild");
-                            for (PTable indexPTable : indexesToPartiallyRebuild) {
-                                String indexTableFullName = SchemaUtil.getTableName(indexPTable.getSchemaName()
-                                        .getString(), indexPTable.getTableName().getString());
-                                updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE, PIndexState.ACTIVE);
-                            }
-                        } catch (Exception e) { // Log, but try next table's indexes
-                            LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild
-                                    + ". Will try again next on next scheduled invocation.", e);
-                        }
-                    }
-                }
-            } catch (Throwable t) {
-                LOG.warn("ScheduledBuildIndexTask failed!", t);
-            } finally {
-                inProgress.decrementAndGet();
-                if (scanner != null) {
-                    try {
-                        scanner.close();
-                    } catch (IOException ignored) {
-                        LOG.debug("ScheduledBuildIndexTask can't close scanner.", ignored);
-                    }
-                }
-                if (conn != null) {
-                    try {
-                        conn.close();
-                    } catch (SQLException ignored) {
-                        LOG.debug("ScheduledBuildIndexTask can't close connection", ignored);
-                    }
-                }
-            }
+                if (dataTableToIndexesMap != null) {
+                    long overlapTime = env.getConfiguration().getLong(
+                            QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
+                            QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
+                    for (Map.Entry<PTable, List<PTable>> entry : dataTableToIndexesMap.entrySet()) {
+                        PTable dataPTable = entry.getKey();
+                        List<PTable> indexesToPartiallyRebuild = entry.getValue();
+                        ReadOnlyProps props = new ReadOnlyProps(env.getConfiguration().iterator());
+                        try (HTableInterface metaTable = env.getTable(
+                                SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props))) {
+                            long earliestDisableTimestamp = Long.MAX_VALUE;
+                            List<IndexMaintainer> maintainers = Lists
+                                    .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
+                            for (PTable index : indexesToPartiallyRebuild) {
+                                long disabledTimeStampVal = index.getIndexDisableTimestamp();
+                                if (disabledTimeStampVal > 0) {
+                                    if (disabledTimeStampVal < earliestDisableTimestamp) {
+                                        earliestDisableTimestamp = disabledTimeStampVal;
+                                    }
+                                    maintainers.add(index.getIndexMaintainer(dataPTable, conn));
+                                }
+                            }
+                            // No indexes are disabled, so skip this table
+                            if (earliestDisableTimestamp == Long.MAX_VALUE) {
+                                continue;
+                            }
+                            long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
+                            LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild
+                                    + " from timestamp=" + timeStamp);
+
+                            TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false);
+                            // TODO Need to set high timeout
+                            PostDDLCompiler compiler = new PostDDLCompiler(conn);
+                            MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null,
+                                    HConstants.LATEST_TIMESTAMP);
+                            Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(),
+                                    maintainers);
+
+                            long scanEndTime = getTimestampForBatch(timeStamp,
+                                    batchExecutedPerTableMap.get(dataPTable.getName()));
+                            dataTableScan.setTimeRange(timeStamp, scanEndTime);
+                            dataTableScan.setCacheBlocks(false);
+                            dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+
+                            ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
+                                    ByteUtil.EMPTY_BYTE_ARRAY);
+                            IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild,
+                                    conn);
+                            byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+                            dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                            MutationState mutationState = plan.execute();
+                            long rowCount = mutationState.getUpdateCount();
+                            LOG.info(rowCount + " rows of index were rebuilt");
+                            for (PTable indexPTable : indexesToPartiallyRebuild) {
+                                String indexTableFullName = SchemaUtil.getTableName(
+                                        indexPTable.getSchemaName().getString(),
+                                        indexPTable.getTableName().getString());
+                                if (scanEndTime == HConstants.LATEST_TIMESTAMP) {
+                                    updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE,
+                                            PIndexState.ACTIVE);
+                                    batchExecutedPerTableMap.remove(dataPTable.getName());
+                                } else {
+                                    updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime, metaTable);
+                                    Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName());
+                                    if (noOfBatches == null) {
+                                        noOfBatches = 0L;
+                                    }
+                                    batchExecutedPerTableMap.put(dataPTable.getName(), ++noOfBatches);
+                                    // Clear the cache to pick up the updated disable timestamp
+                                    new MetaDataClient(conn).updateCache(dataPTable.getSchemaName().getString(),
+                                            dataPTable.getTableName().getString());
+                                    new MetaDataClient(conn).updateCache(indexPTable.getSchemaName().getString(),
+                                            indexPTable.getTableName().getString());
+                                    LOG.info("During round-robin build: successfully updated index disable timestamp for "
+                                            + indexTableFullName + " to " + scanEndTime);
+                                }
+                            }
+                        } catch (Exception e) { // Log, but try the next table's indexes
+                            LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild
+                                    + ". Will try again on the next scheduled invocation.", e);
+                        }
+                    }
+                }
+            } catch (Throwable t) {
+                LOG.warn("ScheduledBuildIndexTask failed!", t);
+            } finally {
+                inProgress.decrementAndGet();
+                if (scanner != null) {
+                    try {
+                        scanner.close();
+                    } catch (IOException ignored) {
+                        LOG.debug("ScheduledBuildIndexTask can't close scanner.", ignored);
+                    }
+                }
+                if (conn != null) {
+                    try {
+                        conn.close();
+                    } catch (SQLException ignored) {
+                        LOG.debug("ScheduledBuildIndexTask can't close connection.", ignored);
+                    }
+                }
+            }
         }
-    }
-
-    private static void updateIndexState(PhoenixConnection conn, String indexTableName, RegionCoprocessorEnvironment env, PIndexState oldState,
-            PIndexState newState) throws ServiceException, Throwable {
-        byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
-        String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName);
-        String indexName = SchemaUtil.getTableNameFromFullName(indexTableName);
-        // Mimic the Put that gets generated by the client on an update of the index state
-        Put put = new Put(indexTableKey);
-        put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-                newState.getSerializedBytes());
-        if (newState == PIndexState.ACTIVE) {
-            put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
-                    PLong.INSTANCE.toBytes(0));
+
+        private long getTimestampForBatch(long disabledTimeStamp, Long noOfBatches) {
+            if (disabledTimeStamp < 0 || rebuildIndexBatchSize > (HConstants.LATEST_TIMESTAMP
+                    - disabledTimeStamp)) {
+                return HConstants.LATEST_TIMESTAMP;
+            }
+            long timestampForNextBatch = disabledTimeStamp + rebuildIndexBatchSize;
+            if (timestampForNextBatch < 0 || timestampForNextBatch > System.currentTimeMillis()
+                    || (noOfBatches != null && noOfBatches > configuredBatches)) {
+                // If timestampForNextBatch crosses the current time, build the
+                // remainder of the index in a single full pass
+                timestampForNextBatch = HConstants.LATEST_TIMESTAMP;
+            }
+            return timestampForNextBatch;
+        }
-        final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put);
-        MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null);
-        MutationCode code = result.getMutationCode();
-        if (code == MutationCode.TABLE_NOT_FOUND) { throw new TableNotFoundException(schemaName, indexName); }
-        if (code == MutationCode.UNALLOWED_TABLE_MUTATION) { throw new SQLExceptionInfo.Builder(
-                SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION)
-                .setMessage(" currentState=" + oldState + ". requestedState=" + newState).setSchemaName(schemaName)
-                .setTableName(indexName).build().buildException(); }
     }
+
+    private static void updateIndexState(PhoenixConnection conn, String indexTableName,
+            RegionCoprocessorEnvironment env, PIndexState oldState, PIndexState newState)
+            throws ServiceException, Throwable {
+        byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName);
+        String indexName = SchemaUtil.getTableNameFromFullName(indexTableName);
+        // Mimic the Put that gets generated by the client on an update of the index state
+        Put put = new Put(indexTableKey);
+        put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+                newState.getSerializedBytes());
+        if (newState == PIndexState.ACTIVE) {
+            put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0));
+            put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0));
+        }
+        final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put);
+        MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null);
+        MutationCode code = result.getMutationCode();
+        if (code == MutationCode.TABLE_NOT_FOUND) {
+            throw new TableNotFoundException(schemaName, indexName);
+        }
+        if (code == MutationCode.UNALLOWED_TABLE_MUTATION) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION)
+                    .setMessage(" currentState=" + oldState + ". requestedState=" + newState).setSchemaName(schemaName)
+                    .setTableName(indexName).build().buildException();
+        }
+    }
+
+    private static void updateDisableTimestamp(PhoenixConnection conn, String indexTableName,
+            RegionCoprocessorEnvironment env, long disabledTimestamp, HTableInterface metaTable) throws IOException {
+        byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
+        Put put = new Put(indexTableKey);
+        put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
+                PLong.INSTANCE.toBytes(disabledTimestamp));
+        metaTable.checkAndPut(indexTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                PhoenixDatabaseMetaData.INDEX_STATE_BYTES, CompareOp.EQUAL, PIndexState.INACTIVE.getSerializedBytes(),
+                put);
+    }
 }
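
A note on the rewritten task above: the one-shot rebuild becomes a round-robin,
batched rebuild. Each pass scans only the window [timeStamp, scanEndTime); when the
window reaches LATEST_TIMESTAMP the index flips from INACTIVE back to ACTIVE and the
per-table batch counter is cleared, otherwise updateDisableTimestamp only advances
INDEX_DISABLE_TIMESTAMP, and its checkAndPut fires solely while the index is still
INACTIVE, so a concurrent state change aborts the bump. A minimal sketch of the
window arithmetic in getTimestampForBatch, with illustrative values standing in for
the rebuildIndexBatchSize and configuredBatches fields (both are read from
configuration elsewhere in the class):

    // Sketch only: Long.MAX_VALUE stands in for HConstants.LATEST_TIMESTAMP,
    // and both constants are assumed values, not the shipped defaults.
    final class RebuildWindowSketch {
        static final long REBUILD_BATCH_SIZE_MS = 10 * 60 * 1000L; // assume 10-minute windows
        static final long CONFIGURED_BATCHES = 10;                 // assume a 10-batch budget

        static long nextWindowEnd(long disabledTimeStamp, Long noOfBatches) {
            // Nothing disabled, or the window would overflow: rebuild in one pass.
            if (disabledTimeStamp < 0
                    || REBUILD_BATCH_SIZE_MS > (Long.MAX_VALUE - disabledTimeStamp)) {
                return Long.MAX_VALUE;
            }
            long end = disabledTimeStamp + REBUILD_BATCH_SIZE_MS;
            // Once the window crosses "now" or the batch budget is spent,
            // finish with a full rebuild up to LATEST_TIMESTAMP.
            if (end < 0 || end > System.currentTimeMillis()
                    || (noOfBatches != null && noOfBatches > CONFIGURED_BATCHES)) {
                end = Long.MAX_VALUE;
            }
            return end;
        }
    }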

http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index fb4e3c3..fde403c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -421,7 +421,8 @@ public enum SQLExceptionCode {
             724, "43M07", "Schema name not allowed!!"), CREATE_SCHEMA_NOT_ALLOWED(725, "43M08",
                     "Cannot create schema because config " + QueryServices.IS_NAMESPACE_MAPPING_ENABLED
                             + " for enabling name space mapping isn't enabled."), INCONSISTENET_NAMESPACE_MAPPING_PROPERTIES(
-                                    726, "43M10", " Inconsistent namespace mapping properites..");
+                                    726, "43M10", " Inconsistent namespace mapping properites.."), ASYNC_NOT_ALLOWED(
+                                    727, "43M11", " ASYNC option is not allowed.. ");
 
     private final int errorCode;
     private final String sqlState;
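
The new ASYNC_NOT_ALLOWED code follows the same SQLExceptionInfo builder chain used
for INVALID_INDEX_STATE_TRANSITION in the MetaDataRegionObserver hunk above. A hedged
sketch of a guard that could raise it; the class, method, and parameters here are
illustrative, not the commit's actual call site:

    import java.sql.SQLException;

    import org.apache.phoenix.exception.SQLExceptionCode;
    import org.apache.phoenix.exception.SQLExceptionInfo;

    final class AsyncGuardSketch {
        // Hypothetical guard: reject ASYNC where the statement does not support it.
        static void requireAsyncAllowed(boolean async, String schemaName, String tableName)
                throws SQLException {
            if (async) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.ASYNC_NOT_ALLOWED)
                        .setSchemaName(schemaName).setTableName(tableName)
                        .build().buildException();
            }
        }
    }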

http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index eb73d6b..e515dbb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -35,30 +35,21 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
-import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
-import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
-import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy;
 import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
@@ -153,47 +144,10 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
             String indexTableName = tableTimeElement.getKey();
             long minTimeStamp = tableTimeElement.getValue();
             // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor.
-            byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
             HTableInterface systemTable = env.getTable(SchemaUtil
                     .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
-            // Mimic the Put that gets generated by the client on an update of the index state
-            Put put = new Put(indexTableKey);
-            if (blockWriteRebuildIndex)
-                put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-                        PIndexState.ACTIVE.getSerializedBytes());
-            else
-                put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-                        PIndexState.DISABLE.getSerializedBytes());
-            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
-                PLong.INSTANCE.toBytes(minTimeStamp));
-            final List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
-
-            final Map<byte[], MetaDataResponse> results =
-                    systemTable.coprocessorService(MetaDataService.class, indexTableKey, indexTableKey,
-                            new Batch.Call<MetaDataService, MetaDataResponse>() {
-                                @Override
-                                public MetaDataResponse call(MetaDataService instance) throws IOException {
-                                    ServerRpcController controller = new ServerRpcController();
-                                    BlockingRpcCallback<MetaDataResponse> rpcCallback =
-                                            new BlockingRpcCallback<MetaDataResponse>();
-                                    UpdateIndexStateRequest.Builder builder = UpdateIndexStateRequest.newBuilder();
-                                    for (Mutation m : tableMetadata) {
-                                        MutationProto mp = ProtobufUtil.toProto(m);
-                                        builder.addTableMetadataMutations(mp.toByteString());
-                                    }
-                                    instance.updateIndexState(controller, builder.build(), rpcCallback);
-                                    if (controller.getFailedOn() != null) {
-                                        throw controller.getFailedOn();
-                                    }
-                                    return rpcCallback.get();
-                                }
-                            });
-            if (results.isEmpty()) {
-                throw new IOException("Didn't get expected result size");
-            }
-            MetaDataResponse tmpResponse = results.values().iterator().next();
-            MetaDataMutationResult result = MetaDataMutationResult.constructFromProto(tmpResponse);
-
+            MetaDataMutationResult result = IndexUtil.disableIndexWithTimestamp(indexTableName, minTimeStamp,
+                    systemTable, blockWriteRebuildIndex);
             if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
                 LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations");
                 continue;
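
The failure policy now delegates the SYSTEM.CATALOG update to
IndexUtil.disableIndexWithTimestamp rather than hand-rolling the MetaDataService RPC.
A sketch of the consolidated call path; only the call site is visible in this hunk,
so the helper is assumed to reproduce the removed coprocessorService logic (the
method wrapper here is illustrative):

    // Sketch: record the first failure timestamp and disable the index (or,
    // with blockWriteRebuildIndex, leave it ACTIVE for reads while rebuilding).
    static void disableIndex(RegionCoprocessorEnvironment env, String indexTableName,
            long minTimeStamp, boolean blockWriteRebuildIndex) throws Throwable {
        try (HTableInterface systemTable = env.getTable(SchemaUtil.getPhysicalTableName(
                PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()))) {
            MetaDataMutationResult result = IndexUtil.disableIndexWithTimestamp(
                    indexTableName, minTimeStamp, systemTable, blockWriteRebuildIndex);
            if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
                return; // index was dropped concurrently; ignore uncommitted mutations
            }
        }
    }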

http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 54080d1..5142b57 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -279,6 +279,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final String LAST_STATS_UPDATE_TIME = "LAST_STATS_UPDATE_TIME";
     public static final byte[] LAST_STATS_UPDATE_TIME_BYTES = Bytes.toBytes(LAST_STATS_UPDATE_TIME);
     public static final String GUIDE_POST_KEY = "GUIDE_POST_KEY";
+    public static final String ASYNC_REBUILD_TIMESTAMP = "ASYNC_REBUILD_TIMESTAMP";
+    public static final byte[] ASYNC_REBUILD_TIMESTAMP_BYTES = Bytes.toBytes(ASYNC_REBUILD_TIMESTAMP);
 
     public static final String PARENT_TENANT_ID = "PARENT_TENANT_ID";
     public static final byte[] PARENT_TENANT_ID_BYTES = Bytes.toBytes(PARENT_TENANT_ID);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/068c1cd9/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index d57c250..f3c6d30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -989,8 +989,8 @@ public class PhoenixStatement implements Statement, SQLCloseable {
 
     private static class ExecutableAlterIndexStatement extends AlterIndexStatement implements CompilableStatement {
 
-        public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) {
-            super(indexTableNode, dataTableName, ifExists, state);
+        public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) {
+            super(indexTableNode, dataTableName, ifExists, state, async);
         }
 
         @SuppressWarnings("unchecked")
@@ -1302,8 +1302,8 @@ public class PhoenixStatement implements Statement, SQLCloseable {
         }
 
         @Override
-        public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) {
-            return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state);
+        public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) {
+            return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async);
         }
 
         @Override
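
With the async flag threaded through alterIndex, a rebuild can be requested without
blocking the client: per the file summary, PhoenixSQL.g gains the matching grammar
and IndexTool later picks up the marked index. A hedged JDBC sketch, assuming the
ALTER INDEX ... REBUILD ASYNC form plus hypothetical MY_IDX / MY_TABLE names and
connection URL:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public final class AsyncRebuildExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                    Statement stmt = conn.createStatement()) {
                // Marks the index for an asynchronous rebuild; the scan/rebuild work
                // is then driven by IndexTool rather than by this statement.
                stmt.execute("ALTER INDEX MY_IDX ON MY_TABLE REBUILD ASYNC");
            }
        }
    }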
