PHOENIX-4685 Properly handle connection caching for Phoenix inside RegionServers (Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/dde1054f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/dde1054f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/dde1054f Branch: refs/heads/4.x-cdh5.13 Commit: dde1054f34fe1784447307600d0949f35e6f475a Parents: 8ed7eb0 Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Tue May 8 09:26:41 2018 +0530 Committer: James Taylor <jtay...@salesforce.com> Committed: Wed May 9 13:27:36 2018 -0700 ---------------------------------------------------------------------- .../DelegateRegionCoprocessorEnvironment.java | 7 +- .../UngroupedAggregateRegionObserver.java | 14 +- .../org/apache/phoenix/hbase/index/Indexer.java | 19 +-- .../hbase/index/write/IndexWriterUtils.java | 27 +--- .../index/PhoenixTransactionalIndexer.java | 18 +-- .../org/apache/phoenix/util/ServerUtil.java | 141 ++++++++++++++++--- 6 files changed, 142 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/dde1054f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java index 284d53c..a791f4a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import 
org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.ServerUtil.ConnectionType; /** * Class to encapsulate {@link RegionCoprocessorEnvironment} for phoenix coprocessors. Often we @@ -44,10 +45,10 @@ public class DelegateRegionCoprocessorEnvironment implements RegionCoprocessorEn private RegionCoprocessorEnvironment delegate; private HTableFactory tableFactory; - public DelegateRegionCoprocessorEnvironment(Configuration config, RegionCoprocessorEnvironment delegate) { - this.config = config; + public DelegateRegionCoprocessorEnvironment(RegionCoprocessorEnvironment delegate, ConnectionType connectionType) { + this.config = ServerUtil.ConnectionFactory.getTypeSpecificConfiguration(connectionType, delegate.getConfiguration()); this.delegate = delegate; - this.tableFactory = ServerUtil.getDelegateHTableFactory(this, config); + this.tableFactory = ServerUtil.getDelegateHTableFactory(this, connectionType); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/dde1054f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 6bee65c..14213f4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -144,6 +144,7 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.ServerUtil.ConnectionType; import org.apache.phoenix.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -225,14 +226,7 @@ public class 
UngroupedAggregateRegionObserver extends BaseScannerRegionObserver upsertSelectConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); - compactionConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); - // lower the number of rpc retries, so we don't hang the compaction - compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER, - QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER)); - compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, - e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE, - QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE)); + compactionConfig = ServerUtil.getCompactionConfig(e.getConfiguration()); // For retries of index write failures, use the same # of retries as the rebuilder indexWriteConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); @@ -984,7 +978,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver InternalScanner internalScanner = scanner; try { long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis(); - DelegateRegionCoprocessorEnvironment compactionConfEnv = new DelegateRegionCoprocessorEnvironment(compactionConfig, c.getEnvironment()); + DelegateRegionCoprocessorEnvironment compactionConfEnv = + new DelegateRegionCoprocessorEnvironment(c.getEnvironment(), + ConnectionType.COMPACTION_CONNECTION); StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector( compactionConfEnv, table.getNameAsString(), clientTimeStamp, store.getFamily().getName()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dde1054f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 1ef09fe..e4e6991 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -93,6 +93,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.ServerUtil.ConnectionType; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; @@ -220,25 +221,11 @@ public class Indexer extends BaseRegionObserver { this.builder = new IndexBuildManager(env); // Clone the config since it is shared - Configuration clonedConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); - /* - * Set the rpc controller factory so that the HTables used by IndexWriter would - * set the correct priorities on the remote RPC calls. - */ - clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, - InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); - // lower the number of rpc retries. We inherit config from HConnectionManager#setServerSideHConnectionRetries, - // which by default uses a multiplier of 10. 
That is too many retries for our synchronous index writes - clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER, - DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER)); - clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, env.getConfiguration() - .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE)); - DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env); + DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION); // setup the actual index writer this.writer = new IndexWriter(indexWriterEnv, serverName + "-index-writer"); - this.rowLockWaitDuration = clonedConfig.getInt("hbase.rowlock.wait.duration", + this.rowLockWaitDuration = env.getConfiguration().getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION); this.lockManager = new LockManager(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dde1054f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java index 0d3004f..ef53b9f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java @@ -26,6 +26,7 @@ import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.ServerUtil.ConnectionType; public class IndexWriterUtils { @@ -50,9 +51,9 @@ public class IndexWriterUtils { * threads. 
Currently, HBase doesn't support a custom thread-pool to back the HTable via the * coprocesor hooks, so we can't modify this behavior. */ - private static final String INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY = + public static final String INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY = "index.writer.threads.pertable.max"; - private static final int DEFAULT_NUM_PER_TABLE_THREADS = Integer.MAX_VALUE; + public static final int DEFAULT_NUM_PER_TABLE_THREADS = Integer.MAX_VALUE; /** Configuration key that HBase uses to set the max number of threads for an HTable */ public static final String HTABLE_THREAD_KEY = "hbase.htable.threads.max"; @@ -79,19 +80,7 @@ public class IndexWriterUtils { } public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) { - // create a simple delegate factory, setup the way we need - Configuration conf = PropertiesUtil.cloneConfig(env.getConfiguration()); - setHTableThreads(conf); - return ServerUtil.getDelegateHTableFactory(env, conf); - } - - private static void setHTableThreads(Configuration conf) { - // set the number of threads allowed per table. 
- int htableThreads = - conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY, - IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS); - LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable."); - IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads); + return ServerUtil.getDelegateHTableFactory(env, ConnectionType.INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS); } /** @@ -99,12 +88,8 @@ public class IndexWriterUtils { * instead to avoid tying up the handler */ public static HTableFactory getNoRetriesHTableFactory(CoprocessorEnvironment env) { - Configuration conf = PropertiesUtil.cloneConfig(env.getConfiguration()); - setHTableThreads(conf); - // note in HBase 2+, numTries = numRetries + 1 - // in prior versions, numTries = numRetries - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); - return ServerUtil.getDelegateHTableFactory(env, conf); + return ServerUtil.getDelegateHTableFactory(env, + ConnectionType.INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS_NO_RETRIES); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/dde1054f/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 610ea44..0893bf6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -55,6 +55,7 @@ import org.apache.phoenix.trace.util.NullSpan; import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.ServerUtil.ConnectionType; /** * Do all the work of managing local index updates for a 
transactional table from a single coprocessor. Since the transaction @@ -90,22 +91,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { Configuration conf = e.getConfiguration(); String serverName = env.getRegionServerServices().getServerName().getServerName(); codec = new PhoenixIndexCodec(conf, env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey(), env.getRegionInfo().getTable().getName()); - // Clone the config since it is shared - Configuration clonedConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); - /* - * Set the rpc controller factory so that the HTables used by IndexWriter would - * set the correct priorities on the remote RPC calls. - */ - clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, - InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); - // lower the number of rpc retries. We inherit config from HConnectionManager#setServerSideHConnectionRetries, - // which by default uses a multiplier of 10. That is too many retries for our synchronous index writes - clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER, - DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER)); - clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, env.getConfiguration() - .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE)); - DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env); + DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION); // setup the actual index writer // For transactional tables, we keep the index active upon a write failure // since we have the all versus none behavior for transactions. 
Also, we http://git-wip-us.apache.org/repos/asf/phoenix/blob/dde1054f/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java index 4b3cc43..2dab076 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java @@ -17,11 +17,17 @@ */ package org.apache.phoenix.util; +import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE; +import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER; +import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE; +import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER; + import java.io.IOException; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -35,12 +41,15 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.CoprocessorHConnection; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import 
org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -52,8 +61,13 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.hbase.index.table.CoprocessorHTableFactory; import org.apache.phoenix.hbase.index.table.HTableFactory; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.hbase.index.util.VersionUtil; +import org.apache.phoenix.hbase.index.write.IndexWriterUtils; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; +import org.jboss.netty.util.internal.ConcurrentHashMap; @SuppressWarnings("deprecation") @@ -269,12 +283,12 @@ public class ServerUtil { endKey) < 0)); } - public static HTableFactory getDelegateHTableFactory(CoprocessorEnvironment env, Configuration conf) { + public static HTableFactory getDelegateHTableFactory(CoprocessorEnvironment env, ConnectionType connectionType) { if (env instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env; RegionServerServices services = e.getRegionServerServices(); if (services instanceof HRegionServer) { - return new CoprocessorHConnectionTableFactory(conf, (HRegionServer) services); + return new CoprocessorHConnectionTableFactory(env.getConfiguration(), (HRegionServer) services, connectionType); } } return new CoprocessorHTableFactory(env); @@ -286,44 +300,133 @@ public class ServerUtil { * https://issues.apache.org/jira/browse/HBASE-18359 */ public static class CoprocessorHConnectionTableFactory implements HTableFactory { - @GuardedBy("CoprocessorHConnectionTableFactory.this") - private HConnection connection; 
private final Configuration conf; private final HRegionServer server; + private final ConnectionType connectionType; - CoprocessorHConnectionTableFactory(Configuration conf, HRegionServer server) { + CoprocessorHConnectionTableFactory(Configuration conf, HRegionServer server, ConnectionType connectionType) { this.conf = conf; this.server = server; + this.connectionType = connectionType; } - private synchronized HConnection getConnection(Configuration conf) throws IOException { - if (connection == null || connection.isClosed()) { - connection = new CoprocessorHConnection(conf, server); - } - return connection; + private ClusterConnection getConnection() throws IOException { + return ConnectionFactory.getConnection(connectionType, conf, server); } @Override public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException { - return getConnection(conf).getTable(tablename.copyBytesIfNecessary()); + return getConnection().getTable(tablename.copyBytesIfNecessary()); } @Override public synchronized void shutdown() { - try { - if (connection != null && !connection.isClosed()) { - connection.close(); - } - } catch (Throwable e) { - LOG.warn("Error while trying to close the HConnection used by CoprocessorHConnectionTableFactory", e); - } + // We need not close the cached connections as they are shared across the server. 
} @Override public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) throws IOException { - return getConnection(conf).getTable(tablename.copyBytesIfNecessary(), pool); + return getConnection().getTable(tablename.copyBytesIfNecessary(), pool); + } + } + + public static enum ConnectionType { + COMPACTION_CONNECTION, + INDEX_WRITER_CONNECTION, + INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS, + INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS_NO_RETRIES, + DEFAULT_SERVER_CONNECTION; + } + + public static class ConnectionFactory { + + private static Map<ConnectionType, ClusterConnection> connections = + new ConcurrentHashMap<ConnectionType, ClusterConnection>(); + + public static ClusterConnection getConnection(final ConnectionType connectionType, final Configuration conf, final HRegionServer server) throws IOException { + ClusterConnection connection = null; + if((connection = connections.get(connectionType)) == null) { + synchronized (CoprocessorHConnectionTableFactory.class) { + if(connections.get(connectionType) == null) { + connection = new CoprocessorHConnection(conf, server); + connections.put(connectionType, connection); + return connection; + } + } + } + return connection; } + + public static Configuration getTypeSpecificConfiguration(ConnectionType connectionType, Configuration conf) { + switch (connectionType) { + case COMPACTION_CONNECTION: + return getCompactionConfig(conf); + case DEFAULT_SERVER_CONNECTION: + return conf; + case INDEX_WRITER_CONNECTION: + return getIndexWriterConnection(conf); + case INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS: + return getIndexWriterConfigurationWithCustomThreads(conf); + case INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS_NO_RETRIES: + return getNoRetriesIndexWriterConfigurationWithCustomThreads(conf); + default: + return conf; + } + } + } + + public static Configuration getCompactionConfig(Configuration conf) { + Configuration compactionConfig = PropertiesUtil.cloneConfig(conf); + // lower the number of 
rpc retries, so we don't hang the compaction + compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + conf.getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER, + QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER)); + compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, + conf.getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE, + QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE)); + return compactionConfig; + } + + public static Configuration getIndexWriterConnection(Configuration conf) { + Configuration clonedConfig = PropertiesUtil.cloneConfig(conf); + /* + * Set the rpc controller factory so that the HTables used by IndexWriter would + * set the correct priorities on the remote RPC calls. + */ + clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, + InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); + // lower the number of rpc retries. We inherit config from HConnectionManager#setServerSideHConnectionRetries, + // which by default uses a multiplier of 10. That is too many retries for our synchronous index writes + clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + conf.getInt(INDEX_WRITER_RPC_RETRIES_NUMBER, + DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER)); + clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, conf + .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE)); + return clonedConfig; + } + + public static Configuration getIndexWriterConfigurationWithCustomThreads(Configuration conf) { + Configuration clonedConfig = PropertiesUtil.cloneConfig(conf); + setHTableThreads(clonedConfig); + return clonedConfig; + } + + private static void setHTableThreads(Configuration conf) { + // set the number of threads allowed per table. 
+ int htableThreads = + conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY, + IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS); + IndexManagementUtil.setIfNotSet(conf, IndexWriterUtils.HTABLE_THREAD_KEY, htableThreads); + } + + public static Configuration getNoRetriesIndexWriterConfigurationWithCustomThreads(Configuration conf) { + Configuration clonedConf = getIndexWriterConfigurationWithCustomThreads(conf); + // note in HBase 2+, numTries = numRetries + 1 + // in prior versions, numTries = numRetries + clonedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + return clonedConf; + } }