PHOENIX-2221 Option to make data regions not writable when index regions are not available (Alicia Ying Shu, James Taylor)
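The change adds a server-side switch, phoenix.index.failure.block.write (default false). When it is enabled and a write to an index table fails, the index is left ACTIVE but its INDEX_DISABLE_TIMESTAMP is set; while that timestamp is non-zero, writes to the data table are rejected with error 1120 (XCL20) until the background rebuild task catches the index up and clears the timestamp. A minimal sketch of the client-visible behavior, assuming the switch is enabled on the region servers (the table name and JDBC URL here are hypothetical, not part of the commit):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class BlockedWriteExample {
    public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
            try {
                conn.createStatement().execute("UPSERT INTO T VALUES ('1', 'a', 'b')");
                conn.commit();
            } catch (SQLException e) {
                // 1120 = SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE ("XCL20"), added by this
                // commit: data table writes are blocked until the index has been rebuilt
                if (e.getErrorCode() == 1120) {
                    System.out.println("Write blocked pending index rebuild: " + e.getMessage());
                } else {
                    throw e;
                }
            }
        }
    }
}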
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e2a6386f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e2a6386f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e2a6386f Branch: refs/heads/calcite Commit: e2a6386f3b9343aec74c5f96f0e0124e80b9f8b1 Parents: 6881aef Author: James Taylor <jtay...@salesforce.com> Authored: Sun Feb 14 09:06:14 2016 -0800 Committer: James Taylor <jtay...@salesforce.com> Committed: Mon Feb 15 00:33:18 2016 -0800 ---------------------------------------------------------------------- .../end2end/index/MutableIndexFailureIT.java | 31 +- .../end2end/index/ReadOnlyIndexFailureIT.java | 289 +++++++++++++++++++ .../apache/phoenix/compile/FromCompiler.java | 2 +- .../apache/phoenix/compile/JoinCompiler.java | 2 +- .../compile/TupleProjectionCompiler.java | 4 +- .../apache/phoenix/compile/UnionCompiler.java | 2 +- .../coprocessor/MetaDataEndpointImpl.java | 92 +++--- .../coprocessor/MetaDataRegionObserver.java | 27 +- .../coprocessor/generated/PTableProtos.java | 103 ++++++- .../phoenix/exception/SQLExceptionCode.java | 2 + .../apache/phoenix/execute/MutationState.java | 39 ++- .../index/write/DelegateIndexFailurePolicy.java | 58 ++++ .../index/PhoenixIndexFailurePolicy.java | 48 ++- .../org/apache/phoenix/query/QueryServices.java | 3 + .../phoenix/query/QueryServicesOptions.java | 1 + .../apache/phoenix/schema/DelegateTable.java | 5 + .../apache/phoenix/schema/MetaDataClient.java | 38 +-- .../java/org/apache/phoenix/schema/PTable.java | 1 + .../org/apache/phoenix/schema/PTableImpl.java | 51 ++-- .../phoenix/execute/CorrelatePlanTest.java | 2 +- phoenix-protocol/src/main/PTable.proto | 1 + 21 files changed, 660 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index 5f39515..176c5a0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -172,7 +172,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { TableName indexTable = TableName.valueOf(localIndex ? MetaDataUtil .getLocalIndexTableName(fullTableName) : fullIndexName); - HBaseAdmin admin = this.getUtility().getHBaseAdmin(); + HBaseAdmin admin = getUtility().getHBaseAdmin(); HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable); try{ admin.disableTable(indexTable); @@ -184,20 +184,10 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { stmt.setString(2, "x2"); stmt.setString(3, "2"); stmt.execute(); - if (transactional) { - try { - conn.commit(); - fail(); - } catch (SQLException e) { - conn.rollback(); - } - } - else { - try { - conn.commit(); - fail(); - } catch (SQLException e) { - } + try { + conn.commit(); + fail(); + } catch (SQLException e) { } // Verify the metadata for index is correct. 
@@ -341,9 +331,9 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { // find a RS which doesn't host the CATALOG table TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG"); TableName indexTable = TableName.valueOf(fullIndexName); - final HBaseCluster cluster = this.getUtility().getHBaseCluster(); + final HBaseCluster cluster = getUtility().getHBaseCluster(); Collection<ServerName> rss = cluster.getClusterStatus().getServers(); - HBaseAdmin admin = this.getUtility().getHBaseAdmin(); + HBaseAdmin admin = getUtility().getHBaseAdmin(); List<HRegionInfo> regions = admin.getTableRegions(catalogTable); ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getTable(), regions.get(0).getRegionName()); @@ -363,7 +353,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { final HRegionInfo indexRegion = regions.get(0); final ServerName dstRS = rsToBeKilled; admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName())); - this.getUtility().waitFor(30000, 200, new Waiter.Predicate<Exception>() { + getUtility().waitFor(30000, 200, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { ServerName sn = cluster.getServerHoldingRegion(indexRegion.getTable(), @@ -379,10 +369,10 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { Thread.sleep(100); // kill RS hosting index table - this.getUtility().getHBaseCluster().killRegionServer(rsToBeKilled); + getUtility().getHBaseCluster().killRegionServer(rsToBeKilled); // wait for the index table to complete recovery - this.getUtility().waitUntilAllRegionsAssigned(indexTable); + getUtility().waitUntilAllRegionsAssigned(indexTable); // Verify the metadata for index is correct. do { @@ -413,6 +403,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { this.fullTableName = fullTableName; } + @Override public void run() { if(inProgress.get() > 0){ return; http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java new file mode 100644 index 0000000..8df82ce --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.phoenix.end2end.index; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.hbase.index.Indexer; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.StringUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Maps; +/** + * + * Test for failure of region server to write to index table. + * For some reason dropping tables after running this test + * fails unless it runs its own mini cluster. 
+ * + * + * @since 2.1 + */ + +@Category(NeedsOwnMiniClusterTest.class) +public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT { + private static final String FAIL_ON_FIRST_PUT = "bbb"; + + private String tableName; + private String indexName; + private String fullTableName; + private String fullIndexName; + + public ReadOnlyIndexFailureIT() { + this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME; + this.indexName = "IDX"; + this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); + this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); + } + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10); + serverProps.put("hbase.client.retries.number", "2"); + serverProps.put("hbase.client.pause", "5000"); + serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE)); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, "0"); + serverProps.put(QueryServices.INDEX_FAILURE_BLOCK_WRITE, "true"); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, "true"); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000"); + serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName()); + serverProps.put("hbase.coprocessor.abortonerror", "false"); + serverProps.put(Indexer.CHECK_VERSION_CONF_KEY, "false"); + Map<String, String> clientProps = + Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, "true"); + NUM_SLAVES_BASE = 4; + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), + new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + @Test + public void testWriteFailureReadOnlyLocalIndex() throws Exception { + helpTestWriteFailureReadOnlyIndex(true); + } + + @Test + public void testWriteFailureReadOnlyIndex() throws Exception { + helpTestWriteFailureReadOnlyIndex(false); + } + + public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = driver.connect(url, props)) { + String query; + ResultSet rs; + conn.setAutoCommit(false); + conn.createStatement().execute( + "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + query = "SELECT * FROM " + fullTableName; + rs = conn.createStatement().executeQuery(query); + assertFalse(rs.next()); + + if(localIndex) { + conn.createStatement().execute( + "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName + + " (v1) INCLUDE (v2)"); + } else { + conn.createStatement().execute( + "CREATE INDEX " + indexName + " ON " + fullTableName + + " (v1) INCLUDE (v2)"); + } + + query = "SELECT * FROM " + fullIndexName; + rs = conn.createStatement().executeQuery(query); + assertFalse(rs.next()); + + // Verify the metadata for index is correct. 
+ rs = conn.getMetaData().getTables(null, + StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName, + new String[] { PTableType.INDEX.toString() }); + assertTrue(rs.next()); + assertEquals(indexName, rs.getString(3)); + assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); + assertFalse(rs.next()); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + + " VALUES(?,?,?)"); + stmt.setString(1, "1"); + stmt.setString(2, "aaa"); + stmt.setString(3, "a1"); + stmt.execute(); + conn.commit(); + + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); + stmt.setString(1, "2"); + stmt.setString(2, FAIL_ON_FIRST_PUT); + stmt.setString(3, "b2"); + stmt.execute(); + try { + conn.commit(); + fail(); + } catch (SQLException e) { + } + + // Only successfully committed row should be seen + query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("aaa", rs.getString(1)); + assertFalse(rs.next()); + + // Verify the metadata for index is correct. + rs = conn.getMetaData().getTables(null, + StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName, + new String[] { PTableType.INDEX.toString() }); + assertTrue(rs.next()); + assertEquals(indexName, rs.getString(3)); + // the index is always active for tables upon index table write failure + assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); + assertFalse(rs.next()); + + // if the table is transactional the write to the index table will fail because the + // index has not been disabled + // Verify UPSERT on data table is blocked after index write failed + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); + stmt.setString(1, "3"); + stmt.setString(2, "ccc"); + stmt.setString(3, "3c"); + try { + stmt.execute(); + /* Writes would be blocked */ + conn.commit(); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode(), e.getErrorCode()); + } + + // Wait for the rebuild task to catch the index up and clear INDEX_DISABLE_TIMESTAMP + int retries = 0; + do { + Thread.sleep(5 * 1000); // sleep 5 secs + if(!hasIndexDisableTimestamp(conn, indexName)){ + break; + } + if (++retries == 5) { + fail("Failed to rebuild index within the allowed time"); + } + } while(true); + + // Verify UPSERT on data table works again after the index has been rebuilt + stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)"); + stmt.setString(1, "4"); + stmt.setString(2, "ddd"); + stmt.setString(3, "4d"); + stmt.execute(); + conn.commit(); + + // verify index table has data + query = "SELECT count(1) FROM " + indexName; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + + query = "SELECT v1 FROM " + fullTableName; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("aaa", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("bbb", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("ddd", rs.getString(1)); + assertFalse(rs.next()); + + query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("aaa", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("bbb", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("ddd", rs.getString(1)); + assertFalse(rs.next()); + } + } + + private static boolean hasIndexDisableTimestamp(Connection
conn, String indexName) throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("SELECT " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + + " FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + + " WHERE " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL" + + " AND " + PhoenixDatabaseMetaData.TENANT_ID + " IS NULL" + + " AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + " IS NULL" + + " AND " + PhoenixDatabaseMetaData.TABLE_NAME + " = '" + indexName + "'"); + assertTrue(rs.next()); + long ts = rs.getLong(1); + return (!rs.wasNull() && ts > 0); + } + + public static class FailingRegionObserver extends SimpleRegionObserver { + private Integer failCount = new Integer(0); + + @Override + public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, + final Durability durability) throws HBaseIOException { + if (shouldFailUpsert(c, put)) { + synchronized (failCount) { + failCount++; + if (failCount.intValue() == 1) { + // throwing anything other than instances of IOException results + // in this coprocessor being unloaded + // DoNotRetryIOException tells HBase not to retry this mutation + // multiple times + throw new DoNotRetryIOException(); + } + } + } + } + + private boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) { + return Bytes.contains(put.getRow(), Bytes.toBytes(FAIL_ON_FIRST_PUT)); + } + + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java index dd93c81..ffe9621 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java @@ -652,7 +652,7 @@ public class FromCompiler { PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, null, null, columns, null, null, Collections.<PTable>emptyList(), false, Collections.<PName>emptyList(), null, null, false, false, false, null, - null, null, false, false, 0); + null, null, false, false, 0, 0L); String alias = subselectNode.getAlias(); TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index b64b9b7..5d03f57 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -1302,7 +1302,7 @@ public class JoinCompiler { left.getBucketNum(), merged,left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(), left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL, left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(), left.getIndexType(), - left.rowKeyOrderOptimizable(), left.isTransactional(), left.getUpdateCacheFrequency()); + left.rowKeyOrderOptimizable(), left.isTransactional(), left.getUpdateCacheFrequency(), left.getIndexDisableTimestamp()); } }
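The compiler changes above and below are mechanical: every PTableImpl.makePTable call site gains a trailing indexDisableTimestamp argument (0L for synthetic tables such as subqueries and unions, otherwise the value carried over from the source table). That value drives the timestamp-freezing rule added to MetaDataEndpointImpl further down. A self-contained sketch of that rule for illustration (this mirrors the loop over table.getIndexes() in the diff, but is not the commit's code):

import java.util.Arrays;
import java.util.List;

public class FreezeTimeSketch {
    // Pin the reported mutation time just below the smallest non-zero
    // INDEX_DISABLE_TIMESTAMP of any still-ACTIVE index, so clients read the
    // data table at a point consistent with the last fully indexed batch.
    static long mutationTime(long currentTime, List<Long> activeIndexDisableTimestamps) {
        long min = Long.MAX_VALUE;
        for (long ts : activeIndexDisableTimestamps) {
            if (ts > 0 && ts < min) {
                min = ts;
            }
        }
        // Subtract one, mirroring the commit's adjustment for timestamp granularity
        return min == Long.MAX_VALUE ? currentTime : min - 1;
    }

    public static void main(String[] args) {
        System.out.println(mutationTime(1000L, Arrays.asList(0L, 0L)));   // 1000: no pending failure
        System.out.println(mutationTime(1000L, Arrays.asList(0L, 750L))); // 749: frozen below the failure
    }
}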
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java index 0fc6d74..4be78a9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java @@ -152,7 +152,7 @@ public class TupleProjectionCompiler { table.getBucketNum(), projectedColumns, table.getParentSchemaName(), table.getParentName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), - table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency()); + table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp()); } public static PTable createProjectedTable(TableRef tableRef, List<ColumnRef> sourceColumnRefs, boolean retainPKColumns) throws SQLException { @@ -179,7 +179,7 @@ public class TupleProjectionCompiler { retainPKColumns ? table.getBucketNum() : null, projectedColumns, null, null, Collections.<PTable>emptyList(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), - null, table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency()); + null, table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp()); } // For extracting column references from single select statement http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java index f8b2778..b25baf7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java @@ -82,7 +82,7 @@ public class UnionCompiler { PTable tempTable = PTableImpl.makePTable(statement.getConnection().getTenantId(), UNION_SCHEMA_NAME, UNION_TABLE_NAME, PTableType.SUBQUERY, null, HConstants.LATEST_TIMESTAMP, scn == null ? 
HConstants.LATEST_TIMESTAMP : scn, null, null, projectedColumns, null, null, null, - true, null, null, null, true, true, true, null, null, null, false, false, 0); + true, null, null, null, true, true, true, null, null, null, false, false, 0, 0L); TableRef tableRef = new TableRef(null, tempTable, 0, false); return tableRef; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 78f9700..ba7eb39 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -91,7 +91,6 @@ import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -110,12 +109,9 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region.RowLock; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.cache.GlobalCache; @@ -153,6 +149,8 @@ import org.apache.phoenix.parse.PFunction.FunctionArgument; import org.apache.phoenix.protobuf.ProtobufUtil; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.ColumnNotFoundException; @@ -192,7 +190,6 @@ import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.UpgradeUtil; -import org.hamcrest.core.IsInstanceOf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -298,6 +295,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final int ROW_KEY_ORDER_OPTIMIZABLE_INDEX = TABLE_KV_COLUMNS.indexOf(ROW_KEY_ORDER_OPTIMIZABLE_KV); private static final int TRANSACTIONAL_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTIONAL_KV); private static final int UPDATE_CACHE_FREQUENCY_INDEX = TABLE_KV_COLUMNS.indexOf(UPDATE_CACHE_FREQUENCY_KV); + private static final int INDEX_DISABLE_TIMESTAMP = TABLE_KV_COLUMNS.indexOf(INDEX_DISABLE_TIMESTAMP_KV); // KeyValues for Column private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES); @@ -458,7 
+456,23 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return; } builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS); - builder.setMutationTime(currentTime); + long disableIndexTimestamp = table.getIndexDisableTimestamp(); + long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? disableIndexTimestamp : Long.MAX_VALUE; + for (PTable index : table.getIndexes()) { + disableIndexTimestamp = index.getIndexDisableTimestamp(); + if (disableIndexTimestamp > 0 && index.getIndexState() == PIndexState.ACTIVE && disableIndexTimestamp < minNonZerodisableIndexTimestamp) { + minNonZerodisableIndexTimestamp = disableIndexTimestamp; + } + } + // Freeze time for table at min non-zero value of INDEX_DISABLE_TIMESTAMP + // This will keep the table consistent with index as the table has had one more + // batch applied to it. + if (minNonZerodisableIndexTimestamp == Long.MAX_VALUE) { + builder.setMutationTime(currentTime); + } else { + // Subtract one because we add one due to timestamp granularity in Windows + builder.setMutationTime(minNonZerodisableIndexTimestamp - 1); + } if (table.getTimeStamp() != tableTimeStamp) { builder.setTable(PTableImpl.toProto(table)); @@ -482,11 +496,14 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PTable oldTable = (PTable)metaDataCache.getIfPresent(cacheKey); long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp(); PTable newTable; + boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, + QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); newTable = getTable(scanner, clientTimeStamp, tableTimeStamp); if (newTable == null) { return null; } - if (oldTable == null || tableTimeStamp < newTable.getTimeStamp()) { + if (oldTable == null || tableTimeStamp < newTable.getTimeStamp() + || (blockWriteRebuildIndex && newTable.getIndexDisableTimestamp() > 0)) { if (logger.isDebugEnabled()) { logger.debug("Caching table " + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(), @@ -819,7 +836,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso long updateCacheFrequency = updateCacheFrequencyKv == null ? 0 : PLong.INSTANCE.getCodec().decodeLong(updateCacheFrequencyKv.getValueArray(), updateCacheFrequencyKv.getValueOffset(), SortOrder.getDefault()); - + Cell indexDisableTimestampKv = tableKeyValues[INDEX_DISABLE_TIMESTAMP]; + long indexDisableTimestamp = indexDisableTimestampKv == null ? 0L : PLong.INSTANCE.getCodec().decodeLong(indexDisableTimestampKv.getValueArray(), + indexDisableTimestampKv.getValueOffset(), SortOrder.getDefault()); + List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount); List<PTable> indexes = new ArrayList<PTable>(); List<PName> physicalTables = new ArrayList<PName>(); @@ -864,7 +884,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null, tableType == INDEX ? 
dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, - stats, baseColumnCount); + stats, baseColumnCount, indexDisableTimestamp); } private PFunction getFunction(RegionScanner scanner, final boolean isReplace, long clientTimeStamp, List<Mutation> deleteMutationsForReplace) @@ -2410,19 +2430,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key); Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); - PTable table = (PTable)metaDataCache.getIfPresent(cacheKey); - // We only cache the latest, so we'll end up building the table with every call if the - // client connection has specified an SCN. - // TODO: If we indicate to the client that we're returning an older version, but there's a - // newer version available, the client - // can safely not call this, since we only allow modifications to the latest. - if (table != null && table.getTimeStamp() < clientTimeStamp) { - // Table on client is up-to-date with table on server, so just return - if (isTableDeleted(table)) { - return null; - } - return table; - } // Ask Lars about the expense of this call - if we don't take the lock, we still won't get // partial results // get the co-processor environment @@ -2434,6 +2441,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso * from getting rebuilt too often. */ final boolean wasLocked = (rowLock != null); + boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, + QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); if (!wasLocked) { rowLock = region.getRowLock(key, true); if (rowLock == null) { @@ -2441,6 +2450,19 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } try { + PTable table = (PTable)metaDataCache.getIfPresent(cacheKey); + // We only cache the latest, so we'll end up building the table with every call if the + // client connection has specified an SCN. + // TODO: If we indicate to the client that we're returning an older version, but there's a + // newer version available, the client + // can safely not call this, since we only allow modifications to the latest. 
+ if (table != null && table.getTimeStamp() < clientTimeStamp) { + // Table on client is up-to-date with table on server, so just return + if (isTableDeleted(table)) { + return null; + } + return table; + } // Try cache again in case we were waiting on a lock table = (PTable)metaDataCache.getIfPresent(cacheKey); // We only cache the latest, so we'll end up building the table with every call if the @@ -2457,7 +2479,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } // Query for the latest table first, since it's not cached table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP); - if (table != null && table.getTimeStamp() < clientTimeStamp) { + if ((table != null && table.getTimeStamp() < clientTimeStamp) || + (blockWriteRebuildIndex && table.getIndexDisableTimestamp() > 0)) { return table; } // Otherwise, query for an older version of the table - it won't be cached @@ -2773,23 +2796,20 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PIndexState.fromSerializedValue(currentStateKV.getValueArray()[currentStateKV .getValueOffset()]); - // check if we need reset disable time stamp - if( (newState == PIndexState.DISABLE) && - (currentState == PIndexState.DISABLE || currentState == PIndexState.INACTIVE) && - (currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0) && - (disableTimeStampKVIndex >= 0)) { - Long curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(), - currentDisableTimeStamp.getValueOffset(), currentDisableTimeStamp.getValueLength()); + if ((currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0) && + (disableTimeStampKVIndex >= 0)) { + long curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(), + currentDisableTimeStamp.getValueOffset(), currentDisableTimeStamp.getValueLength()); // new DisableTimeStamp is passed in Cell newDisableTimeStampCell = newKVs.get(disableTimeStampKVIndex); - Long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(), - newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength()); + long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(), + newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength()); if(curTimeStampVal > 0 && curTimeStampVal < newDisableTimeStamp){ // not reset disable timestamp newKVs.remove(disableTimeStampKVIndex); + disableTimeStampKVIndex = -1; } } - // Detect invalid transitions if (currentState == PIndexState.BUILDING) { if (newState == PIndexState.USABLE) { @@ -2827,7 +2847,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } PTable returnTable = null; - if (currentState != newState) { + if (currentState != newState || disableTimeStampKVIndex != -1) { byte[] dataTableKey = null; if(dataTableKV != null) { dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, dataTableKV.getValue()); @@ -2837,7 +2857,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso tableMetadata = new ArrayList<Mutation>(tableMetadata); // insert an empty KV to trigger time stamp update on data table row Put p = new Put(dataTableKey); - p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, ByteUtil.EMPTY_BYTE_ARRAY); + p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); 
tableMetadata.add(p); } boolean setRowKeyOrderOptimizableCell = newState == PIndexState.BUILDING && !rowKeyOrderOptimizable; @@ -2854,7 +2874,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if(dataTableKey != null) { metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey)); } - if (setRowKeyOrderOptimizableCell) { + if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1) { returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index a2f7282..4e019cd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -72,6 +72,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD; private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL; + private boolean blockWriteRebuildIndex = false; @Override public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c, @@ -98,6 +99,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver { QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD); rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL); + blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, + QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); } private static String getJdbcUrl(RegionCoprocessorEnvironment env) { @@ -145,7 +148,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { }; (new Thread(r)).start(); - if (!enableRebuildIndex) { + if (!enableRebuildIndex && !blockWriteRebuildIndex) { LOG.info("Failure Index Rebuild is skipped by configuration."); return; } @@ -181,8 +184,14 @@ @Override public void run() { + // FIXME: we should replay the data table Put, as doing a partial index build would only add + // the new rows and not delete the previous index value. Also, we should restrict the scan + // to only data within this region (as otherwise *every* region will be running this code + // separately, all updating the same data).
RegionScanner scanner = null; PhoenixConnection conn = null; + boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, + QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); if (inProgress.get() > 0) { LOG.debug("New ScheduledBuildIndexTask skipped as there is already one running"); return; @@ -192,7 +201,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { Scan scan = new Scan(); SingleColumnValueFilter filter = new SingleColumnValueFilter(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, - CompareFilter.CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0L)); + CompareFilter.CompareOp.GREATER, PLong.INSTANCE.toBytes(0L)); filter.setFilterIfMissing(true); scan.setFilter(filter); scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, @@ -233,11 +242,14 @@ public class MetaDataRegionObserver extends BaseRegionObserver { byte[] indexStat = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES); if ((dataTable == null || dataTable.length == 0) - || (indexStat == null || indexStat.length == 0) - || ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0) + || (indexStat == null || indexStat.length == 0)) { + // data table name can't be empty + continue; + } + + if (!blockWriteRebuildIndex && ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0) && (Bytes.compareTo(PIndexState.INACTIVE.getSerializedBytes(), indexStat) != 0))) { // index has to be either in disable or inactive state - // data table name can't be empty continue; } @@ -254,6 +266,9 @@ public class MetaDataRegionObserver extends BaseRegionObserver { if (conn == null) { final Properties props = new Properties(); + // Set SCN so that we don't ping server and have the upper bound set back to + // the timestamp when the failure occurred. 
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE)); // don't run a second index populations upsert select props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0"); conn = DriverManager.getConnection(getJdbcUrl(env), props).unwrap(PhoenixConnection.class); @@ -276,7 +291,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { long timeStamp = Math.max(0, disabledTimeStampVal - overlapTime); LOG.info("Starting to build index=" + indexPTable.getName() + " from timestamp=" + timeStamp); - client.buildPartialIndexFromTimeStamp(indexPTable, new TableRef(dataPTable, Long.MAX_VALUE, timeStamp)); + client.buildPartialIndexFromTimeStamp(indexPTable, new TableRef(dataPTable, Long.MAX_VALUE, timeStamp), blockWriteRebuildIndex); } while (hasMore); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java index f74ed0b..9fdfe51 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java @@ -3328,6 +3328,16 @@ public final class PTableProtos { * <code>optional int64 updateCacheFrequency = 28;</code> */ long getUpdateCacheFrequency(); + + // optional int64 indexDisableTimestamp = 29; + /** + * <code>optional int64 indexDisableTimestamp = 29;</code> + */ + boolean hasIndexDisableTimestamp(); + /** + * <code>optional int64 indexDisableTimestamp = 29;</code> + */ + long getIndexDisableTimestamp(); } /** * Protobuf type {@code PTable} @@ -3538,6 +3548,11 @@ public final class PTableProtos { updateCacheFrequency_ = input.readInt64(); break; } + case 232: { + bitField0_ |= 0x01000000; + indexDisableTimestamp_ = input.readInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -4132,6 +4147,22 @@ public final class PTableProtos { return updateCacheFrequency_; } + // optional int64 indexDisableTimestamp = 29; + public static final int INDEXDISABLETIMESTAMP_FIELD_NUMBER = 29; + private long indexDisableTimestamp_; + /** + * <code>optional int64 indexDisableTimestamp = 29;</code> + */ + public boolean hasIndexDisableTimestamp() { + return ((bitField0_ & 0x01000000) == 0x01000000); + } + /** + * <code>optional int64 indexDisableTimestamp = 29;</code> + */ + public long getIndexDisableTimestamp() { + return indexDisableTimestamp_; + } + private void initFields() { schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY; tableNameBytes_ = com.google.protobuf.ByteString.EMPTY; @@ -4161,6 +4192,7 @@ public final class PTableProtos { rowKeyOrderOptimizable_ = false; transactional_ = false; updateCacheFrequency_ = 0L; + indexDisableTimestamp_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4312,6 +4344,9 @@ public final class PTableProtos { if (((bitField0_ & 0x00800000) == 0x00800000)) { output.writeInt64(28, updateCacheFrequency_); } + if (((bitField0_ & 0x01000000) == 0x01000000)) { + output.writeInt64(29, indexDisableTimestamp_); + } getUnknownFields().writeTo(output); } @@ -4438,6 +4473,10 @@ public final class PTableProtos { size += com.google.protobuf.CodedOutputStream 
.computeInt64Size(28, updateCacheFrequency_); } + if (((bitField0_ & 0x01000000) == 0x01000000)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(29, indexDisableTimestamp_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -4589,6 +4628,11 @@ public final class PTableProtos { result = result && (getUpdateCacheFrequency() == other.getUpdateCacheFrequency()); } + result = result && (hasIndexDisableTimestamp() == other.hasIndexDisableTimestamp()); + if (hasIndexDisableTimestamp()) { + result = result && (getIndexDisableTimestamp() + == other.getIndexDisableTimestamp()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -4714,6 +4758,10 @@ public final class PTableProtos { hash = (37 * hash) + UPDATECACHEFREQUENCY_FIELD_NUMBER; hash = (53 * hash) + hashLong(getUpdateCacheFrequency()); } + if (hasIndexDisableTimestamp()) { + hash = (37 * hash) + INDEXDISABLETIMESTAMP_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getIndexDisableTimestamp()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -4894,6 +4942,8 @@ public final class PTableProtos { bitField0_ = (bitField0_ & ~0x04000000); updateCacheFrequency_ = 0L; bitField0_ = (bitField0_ & ~0x08000000); + indexDisableTimestamp_ = 0L; + bitField0_ = (bitField0_ & ~0x10000000); return this; } @@ -5050,6 +5100,10 @@ public final class PTableProtos { to_bitField0_ |= 0x00800000; } result.updateCacheFrequency_ = updateCacheFrequency_; + if (((from_bitField0_ & 0x10000000) == 0x10000000)) { + to_bitField0_ |= 0x01000000; + } + result.indexDisableTimestamp_ = indexDisableTimestamp_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -5228,6 +5282,9 @@ public final class PTableProtos { if (other.hasUpdateCacheFrequency()) { setUpdateCacheFrequency(other.getUpdateCacheFrequency()); } + if (other.hasIndexDisableTimestamp()) { + setIndexDisableTimestamp(other.getIndexDisableTimestamp()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -6964,6 +7021,39 @@ public final class PTableProtos { return this; } + // optional int64 indexDisableTimestamp = 29; + private long indexDisableTimestamp_ ; + /** + * <code>optional int64 indexDisableTimestamp = 29;</code> + */ + public boolean hasIndexDisableTimestamp() { + return ((bitField0_ & 0x10000000) == 0x10000000); + } + /** + * <code>optional int64 indexDisableTimestamp = 29;</code> + */ + public long getIndexDisableTimestamp() { + return indexDisableTimestamp_; + } + /** + * <code>optional int64 indexDisableTimestamp = 29;</code> + */ + public Builder setIndexDisableTimestamp(long value) { + bitField0_ |= 0x10000000; + indexDisableTimestamp_ = value; + onChanged(); + return this; + } + /** + * <code>optional int64 indexDisableTimestamp = 29;</code> + */ + public Builder clearIndexDisableTimestamp() { + bitField0_ = (bitField0_ & ~0x10000000); + indexDisableTimestamp_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:PTable) } @@ -7011,7 +7101,7 @@ public final class PTableProtos { "\016\n\006values\030\002 \003(\014\022\033\n\023guidePostsByteCount\030\003", " \001(\003\022\025\n\rkeyBytesCount\030\004 \001(\003\022\027\n\017guidePost" + "sCount\030\005 \001(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGui" + - "dePosts\"\244\005\n\006PTable\022\027\n\017schemaNameBytes\030\001 " + + "dePosts\"\303\005\n\006PTable\022\027\n\017schemaNameBytes\030\001 " + 
"\002(\014\022\026\n\016tableNameBytes\030\002 \002(\014\022\036\n\ttableType" + "\030\003 \002(\0162\013.PTableType\022\022\n\nindexState\030\004 \001(\t\022" + "\026\n\016sequenceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002" + @@ -7028,10 +7118,11 @@ public final class PTableProtos { "storeNulls\030\030 \001(\010\022\027\n\017baseColumnCount\030\031 \001(" + "\005\022\036\n\026rowKeyOrderOptimizable\030\032 \001(\010\022\025\n\rtra" + "nsactional\030\033 \001(\010\022\034\n\024updateCacheFrequency" + - "\030\034 \001(\003*A\n\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER", - "\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org" + - ".apache.phoenix.coprocessor.generatedB\014P" + - "TableProtosH\001\210\001\001\240\001\001" + "\030\034 \001(\003\022\035\n\025indexDisableTimestamp\030\035 \001(\003*A\n", + "\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIE" + + "W\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.p" + + "hoenix.coprocessor.generatedB\014PTableProt" + + "osH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -7055,7 +7146,7 @@ public final class PTableProtos { internal_static_PTable_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_PTable_descriptor, - new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", }); + new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisableTimestamp", }); return null; } }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index b1d8e7d..7ddd14c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -338,6 +338,8 @@ public enum SQLExceptionCode { CANNOT_SPLIT_LOCAL_INDEX(1109,"XCL09", "Local index may not be pre-split."), CANNOT_SALT_LOCAL_INDEX(1110,"XCL10", "Local index may not be salted."), + INDEX_FAILURE_BLOCK_WRITE(1120, "XCL20", "Writes to table blocked until index can be updated."), + /** * Implementation defined class. Phoenix internal error. (errorcode 20, sqlstate INT). 
*/ http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 1658962..6095089 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -37,6 +37,18 @@ import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import javax.annotation.concurrent.Immutable; +import co.cask.tephra.Transaction; +import co.cask.tephra.Transaction.VisibilityLevel; +import co.cask.tephra.TransactionAware; +import co.cask.tephra.TransactionCodec; +import co.cask.tephra.TransactionConflictException; +import co.cask.tephra.TransactionContext; +import co.cask.tephra.TransactionFailureException; +import co.cask.tephra.TransactionSystemClient; +import co.cask.tephra.hbase11.TransactionAwareHTable; +import co.cask.tephra.visibility.FenceWait; +import co.cask.tephra.visibility.VisibilityFence; + import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; @@ -68,6 +80,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PMetaData; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; @@ -91,18 +104,6 @@ import org.apache.phoenix.util.TransactionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import co.cask.tephra.Transaction; -import co.cask.tephra.Transaction.VisibilityLevel; -import co.cask.tephra.TransactionAware; -import co.cask.tephra.TransactionCodec; -import co.cask.tephra.TransactionConflictException; -import co.cask.tephra.TransactionContext; -import co.cask.tephra.TransactionFailureException; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.hbase11.TransactionAwareHTable; -import co.cask.tephra.visibility.FenceWait; -import co.cask.tephra.visibility.VisibilityFence; - import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -737,6 +738,16 @@ public class MutationState implements SQLCloseable { // Always update tableRef table as the one we've cached may be out of date since when we executed // the UPSERT VALUES call and updated in the cache before this. tableRef.setTable(resolvedTable); + List<PTable> indexes = resolvedTable.getIndexes(); + for (PTable idxTtable : indexes) { + // If index is still active, but has a non zero INDEX_DISABLE_TIMESTAMP value, then infer that + // our failure mode is block writes on index failure. 
+ if (idxTtable.getIndexState() == PIndexState.ACTIVE && idxTtable.getIndexDisableTimestamp() > 0) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()).build().buildException(); + } + } long timestamp = result.getMutationTime(); if (timestamp != QueryConstants.UNSET_TIMESTAMP) { serverTimeStamp = timestamp; @@ -748,8 +759,8 @@ public class MutationState implements SQLCloseable { Map<PColumn, byte[]> colValues = valueEntry.getColumnValues(); if (colValues != PRow.DELETE_MARKER) { for (PColumn column : colValues.keySet()) { - if (!column.isDynamic()) - columns.add(column); + if (!column.isDynamic()) + columns.add(column); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/DelegateIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/DelegateIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/DelegateIndexFailurePolicy.java new file mode 100644 index 0000000..a7fb7ec --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/DelegateIndexFailurePolicy.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.hbase.index.write; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; + +import com.google.common.collect.Multimap; + +public class DelegateIndexFailurePolicy implements IndexFailurePolicy { + + private final IndexFailurePolicy delegate; + + public DelegateIndexFailurePolicy(IndexFailurePolicy delegate) { + this.delegate = delegate; + } + + @Override + public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) + throws IOException { + delegate.handleFailure(attempted, cause); + } + + @Override + public boolean isStopped() { + return delegate.isStopped(); + } + + @Override + public void setup(Stoppable parent, RegionCoprocessorEnvironment env) { + delegate.setup(parent, env); + } + + @Override + public void stop(String arg0) { + delegate.stop(arg0); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index 09a8676..c7ed49b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -49,10 +49,13 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; +import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy; import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.protobuf.ProtobufUtil; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; @@ -71,22 +74,19 @@ import com.google.common.collect.Multimap; * region server. First attempts to disable the index and failing that falls * back to the default behavior of killing the region server. 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 09a8676..c7ed49b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -49,10 +49,13 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy;
 import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
@@ -71,22 +74,19 @@ import com.google.common.collect.Multimap;
  * region server. First attempts to disable the index and failing that falls
  * back to the default behavior of killing the region server.
  *
- * TODO: use delegate pattern instead
- *
- *
- * @since 2.1
  */
-public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
+public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
     private static final Log LOG = LogFactory.getLog(PhoenixIndexFailurePolicy.class);
     private RegionCoprocessorEnvironment env;
 
     public PhoenixIndexFailurePolicy() {
+        super(new KillServerOnFailurePolicy());
     }
 
     @Override
     public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
-      super.setup(parent, env);
-      this.env = env;
+        super.setup(parent, env);
+        this.env = env;
     }
 
     /**
@@ -101,9 +101,11 @@ public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
      */
     @Override
     public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
+        boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
+                QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
         boolean throwing = true;
         try {
-            handleFailureWithExceptions(attempted, cause);
+            handleFailureWithExceptions(attempted, cause, blockWriteRebuildIndex);
             throwing = false;
         } catch (Throwable t) {
             LOG.warn("handleFailure failed", t);
@@ -115,7 +117,7 @@
     }
 
     private void handleFailureWithExceptions(Multimap<HTableInterfaceReference, Mutation> attempted,
-            Exception cause) throws Throwable {
+            Exception cause, boolean blockWriteRebuildIndex) throws Throwable {
         Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
         Map<String, Long> indexTableNames = new HashMap<String, Long>(refs.size());
         // start by looking at all the tables to which we attempted to write
@@ -157,8 +159,12 @@
             env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES));
         // Mimic the Put that gets generated by the client on an update of the index state
         Put put = new Put(indexTableKey);
-        put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-            PIndexState.DISABLE.getSerializedBytes());
+        if (blockWriteRebuildIndex)
+            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+                PIndexState.ACTIVE.getSerializedBytes());
+        else
+            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+                PIndexState.DISABLE.getSerializedBytes());
         put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
             PLong.INSTANCE.toBytes(minTimeStamp));
         final List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
@@ -194,12 +200,22 @@
                 continue;
             }
             if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
-                LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
-                    + result.getMutationCode() + ". Will use default failure policy instead.");
-                throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
+                if (blockWriteRebuildIndex) {
+                    LOG.warn("Attempt to update INDEX_DISABLE_TIMESTAMP failed with code = "
+                        + result.getMutationCode());
+                    throw new DoNotRetryIOException("Attempt to update INDEX_DISABLE_TIMESTAMP failed.");
+                } else {
+                    LOG.warn("Attempt to disable index " + indexTableName + " failed with code = "
+                        + result.getMutationCode() + ". Will use default failure policy instead.");
+                    throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed.");
+                }
             }
-            LOG.info("Successfully disabled index " + indexTableName + " due to an exception while writing updates.",
-                cause);
+            if (blockWriteRebuildIndex)
+                LOG.info("Successfully updated INDEX_DISABLE_TIMESTAMP for " + indexTableName
+                    + " due to an exception while writing updates.", cause);
+            else
+                LOG.info("Successfully disabled index " + indexTableName
+                    + " due to an exception while writing updates.", cause);
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index b0e7b6e..fe40d60 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -129,6 +129,9 @@ public interface QueryServices extends SQLCloseable {
     public static final String INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB = "phoenix.index.failure.handling.rebuild.interval";
 
+    // A master switch controlling whether data table writes are blocked when an index write fails
+    public static final String INDEX_FAILURE_BLOCK_WRITE = "phoenix.index.failure.block.write";
+
     // Index will be partially re-built from index disable time stamp - following overlap time
     public static final String INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB = "phoenix.index.failure.handling.rebuild.overlap.time";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 1838b51..62297ee 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -152,6 +152,7 @@ public class QueryServicesOptions {
     public static final int DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES = 1000;
     public static final int DEFAULT_CLOCK_SKEW_INTERVAL = 2000;
     public static final boolean DEFAULT_INDEX_FAILURE_HANDLING_REBUILD = true; // auto rebuild on
+    public static final boolean DEFAULT_INDEX_FAILURE_BLOCK_WRITE = false;
     public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 10000; // 10 secs
     public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 300000; // 5 mins
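[Editor's note] Because PhoenixIndexFailurePolicy runs inside the region server coprocessor, phoenix.index.failure.block.write is a server-side setting; it would normally be set in hbase-site.xml on the region servers. A small sketch of setting it programmatically, e.g. for a test mini-cluster; the class name is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.phoenix.query.QueryServices;

public class BlockWriteConfig {
    public static Configuration create() {
        Configuration conf = HBaseConfiguration.create();
        // Keep failed indexes ACTIVE and block data table writes until the
        // background rebuild completes; the shipped default is false.
        conf.setBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, true);
        return conf;
    }
}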
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index e7bf961..b294f03 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -32,6 +32,11 @@ public class DelegateTable implements PTable {
     }
 
     @Override
+    public long getIndexDisableTimestamp() {
+        return delegate.getIndexDisableTimestamp();
+    }
+
+    @Override
     public long getSequenceNumber() {
         return delegate.getSequenceNumber();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 0456335..6409dcd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1098,29 +1098,33 @@ public class MetaDataClient {
     /**
      * Rebuild indexes from a timestamp which is the value from hbase row key timestamp field
      */
-    public void buildPartialIndexFromTimeStamp(PTable index, TableRef dataTableRef) throws SQLException {
-        boolean needRestoreIndexState = false;
-        // Need to change index state from Disable to InActive when build index partially so that
-        // new changes will be indexed during index rebuilding
-        AlterIndexStatement indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
-            TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
-            dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE);
-        alterIndex(indexStatement);
-        needRestoreIndexState = true;
+    public void buildPartialIndexFromTimeStamp(PTable index, TableRef dataTableRef, boolean blockWriteRebuildIndex) throws SQLException {
+        boolean needRestoreIndexState = true;
+        AlterIndexStatement indexStatement = null;
+        if (!blockWriteRebuildIndex) {
+            // Need to change index state from Disable to InActive when build index partially so that
+            // new changes will be indexed during index rebuilding
+            indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
+                TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
+                dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE);
+            alterIndex(indexStatement);
+        }
         try {
             buildIndex(index, dataTableRef);
             needRestoreIndexState = false;
         } finally {
             if(needRestoreIndexState) {
-                // reset index state to disable
-                indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
-                    TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
-                    dataTableRef.getTable().getTableName().getString(), false, PIndexState.DISABLE);
-                alterIndex(indexStatement);
+                if (!blockWriteRebuildIndex) {
+                    // reset index state to disable
+                    indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
+                        TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
+                        dataTableRef.getTable().getTableName().getString(), false, PIndexState.DISABLE);
+                    alterIndex(indexStatement);
+                }
             }
         }
     }
-
+
     /**
      * Create an index table by morphing the CreateIndexStatement into a CreateTableStatement and calling
      * MetaDataClient.createTable. In doing so, we perform the following translations:
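[Editor's note] The buildPartialIndexFromTimeStamp change above encodes two rebuild modes. In the default mode the rebuilder flips the index DISABLE -> INACTIVE while it catches up, and back to DISABLE if the rebuild fails; in block-write mode the index never leaves ACTIVE, since blocking data table writes makes the INACTIVE window unnecessary. A condensed, purely illustrative state sketch (not Phoenix API):

enum IndexState { ACTIVE, INACTIVE, DISABLE }

final class RebuildStateSketch {
    // State the index is put into while the partial rebuild runs.
    static IndexState duringRebuild(boolean blockWriteRebuildIndex) {
        return blockWriteRebuildIndex ? IndexState.ACTIVE : IndexState.INACTIVE;
    }

    // State the index is restored to if buildIndex() throws.
    static IndexState afterFailedRebuild(boolean blockWriteRebuildIndex) {
        return blockWriteRebuildIndex ? IndexState.ACTIVE : IndexState.DISABLE;
    }
}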
@@ -2004,7 +2008,7 @@
                 Collections.<PTable>emptyList(), isImmutableRows,
                 Collections.<PName>emptyList(), defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), null,
-                Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false, 0);
+                Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false, 0, 0L);
             connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
         } else if (tableType == PTableType.INDEX && indexId == null) {
             if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) {
@@ -2174,7 +2178,7 @@
                 PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns,
                 dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName),
                 Collections.<PTable>emptyList(), isImmutableRows, physicalNames,
                 defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement,
                 Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
-                indexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency);
+                indexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L);
             result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
             addTableToCache(result);
             return table;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2a6386f/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 4a338f6..b2a1d58 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -157,6 +157,7 @@ public interface PTable extends PMetaDataEntity {
     long getTimeStamp();
     long getSequenceNumber();
+    long getIndexDisableTimestamp();
     /**
      * @return table name
      */
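[Editor's note] With the new PTable accessor, "index pending rebuild under block-write" is representable purely from table metadata: the index is still ACTIVE but carries a non-zero INDEX_DISABLE_TIMESTAMP. A hypothetical helper (not part of the commit) mirroring the MutationState check near the top of this section:

import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;

final class IndexWriteGuard {
    // True when some index on the data table failed a write and is being
    // rebuilt while still marked ACTIVE, i.e. writes should be rejected.
    static boolean shouldBlockWrites(PTable dataTable) {
        for (PTable index : dataTable.getIndexes()) {
            if (index.getIndexState() == PIndexState.ACTIVE
                    && index.getIndexDisableTimestamp() > 0) {
                return true;
            }
        }
        return false;
    }
}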