PHOENIX-2221 Option to make data regions not writable when index regions are not available (Alicia Ying Shu, James Taylor)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e2a6386f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e2a6386f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e2a6386f

Branch: refs/heads/calcite
Commit: e2a6386f3b9343aec74c5f96f0e0124e80b9f8b1
Parents: 6881aef
Author: James Taylor <jtay...@salesforce.com>
Authored: Sun Feb 14 09:06:14 2016 -0800
Committer: James Taylor <jtay...@salesforce.com>
Committed: Mon Feb 15 00:33:18 2016 -0800

----------------------------------------------------------------------
 .../end2end/index/MutableIndexFailureIT.java    |  31 +-
 .../end2end/index/ReadOnlyIndexFailureIT.java   | 289 +++++++++++++++++++
 .../apache/phoenix/compile/FromCompiler.java    |   2 +-
 .../apache/phoenix/compile/JoinCompiler.java    |   2 +-
 .../compile/TupleProjectionCompiler.java        |   4 +-
 .../apache/phoenix/compile/UnionCompiler.java   |   2 +-
 .../coprocessor/MetaDataEndpointImpl.java       |  92 +++---
 .../coprocessor/MetaDataRegionObserver.java     |  27 +-
 .../coprocessor/generated/PTableProtos.java     | 103 ++++++-
 .../phoenix/exception/SQLExceptionCode.java     |   2 +
 .../apache/phoenix/execute/MutationState.java   |  39 ++-
 .../index/write/DelegateIndexFailurePolicy.java |  58 ++++
 .../index/PhoenixIndexFailurePolicy.java        |  48 ++-
 .../org/apache/phoenix/query/QueryServices.java |   3 +
 .../phoenix/query/QueryServicesOptions.java     |   1 +
 .../apache/phoenix/schema/DelegateTable.java    |   5 +
 .../apache/phoenix/schema/MetaDataClient.java   |  38 +--
 .../java/org/apache/phoenix/schema/PTable.java  |   1 +
 .../org/apache/phoenix/schema/PTableImpl.java   |  51 ++--
 .../phoenix/execute/CorrelatePlanTest.java      |   2 +-
 phoenix-protocol/src/main/PTable.proto          |   1 +
 21 files changed, 660 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
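In summary: when the new server-side option is enabled and a write to an index table fails, the data table is kept consistent by freezing its read point at the failure timestamp and rejecting further writes (a new SQLExceptionCode, INDEX_FAILURE_BLOCK_WRITE) until the background task rebuilds the index. A minimal sketch of enabling the option through the QueryServices constants this commit introduces (the wrapper class is illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.query.QueryServices;

    public class BlockWriteConfigSketch {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            // Block data table writes while an index is unavailable.
            conf.setBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, true);
            // The periodic rebuild must also run for writes to unblock again.
            conf.setBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, true);
            System.out.println(conf.getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, false));
        }
    }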


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 5f39515..176c5a0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -172,7 +172,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             TableName indexTable =
                     TableName.valueOf(localIndex ? MetaDataUtil
                             .getLocalIndexTableName(fullTableName) : fullIndexName);
-            HBaseAdmin admin = this.getUtility().getHBaseAdmin();
+            HBaseAdmin admin = getUtility().getHBaseAdmin();
             HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable);
             try{
                 admin.disableTable(indexTable);
@@ -184,20 +184,10 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             stmt.setString(2, "x2");
             stmt.setString(3, "2");
             stmt.execute();
-            if (transactional) {
-                try {
-                    conn.commit();
-                    fail();
-                } catch (SQLException e) {
-                    conn.rollback();
-                }
-            }
-            else {
-                try {
-                    conn.commit();
-                    fail();
-                } catch (SQLException e) {
-                }
+            try {
+                conn.commit();
+                fail();
+            } catch (SQLException e) {
             }
 
             // Verify the metadata for index is correct.
@@ -341,9 +331,9 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             // find a RS which doesn't have the CATALOG table
             TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG");
             TableName indexTable = TableName.valueOf(fullIndexName);
-            final HBaseCluster cluster = this.getUtility().getHBaseCluster();
+            final HBaseCluster cluster = getUtility().getHBaseCluster();
             Collection<ServerName> rss = cluster.getClusterStatus().getServers();
-            HBaseAdmin admin = this.getUtility().getHBaseAdmin();
+            HBaseAdmin admin = getUtility().getHBaseAdmin();
             List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
             ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getTable(),
                     regions.get(0).getRegionName());
@@ -363,7 +353,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             final HRegionInfo indexRegion = regions.get(0);
             final ServerName dstRS = rsToBeKilled;
             admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName()));
-            this.getUtility().waitFor(30000, 200, new Waiter.Predicate<Exception>() {
+            getUtility().waitFor(30000, 200, new Waiter.Predicate<Exception>() {
                 @Override
                 public boolean evaluate() throws Exception {
                     ServerName sn = cluster.getServerHoldingRegion(indexRegion.getTable(),
@@ -379,10 +369,10 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             Thread.sleep(100);
 
             // kill RS hosting index table
-            this.getUtility().getHBaseCluster().killRegionServer(rsToBeKilled);
+            getUtility().getHBaseCluster().killRegionServer(rsToBeKilled);
 
             // wait for index table to complete recovery
-            this.getUtility().waitUntilAllRegionsAssigned(indexTable);
+            getUtility().waitUntilAllRegionsAssigned(indexTable);
 
             // Verify the metadata for index is correct.
             do {
@@ -413,6 +403,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             this.fullTableName = fullTableName;
         }
 
+        @Override
         public void run() {
             if(inProgress.get() > 0){
                 return;

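The hunk above collapses the transactional and non-transactional branches into a single commit-must-fail assertion. As a minimal sketch (the helper name and failure message are mine, not the commit's), the shared idiom is:

    static void assertCommitFails(java.sql.Connection conn) {
        try {
            conn.commit(); // the server-side index write failure surfaces here
            org.junit.Assert.fail("commit should have thrown");
        } catch (java.sql.SQLException e) {
            // expected: the failed index update is reported to the client
        }
    }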
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
new file mode 100644
index 0000000..8df82ce
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Test for failure of a region server to write to the index table.
+ * For some reason, dropping tables after running this test
+ * fails unless it runs on its own mini cluster.
+ *
+ * @since 2.1
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
+    private static final String FAIL_ON_FIRST_PUT = "bbb";
+
+    private String tableName;
+    private String indexName;
+    private String fullTableName;
+    private String fullIndexName;
+
+    public ReadOnlyIndexFailureIT() {
+        this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME;
+        this.indexName = "IDX";
+        this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+        this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+    }
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
+        serverProps.put("hbase.client.retries.number", "2");
+        serverProps.put("hbase.client.pause", "5000");
+        serverProps.put("hbase.balancer.period", 
String.valueOf(Integer.MAX_VALUE));
+        
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
 "0");
+        serverProps.put(QueryServices.INDEX_FAILURE_BLOCK_WRITE, "true");
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, 
"true");
+        
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, 
"1000");
+        serverProps.put("hbase.coprocessor.region.classes", 
FailingRegionObserver.class.getName());
+        serverProps.put("hbase.coprocessor.abortonerror", "false");
+        serverProps.put(Indexer.CHECK_VERSION_CONF_KEY, "false");
+        Map<String, String> clientProps = 
+                Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, 
"true");
+        NUM_SLAVES_BASE = 4;
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
+                new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+
+    @Test
+    public void testWriteFailureReadOnlyLocalIndex() throws Exception {
+        helpTestWriteFailureReadOnlyIndex(true);
+    }
+
+    @Test
+    public void testWriteFailureReadOnlyIndex() throws Exception {
+        helpTestWriteFailureReadOnlyIndex(false);
+    }
+
+    public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = driver.connect(url, props)) {
+            String query;
+            ResultSet rs;
+            conn.setAutoCommit(false);
+            conn.createStatement().execute(
+                    "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            query = "SELECT * FROM " + fullTableName;
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+
+            if(localIndex) {
+                conn.createStatement().execute(
+                        "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName
+                        + " (v1) INCLUDE (v2)");
+            } else {
+                conn.createStatement().execute(
+                        "CREATE INDEX " + indexName + " ON " + fullTableName
+                        + " (v1) INCLUDE (v2)");
+            }
+
+            query = "SELECT * FROM " + fullIndexName;
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+
+            // Verify the metadata for index is correct.
+            rs = conn.getMetaData().getTables(null,
+                    StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
+                    new String[] { PTableType.INDEX.toString() });
+            assertTrue(rs.next());
+            assertEquals(indexName, rs.getString(3));
+            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+            assertFalse(rs.next());
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName
+                    + " VALUES(?,?,?)");
+            stmt.setString(1, "1");
+            stmt.setString(2, "aaa");
+            stmt.setString(3, "a1");
+            stmt.execute();
+            conn.commit();
+
+            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+            stmt.setString(1, "2");
+            stmt.setString(2, FAIL_ON_FIRST_PUT);
+            stmt.setString(3, "b2");
+            stmt.execute();
+            try {
+                conn.commit();
+                fail();
+            } catch (SQLException e) {
+            }
+
+            // Only successfully committed row should be seen
+            query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName;
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("aaa", rs.getString(1));
+            assertFalse(rs.next());
+            
+            // Verify the metadata for index is correct.
+            rs = conn.getMetaData().getTables(null,
+                    StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
+                    new String[] { PTableType.INDEX.toString() });
+            assertTrue(rs.next());
+            assertEquals(indexName, rs.getString(3));
+            // the index is always active for tables upon index table write failure
+            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+            assertFalse(rs.next());
+
+            // If the table is transactional, the write to the index table will fail because
+            // the index has not been disabled.
+            // Verify that UPSERT on the data table is blocked after the index write failed.
+            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+            stmt.setString(1, "3");
+            stmt.setString(2, "ccc");
+            stmt.setString(3, "3c");
+            try {
+                stmt.execute();
+                /* Writes would be blocked */
+                conn.commit();
+                fail();
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode(), e.getErrorCode());
+            }
+
+            // Wait for the rebuild task to clear INDEX_DISABLE_TIMESTAMP, after
+            // which writes will succeed again
+            int retries = 0;
+            do {
+                Thread.sleep(5 * 1000); // sleep 5 secs
+                if(!hasIndexDisableTimestamp(conn, indexName)){
+                    break;
+                }
+                if (++retries == 5) {
+                    fail("Failed to rebuild index within the allowed time");
+                }
+            } while(true);
+
+            // Verify that UPSERT on the data table still works after the index table is recreated
+            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+            stmt.setString(1, "4");
+            stmt.setString(2, "ddd");
+            stmt.setString(3, "4d");
+            stmt.execute();
+            conn.commit();
+
+            // verify index table has data
+            query = "SELECT count(1) FROM " + indexName;
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            
+            query = "SELECT v1 FROM " + fullTableName;
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("aaa", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("bbb", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("ddd", rs.getString(1));
+            assertFalse(rs.next());
+
+            query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName;
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("aaa", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("bbb", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("ddd", rs.getString(1));
+            assertFalse(rs.next());
+        }
+    }
+    
+    private static boolean hasIndexDisableTimestamp(Connection conn, String indexName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP +
+                " FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME +
+                " WHERE " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL" +
+                " AND " + PhoenixDatabaseMetaData.TENANT_ID + " IS NULL" +
+                " AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + " IS NULL" +
+                " AND " + PhoenixDatabaseMetaData.TABLE_NAME + " = '" + indexName + "'");
+        assertTrue(rs.next());
+        long ts = rs.getLong(1);
+        return (!rs.wasNull() && ts > 0);
+    }
+
+    public static class FailingRegionObserver extends SimpleRegionObserver {
+        // Use an AtomicInteger rather than synchronizing on a boxed Integer
+        // field: each increment would rebind the field to a new object,
+        // making the lock ineffective.
+        private final AtomicInteger failCount = new AtomicInteger(0);
+
+        @Override
+        public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
+                final Durability durability) throws HBaseIOException {
+            if (shouldFailUpsert(c, put)) {
+                if (failCount.incrementAndGet() == 1) {
+                    // Throwing anything other than an instance of IOException
+                    // results in this coprocessor being unloaded.
+                    // DoNotRetryIOException tells HBase not to retry this
+                    // mutation multiple times.
+                    throw new DoNotRetryIOException();
+                }
+            }
+        }
+
+        private boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
+            return Bytes.contains(put.getRow(), Bytes.toBytes(FAIL_ON_FIRST_PUT));
+        }
+    }
+}

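As the test above exercises, the client-visible contract is the new error code: once an index write fails with INDEX_FAILURE_BLOCK_WRITE enabled, data table writes are rejected with SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE (1120, "XCL20") until the rebuild task clears INDEX_DISABLE_TIMESTAMP. A hedged sketch of how calling code might detect that state (the helper is mine, not part of this commit):

    static boolean isBlockedByIndexFailure(java.sql.SQLException e) {
        return e.getErrorCode()
                == org.apache.phoenix.exception.SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode();
    }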
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index dd93c81..ffe9621 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -652,7 +652,7 @@ public class FromCompiler {
                     PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
                     null, null, columns, null, null, Collections.<PTable>emptyList(),
                     false, Collections.<PName>emptyList(), null, null, false, false, false, null,
-                    null, null, false, false, 0);
+                    null, null, false, false, 0, 0L);
 
             String alias = subselectNode.getAlias();
             TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index b64b9b7..5d03f57 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -1302,7 +1302,7 @@ public class JoinCompiler {
                 left.getBucketNum(), merged, left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(),
                 left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL,
                 left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(), left.getIndexType(),
-                left.rowKeyOrderOptimizable(), left.isTransactional(), left.getUpdateCacheFrequency());
+                left.rowKeyOrderOptimizable(), left.isTransactional(), left.getUpdateCacheFrequency(), left.getIndexDisableTimestamp());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index 0fc6d74..4be78a9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -152,7 +152,7 @@ public class TupleProjectionCompiler {
                 table.getBucketNum(), projectedColumns, table.getParentSchemaName(),
                 table.getParentName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
-                table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency());
+                table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
     }
 
     public static PTable createProjectedTable(TableRef tableRef, List<ColumnRef> sourceColumnRefs, boolean retainPKColumns) throws SQLException {
@@ -179,7 +179,7 @@ public class TupleProjectionCompiler {
                     retainPKColumns ? table.getBucketNum() : null, projectedColumns, null,
                     null, Collections.<PTable>emptyList(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
                     table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
-                    null, table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency());
+                    null, table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
     }
 
     // For extracting column references from single select statement

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index f8b2778..b25baf7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@ -82,7 +82,7 @@ public class UnionCompiler {
         PTable tempTable = PTableImpl.makePTable(statement.getConnection().getTenantId(), UNION_SCHEMA_NAME, UNION_TABLE_NAME,
                 PTableType.SUBQUERY, null, HConstants.LATEST_TIMESTAMP, scn == null ? HConstants.LATEST_TIMESTAMP : scn, null, null,
                         projectedColumns, null, null, null,
-                        true, null, null, null, true, true, true, null, null, null, false, false, 0);
+                        true, null, null, null, true, true, true, null, null, null, false, false, 0, 0L);
         TableRef tableRef = new TableRef(null, tempTable, 0, false);
         return tableRef;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 78f9700..ba7eb39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -91,7 +91,6 @@ import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -110,12 +109,9 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Region.RowLock;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.GlobalCache;
@@ -153,6 +149,8 @@ import org.apache.phoenix.parse.PFunction.FunctionArgument;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -192,7 +190,6 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.UpgradeUtil;
-import org.hamcrest.core.IsInstanceOf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -298,6 +295,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocessor
     private static final int ROW_KEY_ORDER_OPTIMIZABLE_INDEX = TABLE_KV_COLUMNS.indexOf(ROW_KEY_ORDER_OPTIMIZABLE_KV);
     private static final int TRANSACTIONAL_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTIONAL_KV);
     private static final int UPDATE_CACHE_FREQUENCY_INDEX = TABLE_KV_COLUMNS.indexOf(UPDATE_CACHE_FREQUENCY_KV);
+    private static final int INDEX_DISABLE_TIMESTAMP = TABLE_KV_COLUMNS.indexOf(INDEX_DISABLE_TIMESTAMP_KV);
 
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
@@ -458,7 +456,23 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocessor
                 return;
             }
             builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
-            builder.setMutationTime(currentTime);
+            long disableIndexTimestamp = table.getIndexDisableTimestamp();
+            long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? disableIndexTimestamp : Long.MAX_VALUE;
+            for (PTable index : table.getIndexes()) {
+                disableIndexTimestamp = index.getIndexDisableTimestamp();
+                if (disableIndexTimestamp > 0 && index.getIndexState() == PIndexState.ACTIVE && disableIndexTimestamp < minNonZerodisableIndexTimestamp) {
+                    minNonZerodisableIndexTimestamp = disableIndexTimestamp;
+                }
+            }
+            // Freeze time for table at min non-zero value of INDEX_DISABLE_TIMESTAMP.
+            // This keeps the table consistent with the index, as the table has had one more
+            // batch applied to it.
+            if (minNonZerodisableIndexTimestamp == Long.MAX_VALUE) {
+                builder.setMutationTime(currentTime);
+            } else {
+                // Subtract one because we add one due to timestamp granularity in Windows
+                builder.setMutationTime(minNonZerodisableIndexTimestamp - 1);
+            }
 
             if (table.getTimeStamp() != tableTimeStamp) {
                 builder.setTable(PTableImpl.toProto(table));
@@ -482,11 +496,14 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocessor
             PTable oldTable = (PTable)metaDataCache.getIfPresent(cacheKey);
             long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp();
             PTable newTable;
+            boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
+                    QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
             newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
             if (newTable == null) {
                 return null;
             }
-            if (oldTable == null || tableTimeStamp < newTable.getTimeStamp()) {
+            if (oldTable == null || tableTimeStamp < newTable.getTimeStamp()
+                    || (blockWriteRebuildIndex && newTable.getIndexDisableTimestamp() > 0)) {
                 if (logger.isDebugEnabled()) {
                     logger.debug("Caching table "
                             + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
@@ -819,7 +836,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocessor
         long updateCacheFrequency = updateCacheFrequencyKv == null ? 0 :
             PLong.INSTANCE.getCodec().decodeLong(updateCacheFrequencyKv.getValueArray(),
                     updateCacheFrequencyKv.getValueOffset(), SortOrder.getDefault());
-
+        Cell indexDisableTimestampKv = tableKeyValues[INDEX_DISABLE_TIMESTAMP];
+        long indexDisableTimestamp = indexDisableTimestampKv == null ? 0L : PLong.INSTANCE.getCodec().decodeLong(indexDisableTimestampKv.getValueArray(),
+                indexDisableTimestampKv.getValueOffset(), SortOrder.getDefault());
+
         List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
         List<PTable> indexes = new ArrayList<PTable>();
         List<PName> physicalTables = new ArrayList<PName>();
@@ -864,7 +884,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocessor
             tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null,
             tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement,
             disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency,
-            stats, baseColumnCount);
+            stats, baseColumnCount, indexDisableTimestamp);
 
     private PFunction getFunction(RegionScanner scanner, final boolean isReplace, long clientTimeStamp, List<Mutation> deleteMutationsForReplace)
@@ -2410,19 +2430,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocessor
         ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
         Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                 GlobalCache.getInstance(this.env).getMetaDataCache();
-        PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
-        // We only cache the latest, so we'll end up building the table with every call if the
-        // client connection has specified an SCN.
-        // TODO: If we indicate to the client that we're returning an older version, but there's a
-        // newer version available, the client
-        // can safely not call this, since we only allow modifications to the latest.
-        if (table != null && table.getTimeStamp() < clientTimeStamp) {
-            // Table on client is up-to-date with table on server, so just return
-            if (isTableDeleted(table)) {
-                return null;
-            }
-            return table;
-        }
         // Ask Lars about the expense of this call - if we don't take the lock, we still won't get
         // partial results
         // get the co-processor environment
@@ -2434,6 +2441,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocessor
          * from getting rebuilt too often.
          */
         final boolean wasLocked = (rowLock != null);
+        boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
+                QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
         if (!wasLocked) {
             rowLock = region.getRowLock(key, true);
             if (rowLock == null) {
@@ -2441,6 +2450,19 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocessor
             }
         }
         try {
+            PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
+            // We only cache the latest, so we'll end up building the table with every call if the
+            // client connection has specified an SCN.
+            // TODO: If we indicate to the client that we're returning an older version, but there's a
+            // newer version available, the client
+            // can safely not call this, since we only allow modifications to the latest.
+            if (table != null && table.getTimeStamp() < clientTimeStamp) {
+                // Table on client is up-to-date with table on server, so just return
+                if (isTableDeleted(table)) {
+                    return null;
+                }
+                return table;
+            }
             // Try cache again in case we were waiting on a lock
             table = (PTable)metaDataCache.getIfPresent(cacheKey);
             // We only cache the latest, so we'll end up building the table with every call if the
@@ -2457,7 +2479,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocessor
             }
             // Query for the latest table first, since it's not cached
             table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP);
-            if (table != null && table.getTimeStamp() < clientTimeStamp) {
+            if ((table != null && table.getTimeStamp() < clientTimeStamp) ||
+                    (blockWriteRebuildIndex && table != null && table.getIndexDisableTimestamp() > 0)) {
                 return table;
             }
             // Otherwise, query for an older version of the table - it won't be cached
@@ -2773,23 +2796,20 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocessor
                         PIndexState.fromSerializedValue(currentStateKV.getValueArray()[currentStateKV
                                 .getValueOffset()]);
 
-                // check if we need reset disable time stamp
-                if( (newState == PIndexState.DISABLE) &&
-                    (currentState == PIndexState.DISABLE || currentState == PIndexState.INACTIVE) &&
-                    (currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0) &&
-                    (disableTimeStampKVIndex >= 0)) {
-                    Long curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(),
-                      currentDisableTimeStamp.getValueOffset(), currentDisableTimeStamp.getValueLength());
+                if ((currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0) &&
+                        (disableTimeStampKVIndex >= 0)) {
+                    long curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(),
+                            currentDisableTimeStamp.getValueOffset(), currentDisableTimeStamp.getValueLength());
                     // new DisableTimeStamp is passed in
                     Cell newDisableTimeStampCell = newKVs.get(disableTimeStampKVIndex);
-                    Long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
-                      newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength());
+                    long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
+                            newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength());
                     if(curTimeStampVal > 0 && curTimeStampVal < newDisableTimeStamp){
                         // do not reset the disable timestamp
                         newKVs.remove(disableTimeStampKVIndex);
+                        disableTimeStampKVIndex = -1;
                     }
                 }
-
                 // Detect invalid transitions
                 if (currentState == PIndexState.BUILDING) {
                     if (newState == PIndexState.USABLE) {
@@ -2827,7 +2847,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocessor
                 }
 
                 PTable returnTable = null;
-                if (currentState != newState) {
+                if (currentState != newState || disableTimeStampKVIndex != -1) {
                     byte[] dataTableKey = null;
                     if(dataTableKV != null) {
                         dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, dataTableKV.getValue());
@@ -2837,7 +2857,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocessor
                         tableMetadata = new ArrayList<Mutation>(tableMetadata);
                         // insert an empty KV to trigger time stamp update on data table row
                         Put p = new Put(dataTableKey);
-                        p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
+                        p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
                         tableMetadata.add(p);
                     }
                     boolean setRowKeyOrderOptimizableCell = newState == PIndexState.BUILDING && !rowKeyOrderOptimizable;
@@ -2854,7 +2874,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocessor
                     if(dataTableKey != null) {
                         metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey));
                     }
-                    if (setRowKeyOrderOptimizableCell) {
+                    if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1) {
                         returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock);
                     }
                 }

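The first MetaDataEndpointImpl hunk above implements the "freeze time" rule: the server reports a mutation time of one less than the smallest non-zero INDEX_DISABLE_TIMESTAMP across the table and its still-ACTIVE indexes, so a client's read point cannot advance past the moment the index stopped being maintained. A standalone sketch of that computation (method name is mine):

    // Returns the mutation time the server should report, given the current
    // time and the INDEX_DISABLE_TIMESTAMP of the table and each ACTIVE index
    // (zero meaning "not disabled").
    static long frozenMutationTime(long currentTime, long[] disableTimestamps) {
        long min = Long.MAX_VALUE;
        for (long ts : disableTimestamps) {
            if (ts > 0 && ts < min) {
                min = ts;
            }
        }
        // Subtract one to compensate for the +1 added elsewhere for
        // timestamp granularity (see the comment in the hunk).
        return min == Long.MAX_VALUE ? currentTime : min - 1;
    }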
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index a2f7282..4e019cd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -72,6 +72,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
     protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
     private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD;
     private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL;
+    private boolean blockWriteRebuildIndex = false;
 
     @Override
     public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -98,6 +99,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
             QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD);
         rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB,
             QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL);
+        blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
+               QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
     }
     
     private static String getJdbcUrl(RegionCoprocessorEnvironment env) {
@@ -145,7 +148,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
         };
         (new Thread(r)).start();
 
-        if (!enableRebuildIndex) {
+        if (!enableRebuildIndex && !blockWriteRebuildIndex) {
             LOG.info("Failure Index Rebuild is skipped by configuration.");
             return;
         }
@@ -181,8 +184,14 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 
         @Override
         public void run() {
+            // FIXME: we should replay the data table Put, as doing a partial index build would only add
+            // the new rows and not delete the previous index value. Also, we should restrict the scan
+            // to only data within this region (as otherwise *every* region will be running this code
+            // separately, all updating the same data).
            RegionScanner scanner = null;
             PhoenixConnection conn = null;
+            boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
+                    QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
             if (inProgress.get() > 0) {
                 LOG.debug("New ScheduledBuildIndexTask skipped as there is already one running");
                 return;
@@ -192,7 +201,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                 Scan scan = new Scan();
                 SingleColumnValueFilter filter = new SingleColumnValueFilter(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                     PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
-                    CompareFilter.CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0L));
+                    CompareFilter.CompareOp.GREATER, PLong.INSTANCE.toBytes(0L));
                 filter.setFilterIfMissing(true);
                 scan.setFilter(filter);
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
@@ -233,11 +242,14 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     byte[] indexStat = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
                     if ((dataTable == null || dataTable.length == 0)
-                            || (indexStat == null || indexStat.length == 0)
-                            || ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0)
+                            || (indexStat == null || indexStat.length == 0)) {
+                        // data table name can't be empty
+                        continue;
+                    }
+
+                    if (!blockWriteRebuildIndex && ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0)
                                     && (Bytes.compareTo(PIndexState.INACTIVE.getSerializedBytes(), indexStat) != 0))) {
                         // index has to be either in disable or inactive state
-                        // data table name can't be empty
                         continue;
                     }
 
@@ -254,6 +266,9 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 
                     if (conn == null) {
                        final Properties props = new Properties();
+                       // Set SCN so that we don't ping the server and have the upper bound set back to
+                       // the timestamp when the failure occurred.
+                       props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE));
                        // don't run a second index population upsert select
                        props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");
                        conn = DriverManager.getConnection(getJdbcUrl(env), props).unwrap(PhoenixConnection.class);
@@ -276,7 +291,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     long timeStamp = Math.max(0, disabledTimeStampVal - overlapTime);
 
                     LOG.info("Starting to build index=" + indexPTable.getName() + " from timestamp=" + timeStamp);
-                    client.buildPartialIndexFromTimeStamp(indexPTable, new TableRef(dataPTable, Long.MAX_VALUE, timeStamp));
+                    client.buildPartialIndexFromTimeStamp(indexPTable, new TableRef(dataPTable, Long.MAX_VALUE, timeStamp), blockWriteRebuildIndex);
 
                 } while (hasMore);
             } catch (Throwable t) {

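The filter change above (NOT_EQUAL to GREATER on INDEX_DISABLE_TIMESTAMP) means the rebuild scan matches only rows with a strictly positive disable timestamp. A self-contained sketch of the equivalent catalog scan (class and method names are mine; imports reflect the HBase 1.x API used on this branch):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
    import org.apache.phoenix.schema.types.PLong;

    public class DisabledIndexScanSketch {
        public static Scan disabledIndexScan() {
            SingleColumnValueFilter filter = new SingleColumnValueFilter(
                    PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
                    CompareFilter.CompareOp.GREATER,
                    PLong.INSTANCE.toBytes(0L));
            // Rows that lack the INDEX_DISABLE_TIMESTAMP column are skipped
            // entirely rather than passed through.
            filter.setFilterIfMissing(true);
            Scan scan = new Scan();
            scan.setFilter(filter);
            return scan;
        }
    }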
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index f74ed0b..9fdfe51 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -3328,6 +3328,16 @@ public final class PTableProtos {
      * <code>optional int64 updateCacheFrequency = 28;</code>
      */
     long getUpdateCacheFrequency();
+
+    // optional int64 indexDisableTimestamp = 29;
+    /**
+     * <code>optional int64 indexDisableTimestamp = 29;</code>
+     */
+    boolean hasIndexDisableTimestamp();
+    /**
+     * <code>optional int64 indexDisableTimestamp = 29;</code>
+     */
+    long getIndexDisableTimestamp();
   }
   /**
    * Protobuf type {@code PTable}
@@ -3538,6 +3548,11 @@ public final class PTableProtos {
               updateCacheFrequency_ = input.readInt64();
               break;
             }
+            case 232: {
+              bitField0_ |= 0x01000000;
+              indexDisableTimestamp_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4132,6 +4147,22 @@ public final class PTableProtos {
       return updateCacheFrequency_;
     }
 
+    // optional int64 indexDisableTimestamp = 29;
+    public static final int INDEXDISABLETIMESTAMP_FIELD_NUMBER = 29;
+    private long indexDisableTimestamp_;
+    /**
+     * <code>optional int64 indexDisableTimestamp = 29;</code>
+     */
+    public boolean hasIndexDisableTimestamp() {
+      return ((bitField0_ & 0x01000000) == 0x01000000);
+    }
+    /**
+     * <code>optional int64 indexDisableTimestamp = 29;</code>
+     */
+    public long getIndexDisableTimestamp() {
+      return indexDisableTimestamp_;
+    }
+
     private void initFields() {
       schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       tableNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -4161,6 +4192,7 @@ public final class PTableProtos {
       rowKeyOrderOptimizable_ = false;
       transactional_ = false;
       updateCacheFrequency_ = 0L;
+      indexDisableTimestamp_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4312,6 +4344,9 @@ public final class PTableProtos {
       if (((bitField0_ & 0x00800000) == 0x00800000)) {
         output.writeInt64(28, updateCacheFrequency_);
       }
+      if (((bitField0_ & 0x01000000) == 0x01000000)) {
+        output.writeInt64(29, indexDisableTimestamp_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -4438,6 +4473,10 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeInt64Size(28, updateCacheFrequency_);
       }
+      if (((bitField0_ & 0x01000000) == 0x01000000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(29, indexDisableTimestamp_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4589,6 +4628,11 @@ public final class PTableProtos {
         result = result && (getUpdateCacheFrequency()
             == other.getUpdateCacheFrequency());
       }
+      result = result && (hasIndexDisableTimestamp() == 
other.hasIndexDisableTimestamp());
+      if (hasIndexDisableTimestamp()) {
+        result = result && (getIndexDisableTimestamp()
+            == other.getIndexDisableTimestamp());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -4714,6 +4758,10 @@ public final class PTableProtos {
         hash = (37 * hash) + UPDATECACHEFREQUENCY_FIELD_NUMBER;
         hash = (53 * hash) + hashLong(getUpdateCacheFrequency());
       }
+      if (hasIndexDisableTimestamp()) {
+        hash = (37 * hash) + INDEXDISABLETIMESTAMP_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getIndexDisableTimestamp());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -4894,6 +4942,8 @@ public final class PTableProtos {
         bitField0_ = (bitField0_ & ~0x04000000);
         updateCacheFrequency_ = 0L;
         bitField0_ = (bitField0_ & ~0x08000000);
+        indexDisableTimestamp_ = 0L;
+        bitField0_ = (bitField0_ & ~0x10000000);
         return this;
       }
 
@@ -5050,6 +5100,10 @@ public final class PTableProtos {
           to_bitField0_ |= 0x00800000;
         }
         result.updateCacheFrequency_ = updateCacheFrequency_;
+        if (((from_bitField0_ & 0x10000000) == 0x10000000)) {
+          to_bitField0_ |= 0x01000000;
+        }
+        result.indexDisableTimestamp_ = indexDisableTimestamp_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -5228,6 +5282,9 @@ public final class PTableProtos {
         if (other.hasUpdateCacheFrequency()) {
           setUpdateCacheFrequency(other.getUpdateCacheFrequency());
         }
+        if (other.hasIndexDisableTimestamp()) {
+          setIndexDisableTimestamp(other.getIndexDisableTimestamp());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -6964,6 +7021,39 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional int64 indexDisableTimestamp = 29;
+      private long indexDisableTimestamp_ ;
+      /**
+       * <code>optional int64 indexDisableTimestamp = 29;</code>
+       */
+      public boolean hasIndexDisableTimestamp() {
+        return ((bitField0_ & 0x10000000) == 0x10000000);
+      }
+      /**
+       * <code>optional int64 indexDisableTimestamp = 29;</code>
+       */
+      public long getIndexDisableTimestamp() {
+        return indexDisableTimestamp_;
+      }
+      /**
+       * <code>optional int64 indexDisableTimestamp = 29;</code>
+       */
+      public Builder setIndexDisableTimestamp(long value) {
+        bitField0_ |= 0x10000000;
+        indexDisableTimestamp_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 indexDisableTimestamp = 29;</code>
+       */
+      public Builder clearIndexDisableTimestamp() {
+        bitField0_ = (bitField0_ & ~0x10000000);
+        indexDisableTimestamp_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PTable)
     }
 
@@ -7011,7 +7101,7 @@ public final class PTableProtos {
       "\016\n\006values\030\002 
\003(\014\022\033\n\023guidePostsByteCount\030\003",
       " \001(\003\022\025\n\rkeyBytesCount\030\004 
\001(\003\022\027\n\017guidePost" +
       "sCount\030\005 \001(\005\022!\n\013pGuidePosts\030\006 
\001(\0132\014.PGui" +
-      "dePosts\"\244\005\n\006PTable\022\027\n\017schemaNameBytes\030\001 " +
+      "dePosts\"\303\005\n\006PTable\022\027\n\017schemaNameBytes\030\001 " +
       "\002(\014\022\026\n\016tableNameBytes\030\002 
\002(\014\022\036\n\ttableType" +
       "\030\003 \002(\0162\013.PTableType\022\022\n\nindexState\030\004 
\001(\t\022" +
       "\026\n\016sequenceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 
\002" +
@@ -7028,10 +7118,11 @@ public final class PTableProtos {
       "storeNulls\030\030 \001(\010\022\027\n\017baseColumnCount\030\031 
\001(" +
       "\005\022\036\n\026rowKeyOrderOptimizable\030\032 
\001(\010\022\025\n\rtra" +
       "nsactional\030\033 \001(\010\022\034\n\024updateCacheFrequency" +
-      "\030\034 
\001(\003*A\n\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER",
-      
"\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org"
 +
-      ".apache.phoenix.coprocessor.generatedB\014P" +
-      "TableProtosH\001\210\001\001\240\001\001"
+      "\030\034 \001(\003\022\035\n\025indexDisableTimestamp\030\035 
\001(\003*A\n",
+      
"\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIE"
 +
+      
"W\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.p" 
+
+      "hoenix.coprocessor.generatedB\014PTableProt" +
+      "osH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -7055,7 +7146,7 @@ public final class PTableProtos {
           internal_static_PTable_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PTable_descriptor,
-              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", });
+              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisableTimestamp", });
           return null;
         }
       };
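
For illustration, a minimal sketch of the new generated accessors in use; the timestamp value here is arbitrary:

    // Round-trip the new optional field (tag 29) through the generated builder.
    PTableProtos.PTable.Builder builder = PTableProtos.PTable.newBuilder();
    builder.setIndexDisableTimestamp(System.currentTimeMillis());
    boolean set = builder.hasIndexDisableTimestamp(); // true after the setter
    long ts = builder.getIndexDisableTimestamp();
    builder.clearIndexDisableTimestamp();             // resets to the default of 0L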

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index b1d8e7d..7ddd14c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -338,6 +338,8 @@ public enum SQLExceptionCode {
     CANNOT_SPLIT_LOCAL_INDEX(1109,"XCL09", "Local index may not be pre-split."),
     CANNOT_SALT_LOCAL_INDEX(1110,"XCL10", "Local index may not be salted."),
 
+    INDEX_FAILURE_BLOCK_WRITE(1120, "XCL20", "Writes to table blocked until index can be updated."),
+
     /**
      * Implementation defined class. Phoenix internal error. (errorcode 20, sqlstate INT).
      */
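
For illustration, a hedged sketch of how a client might react to the new code; the JDBC URL and table are placeholders, standard java.sql imports are assumed, and 1120/"XCL20" are the values declared above:

    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
         Statement stmt = conn.createStatement()) {
        stmt.executeUpdate("UPSERT INTO MY_TABLE VALUES ('k1', 'v1')");
        conn.commit();
    } catch (SQLException e) {
        if (e.getErrorCode() == 1120) { // SQLState "XCL20": INDEX_FAILURE_BLOCK_WRITE
            // Writes are blocked until the index catches up; back off and retry.
        }
    }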

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 1658962..6095089 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -37,6 +37,18 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
 
+import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction.VisibilityLevel;
+import co.cask.tephra.TransactionAware;
+import co.cask.tephra.TransactionCodec;
+import co.cask.tephra.TransactionConflictException;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.hbase11.TransactionAwareHTable;
+import co.cask.tephra.visibility.FenceWait;
+import co.cask.tephra.visibility.VisibilityFence;
+
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -68,6 +80,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
@@ -91,18 +104,6 @@ import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import co.cask.tephra.Transaction;
-import co.cask.tephra.Transaction.VisibilityLevel;
-import co.cask.tephra.TransactionAware;
-import co.cask.tephra.TransactionCodec;
-import co.cask.tephra.TransactionConflictException;
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.hbase11.TransactionAwareHTable;
-import co.cask.tephra.visibility.FenceWait;
-import co.cask.tephra.visibility.VisibilityFence;
-
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -737,6 +738,16 @@ public class MutationState implements SQLCloseable {
         // Always update tableRef table as the one we've cached may be out of date since when we executed
         // the UPSERT VALUES call and updated in the cache before this.
         tableRef.setTable(resolvedTable);
+        List<PTable> indexes = resolvedTable.getIndexes();
+        for (PTable idxTable : indexes) {
+            // If the index is still active but has a non-zero INDEX_DISABLE_TIMESTAMP value, then infer that
+            // our failure mode is to block writes on index failure.
+            if (idxTable.getIndexState() == PIndexState.ACTIVE && idxTable.getIndexDisableTimestamp() > 0) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE)
+                        .setSchemaName(table.getSchemaName().getString())
+                        .setTableName(table.getTableName().getString()).build().buildException();
+            }
+        }
         long timestamp = result.getMutationTime();
         if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
             serverTimeStamp = timestamp;
@@ -748,8 +759,8 @@ public class MutationState implements SQLCloseable {
                         Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
                         if (colValues != PRow.DELETE_MARKER) {
                             for (PColumn column : colValues.keySet()) {
-                               if (!column.isDynamic())
-                                       columns.add(column);
+                                if (!column.isDynamic())
+                                    columns.add(column);
                             }
                         }
                     }
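
The check added above reduces to a single predicate; a minimal sketch, with the helper name writesBlocked being illustrative only:

    static boolean writesBlocked(PTable index) {
        // An index left ACTIVE with a non-zero INDEX_DISABLE_TIMESTAMP marks a pending
        // rebuild in block-write mode, so writes to the data table must fail fast.
        return index.getIndexState() == PIndexState.ACTIVE
                && index.getIndexDisableTimestamp() > 0;
    }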

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/DelegateIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/DelegateIndexFailurePolicy.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/DelegateIndexFailurePolicy.java
new file mode 100644
index 0000000..a7fb7ec
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/DelegateIndexFailurePolicy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+
+import com.google.common.collect.Multimap;
+
+public class DelegateIndexFailurePolicy implements IndexFailurePolicy {
+
+    private final IndexFailurePolicy delegate;
+    
+    public DelegateIndexFailurePolicy(IndexFailurePolicy delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
+            throws IOException {
+        delegate.handleFailure(attempted, cause);
+    }
+
+    @Override
+    public boolean isStopped() {
+        return delegate.isStopped();
+    }
+
+    @Override
+    public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+        delegate.setup(parent, env);
+    }
+
+    @Override
+    public void stop(String arg0) {
+        delegate.stop(arg0);
+    }
+
+}
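
The delegate exists so a policy can layer behavior over a fallback. A hypothetical subclass (LoggingIndexFailurePolicy is not part of this commit; imports follow the file above):

    public class LoggingIndexFailurePolicy extends DelegateIndexFailurePolicy {
        public LoggingIndexFailurePolicy() {
            super(new KillServerOnFailurePolicy()); // fall back to the default behavior
        }

        @Override
        public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted,
                Exception cause) throws IOException {
            // Custom handling (logging, metrics) could go here before delegating.
            super.handleFailure(attempted, cause);
        }
    }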

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 09a8676..c7ed49b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -49,10 +49,13 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy;
 import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
@@ -71,22 +74,19 @@ import com.google.common.collect.Multimap;
  * region server. First attempts to disable the index and failing that falls
  * back to the default behavior of killing the region server.
  *
- * TODO: use delegate pattern instead
- * 
- * 
- * @since 2.1
  */
-public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
+public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
     private static final Log LOG = LogFactory.getLog(PhoenixIndexFailurePolicy.class);
     private RegionCoprocessorEnvironment env;
 
     public PhoenixIndexFailurePolicy() {
+        super(new KillServerOnFailurePolicy());
     }
 
     @Override
     public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
-      super.setup(parent, env);
-      this.env = env;
+        super.setup(parent, env);
+        this.env = env;
     }
 
     /**
@@ -101,9 +101,11 @@ public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
      */
     @Override
     public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
+        boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
+                QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
         boolean throwing = true;
         try {
-            handleFailureWithExceptions(attempted, cause);
+            handleFailureWithExceptions(attempted, cause, blockWriteRebuildIndex);
             throwing = false;
         } catch (Throwable t) {
             LOG.warn("handleFailure failed", t);
@@ -115,7 +117,7 @@ public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
     }
 
     private void handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> attempted,
-            Exception cause) throws Throwable {
+            Exception cause, boolean blockWriteRebuildIndex) throws Throwable {
         Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
         Map<String, Long> indexTableNames = new HashMap<String, Long>(refs.size());
         // start by looking at all the tables to which we attempted to write
@@ -157,8 +159,12 @@ public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
                     env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES));
             // Mimic the Put that gets generated by the client on an update of the index state
             Put put = new Put(indexTableKey);
-            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-                    PIndexState.DISABLE.getSerializedBytes());
+            if (blockWriteRebuildIndex)
+                put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+                        PIndexState.ACTIVE.getSerializedBytes());
+            else
+                put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+                        PIndexState.DISABLE.getSerializedBytes());
             put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
                 PLong.INSTANCE.toBytes(minTimeStamp));
             final List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
@@ -194,12 +200,22 @@ public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
                 continue;
             }
             if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
-                LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
-                        + result.getMutationCode() + ". Will use default failure policy instead.");
-                throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
+                if (blockWriteRebuildIndex) {
+                    LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP failed with code = "
+                            + result.getMutationCode());
+                    throw new DoNotRetryIOException("Attempt to update INDEX_DISABLE_TIMESTAMP failed.");
+                } else {
+                    LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
+                            + result.getMutationCode() + ". Will use default failure policy instead.");
+                    throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
+                }
             }
-            LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.",
-                    cause);
+            if (blockWriteRebuildIndex)
+                LOG.info("Successfully updated INDEX_DISABLE_TIMESTAMP for " + indexTableName + " due to an exception while writing updates.",
+                        cause);
+            else
+                LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.",
+                        cause);
         }
     }
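
Taken together, the policy now writes one of two index states back to SYSTEM.CATALOG, keyed off phoenix.index.failure.block.write; a condensed sketch of the branch above:

    // block-write mode: the index stays ACTIVE but INDEX_DISABLE_TIMESTAMP is set,
    // so clients reject data table writes until the rebuild catches up.
    // default mode: the index is flipped to DISABLE and data table writes continue.
    PIndexState newState = blockWriteRebuildIndex ? PIndexState.ACTIVE : PIndexState.DISABLE;
    put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
            newState.getSerializedBytes());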
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index b0e7b6e..fe40d60 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -129,6 +129,9 @@ public interface QueryServices extends SQLCloseable {
     public static final String INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB =
         "phoenix.index.failure.handling.rebuild.interval";
 
+    // Master switch controlling whether writes to the data table are blocked when an index update fails
+    public static final String INDEX_FAILURE_BLOCK_WRITE = "phoenix.index.failure.block.write";
+
     // Index will be partially re-built from index disable time stamp - following overlap time
     public static final String INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB =
         "phoenix.index.failure.handling.rebuild.overlap.time";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 1838b51..62297ee 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -152,6 +152,7 @@ public class QueryServicesOptions {
     public static final int DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES = 1000;
     public static final int DEFAULT_CLOCK_SKEW_INTERVAL = 2000;
     public static final boolean DEFAULT_INDEX_FAILURE_HANDLING_REBUILD = true; // auto rebuild on
+    public static final boolean DEFAULT_INDEX_FAILURE_BLOCK_WRITE = false;
     public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 10000; // 10 secs
     public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 300000; // 5 mins
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index e7bf961..b294f03 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -32,6 +32,11 @@ public class DelegateTable implements PTable {
     }
 
     @Override
+    public long getIndexDisableTimestamp() {
+        return delegate.getIndexDisableTimestamp();
+    }
+
+    @Override
     public long getSequenceNumber() {
         return delegate.getSequenceNumber();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 0456335..6409dcd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1098,29 +1098,33 @@ public class MetaDataClient {
     /**
      * Rebuild indexes from a timestamp, which is the value of the HBase row timestamp field
      */
-    public void buildPartialIndexFromTimeStamp(PTable index, TableRef dataTableRef) throws SQLException {
-        boolean needRestoreIndexState = false;
-        // Need to change index state from Disable to InActive when build index partially so that
-        // new changes will be indexed during index rebuilding
-        AlterIndexStatement indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
-            TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
-            dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE);
-        alterIndex(indexStatement);
-        needRestoreIndexState = true;
+    public void buildPartialIndexFromTimeStamp(PTable index, TableRef dataTableRef, boolean blockWriteRebuildIndex) throws SQLException {
+        boolean needRestoreIndexState = true;
+        AlterIndexStatement indexStatement = null;
+        if (!blockWriteRebuildIndex) {
+            // Need to change the index state from DISABLE to INACTIVE when building the index partially
+            // so that new changes will be indexed during the rebuild
+            indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
+                    TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
+                    dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE);
+            alterIndex(indexStatement);
+        }
         try {
             buildIndex(index, dataTableRef);
             needRestoreIndexState = false;
         } finally {
             if(needRestoreIndexState) {
-                // reset index state to disable
-                indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
-                    TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
-                    dataTableRef.getTable().getTableName().getString(), false, PIndexState.DISABLE);
-                alterIndex(indexStatement);
+                if (!blockWriteRebuildIndex) {
+                    // reset index state to disable
+                    indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
+                            TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
+                            dataTableRef.getTable().getTableName().getString(), false, PIndexState.DISABLE);
+                    alterIndex(indexStatement);
+                }
             }
         }
     }
-
+    
     /**
      * Create an index table by morphing the CreateIndexStatement into a 
CreateTableStatement and calling
      * MetaDataClient.createTable. In doing so, we perform the following 
translations:
@@ -2004,7 +2008,7 @@ public class MetaDataClient {
                         Collections.<PTable>emptyList(), isImmutableRows,
                        Collections.<PName>emptyList(), defaultFamilyName == null ? null :
                                PNameFactory.newName(defaultFamilyName), null,
-                        Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false, 0);
+                        Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false, 0, 0L);
                 connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             } else if (tableType == PTableType.INDEX && indexId == null) {
                 if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) {
@@ -2174,7 +2178,7 @@ public class MetaDataClient {
                        PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns,
                        dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows,
                        physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
-                        indexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency);
+                        indexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L);
                 result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
                 addTableToCache(result);
                 return table;
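
The net effect on the rebuild path: in block-write mode the index never leaves ACTIVE, so there is no INACTIVE flip before the partial build and no DISABLE reset on failure. A condensed sketch, where setIndexState stands in for the FACTORY.alterIndex/alterIndex calls above:

    void rebuild(PTable index, TableRef dataTableRef, boolean blockWriteRebuildIndex)
            throws SQLException {
        if (!blockWriteRebuildIndex) {
            setIndexState(index, PIndexState.INACTIVE); // index new writes during the rebuild
        }
        boolean restore = true;
        try {
            buildIndex(index, dataTableRef);
            restore = false;
        } finally {
            if (restore && !blockWriteRebuildIndex) {
                setIndexState(index, PIndexState.DISABLE); // rebuild failed; disable again
            }
        }
    }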

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 4a338f6..b2a1d58 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -157,6 +157,7 @@ public interface PTable extends PMetaDataEntity {
 
     long getTimeStamp();
     long getSequenceNumber();
+    long getIndexDisableTimestamp();
     /**
      * @return table name
      */
