Repository: hbase Updated Branches: refs/heads/branch-1.3 efa6620a1 -> 9c13e0daf
HBASE-15470 Add a setting for Priority queue length Summary: Move the config keys to one place. Make two different config keys: one for default, one for priority. Test Plan: unit tests Differential Revision: https://reviews.facebook.net/D55575 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9c13e0da Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9c13e0da Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9c13e0da Branch: refs/heads/branch-1.3 Commit: 9c13e0daffcf489f91755b1f93aff0fa23b51aff Parents: efa6620 Author: Elliott Clark <[email protected]> Authored: Wed Mar 16 10:13:46 2016 -0700 Committer: Elliott Clark <[email protected]> Committed: Wed Mar 16 16:52:27 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java | 3 +-- .../main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java | 7 ++++++- .../main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java | 5 +++++ .../org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java | 9 +++++++-- 4 files changed, 19 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9c13e0da/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index b069a5a..ee36f3f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger; * This can be used for HMaster, where no prioritization is needed. 
*/ public class FifoRpcScheduler extends RpcScheduler { - private final int handlerCount; private final int maxQueueLength; private final AtomicInteger queueSize = new AtomicInteger(0); @@ -40,7 +39,7 @@ public class FifoRpcScheduler extends RpcScheduler { public FifoRpcScheduler(Configuration conf, int handlerCount) { this.handlerCount = handlerCount; - this.maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length", + this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); } http://git-wip-us.apache.org/repos/asf/hbase/blob/9c13e0da/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 22cb195..40c11aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -43,6 +43,7 @@ public abstract class RpcExecutor { private static final Log LOG = LogFactory.getLog(RpcExecutor.class); protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250; + protected volatile int currentQueueLimit; private final AtomicInteger activeHandlerCount = new AtomicInteger(0); @@ -219,6 +220,10 @@ public abstract class RpcExecutor { * @param conf updated configuration */ public void resizeQueues(Configuration conf) { - currentQueueLimit = conf.getInt("hbase.ipc.server.max.callqueue.length", currentQueueLimit); + String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH; + if (name != null && name.toLowerCase().contains("priority")) { + configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH; + } + currentQueueLimit = conf.getInt(configKey, currentQueueLimit); } } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/9c13e0da/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java index 50886cb..2414e3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java @@ -31,6 +31,11 @@ import java.net.InetSocketAddress; @InterfaceStability.Evolving public abstract class RpcScheduler { + public static final String IPC_SERVER_MAX_CALLQUEUE_LENGTH = + "hbase.ipc.server.max.callqueue.length"; + public static final String IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH = + "hbase.ipc.server.priority.max.callqueue.length"; + /** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */ static abstract class Context { public abstract InetSocketAddress getListenerAddress(); http://git-wip-us.apache.org/repos/asf/hbase/blob/9c13e0da/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 12ee540..0cd34bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -166,8 +166,12 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs PriorityFunction priority, Abortable server, int highPriorityLevel) { - int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length", + + int maxQueueLength = 
conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + int maxPriorityQueueLength = + conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, maxQueueLength); + this.priority = priority; this.highPriorityLevel = highPriorityLevel; this.abortable = server; @@ -226,7 +230,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs // Create 2 queues to help priorityExecutor be more scalable. this.priorityExecutor = priorityHandlerCount > 0 ? - new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxQueueLength) : null; + new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxPriorityQueueLength) : + null; this.replicationExecutor = replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication",
