Revert "PHOENIX-4130 Avoid server retries for mutable indexes". This reverts commit d1241a09c24925a46c6a0e64252d0bbbcd991c58.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/878a264e Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/878a264e Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/878a264e Branch: refs/heads/4.x-HBase-1.2 Commit: 878a264e5d1f6316d611be1f31a4bcde620c4c8e Parents: d1241a0 Author: Vincent Poon <vincentp...@apache.org> Authored: Wed Jan 31 10:09:54 2018 -0800 Committer: Vincent Poon <vincentp...@apache.org> Committed: Wed Jan 31 10:09:54 2018 -0800 ---------------------------------------------------------------------- .../end2end/index/MutableIndexFailureIT.java | 12 +- .../end2end/index/PartialIndexRebuilderIT.java | 76 ++------ .../coprocessor/MetaDataEndpointImpl.java | 53 ++---- .../phoenix/coprocessor/MetaDataProtocol.java | 6 +- .../coprocessor/MetaDataRegionObserver.java | 19 +- .../UngroupedAggregateRegionObserver.java | 82 ++------ .../phoenix/exception/SQLExceptionCode.java | 1 - .../apache/phoenix/execute/MutationState.java | 39 +--- .../org/apache/phoenix/hbase/index/Indexer.java | 10 + .../index/exception/IndexWriteException.java | 49 +---- .../MultiIndexWriteFailureException.java | 29 +-- .../SingleIndexWriteFailureException.java | 23 +-- .../hbase/index/write/IndexWriterUtils.java | 14 +- .../write/ParallelWriterIndexCommitter.java | 5 +- .../TrackingParallelWriterIndexCommitter.java | 5 +- .../index/PhoenixIndexFailurePolicy.java | 189 ++----------------- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 1 - .../apache/phoenix/optimize/QueryOptimizer.java | 29 +-- .../org/apache/phoenix/query/QueryServices.java | 2 - .../phoenix/query/QueryServicesOptions.java | 1 - .../org/apache/phoenix/schema/PIndexState.java | 7 +- .../org/apache/phoenix/util/KeyValueUtil.java | 12 -- .../org/apache/phoenix/util/ServerUtil.java | 23 +-- .../hbase/index/write/TestIndexWriter.java | 8 - .../index/write/TestParalleIndexWriter.java | 6 - 
.../write/TestParalleWriterIndexCommitter.java | 6 - 26 files changed, 116 insertions(+), 591 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index c2e0cb6..0318925 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@ -29,6 +29,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; @@ -129,6 +130,7 @@ public class MutableIndexFailureIT extends BaseTest { public static void doSetup() throws Exception { Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10); serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName()); + serverProps.put(IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER, "2"); serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000"); serverProps.put(IndexWriterUtils.INDEX_WRITER_RPC_PAUSE, "5000"); serverProps.put("data.tx.snapshot.dir", "/tmp"); @@ -142,8 +144,7 @@ public class MutableIndexFailureIT extends BaseTest { * because we want to control it's execution ourselves */ serverProps.put(QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, Long.toString(Long.MAX_VALUE)); - Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2); - clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); + Map<String, String> clientProps = 
Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); NUM_SLAVES_BASE = 4; setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); indexRebuildTaskRegionEnvironment = @@ -160,8 +161,7 @@ public class MutableIndexFailureIT extends BaseTest { @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},failRebuildTask={4},throwIndexWriteFailure={5}") // name is used by failsafe as file name in reports public static List<Object[]> data() { return Arrays.asList(new Object[][] { - // note - can't disableIndexOnWriteFailure without throwIndexWriteFailure, PHOENIX-4130 - { false, false, false, false, false, false}, + { false, false, false, true, false, false}, { false, false, true, true, false, null}, { false, false, true, true, false, true}, { false, false, false, true, false, null}, @@ -181,8 +181,8 @@ public class MutableIndexFailureIT extends BaseTest { { false, true, false, true, false, null}, { false, false, false, true, true, null}, { false, false, true, true, true, null}, - { false, false, false, false, true, false}, - { false, false, true, false, true, false}, + { false, false, false, true, true, false}, + { false, false, true, true, true, false}, } ); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java index dd986aa..31649bd 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -36,7 +36,6 @@ import 
java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -47,13 +46,12 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.phoenix.coprocessor.MetaDataRegionObserver; import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; -import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.execute.CommitException; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PMetaData; import org.apache.phoenix.schema.PTable; @@ -96,9 +94,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "50000000"); serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, Long.toString(REBUILD_PERIOD)); // batch at 50 seconds serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED)); - Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); - clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); - setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS); 
indexRebuildTaskRegionEnvironment = (RegionCoprocessorEnvironment) getUtility() .getRSForFirstRegionInTable( @@ -1031,51 +1027,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { } } - // Tests that when we've been in PENDING_DISABLE for too long, queries don't use the index, - // and the rebuilder should mark the index DISABLED - @Test - public void testPendingDisable() throws Throwable { - String schemaName = generateUniqueName(); - String tableName = generateUniqueName(); - String indexName = generateUniqueName(); - final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); - final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); - final MyClock clock = new MyClock(1000); - EnvironmentEdgeManager.injectEdge(clock); - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, DISABLE_INDEX_ON_WRITE_FAILURE = TRUE"); - clock.time += 100; - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); - clock.time += 100; - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')"); - conn.commit(); - clock.time += 100; - HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); - IndexUtil.updateIndexState(fullIndexName, clock.currentTime(), metaTable, PIndexState.PENDING_DISABLE); - Configuration conf = - conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration(); - // under threshold should use the index - PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); - ResultSet rs = stmt.executeQuery("SELECT V2 FROM " + fullTableName + " WHERE V1 = 'a'"); - assertTrue(rs.next()); - assertEquals("0", rs.getString(1)); - assertEquals(fullIndexName, 
stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString()); - // over threshold should not use the index - long pendingDisableThreshold = conf.getLong(QueryServices.INDEX_PENDING_DISABLE_THRESHOLD, - QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD); - clock.time += pendingDisableThreshold + 1000; - stmt = conn.createStatement().unwrap(PhoenixStatement.class); - rs = stmt.executeQuery("SELECT V2 FROM " + fullTableName + " WHERE V1 = 'a'"); - assertTrue(rs.next()); - assertEquals("0", rs.getString(1)); - assertEquals(fullTableName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString()); - // if we're over the threshold, the rebuilder should disable the index - waitForIndexState(conn, fullTableName, fullIndexName, PIndexState.DISABLE); - } finally { - EnvironmentEdgeManager.reset(); - } - } - //Tests that when we're updating an index from within the RS (e.g. UngruopedAggregateRegionObserver), // if the index write fails the index gets disabled @Test @@ -1097,9 +1048,22 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { try { conn.createStatement().execute("DELETE FROM " + fullTableName); fail(); - } catch (SQLException e) { + } catch (CommitException|PhoenixIOException e) { + // Expected + } + assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.DISABLE, null)); + // reset the index state to ACTIVE + HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, 0, metaTable, PIndexState.INACTIVE); + IndexUtil.updateIndexState(fullIndexName, 0, metaTable, PIndexState.ACTIVE); + TestUtil.removeCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0', 't')"); + TestUtil.addCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); 
+ try { + conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE v1='a'"); + fail(); + } catch (CommitException|PhoenixIOException e) { // Expected - assertEquals(SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode(), e.getErrorCode()); } assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.DISABLE, null)); } finally { @@ -1111,12 +1075,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { public static class WriteFailingRegionObserver extends SimpleRegionObserver { @Override public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { - // we need to advance the clock, since the index retry logic (copied from HBase) has a time component - EnvironmentEdge delegate = EnvironmentEdgeManager.getDelegate(); - if (delegate instanceof MyClock) { - MyClock myClock = (MyClock) delegate; - myClock.time += 1000; - } throw new DoNotRetryIOException("Simulating write failure on " + c.getEnvironment().getRegionInfo().getTable().getNameAsString()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index b75bf3d..bf8ba39 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -541,28 +541,23 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso TableName.valueOf(table.getPhysicalName().getBytes())); builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS); - builder.setMutationTime(currentTime); - if 
(blockWriteRebuildIndex) { - long disableIndexTimestamp = table.getIndexDisableTimestamp(); - long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? disableIndexTimestamp : Long.MAX_VALUE; - for (PTable index : table.getIndexes()) { - disableIndexTimestamp = index.getIndexDisableTimestamp(); - if (disableIndexTimestamp > 0 - && (index.getIndexState() == PIndexState.ACTIVE - || index.getIndexState() == PIndexState.PENDING_ACTIVE - || index.getIndexState() == PIndexState.PENDING_DISABLE) - && disableIndexTimestamp < minNonZerodisableIndexTimestamp) { - minNonZerodisableIndexTimestamp = disableIndexTimestamp; - } - } - // Freeze time for table at min non-zero value of INDEX_DISABLE_TIMESTAMP - // This will keep the table consistent with index as the table has had one more - // batch applied to it. - if (minNonZerodisableIndexTimestamp != Long.MAX_VALUE) { - // Subtract one because we add one due to timestamp granularity in Windows - builder.setMutationTime(minNonZerodisableIndexTimestamp - 1); + long disableIndexTimestamp = table.getIndexDisableTimestamp(); + long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? disableIndexTimestamp : Long.MAX_VALUE; + for (PTable index : table.getIndexes()) { + disableIndexTimestamp = index.getIndexDisableTimestamp(); + if (disableIndexTimestamp > 0 && (index.getIndexState() == PIndexState.ACTIVE || index.getIndexState() == PIndexState.PENDING_ACTIVE) && disableIndexTimestamp < minNonZerodisableIndexTimestamp) { + minNonZerodisableIndexTimestamp = disableIndexTimestamp; } } + // Freeze time for table at min non-zero value of INDEX_DISABLE_TIMESTAMP + // This will keep the table consistent with index as the table has had one more + // batch applied to it. 
+ if (minNonZerodisableIndexTimestamp == Long.MAX_VALUE) { + builder.setMutationTime(currentTime); + } else { + // Subtract one because we add one due to timestamp granularity in Windows + builder.setMutationTime(minNonZerodisableIndexTimestamp - 1); + } if (table.getTimeStamp() != tableTimeStamp) { builder.setTable(PTableImpl.toProto(table)); @@ -928,12 +923,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (indexState == PIndexState.PENDING_ACTIVE && clientVersion < PhoenixDatabaseMetaData.MIN_PENDING_ACTIVE_INDEX) { indexState = PIndexState.ACTIVE; } - // If client is not yet up to 4.14, then translate PENDING_DISABLE to DISABLE - // since the client won't have this index state in its enum. - if (indexState == PIndexState.PENDING_DISABLE && clientVersion < PhoenixDatabaseMetaData.MIN_PENDING_DISABLE_INDEX) { - // note: for older clients, we have to rely on the rebuilder to transition PENDING_DISABLE -> DISABLE - indexState = PIndexState.DISABLE; - } Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX]; boolean isImmutableRows = immutableRowsKv == null ? 
false : (Boolean) PBoolean.INSTANCE.toObject( @@ -3672,7 +3661,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // Timestamp of INDEX_STATE gets updated with each call long actualTimestamp = currentStateKV.getTimestamp(); long curTimeStampVal = 0; - long newDisableTimeStamp = 0; if ((currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0)) { curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(), currentDisableTimeStamp.getValueOffset(), currentDisableTimeStamp.getValueLength()); @@ -3689,7 +3677,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(builder.build()); return; } - newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(), + long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(), newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength()); // We use the sign of the INDEX_DISABLE_TIMESTAMP to differentiate the keep-index-active (negative) // from block-writes-to-data-table case. In either case, we want to keep the oldest timestamp to @@ -3698,7 +3686,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // We do legitimately move the INDEX_DISABLE_TIMESTAMP to be newer when we're rebuilding the // index in which case the state will be INACTIVE or PENDING_ACTIVE. 
if (curTimeStampVal != 0 - && (newState == PIndexState.DISABLE || newState == PIndexState.PENDING_ACTIVE || newState == PIndexState.PENDING_DISABLE) + && (newState == PIndexState.DISABLE || newState == PIndexState.PENDING_ACTIVE) && Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) { // do not reset disable timestamp as we want to keep the min newKVs.remove(disableTimeStampKVIndex); @@ -3727,13 +3715,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (newState == PIndexState.ACTIVE) { newState = PIndexState.DISABLE; } - // Can't transition from DISABLE to PENDING_DISABLE - if (newState == PIndexState.PENDING_DISABLE) { - builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION); - builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); - done.run(builder.build()); - return; - } } if (currentState == PIndexState.BUILDING && newState != PIndexState.ACTIVE) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index efad1e7..fe11ec7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -63,7 +63,7 @@ import com.google.protobuf.ByteString; */ public abstract class MetaDataProtocol extends MetaDataService { public static final int PHOENIX_MAJOR_VERSION = 4; - public static final int PHOENIX_MINOR_VERSION = 14; + public static final int PHOENIX_MINOR_VERSION = 13; public static final int PHOENIX_PATCH_NUMBER = 0; public static final int PHOENIX_VERSION = VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, 
PHOENIX_PATCH_NUMBER); @@ -93,9 +93,8 @@ public abstract class MetaDataProtocol extends MetaDataService { // Since there's no upgrade code, keep the version the same as the previous version public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0; - public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0; // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants - public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0; + public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0; // ALWAYS update this map whenever rolling out a new release (major, minor or patch release). // Key is the SYSTEM.CATALOG timestamp for the version and value is the version string. @@ -115,7 +114,6 @@ public abstract class MetaDataProtocol extends MetaDataService { TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0, "4.11.x"); TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0, "4.12.x"); TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0, "4.13.x"); - TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0, "4.14.x"); } public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION + "." + PHOENIX_MINOR_VERSION + "." 
+ PHOENIX_PATCH_NUMBER; http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index e51a61e..af06235 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -229,7 +229,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver { private final long rebuildIndexBatchSize; private final long configuredBatches; private final long indexDisableTimestampThreshold; - private final long pendingDisableThreshold; private final ReadOnlyProps props; private final List<String> onlyTheseTables; @@ -248,9 +247,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver { this.indexDisableTimestampThreshold = configuration.getLong(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, QueryServicesOptions.DEFAULT_INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD); - this.pendingDisableThreshold = - configuration.getLong(QueryServices.INDEX_PENDING_DISABLE_THRESHOLD, - QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD); this.props = new ReadOnlyProps(env.getConfiguration().iterator()); } @@ -346,18 +342,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver { } PIndexState indexState = PIndexState.fromSerializedValue(indexStateBytes[0]); - long elapsedSinceDisable = EnvironmentEdgeManager.currentTimeMillis() - Math.abs(indexDisableTimestamp); - - // on an index write failure, the server side transitions to PENDING_DISABLE, then the client - // retries, and after retries are exhausted, disables the index - if (indexState == PIndexState.PENDING_DISABLE) { - if 
(elapsedSinceDisable > pendingDisableThreshold) { - // too long in PENDING_DISABLE - client didn't disable the index, so we do it here - IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.DISABLE, indexDisableTimestamp); - } - continue; - } - // Only perform relatively expensive check for all regions online when index // is disabled or pending active since that's the state it's placed into when // an index write fails. @@ -367,8 +351,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { + indexPTable.getName() + " are online."); continue; } - - if (elapsedSinceDisable > indexDisableTimestampThreshold) { + if (EnvironmentEdgeManager.currentTimeMillis() - Math.abs(indexDisableTimestamp) > indexDisableTimestampThreshold) { /* * It has been too long since the index has been disabled and any future * attempts to reenable it likely will fail. So we are going to mark the http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 7692bc8..af50420 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -53,18 +53,22 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.CoprocessorHConnection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; import 
org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -77,9 +81,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.ServerCacheClient; +import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.exception.DataExceedsCapacityException; -import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; @@ -88,15 +92,13 @@ import org.apache.phoenix.expression.aggregator.Aggregators; import org.apache.phoenix.expression.aggregator.ServerAggregators; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; -import org.apache.phoenix.hbase.index.exception.IndexWriteException; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import 
org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; -import org.apache.phoenix.index.PhoenixIndexFailurePolicy; -import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; @@ -134,19 +136,22 @@ import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.LogUtil; +import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; -import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.StringUtil; +import org.apache.phoenix.util.TimeKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; @@ -200,8 +205,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver private KeyValueBuilder kvBuilder; private Configuration upsertSelectConfig; private Configuration compactionConfig; - private Configuration indexWriteConfig; - private ReadOnlyProps indexWriteProps; @Override public void start(CoprocessorEnvironment e) throws IOException { @@ -231,13 +234,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver 
compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE, QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE)); - - // For retries of index write failures, use the same # of retries as the rebuilder - indexWriteConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); - indexWriteConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - e.getConfiguration().getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, - QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER)); - indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator()); } private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException { @@ -258,7 +254,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } // TODO: should we use the one that is all or none? - logger.debug("Committing batch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString()); + logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString()); region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE); } @@ -864,65 +860,21 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } - private void commit(final Region region, List<Mutation> mutations, byte[] indexUUID, final long blockingMemStoreSize, - byte[] indexMaintainersPtr, byte[] txState, final HTable targetHTable, boolean useIndexProto, + private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemStoreSize, + byte[] indexMaintainersPtr, byte[] txState, HTable targetHTable, boolean useIndexProto, boolean isPKChanging) throws IOException { - final List<Mutation> localRegionMutations = Lists.newArrayList(); - final List<Mutation> remoteRegionMutations = Lists.newArrayList(); + List<Mutation> localRegionMutations = 
Lists.newArrayList(); + List<Mutation> remoteRegionMutations = Lists.newArrayList(); setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, useIndexProto); separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations, isPKChanging); - try { - commitBatch(region, localRegionMutations, blockingMemStoreSize); - } catch (IOException e) { - handleIndexWriteException(localRegionMutations, e, new MutateCommand() { - @Override - public void doMutation() throws IOException { - commitBatch(region, localRegionMutations, blockingMemStoreSize); - } - }); - } - try { - commitBatchWithHTable(targetHTable, remoteRegionMutations); - } catch (IOException e) { - handleIndexWriteException(remoteRegionMutations, e, new MutateCommand() { - @Override - public void doMutation() throws IOException { - commitBatchWithHTable(targetHTable, remoteRegionMutations); - } - }); - } + commitBatch(region, localRegionMutations, blockingMemStoreSize); + commitBatchWithHTable(targetHTable, remoteRegionMutations); localRegionMutations.clear(); remoteRegionMutations.clear(); } - private void handleIndexWriteException(final List<Mutation> localRegionMutations, IOException origIOE, - MutateCommand mutateCommand) throws IOException { - long serverTimestamp = ServerUtil.parseTimestampFromRemoteException(origIOE); - SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(origIOE); - if (inferredE != null && inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) { - // For an index write failure, the data table write succeeded, - // so when we retry we need to set REPLAY_WRITES - for (Mutation mutation : localRegionMutations) { - mutation.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES); - // use the server timestamp for index write retrys - KeyValueUtil.setTimestamp(mutation, serverTimestamp); - } - IndexWriteException iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE); - 
try (PhoenixConnection conn = - QueryUtil.getConnectionOnServer(indexWriteConfig) - .unwrap(PhoenixConnection.class)) { - PhoenixIndexFailurePolicy.doBatchWithRetries(mutateCommand, iwe, conn, - indexWriteProps); - } catch (Exception e) { - throw new DoNotRetryIOException(e); - } - } else { - throw origIOE; - } - } - private void separateLocalAndRemoteMutations(HTable targetHTable, Region region, List<Mutation> mutations, List<Mutation> localRegionMutations, List<Mutation> remoteRegionMutations, boolean isPKChanging){ http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 0f29f3f..2301c32 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -367,7 +367,6 @@ public enum SQLExceptionCode { CONNECTION_CLOSED(1111, "XCL11", "Connectioin is closed."), INDEX_FAILURE_BLOCK_WRITE(1120, "XCL20", "Writes to table blocked until index can be updated."), - INDEX_WRITE_FAILURE(1121, "XCL21", "Write to the index failed."), UPDATE_CACHE_FREQUENCY_INVALID(1130, "XCL30", "UPDATE_CACHE_FREQUENCY cannot be set to ALWAYS if APPEND_ONLY_SCHEMA is true."), CANNOT_DROP_COL_APPEND_ONLY_SCHEMA(1131, "XCL31", "Cannot drop column from table that with append only schema."), http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 
0719966..993438e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -53,14 +53,11 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; -import org.apache.phoenix.hbase.index.exception.IndexWriteException; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.IndexMetaDataCacheClient; import org.apache.phoenix.index.PhoenixIndexBuilder; import org.apache.phoenix.index.PhoenixIndexCodec; -import org.apache.phoenix.index.PhoenixIndexFailurePolicy; -import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.monitoring.GlobalClientMetrics; @@ -986,8 +983,6 @@ public class MutationState implements SQLCloseable { long mutationCommitTime = 0; long numFailedMutations = 0;; long startTime = 0; - boolean shouldRetryIndexedMutation = false; - IndexWriteException iwe = null; do { TableRef origTableRef = tableInfo.getOrigTableRef(); PTable table = origTableRef.getTable(); @@ -1021,25 +1016,8 @@ public class MutationState implements SQLCloseable { startTime = System.currentTimeMillis(); child.addTimelineAnnotation("Attempt " + retryCount); List<List<Mutation>> mutationBatchList = getMutationBatchList(batchSize, batchSizeBytes, mutationList); - for (final List<Mutation> mutationBatch : mutationBatchList) { - if (shouldRetryIndexedMutation) { - // if there was an index write failure, retry the mutation in a loop - final HTableInterface finalHTable = hTable; - PhoenixIndexFailurePolicy.doBatchWithRetries(new MutateCommand() { - 
@Override - public void doMutation() throws IOException { - try { - finalHTable.batch(mutationBatch); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } - }}, iwe, - connection, connection.getQueryServices().getProps()); - } else { - hTable.batch(mutationBatch); - } - + for (List<Mutation> mutationBatch : mutationBatchList) { + hTable.batch(mutationBatch); batchCount++; if (logger.isDebugEnabled()) logger.debug("Sent batch of " + mutationBatch.size() + " for " + Bytes.toString(htableName)); } @@ -1076,19 +1054,6 @@ public class MutationState implements SQLCloseable { child = Tracing.child(span,"Failed batch, attempting retry"); continue; - } else if (inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) { - iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE); - if (iwe != null && !shouldRetryIndexedMutation) { - // For an index write failure, the data table write succeeded, - // so when we retry we need to set REPLAY_WRITES - for (Mutation m : mutationList) { - m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES); - KeyValueUtil.setTimestamp(m, serverTimestamp); - } - shouldRetry = true; - shouldRetryIndexedMutation = true; - continue; - } } e = inferredE; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index f8195f1..9686789 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -185,6 +185,7 @@ public class Indexer extends BaseRegionObserver { private long slowPostOpenThreshold; private long 
slowPreIncrementThreshold; private int rowLockWaitDuration; + private Configuration compactionConfig; public static final String RecoveryFailurePolicyKeyForTesting = INDEX_RECOVERY_FAILURE_POLICY_KEY; @@ -241,6 +242,15 @@ public class Indexer extends BaseRegionObserver { this.metricSource = MetricsIndexerSourceFactory.getInstance().create(); setSlowThresholds(e.getConfiguration()); + compactionConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); + // lower the number of rpc retries, so we don't hang the compaction + compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER, + QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER)); + compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, + e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE, + QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE)); + try { // get the specified failure policy. We only ever override it in tests, but we need to do it // here http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java index 531baa6..2ec29bc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/IndexWriteException.java @@ -17,10 +17,7 @@ */ package org.apache.phoenix.hbase.index.exception; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.phoenix.query.QueryServicesOptions; /** * Generic {@link Exception} that an index write has failed @@ -28,57 
+25,19 @@ import org.apache.phoenix.query.QueryServicesOptions; @SuppressWarnings("serial") public class IndexWriteException extends HBaseIOException { - /* - * We pass this message back to the client so that the config only needs to be set on the - * server side. - */ - private static final String DISABLE_INDEX_ON_FAILURE_MSG = "disableIndexOnFailure="; - private boolean disableIndexOnFailure; - public IndexWriteException() { super(); } - /** - * Used for the case where we cannot reach the index, but not sure of the table or the mutations - * that caused the failure - * @param message - * @param cause - */ public IndexWriteException(String message, Throwable cause) { - super(message, cause); - } - - public IndexWriteException(String message, Throwable cause, boolean disableIndexOnFailure) { - super(prependDisableIndexMsg(message, disableIndexOnFailure), cause); + super(message, cause); } - public IndexWriteException(String message, boolean disableIndexOnFailure) { - super(prependDisableIndexMsg(message, disableIndexOnFailure)); - this.disableIndexOnFailure = disableIndexOnFailure; + public IndexWriteException(String message) { + super(message); } - private static String prependDisableIndexMsg(String message, boolean disableIndexOnFailure) { - return DISABLE_INDEX_ON_FAILURE_MSG + disableIndexOnFailure + " " + message; -} - -public IndexWriteException(Throwable cause) { + public IndexWriteException(Throwable cause) { super(cause); } - - public static boolean parseDisableIndexOnFailure(String message) { - Pattern p = - Pattern.compile(DISABLE_INDEX_ON_FAILURE_MSG + "(true|false)", - Pattern.CASE_INSENSITIVE); - Matcher m = p.matcher(message); - if (m.find()) { - boolean disableIndexOnFailure = Boolean.parseBoolean(m.group(1)); - return disableIndexOnFailure; - } - return QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX; - } - - public boolean isDisableIndexOnFailure() { - return disableIndexOnFailure; - } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java index d593791..546b43d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java @@ -18,14 +18,8 @@ package org.apache.phoenix.hbase.index.exception; import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; - -import com.google.common.collect.Lists; /** * Indicate a failure to write to multiple index tables. 
@@ -33,34 +27,15 @@ import com.google.common.collect.Lists; @SuppressWarnings("serial") public class MultiIndexWriteFailureException extends IndexWriteException { - public static final String FAILURE_MSG = "Failed to write to multiple index tables: "; private List<HTableInterfaceReference> failures; /** * @param failures the tables to which the index write did not succeed */ - public MultiIndexWriteFailureException(List<HTableInterfaceReference> failures, boolean disableIndexOnFailure) { - super(FAILURE_MSG + failures, disableIndexOnFailure); + public MultiIndexWriteFailureException(List<HTableInterfaceReference> failures) { + super("Failed to write to multiple index tables"); this.failures = failures; - } - /** - * This constructor used to rematerialize this exception when receiving - * an rpc exception from the server - * @param message detail message - */ - public MultiIndexWriteFailureException(String message) { - super(message, IndexWriteException.parseDisableIndexOnFailure(message)); - Pattern p = Pattern.compile(FAILURE_MSG + "\\[(.*)\\]"); - Matcher m = p.matcher(message); - if (m.find()) { - failures = Lists.newArrayList(); - String tablesStr = m.group(1); - for (String tableName : tablesStr.split(",\\s")) { - HTableInterfaceReference tableRef = new HTableInterfaceReference(new ImmutableBytesPtr(Bytes.toBytes(tableName))); - failures.add(tableRef); - } - } } public List<HTableInterfaceReference> getFailedTables() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java index 610a82a..eb3b521 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/SingleIndexWriteFailureException.java @@ -18,8 +18,6 @@ package org.apache.phoenix.hbase.index.exception; import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.hadoop.hbase.client.Mutation; @@ -29,7 +27,6 @@ import org.apache.hadoop.hbase.client.Mutation; @SuppressWarnings("serial") public class SingleIndexWriteFailureException extends IndexWriteException { - public static final String FAILED_MSG = "Failed to make index update:"; private String table; /** @@ -48,27 +45,13 @@ public class SingleIndexWriteFailureException extends IndexWriteException { * @param cause underlying reason for the failure */ public SingleIndexWriteFailureException(String targetTableName, List<Mutation> mutations, - Exception cause, boolean disableIndexOnFailure) { - super(FAILED_MSG + "\n\t table: " + targetTableName + "\n\t edits: " + mutations - + "\n\tcause: " + cause == null ? "UNKNOWN" : cause.getMessage(), cause, disableIndexOnFailure); + Exception cause) { + super("Failed to make index update:\n\t table: " + targetTableName + "\n\t edits: " + mutations + + "\n\tcause: " + cause == null ? "UNKNOWN" : cause.getMessage(), cause); this.table = targetTableName; } /** - * This constructor used to rematerialize this exception when receiving - * an rpc exception from the server - * @param message detail message - */ - public SingleIndexWriteFailureException(String msg) { - super(msg, IndexWriteException.parseDisableIndexOnFailure(msg)); - Pattern pattern = Pattern.compile(FAILED_MSG + ".* table: ([\\S]*)\\s.*", Pattern.DOTALL); - Matcher m = pattern.matcher(msg); - if (m.find()) { - this.table = m.group(1); - } - } - - /** * @return The table to which we failed to write the index updates. 
If unknown, returns * <tt>null</tt> */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java index 29b9faf..3649069 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java @@ -70,13 +70,13 @@ public class IndexWriterUtils { public static final String HTABLE_KEEP_ALIVE_KEY = "hbase.htable.threads.keepalivetime"; public static final String INDEX_WRITER_RPC_RETRIES_NUMBER = "phoenix.index.writes.rpc.retries.number"; - /** - * Retry server-server index write rpc only once, and let the client retry the data write - * instead to avoid typing up the handler - */ - // note in HBase 2+, numTries = numRetries + 1 - // in prior versions, numTries = numRetries - public static final int DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER = 1; + /** + * Based on the logic in HBase's AsyncProcess, a default of 11 retries with a pause of 100ms + * approximates 48 sec total retry time (factoring in backoffs). 
The total time should be less + * than HBase's rpc timeout (default of 60 sec) or else the client will retry before receiving + * the response + */ + public static final int DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER = 11; public static final String INDEX_WRITER_RPC_PAUSE = "phoenix.index.writes.rpc.pause"; public static final int DEFAULT_INDEX_WRITER_RPC_PAUSE = 100; http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java index 0bb8784..e4e8343 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java @@ -35,7 +35,6 @@ import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager; import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; -import org.apache.phoenix.index.PhoenixIndexFailurePolicy; import org.apache.phoenix.util.IndexUtil; import com.google.common.collect.Multimap; @@ -168,11 +167,11 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { } catch (SingleIndexWriteFailureException e) { throw e; } catch (IOException e) { - throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e, PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env)); + throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e); } catch (InterruptedException e) { // reset the interrupt status on the thread Thread.currentThread().interrupt(); - throw new 
SingleIndexWriteFailureException(tableReference.toString(), mutations, e, PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env)); + throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e); } finally{ if (table != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java index 94d4f0f..0449e9e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java @@ -40,7 +40,6 @@ import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner; import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; -import org.apache.phoenix.index.PhoenixIndexFailurePolicy; import org.apache.phoenix.util.IndexUtil; import com.google.common.collect.Multimap; @@ -111,7 +110,6 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { this.factory = factory; this.abortable = new CapturingAbortable(abortable); this.stopped = stop; - this.env = env; } @Override @@ -228,8 +226,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter { // if any of the tasks failed, then we need to propagate the failure if (failures.size() > 0) { // make the list unmodifiable to avoid any more synchronization concerns - throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures), - 
PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env)); + throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures)); } return; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index 14f8307..ba6371b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -30,28 +30,21 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; -import org.apache.phoenix.exception.SQLExceptionCode; -import org.apache.phoenix.exception.SQLExceptionInfo; -import org.apache.phoenix.hbase.index.exception.IndexWriteException; import 
org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException; -import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy; import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy; @@ -66,7 +59,6 @@ import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; -import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; @@ -110,8 +102,14 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { rebuildIndexOnFailure = Boolean.parseBoolean(value); } } - disableIndexOnFailure = getDisableIndexOnFailure(env); - String value = htd.getValue(BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE); + String value = htd.getValue(DISABLE_INDEX_ON_WRITE_FAILURE); + if (value == null) { + disableIndexOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_DISABLE_INDEX, + QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX); + } else { + disableIndexOnFailure = Boolean.parseBoolean(value); + } + value = htd.getValue(BLOCK_DATA_TABLE_WRITES_ON_WRITE_FAILURE); if (value == null) { blockDataTableWritesOnFailure = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE); @@ -151,11 +149,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { throwing = false; } finally { if (!throwing) { - SQLException sqlException = - new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_WRITE_FAILURE) - .setRootCause(cause).setMessage(cause.getLocalizedMessage()).build() - .buildException(); - IOException ioException = ServerUtil.wrapInDoNotRetryIOException(null, sqlException, timestamp); + IOException 
ioException = ServerUtil.wrapInDoNotRetryIOException("Unable to update the following indexes: " + attempted.keySet(), cause, timestamp); Mutation m = attempted.entries().iterator().next().getValue(); boolean isIndexRebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap()); // Always throw if rebuilding index since the rebuilder needs to know if it was successful @@ -218,7 +212,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { return timestamp; } - final PIndexState newState = disableIndexOnFailure ? PIndexState.PENDING_DISABLE : PIndexState.PENDING_ACTIVE; + final PIndexState newState = disableIndexOnFailure ? PIndexState.DISABLE : PIndexState.PENDING_ACTIVE; final long fTimestamp=timestamp; // for all the index tables that we've found, try to disable them and if that fails, try to return User.runAsLoginUser(new PrivilegedExceptionAction<Long>() { @@ -260,9 +254,12 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { throw new DoNotRetryIOException("Attempt to disable " + indexTableName + " failed."); } } - LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName - + " due to an exception while writing updates. 
indexState=" + newState, - cause); + if (leaveIndexActive) + LOG.info("Successfully update INDEX_DISABLE_TIMESTAMP for " + indexTableName + + " due to an exception while writing updates.", cause); + else + LOG.info("Successfully disabled index " + indexTableName + + " due to an exception while writing updates.", cause); } catch (Throwable t) { if (t instanceof Exception) { throw (Exception)t; @@ -334,158 +331,4 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { } return indexTableNames; } - - /** - * Check config for whether to disable index on index write failures - * @param htd - * @param config - * @param connection - * @return The table config for {@link PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE} - * @throws SQLException - */ - public static boolean getDisableIndexOnFailure(RegionCoprocessorEnvironment env) { - HTableDescriptor htd = env.getRegion().getTableDesc(); - Configuration config = env.getConfiguration(); - String value = htd.getValue(PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE); - boolean disableIndexOnFailure; - if (value == null) { - disableIndexOnFailure = - config.getBoolean(QueryServices.INDEX_FAILURE_DISABLE_INDEX, - QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX); - } else { - disableIndexOnFailure = Boolean.parseBoolean(value); - } - return disableIndexOnFailure; - } - - /** - * If we're leaving the index active after index write failures on the server side, then we get - * the exception on the client side here after hitting the max # of hbase client retries. We - * disable the index as it may now be inconsistent. The indexDisableTimestamp was already set - * on the server side, so the rebuilder will be run. 
- */ - private static void handleIndexWriteFailureFromClient(IndexWriteException indexWriteException, - PhoenixConnection conn) { - handleExceptionFromClient(indexWriteException, conn, PIndexState.DISABLE); - } - - private static void handleIndexWriteSuccessFromClient(IndexWriteException indexWriteException, - PhoenixConnection conn) { - handleExceptionFromClient(indexWriteException, conn, PIndexState.ACTIVE); - } - - private static void handleExceptionFromClient(IndexWriteException indexWriteException, - PhoenixConnection conn, PIndexState indexState) { - try { - Set<String> indexesToUpdate = new HashSet<>(); - if (indexWriteException instanceof MultiIndexWriteFailureException) { - MultiIndexWriteFailureException indexException = - (MultiIndexWriteFailureException) indexWriteException; - List<HTableInterfaceReference> failedIndexes = indexException.getFailedTables(); - if (indexException.isDisableIndexOnFailure() && failedIndexes != null) { - for (HTableInterfaceReference failedIndex : failedIndexes) { - String failedIndexTable = failedIndex.getTableName(); - if (!indexesToUpdate.contains(failedIndexTable)) { - updateIndex(failedIndexTable, conn, indexState); - indexesToUpdate.add(failedIndexTable); - } - } - } - } else if (indexWriteException instanceof SingleIndexWriteFailureException) { - SingleIndexWriteFailureException indexException = - (SingleIndexWriteFailureException) indexWriteException; - String failedIndex = indexException.getTableName(); - if (indexException.isDisableIndexOnFailure() && failedIndex != null) { - updateIndex(failedIndex, conn, indexState); - } - } - } catch (Exception handleE) { - LOG.warn("Error while trying to handle index write exception", indexWriteException); - } - } - - public static interface MutateCommand { - void doMutation() throws IOException; - } - - /** - * Retries a mutationBatch where the index write failed. - * One attempt should have already been made before calling this. 
- * Max retries and exponential backoff logic mimics that of HBase's client - * If max retries are hit, the index is disabled. - * If the write is successful on a subsequent retry, the index is set back to ACTIVE - * @param mutateCommand mutation command to execute - * @param iwe original IndexWriteException - * @param connection connection to use - * @param config config used to get retry settings - * @throws Exception - */ - public static void doBatchWithRetries(MutateCommand mutateCommand, - IndexWriteException iwe, PhoenixConnection connection, ReadOnlyProps config) - throws IOException { - int maxTries = config.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - long pause = config.getLong(HConstants.HBASE_CLIENT_PAUSE, - HConstants.DEFAULT_HBASE_CLIENT_PAUSE); - int numRetry = 1; // already tried once - // calculate max time to retry for - int timeout = 0; - for (int i = 0; i < maxTries; ++i) { - timeout = (int) (timeout + ConnectionUtils.getPauseTime(pause, i)); - } - long canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout; - while (canRetryMore(numRetry++, maxTries, canRetryUntil)) { - try { - Thread.sleep(ConnectionUtils.getPauseTime(pause, numRetry)); // HBase's exponential backoff - mutateCommand.doMutation(); - // success - change the index state from PENDING_DISABLE back to ACTIVE - handleIndexWriteSuccessFromClient(iwe, connection); - return; - } catch (IOException e) { - SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(e); - if (inferredE == null || inferredE.getErrorCode() != SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) { - // if it's not an index write exception, throw exception, to be handled normally in caller's try-catch - throw e; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } - } - // max retries hit - disable the index - handleIndexWriteFailureFromClient(iwe, connection); - throw new 
DoNotRetryIOException(iwe); // send failure back to client - } - - private static boolean canRetryMore(int numRetry, int maxRetries, long canRetryUntil) { - // If there is a single try we must not take into account the time. - return numRetry < maxRetries - || (maxRetries > 1 && EnvironmentEdgeManager.currentTime() < canRetryUntil); - } - - /** - * Converts from SQLException to IndexWriteException - * @param sqlE the SQLException - * @return the IndexWriteException - */ - public static IndexWriteException getIndexWriteException(SQLException sqlE) { - String sqlMsg = sqlE.getMessage(); - if (sqlMsg.contains(MultiIndexWriteFailureException.FAILURE_MSG)) { - return new MultiIndexWriteFailureException(sqlMsg); - } else if (sqlMsg.contains(SingleIndexWriteFailureException.FAILED_MSG)) { - return new SingleIndexWriteFailureException(sqlMsg); - } - return null; - } - - private static void updateIndex(String indexFullName, PhoenixConnection conn, - PIndexState indexState) throws SQLException { - if (PIndexState.DISABLE.equals(indexState)) { - LOG.info("Disabling index after hitting max number of index write retries: " - + indexFullName); - } else if (PIndexState.ACTIVE.equals(indexState)) { - LOG.debug("Resetting index to active after subsequent success " + indexFullName); - } - IndexUtil.updateIndexState(conn, indexFullName, indexState, null); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 094f743..23330d8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -322,7 +322,6 
@@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final int MIN_RENEW_LEASE_VERSION = VersionUtil.encodeVersion("1", "1", "3"); public static final int MIN_NAMESPACE_MAPPED_PHOENIX_VERSION = VersionUtil.encodeVersion("4", "8", "0"); public static final int MIN_PENDING_ACTIVE_INDEX = VersionUtil.encodeVersion("4", "12", "0"); - public static final int MIN_PENDING_DISABLE_INDEX = VersionUtil.encodeVersion("4", "14", "0"); // Version below which we should turn off essential column family. public static final int ESSENTIAL_FAMILY_VERSION_THRESHOLD = VersionUtil.encodeVersion("0", "94", "7"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index 3f5f5ed..4192869 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -57,9 +57,7 @@ import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableType; -import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexUtil; import com.google.common.collect.Lists; @@ -70,14 +68,11 @@ public class QueryOptimizer { private final QueryServices services; private final boolean useIndexes; private final boolean costBased; - private long indexPendingDisabledThreshold; public QueryOptimizer(QueryServices services) { this.services = services; this.useIndexes = this.services.getProps().getBoolean(QueryServices.USE_INDEXES_ATTRIB, 
QueryServicesOptions.DEFAULT_USE_INDEXES); this.costBased = this.services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED); - this.indexPendingDisabledThreshold = this.services.getProps().getLong(QueryServices.INDEX_PENDING_DISABLE_THRESHOLD, - QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD); } public QueryPlan optimize(PhoenixStatement statement, QueryPlan dataPlan) throws SQLException { @@ -163,7 +158,7 @@ public class QueryOptimizer { return hintedPlan == null ? orderPlansBestToWorst(select, plans, stopAtBestPlan) : plans; } - private QueryPlan getHintedQueryPlan(PhoenixStatement statement, SelectStatement select, List<PTable> indexes, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, List<QueryPlan> plans) throws SQLException { + private static QueryPlan getHintedQueryPlan(PhoenixStatement statement, SelectStatement select, List<PTable> indexes, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, List<QueryPlan> plans) throws SQLException { QueryPlan dataPlan = plans.get(0); String indexHint = select.getHint().getHint(Hint.INDEX); if (indexHint == null) { @@ -220,7 +215,7 @@ public class QueryOptimizer { return -1; } - private QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan, boolean isHinted) throws SQLException { + private static QueryPlan addPlan(PhoenixStatement statement, SelectStatement select, PTable index, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, QueryPlan dataPlan, boolean isHinted) throws SQLException { int nColumns = dataPlan.getProjector().getColumnCount(); String tableAlias = dataPlan.getTableRef().getTableAlias(); String alias = tableAlias==null ? 
null : '"' + tableAlias + '"'; // double quote in case it's case sensitive @@ -234,11 +229,8 @@ public class QueryOptimizer { // We will or will not do tuple projection according to the data plan. boolean isProjected = dataPlan.getContext().getResolver().getTables().get(0).getTable().getType() == PTableType.PROJECTED; // Check index state of now potentially updated index table to make sure it's active - TableRef indexTableRef = resolver.getTables().get(0); - PTable indexTable = indexTableRef.getTable(); - PIndexState indexState = indexTable.getIndexState(); - if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE - || (indexState == PIndexState.PENDING_DISABLE && isUnderPendingDisableThreshold(indexTableRef.getCurrentTime(), indexTable.getIndexDisableTimestamp()))) { + PIndexState indexState = resolver.getTables().get(0).getTable().getIndexState(); + if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE) { try { // translate nodes that match expressions that are indexed to the associated column parse node indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(index, null, statement.getConnection(), indexSelect.getUdfParseNodes())); @@ -254,13 +246,10 @@ public class QueryOptimizer { && !plan.getContext().getDataColumns().isEmpty()) { return null; } - indexTableRef = plan.getTableRef(); - indexTable = indexTableRef.getTable(); - indexState = indexTable.getIndexState(); + indexState = plan.getTableRef().getTable().getIndexState(); // Checking number of columns handles the wildcard cases correctly, as in that case the index // must contain all columns from the data table to be able to be used. 
- if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE - || (indexState == PIndexState.PENDING_DISABLE && isUnderPendingDisableThreshold(indexTableRef.getCurrentTime(), indexTable.getIndexDisableTimestamp()))) { + if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE) { if (plan.getProjector().getColumnCount() == nColumns) { return plan; } else if (index.getIndexType() == IndexType.GLOBAL) { @@ -323,12 +312,6 @@ public class QueryOptimizer { return null; } - // returns true if we can still use the index - // retuns false if we've been in PENDING_DISABLE too long - index should be considered disabled - private boolean isUnderPendingDisableThreshold(long currentTimestamp, long indexDisableTimestamp) { - return currentTimestamp - indexDisableTimestamp <= indexPendingDisabledThreshold; - } - /** * Order the plans among all the possible ones from best to worst. * If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, we order the plans based on http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 2a31f09..0b80f4d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -144,8 +144,6 @@ public interface QueryServices extends SQLCloseable { public static final String INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE = "phoenix.index.rebuild.batch.perTable"; // If index disable timestamp is older than this threshold, then index rebuild task won't attempt to rebuild it public static final String INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD = 
"phoenix.index.rebuild.disabletimestamp.threshold"; - // threshold number of ms an index has been in PENDING_DISABLE, beyond which we consider it disabled - public static final String INDEX_PENDING_DISABLE_THRESHOLD = "phoenix.index.pending.disable.threshold"; // Block writes to data table when index write fails public static final String INDEX_FAILURE_BLOCK_WRITE = "phoenix.index.failure.block.write"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index d749433..4d31974 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -198,7 +198,6 @@ public class QueryServicesOptions { public static final long DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT = 30000 * 60; // 30 mins public static final int DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER = 5; // 5 total tries at rpc level public static final int DEFAULT_INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD = 30000 * 60; // 30 mins - public static final long DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD = 30000; // 30 secs /** * HConstants#HIGH_QOS is the max we will see to a standard table. 
We go higher to differentiate http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java index 2b6ac4a..d7dbeca 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java @@ -27,12 +27,7 @@ public enum PIndexState { INACTIVE("i"), DISABLE("x"), REBUILD("r"), - PENDING_ACTIVE("p"), - // Used when disabling an index on write failure (PHOENIX-4130) - // When an index write fails, it is put in this state, and we let the client retry the mutation - // After retries are exhausted, the client should mark the index as disabled, but if that - // doesn't happen, then the index is considered disabled if it's been in this state too long - PENDING_DISABLE("w"); + PENDING_ACTIVE("p"); private final String serializedValue; private final byte[] serializedBytes; http://git-wip-us.apache.org/repos/asf/phoenix/blob/878a264e/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java index 4d8565f..df6a349 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; import 
org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -133,17 +132,6 @@ public class KeyValueUtil { return kvs[pos]; } - public static void setTimestamp(Mutation m, long timestamp) { - byte[] tsBytes = Bytes.toBytes(timestamp); - for (List<Cell> family : m.getFamilyCellMap().values()) { - List<KeyValue> familyKVs = org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValues(family); - for (KeyValue kv : familyKVs) { - int tsOffset = kv.getTimestampOffset(); - System.arraycopy(tsBytes, 0, kv.getBuffer(), tsOffset, Bytes.SIZEOF_LONG); - } - } - } - /* * Special comparator, *only* works for binary search. *