Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 7f38f7e70 -> 458973fd3
PHOENIX-4169 Explicitly cap timeout for index disable RPC on compaction (Vincent Poon) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/63a409a4 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/63a409a4 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/63a409a4 Branch: refs/heads/4.x-HBase-0.98 Commit: 63a409a4028ac27e06c8a152ea75cf6b8cd32d1a Parents: 7f38f7e Author: James Taylor <jamestay...@apache.org> Authored: Tue Sep 12 17:06:21 2017 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Tue Sep 12 17:06:21 2017 -0700 ---------------------------------------------------------------------- .../UngroupedAggregateRegionObserver.java | 29 ++++++++++++++++---- .../org/apache/phoenix/hbase/index/Indexer.java | 14 +++++++++- .../org/apache/phoenix/query/QueryServices.java | 4 +++ .../phoenix/query/QueryServicesOptions.java | 5 ++++ 4 files changed, 46 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/63a409a4/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index b4d7e7f..4ae5087 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.CoprocessorHConnection; 
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -68,6 +69,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; @@ -98,6 +100,7 @@ import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.PColumn; @@ -192,6 +195,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class); private KeyValueBuilder kvBuilder; private Configuration upsertSelectConfig; + private Configuration compactionConfig; @Override public void start(CoprocessorEnvironment e) throws IOException { @@ -212,6 +216,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver */ upsertSelectConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); + + compactionConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); + // lower the number of rpc retries, so we don't hang the compaction + compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + 
e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER, + QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER)); + compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, + e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE, + QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE)); } private void commitBatch(HRegion region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException { @@ -929,11 +942,16 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver public Void run() throws Exception { MutationCode mutationCode = null; long disableIndexTimestamp = 0; - - try (HTableInterface htable = e.getEnvironment().getTable( - SchemaUtil.getPhysicalTableName( - PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, - e.getEnvironment().getConfiguration()))) { + + try (CoprocessorHConnection coprocessorHConnection = + new CoprocessorHConnection(compactionConfig, + (HRegionServer) e.getEnvironment() + .getRegionServerServices()); + HTableInterface htable = + coprocessorHConnection + .getTable(SchemaUtil.getPhysicalTableName( + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, + compactionConfig))) { String tableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); // FIXME: if this is an index on a view, we won't find a row for it in SYSTEM.CATALOG // Instead, we need to disable all indexes on the view. @@ -946,6 +964,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver if (cell.getValueLength() > 0) { disableIndexTimestamp = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault()); if (disableIndexTimestamp != 0) { + logger.info("Major compaction running while index on table is disabled. 
Clearing index disable timestamp: " + tableName); mutationCode = IndexUtil.updateIndexState(tableKey, 0L, htable, PIndexState.DISABLE).getMutationCode(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/63a409a4/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 9c851c4..b50b900 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -87,6 +87,7 @@ import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter; import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache; import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; @@ -194,6 +195,7 @@ public class Indexer extends BaseRegionObserver { private long slowPostOpenThreshold; private long slowPreIncrementThreshold; private int rowLockWaitDuration; + private Configuration compactionConfig; public static final String RecoveryFailurePolicyKeyForTesting = INDEX_RECOVERY_FAILURE_POLICY_KEY; @@ -250,6 +252,15 @@ public class Indexer extends BaseRegionObserver { this.metricSource = MetricsIndexerSourceFactory.getInstance().create(); setSlowThresholds(e.getConfiguration()); + compactionConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); + // lower the number of rpc retries, so we don't hang the compaction + compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER, + 
QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER)); + compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, + e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE, + QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE)); + try { // get the specified failure policy. We only ever override it in tests, but we need to do it // here @@ -868,12 +879,13 @@ public class Indexer extends BaseRegionObserver { public Void run() throws Exception { String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); try { - PhoenixConnection conn = QueryUtil.getConnectionOnServer(c.getEnvironment().getConfiguration()).unwrap(PhoenixConnection.class); + PhoenixConnection conn = QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class); PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName); // FIXME: we may need to recurse into children of this table too for (PTable index : table.getIndexes()) { if (index.getIndexDisableTimestamp() != 0) { try { + LOG.info("Major compaction running while index on table is disabled. 
Clearing index disable timestamp: " + fullTableName); IndexUtil.updateIndexState(conn, index.getName().getString(), PIndexState.DISABLE, Long.valueOf(0L)); } catch (SQLException e) { LOG.warn("Unable to permanently disable index " + index.getName().getString(), e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/63a409a4/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index ab5f51a..83887b6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -161,6 +161,10 @@ public interface QueryServices extends SQLCloseable { public static final String METADATA_PRIOIRTY_ATTRIB = "phoenix.metadata.rpc.priority"; public static final String ALLOW_LOCAL_INDEX_ATTRIB = "phoenix.index.allowLocalIndex"; + // Retries when doing server side writes to SYSTEM.CATALOG + public static final String METADATA_WRITE_RETRIES_NUMBER = "phoenix.metadata.rpc.retries.number"; + public static final String METADATA_WRITE_RETRY_PAUSE = "phoenix.metadata.rpc.pause"; + // Config parameters for for configuring tracing public static final String TRACING_FREQ_ATTRIB = "phoenix.trace.frequency"; public static final String TRACING_PAGE_SIZE_ATTRIB = "phoenix.trace.read.pagesize"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/63a409a4/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index b9a578f..5896dae 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -195,6 +195,11 @@ public class QueryServicesOptions { public static final int DEFAULT_INDEX_HANDLER_COUNT = 30; public static final int DEFAULT_METADATA_HANDLER_COUNT = 30; + // Retries when doing server side writes to SYSTEM.CATALOG + // 20 retries with a 100 ms pause = ~230 seconds total retry time (per the HBase client retry backoff multipliers) + public static final int DEFAULT_METADATA_WRITE_RETRIES_NUMBER = 20; + public static final int DEFAULT_METADATA_WRITE_RETRY_PAUSE = 100; + public static final int DEFAULT_TRACING_PAGE_SIZE = 100; /** * Configuration key to overwrite the tablename that should be used as the target table