HBASE-15306 Make RPC call queue length dynamically configurable
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f47dba74 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f47dba74 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f47dba74 Branch: refs/heads/hbase-12439 Commit: f47dba74d498d5d39f124ad8ea5723c437acbc85 Parents: 58283fa Author: Mikhail Antonov <anto...@apache.org> Authored: Tue Feb 23 14:20:40 2016 -0800 Committer: Mikhail Antonov <anto...@apache.org> Committed: Tue Feb 23 14:20:40 2016 -0800 ---------------------------------------------------------------------- .../hbase/ipc/BalancedQueueRpcExecutor.java | 11 +++++- .../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 19 +++++++++- .../apache/hadoop/hbase/ipc/RpcExecutor.java | 11 ++++++ .../org/apache/hadoop/hbase/ipc/RpcServer.java | 3 ++ .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 18 ++++++++- .../hbase/ipc/TestSimpleRpcScheduler.java | 39 ++++++++++++++++++++ 6 files changed, 97 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index 79b4ec8..e4205eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -66,6 +66,10 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { protected void initializeQueues(final int numQueues, final Class<? extends BlockingQueue> queueClass, Object... initargs) { + if (initargs.length > 0) { + currentQueueLimit = (int) initargs[0]; + initargs[0] = Math.max((int) initargs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); + } for (int i = 0; i < numQueues; ++i) { queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, initargs)); } @@ -74,7 +78,12 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { @Override public boolean dispatch(final CallRunner callTask) throws InterruptedException { int queueIndex = balancer.getNextQueue(); - return queues.get(queueIndex).offer(callTask); + BlockingQueue<CallRunner> queue = queues.get(queueIndex); + // that means we can overflow by at most <num reader> size (5), that's ok + if (queue.size() >= currentQueueLimit) { + return false; + } + return queue.offer(callTask); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 544370d..a9648b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -139,12 +139,22 @@ public class RWQueueRpcExecutor extends RpcExecutor { " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + ((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues + " scanHandlers=" + scanHandlersCount)); - + if (writeQueueInitArgs.length > 0) { + currentQueueLimit = (int) writeQueueInitArgs[0]; + writeQueueInitArgs[0] = Math.max((int) writeQueueInitArgs[0], + DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); + } for (int i = 0; i < numWriteQueues; ++i) { + queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs)); } + if (readQueueInitArgs.length > 0) { + currentQueueLimit = (int) readQueueInitArgs[0]; + readQueueInitArgs[0] = Math.max((int) readQueueInitArgs[0], + DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); + } for (int i = 0; i < (numReadQueues + numScanQueues); ++i) { queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs)); @@ -170,7 +180,12 @@ public class RWQueueRpcExecutor extends RpcExecutor { } else { queueIndex = numWriteQueues + readBalancer.getNextQueue(); } - return queues.get(queueIndex).offer(callTask); + + BlockingQueue<CallRunner> queue = queues.get(queueIndex); + if (queue.size() >= currentQueueLimit) { + return false; + } + return queue.offer(callTask); } private boolean isWriteRequest(final RequestHeader header, final Message param) { http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 017bf39..22cb195 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -42,6 +42,9 @@ import com.google.common.base.Strings; public abstract class RpcExecutor { private static final Log LOG = LogFactory.getLog(RpcExecutor.class); + protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250; + protected volatile int currentQueueLimit; + private final AtomicInteger activeHandlerCount = new AtomicInteger(0); private final List<Thread> handlers; private final int handlerCount; @@ -210,4 +213,12 @@ public abstract class RpcExecutor { return ThreadLocalRandom.current().nextInt(queueSize); } } + + /** + * Update current soft limit for executor's call queues + * @param conf updated configuration + */ + public void resizeQueues(Configuration conf) { + currentQueueLimit = conf.getInt("hbase.ipc.server.max.callqueue.length", currentQueueLimit); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 58fc598..6ddfb9a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -2099,6 +2099,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { @Override public void onConfigurationChange(Configuration newConf) { initReconfigurable(newConf); + if (scheduler instanceof ConfigurationObserver) { + ((ConfigurationObserver)scheduler).onConfigurationChange(newConf); + } } private void initReconfigurable(Configuration confToLoad) { http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 8de714d..0003254 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; /** @@ -36,7 +37,7 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; */ @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @InterfaceStability.Evolving -public class SimpleRpcScheduler extends RpcScheduler { +public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver { private static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class); public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = @@ -56,6 +57,21 @@ public class SimpleRpcScheduler extends RpcScheduler { = "hbase.ipc.server.queue.max.call.delay"; /** + * Resize call queues; + * @param conf new configuration + */ + @Override + public void onConfigurationChange(Configuration conf) { + callExecutor.resizeQueues(conf); + if (priorityExecutor != null) { + priorityExecutor.resizeQueues(conf); + } + if (replicationExecutor != null) { + replicationExecutor.resizeQueues(conf); + } + } + + /** * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. * It uses the calculated "deadline" e.g. to deprioritize long-running job * http://git-wip-us.apache.org/repos/asf/hbase/blob/f47dba74/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index db992cd..66032e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -56,7 +56,9 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; @@ -323,4 +325,41 @@ public class TestSimpleRpcScheduler { } }).when(callTask).run(); } + + @Test + public void testSoftAndHardQueueLimits() throws Exception { + Configuration schedConf = HBaseConfiguration.create(); + + schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0); + schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5); + + PriorityFunction priority = mock(PriorityFunction.class); + when(priority.getPriority(any(RequestHeader.class), any(Message.class), + any(User.class))).thenReturn(HConstants.NORMAL_QOS); + SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority, + HConstants.QOS_THRESHOLD); + try { + scheduler.start(); + + CallRunner putCallTask = mock(CallRunner.class); + RpcServer.Call putCall = mock(RpcServer.Call.class); + putCall.param = RequestConverter.buildMutateRequest( + Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); + RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); + when(putCallTask.getCall()).thenReturn(putCall); + when(putCall.getHeader()).thenReturn(putHead); + + assertTrue(scheduler.dispatch(putCallTask)); + + schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0); + scheduler.onConfigurationChange(schedConf); + assertFalse(scheduler.dispatch(putCallTask)); + + schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1); + scheduler.onConfigurationChange(schedConf); + assertTrue(scheduler.dispatch(putCallTask)); + } finally { + scheduler.stop(); + } + } }