PHOENIX-4130 Avoid server retries for mutable indexes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b539cd62 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b539cd62 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b539cd62 Branch: refs/heads/4.x-cdh5.11.2 Commit: b539cd625bec0d625ddd1dfd61d3b4f58abeabd2 Parents: 00940b3 Author: Vincent Poon <vincentp...@apache.org> Authored: Mon Jan 29 23:06:12 2018 +0000 Committer: Pedro Boado <pbo...@apache.org> Committed: Sun Feb 11 15:54:05 2018 +0000 ---------------------------------------------------------------------- .../end2end/index/MutableIndexFailureIT.java | 12 +- .../end2end/index/PartialIndexRebuilderIT.java | 76 ++++++-- .../coprocessor/MetaDataEndpointImpl.java | 53 ++++-- .../phoenix/coprocessor/MetaDataProtocol.java | 6 +- .../coprocessor/MetaDataRegionObserver.java | 19 +- .../UngroupedAggregateRegionObserver.java | 82 ++++++-- .../phoenix/exception/SQLExceptionCode.java | 1 + .../apache/phoenix/execute/MutationState.java | 39 +++- .../org/apache/phoenix/hbase/index/Indexer.java | 10 - .../index/exception/IndexWriteException.java | 49 ++++- .../MultiIndexWriteFailureException.java | 29 ++- .../SingleIndexWriteFailureException.java | 23 ++- .../hbase/index/write/IndexWriterUtils.java | 14 +- .../write/ParallelWriterIndexCommitter.java | 5 +- .../TrackingParallelWriterIndexCommitter.java | 5 +- .../index/PhoenixIndexFailurePolicy.java | 189 +++++++++++++++++-- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 1 + .../apache/phoenix/optimize/QueryOptimizer.java | 29 ++- .../org/apache/phoenix/query/QueryServices.java | 2 + .../phoenix/query/QueryServicesOptions.java | 1 + .../org/apache/phoenix/schema/PIndexState.java | 7 +- .../org/apache/phoenix/util/KeyValueUtil.java | 12 ++ .../org/apache/phoenix/util/ServerUtil.java | 23 ++- .../hbase/index/write/TestIndexWriter.java | 8 + .../index/write/TestParalleIndexWriter.java | 6 + .../write/TestParalleWriterIndexCommitter.java | 
6 + 26 files changed, 591 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index 0318925..c2e0cb6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -29,7 +29,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; @@ -130,7 +129,6 @@ public class MutableIndexFailureIT extends BaseTest { public static void doSetup() throws Exception { Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10); serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName()); - serverProps.put(IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER, "2"); serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000"); serverProps.put(IndexWriterUtils.INDEX_WRITER_RPC_PAUSE, "5000"); serverProps.put("data.tx.snapshot.dir", "/tmp"); @@ -144,7 +142,8 @@ public class MutableIndexFailureIT extends BaseTest { * because we want to control it's execution ourselves */ serverProps.put(QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, Long.toString(Long.MAX_VALUE)); - Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2); + clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 
"2"); NUM_SLAVES_BASE = 4; setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); indexRebuildTaskRegionEnvironment = @@ -161,7 +160,8 @@ public class MutableIndexFailureIT extends BaseTest { @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},failRebuildTask={4},throwIndexWriteFailure={5}") // name is used by failsafe as file name in reports public static List<Object[]> data() { return Arrays.asList(new Object[][] { - { false, false, false, true, false, false}, + // note - can't disableIndexOnWriteFailure without throwIndexWriteFailure, PHOENIX-4130 + { false, false, false, false, false, false}, { false, false, true, true, false, null}, { false, false, true, true, false, true}, { false, false, false, true, false, null}, @@ -181,8 +181,8 @@ public class MutableIndexFailureIT extends BaseTest { { false, true, false, true, false, null}, { false, false, false, true, true, null}, { false, false, true, true, true, null}, - { false, false, false, true, true, false}, - { false, false, true, true, true, false}, + { false, false, false, false, true, false}, + { false, false, true, false, true, false}, } ); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java index 31649bd..dd986aa 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -46,12 +47,13 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.phoenix.coprocessor.MetaDataRegionObserver; import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; -import org.apache.phoenix.exception.PhoenixIOException; +import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.execute.CommitException; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PMetaData; import org.apache.phoenix.schema.PTable; @@ -94,7 +96,9 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "50000000"); serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, Long.toString(REBUILD_PERIOD)); // batch at 50 seconds serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED)); - setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); + clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); indexRebuildTaskRegionEnvironment = (RegionCoprocessorEnvironment) getUtility() 
.getRSForFirstRegionInTable( @@ -1027,6 +1031,51 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { } } + // Tests that when we've been in PENDING_DISABLE for too long, queries don't use the index, + // and the rebuilder should mark the index DISABLED + @Test + public void testPendingDisable() throws Throwable { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + final MyClock clock = new MyClock(1000); + EnvironmentEdgeManager.injectEdge(clock); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, DISABLE_INDEX_ON_WRITE_FAILURE = TRUE"); + clock.time += 100; + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); + clock.time += 100; + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')"); + conn.commit(); + clock.time += 100; + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, clock.currentTime(), metaTable, PIndexState.PENDING_DISABLE); + Configuration conf = + conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration(); + // under threshold should use the index + PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); + ResultSet rs = stmt.executeQuery("SELECT V2 FROM " + fullTableName + " WHERE V1 = 'a'"); + assertTrue(rs.next()); + assertEquals("0", rs.getString(1)); + assertEquals(fullIndexName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString()); + 
// over threshold should not use the index + long pendingDisableThreshold = conf.getLong(QueryServices.INDEX_PENDING_DISABLE_THRESHOLD, + QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD); + clock.time += pendingDisableThreshold + 1000; + stmt = conn.createStatement().unwrap(PhoenixStatement.class); + rs = stmt.executeQuery("SELECT V2 FROM " + fullTableName + " WHERE V1 = 'a'"); + assertTrue(rs.next()); + assertEquals("0", rs.getString(1)); + assertEquals(fullTableName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString()); + // if we're over the threshold, the rebuilder should disable the index + waitForIndexState(conn, fullTableName, fullIndexName, PIndexState.DISABLE); + } finally { + EnvironmentEdgeManager.reset(); + } + } + //Tests that when we're updating an index from within the RS (e.g. UngruopedAggregateRegionObserver), // if the index write fails the index gets disabled @Test @@ -1048,22 +1097,9 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { try { conn.createStatement().execute("DELETE FROM " + fullTableName); fail(); - } catch (CommitException|PhoenixIOException e) { - // Expected - } - assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.DISABLE, null)); - // reset the index state to ACTIVE - HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); - IndexUtil.updateIndexState(fullIndexName, 0, metaTable, PIndexState.INACTIVE); - IndexUtil.updateIndexState(fullIndexName, 0, metaTable, PIndexState.ACTIVE); - TestUtil.removeCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0', 't')"); - TestUtil.addCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); - try { - conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE v1='a'"); - fail(); - } catch 
(CommitException|PhoenixIOException e) { + } catch (SQLException e) { // Expected + assertEquals(SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode(), e.getErrorCode()); } assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.DISABLE, null)); } finally { @@ -1075,6 +1111,12 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { public static class WriteFailingRegionObserver extends SimpleRegionObserver { @Override public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + // we need to advance the clock, since the index retry logic (copied from HBase) has a time component + EnvironmentEdge delegate = EnvironmentEdgeManager.getDelegate(); + if (delegate instanceof MyClock) { + MyClock myClock = (MyClock) delegate; + myClock.time += 1000; + } throw new DoNotRetryIOException("Simulating write failure on " + c.getEnvironment().getRegionInfo().getTable().getNameAsString()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 47ad7cf..d08db27 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -552,22 +552,27 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso TableName.valueOf(table.getPhysicalName().getBytes())); builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS); - long disableIndexTimestamp = table.getIndexDisableTimestamp(); - long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? 
disableIndexTimestamp : Long.MAX_VALUE; - for (PTable index : table.getIndexes()) { - disableIndexTimestamp = index.getIndexDisableTimestamp(); - if (disableIndexTimestamp > 0 && (index.getIndexState() == PIndexState.ACTIVE || index.getIndexState() == PIndexState.PENDING_ACTIVE) && disableIndexTimestamp < minNonZerodisableIndexTimestamp) { - minNonZerodisableIndexTimestamp = disableIndexTimestamp; + builder.setMutationTime(currentTime); + if (blockWriteRebuildIndex) { + long disableIndexTimestamp = table.getIndexDisableTimestamp(); + long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? disableIndexTimestamp : Long.MAX_VALUE; + for (PTable index : table.getIndexes()) { + disableIndexTimestamp = index.getIndexDisableTimestamp(); + if (disableIndexTimestamp > 0 + && (index.getIndexState() == PIndexState.ACTIVE + || index.getIndexState() == PIndexState.PENDING_ACTIVE + || index.getIndexState() == PIndexState.PENDING_DISABLE) + && disableIndexTimestamp < minNonZerodisableIndexTimestamp) { + minNonZerodisableIndexTimestamp = disableIndexTimestamp; + } + } + // Freeze time for table at min non-zero value of INDEX_DISABLE_TIMESTAMP + // This will keep the table consistent with index as the table has had one more + // batch applied to it. + if (minNonZerodisableIndexTimestamp != Long.MAX_VALUE) { + // Subtract one because we add one due to timestamp granularity in Windows + builder.setMutationTime(minNonZerodisableIndexTimestamp - 1); } - } - // Freeze time for table at min non-zero value of INDEX_DISABLE_TIMESTAMP - // This will keep the table consistent with index as the table has had one more - // batch applied to it. 
- if (minNonZerodisableIndexTimestamp == Long.MAX_VALUE) { - builder.setMutationTime(currentTime); - } else { - // Subtract one because we add one due to timestamp granularity in Windows - builder.setMutationTime(minNonZerodisableIndexTimestamp - 1); } if (table.getTimeStamp() != tableTimeStamp) { @@ -932,6 +937,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (indexState == PIndexState.PENDING_ACTIVE && clientVersion < PhoenixDatabaseMetaData.MIN_PENDING_ACTIVE_INDEX) { indexState = PIndexState.ACTIVE; } + // If client is not yet up to 4.14, then translate PENDING_DISABLE to DISABLE + // since the client won't have this index state in its enum. + if (indexState == PIndexState.PENDING_DISABLE && clientVersion < PhoenixDatabaseMetaData.MIN_PENDING_DISABLE_INDEX) { + // note: for older clients, we have to rely on the rebuilder to transition PENDING_DISABLE -> DISABLE + indexState = PIndexState.DISABLE; + } Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX]; boolean isImmutableRows = immutableRowsKv == null ? 
false : (Boolean) PBoolean.INSTANCE.toObject( @@ -3663,6 +3674,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // Timestamp of INDEX_STATE gets updated with each call long actualTimestamp = currentStateKV.getTimestamp(); long curTimeStampVal = 0; + long newDisableTimeStamp = 0; if ((currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0)) { curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(), currentDisableTimeStamp.getValueOffset(), currentDisableTimeStamp.getValueLength()); @@ -3679,7 +3691,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(builder.build()); return; } - long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(), + newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(), newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength()); // We use the sign of the INDEX_DISABLE_TIMESTAMP to differentiate the keep-index-active (negative) // from block-writes-to-data-table case. In either case, we want to keep the oldest timestamp to @@ -3688,7 +3700,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // We do legitimately move the INDEX_DISABLE_TIMESTAMP to be newer when we're rebuilding the // index in which case the state will be INACTIVE or PENDING_ACTIVE. 
if (curTimeStampVal != 0 - && (newState == PIndexState.DISABLE || newState == PIndexState.PENDING_ACTIVE) + && (newState == PIndexState.DISABLE || newState == PIndexState.PENDING_ACTIVE || newState == PIndexState.PENDING_DISABLE) && Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) { // do not reset disable timestamp as we want to keep the min newKVs.remove(disableTimeStampKVIndex); @@ -3717,6 +3729,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (newState == PIndexState.ACTIVE) { newState = PIndexState.DISABLE; } + // Can't transition from DISABLE to PENDING_DISABLE + if (newState == PIndexState.PENDING_DISABLE) { + builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION); + builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); + done.run(builder.build()); + return; + } } if (currentState == PIndexState.BUILDING && newState != PIndexState.ACTIVE) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index fe11ec7..efad1e7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -63,7 +63,7 @@ import com.google.protobuf.ByteString; */ public abstract class MetaDataProtocol extends MetaDataService { public static final int PHOENIX_MAJOR_VERSION = 4; - public static final int PHOENIX_MINOR_VERSION = 13; + public static final int PHOENIX_MINOR_VERSION = 14; public static final int PHOENIX_PATCH_NUMBER = 0; public static final int PHOENIX_VERSION = VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, 
PHOENIX_PATCH_NUMBER); @@ -93,8 +93,9 @@ public abstract class MetaDataProtocol extends MetaDataService { // Since there's no upgrade code, keep the version the same as the previous version public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0; + public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0; // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants - public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0; + public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0; // ALWAYS update this map whenever rolling out a new release (major, minor or patch release). // Key is the SYSTEM.CATALOG timestamp for the version and value is the version string. @@ -114,6 +115,7 @@ public abstract class MetaDataProtocol extends MetaDataService { TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0, "4.11.x"); TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0, "4.12.x"); TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0, "4.13.x"); + TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0, "4.14.x"); } public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION + "." + PHOENIX_MINOR_VERSION + "." 
+ PHOENIX_PATCH_NUMBER; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index af06235..e51a61e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -229,6 +229,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { private final long rebuildIndexBatchSize; private final long configuredBatches; private final long indexDisableTimestampThreshold; + private final long pendingDisableThreshold; private final ReadOnlyProps props; private final List<String> onlyTheseTables; @@ -247,6 +248,9 @@ public class MetaDataRegionObserver extends BaseRegionObserver { this.indexDisableTimestampThreshold = configuration.getLong(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, QueryServicesOptions.DEFAULT_INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD); + this.pendingDisableThreshold = + configuration.getLong(QueryServices.INDEX_PENDING_DISABLE_THRESHOLD, + QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD); this.props = new ReadOnlyProps(env.getConfiguration().iterator()); } @@ -342,6 +346,18 @@ public class MetaDataRegionObserver extends BaseRegionObserver { } PIndexState indexState = PIndexState.fromSerializedValue(indexStateBytes[0]); + long elapsedSinceDisable = EnvironmentEdgeManager.currentTimeMillis() - Math.abs(indexDisableTimestamp); + + // on an index write failure, the server side transitions to PENDING_DISABLE, then the client + // retries, and after retries are exhausted, disables the index + if (indexState == PIndexState.PENDING_DISABLE) { + if 
(elapsedSinceDisable > pendingDisableThreshold) { + // too long in PENDING_DISABLE - client didn't disable the index, so we do it here + IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.DISABLE, indexDisableTimestamp); + } + continue; + } + // Only perform relatively expensive check for all regions online when index // is disabled or pending active since that's the state it's placed into when // an index write fails. @@ -351,7 +367,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver { + indexPTable.getName() + " are online."); continue; } - if (EnvironmentEdgeManager.currentTimeMillis() - Math.abs(indexDisableTimestamp) > indexDisableTimestampThreshold) { + + if (elapsedSinceDisable > indexDisableTimestampThreshold) { /* * It has been too long since the index has been disabled and any future * attempts to reenable it likely will fail. So we are going to mark the http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index af50420..7692bc8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -53,22 +53,18 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.CoprocessorHConnection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; import 
org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -81,9 +77,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.ServerCacheClient; -import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.exception.DataExceedsCapacityException; +import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; @@ -92,13 +88,15 @@ import org.apache.phoenix.expression.aggregator.Aggregators; import org.apache.phoenix.expression.aggregator.ServerAggregators; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.exception.IndexWriteException; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import 
org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.index.PhoenixIndexFailurePolicy; +import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; @@ -136,22 +134,19 @@ import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.LogUtil; -import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.StringUtil; -import org.apache.phoenix.util.TimeKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; @@ -205,6 +200,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver private KeyValueBuilder kvBuilder; private Configuration upsertSelectConfig; private Configuration compactionConfig; + private Configuration indexWriteConfig; + private ReadOnlyProps indexWriteProps; @Override public void start(CoprocessorEnvironment e) throws IOException { @@ -234,6 +231,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver 
compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE, QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE)); + + // For retries of index write failures, use the same # of retries as the rebuilder + indexWriteConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); + indexWriteConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + e.getConfiguration().getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, + QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER)); + indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator()); } private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException { @@ -254,7 +258,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } // TODO: should we use the one that is all or none? - logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString()); + logger.debug("Committing batch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString()); region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE); } @@ -860,21 +864,65 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } - private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemStoreSize, - byte[] indexMaintainersPtr, byte[] txState, HTable targetHTable, boolean useIndexProto, + private void commit(final Region region, List<Mutation> mutations, byte[] indexUUID, final long blockingMemStoreSize, + byte[] indexMaintainersPtr, byte[] txState, final HTable targetHTable, boolean useIndexProto, boolean isPKChanging) throws IOException { - List<Mutation> localRegionMutations = Lists.newArrayList(); - List<Mutation> remoteRegionMutations = Lists.newArrayList(); + final List<Mutation> localRegionMutations = 
Lists.newArrayList(); + final List<Mutation> remoteRegionMutations = Lists.newArrayList(); setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, useIndexProto); separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations, isPKChanging); - commitBatch(region, localRegionMutations, blockingMemStoreSize); - commitBatchWithHTable(targetHTable, remoteRegionMutations); + try { + commitBatch(region, localRegionMutations, blockingMemStoreSize); + } catch (IOException e) { + handleIndexWriteException(localRegionMutations, e, new MutateCommand() { + @Override + public void doMutation() throws IOException { + commitBatch(region, localRegionMutations, blockingMemStoreSize); + } + }); + } + try { + commitBatchWithHTable(targetHTable, remoteRegionMutations); + } catch (IOException e) { + handleIndexWriteException(remoteRegionMutations, e, new MutateCommand() { + @Override + public void doMutation() throws IOException { + commitBatchWithHTable(targetHTable, remoteRegionMutations); + } + }); + } localRegionMutations.clear(); remoteRegionMutations.clear(); } + private void handleIndexWriteException(final List<Mutation> localRegionMutations, IOException origIOE, + MutateCommand mutateCommand) throws IOException { + long serverTimestamp = ServerUtil.parseTimestampFromRemoteException(origIOE); + SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(origIOE); + if (inferredE != null && inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) { + // For an index write failure, the data table write succeeded, + // so when we retry we need to set REPLAY_WRITES + for (Mutation mutation : localRegionMutations) { + mutation.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES); + // use the server timestamp for index write retrys + KeyValueUtil.setTimestamp(mutation, serverTimestamp); + } + IndexWriteException iwe = 
PhoenixIndexFailurePolicy.getIndexWriteException(inferredE); + try (PhoenixConnection conn = + QueryUtil.getConnectionOnServer(indexWriteConfig) + .unwrap(PhoenixConnection.class)) { + PhoenixIndexFailurePolicy.doBatchWithRetries(mutateCommand, iwe, conn, + indexWriteProps); + } catch (Exception e) { + throw new DoNotRetryIOException(e); + } + } else { + throw origIOE; + } + } + private void separateLocalAndRemoteMutations(HTable targetHTable, Region region, List<Mutation> mutations, List<Mutation> localRegionMutations, List<Mutation> remoteRegionMutations, boolean isPKChanging){ http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 2301c32..0f29f3f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -367,6 +367,7 @@ public enum SQLExceptionCode { CONNECTION_CLOSED(1111, "XCL11", "Connectioin is closed."), INDEX_FAILURE_BLOCK_WRITE(1120, "XCL20", "Writes to table blocked until index can be updated."), + INDEX_WRITE_FAILURE(1121, "XCL21", "Write to the index failed."), UPDATE_CACHE_FREQUENCY_INVALID(1130, "XCL30", "UPDATE_CACHE_FREQUENCY cannot be set to ALWAYS if APPEND_ONLY_SCHEMA is true."), CANNOT_DROP_COL_APPEND_ONLY_SCHEMA(1131, "XCL31", "Cannot drop column from table that with append only schema."), http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 993438e..0719966 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -53,11 +53,14 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.hbase.index.exception.IndexWriteException; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.IndexMetaDataCacheClient; import org.apache.phoenix.index.PhoenixIndexBuilder; import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.index.PhoenixIndexFailurePolicy; +import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.monitoring.GlobalClientMetrics; @@ -983,6 +986,8 @@ public class MutationState implements SQLCloseable { long mutationCommitTime = 0; long numFailedMutations = 0;; long startTime = 0; + boolean shouldRetryIndexedMutation = false; + IndexWriteException iwe = null; do { TableRef origTableRef = tableInfo.getOrigTableRef(); PTable table = origTableRef.getTable(); @@ -1016,8 +1021,25 @@ public class MutationState implements SQLCloseable { startTime = System.currentTimeMillis(); child.addTimelineAnnotation("Attempt " + retryCount); List<List<Mutation>> mutationBatchList = getMutationBatchList(batchSize, batchSizeBytes, mutationList); - for (List<Mutation> mutationBatch : mutationBatchList) { - hTable.batch(mutationBatch); + for (final List<Mutation> mutationBatch : mutationBatchList) { + if (shouldRetryIndexedMutation) { + // if there was an 
index write failure, retry the mutation in a loop + final HTableInterface finalHTable = hTable; + PhoenixIndexFailurePolicy.doBatchWithRetries(new MutateCommand() { + @Override + public void doMutation() throws IOException { + try { + finalHTable.batch(mutationBatch); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + }}, iwe, + connection, connection.getQueryServices().getProps()); + } else { + hTable.batch(mutationBatch); + } + batchCount++; if (logger.isDebugEnabled()) logger.debug("Sent batch of " + mutationBatch.size() + " for " + Bytes.toString(htableName)); } @@ -1054,6 +1076,19 @@ public class MutationState implements SQLCloseable { child = Tracing.child(span,"Failed batch, attempting retry"); continue; + } else if (inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) { + iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE); + if (iwe != null && !shouldRetryIndexedMutation) { + // For an index write failure, the data table write succeeded, + // so when we retry we need to set REPLAY_WRITES + for (Mutation m : mutationList) { + m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES); + KeyValueUtil.setTimestamp(m, serverTimestamp); + } + shouldRetry = true; + shouldRetryIndexedMutation = true; + continue; + } } e = inferredE; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 9686789..f8195f1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -185,7 +185,6 @@ public class Indexer extends 
BaseRegionObserver { private long slowPostOpenThreshold; private long slowPreIncrementThreshold; private int rowLockWaitDuration; - private Configuration compactionConfig; public static final String RecoveryFailurePolicyKeyForTesting = INDEX_RECOVERY_FAILURE_POLICY_KEY; @@ -242,15 +241,6 @@ public class Indexer extends BaseRegionObserver { this.metricSource = MetricsIndexerSourceFactory.getInstance().create(); setSlowThresholds(e.getConfiguration()); - compactionConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); - // lower the number of rpc retries, so we don't hang the compaction - compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER, - QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER)); - compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, - e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE, - QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE)); - try { // get the specified failure policy. 
We only ever override it in tests, but we need to do it // here http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java index 2ec29bc..531baa6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java @@ -17,7 +17,10 @@ */ package org.apache.phoenix.hbase.index.exception; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.phoenix.query.QueryServicesOptions; /** * Generic {@link Exception} that an index write has failed @@ -25,19 +28,57 @@ import org.apache.hadoop.hbase.HBaseIOException; @SuppressWarnings("serial") public class IndexWriteException extends HBaseIOException { + /* + * We pass this message back to the client so that the config only needs to be set on the + * server side. 
+ */ + private static final String DISABLE_INDEX_ON_FAILURE_MSG = "disableIndexOnFailure="; + private boolean disableIndexOnFailure; + public IndexWriteException() { super(); } + /** + * Used for the case where we cannot reach the index, but not sure of the table or the mutations + * that caused the failure + * @param message + * @param cause + */ public IndexWriteException(String message, Throwable cause) { - super(message, cause); + super(message, cause); + } + + public IndexWriteException(String message, Throwable cause, boolean disableIndexOnFailure) { + super(prependDisableIndexMsg(message, disableIndexOnFailure), cause); } - public IndexWriteException(String message) { - super(message); + public IndexWriteException(String message, boolean disableIndexOnFailure) { + super(prependDisableIndexMsg(message, disableIndexOnFailure)); + this.disableIndexOnFailure = disableIndexOnFailure; } - public IndexWriteException(Throwable cause) { + private static String prependDisableIndexMsg(String message, boolean disableIndexOnFailure) { + return DISABLE_INDEX_ON_FAILURE_MSG + disableIndexOnFailure + " " + message; +} + +public IndexWriteException(Throwable cause) { super(cause); } + + public static boolean parseDisableIndexOnFailure(String message) { + Pattern p = + Pattern.compile(DISABLE_INDEX_ON_FAILURE_MSG + "(true|false)", + Pattern.CASE_INSENSITIVE); + Matcher m = p.matcher(message); + if (m.find()) { + boolean disableIndexOnFailure = Boolean.parseBoolean(m.group(1)); + return disableIndexOnFailure; + } + return QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX; + } + + public boolean isDisableIndexOnFailure() { + return disableIndexOnFailure; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java index 546b43d..d593791 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java @@ -18,8 +18,14 @@ package org.apache.phoenix.hbase.index.exception; import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; + +import com.google.common.collect.Lists; /** * Indicate a failure to write to multiple index tables. @@ -27,15 +33,34 @@ import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; @SuppressWarnings("serial") public class MultiIndexWriteFailureException extends IndexWriteException { + public static final String FAILURE_MSG = "Failed to write to multiple index tables: "; private List<HTableInterfaceReference> failures; /** * @param failures the tables to which the index write did not succeed */ - public MultiIndexWriteFailureException(List<HTableInterfaceReference> failures) { - super("Failed to write to multiple index tables"); + public MultiIndexWriteFailureException(List<HTableInterfaceReference> failures, boolean disableIndexOnFailure) { + super(FAILURE_MSG + failures, disableIndexOnFailure); this.failures = failures; + } + /** + * This constructor used to rematerialize this exception when receiving + * an rpc exception from the server + * @param message detail message + */ + public MultiIndexWriteFailureException(String message) { + super(message, IndexWriteException.parseDisableIndexOnFailure(message)); + Pattern p = Pattern.compile(FAILURE_MSG + "\\[(.*)\\]"); + 
Matcher m = p.matcher(message); + if (m.find()) { + failures = Lists.newArrayList(); + String tablesStr = m.group(1); + for (String tableName : tablesStr.split(",\\s")) { + HTableInterfaceReference tableRef = new HTableInterfaceReference(new ImmutableBytesPtr(Bytes.toBytes(tableName))); + failures.add(tableRef); + } + } } public List<HTableInterfaceReference> getFailedTables() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java index eb3b521..610a82a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java @@ -18,6 +18,8 @@ package org.apache.phoenix.hbase.index.exception; import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.hadoop.hbase.client.Mutation; @@ -27,6 +29,7 @@ import org.apache.hadoop.hbase.client.Mutation; @SuppressWarnings("serial") public class SingleIndexWriteFailureException extends IndexWriteException { + public static final String FAILED_MSG = "Failed to make index update:"; private String table; /** @@ -45,13 +48,27 @@ public class SingleIndexWriteFailureException extends IndexWriteException { * @param cause underlying reason for the failure */ public SingleIndexWriteFailureException(String targetTableName, List<Mutation> mutations, - Exception cause) { - super("Failed to make index update:\n\t table: " + targetTableName + "\n\t edits: " + mutations - + "\n\tcause: " + cause == null ? 
"UNKNOWN" : cause.getMessage(), cause); + Exception cause, boolean disableIndexOnFailure) { + super(FAILED_MSG + "\n\t table: " + targetTableName + "\n\t edits: " + mutations + + "\n\tcause: " + cause == null ? "UNKNOWN" : cause.getMessage(), cause, disableIndexOnFailure); this.table = targetTableName; } /** + * This constructor used to rematerialize this exception when receiving + * an rpc exception from the server + * @param message detail message + */ + public SingleIndexWriteFailureException(String msg) { + super(msg, IndexWriteException.parseDisableIndexOnFailure(msg)); + Pattern pattern = Pattern.compile(FAILED_MSG + ".* table: ([\\S]*)\\s.*", Pattern.DOTALL); + Matcher m = pattern.matcher(msg); + if (m.find()) { + this.table = m.group(1); + } + } + + /** * @return The table to which we failed to write the index updates. If unknown, returns * <tt>null</tt> */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java index 3649069..29b9faf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java @@ -70,13 +70,13 @@ public class IndexWriterUtils { public static final String HTABLE_KEEP_ALIVE_KEY = "hbase.htable.threads.keepalivetime"; public static final String INDEX_WRITER_RPC_RETRIES_NUMBER = "phoenix.index.writes.rpc.retries.number"; - /** - * Based on the logic in HBase's AsyncProcess, a default of 11 retries with a pause of 100ms - * approximates 48 sec total retry time (factoring in backoffs). 
The total time should be less - * than HBase's rpc timeout (default of 60 sec) or else the client will retry before receiving - * the response - */ - public static final int DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER = 11; + /** + * Retry server-server index write rpc only once, and let the client retry the data write + * instead to avoid tying up the handler + */ + // note in HBase 2+, numTries = numRetries + 1 + // in prior versions, numTries = numRetries + public static final int DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER = 1; public static final String INDEX_WRITER_RPC_PAUSE = "phoenix.index.writes.rpc.pause"; public static final int DEFAULT_INDEX_WRITER_RPC_PAUSE = 100; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java index e4e8343..0bb8784 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java @@ -35,6 +35,7 @@ import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager; import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; +import org.apache.phoenix.index.PhoenixIndexFailurePolicy; import org.apache.phoenix.util.IndexUtil; import com.google.common.collect.Multimap; @@ -167,11 +168,11 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { } catch (SingleIndexWriteFailureException e) { throw e; } catch (IOException e) { - throw new 
SingleIndexWriteFailureException(tableReference.toString(), mutations, e); + throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e, PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env)); } catch (InterruptedException e) { // reset the interrupt status on the thread Thread.currentThread().interrupt(); - throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e); + throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e, PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env)); } finally{ if (table != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java index 0449e9e..94d4f0f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java @@ -40,6 +40,7 @@ import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner; import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; +import org.apache.phoenix.index.PhoenixIndexFailurePolicy; import org.apache.phoenix.util.IndexUtil; import com.google.common.collect.Multimap; @@ -110,6 +111,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { this.factory = factory; this.abortable = new CapturingAbortable(abortable); this.stopped = stop; + this.env = env; } @Override @@ -226,7 +228,8 @@ public 
class TrackingParallelWriterIndexCommitter implements IndexCommitter { // if any of the tasks failed, then we need to propagate the failure if (failures.size() > 0) { // make the list unmodifiable to avoid any more synchronization concerns - throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures)); + throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures), + PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env)); } return; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index ba6371b..14f8307 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -30,21 +30,28 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import 
org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.hbase.index.exception.IndexWriteException; import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException; +import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy; import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy; @@ -59,6 +66,7 @@ import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; @@ -102,14 +110,8 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { rebuildIndexOnFailure = Boolean.parseBoolean(value); } } - String value = htd.getValue(DISABLE_INDEX_ON_WRITE_FAILURE); - if (value == null) { - disableIndexOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_DISABLE_INDEX, - QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX); - } else { - disableIndexOnFailure = Boolean.parseBoolean(value); - } - value = htd.getValue(BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE); + disableIndexOnFailure = getDisableIndexOnFailure(env); + String value = htd.getValue(BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE); if (value == null) { blockDataTableWritesOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); @@ -149,7 +151,11 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { throwing = false; } 
finally { if (!throwing) { - IOException ioException = ServerUtil.wrapInDoNotRetryIOException("Unable to update the following indexes: " + attempted.keySet(), cause, timestamp); + SQLException sqlException = + new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_WRITE_FAILURE) + .setRootCause(cause).setMessage(cause.getLocalizedMessage()).build() + .buildException(); + IOException ioException = ServerUtil.wrapInDoNotRetryIOException(null, sqlException, timestamp); Mutation m = attempted.entries().iterator().next().getValue(); boolean isIndexRebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap()); // Always throw if rebuilding index since the rebuilder needs to know if it was successful @@ -212,7 +218,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { return timestamp; } - final PIndexState newState = disableIndexOnFailure ? PIndexState.DISABLE : PIndexState.PENDING_ACTIVE; + final PIndexState newState = disableIndexOnFailure ? PIndexState.PENDING_DISABLE : PIndexState.PENDING_ACTIVE; final long fTimestamp=timestamp; // for all the index tables that we've found, try to disable them and if that fails, try to return User.runAsLoginUser(new PrivilegedExceptionAction<Long>() { @@ -254,12 +260,9 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed."); } } - if (leaveIndexActive) - LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName - + " due to an exception while writing updates.", cause); - else - LOG.info("Successfully disabled index " + indexTableName - + " due to an exception while writing updates.", cause); + LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName + + " due to an exception while writing updates. 
indexState=" + newState, + cause); } catch (Throwable t) { if (t instanceof Exception) { throw (Exception)t; @@ -331,4 +334,158 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { } return indexTableNames; } + + /** + * Check config for whether to disable index on index write failures + * @param htd + * @param config + * @param connection + * @return The table config for {@link PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE} + * @throws SQLException + */ + public static boolean getDisableIndexOnFailure(RegionCoprocessorEnvironment env) { + HTableDescriptor htd = env.getRegion().getTableDesc(); + Configuration config = env.getConfiguration(); + String value = htd.getValue(PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE); + boolean disableIndexOnFailure; + if (value == null) { + disableIndexOnFailure = + config.getBoolean(QueryServices.INDEX_FAILURE_DISABLE_INDEX, + QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX); + } else { + disableIndexOnFailure = Boolean.parseBoolean(value); + } + return disableIndexOnFailure; + } + + /** + * If we're leaving the index active after index write failures on the server side, then we get + * the exception on the client side here after hitting the max # of hbase client retries. We + * disable the index as it may now be inconsistent. The indexDisableTimestamp was already set + * on the server side, so the rebuilder will be run. 
+ */ + private static void handleIndexWriteFailureFromClient(IndexWriteException indexWriteException, + PhoenixConnection conn) { + handleExceptionFromClient(indexWriteException, conn, PIndexState.DISABLE); + } + + private static void handleIndexWriteSuccessFromClient(IndexWriteException indexWriteException, + PhoenixConnection conn) { + handleExceptionFromClient(indexWriteException, conn, PIndexState.ACTIVE); + } + + private static void handleExceptionFromClient(IndexWriteException indexWriteException, + PhoenixConnection conn, PIndexState indexState) { + try { + Set<String> indexesToUpdate = new HashSet<>(); + if (indexWriteException instanceof MultiIndexWriteFailureException) { + MultiIndexWriteFailureException indexException = + (MultiIndexWriteFailureException) indexWriteException; + List<HTableInterfaceReference> failedIndexes = indexException.getFailedTables(); + if (indexException.isDisableIndexOnFailure() && failedIndexes != null) { + for (HTableInterfaceReference failedIndex : failedIndexes) { + String failedIndexTable = failedIndex.getTableName(); + if (!indexesToUpdate.contains(failedIndexTable)) { + updateIndex(failedIndexTable, conn, indexState); + indexesToUpdate.add(failedIndexTable); + } + } + } + } else if (indexWriteException instanceof SingleIndexWriteFailureException) { + SingleIndexWriteFailureException indexException = + (SingleIndexWriteFailureException) indexWriteException; + String failedIndex = indexException.getTableName(); + if (indexException.isDisableIndexOnFailure() && failedIndex != null) { + updateIndex(failedIndex, conn, indexState); + } + } + } catch (Exception handleE) { + LOG.warn("Error while trying to handle index write exception", indexWriteException); + } + } + + public static interface MutateCommand { + void doMutation() throws IOException; + } + + /** + * Retries a mutationBatch where the index write failed. + * One attempt should have already been made before calling this. 
+ * Max retries and exponential backoff logic mimics that of HBase's client + * If max retries are hit, the index is disabled. + * If the write is successful on a subsequent retry, the index is set back to ACTIVE + * @param mutateCommand mutation command to execute + * @param iwe original IndexWriteException + * @param connection connection to use + * @param config config used to get retry settings + * @throws Exception + */ + public static void doBatchWithRetries(MutateCommand mutateCommand, + IndexWriteException iwe, PhoenixConnection connection, ReadOnlyProps config) + throws IOException { + int maxTries = config.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + long pause = config.getLong(HConstants.HBASE_CLIENT_PAUSE, + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + int numRetry = 1; // already tried once + // calculate max time to retry for + int timeout = 0; + for (int i = 0; i < maxTries; ++i) { + timeout = (int) (timeout + ConnectionUtils.getPauseTime(pause, i)); + } + long canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout; + while (canRetryMore(numRetry++, maxTries, canRetryUntil)) { + try { + Thread.sleep(ConnectionUtils.getPauseTime(pause, numRetry)); // HBase's exponential backoff + mutateCommand.doMutation(); + // success - change the index state from PENDING_DISABLE back to ACTIVE + handleIndexWriteSuccessFromClient(iwe, connection); + return; + } catch (IOException e) { + SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(e); + if (inferredE == null || inferredE.getErrorCode() != SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) { + // if it's not an index write exception, throw exception, to be handled normally in caller's try-catch + throw e; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } + // max retries hit - disable the index + handleIndexWriteFailureFromClient(iwe, connection); + throw new 
DoNotRetryIOException(iwe); // send failure back to client + } + + private static boolean canRetryMore(int numRetry, int maxRetries, long canRetryUntil) { + // If there is a single try we must not take into account the time. + return numRetry < maxRetries + || (maxRetries > 1 && EnvironmentEdgeManager.currentTime() < canRetryUntil); + } + + /** + * Converts from SQLException to IndexWriteException + * @param sqlE the SQLException + * @return the IndexWriteException + */ + public static IndexWriteException getIndexWriteException(SQLException sqlE) { + String sqlMsg = sqlE.getMessage(); + if (sqlMsg.contains(MultiIndexWriteFailureException.FAILURE_MSG)) { + return new MultiIndexWriteFailureException(sqlMsg); + } else if (sqlMsg.contains(SingleIndexWriteFailureException.FAILED_MSG)) { + return new SingleIndexWriteFailureException(sqlMsg); + } + return null; + } + + private static void updateIndex(String indexFullName, PhoenixConnection conn, + PIndexState indexState) throws SQLException { + if (PIndexState.DISABLE.equals(indexState)) { + LOG.info("Disabling index after hitting max number of index write retries: " + + indexFullName); + } else if (PIndexState.ACTIVE.equals(indexState)) { + LOG.debug("Resetting index to active after subsequent success " + indexFullName); + } + IndexUtil.updateIndexState(conn, indexFullName, indexState, null); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 23330d8..094f743 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -322,6 +322,7 
@@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final int MIN_RENEW_LEASE_VERSION = VersionUtil.encodeVersion("1", "1", "3"); public static final int MIN_NAMESPACE_MAPPED_PHOENIX_VERSION = VersionUtil.encodeVersion("4", "8", "0"); public static final int MIN_PENDING_ACTIVE_INDEX = VersionUtil.encodeVersion("4", "12", "0"); + public static final int MIN_PENDING_DISABLE_INDEX = VersionUtil.encodeVersion("4", "14", "0"); // Version below which we should turn off essential column family. public static final int ESSENTIAL_FAMILY_VERSION_THRESHOLD = VersionUtil.encodeVersion("0", "94", "7"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index 5cc415d..8481bc5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -57,7 +57,9 @@ import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexUtil; import com.google.common.collect.Lists; @@ -68,11 +70,14 @@ public class QueryOptimizer { private final QueryServices services; private final boolean useIndexes; private final boolean costBased; + private long indexPendingDisabledThreshold; public QueryOptimizer(QueryServices services) { this.services = services; this.useIndexes = this.services.getProps().getBoolean(QueryServices.USE_INDEXES_ATTRIB, 
QueryServicesOptions.DEFAULT_USE_INDEXES); this.costBased = this.services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED); + this.indexPendingDisabledThreshold = this.services.getProps().getLong(QueryServices.INDEX_PENDING_DISABLE_THRESHOLD, + QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD); } public QueryPlan optimize(PhoenixStatement statement, QueryPlan dataPlan) throws SQLException { @@ -158,7 +163,7 @@ public class QueryOptimizer { return hintedPlan == null ? orderPlansBestToWorst(select, plans, stopAtBestPlan) : plans; } - private static QueryPlan getHintedQueryPlan(PhoenixStatement statement, SelectStatement select, List<PTable> indexes, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, List<QueryPlan> plans) throws SQLException { + private QueryPlan getHintedQueryPlan(PhoenixStatement statement, SelectStatement select, List<PTable> indexes, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, List<QueryPlan> plans) throws SQLException { QueryPlan dataPlan = plans.get(0); String indexHint = select.getHint().getHint(Hint.INDEX); if (indexHint == null) { @@ -215,7 +220,7 @@ public class QueryOptimizer { return -1; } - private static QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan, boolean isHinted) throws SQLException { + private QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan, boolean isHinted) throws SQLException { int nColumns = dataPlan.getProjector().getColumnCount(); String tableAlias = dataPlan.getTableRef().getTableAlias(); String alias = tableAlias==null ? 
null : '"' + tableAlias + '"'; // double quote in case it's case sensitive @@ -229,8 +234,11 @@ public class QueryOptimizer { // We will or will not do tuple projection according to the data plan. boolean isProjected = dataPlan.getContext().getResolver().getTables().get(0).getTable().getType() == PTableType.PROJECTED; // Check index state of now potentially updated index table to make sure it's active - PIndexState indexState = resolver.getTables().get(0).getTable().getIndexState(); - if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE) { + TableRef indexTableRef = resolver.getTables().get(0); + PTable indexTable = indexTableRef.getTable(); + PIndexState indexState = indexTable.getIndexState(); + if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE + || (indexState == PIndexState.PENDING_DISABLE && isUnderPendingDisableThreshold(indexTableRef.getCurrentTime(), indexTable.getIndexDisableTimestamp()))) { try { // translate nodes that match expressions that are indexed to the associated column parse node indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(index, null, statement.getConnection(), indexSelect.getUdfParseNodes())); @@ -246,10 +254,13 @@ public class QueryOptimizer { && !plan.getContext().getDataColumns().isEmpty()) { return null; } - indexState = plan.getTableRef().getTable().getIndexState(); + indexTableRef = plan.getTableRef(); + indexTable = indexTableRef.getTable(); + indexState = indexTable.getIndexState(); // Checking number of columns handles the wildcard cases correctly, as in that case the index // must contain all columns from the data table to be able to be used. 
- if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE) { + if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE + || (indexState == PIndexState.PENDING_DISABLE && isUnderPendingDisableThreshold(indexTableRef.getCurrentTime(), indexTable.getIndexDisableTimestamp()))) { if (plan.getProjector().getColumnCount() == nColumns) { return plan; } else if (index.getIndexType() == IndexType.GLOBAL) { @@ -312,6 +323,12 @@ public class QueryOptimizer { return null; } + // returns true if we can still use the index + // returns false if we've been in PENDING_DISABLE too long - index should be considered disabled + private boolean isUnderPendingDisableThreshold(long currentTimestamp, long indexDisableTimestamp) { + return currentTimestamp - indexDisableTimestamp <= indexPendingDisabledThreshold; + } + /** * Order the plans among all the possible ones from best to worst. * If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, we order the plans based on http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 0b80f4d..2a31f09 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -144,6 +144,8 @@ public interface QueryServices extends SQLCloseable { public static final String INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE = "phoenix.index.rebuild.batch.perTable"; // If index disable timestamp is older than this threshold, then index rebuild task won't attempt to rebuild it public static final String INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD = 
"phoenix.index.rebuild.disabletimestamp.threshold"; + // threshold number of ms an index has been in PENDING_DISABLE, beyond which we consider it disabled + public static final String INDEX_PENDING_DISABLE_THRESHOLD = "phoenix.index.pending.disable.threshold"; // Block writes to data table when index write fails public static final String INDEX_FAILURE_BLOCK_WRITE = "phoenix.index.failure.block.write"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 4d31974..d749433 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -198,6 +198,7 @@ public class QueryServicesOptions { public static final long DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT = 30000 * 60; // 30 mins public static final int DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER = 5; // 5 total tries at rpc level public static final int DEFAULT_INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD = 30000 * 60; // 30 mins + public static final long DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD = 30000; // 30 secs /** * HConstants#HIGH_QOS is the max we will see to a standard table. 
We go higher to differentiate http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java index d7dbeca..2b6ac4a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java @@ -27,7 +27,12 @@ public enum PIndexState { INACTIVE("i"), DISABLE("x"), REBUILD("r"), - PENDING_ACTIVE("p"); + PENDING_ACTIVE("p"), + // Used when disabling an index on write failure (PHOENIX-4130) + // When an index write fails, it is put in this state, and we let the client retry the mutation + // After retries are exhausted, the client should mark the index as disabled, but if that + // doesn't happen, then the index is considered disabled if it's been in this state too long + PENDING_DISABLE("w"); private final String serializedValue; private final byte[] serializedBytes; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b539cd62/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java index df6a349..4d8565f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; import 
org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -132,6 +133,17 @@ public class KeyValueUtil { return kvs[pos]; } + public static void setTimestamp(Mutation m, long timestamp) { + byte[] tsBytes = Bytes.toBytes(timestamp); + for (List<Cell> family : m.getFamilyCellMap().values()) { + List<KeyValue> familyKVs = org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValues(family); + for (KeyValue kv : familyKVs) { + int tsOffset = kv.getTimestampOffset(); + System.arraycopy(tsBytes, 0, kv.getBuffer(), tsOffset, Bytes.SIZEOF_LONG); + } + } + } + /* * Special comparator, *only* works for binary search. *